This is an automated email from the ASF dual-hosted git repository. zstan pushed a commit to branch ignite-26069 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 41134b32b246cb015da2bf11bc94b5e197b24085 Author: zstan <[email protected]> AuthorDate: Fri Oct 3 09:59:19 2025 +0300 fix --- .../network/PartitionReplicationMessageGroup.java | 12 +++ .../replication/GetEstimatedSizeRequest.java | 1 + ...GetEstimatedSizeWithLastModifiedTsRequest.java} | 12 +-- ...etEstimatedSizeWithLastModifiedTsResponse.java} | 14 +-- .../internal/sql/engine/SqlQueryProcessor.java | 2 +- .../GetEstimatedSizeWithLastModifiedTsRequest.java | 10 -- .../sql/engine/message/SqlQueryMessageGroup.java | 2 - .../engine/statistic/SqlStatisticManagerImpl.java | 20 +++- .../sql/engine/statistic/StatisticAggregator.java | 113 ++++++++++++++++++--- .../distributed/PartitionModificationCounter.java | 75 +++++++++++++- .../PartitionModificationCounterFactory.java | 13 ++- .../internal/table/distributed/TableManager.java | 19 ++-- .../ignite/internal/table/TableTestUtils.java | 10 +- 13 files changed, 247 insertions(+), 56 deletions(-) diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java index 09c2f55e802..529df3f7c32 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java @@ -53,6 +53,8 @@ import org.apache.ignite.internal.partition.replicator.network.replication.Binar import org.apache.ignite.internal.partition.replicator.network.replication.BuildIndexReplicaRequest; import org.apache.ignite.internal.partition.replicator.network.replication.ChangePeersAndLearnersAsyncReplicaRequest; import org.apache.ignite.internal.partition.replicator.network.replication.GetEstimatedSizeRequest; +import org.apache.ignite.internal.partition.replicator.network.replication.GetEstimatedSizeWithLastModifiedTsRequest; +import org.apache.ignite.internal.partition.replicator.network.replication.GetEstimatedSizeWithLastModifiedTsResponse; import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyDirectMultiRowReplicaRequest; import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyDirectSingleRowReplicaRequest; import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyMultiRowPkReplicaRequest; @@ -218,6 +220,16 @@ public interface PartitionReplicationMessageGroup { */ short CHANGE_PEERS_AND_LEARNERS_ASYNC_REPLICA_REQUEST = 28; + /** + * Message type for {@link GetEstimatedSizeWithLastModifiedTsRequest}. + */ + short GET_ESTIMATED_SIZE_WITH_MODIFIED_TS_MESSAGE_REQUEST = 29; + + /** + * Message type for {@link GetEstimatedSizeWithLastModifiedTsResponse}. + */ + short GET_ESTIMATED_SIZE_WITH_MODIFIED_TS_MESSAGE_RESPONSE = 30; + /** * Message types for partition replicator module RAFT commands. * diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeRequest.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeRequest.java index de7de4b998e..72f36259955 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeRequest.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeRequest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.partition.replicator.network.replication; +import org.apache.ignite.internal.network.NetworkMessage; import org.apache.ignite.internal.network.annotations.Transferable; import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup; import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest; diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeRequest.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeWithLastModifiedTsRequest.java similarity index 78% copy from modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeRequest.java copy to modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeWithLastModifiedTsRequest.java index de7de4b998e..a7f0602f95d 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeRequest.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeWithLastModifiedTsRequest.java @@ -17,14 +17,12 @@ package org.apache.ignite.internal.partition.replicator.network.replication; +import org.apache.ignite.internal.network.NetworkMessage; import org.apache.ignite.internal.network.annotations.Transferable; import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup; -import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest; -import org.apache.ignite.internal.replicator.message.TableAware; -/** - * Request for getting an estimated size of a partition. - */ -@Transferable(PartitionReplicationMessageGroup.GET_ESTIMATED_SIZE_MESSAGE) -public interface GetEstimatedSizeRequest extends PrimaryReplicaRequest, TableAware { +@Transferable(PartitionReplicationMessageGroup.GET_ESTIMATED_SIZE_WITH_MODIFIED_TS_MESSAGE_REQUEST) +public interface GetEstimatedSizeWithLastModifiedTsRequest extends NetworkMessage { + /** ID of the table. */ + int tableId(); } diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeRequest.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeWithLastModifiedTsResponse.java similarity index 78% copy from modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeRequest.java copy to modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeWithLastModifiedTsResponse.java index de7de4b998e..06bce1cc2a1 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeRequest.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeWithLastModifiedTsResponse.java @@ -17,14 +17,14 @@ package org.apache.ignite.internal.partition.replicator.network.replication; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.network.NetworkMessage; import org.apache.ignite.internal.network.annotations.Transferable; import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup; -import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest; -import org.apache.ignite.internal.replicator.message.TableAware; -/** - * Request for getting an estimated size of a partition. - */ -@Transferable(PartitionReplicationMessageGroup.GET_ESTIMATED_SIZE_MESSAGE) -public interface GetEstimatedSizeRequest extends PrimaryReplicaRequest, TableAware { +@Transferable(PartitionReplicationMessageGroup.GET_ESTIMATED_SIZE_WITH_MODIFIED_TS_MESSAGE_RESPONSE) +public interface GetEstimatedSizeWithLastModifiedTsResponse extends NetworkMessage { + HybridTimestamp ts(); + + long estimatedSize(); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java index 5fce14f8488..936372db7aa 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java @@ -255,7 +255,7 @@ public class SqlQueryProcessor implements QueryProcessor, SystemViewProvider { this.killCommandHandler = killCommandHandler; this.eventLog = eventLog; - statAggregator = new StatisticAggregator(placementDriver, clockService::current, tableManager); + statAggregator = new StatisticAggregator(placementDriver, clockService::current, tableManager, clusterSrvc.topologyService()); sqlStatisticManager = new SqlStatisticManagerImpl(tableManager, catalogManager, lowWaterMark, commonScheduler, statAggregator); sqlSchemaManager = new SqlSchemaManagerImpl( catalogManager, diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/GetEstimatedSizeWithLastModifiedTsRequest.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/GetEstimatedSizeWithLastModifiedTsRequest.java deleted file mode 100644 index a9ed8d493ce..00000000000 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/GetEstimatedSizeWithLastModifiedTsRequest.java +++ /dev/null @@ -1,10 +0,0 @@ -package org.apache.ignite.internal.sql.engine.message; - -import org.apache.ignite.internal.network.NetworkMessage; -import org.apache.ignite.internal.network.annotations.Transferable; - -@Transferable(SqlQueryMessageGroup.GET_ESTIMATED_SIZE_WITH_MODIFIED_TS_MESSAGE) -public interface GetEstimatedSizeWithLastModifiedTsRequest extends NetworkMessage { - /** ID of the table. */ - int tableId(); -} diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SqlQueryMessageGroup.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SqlQueryMessageGroup.java index 8bbd77d0d7c..ea0d1f1509a 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SqlQueryMessageGroup.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SqlQueryMessageGroup.java @@ -46,6 +46,4 @@ public final class SqlQueryMessageGroup { /** See {@link CancelOperationResponse} for the details. */ public static final short OPERATION_CANCEL_RESPONSE = 7; - - public static final short GET_ESTIMATED_SIZE_WITH_MODIFIED_TS_MESSAGE = 8; } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java index 048655ad5c8..abd8f3e80f9 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.sql.engine.statistic; import static org.apache.ignite.internal.event.EventListener.fromConsumer; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; +import it.unimi.dsi.fastutil.longs.LongObjectImmutablePair; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -106,6 +107,23 @@ public class SqlStatisticManagerImpl implements SqlStatisticManager { */ @Override public long tableSize(int tableId) { +/* TableViewInternal tableView = tableManager.cachedTable(tableId); + + if (tableView == null) { + return 1; + } + + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + CompletableFuture<LongObjectImmutablePair<HybridTimestamp>> updateResult = statAggregator.estimatedSizeWithLastUpdate( + tableView.internalTable()); + + updateResult.join();*/ + return tableSizeMap.computeIfAbsent(tableId, k -> DEFAULT_VALUE).getSize(); } @@ -143,7 +161,7 @@ public class SqlStatisticManagerImpl implements SqlStatisticManager { continue; } - CompletableFuture<Void> updateResult = tableView.internalTable().estimatedSizeWithLastUpdate() + CompletableFuture<Void> updateResult = statAggregator.estimatedSizeWithLastUpdate(tableView.internalTable()) .thenAccept(res -> { // the table can be concurrently dropped and we shouldn't put new value in this case. tableSizeMap.computeIfPresent(tableId, (k, v) -> { diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java index eca0f32cfd0..003560f7008 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java @@ -1,23 +1,34 @@ package org.apache.ignite.internal.sql.engine.statistic; +import static java.util.concurrent.CompletableFuture.allOf; +import static java.util.stream.Collectors.toList; + import it.unimi.dsi.fastutil.longs.LongObjectImmutablePair; +import java.util.Arrays; import java.util.HashSet; +import java.util.List; +import java.util.Objects; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.network.InternalClusterNode; import org.apache.ignite.internal.network.MessagingService; +import org.apache.ignite.internal.network.NetworkMessage; +import org.apache.ignite.internal.network.TopologyService; +import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup; +import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory; +import org.apache.ignite.internal.partition.replicator.network.message.DataPresence; +import org.apache.ignite.internal.partition.replicator.network.message.HasDataResponse; +import org.apache.ignite.internal.partition.replicator.network.replication.GetEstimatedSizeWithLastModifiedTsRequest; +import org.apache.ignite.internal.partition.replicator.network.replication.GetEstimatedSizeWithLastModifiedTsResponse; import org.apache.ignite.internal.placementdriver.PlacementDriver; import org.apache.ignite.internal.placementdriver.ReplicaMeta; -import org.apache.ignite.internal.sql.engine.message.GetEstimatedSizeWithLastModifiedTsRequest; -import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup; import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory; -import org.apache.ignite.internal.storage.MvPartitionStorage; -import org.apache.ignite.internal.storage.lease.LeaseInfo; import org.apache.ignite.internal.table.InternalTable; -import org.apache.ignite.internal.table.TableViewInternal; import org.apache.ignite.internal.table.distributed.TableManager; import org.jetbrains.annotations.Nullable; @@ -29,15 +40,21 @@ public class StatisticAggregator { private @Nullable String nodeName; private final TableManager tableManager; private static final SqlQueryMessagesFactory MSG_FACTORY = new SqlQueryMessagesFactory(); + private static final PartitionReplicationMessagesFactory PARTITION_REPLICATION_MESSAGES_FACTORY = + new PartitionReplicationMessagesFactory(); + private TopologyService topologyService; + private static final long REQUEST_ESTIMATION_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(3); public StatisticAggregator( PlacementDriver placementDriver, Supplier<HybridTimestamp> currentClock, - TableManager tableManager + TableManager tableManager, + TopologyService topologyService ) { this.placementDriver = placementDriver; this.currentClock = currentClock; this.tableManager = tableManager; + this.topologyService = topologyService; } public void nodeName(String nodeName) { @@ -47,7 +64,9 @@ public class StatisticAggregator { public void messaging(MessagingService messagingService) { this.messagingService = messagingService; - messagingService.addMessageHandler(SqlQueryMessageGroup.class, (message, sender, correlationId) -> { + messagingService.addMessageHandler(PartitionReplicationMessageGroup.class, this::handleMessage); + +/* messagingService.addMessageHandler(SqlQueryMessageGroup.class, (message, sender, correlationId) -> { if (message instanceof GetEstimatedSizeWithLastModifiedTsRequest) { GetEstimatedSizeWithLastModifiedTsRequest msg = (GetEstimatedSizeWithLastModifiedTsRequest) message; @@ -77,11 +96,16 @@ public class StatisticAggregator { storageAccessExecutor.execute(() -> handleHasDataRequest(msg, sender, correlationId)); } - }); + });*/ + + //InternalClusterNode localNode = clusterSrvc.topologyService().localMember(); + //String nodeName = localNode.name(); } - private void onMessage(InternalClusterNode node, GetEstimatedSizeWithLastModifiedTsRequest msg) { - assert node != null && msg != null; + private void handleMessage(NetworkMessage message, InternalClusterNode sender, @Nullable Long correlationId) { + if (message instanceof GetEstimatedSizeWithLastModifiedTsResponse) { + System.err.println("!!!!"); + } } /** @@ -89,7 +113,9 @@ public class StatisticAggregator { * * @return Estimated size of this table with last modification timestamp. */ - public LongObjectImmutablePair<HybridTimestamp> estimatedSizeWithLastUpdate(InternalTable table) { + public CompletableFuture<LongObjectImmutablePair<HybridTimestamp>> estimatedSizeWithLastUpdate(InternalTable table) { + assert messagingService != null; + int partitions = table.partitions(); Set<String> peers = new HashSet<>(); @@ -101,15 +127,70 @@ public class StatisticAggregator { if (repl != null) { peers.add(repl.getLeaseholder()); } else { - assert false; // !!! delete + //assert false; // !!! delete } } - GetEstimatedSizeWithLastModifiedTsRequest request = MSG_FACTORY.getEstimatedSizeWithLastModifiedTsRequest() + if (peers.isEmpty()) { + return CompletableFuture.completedFuture(LongObjectImmutablePair.of(0, HybridTimestamp.MIN_VALUE)); + } + + GetEstimatedSizeWithLastModifiedTsRequest request = PARTITION_REPLICATION_MESSAGES_FACTORY.getEstimatedSizeWithLastModifiedTsRequest() .tableId(table.tableId()).build(); - for (String node : peers) { - messageService.send(node, request); - } +/* for (String p : peers) { + @Nullable InternalClusterNode cons = topologyService.getByConsistentId(p); + if (cons == null) + continue; + + CompletableFuture<NetworkMessage> fut = messagingService.invoke(cons, request, REQUEST_ESTIMATION_TIMEOUT_MILLIS); + + fut.thenApply(res -> { + System.err.println(); + return null; + }).exceptionally(ex -> { + System.err.println(); + return null; + }); + + fut.join(); + }*/ + + CompletableFuture<LongObjectImmutablePair<HybridTimestamp>>[] invokeFutures = peers.stream() + .map(topologyService::getByConsistentId) + .filter(Objects::nonNull) + .map(node -> messagingService + .invoke(node, request, REQUEST_ESTIMATION_TIMEOUT_MILLIS) + .thenApply(response -> { + assert response instanceof GetEstimatedSizeWithLastModifiedTsResponse : response; + + GetEstimatedSizeWithLastModifiedTsResponse response0 = (GetEstimatedSizeWithLastModifiedTsResponse) response; + + return LongObjectImmutablePair.of(response0.estimatedSize(), response0.ts()); + }) + .exceptionally(unused -> LongObjectImmutablePair.of(0, HybridTimestamp.MIN_VALUE))) + .toArray(CompletableFuture[]::new); + + return allOf(invokeFutures).thenApply(unused -> { + HybridTimestamp last = HybridTimestamp.MIN_VALUE; + long count = 0L; + + for (CompletableFuture<?> fut : invokeFutures) { + CompletableFuture<LongObjectImmutablePair<HybridTimestamp>> requestFut = + (CompletableFuture<LongObjectImmutablePair<HybridTimestamp>>) fut; + LongObjectImmutablePair<HybridTimestamp> result = requestFut.join(); + + if (last == null) { + last = result.value(); + } else { + if (result.value().compareTo(last) > 0) { + last = result.value(); + } + } + count += result.keyLong(); + } + + return LongObjectImmutablePair.of(count, last); + }); } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java index 0c4311a6f63..c18ccad923d 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java @@ -18,9 +18,17 @@ package org.apache.ignite.internal.table.distributed; import java.util.Objects; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; import java.util.function.LongSupplier; import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.network.InternalClusterNode; +import org.apache.ignite.internal.network.MessagingService; +import org.apache.ignite.internal.network.NetworkMessage; +import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup; +import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory; +import org.apache.ignite.internal.partition.replicator.network.replication.GetEstimatedSizeWithLastModifiedTsRequest; +import org.jetbrains.annotations.Nullable; /** * Keeps track of the number of modifications of a partition. @@ -38,13 +46,19 @@ public class PartitionModificationCounter { private final AtomicLong counter = new AtomicLong(0); private volatile long nextMilestone; private volatile HybridTimestamp lastMilestoneReachedTimestamp; + private final MessagingService messagingService; + private static final PartitionReplicationMessagesFactory PARTITION_REPLICATION_MESSAGES_FACTORY = + new PartitionReplicationMessagesFactory(); /** Constructor. */ public PartitionModificationCounter( HybridTimestamp initTimestamp, LongSupplier partitionSizeSupplier, double staleRowsFraction, - long minStaleRowsCount + long minStaleRowsCount, + int tableId, + int partitionId, + MessagingService messagingService ) { Objects.requireNonNull(initTimestamp, "initTimestamp"); Objects.requireNonNull(partitionSizeSupplier, "partitionSizeSupplier"); @@ -63,6 +77,65 @@ public class PartitionModificationCounter { nextMilestone = computeNextMilestone(partitionSizeSupplier.getAsLong(), staleRowsFraction, minStaleRowsCount); lastMilestoneReachedTimestamp = initTimestamp; + + this.messagingService = messagingService; + + messagingService.addMessageHandler(PartitionReplicationMessageGroup.class, this::handleMessage); + } + + private void handleMessage(NetworkMessage message, InternalClusterNode sender, @Nullable Long correlationId) { + if (message instanceof GetEstimatedSizeWithLastModifiedTsRequest) { + handleRequestCounter((GetEstimatedSizeWithLastModifiedTsRequest) message, sender, correlationId); + } + } + + private void handleRequestCounter( + GetEstimatedSizeWithLastModifiedTsRequest message, + InternalClusterNode sender, + @Nullable Long correlationId + ) { + GetEstimatedSizeWithLastModifiedTsRequest msg = (GetEstimatedSizeWithLastModifiedTsRequest) message; + + CompletableFuture<Void> fut = messagingService.respond( + sender, + PARTITION_REPLICATION_MESSAGES_FACTORY.getEstimatedSizeWithLastModifiedTsResponse().estimatedSize(100).ts(HybridTimestamp.MAX_VALUE).build(), + correlationId + ); + + System.err.println(); + +/* messagingService.respond( + sender, + PARTITION_REPLICATION_MESSAGES_FACTORY.hasDataResponse().build(), + correlationId + );*/ + + +/* TableViewInternal tableView = tableManager.cachedTable(msg.tableId()); + + if (tableView == null) { + LOG.debug("No table found to update statistics [id={}].", msg.tableId()); + + return; + } + + InternalTable table = tableView.internalTable(); + + for (int p = 0 ; p < table.partitions(); ++p) { + MvPartitionStorage mvPartition = table.storage().getMvPartition(p); + + if (mvPartition != null) { + LeaseInfo info = mvPartition.leaseInfo(); + + if (info != null) { + if (info.primaryReplicaNodeName().equals(nodeName)) { + mvPartition.estimatedSize(); + } + } + } + } + + storageAccessExecutor.execute(() -> handleHasDataRequest(msg, sender, correlationId));*/ } /** Returns the current counter value. */ diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterFactory.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterFactory.java index 496a24077f6..ef506461751 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterFactory.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterFactory.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.table.distributed; import java.util.function.LongSupplier; import java.util.function.Supplier; import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.network.MessagingService; /** * Factory for producing {@link PartitionModificationCounter}. @@ -41,12 +42,20 @@ public class PartitionModificationCounterFactory { * @param partitionSizeSupplier Partition size supplier. * @return New partition modification counter. */ - public PartitionModificationCounter create(LongSupplier partitionSizeSupplier) { + public PartitionModificationCounter create( + int tableId, + int partitionId, + LongSupplier partitionSizeSupplier, + MessagingService messagingService + ) { return new PartitionModificationCounter( currentTimestampSupplier.get(), partitionSizeSupplier, DEFAULT_STALE_ROWS_FRACTION, - DEFAULT_MIN_STALE_ROWS_COUNT + DEFAULT_MIN_STALE_ROWS_COUNT, + tableId, + partitionId, + messagingService ); } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index 648c3c1354f..75df7c230b6 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -472,6 +472,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { private final PartitionModificationCounterFactory partitionModificationCounterFactory; private final Map<TablePartitionId, PartitionModificationCounterMetricSource> partModCounterMetricSources = new ConcurrentHashMap<>(); + MessagingService messagingService0; // TODO + /** * Creates a new table manager. * @@ -581,6 +583,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { this.executorInclinedPlacementDriver = new ExecutorInclinedPlacementDriver(placementDriver, partitionOperationsExecutor); this.reliableCatalogVersions = new ReliableCatalogVersions(schemaSyncService, catalogService); + messagingService0 = messagingService; + TxMessageSender txMessageSender = new TxMessageSender( messagingService, replicaSvc, @@ -1042,7 +1046,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { partitionDataStorage, table, safeTimeTracker, - replicationConfiguration + replicationConfiguration, + messagingService0 ); internalTbl.updatePartitionTrackers(partId, safeTimeTracker, storageIndexTracker); @@ -1393,12 +1398,10 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { partitionDataStorage, table, safeTimeTracker, - replicationConfiguration + replicationConfiguration, + messagingService0 ); - partitionUpdateHandlers.storageUpdateHandler.lastModificationCounterMilestone(); - //partitionDataStorage.getStorage().estimatedSize(); - internalTbl.updatePartitionTrackers(partId, safeTimeTracker, storageIndexTracker); mvGc.addStorage(replicaGrpId, partitionUpdateHandlers.gcUpdateHandler); @@ -3153,7 +3156,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { PartitionDataStorage partitionDataStorage, TableViewInternal table, PendingComparableValuesTracker<HybridTimestamp, Void> safeTimeTracker, - ReplicationConfiguration replicationConfiguration + ReplicationConfiguration replicationConfiguration, + MessagingService messagingService ) { TableIndexStoragesSupplier indexes = table.indexStorageAdapters(partitionId); @@ -3162,7 +3166,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { GcUpdateHandler gcUpdateHandler = new GcUpdateHandler(partitionDataStorage, safeTimeTracker, indexUpdateHandler); LongSupplier partSizeSupplier = () -> partitionDataStorage.getStorage().estimatedSize(); - PartitionModificationCounter modificationCounter = partitionModificationCounterFactory.create(partSizeSupplier); + PartitionModificationCounter modificationCounter = partitionModificationCounterFactory + .create(table.tableId(), partitionId, partSizeSupplier, messagingService); registerPartitionModificationCounterMetrics(table.tableId(), partitionId, modificationCounter); StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler( diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java index 988ab12a642..decd56de798 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java @@ -44,6 +44,7 @@ import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor; import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus; import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.network.MessagingService; import org.apache.ignite.internal.sql.SqlCommon; import org.apache.ignite.internal.table.distributed.PartitionModificationCounter; import org.apache.ignite.internal.table.distributed.PartitionModificationCounterFactory; @@ -66,13 +67,18 @@ public class TableTestUtils { /** No-op partition modification counter. */ public static final PartitionModificationCounter NOOP_PARTITION_MODIFICATION_COUNTER = - new PartitionModificationCounter(HybridTimestamp.MIN_VALUE, () -> 0, 0, 0); + new PartitionModificationCounter(HybridTimestamp.MIN_VALUE, () -> 0, 0, 0, 0, 0, null); /** No-op partition modification counter factory. */ public static PartitionModificationCounterFactory NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY = new PartitionModificationCounterFactory(() -> HybridTimestamp.MIN_VALUE) { @Override - public PartitionModificationCounter create(LongSupplier partitionSizeSupplier) { + public PartitionModificationCounter create( + int tableId, + int partitionId, + LongSupplier partitionSizeSupplier, + MessagingService messagingService + ) { return NOOP_PARTITION_MODIFICATION_COUNTER; } };
