This is an automated email from the ASF dual-hosted git repository.

zstan pushed a commit to branch ignite-26069
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 41134b32b246cb015da2bf11bc94b5e197b24085
Author: zstan <[email protected]>
AuthorDate: Fri Oct 3 09:59:19 2025 +0300

    fix
---
 .../network/PartitionReplicationMessageGroup.java  |  12 +++
 .../replication/GetEstimatedSizeRequest.java       |   1 +
 ...GetEstimatedSizeWithLastModifiedTsRequest.java} |  12 +--
 ...etEstimatedSizeWithLastModifiedTsResponse.java} |  14 +--
 .../internal/sql/engine/SqlQueryProcessor.java     |   2 +-
 .../GetEstimatedSizeWithLastModifiedTsRequest.java |  10 --
 .../sql/engine/message/SqlQueryMessageGroup.java   |   2 -
 .../engine/statistic/SqlStatisticManagerImpl.java  |  20 +++-
 .../sql/engine/statistic/StatisticAggregator.java  | 113 ++++++++++++++++++---
 .../distributed/PartitionModificationCounter.java  |  75 +++++++++++++-
 .../PartitionModificationCounterFactory.java       |  13 ++-
 .../internal/table/distributed/TableManager.java   |  19 ++--
 .../ignite/internal/table/TableTestUtils.java      |  10 +-
 13 files changed, 247 insertions(+), 56 deletions(-)

diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
index 09c2f55e802..529df3f7c32 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
@@ -53,6 +53,8 @@ import 
org.apache.ignite.internal.partition.replicator.network.replication.Binar
 import 
