This is an automated email from the ASF dual-hosted git repository.

zstan pushed a commit to branch ignite-26069
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 51ab2d350b171920bd6af72497e38e69cf8b06a7
Author: zstan <[email protected]>
AuthorDate: Sat Oct 4 20:27:01 2025 +0300

    test ok
---
 .../engine/statistic/SqlStatisticManagerImpl.java  | 18 -------
 .../sql/engine/statistic/StatisticAggregator.java  | 61 ----------------------
 .../distributed/PartitionModificationCounter.java  | 59 +++++++--------------
 .../PartitionModificationCounterFactory.java       |  6 ++-
 .../internal/table/distributed/TableManager.java   | 21 ++++++--
 .../ignite/internal/table/TableTestUtils.java      |  5 +-
 6 files changed, 43 insertions(+), 127 deletions(-)

diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java
index abd8f3e80f9..a342485080a 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.sql.engine.statistic;
 import static org.apache.ignite.internal.event.EventListener.fromConsumer;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
-import it.unimi.dsi.fastutil.longs.LongObjectImmutablePair;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -107,23 +106,6 @@ public class SqlStatisticManagerImpl implements 
SqlStatisticManager {
      */
     @Override
     public long tableSize(int tableId) {
-/*        TableViewInternal tableView = tableManager.cachedTable(tableId);
-
-        if (tableView == null) {
-            return 1;
-        }
-
-        try {
-            Thread.sleep(2000);
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
-        }
-
-        CompletableFuture<LongObjectImmutablePair<HybridTimestamp>> 
updateResult = statAggregator.estimatedSizeWithLastUpdate(
-                tableView.internalTable());
-
-        updateResult.join();*/
-
         return tableSizeMap.computeIfAbsent(tableId, k -> 
DEFAULT_VALUE).getSize();
     }
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java
index 003560f7008..f20dc4c9d6e 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java
@@ -63,49 +63,6 @@ public class StatisticAggregator {
 
     public void messaging(MessagingService messagingService) {
         this.messagingService = messagingService;
-
-        
messagingService.addMessageHandler(PartitionReplicationMessageGroup.class, 
this::handleMessage);
-
-/*        messagingService.addMessageHandler(SqlQueryMessageGroup.class, 
(message, sender, correlationId) -> {
-            if (message instanceof GetEstimatedSizeWithLastModifiedTsRequest) {
-                GetEstimatedSizeWithLastModifiedTsRequest msg = 
(GetEstimatedSizeWithLastModifiedTsRequest) message;
-
-                TableViewInternal tableView = 
tableManager.cachedTable(msg.tableId());
-
-                if (tableView == null) {
-                    LOG.debug("No table found to update statistics [id={}].", 
msg.tableId());
-
-                    return;
-                }
-
-                InternalTable table = tableView.internalTable();
-
-                for (int p = 0 ; p < table.partitions(); ++p) {
-                    MvPartitionStorage mvPartition = 
table.storage().getMvPartition(p);
-
-                    if (mvPartition != null) {
-                        LeaseInfo info = mvPartition.leaseInfo();
-
-                        if (info != null) {
-                            if 
(info.primaryReplicaNodeName().equals(nodeName)) {
-                                mvPartition.estimatedSize();
-                            }
-                        }
-                    }
-                }
-
-                storageAccessExecutor.execute(() -> handleHasDataRequest(msg, 
sender, correlationId));
-            }
-        });*/
-
-        //InternalClusterNode localNode = 
clusterSrvc.topologyService().localMember();
-        //String nodeName = localNode.name();
-    }
-
-    private void handleMessage(NetworkMessage message, InternalClusterNode 
sender, @Nullable Long correlationId) {
-        if (message instanceof GetEstimatedSizeWithLastModifiedTsResponse) {
-            System.err.println("!!!!");
-        }
     }
 
     /**
@@ -138,24 +95,6 @@ public class StatisticAggregator {
         GetEstimatedSizeWithLastModifiedTsRequest request = 
PARTITION_REPLICATION_MESSAGES_FACTORY.getEstimatedSizeWithLastModifiedTsRequest()
                 .tableId(table.tableId()).build();
 
-/*        for (String p : peers) {
-            @Nullable InternalClusterNode cons = 
topologyService.getByConsistentId(p);
-            if (cons == null)
-                continue;
-
-            CompletableFuture<NetworkMessage> fut = 
messagingService.invoke(cons, request, REQUEST_ESTIMATION_TIMEOUT_MILLIS);
-
-            fut.thenApply(res -> {
-                System.err.println();
-                return null;
-            }).exceptionally(ex -> {
-                System.err.println();
-                return null;
-            });
-
-            fut.join();
-        }*/
-
         CompletableFuture<LongObjectImmutablePair<HybridTimestamp>>[] 
invokeFutures = peers.stream()
                 .map(topologyService::getByConsistentId)
                 .filter(Objects::nonNull)
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java
index c18ccad923d..9c6cd8994d1 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java
@@ -43,6 +43,10 @@ public class PartitionModificationCounter {
     private final double staleRowsFraction;
     private final long minStaleRowsCount;
 
+    int tableId;
+    int partitionId;
+    LongSupplier estimateSize;
+
     private final AtomicLong counter = new AtomicLong(0);
     private volatile long nextMilestone;
     private volatile HybridTimestamp lastMilestoneReachedTimestamp;
@@ -58,7 +62,8 @@ public class PartitionModificationCounter {
             long minStaleRowsCount,
             int tableId,
             int partitionId,
-            MessagingService messagingService
+            MessagingService messagingService,
+            LongSupplier estimateSize
     ) {
         Objects.requireNonNull(initTimestamp, "initTimestamp");
         Objects.requireNonNull(partitionSizeSupplier, "partitionSizeSupplier");
@@ -81,6 +86,10 @@ public class PartitionModificationCounter {
         this.messagingService = messagingService;
 
         
messagingService.addMessageHandler(PartitionReplicationMessageGroup.class, 
this::handleMessage);
+
+        this.tableId = tableId;
+        this.partitionId = partitionId;
+        this.estimateSize = estimateSize;
     }
 
     private void handleMessage(NetworkMessage message, InternalClusterNode 
sender, @Nullable Long correlationId) {
@@ -94,48 +103,18 @@ public class PartitionModificationCounter {
             InternalClusterNode sender,
             @Nullable Long correlationId
     ) {
-        GetEstimatedSizeWithLastModifiedTsRequest msg = 
(GetEstimatedSizeWithLastModifiedTsRequest) message;
-
-        CompletableFuture<Void> fut = messagingService.respond(
-                sender,
-                
PARTITION_REPLICATION_MESSAGES_FACTORY.getEstimatedSizeWithLastModifiedTsResponse().estimatedSize(100).ts(HybridTimestamp.MAX_VALUE).build(),
-                correlationId
-        );
-
-        System.err.println();
-
-/*        messagingService.respond(
-                sender,
-                
PARTITION_REPLICATION_MESSAGES_FACTORY.hasDataResponse().build(),
-                correlationId
-        );*/
-
-
-/*        TableViewInternal tableView = 
tableManager.cachedTable(msg.tableId());
+        long estSize = estimateSize.getAsLong();
 
-        if (tableView == null) {
-            LOG.debug("No table found to update statistics [id={}].", 
msg.tableId());
+        if (tableId == message.tableId() && estSize != -1) {
+            System.err.println("!!!!! send size: " + estSize);
 
-            return;
+            messagingService.respond(
+                    sender,
+                    PARTITION_REPLICATION_MESSAGES_FACTORY
+                            
.getEstimatedSizeWithLastModifiedTsResponse().estimatedSize(estSize).ts(lastMilestoneTimestamp()).build(),
+                    correlationId
+            );
         }
-
-        InternalTable table = tableView.internalTable();
-
-        for (int p = 0 ; p < table.partitions(); ++p) {
-            MvPartitionStorage mvPartition = table.storage().getMvPartition(p);
-
-            if (mvPartition != null) {
-                LeaseInfo info = mvPartition.leaseInfo();
-
-                if (info != null) {
-                    if (info.primaryReplicaNodeName().equals(nodeName)) {
-                        mvPartition.estimatedSize();
-                    }
-                }
-            }
-        }
-
-        storageAccessExecutor.execute(() -> handleHasDataRequest(msg, sender, 
correlationId));*/
     }
 
     /** Returns the current counter value. */
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterFactory.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterFactory.java
index ef506461751..e6abeadde4a 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterFactory.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterFactory.java
@@ -46,7 +46,8 @@ public class PartitionModificationCounterFactory {
             int tableId,
             int partitionId,
             LongSupplier partitionSizeSupplier,
-            MessagingService messagingService
+            MessagingService messagingService,
+            LongSupplier estimateSize
     ) {
         return new PartitionModificationCounter(
                 currentTimestampSupplier.get(),
@@ -55,7 +56,8 @@ public class PartitionModificationCounterFactory {
                 DEFAULT_MIN_STALE_ROWS_COUNT,
                 tableId,
                 partitionId,
-                messagingService
+                messagingService,
+                estimateSize
         );
     }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 75df7c230b6..9887115d124 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -1041,13 +1041,19 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                 partitionStorages.getMvPartitionStorage()
         );
 
+        LongSupplier estSizeSupplier = () -> {
+            @Nullable MvPartitionStorage partition = 
table.internalTable().storage().getMvPartition(partId);
+            return partition == null ? -1 : partition.estimatedSize();
+        };
+
         PartitionUpdateHandlers partitionUpdateHandlers = 
createPartitionUpdateHandlers(
                 partId,
                 partitionDataStorage,
                 table,
                 safeTimeTracker,
                 replicationConfiguration,
-                messagingService0
+                messagingService0,
+                estSizeSupplier
         );
 
         internalTbl.updatePartitionTrackers(partId, safeTimeTracker, 
storageIndexTracker);
@@ -1393,13 +1399,19 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
 
                     
storageIndexTracker.update(partitionDataStorage.lastAppliedIndex(), null);
 
+                    LongSupplier estSizeSupplier = () -> {
+                        @Nullable MvPartitionStorage partition = 
table.internalTable().storage().getMvPartition(partId);
+                        return partition == null ? -1 : 
partition.estimatedSize();
+                    };
+
                     PartitionUpdateHandlers partitionUpdateHandlers = 
createPartitionUpdateHandlers(
                             partId,
                             partitionDataStorage,
                             table,
                             safeTimeTracker,
                             replicationConfiguration,
-                            messagingService0
+                            messagingService0,
+                            estSizeSupplier
                     );
 
                     internalTbl.updatePartitionTrackers(partId, 
safeTimeTracker, storageIndexTracker);
@@ -3157,7 +3169,8 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
             TableViewInternal table,
             PendingComparableValuesTracker<HybridTimestamp, Void> 
safeTimeTracker,
             ReplicationConfiguration replicationConfiguration,
-            MessagingService messagingService
+            MessagingService messagingService,
+            LongSupplier estimateSize
     ) {
         TableIndexStoragesSupplier indexes = 
table.indexStorageAdapters(partitionId);
 
@@ -3167,7 +3180,7 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
 
         LongSupplier partSizeSupplier = () -> 
partitionDataStorage.getStorage().estimatedSize();
         PartitionModificationCounter modificationCounter = 
partitionModificationCounterFactory
-                .create(table.tableId(), partitionId, partSizeSupplier, 
messagingService);
+                .create(table.tableId(), partitionId, partSizeSupplier, 
messagingService, estimateSize);
         registerPartitionModificationCounterMetrics(table.tableId(), 
partitionId, modificationCounter);
 
         StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler(
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
index decd56de798..15c89505bdf 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
@@ -67,7 +67,7 @@ public class TableTestUtils {
 
     /** No-op partition modification counter. */
     public static final PartitionModificationCounter 
NOOP_PARTITION_MODIFICATION_COUNTER =
-            new PartitionModificationCounter(HybridTimestamp.MIN_VALUE, () -> 
0, 0, 0, 0, 0, null);
+            new PartitionModificationCounter(HybridTimestamp.MIN_VALUE, () -> 
0, 0, 0, 0, 0, null, () -> 0L);
 
     /** No-op partition modification counter factory. */
     public static PartitionModificationCounterFactory 
NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY =
@@ -77,7 +77,8 @@ public class TableTestUtils {
                         int tableId,
                         int partitionId,
                         LongSupplier partitionSizeSupplier,
-                        MessagingService messagingService
+                        MessagingService messagingService,
+                        LongSupplier estimateSize
                 ) {
                     return NOOP_PARTITION_MODIFICATION_COUNTER;
                 }

Reply via email to