org.apache.ignite.internal.partition.replicator.network.replication.BuildIndexReplicaRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.ChangePeersAndLearnersAsyncReplicaRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.GetEstimatedSizeRequest;
+import 
org.apache.ignite.internal.partition.replicator.network.replication.GetEstimatedSizeWithLastModifiedTsRequest;
+import 
org.apache.ignite.internal.partition.replicator.network.replication.GetEstimatedSizeWithLastModifiedTsResponse;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyDirectMultiRowReplicaRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyDirectSingleRowReplicaRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyMultiRowPkReplicaRequest;
@@ -218,6 +220,16 @@ public interface PartitionReplicationMessageGroup {
      */
     short CHANGE_PEERS_AND_LEARNERS_ASYNC_REPLICA_REQUEST = 28;
 
+    /**
+     * Message type for {@link GetEstimatedSizeWithLastModifiedTsRequest}.
+     */
+    short GET_ESTIMATED_SIZE_WITH_MODIFIED_TS_MESSAGE_REQUEST = 29;
+
+    /**
+     * Message type for {@link GetEstimatedSizeWithLastModifiedTsResponse}.
+     */
+    short GET_ESTIMATED_SIZE_WITH_MODIFIED_TS_MESSAGE_RESPONSE = 30;
+
     /**
      * Message types for partition replicator module RAFT commands.
      *
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeRequest.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeRequest.java
index de7de4b998e..72f36259955 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeRequest.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeRequest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.partition.replicator.network.replication;
 
+import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.annotations.Transferable;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
 import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeRequest.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeWithLastModifiedTsRequest.java
similarity index 78%
copy from 
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeRequest.java
copy to 
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeWithLastModifiedTsRequest.java
index de7de4b998e..a7f0602f95d 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeRequest.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeWithLastModifiedTsRequest.java
@@ -17,14 +17,12 @@
 
 package org.apache.ignite.internal.partition.replicator.network.replication;
 
+import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.annotations.Transferable;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
-import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
-import org.apache.ignite.internal.replicator.message.TableAware;
 
-/**
- * Request for getting an estimated size of a partition.
- */
-@Transferable(PartitionReplicationMessageGroup.GET_ESTIMATED_SIZE_MESSAGE)
-public interface GetEstimatedSizeRequest extends PrimaryReplicaRequest, 
TableAware {
+@Transferable(PartitionReplicationMessageGroup.GET_ESTIMATED_SIZE_WITH_MODIFIED_TS_MESSAGE_REQUEST)
+public interface GetEstimatedSizeWithLastModifiedTsRequest extends 
NetworkMessage {
+    /** ID of the table. */
+    int tableId();
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeRequest.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeWithLastModifiedTsResponse.java
similarity index 78%
copy from 
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeRequest.java
copy to 
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeWithLastModifiedTsResponse.java
index de7de4b998e..06bce1cc2a1 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeRequest.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/GetEstimatedSizeWithLastModifiedTsResponse.java
@@ -17,14 +17,14 @@
 
 package org.apache.ignite.internal.partition.replicator.network.replication;
 
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.annotations.Transferable;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
-import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
-import org.apache.ignite.internal.replicator.message.TableAware;
 
-/**
- * Request for getting an estimated size of a partition.
- */
-@Transferable(PartitionReplicationMessageGroup.GET_ESTIMATED_SIZE_MESSAGE)
-public interface GetEstimatedSizeRequest extends PrimaryReplicaRequest, 
TableAware {
+@Transferable(PartitionReplicationMessageGroup.GET_ESTIMATED_SIZE_WITH_MODIFIED_TS_MESSAGE_RESPONSE)
+public interface GetEstimatedSizeWithLastModifiedTsResponse extends 
NetworkMessage {
+    HybridTimestamp ts();
+
+    long estimatedSize();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 5fce14f8488..936372db7aa 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -255,7 +255,7 @@ public class SqlQueryProcessor implements QueryProcessor, 
SystemViewProvider {
         this.killCommandHandler = killCommandHandler;
         this.eventLog = eventLog;
 
-        statAggregator = new StatisticAggregator(placementDriver, 
clockService::current, tableManager);
+        statAggregator = new StatisticAggregator(placementDriver, 
clockService::current, tableManager, clusterSrvc.topologyService());
         sqlStatisticManager = new SqlStatisticManagerImpl(tableManager, 
catalogManager, lowWaterMark, commonScheduler, statAggregator);
         sqlSchemaManager = new SqlSchemaManagerImpl(
                 catalogManager,
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/GetEstimatedSizeWithLastModifiedTsRequest.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/GetEstimatedSizeWithLastModifiedTsRequest.java
deleted file mode 100644
index a9ed8d493ce..00000000000
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/GetEstimatedSizeWithLastModifiedTsRequest.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package org.apache.ignite.internal.sql.engine.message;
-
-import org.apache.ignite.internal.network.NetworkMessage;
-import org.apache.ignite.internal.network.annotations.Transferable;
-
-@Transferable(SqlQueryMessageGroup.GET_ESTIMATED_SIZE_WITH_MODIFIED_TS_MESSAGE)
-public interface GetEstimatedSizeWithLastModifiedTsRequest extends 
NetworkMessage {
-    /** ID of the table. */
-    int tableId();
-}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SqlQueryMessageGroup.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SqlQueryMessageGroup.java
index 8bbd77d0d7c..ea0d1f1509a 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SqlQueryMessageGroup.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SqlQueryMessageGroup.java
@@ -46,6 +46,4 @@ public final class SqlQueryMessageGroup {
 
     /** See {@link CancelOperationResponse} for the details. */
     public static final short OPERATION_CANCEL_RESPONSE = 7;
-
-    public static final short GET_ESTIMATED_SIZE_WITH_MODIFIED_TS_MESSAGE = 8;
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java
index 048655ad5c8..abd8f3e80f9 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.sql.engine.statistic;
 import static org.apache.ignite.internal.event.EventListener.fromConsumer;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
+import it.unimi.dsi.fastutil.longs.LongObjectImmutablePair;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -106,6 +107,23 @@ public class SqlStatisticManagerImpl implements 
SqlStatisticManager {
      */
     @Override
     public long tableSize(int tableId) {
+/*        TableViewInternal tableView = tableManager.cachedTable(tableId);
+
+        if (tableView == null) {
+            return 1;
+        }
+
+        try {
+            Thread.sleep(2000);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+
+        CompletableFuture<LongObjectImmutablePair<HybridTimestamp>> 
updateResult = statAggregator.estimatedSizeWithLastUpdate(
+                tableView.internalTable());
+
+        updateResult.join();*/
+
         return tableSizeMap.computeIfAbsent(tableId, k -> 
DEFAULT_VALUE).getSize();
     }
 
@@ -143,7 +161,7 @@ public class SqlStatisticManagerImpl implements 
SqlStatisticManager {
                 continue;
             }
 
-            CompletableFuture<Void> updateResult = 
tableView.internalTable().estimatedSizeWithLastUpdate()
+            CompletableFuture<Void> updateResult = 
statAggregator.estimatedSizeWithLastUpdate(tableView.internalTable())
                     .thenAccept(res -> {
                         // the table can be concurrently dropped and we 
shouldn't put new value in this case.
                         tableSizeMap.computeIfPresent(tableId, (k, v) -> {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java
index eca0f32cfd0..003560f7008 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java
@@ -1,23 +1,34 @@
 package org.apache.ignite.internal.sql.engine.statistic;
 
+import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.stream.Collectors.toList;
+
 import it.unimi.dsi.fastutil.longs.LongObjectImmutablePair;
+import java.util.Arrays;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.TopologyService;
+import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
+import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
+import 
org.apache.ignite.internal.partition.replicator.network.message.DataPresence;
+import 
org.apache.ignite.internal.partition.replicator.network.message.HasDataResponse;
+import 
org.apache.ignite.internal.partition.replicator.network.replication.GetEstimatedSizeWithLastModifiedTsRequest;
+import 
org.apache.ignite.internal.partition.replicator.network.replication.GetEstimatedSizeWithLastModifiedTsResponse;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.placementdriver.ReplicaMeta;
-import 
org.apache.ignite.internal.sql.engine.message.GetEstimatedSizeWithLastModifiedTsRequest;
-import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup;
 import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory;
-import org.apache.ignite.internal.storage.MvPartitionStorage;
-import org.apache.ignite.internal.storage.lease.LeaseInfo;
 import org.apache.ignite.internal.table.InternalTable;
-import org.apache.ignite.internal.table.TableViewInternal;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.jetbrains.annotations.Nullable;
 
@@ -29,15 +40,21 @@ public class StatisticAggregator {
     private @Nullable String nodeName;
     private final TableManager tableManager;
     private static final SqlQueryMessagesFactory MSG_FACTORY = new 
SqlQueryMessagesFactory();
+    private static final PartitionReplicationMessagesFactory 
PARTITION_REPLICATION_MESSAGES_FACTORY =
+            new PartitionReplicationMessagesFactory();
+    private TopologyService topologyService;
+    private static final long REQUEST_ESTIMATION_TIMEOUT_MILLIS = 
TimeUnit.SECONDS.toMillis(3);
 
     public StatisticAggregator(
             PlacementDriver placementDriver,
             Supplier<HybridTimestamp> currentClock,
-            TableManager tableManager
+            TableManager tableManager,
+            TopologyService topologyService
     ) {
         this.placementDriver = placementDriver;
         this.currentClock = currentClock;
         this.tableManager = tableManager;
+        this.topologyService = topologyService;
     }
 
     public void nodeName(String nodeName) {
@@ -47,7 +64,9 @@ public class StatisticAggregator {
     public void messaging(MessagingService messagingService) {
         this.messagingService = messagingService;
 
-        messagingService.addMessageHandler(SqlQueryMessageGroup.class, 
(message, sender, correlationId) -> {
+        
messagingService.addMessageHandler(PartitionReplicationMessageGroup.class, 
this::handleMessage);
+
+/*        messagingService.addMessageHandler(SqlQueryMessageGroup.class, 
(message, sender, correlationId) -> {
             if (message instanceof GetEstimatedSizeWithLastModifiedTsRequest) {
                 GetEstimatedSizeWithLastModifiedTsRequest msg = 
(GetEstimatedSizeWithLastModifiedTsRequest) message;
 
@@ -77,11 +96,16 @@ public class StatisticAggregator {
 
                 storageAccessExecutor.execute(() -> handleHasDataRequest(msg, 
sender, correlationId));
             }
-        });
+        });*/
+
+        //InternalClusterNode localNode = 
clusterSrvc.topologyService().localMember();
+        //String nodeName = localNode.name();
     }
 
-    private void onMessage(InternalClusterNode node, 
GetEstimatedSizeWithLastModifiedTsRequest msg) {
-        assert node != null && msg != null;
+    private void handleMessage(NetworkMessage message, InternalClusterNode 
sender, @Nullable Long correlationId) {
+        if (message instanceof GetEstimatedSizeWithLastModifiedTsResponse) {
+            System.err.println("!!!!");
+        }
     }
 
     /**
@@ -89,7 +113,9 @@ public class StatisticAggregator {
      *
      * @return Estimated size of this table with last modification timestamp.
      */
-    public LongObjectImmutablePair<HybridTimestamp> 
estimatedSizeWithLastUpdate(InternalTable table) {
+    public CompletableFuture<LongObjectImmutablePair<HybridTimestamp>> 
estimatedSizeWithLastUpdate(InternalTable table) {
+        assert messagingService != null;
+
         int partitions = table.partitions();
 
         Set<String> peers = new HashSet<>();
@@ -101,15 +127,70 @@ public class StatisticAggregator {
             if (repl != null) {
                 peers.add(repl.getLeaseholder());
             } else {
-                assert false; // !!! delete
+                //assert false; // !!! delete
             }
         }
 
-        GetEstimatedSizeWithLastModifiedTsRequest request = 
MSG_FACTORY.getEstimatedSizeWithLastModifiedTsRequest()
+        if (peers.isEmpty()) {
+            return 
CompletableFuture.completedFuture(LongObjectImmutablePair.of(0, 
HybridTimestamp.MIN_VALUE));
+        }
+
+        GetEstimatedSizeWithLastModifiedTsRequest request = 
PARTITION_REPLICATION_MESSAGES_FACTORY.getEstimatedSizeWithLastModifiedTsRequest()
                 .tableId(table.tableId()).build();
 
-        for (String node : peers) {
-            messageService.send(node, request);
-        }
+/*        for (String p : peers) {
+            @Nullable InternalClusterNode cons = 
topologyService.getByConsistentId(p);
+            if (cons == null)
+                continue;
+
+            CompletableFuture<NetworkMessage> fut = 
messagingService.invoke(cons, request, REQUEST_ESTIMATION_TIMEOUT_MILLIS);
+
+            fut.thenApply(res -> {
+                System.err.println();
+                return null;
+            }).exceptionally(ex -> {
+                System.err.println();
+                return null;
+            });
+
+            fut.join();
+        }*/
+
+        CompletableFuture<LongObjectImmutablePair<HybridTimestamp>>[] 
invokeFutures = peers.stream()
+                .map(topologyService::getByConsistentId)
+                .filter(Objects::nonNull)
+                .map(node -> messagingService
+                        .invoke(node, request, 
REQUEST_ESTIMATION_TIMEOUT_MILLIS)
+                        .thenApply(response -> {
+                            assert response instanceof 
GetEstimatedSizeWithLastModifiedTsResponse : response;
+
+                            GetEstimatedSizeWithLastModifiedTsResponse 
response0 = (GetEstimatedSizeWithLastModifiedTsResponse) response;
+
+                            return 
LongObjectImmutablePair.of(response0.estimatedSize(), response0.ts());
+                        })
+                        .exceptionally(unused -> LongObjectImmutablePair.of(0, 
HybridTimestamp.MIN_VALUE)))
+                .toArray(CompletableFuture[]::new);
+
+        return allOf(invokeFutures).thenApply(unused -> {
+            HybridTimestamp last = HybridTimestamp.MIN_VALUE;
+            long count = 0L;
+
+            for (CompletableFuture<?> fut : invokeFutures) {
+                CompletableFuture<LongObjectImmutablePair<HybridTimestamp>> 
requestFut =
+                        
(CompletableFuture<LongObjectImmutablePair<HybridTimestamp>>) fut;
+                LongObjectImmutablePair<HybridTimestamp> result = 
requestFut.join();
+
+                if (last == null) {
+                    last = result.value();
+                } else {
+                    if (result.value().compareTo(last) > 0) {
+                        last = result.value();
+                    }
+                }
+                count += result.keyLong();
+            }
+
+            return LongObjectImmutablePair.of(count, last);
+        });
     }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java
index 0c4311a6f63..c18ccad923d 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java
@@ -18,9 +18,17 @@
 package org.apache.ignite.internal.table.distributed;
 
 import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.LongSupplier;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.network.InternalClusterNode;
+import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.network.NetworkMessage;
+import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
+import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
+import 
org.apache.ignite.internal.partition.replicator.network.replication.GetEstimatedSizeWithLastModifiedTsRequest;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Keeps track of the number of modifications of a partition.
@@ -38,13 +46,19 @@ public class PartitionModificationCounter {
     private final AtomicLong counter = new AtomicLong(0);
     private volatile long nextMilestone;
     private volatile HybridTimestamp lastMilestoneReachedTimestamp;
+    private final MessagingService messagingService;
+    private static final PartitionReplicationMessagesFactory 
PARTITION_REPLICATION_MESSAGES_FACTORY =
+            new PartitionReplicationMessagesFactory();
 
     /** Constructor. */
     public PartitionModificationCounter(
             HybridTimestamp initTimestamp,
             LongSupplier partitionSizeSupplier,
             double staleRowsFraction,
-            long minStaleRowsCount
+            long minStaleRowsCount,
+            int tableId,
+            int partitionId,
+            MessagingService messagingService
     ) {
         Objects.requireNonNull(initTimestamp, "initTimestamp");
         Objects.requireNonNull(partitionSizeSupplier, "partitionSizeSupplier");
@@ -63,6 +77,65 @@ public class PartitionModificationCounter {
 
         nextMilestone = 
computeNextMilestone(partitionSizeSupplier.getAsLong(), staleRowsFraction, 
minStaleRowsCount);
         lastMilestoneReachedTimestamp = initTimestamp;
+
+        this.messagingService = messagingService;
+
+        
messagingService.addMessageHandler(PartitionReplicationMessageGroup.class, 
this::handleMessage);
+    }
+
+    private void handleMessage(NetworkMessage message, InternalClusterNode 
sender, @Nullable Long correlationId) {
+        if (message instanceof GetEstimatedSizeWithLastModifiedTsRequest) {
+            handleRequestCounter((GetEstimatedSizeWithLastModifiedTsRequest) 
message, sender, correlationId);
+        }
+    }
+
+    private void handleRequestCounter(
+            GetEstimatedSizeWithLastModifiedTsRequest message,
+            InternalClusterNode sender,
+            @Nullable Long correlationId
+    ) {
+        GetEstimatedSizeWithLastModifiedTsRequest msg = 
(GetEstimatedSizeWithLastModifiedTsRequest) message;
+
+        CompletableFuture<Void> fut = messagingService.respond(
+                sender,
+                
PARTITION_REPLICATION_MESSAGES_FACTORY.getEstimatedSizeWithLastModifiedTsResponse().estimatedSize(100).ts(HybridTimestamp.MAX_VALUE).build(),
+                correlationId
+        );
+
+        System.err.println();
+
+/*        messagingService.respond(
+                sender,
+                
PARTITION_REPLICATION_MESSAGES_FACTORY.hasDataResponse().build(),
+                correlationId
+        );*/
+
+
+/*        TableViewInternal tableView = 
tableManager.cachedTable(msg.tableId());
+
+        if (tableView == null) {
+            LOG.debug("No table found to update statistics [id={}].", 
msg.tableId());
+
+            return;
+        }
+
+        InternalTable table = tableView.internalTable();
+
+        for (int p = 0 ; p < table.partitions(); ++p) {
+            MvPartitionStorage mvPartition = table.storage().getMvPartition(p);
+
+            if (mvPartition != null) {
+                LeaseInfo info = mvPartition.leaseInfo();
+
+                if (info != null) {
+                    if (info.primaryReplicaNodeName().equals(nodeName)) {
+                        mvPartition.estimatedSize();
+                    }
+                }
+            }
+        }
+
+        storageAccessExecutor.execute(() -> handleHasDataRequest(msg, sender, 
correlationId));*/
     }
 
     /** Returns the current counter value. */
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterFactory.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterFactory.java
index 496a24077f6..ef506461751 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterFactory.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterFactory.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.table.distributed;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.network.MessagingService;
 
 /**
  * Factory for producing {@link PartitionModificationCounter}.
@@ -41,12 +42,20 @@ public class PartitionModificationCounterFactory {
      * @param partitionSizeSupplier Partition size supplier.
      * @return New partition modification counter.
      */
-    public PartitionModificationCounter create(LongSupplier 
partitionSizeSupplier) {
+    public PartitionModificationCounter create(
+            int tableId,
+            int partitionId,
+            LongSupplier partitionSizeSupplier,
+            MessagingService messagingService
+    ) {
         return new PartitionModificationCounter(
                 currentTimestampSupplier.get(),
                 partitionSizeSupplier,
                 DEFAULT_STALE_ROWS_FRACTION,
-                DEFAULT_MIN_STALE_ROWS_COUNT
+                DEFAULT_MIN_STALE_ROWS_COUNT,
+                tableId,
+                partitionId,
+                messagingService
         );
     }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 648c3c1354f..75df7c230b6 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -472,6 +472,8 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
     private final PartitionModificationCounterFactory 
partitionModificationCounterFactory;
     private final Map<TablePartitionId, 
PartitionModificationCounterMetricSource> partModCounterMetricSources = new 
ConcurrentHashMap<>();
 
+    MessagingService messagingService0; // TODO
+
     /**
      * Creates a new table manager.
      *
@@ -581,6 +583,8 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
         this.executorInclinedPlacementDriver = new 
ExecutorInclinedPlacementDriver(placementDriver, partitionOperationsExecutor);
         this.reliableCatalogVersions = new 
ReliableCatalogVersions(schemaSyncService, catalogService);
 
+        messagingService0 = messagingService;
+
         TxMessageSender txMessageSender = new TxMessageSender(
                 messagingService,
                 replicaSvc,
@@ -1042,7 +1046,8 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                 partitionDataStorage,
                 table,
                 safeTimeTracker,
-                replicationConfiguration
+                replicationConfiguration,
+                messagingService0
         );
 
         internalTbl.updatePartitionTrackers(partId, safeTimeTracker, 
storageIndexTracker);
@@ -1393,12 +1398,10 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                             partitionDataStorage,
                             table,
                             safeTimeTracker,
-                            replicationConfiguration
+                            replicationConfiguration,
+                            messagingService0
                     );
 
-                    
partitionUpdateHandlers.storageUpdateHandler.lastModificationCounterMilestone();
-                    //partitionDataStorage.getStorage().estimatedSize();
-
                     internalTbl.updatePartitionTrackers(partId, 
safeTimeTracker, storageIndexTracker);
 
                     mvGc.addStorage(replicaGrpId, 
partitionUpdateHandlers.gcUpdateHandler);
@@ -3153,7 +3156,8 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
             PartitionDataStorage partitionDataStorage,
             TableViewInternal table,
             PendingComparableValuesTracker<HybridTimestamp, Void> 
safeTimeTracker,
-            ReplicationConfiguration replicationConfiguration
+            ReplicationConfiguration replicationConfiguration,
+            MessagingService messagingService
     ) {
         TableIndexStoragesSupplier indexes = 
table.indexStorageAdapters(partitionId);
 
@@ -3162,7 +3166,8 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         GcUpdateHandler gcUpdateHandler = new 
GcUpdateHandler(partitionDataStorage, safeTimeTracker, indexUpdateHandler);
 
         LongSupplier partSizeSupplier = () -> 
partitionDataStorage.getStorage().estimatedSize();
-        PartitionModificationCounter modificationCounter = 
partitionModificationCounterFactory.create(partSizeSupplier);
+        PartitionModificationCounter modificationCounter = 
partitionModificationCounterFactory
+                .create(table.tableId(), partitionId, partSizeSupplier, 
messagingService);
         registerPartitionModificationCounterMetrics(table.tableId(), 
partitionId, modificationCounter);
 
         StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler(
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
index 988ab12a642..decd56de798 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
@@ -44,6 +44,7 @@ import 
org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.sql.SqlCommon;
 import 
org.apache.ignite.internal.table.distributed.PartitionModificationCounter;
 import 
org.apache.ignite.internal.table.distributed.PartitionModificationCounterFactory;
@@ -66,13 +67,18 @@ public class TableTestUtils {
 
     /** No-op partition modification counter. */
     public static final PartitionModificationCounter 
NOOP_PARTITION_MODIFICATION_COUNTER =
-            new PartitionModificationCounter(HybridTimestamp.MIN_VALUE, () -> 
0, 0, 0);
+            new PartitionModificationCounter(HybridTimestamp.MIN_VALUE, () -> 
0, 0, 0, 0, 0, null);
 
     /** No-op partition modification counter factory. */
     public static PartitionModificationCounterFactory 
NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY =
             new PartitionModificationCounterFactory(() -> 
HybridTimestamp.MIN_VALUE) {
                 @Override
-                public PartitionModificationCounter create(LongSupplier 
partitionSizeSupplier) {
+                public PartitionModificationCounter create(
+                        int tableId,
+                        int partitionId,
+                        LongSupplier partitionSizeSupplier,
+                        MessagingService messagingService
+                ) {
                     return NOOP_PARTITION_MODIFICATION_COUNTER;
                 }
             };


Reply via email to