This is an automated email from the ASF dual-hosted git repository.

zstan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 0759c8d58a3 IGNITE-26069 Sql. Add data staleness check to statistic 
manager (#6649)
0759c8d58a3 is described below

commit 0759c8d58a392cd8172d440ef8356babad280688
Author: Evgeniy Stanilovskiy <[email protected]>
AuthorDate: Fri Nov 14 08:17:35 2025 +0300

    IGNITE-26069 Sql. Add data staleness check to statistic manager (#6649)
---
 .../org/apache/ignite/internal/app/IgniteImpl.java |   7 +-
 .../internal/sql/engine/ItAggregatesTest.java      |   5 +-
 .../sql/engine/statistic/ItStatisticTest.java      | 218 +++++++++++++-----
 .../internal/sql/engine/SqlQueryProcessor.java     |   6 +-
 .../sql/engine/prepare/PrepareServiceImpl.java     | 144 ++++++------
 ...Manager.java => PartitionModificationInfo.java} |  22 +-
 .../engine/statistic/SqlStatisticManagerImpl.java  | 216 ++++++++++--------
 .../statistic/SqlStatisticUpdateManager.java       |   6 +-
 ...datesNotifier.java => StatisticAggregator.java} |   8 +-
 .../engine/statistic/StatisticAggregatorImpl.java  | 198 +++++++++++++++++
 .../StatisticChangedEvent.java}                    |  12 +-
 .../StatisticEventParameters.java}                 |  17 +-
 .../sql/engine/exec/ExecutionServiceImplTest.java  |   8 +-
 .../sql/engine/framework/TestBuilders.java         |   8 +-
 .../sql/engine/planner/PlannerTimeoutTest.java     |   7 +-
 .../sql/engine/prepare/PrepareServiceImplTest.java |  31 ++-
 .../statistic/SqlStatisticManagerImplTest.java     | 246 +++++++++++++--------
 .../sql/metrics/PlanningCacheMetricsTest.java      |   7 +-
 modules/table/build.gradle                         |   1 +
 .../distributed/PartitionModificationCounter.java  |  15 +-
 .../PartitionModificationCounterFactory.java       |  71 +++++-
 .../internal/table/distributed/TableManager.java   |   7 +-
 ...GetEstimatedSizeWithLastModifiedTsRequest.java} |  13 +-
 ...etEstimatedSizeWithLastModifiedTsResponse.java} |  12 +-
 .../message/PartitionModificationInfoMessage.java} |  21 +-
 .../internal/table/message/TableMessageGroup.java} |  22 +-
 .../PartitionModificationCounterTest.java          |  12 +-
 .../ignite/internal/table/TableTestUtils.java      |   9 +-
 28 files changed, 977 insertions(+), 372 deletions(-)

diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 0ac1010a265..13ee855702c 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -513,6 +513,8 @@ public class IgniteImpl implements Ignite {
 
     private final ClockServiceMetricSource clockServiceMetricSource;
 
+    private final PartitionModificationCounterFactory 
partitionModificationCounterFactory;
+
     /**
      * The Constructor.
      *
@@ -1115,8 +1117,7 @@ public class IgniteImpl implements Ignite {
                 metricManager
         );
 
-        PartitionModificationCounterFactory 
partitionModificationCounterFactory =
-                new PartitionModificationCounterFactory(clockService::current);
+        partitionModificationCounterFactory = new 
PartitionModificationCounterFactory(clockService::current, 
clusterSvc.messagingService());
 
         distributedTblMgr = new TableManager(
                 name,
@@ -1489,6 +1490,8 @@ public class IgniteImpl implements Ignite {
             metricManager.registerSource(clockServiceMetricSource);
             metricManager.enable(clockServiceMetricSource);
 
+            partitionModificationCounterFactory.start();
+
             // Start the components that are required to join the cluster.
             // TODO https://issues.apache.org/jira/browse/IGNITE-22570
             CompletableFuture<Void> componentsStartFuture = 
lifecycleManager.startComponentsAsync(
diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java
index c5f9d9f11f4..a939c769ca2 100644
--- 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java
@@ -542,7 +542,10 @@ public class ItAggregatesTest extends 
BaseSqlIntegrationTest {
     public void testGroupingFunction(String[] rules) {
         sql("DELETE FROM test_str_int_real_dec");
 
-        sql("DELETE FROM test_str_int_real_dec");
+        Assumptions.assumeTrue(
+                Arrays.stream(rules).noneMatch(rule -> 
rule.contains("Colocated")),
+                "https://issues.apache.org/jira/browse/IGNITE-26512";
+        );
 
         sql("INSERT INTO test_str_int_real_dec(id, str_col, int_col) VALUES 
(1, 's1', 10)");
         sql("INSERT INTO test_str_int_real_dec(id, str_col, int_col) VALUES 
(2, 's1', 20)");
diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/statistic/ItStatisticTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/statistic/ItStatisticTest.java
index 081e1abebee..275bbb8457c 100644
--- 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/statistic/ItStatisticTest.java
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/statistic/ItStatisticTest.java
@@ -17,47 +17,129 @@
 
 package org.apache.ignite.internal.sql.engine.statistic;
 
+import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_MIN_STALE_ROWS_COUNT;
+import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_STALE_ROWS_FRACTION;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static 
org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl.PLAN_UPDATER_INITIAL_DELAY;
+import static 
org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl.PLAN_UPDATER_REFRESH_PERIOD;
+import static 
org.apache.ignite.internal.sql.engine.statistic.SqlStatisticManagerImpl.INITIAL_DELAY;
+import static 
org.apache.ignite.internal.sql.engine.statistic.SqlStatisticManagerImpl.REFRESH_PERIOD;
 import static 
org.apache.ignite.internal.sql.engine.util.QueryChecker.nodeRowCount;
 import static org.hamcrest.Matchers.is;
 
-import java.util.concurrent.ExecutionException;
+import java.time.Duration;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
 import org.apache.ignite.internal.sql.engine.util.QueryChecker;
+import org.apache.ignite.internal.util.HashCalculator;
+import org.apache.ignite.internal.util.IgniteUtils;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
 /** Integration test to check SQL statistics. */
 public class ItStatisticTest extends BaseSqlIntegrationTest {
     private SqlStatisticManagerImpl sqlStatisticManager;
-
-    private static final AtomicInteger counter = new AtomicInteger(0);
+    private static final int PARTITIONS = 3;
 
     @BeforeAll
     void beforeAll() {
         sqlStatisticManager = (SqlStatisticManagerImpl) 
queryProcessor().sqlStatisticManager();
-        sql("CREATE TABLE t(ID INTEGER PRIMARY KEY, VAL INTEGER)");
+
+        sql(format("CREATE ZONE zone_with_repl (replicas 2, partitions {}) 
storage profiles ['"
+                + DEFAULT_STORAGE_PROFILE + "']", PARTITIONS));
+        sql("CREATE TABLE t(ID BIGINT PRIMARY KEY, VAL INTEGER) ZONE 
zone_with_repl");
     }
 
     @AfterAll
     void afterAll() {
-        sql("DROP TABLE IF EXISTS t");
+        sql("DROP TABLE IF EXISTS t;");
+    }
+
+    @AfterEach
+    void tearDown() {
+        sql("DELETE FROM t;");
+    }
+
+    /** Simple case demonstrating that the tables size is being updated during 
statistic refresh interval. */
+    @Test
+    public void testTableSizeUpdates() {
+        long milestone1 = computeNextMilestone(0, DEFAULT_STALE_ROWS_FRACTION, 
DEFAULT_MIN_STALE_ROWS_COUNT);
+
+        String selectQuery = "select * from t";
+
+        long update = insert(0, milestone1);
+
+        sql(selectQuery);
+
+        AtomicInteger inc = new AtomicInteger();
+
+        long timeout = calcStatisticUpdateTimeout();
+        // max 10 times cache pollution
+        long pollInterval = timeout / 10;
+
+        Awaitility.await().pollInterval(Duration.ofMillis(pollInterval))
+                .timeout(timeout, TimeUnit.MILLISECONDS).untilAsserted(() ->
+                        assertQuery(format("select {} from t", 
inc.incrementAndGet()))
+                                .matches(nodeRowCount("TableScan", is((int) 
update)))
+                                .check()
+        );
+    }
+
+    @Test
+    public void testTableSizeUpdatesForcibly() {
+        long milestone = computeNextMilestone(0, DEFAULT_STALE_ROWS_FRACTION, 
DEFAULT_MIN_STALE_ROWS_COUNT);
+
+        long updates1 = insert(0L, milestone);
+
+        long timeout = calcStatisticUpdateTimeout();
+
+        // max 10 times cache pollution
+        long pollInterval = timeout / 10;
+
+        Awaitility.await().pollInterval(Duration.ofMillis(pollInterval))
+                .timeout(timeout, TimeUnit.MILLISECONDS).untilAsserted(() -> {
+                    sqlStatisticManager.forceUpdateAll();
+                    sqlStatisticManager.lastUpdateStatisticFuture().join();
+
+                    assertQuery("select 1 from t")
+                            .matches(nodeRowCount("TableScan", is((int) 
updates1)))
+                            .check();
+                }
+        );
+
+        milestone = computeNextMilestone(milestone, 
DEFAULT_STALE_ROWS_FRACTION, DEFAULT_MIN_STALE_ROWS_COUNT);
+
+        long updates2 = insert(updates1, milestone);
+
+        sqlStatisticManager.forceUpdateAll();
+        sqlStatisticManager.lastUpdateStatisticFuture().join();
+
+        // query not cached in plans
+        Awaitility.await().pollInterval(Duration.ofMillis(pollInterval))
+                .timeout(timeout, TimeUnit.MILLISECONDS).untilAsserted(() -> {
+                    sqlStatisticManager.forceUpdateAll();
+                    sqlStatisticManager.lastUpdateStatisticFuture().join();
+
+                    assertQuery("select 1 from t")
+                            .matches(nodeRowCount("TableScan", is((int) 
updates2)))
+                            .check();
+                }
+        );
     }
 
     @Test
     public void statisticUpdatesChangeQueryPlans() throws Exception {
         try {
-            
sqlStatisticManager.setThresholdTimeToPostponeUpdateMs(Long.MAX_VALUE);
-
             sqlScript(""
-                    + "CREATE TABLE j1(ID INTEGER PRIMARY KEY, VAL INTEGER);"
-                    + "CREATE TABLE j2(ID INTEGER PRIMARY KEY, VAL INTEGER);"
+                    + "CREATE TABLE j1(ID INTEGER PRIMARY KEY, VAL INTEGER) 
ZONE zone_with_repl;"
+                    + "CREATE TABLE j2(ID INTEGER PRIMARY KEY, VAL INTEGER) 
ZONE zone_with_repl;"
             );
+
             sql("INSERT INTO j1 SELECT x, x FROM system_range(?, ?)", 0, 10);
 
             sqlStatisticManager.forceUpdateAll();
@@ -66,22 +148,31 @@ public class ItStatisticTest extends 
BaseSqlIntegrationTest {
             String query = "SELECT /*+ DISABLE_RULE('HashJoinConverter', 
'MergeJoinConverter', 'CorrelatedNestedLoopJoin') */ "
                     + "j1.* FROM j2, j1 WHERE j2.id = j1.id";
 
-            assertQuery(query)
-                    // expecting right source has less rows than left
-                    
.matches(QueryChecker.matches(".*TableScan.*PUBLIC.J1.*TableScan.*PUBLIC.J2.*"))
-                    .returnNothing()
-                    .check();
+            long statRefresh = calcStatisticUpdateTimeout();
 
-            sql("INSERT INTO j2 SELECT x, x FROM system_range(?, ?)", 0, 100);
+            // max 10 times cache pollution
+            long pollInterval = statRefresh / 10;
+
+            Awaitility.await().pollInterval(Duration.ofMillis(pollInterval))
+                    .timeout(statRefresh, 
TimeUnit.MILLISECONDS).untilAsserted(() ->
+                            assertQuery(query)
+                                    // expecting right source has less rows 
than left
+                                    
.matches(QueryChecker.matches(".*TableScan.*PUBLIC.J1.*TableScan.*PUBLIC.J2.*"))
+                                    .returnNothing()
+                                    .check()
+            );
+
+            sql("INSERT INTO j2 SELECT x, x FROM system_range(?, ?)", 0, 3 * 
DEFAULT_MIN_STALE_ROWS_COUNT);
 
             sqlStatisticManager.forceUpdateAll();
             sqlStatisticManager.lastUpdateStatisticFuture().get(5, 
TimeUnit.SECONDS);
 
-            Awaitility.await().timeout(Math.max(10_000, 2 * 
PLAN_UPDATER_INITIAL_DELAY), TimeUnit.MILLISECONDS).untilAsserted(() ->
-                    assertQuery(query)
-                            // expecting right source has less rows than left
-                            
.matches(QueryChecker.matches(".*TableScan.*PUBLIC.J2.*TableScan.*PUBLIC.J1.*"))
-                            .check()
+            Awaitility.await().pollInterval(Duration.ofMillis(pollInterval))
+                    .timeout(statRefresh, 
TimeUnit.MILLISECONDS).untilAsserted(() ->
+                            assertQuery(query)
+                                    // expecting right source has less rows 
than left
+                                    
.matches(QueryChecker.matches(".*TableScan.*PUBLIC.J2.*TableScan.*PUBLIC.J1.*"))
+                                    .check()
             );
         } finally {
             sqlScript(""
@@ -90,46 +181,59 @@ public class ItStatisticTest extends 
BaseSqlIntegrationTest {
         }
     }
 
-    @Test
-    public void testStatisticsRowCount() throws Exception {
-        // For test we should always update statistics.
-        long prevValueOfThreshold = 
sqlStatisticManager.setThresholdTimeToPostponeUpdateMs(0);
-        try {
-            insertAndUpdateRunQuery(500);
-            assertQuery(getUniqueQuery())
-                    .matches(nodeRowCount("TableScan", is(500)))
-                    .check();
-
-            insertAndUpdateRunQuery(600);
-            assertQuery(getUniqueQuery())
-                    .matches(nodeRowCount("TableScan", is(1100)))
-                    .check();
-
-            
sqlStatisticManager.setThresholdTimeToPostponeUpdateMs(Long.MAX_VALUE);
-            insertAndUpdateRunQuery(900);
-
-            // Statistics shouldn't be updated despite we inserted new rows.
-            assertQuery(getUniqueQuery())
-                    .matches(nodeRowCount("TableScan", is(1100)))
-                    .check();
-        } finally {
-            
sqlStatisticManager.setThresholdTimeToPostponeUpdateMs(prevValueOfThreshold);
-        }
+    private static long calcStatisticUpdateTimeout() {
+        long inc = TimeUnit.SECONDS.toMillis(2);
+        // need to wait at least 2 statistic updates.
+        long statisticAggregationTimeout = INITIAL_DELAY + 2 * REFRESH_PERIOD;
+
+        return PLAN_UPDATER_INITIAL_DELAY > statisticAggregationTimeout
+                ? PLAN_UPDATER_INITIAL_DELAY + PLAN_UPDATER_REFRESH_PERIOD
+                // need to wait at least one planner cache re-calculation step 
with a bit timeout for it.
+                : statisticAggregationTimeout + PLAN_UPDATER_REFRESH_PERIOD + 
inc;
     }
 
-    private void insertAndUpdateRunQuery(int numberOfRecords) throws 
ExecutionException, TimeoutException, InterruptedException {
-        int start = counter.get();
-        int end = counter.addAndGet(numberOfRecords) - 1;
-        sql("INSERT INTO t SELECT x, x FROM system_range(?, ?)", start, end);
+    // copy-paste from private method: 
PartitionModificationCounter#computeNextMilestone
+    // if implementation will changes, it need to be changed too
+    private static long computeNextMilestone(
+            long currentSize,
+            double staleRowsFraction,
+            long minStaleRowsCount
+    ) {
+        return Math.max((long) (currentSize * staleRowsFraction), 
minStaleRowsCount);
+    }
 
-        // run unique sql to update statistics
-        sql(getUniqueQuery());
+    /**
+     * Calculates number of rows need to be inserted with guarantee that 
{@code insertsPerPartition} will be reached for every partition.
+     * Inclusively 'from', exclusively 'to' bounds.
+     */
+    private static long insert(long from, long insertsPerPartition) {
+        long numberOfInsertions = 0;
+
+        long[] partitionUpdates = new long[PARTITIONS];
+
+        HashCalculator calc = new HashCalculator();
+
+        for (long i = from; i < Integer.MAX_VALUE; ++i) {
+            calc.appendLong(i);
+            int partition = IgniteUtils.safeAbs(calc.hash()) % PARTITIONS;
+            partitionUpdates[partition] += 1;
+            calc.reset();
+            numberOfInsertions = i;
+            boolean filled = true;
+            for (int pos = 0; pos < PARTITIONS; ++pos) {
+                if (partitionUpdates[pos] < insertsPerPartition) {
+                    filled = false;
+                    break;
+                }
+            }
+
+            if (filled) {
+                break;
+            }
+        }
 
-        // wait to update statistics
-        sqlStatisticManager.lastUpdateStatisticFuture().get(5, 
TimeUnit.SECONDS);
-    }
+        sql("INSERT INTO t SELECT x, x FROM system_range(?, ?)", from, 
numberOfInsertions);
 
-    private static String getUniqueQuery() {
-        return "SELECT " + counter.incrementAndGet() + " FROM t";
+        return numberOfInsertions + 1;
     }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index b97a597af86..ddfd4e5f9dd 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -98,6 +98,7 @@ import 
org.apache.ignite.internal.sql.engine.sql.ParserServiceImpl;
 import org.apache.ignite.internal.sql.engine.statistic.SqlStatisticManager;
 import org.apache.ignite.internal.sql.engine.statistic.SqlStatisticManagerImpl;
 import 
org.apache.ignite.internal.sql.engine.statistic.SqlStatisticUpdateManager;
+import org.apache.ignite.internal.sql.engine.statistic.StatisticAggregatorImpl;
 import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContext;
 import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContextImpl;
 import org.apache.ignite.internal.sql.engine.util.Commons;
@@ -253,7 +254,10 @@ public class SqlQueryProcessor implements QueryProcessor, 
SystemViewProvider {
         this.killCommandHandler = killCommandHandler;
         this.eventLog = eventLog;
 
-        sqlStatisticManager = new SqlStatisticManagerImpl(tableManager, 
catalogManager, lowWaterMark);
+        StatisticAggregatorImpl statAggregator =
+                new StatisticAggregatorImpl(() -> 
logicalTopologyService.localLogicalTopology().nodes(),
+                        clusterSrvc.messagingService());
+        sqlStatisticManager = new SqlStatisticManagerImpl(tableManager, 
catalogManager, lowWaterMark, commonScheduler, statAggregator);
         sqlSchemaManager = new SqlSchemaManagerImpl(
                 catalogManager,
                 sqlStatisticManager,
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
index 83942549bc3..aecd0463137 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
@@ -20,10 +20,12 @@ package org.apache.ignite.internal.sql.engine.prepare;
 import static 
org.apache.ignite.internal.metrics.sources.ThreadPoolMetricSource.THREAD_POOLS_METRICS_SOURCE_NAME;
 import static 
org.apache.ignite.internal.sql.engine.prepare.CacheKey.EMPTY_CLASS_ARRAY;
 import static 
org.apache.ignite.internal.sql.engine.prepare.PlannerHelper.optimize;
+import static 
org.apache.ignite.internal.sql.engine.statistic.event.StatisticChangedEvent.STATISTIC_CHANGED;
 import static 
org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG;
 import static 
org.apache.ignite.internal.sql.engine.util.Commons.fastQueryOptimizationEnabled;
 import static org.apache.ignite.internal.sql.engine.util.TypeUtils.columnType;
 import static 
org.apache.ignite.internal.thread.ThreadOperation.NOTHING_ALLOWED;
+import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.isCompletedSuccessfully;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.lang.ErrorGroups.Sql.EXECUTION_CANCELLED_ERR;
@@ -64,6 +66,7 @@ import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.event.EventProducer;
 import org.apache.ignite.internal.lang.SqlExceptionMapperUtil;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -95,7 +98,8 @@ import 
org.apache.ignite.internal.sql.engine.sql.IgniteSqlExplain;
 import org.apache.ignite.internal.sql.engine.sql.IgniteSqlExplainMode;
 import org.apache.ignite.internal.sql.engine.sql.IgniteSqlKill;
 import org.apache.ignite.internal.sql.engine.sql.ParsedResult;
-import 
org.apache.ignite.internal.sql.engine.statistic.StatisticUpdatesNotifier;
+import 
org.apache.ignite.internal.sql.engine.statistic.event.StatisticChangedEvent;
+import 
org.apache.ignite.internal.sql.engine.statistic.event.StatisticEventParameters;
 import org.apache.ignite.internal.sql.engine.util.Cloner;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.TypeUtils;
@@ -138,7 +142,8 @@ public class PrepareServiceImpl implements PrepareService {
 
     private static final String PLANNING_EXECUTOR_SOURCE_NAME = 
THREAD_POOLS_METRICS_SOURCE_NAME + "sql-planning-executor";
 
-    public static final int PLAN_UPDATER_INITIAL_DELAY = 2_000;
+    public static final int PLAN_UPDATER_INITIAL_DELAY = 5_000;
+    public static final int PLAN_UPDATER_REFRESH_PERIOD = 5_000;
 
     private final UUID prepareServiceId = UUID.randomUUID();
     private final AtomicLong planIdGen = new AtomicLong();
@@ -165,6 +170,8 @@ public class PrepareServiceImpl implements PrepareService {
 
     private final LongSupplier currentClock;
 
+    private final EventProducer<StatisticChangedEvent, 
StatisticEventParameters> statUpdates;
+
     /**
      * Factory method.
      *
@@ -178,7 +185,7 @@ public class PrepareServiceImpl implements PrepareService {
      * @param ddlSqlToCommandConverter Converter from SQL DDL operators to 
catalog commands.
      * @param currentClock Actual clock supplier.
      * @param scheduler Scheduler.
-     * @param updNotifier Updates notifier call-back.
+     * @param statUpdates Statistic updates notifier.
      */
     public static PrepareServiceImpl create(
             String nodeName,
@@ -191,9 +198,9 @@ public class PrepareServiceImpl implements PrepareService {
             DdlSqlToCommandConverter ddlSqlToCommandConverter,
             LongSupplier currentClock,
             ScheduledExecutorService scheduler,
-            StatisticUpdatesNotifier updNotifier
+            EventProducer<StatisticChangedEvent, StatisticEventParameters> 
statUpdates
     ) {
-        PrepareServiceImpl impl = new PrepareServiceImpl(
+        return new PrepareServiceImpl(
                 nodeName,
                 clusterCfg.planner().estimatedNumberOfQueries().value(),
                 cacheFactory,
@@ -204,12 +211,9 @@ public class PrepareServiceImpl implements PrepareService {
                 metricManager,
                 schemaManager,
                 currentClock,
-                scheduler
+                scheduler,
+                statUpdates
         );
-
-        updNotifier.changesNotifier(impl::statisticsChanged);
-
-        return impl;
     }
 
     /**
@@ -224,6 +228,7 @@ public class PrepareServiceImpl implements PrepareService {
      * @param schemaManager Schema manager to use on validation phase to bind 
identifiers in AST with particular schema objects.
      * @param currentClock Actual clock supplier.
      * @param scheduler Scheduler.
+     * @param statUpdates Statistic updates notifier.
      */
     public PrepareServiceImpl(
             String nodeName,
@@ -236,7 +241,8 @@ public class PrepareServiceImpl implements PrepareService {
             MetricManager metricManager,
             SqlSchemaManager schemaManager,
             LongSupplier currentClock,
-            ScheduledExecutorService scheduler
+            ScheduledExecutorService scheduler,
+            EventProducer<StatisticChangedEvent, StatisticEventParameters> 
statUpdates
     ) {
         this.nodeName = nodeName;
         this.ddlConverter = ddlConverter;
@@ -244,6 +250,7 @@ public class PrepareServiceImpl implements PrepareService {
         this.metricManager = metricManager;
         this.plannerThreadCount = plannerThreadCount;
         this.schemaManager = schemaManager;
+        this.statUpdates = statUpdates;
 
         this.currentClock = currentClock;
 
@@ -276,6 +283,12 @@ public class PrepareServiceImpl implements PrepareService {
 
         IgnitePlanner.warmup();
 
+        statUpdates.listen(STATISTIC_CHANGED, parameters -> {
+            statisticsChanged(parameters.tableId());
+
+            return falseCompletedFuture();
+        });
+
         planUpdater.start();
     }
 
@@ -1065,8 +1078,6 @@ public class PrepareServiceImpl implements PrepareService 
{
 
         private final AtomicBoolean inProgress = new AtomicBoolean();
 
-        private volatile boolean recalculatePlans;
-
         private final Cache<CacheKey, CompletableFuture<PlanInfo>> cache;
 
         private final long plannerTimeout;
@@ -1077,6 +1088,8 @@ public class PrepareServiceImpl implements PrepareService 
{
 
         private final BiFunction<Integer, String, SchemaPlus> 
defaultSchemaFunc;
 
+        private final Set<Integer> statPerTableChanges = new IntOpenHashSet();
+
         PlanUpdater(
                 ScheduledExecutorService planUpdater,
                 Cache<CacheKey, CompletableFuture<PlanInfo>> cache,
@@ -1099,35 +1112,12 @@ public class PrepareServiceImpl implements 
PrepareService {
          * @param tableId Table Id statistic changed for.
          */
         void statisticsChanged(int tableId) {
-            Set<Entry<CacheKey, CompletableFuture<PlanInfo>>> cachedEntries = 
cache.entrySet();
-
-            int currentCatalogVersion = catalogVersionSupplier.getAsInt();
-
-            boolean statChanged = false;
-
-            for (Map.Entry<CacheKey, CompletableFuture<PlanInfo>> ent : 
cachedEntries) {
-                CacheKey key = ent.getKey();
-                CompletableFuture<PlanInfo> fut = ent.getValue();
-
-                if (currentCatalogVersion == key.catalogVersion() && 
isCompletedSuccessfully(fut)) {
-                    // no wait, already completed
-                    PlanInfo info = fut.join();
-
-                    if (info.sources.contains(tableId)) {
-                        info.invalidate();
-                        statChanged = true;
-                    }
-                }
-            }
-
-            if (statChanged) {
-                recalculatePlans = true;
-            }
+            statPerTableChanges.add(tableId);
         }
 
         void start() {
             planUpdater.scheduleAtFixedRate(() -> {
-                if (!recalculatePlans) {
+                if (statPerTableChanges.isEmpty()) {
                     return;
                 }
 
@@ -1135,54 +1125,72 @@ public class PrepareServiceImpl implements 
PrepareService {
                     return;
                 }
 
-                CompletableFuture<Void> rePlanningFut = nullCompletedFuture();
-
-                while (recalculatePlans) {
-                    recalculatePlans = false;
-
-                    int currentCatalogVersion = 
catalogVersionSupplier.getAsInt();
+                for (int tableId : statPerTableChanges) {
+                    Set<Entry<CacheKey, CompletableFuture<PlanInfo>>> 
cachedEntries = cache.entrySet();
 
-                    for (Entry<CacheKey, CompletableFuture<PlanInfo>> ent : 
cache.entrySet()) {
+                    for (Map.Entry<CacheKey, CompletableFuture<PlanInfo>> ent 
: cachedEntries) {
                         CacheKey key = ent.getKey();
-                        CompletableFuture<PlanInfo> fut = cache.get(key);
+                        CompletableFuture<PlanInfo> fut = ent.getValue();
+                        int currentCatalogVersion = 
catalogVersionSupplier.getAsInt();
 
-                        // can be evicted
-                        if (fut != null && isCompletedSuccessfully(fut)) {
+                        if (currentCatalogVersion == key.catalogVersion() && 
isCompletedSuccessfully(fut)) {
+                            // no wait, already completed
                             PlanInfo info = fut.join();
 
-                            if (!info.needInvalidate()) {
-                                continue;
+                            if (info.sources.contains(tableId)) {
+                                info.invalidate();
                             }
+                        }
+                    }
 
-                            assert info.statement != null;
+                    // all involved entries are processed
+                    statPerTableChanges.remove(tableId);
+                }
 
-                            if (currentCatalogVersion == key.catalogVersion()) 
{
-                                SqlQueryType queryType = 
info.statement.parsedResult().queryType();
+                CompletableFuture<Void> rePlanningFut = nullCompletedFuture();
 
-                                SchemaPlus defaultSchema = 
defaultSchemaFunc.apply(key.catalogVersion(), key.schemaName());
+                int currentCatalogVersion = catalogVersionSupplier.getAsInt();
 
-                                PlanningContext planningContext = 
PlanningContext.builder()
-                                        
.frameworkConfig(Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
-                                                
.defaultSchema(defaultSchema).build())
-                                        
.query(info.statement.parsedResult().originalQuery())
-                                        .plannerTimeout(plannerTimeout)
-                                        .catalogVersion(key.catalogVersion())
-                                        .defaultSchemaName(key.schemaName())
-                                        
.parameters(Commons.arrayToMap(key.paramTypes()))
-                                        .build();
+                for (Entry<CacheKey, CompletableFuture<PlanInfo>> ent : 
cache.entrySet()) {
+                    CacheKey key = ent.getKey();
+                    CompletableFuture<PlanInfo> fut = cache.get(key);
 
-                                CompletableFuture<Void> newPlanFut =
-                                        prepare.recalculatePlan(queryType, 
info.statement.parsedResult, planningContext, key);
+                    // can be evicted
+                    if (fut != null && isCompletedSuccessfully(fut)) {
+                        PlanInfo info = fut.join();
 
-                                rePlanningFut.thenCompose(v -> newPlanFut);
-                            }
+                        if (!info.needInvalidate()) {
+                            continue;
+                        }
+
+                        assert info.statement != null;
+
+                        if (currentCatalogVersion == key.catalogVersion()) {
+                            SqlQueryType queryType = 
info.statement.parsedResult().queryType();
+
+                            SchemaPlus defaultSchema = 
defaultSchemaFunc.apply(key.catalogVersion(), key.schemaName());
+
+                            PlanningContext planningContext = 
PlanningContext.builder()
+                                    
.frameworkConfig(Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
+                                            
.defaultSchema(defaultSchema).build())
+                                    
.query(info.statement.parsedResult().originalQuery())
+                                    .plannerTimeout(plannerTimeout)
+                                    .catalogVersion(key.catalogVersion())
+                                    .defaultSchemaName(key.schemaName())
+                                    
.parameters(Commons.arrayToMap(key.paramTypes()))
+                                    .build();
+
+                            CompletableFuture<Void> newPlanFut =
+                                    prepare.recalculatePlan(queryType, 
info.statement.parsedResult, planningContext, key);
+
+                            rePlanningFut.thenCompose(v -> newPlanFut);
                         }
                     }
                 }
 
                 rePlanningFut.whenComplete((k, err) -> inProgress.set(false));
 
-            }, PLAN_UPDATER_INITIAL_DELAY, 1_000, TimeUnit.MILLISECONDS);
+            }, PLAN_UPDATER_INITIAL_DELAY, PLAN_UPDATER_REFRESH_PERIOD, 
TimeUnit.MILLISECONDS);
         }
     }
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/PartitionModificationInfo.java
similarity index 57%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
copy to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/PartitionModificationInfo.java
index 997cbe7d606..f39104d5b8a 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/PartitionModificationInfo.java
@@ -17,6 +17,24 @@
 
 package org.apache.ignite.internal.sql.engine.statistic;
 
-/** Statistic manager with reaction on statistic changes. */
-public interface SqlStatisticUpdateManager extends SqlStatisticManager, 
StatisticUpdatesNotifier {
+/** Partition modification information holder. */
+public class PartitionModificationInfo {
+    private final long estimatedSize;
+    private final long lastModificationCounter;
+
+    /** Constructor. */
+    public PartitionModificationInfo(long estimatedSize, long 
lastModificationCounter) {
+        this.estimatedSize = estimatedSize;
+        this.lastModificationCounter = lastModificationCounter;
+    }
+
+    /** Returns last modification representation. */
+    long lastModificationCounter() {
+        return lastModificationCounter;
+    }
+
+    /** Returns estimated size. */
+    long getEstimatedSize() {
+        return estimatedSize;
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java
index 7ac42780636..ac6a8ad3644 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java
@@ -18,14 +18,21 @@
 package org.apache.ignite.internal.sql.engine.statistic;
 
 import static org.apache.ignite.internal.event.EventListener.fromConsumer;
-import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static 
org.apache.ignite.internal.sql.engine.statistic.event.StatisticChangedEvent.STATISTIC_CHANGED;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.internal.catalog.CatalogService;
@@ -33,25 +40,29 @@ import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.catalog.events.CatalogEvent;
 import org.apache.ignite.internal.catalog.events.CreateTableEventParameters;
 import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
+import org.apache.ignite.internal.event.AbstractEventProducer;
 import org.apache.ignite.internal.event.EventListener;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.lowwatermark.LowWatermark;
 import 
org.apache.ignite.internal.lowwatermark.event.ChangeLowWatermarkEventParameters;
 import org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent;
+import 
org.apache.ignite.internal.sql.engine.statistic.event.StatisticChangedEvent;
+import 
org.apache.ignite.internal.sql.engine.statistic.event.StatisticEventParameters;
+import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.LongPriorityQueue;
 import org.apache.ignite.internal.table.TableViewInternal;
 import org.apache.ignite.internal.table.distributed.TableManager;
-import org.apache.ignite.internal.util.FastTimestamps;
 import org.jetbrains.annotations.TestOnly;
 
 /**
  * Statistic manager. Provide and manage update of statistics for SQL.
  */
-public class SqlStatisticManagerImpl implements SqlStatisticUpdateManager {
+public class SqlStatisticManagerImpl extends 
AbstractEventProducer<StatisticChangedEvent, StatisticEventParameters>
+        implements SqlStatisticUpdateManager {
     private static final IgniteLogger LOG = 
Loggers.forClass(SqlStatisticManagerImpl.class);
     static final long DEFAULT_TABLE_SIZE = 1L;
-    private static final ActualSize DEFAULT_VALUE = new 
ActualSize(DEFAULT_TABLE_SIZE, 0L);
+    private static final ActualSize DEFAULT_VALUE = new 
ActualSize(DEFAULT_TABLE_SIZE, Long.MIN_VALUE);
 
     private final EventListener<ChangeLowWatermarkEventParameters> lwmListener 
= fromConsumer(this::onLwmChanged);
     private final EventListener<DropTableEventParameters> 
dropTableEventListener = fromConsumer(this::onTableDrop);
@@ -66,29 +77,35 @@ public class SqlStatisticManagerImpl implements 
SqlStatisticUpdateManager {
     private final CatalogService catalogService;
     private final LowWatermark lowWatermark;
 
-    private final AtomicReference<StatisticUpdatesSupplier> changesSupplier = 
new AtomicReference<>();
-
     /* Contains all known table id's with statistics. */
-    private final ConcurrentMap<Integer, ActualSize> tableSizeMap = new 
ConcurrentHashMap<>();
+    final ConcurrentMap<Integer, ActualSize> tableSizeMap = new 
ConcurrentHashMap<>();
+
+    /* Contain dropped tables, can`t update statistic for such a case. */
+    Set<Integer> droppedTables = Collections.newSetFromMap(new 
ConcurrentHashMap<>());
+
+    private final ScheduledExecutorService scheduler;
+    private final StatisticAggregator<Collection<InternalTable>, 
CompletableFuture<Int2ObjectMap<PartitionModificationInfo>>> statSupplier;
 
-    private volatile long thresholdTimeToPostponeUpdateMs = 
TimeUnit.MINUTES.toMillis(1);
+    static final long INITIAL_DELAY = 15_000;
+    static final long REFRESH_PERIOD = 15_000;
 
     /** Constructor. */
-    public SqlStatisticManagerImpl(TableManager tableManager, CatalogService 
catalogService, LowWatermark lowWatermark) {
+    public SqlStatisticManagerImpl(
+            TableManager tableManager,
+            CatalogService catalogService,
+            LowWatermark lowWatermark,
+            ScheduledExecutorService scheduler,
+            StatisticAggregator<Collection<InternalTable>, 
CompletableFuture<Int2ObjectMap<PartitionModificationInfo>>> statSupplier
+    ) {
         this.tableManager = tableManager;
         this.catalogService = catalogService;
         this.lowWatermark = lowWatermark;
-    }
-
-    @Override
-    public void changesNotifier(StatisticUpdatesSupplier updater) {
-        if (!this.changesSupplier.compareAndSet(null, updater)) {
-            throw new AssertionError("Statistics updater unexpected change");
-        }
+        this.scheduler = scheduler;
+        this.statSupplier = statSupplier;
     }
 
     /**
-     * Returns approximate number of rows in table by their id.
+     * Returns approximate number of rows in table.
      *
      * <p>Returns the previous known value or {@value 
SqlStatisticManagerImpl#DEFAULT_TABLE_SIZE} as default value. Can start process 
to
      * update asked statistics in background to have updated values for future 
requests.
@@ -97,65 +114,9 @@ public class SqlStatisticManagerImpl implements 
SqlStatisticUpdateManager {
      */
     @Override
     public long tableSize(int tableId) {
-        updateTableSizeStatistics(tableId, false);
-
         return tableSizeMap.getOrDefault(tableId, DEFAULT_VALUE).getSize();
     }
 
-    /** Update table size statistic in the background if it required. */
-    private void updateTableSizeStatistics(int tableId, boolean force) {
-        TableViewInternal tableView = tableManager.cachedTable(tableId);
-        if (tableView == null) {
-            LOG.debug("There is no table to update statistics [id={}].", 
tableId);
-            return;
-        }
-
-        ActualSize tableSize = tableSizeMap.get(tableId);
-        if (tableSize == null) {
-            // has been concurrently cleaned up, no need more update statistic 
for the table.
-            return;
-        }
-
-        long currTimestamp = FastTimestamps.coarseCurrentTimeMillis();
-        long lastUpdateTime = tableSize.getTimestamp();
-
-        if (force || lastUpdateTime <= currTimestamp - 
thresholdTimeToPostponeUpdateMs) {
-            // Prevent to run update for the same table twice concurrently.
-            if (!force && !tableSizeMap.replace(tableId, tableSize, new 
ActualSize(tableSize.getSize(), currTimestamp - 1))) {
-                return;
-            }
-
-            // just request new table size in background.
-            CompletableFuture<Void> updateResult = 
tableView.internalTable().estimatedSize()
-                    .thenAccept(size -> {
-                        // the table can be concurrently dropped and we 
shouldn't put new value in this case.
-                        tableSizeMap.computeIfPresent(tableId, (k, v) -> {
-                            // Discard current computation if value in cache 
is newer than current one.
-                            if (v.timestamp >= currTimestamp) {
-                                return v;
-                            }
-
-                            return new ActualSize(Math.max(size, 1), 
currTimestamp);
-                        });
-                    }).handle((res, err) -> {
-                        if (err != null) {
-                            LOG.warn(format("Can't calculate size for table 
[id={}].", tableId), err);
-
-                            return null;
-                        } else {
-                            StatisticUpdatesSupplier supplier = 
changesSupplier.get();
-                            if (supplier != null) {
-                                supplier.accept(tableId);
-                            }
-
-                            return res;
-                        }
-                    });
-
-            latestUpdateFut.updateAndGet(prev -> prev == null ? updateResult : 
prev.thenCompose(none -> updateResult));
-        }
-    }
-
     @Override
     public void start() {
         catalogService.listen(CatalogEvent.TABLE_CREATE, 
createTableEventListener);
@@ -171,6 +132,62 @@ public class SqlStatisticManagerImpl implements 
SqlStatisticUpdateManager {
                 tableSizeMap.putIfAbsent(table.id(), DEFAULT_VALUE);
             }
         }
+
+        scheduler.scheduleAtFixedRate(this::update, INITIAL_DELAY, 
REFRESH_PERIOD, TimeUnit.MILLISECONDS);
+    }
+
+    private void update() {
+        if (!latestUpdateFut.get().isDone()) {
+            return;
+        }
+
+        Collection<InternalTable> tables = new 
ArrayList<>(tableSizeMap.size());
+
+        for (Map.Entry<Integer, ActualSize> ent : tableSizeMap.entrySet()) {
+            Integer tableId = ent.getKey();
+
+            if (droppedTables.contains(tableId)) {
+                continue;
+            }
+
+            TableViewInternal tableView = tableManager.cachedTable(tableId);
+
+            if (tableView == null) {
+                LOG.debug("No table found to update statistics [id={}].", 
ent.getKey());
+            } else {
+                tables.add(tableView.internalTable());
+            }
+        }
+
+        CompletableFuture<Void> updateResult = 
statSupplier.estimatedSizeWithLastUpdate(tables)
+                .handle((infos, err) -> {
+                    for (Int2ObjectMap.Entry<PartitionModificationInfo> ent : 
infos.int2ObjectEntrySet()) {
+                        int tableId = ent.getIntKey();
+                        PartitionModificationInfo info = ent.getValue();
+
+                        if (err != null) {
+                            LOG.debug("Failed to update table statistics for 
[tableId={}].", err, tableId);
+                        } else {
+                            ActualSize updatedSize = new 
ActualSize(Math.max(info.getEstimatedSize(), DEFAULT_TABLE_SIZE),
+                                    info.lastModificationCounter());
+                            ActualSize currentSize = tableSizeMap.get(tableId);
+                            // the table can be concurrently dropped and we 
shouldn't put new value in this case.
+                            tableSizeMap.compute(tableId, (k, v) -> {
+                                return v != null && v.modificationCounter() > 
info.lastModificationCounter()
+                                        ? v
+                                        : updatedSize; // Save initial or 
replace stale state.
+                            });
+
+                            if (updatedSize.modificationCounter() >= 
currentSize.modificationCounter()) {
+                                fireEvent(STATISTIC_CHANGED, new 
StatisticEventParameters(tableId));
+                            }
+                        }
+                    }
+
+                    return null;
+                });
+
+        latestUpdateFut.updateAndGet(prev -> prev == null ? updateResult : 
prev.thenCompose(none -> updateResult));
     }
 
     @Override
@@ -185,6 +202,7 @@ public class SqlStatisticManagerImpl implements 
SqlStatisticUpdateManager {
         int catalogVersion = parameters.catalogVersion();
 
         destructionEventsQueue.enqueue(new DestroyTableEvent(catalogVersion, 
tableId));
+        droppedTables.add(tableId);
     }
 
     private void onTableCreate(CreateTableEventParameters parameters) {
@@ -196,25 +214,46 @@ public class SqlStatisticManagerImpl implements 
SqlStatisticUpdateManager {
         List<DestroyTableEvent> events = 
destructionEventsQueue.drainUpTo(earliestVersion);
 
         events.forEach(event -> tableSizeMap.remove(event.tableId()));
+        events.forEach(event -> droppedTables.remove(event.tableId()));
     }
 
-    /** Timestamped size. */
-    private static class ActualSize {
-        long timestamp;
+    /** Size with modification counter. */
+    static class ActualSize {
+        long modificationCounter;
         long size;
 
-        ActualSize(long size, long timestamp) {
-            this.timestamp = timestamp;
+        ActualSize(long size, long modificationCounter) {
+            this.modificationCounter = modificationCounter;
             this.size = size;
         }
 
-        long getTimestamp() {
-            return timestamp;
+        public long modificationCounter() {
+            return modificationCounter;
         }
 
         long getSize() {
             return size;
         }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            ActualSize that = ((ActualSize) o);
+
+            return modificationCounter == that.modificationCounter && size == 
that.size;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(modificationCounter, size);
+        }
     }
 
     /** Internal event. */
@@ -236,17 +275,6 @@ public class SqlStatisticManagerImpl implements 
SqlStatisticUpdateManager {
         }
     }
 
-    /**
-     * Set threshold time to postpone update statistics.
-     */
-    @TestOnly
-    public long setThresholdTimeToPostponeUpdateMs(long milliseconds) {
-        assert milliseconds >= 0;
-        long prevValue = thresholdTimeToPostponeUpdateMs;
-        thresholdTimeToPostponeUpdateMs = milliseconds;
-        return prevValue;
-    }
-
     /**
      * Returns feature for the last run update statistics to have ability wait 
update statistics.
      */
@@ -258,10 +286,6 @@ public class SqlStatisticManagerImpl implements 
SqlStatisticUpdateManager {
     /** Forcibly updates statistics for all known tables, ignoring throttling. 
*/
     @TestOnly
     public void forceUpdateAll() {
-        List<Integer> tableIds = List.copyOf(tableSizeMap.keySet());
-
-        for (int tableId : tableIds) {
-            updateTableSizeStatistics(tableId, true);
-        }
+        update();
     }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
index 997cbe7d606..a4ed27af630 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
@@ -17,6 +17,10 @@
 
 package org.apache.ignite.internal.sql.engine.statistic;
 
+import org.apache.ignite.internal.event.EventProducer;
+import 
org.apache.ignite.internal.sql.engine.statistic.event.StatisticChangedEvent;
+import 
org.apache.ignite.internal.sql.engine.statistic.event.StatisticEventParameters;
+
 /** Statistic manager with reaction on statistic changes. */
-public interface SqlStatisticUpdateManager extends SqlStatisticManager, 
StatisticUpdatesNotifier {
+public interface SqlStatisticUpdateManager extends SqlStatisticManager, 
EventProducer<StatisticChangedEvent, StatisticEventParameters> {
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticUpdatesNotifier.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java
similarity index 84%
rename from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticUpdatesNotifier.java
rename to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java
index c5459c939ad..5a72dab336c 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticUpdatesNotifier.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java
@@ -17,9 +17,9 @@
 
 package org.apache.ignite.internal.sql.engine.statistic;
 
-/** Statistic updates notifier. */
+/** Statistic aggregator. */
 @FunctionalInterface
-public interface StatisticUpdatesNotifier {
-    /** Changes callback. */
-    void changesNotifier(StatisticUpdatesSupplier updater);
+public interface StatisticAggregator<T, R> {
+    /** Estimated size and last value update. */
+    R estimatedSizeWithLastUpdate(T t);
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregatorImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregatorImpl.java
new file mode 100644
index 00000000000..3bcfb3faecf
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregatorImpl.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.statistic;
+
+import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.internal.util.CompletableFutures.isCompletedSuccessfully;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.InternalClusterNode;
+import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.InternalTable;
+import 
org.apache.ignite.internal.table.message.GetEstimatedSizeWithLastModifiedTsRequest;
+import 
org.apache.ignite.internal.table.message.GetEstimatedSizeWithLastModifiedTsResponse;
+import 
org.apache.ignite.internal.table.message.PartitionModificationInfoMessage;
+import org.apache.ignite.internal.table.message.TableMessageGroup;
+import org.apache.ignite.internal.table.message.TableMessagesFactory;
+import org.jetbrains.annotations.Nullable;
+
+/** Statistic aggregator. */
+public class StatisticAggregatorImpl implements
+        StatisticAggregator<Collection<InternalTable>, 
CompletableFuture<Int2ObjectMap<PartitionModificationInfo>>> {
+    private static final IgniteLogger LOG = 
Loggers.forClass(StatisticAggregatorImpl.class);
+    private final Supplier<Set<LogicalNode>> clusterNodes;
+    private final MessagingService messagingService;
+    private static final TableMessagesFactory TABLE_MESSAGES_FACTORY = new 
TableMessagesFactory();
+    private static final long REQUEST_ESTIMATION_TIMEOUT_MILLIS = 
TimeUnit.SECONDS.toMillis(5);
+    private final AtomicReference<@Nullable Map<TablePartitionId, 
CompletableFuture<Object>>> requestsCompletion = new AtomicReference<>();
+
+    /** Constructor. */
+    public StatisticAggregatorImpl(
+            Supplier<Set<LogicalNode>> clusterNodes,
+            MessagingService messagingService
+    ) {
+        this.clusterNodes = clusterNodes;
+        this.messagingService = messagingService;
+
+        messagingService.addMessageHandler(TableMessageGroup.class, 
this::handleMessage);
+    }
+
+    private void handleMessage(NetworkMessage message, InternalClusterNode 
sender, @Nullable Long correlationId) {
+        Map<TablePartitionId, CompletableFuture<Object>> completedRequests = 
requestsCompletion.get();
+
+        if (message instanceof GetEstimatedSizeWithLastModifiedTsResponse && 
completedRequests != null) {
+            GetEstimatedSizeWithLastModifiedTsResponse response = 
(GetEstimatedSizeWithLastModifiedTsResponse) message;
+            for (PartitionModificationInfoMessage ent : 
response.modifications()) {
+                TablePartitionId id = new TablePartitionId(ent.tableId(), 
ent.partId());
+                long estSize = ent.estimatedSize();
+                long modificationCounter = ent.lastModificationCounter();
+
+                CompletableFuture<Object> responseFut = 
completedRequests.get(id);
+
+                // stale response
+                if (responseFut == null) {
+                    continue;
+                }
+
+                synchronized (this) {
+                    if (isCompletedSuccessfully(responseFut)) {
+                        PartitionModificationInfo res = 
(PartitionModificationInfo) responseFut.join();
+                        if (modificationCounter > 
res.lastModificationCounter()) {
+                            responseFut.complete(completedFuture(new 
PartitionModificationInfo(estSize, modificationCounter)));
+                        }
+                    } else {
+                        responseFut.complete(new 
PartitionModificationInfo(estSize, modificationCounter));
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Returns future with map<<em>last modification timestamp</em>, 
<em>estimated size</em>> for input tables.
+     */
+    @Override
+    public CompletableFuture<Int2ObjectMap<PartitionModificationInfo>> 
estimatedSizeWithLastUpdate(Collection<InternalTable> tables) {
+        // some requests are in progress
+        if (requestsCompletion.get() != null) {
+            return completedFuture(Int2ObjectMaps.emptyMap());
+        }
+
+        Collection<Integer> tablesId = 
tables.stream().map(InternalTable::tableId).collect(Collectors.toList());
+
+        GetEstimatedSizeWithLastModifiedTsRequest request =
+                
TABLE_MESSAGES_FACTORY.getEstimatedSizeWithLastModifiedTsRequest().tables(tablesId).build();
+
+        HashMap<TablePartitionId, CompletableFuture<Object>> partIdRequests = 
new HashMap<>();
+        requestsCompletion.set(partIdRequests);
+
+        for (InternalTable t : tables) {
+            for (int p = 0; p < t.partitions(); ++p) {
+                partIdRequests.put(new TablePartitionId(t.tableId(), p),
+                        new 
CompletableFuture<>().orTimeout(REQUEST_ESTIMATION_TIMEOUT_MILLIS, 
TimeUnit.MILLISECONDS));
+            }
+        }
+
+        List<CompletableFuture<Void>> reqFutures = new ArrayList<>();
+
+        for (LogicalNode node : clusterNodes.get()) {
+            CompletableFuture<Void> reqFut = messagingService.send(node, 
request);
+
+            reqFutures.add(reqFut.orTimeout(REQUEST_ESTIMATION_TIMEOUT_MILLIS, 
TimeUnit.MILLISECONDS));
+        }
+
+        CompletableFuture<CompletableFuture<Void>>[] requests = 
reqFutures.toArray(CompletableFuture[]::new);
+
+        CompletableFuture<Void> allRequests = allOf(requests);
+
+        Int2ObjectMap<PartitionModificationInfo> summary = new 
Int2ObjectOpenHashMap<>();
+
+        for (InternalTable t : tables) {
+            Map<TablePartitionId, CompletableFuture<Object>> tableResponses = 
new HashMap<>();
+            for (Map.Entry<TablePartitionId, CompletableFuture<Object>> ent : 
partIdRequests.entrySet()) {
+                if (ent.getKey().tableId() == t.tableId()) {
+                    tableResponses.put(ent.getKey(), ent.getValue());
+                }
+            }
+
+            allRequests = allRequests
+                    .thenCompose(r -> 
allOf(tableResponses.values().toArray(CompletableFuture[]::new)))
+                    .handle((ret, ex) -> {
+                        if (ex != null) {
+                            LOG.debug("Can`t update statistics for table 
[id={}].", ex, t.tableId());
+                        }
+
+                        CompletableFuture<Void> allResponses = 
allOf(tableResponses.values().toArray(CompletableFuture[]::new));
+
+                        if (!isCompletedSuccessfully(allResponses)) {
+                            if (LOG.isDebugEnabled()) {
+                                for (Map.Entry<TablePartitionId, 
CompletableFuture<Object>> ent : tableResponses.entrySet()) {
+                                    if 
(isCompletedSuccessfully(ent.getValue())) {
+                                        LOG.debug("Can`t update statistics for 
table partition [id={}].", ent.getKey());
+                                    }
+                                }
+                            }
+                            return null;
+                        }
+
+                        for (Map.Entry<TablePartitionId, 
CompletableFuture<Object>> ent : tableResponses.entrySet()) {
+                            PartitionModificationInfo info = 
(PartitionModificationInfo) ent.getValue().join();
+                            long estSize = info.getEstimatedSize();
+                            long modificationCounter = 
info.lastModificationCounter();
+
+                            summary.compute(ent.getKey().tableId(), (k, v) -> 
v == null
+                                    ? new PartitionModificationInfo(estSize, 
modificationCounter)
+                                    : new 
PartitionModificationInfo(v.getEstimatedSize() + estSize,
+                                            
Math.max(v.lastModificationCounter(), modificationCounter)));
+                        }
+
+                        return null;
+                    });
+        }
+
+        return allRequests.handle((ret, ex) -> {
+            if (ex != null) {
+                LOG.debug("Exception during tables size estimation.", ex);
+                return Int2ObjectMaps.emptyMap();
+            }
+
+            requestsCompletion.set(null);
+
+            return summary;
+        });
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/event/StatisticChangedEvent.java
similarity index 74%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
copy to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/event/StatisticChangedEvent.java
index 997cbe7d606..74cb6958ea8 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/event/StatisticChangedEvent.java
@@ -15,8 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.statistic;
+package org.apache.ignite.internal.sql.engine.statistic.event;
 
-/** Statistic manager with reaction on statistic changes. */
-public interface SqlStatisticUpdateManager extends SqlStatisticManager, 
StatisticUpdatesNotifier {
+import org.apache.ignite.internal.event.Event;
+
+/** Statistic changed event. */
+public enum StatisticChangedEvent implements Event {
+    /**
+     * Fired when statistic is changed.
+     */
+    STATISTIC_CHANGED,
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/event/StatisticEventParameters.java
similarity index 66%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
copy to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/event/StatisticEventParameters.java
index 997cbe7d606..63013148d38 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/event/StatisticEventParameters.java
@@ -15,8 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.statistic;
+package org.apache.ignite.internal.sql.engine.statistic.event;
 
-/** Statistic manager with reaction on statistic changes. */
-public interface SqlStatisticUpdateManager extends SqlStatisticManager, 
StatisticUpdatesNotifier {
+import org.apache.ignite.internal.event.EventParameters;
+
+/** Event related parameters. */
+public class StatisticEventParameters implements EventParameters {
+    private final int tableId;
+
+    public StatisticEventParameters(int tableId) {
+        this.tableId = tableId;
+    }
+
+    public int tableId() {
+        return tableId;
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 4112616c425..4738c1407af 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -78,6 +78,7 @@ import org.apache.ignite.internal.catalog.CatalogCommand;
 import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
 import org.apache.ignite.internal.components.SystemPropertiesNodeProperties;
+import org.apache.ignite.internal.event.AbstractEventProducer;
 import org.apache.ignite.internal.failure.FailureManager;
 import org.apache.ignite.internal.failure.handlers.NoOpFailureHandler;
 import org.apache.ignite.internal.hlc.ClockService;
@@ -140,6 +141,8 @@ import 
org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 import org.apache.ignite.internal.sql.engine.sql.ParsedResult;
 import org.apache.ignite.internal.sql.engine.sql.ParserService;
 import org.apache.ignite.internal.sql.engine.sql.ParserServiceImpl;
+import 
org.apache.ignite.internal.sql.engine.statistic.event.StatisticChangedEvent;
+import 
org.apache.ignite.internal.sql.engine.statistic.event.StatisticEventParameters;
 import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContext;
 import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapper;
 import org.apache.ignite.internal.sql.engine.util.Commons;
@@ -265,6 +268,8 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
 
         when(clockService.currentLong()).thenReturn(new HybridTimestamp(1_000, 
500).longValue());
 
+        AbstractEventProducer<StatisticChangedEvent, StatisticEventParameters> 
producer = new AbstractEventProducer<>() {};
+
         prepareService = new PrepareServiceImpl(
                 "test",
                 0,
@@ -276,7 +281,8 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
                 metricManager,
                 new PredefinedSchemaManager(schema),
                 clockService::currentLong,
-                commonExecutor
+                commonExecutor,
+                producer
         );
         parserService = new ParserServiceImpl();
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index 67184e438e2..c3db2244f1f 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -75,6 +75,7 @@ import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
 import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
 import org.apache.ignite.internal.components.SystemPropertiesNodeProperties;
+import org.apache.ignite.internal.event.AbstractEventProducer;
 import org.apache.ignite.internal.event.EventListener;
 import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.hlc.ClockServiceImpl;
@@ -129,6 +130,8 @@ import 
org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
 import org.apache.ignite.internal.sql.engine.schema.TableDescriptorImpl;
 import org.apache.ignite.internal.sql.engine.sql.ParserServiceImpl;
 import org.apache.ignite.internal.sql.engine.statistic.SqlStatisticManager;
+import 
org.apache.ignite.internal.sql.engine.statistic.event.StatisticChangedEvent;
+import 
org.apache.ignite.internal.sql.engine.statistic.event.StatisticEventParameters;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.sql.engine.util.Commons;
@@ -687,6 +690,8 @@ public class TestBuilders {
                     IgniteThreadFactory.create("test", 
"common-scheduled-executors", LOG)
             );
 
+            AbstractEventProducer<StatisticChangedEvent, 
StatisticEventParameters> producer = new AbstractEventProducer<>() {};
+
             var prepareService = new PrepareServiceImpl(
                     clusterName,
                     0,
@@ -698,7 +703,8 @@ public class TestBuilders {
                     new NoOpMetricManager(),
                     schemaManager,
                     clockService::currentLong,
-                    scheduledExecutor
+                    scheduledExecutor,
+                    producer
             );
 
             Map<String, List<String>> systemViewsByNode = new HashMap<>();
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTimeoutTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTimeoutTest.java
index 28ab93a627f..a330bc84301 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTimeoutTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTimeoutTest.java
@@ -38,6 +38,7 @@ import org.apache.calcite.plan.volcano.VolcanoPlanner;
 import org.apache.calcite.plan.volcano.VolcanoTimeoutException;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelVisitor;
+import org.apache.ignite.internal.event.AbstractEventProducer;
 import org.apache.ignite.internal.metrics.MetricManagerImpl;
 import org.apache.ignite.internal.sql.engine.SqlOperationContext;
 import org.apache.ignite.internal.sql.engine.framework.PredefinedSchemaManager;
@@ -53,6 +54,8 @@ import 
org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 import org.apache.ignite.internal.sql.engine.sql.ParsedResult;
 import org.apache.ignite.internal.sql.engine.sql.ParserService;
 import org.apache.ignite.internal.sql.engine.sql.ParserServiceImpl;
+import 
org.apache.ignite.internal.sql.engine.statistic.event.StatisticChangedEvent;
+import 
org.apache.ignite.internal.sql.engine.statistic.event.StatisticEventParameters;
 import org.apache.ignite.internal.sql.engine.util.SqlTestUtils;
 import org.apache.ignite.internal.sql.engine.util.cache.CaffeineCacheFactory;
 import org.apache.ignite.internal.type.NativeTypes;
@@ -69,6 +72,7 @@ public class PlannerTimeoutTest extends AbstractPlannerTest {
         long plannerTimeout = 1L;
         IgniteSchema schema = createSchema(createTestTable("T1"));
         SqlOperationContext ctx = operationContext();
+        AbstractEventProducer<StatisticChangedEvent, StatisticEventParameters> 
producer = new AbstractEventProducer<>() {};
 
         PrepareService prepareService = new PrepareServiceImpl(
                 "test",
@@ -81,7 +85,8 @@ public class PlannerTimeoutTest extends AbstractPlannerTest {
                 new MetricManagerImpl(),
                 new PredefinedSchemaManager(schema),
                 mock(LongSupplier.class),
-                mock(ScheduledExecutorService.class)
+                mock(ScheduledExecutorService.class),
+                producer
         );
         prepareService.start();
         try {
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImplTest.java
index c624b65e571..25af2adbdda 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImplTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImplTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.sql.engine.prepare;
 
 import static 
org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl.PLAN_UPDATER_INITIAL_DELAY;
+import static 
org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl.PLAN_UPDATER_REFRESH_PERIOD;
 import static 
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
@@ -45,7 +46,9 @@ import java.time.Duration;
 import java.time.Instant;
 import java.time.ZoneId;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
@@ -56,6 +59,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import org.apache.ignite.internal.event.AbstractEventProducer;
 import org.apache.ignite.internal.hlc.ClockServiceImpl;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -66,12 +70,15 @@ import 
org.apache.ignite.internal.sql.engine.SqlOperationContext;
 import org.apache.ignite.internal.sql.engine.framework.PredefinedSchemaManager;
 import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
 import org.apache.ignite.internal.sql.engine.framework.VersionedSchemaManager;
+import 
org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl.PlanInfo;
 import 
org.apache.ignite.internal.sql.engine.prepare.ddl.DdlSqlToCommandConverter;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Collation;
 import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
 import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 import org.apache.ignite.internal.sql.engine.sql.ParsedResult;
 import org.apache.ignite.internal.sql.engine.sql.ParserServiceImpl;
+import 
org.apache.ignite.internal.sql.engine.statistic.event.StatisticChangedEvent;
+import 
org.apache.ignite.internal.sql.engine.statistic.event.StatisticEventParameters;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.sql.engine.util.SqlTestUtils;
 import org.apache.ignite.internal.sql.engine.util.cache.Cache;
@@ -524,7 +531,7 @@ public class PrepareServiceImplTest extends 
BaseIgniteAbstractTest {
         IgniteSchema schema = new IgniteSchema("TEST", 0, List.of(table1));
 
         AtomicInteger ver = new AtomicInteger();
-        PrepareServiceImpl service = 
createPlannerServiceWithInactivePlanUpdater(schema, 
CaffeineCacheFactory.INSTANCE, 10000,
+        PrepareServiceImpl service = createPlannerService(schema, 
CaffeineCacheFactory.INSTANCE, 10000,
                 Integer.MAX_VALUE, 1000, ver);
 
         String selectQuery = "SELECT * FROM test.t1 WHERE c1 = 1";
@@ -536,15 +543,23 @@ public class PrepareServiceImplTest extends 
BaseIgniteAbstractTest {
         await(service.prepareAsync(parse(selectQuery), 
operationContext().build()));
 
         Awaitility.await()
-                .atMost(Duration.ofMillis(10000))
+                .atMost(Duration.ofMillis(2 * PLAN_UPDATER_REFRESH_PERIOD))
                 .until(
                         () -> service.cache.size() == 2
                 );
 
-        assertThat(service.cache.size(), is(2));
         service.statisticsChanged(table1.id());
 
-        assertThat(service.cache.entrySet().stream().filter(e -> 
e.getValue().join().needInvalidate()).count(), is(1L));
+        Set<Entry<CacheKey, CompletableFuture<PlanInfo>>> cachedSnap = new 
HashSet<>(service.cache.entrySet());
+
+        Awaitility.await()
+                .atMost(Duration.ofMillis(2 * PLAN_UPDATER_REFRESH_PERIOD))
+                .until(
+                        () -> !new 
HashSet<>(service.cache.entrySet()).equals(cachedSnap)
+                );
+
+        // cache futures snapshot highlight only one invalidation item.
+        assertThat(cachedSnap.stream().filter(e -> 
e.getValue().join().needInvalidate()).count(), is(1L));
     }
 
     @Test
@@ -848,9 +863,11 @@ public class PrepareServiceImplTest extends 
BaseIgniteAbstractTest {
 
         when(clockService.currentLong()).thenReturn(new HybridTimestamp(1_000, 
500).longValue());
 
+        AbstractEventProducer<StatisticChangedEvent, StatisticEventParameters> 
producer = new AbstractEventProducer<>() {};
+
         PrepareServiceImpl service = new PrepareServiceImpl("test", cacheSize, 
cacheFactory,
                 mock(DdlSqlToCommandConverter.class), timeoutMillis, 2, 
planExpireSeconds, mock(MetricManagerImpl.class),
-                new PredefinedSchemaManager(schemas), 
clockService::currentLong, commonExecutor);
+                new PredefinedSchemaManager(schemas), 
clockService::currentLong, commonExecutor, producer);
 
         createdServices.add(service);
 
@@ -883,9 +900,11 @@ public class PrepareServiceImplTest extends 
BaseIgniteAbstractTest {
 
         when(clockService.currentLong()).thenReturn(new HybridTimestamp(1_000, 
500).longValue());
 
+        AbstractEventProducer<StatisticChangedEvent, StatisticEventParameters> 
producer = new AbstractEventProducer<>() {};
+
         PrepareServiceImpl service = new PrepareServiceImpl("test", cacheSize, 
cacheFactory,
                 mock(DdlSqlToCommandConverter.class), timeoutMillis, 2, 
planExpireSeconds, mock(MetricManagerImpl.class),
-                new VersionedSchemaManager(schemas, ver), 
clockService::currentLong, executor);
+                new VersionedSchemaManager(schemas, ver), 
clockService::currentLong, executor, producer);
 
         createdServices.add(service);
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImplTest.java
index fc526b747e3..57d6eda18cb 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImplTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImplTest.java
@@ -17,11 +17,11 @@
 
 package org.apache.ignite.internal.sql.engine.statistic;
 
+import static it.unimi.dsi.fastutil.ints.Int2ObjectMap.entry;
 import static 
org.apache.ignite.internal.sql.engine.statistic.SqlStatisticManagerImpl.DEFAULT_TABLE_SIZE;
-import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
-import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
+import static 
org.apache.ignite.internal.sql.engine.statistic.SqlStatisticManagerImpl.INITIAL_DELAY;
+import static 
org.apache.ignite.internal.sql.engine.statistic.SqlStatisticManagerImpl.REFRESH_PERIOD;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.eq;
@@ -31,12 +31,15 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import it.unimi.dsi.fastutil.ints.IntList;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.CatalogManager;
 import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
@@ -53,8 +56,10 @@ import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.TableViewInternal;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
-import org.apache.ignite.lang.IgniteCheckedException;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
 import org.apache.ignite.sql.ColumnType;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.ArgumentCaptor;
@@ -65,6 +70,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
  * Tests of SqlStatisticManagerImpl.
  */
 @ExtendWith(MockitoExtension.class)
+@ExtendWith(ExecutorServiceExtension.class)
 class SqlStatisticManagerImplTest extends BaseIgniteAbstractTest {
     @Mock
     private TableManager tableManager;
@@ -81,6 +87,12 @@ class SqlStatisticManagerImplTest extends 
BaseIgniteAbstractTest {
     @Mock
     private LowWatermark lowWatermark;
 
+    @Mock
+    StatisticAggregatorImpl statAggregator;
+
+    @InjectExecutorService
+    private ScheduledExecutorService commonExecutor;
+
     private static final CatalogTableColumnDescriptor pkCol =
             new CatalogTableColumnDescriptor("pkCol", ColumnType.STRING, 
false, 0, 0, 10, null);
 
@@ -89,9 +101,9 @@ class SqlStatisticManagerImplTest extends 
BaseIgniteAbstractTest {
         int tableId = ThreadLocalRandom.current().nextInt();
         // Preparing:
         when(catalogManager.catalog(anyInt())).thenReturn(mock(Catalog.class));
-        when(tableManager.cachedTable(tableId)).thenReturn(null);
 
-        SqlStatisticManagerImpl sqlStatisticManager = new 
SqlStatisticManagerImpl(tableManager, catalogManager, lowWatermark);
+        SqlStatisticManagerImpl sqlStatisticManager =
+                new SqlStatisticManagerImpl(tableManager, catalogManager, 
lowWatermark, commonExecutor, statAggregator);
         sqlStatisticManager.start();
 
         // Test:
@@ -108,51 +120,24 @@ class SqlStatisticManagerImplTest extends 
BaseIgniteAbstractTest {
 
         when(tableManager.cachedTable(tableId)).thenReturn(tableViewInternal);
         when(tableViewInternal.internalTable()).thenReturn(internalTable);
-        
when(internalTable.estimatedSize()).thenReturn(CompletableFuture.completedFuture(tableSize));
+        
when(statAggregator.estimatedSizeWithLastUpdate(List.of(internalTable)))
+                
.thenReturn(CompletableFuture.completedFuture(Int2ObjectMap.ofEntries(
+                        entry(tableId, new 
PartitionModificationInfo(tableSize, Long.MAX_VALUE)))));
 
-        SqlStatisticManagerImpl sqlStatisticManager = new 
SqlStatisticManagerImpl(tableManager, catalogManager, lowWatermark);
+        SqlStatisticManagerImpl sqlStatisticManager =
+                new SqlStatisticManagerImpl(tableManager, catalogManager, 
lowWatermark, commonExecutor, statAggregator);
         sqlStatisticManager.start();
 
-        // Test:
-        assertEquals(tableSize, sqlStatisticManager.tableSize(tableId));
-        // The second time we should obtain the same value from a cache.
-        assertEquals(tableSize, sqlStatisticManager.tableSize(tableId));
-
-        verify(internalTable, times(1)).estimatedSize();
-    }
+        long timeout = 2 * Math.max(REFRESH_PERIOD, INITIAL_DELAY);
 
-    @Test
-    public void testEstimationFailure() {
-        int tableId = ThreadLocalRandom.current().nextInt();
-
-        prepareCatalogWithTable(tableId);
-
-        when(tableManager.cachedTable(tableId)).thenReturn(tableViewInternal);
-        when(tableViewInternal.internalTable()).thenReturn(internalTable);
-        when(internalTable.estimatedSize()).thenReturn(
-                CompletableFuture.completedFuture(1L),
-                CompletableFuture.completedFuture(2L),
-                CompletableFuture.failedFuture(new 
IgniteCheckedException(INTERNAL_ERR, "Test exception"))
+        Awaitility.await().timeout(timeout, 
TimeUnit.MILLISECONDS).untilAsserted(() ->
+                assertEquals(tableSize, sqlStatisticManager.tableSize(tableId))
         );
 
-        SqlStatisticManagerImpl sqlStatisticManager = new 
SqlStatisticManagerImpl(tableManager, catalogManager, lowWatermark);
-        StatisticUpdatesSupplier notifier = 
mock(StatisticUpdatesSupplier.class);
-        sqlStatisticManager.changesNotifier(notifier);
-        sqlStatisticManager.start();
-
-        sqlStatisticManager.setThresholdTimeToPostponeUpdateMs(0);
-
-        // table size 1
-        assertEquals(1L, sqlStatisticManager.tableSize(tableId));
-        verify(notifier, times(1)).accept(anyInt());
-
-        // table size 2
-        assertEquals(2L, sqlStatisticManager.tableSize(tableId));
+        // Second time we should obtain the same value from a cache.
+        assertEquals(tableSize, sqlStatisticManager.tableSize(tableId));
 
-        // exceptionable table size
-        assertEquals(2L, sqlStatisticManager.tableSize(tableId));
-        verify(notifier, times(2)).accept(anyInt());
-        verify(internalTable, times(3)).estimatedSize();
+        verify(statAggregator, 
times(1)).estimatedSizeWithLastUpdate(List.of(internalTable));
     }
 
     @Test
@@ -160,35 +145,49 @@ class SqlStatisticManagerImplTest extends 
BaseIgniteAbstractTest {
         int tableId = ThreadLocalRandom.current().nextInt();
         long tableSize1 = 999_888_777L;
         long tableSize2 = 111_222_333L;
+
+        HybridTimestamp time1 = 
HybridTimestamp.MAX_VALUE.subtractPhysicalTime(1000);
+        HybridTimestamp time2 = 
HybridTimestamp.MAX_VALUE.subtractPhysicalTime(500);
         // Preparing:
         prepareCatalogWithTable(tableId);
 
         when(tableManager.cachedTable(tableId)).thenReturn(tableViewInternal);
         when(tableViewInternal.internalTable()).thenReturn(internalTable);
-        when(internalTable.estimatedSize()).thenReturn(
-                CompletableFuture.completedFuture(tableSize1),
-                CompletableFuture.completedFuture(tableSize2));
 
-        SqlStatisticManagerImpl sqlStatisticManager = new 
SqlStatisticManagerImpl(tableManager, catalogManager, lowWatermark);
+        
when(statAggregator.estimatedSizeWithLastUpdate(List.of(internalTable)))
+                .thenReturn(
+                        
CompletableFuture.completedFuture(Int2ObjectMap.ofEntries(
+                                entry(tableId, new 
PartitionModificationInfo(tableSize1, time1.longValue())))),
+                        
CompletableFuture.completedFuture(Int2ObjectMap.ofEntries(
+                                entry(tableId, new 
PartitionModificationInfo(tableSize2, time2.longValue()))))
+                );
+
+        SqlStatisticManagerImpl sqlStatisticManager =
+                new SqlStatisticManagerImpl(tableManager, catalogManager, 
lowWatermark, commonExecutor, statAggregator);
         sqlStatisticManager.start();
 
-        // Test:
-        assertEquals(tableSize1, sqlStatisticManager.tableSize(tableId));
+        long timeout = 2 * Math.max(REFRESH_PERIOD, INITIAL_DELAY);
+
+        Awaitility.await().timeout(timeout, 
TimeUnit.MILLISECONDS).untilAsserted(() ->
+                assertEquals(tableSize1, 
sqlStatisticManager.tableSize(tableId))
+        );
         // The second time we should obtain the same value from a cache.
         assertEquals(tableSize1, sqlStatisticManager.tableSize(tableId));
 
-        // Allow to refresh value.
-        sqlStatisticManager.setThresholdTimeToPostponeUpdateMs(0);
+        // Forcibly update.
+        sqlStatisticManager.forceUpdateAll();
         // Now we need obtain a fresh value of table size.
         assertEquals(tableSize2, sqlStatisticManager.tableSize(tableId));
+        assertEquals(time2.longValue(), 
sqlStatisticManager.tableSizeMap.get(tableId).modificationCounter());
 
-        verify(internalTable, times(2)).estimatedSize();
+        verify(statAggregator, 
times(2)).estimatedSizeWithLastUpdate(List.of(internalTable));
     }
 
     @Test
-    public void checkLoadAllTablesOnStart() {
+    public void checkLoadAllTablesOnStart() throws Exception {
         int minimumCatalogVersion = 1;
         int maximumCatalogVersion = 10;
+        long tableSize = 99999L;
 
         // Preparing:
         
when(catalogManager.earliestCatalogVersion()).thenReturn(minimumCatalogVersion);
@@ -223,16 +222,25 @@ class SqlStatisticManagerImplTest extends 
BaseIgniteAbstractTest {
 
         when(tableManager.cachedTable(anyInt())).thenReturn(tableViewInternal);
         when(tableViewInternal.internalTable()).thenReturn(internalTable);
-        
when(internalTable.estimatedSize()).thenReturn(CompletableFuture.completedFuture(99999L));
+        Int2ObjectMap<PartitionModificationInfo> modifications = new 
Int2ObjectOpenHashMap<>();
+        for (int i = minimumCatalogVersion; i <= maximumCatalogVersion + 1; 
++i) {
+            modifications.put(i, new PartitionModificationInfo(tableSize, 
Long.MAX_VALUE));
+        }
+        when(statAggregator.estimatedSizeWithLastUpdate(any()))
+                .thenReturn(CompletableFuture.completedFuture(modifications));
 
-        SqlStatisticManagerImpl sqlStatisticManager = new 
SqlStatisticManagerImpl(tableManager, catalogManager, lowWatermark);
+        SqlStatisticManagerImpl sqlStatisticManager =
+                new SqlStatisticManagerImpl(tableManager, catalogManager, 
lowWatermark, commonExecutor, statAggregator);
         sqlStatisticManager.start();
 
+        sqlStatisticManager.forceUpdateAll();
+        sqlStatisticManager.lastUpdateStatisticFuture().get(5_000, 
TimeUnit.MILLISECONDS);
+
         // Test:
         // For known tables we got calculated table size.
         for (int i = minimumCatalogVersion; i <= maximumCatalogVersion; i++) {
-            assertEquals(99999L, sqlStatisticManager.tableSize(i));
-            assertEquals(99999L, sqlStatisticManager.tableSize(i + 1));
+            assertEquals(tableSize, sqlStatisticManager.tableSize(i));
+            assertEquals(tableSize, sqlStatisticManager.tableSize(i + 1));
         }
 
         // For unknown tables we got default size.
@@ -258,13 +266,19 @@ class SqlStatisticManagerImplTest extends 
BaseIgniteAbstractTest {
 
         when(tableManager.cachedTable(tableId)).thenReturn(tableViewInternal);
         when(tableViewInternal.internalTable()).thenReturn(internalTable);
-        
when(internalTable.estimatedSize()).thenReturn(CompletableFuture.completedFuture(tableSize));
+        
when(statAggregator.estimatedSizeWithLastUpdate(List.of(internalTable)))
+                
.thenReturn(CompletableFuture.completedFuture(Int2ObjectMap.ofEntries(
+                        entry(tableId, new 
PartitionModificationInfo(tableSize, Long.MAX_VALUE)))));
 
-        SqlStatisticManagerImpl sqlStatisticManager = new 
SqlStatisticManagerImpl(tableManager, catalogManager, lowWatermark);
+        SqlStatisticManagerImpl sqlStatisticManager =
+                new SqlStatisticManagerImpl(tableManager, catalogManager, 
lowWatermark, commonExecutor, statAggregator);
         sqlStatisticManager.start();
 
-        // Test:
-        assertEquals(tableSize, sqlStatisticManager.tableSize(tableId));
+        long timeout = 2 * Math.max(REFRESH_PERIOD, INITIAL_DELAY);
+
+        Awaitility.await().timeout(timeout, 
TimeUnit.MILLISECONDS).untilAsserted(() ->
+                assertEquals(tableSize, sqlStatisticManager.tableSize(tableId))
+        );
 
         int catalogVersionBeforeDrop = 1;
         int catalogVersionAfterDrop = 2;
@@ -272,6 +286,7 @@ class SqlStatisticManagerImplTest extends 
BaseIgniteAbstractTest {
         // After table drop we still obtain the same value from a cache.
         tableDropCapture.getValue().notify(new DropTableEventParameters(-1, 
catalogVersionAfterDrop, tableId));
         assertEquals(tableSize, sqlStatisticManager.tableSize(tableId));
+        assertEquals(1L, sqlStatisticManager.droppedTables.size());
 
         // After LWM has been changed data in case is removed and we get 
default value.
         
when(catalogManager.activeCatalogVersion(catalogVersionBeforeDrop)).thenReturn(catalogVersionAfterDrop);
@@ -279,6 +294,8 @@ class SqlStatisticManagerImplTest extends 
BaseIgniteAbstractTest {
                 .notify(new 
ChangeLowWatermarkEventParameters(HybridTimestamp.hybridTimestamp(catalogVersionBeforeDrop)));
 
         assertEquals(DEFAULT_TABLE_SIZE, 
sqlStatisticManager.tableSize(tableId));
+        // After LWM dropped tables are also cleared.
+        assertEquals(0L, sqlStatisticManager.droppedTables.size());
     }
 
     @SuppressWarnings("PMD.UseNotifyAllInsteadOfNotify")
@@ -295,9 +312,12 @@ class SqlStatisticManagerImplTest extends 
BaseIgniteAbstractTest {
 
         when(tableManager.cachedTable(tableId)).thenReturn(tableViewInternal);
         when(tableViewInternal.internalTable()).thenReturn(internalTable);
-        
when(internalTable.estimatedSize()).thenReturn(CompletableFuture.completedFuture(tableSize));
+        
when(statAggregator.estimatedSizeWithLastUpdate(List.of(internalTable)))
+                
.thenReturn(CompletableFuture.completedFuture(Int2ObjectMap.ofEntries(
+                        entry(tableId, new 
PartitionModificationInfo(tableSize, Long.MAX_VALUE)))));
 
-        SqlStatisticManagerImpl sqlStatisticManager = new 
SqlStatisticManagerImpl(tableManager, catalogManager, lowWatermark);
+        SqlStatisticManagerImpl sqlStatisticManager =
+                new SqlStatisticManagerImpl(tableManager, catalogManager, 
lowWatermark, commonExecutor, statAggregator);
         sqlStatisticManager.start();
 
         // Test:
@@ -316,53 +336,97 @@ class SqlStatisticManagerImplTest extends 
BaseIgniteAbstractTest {
                 .build();
         tableCreateCapture.getValue().notify(new 
CreateTableEventParameters(-1, 0, catalogDescriptor));
         // now table is created and size should be actual.
-        assertEquals(tableSize, sqlStatisticManager.tableSize(tableId));
+        long timeout = 2 * Math.max(REFRESH_PERIOD, INITIAL_DELAY);
+
+        Awaitility.await().timeout(timeout, 
TimeUnit.MILLISECONDS).untilAsserted(() ->
+                assertEquals(tableSize, sqlStatisticManager.tableSize(tableId))
+        );
     }
 
     @Test
-    void ensureStaleRequestsAreDiscarded() throws InterruptedException {
+    void statisticUpdatesIfEstimationPartiallyUnavailable() {
         int tableId = ThreadLocalRandom.current().nextInt();
         long tableSize1 = 100_000L;
         long tableSize2 = 500_000L;
+
+        HybridTimestamp time1 = 
HybridTimestamp.MAX_VALUE.subtractPhysicalTime(1000);
+        HybridTimestamp time2 = 
HybridTimestamp.MAX_VALUE.subtractPhysicalTime(500);
         // Preparing:
         prepareCatalogWithTable(tableId);
 
-        CompletableFuture<Long> staleResultFuture = new CompletableFuture<>();
+        
when(statAggregator.estimatedSizeWithLastUpdate(List.of(internalTable))).thenReturn(
+                // stale result goes first
+                CompletableFuture.completedFuture(Int2ObjectMap.ofEntries(
+                        entry(tableId, new 
PartitionModificationInfo(tableSize1, time1.longValue())))),
+                CompletableFuture.failedFuture(new RuntimeException("smth 
wrong")),
+                CompletableFuture.completedFuture(Int2ObjectMap.ofEntries(
+                        entry(tableId, new 
PartitionModificationInfo(tableSize2, time2.longValue()))))
+        );
+
         when(tableManager.cachedTable(tableId)).thenReturn(tableViewInternal);
         when(tableViewInternal.internalTable()).thenReturn(internalTable);
-        when(internalTable.estimatedSize()).thenReturn(
-                staleResultFuture, // stale result goes first
-                CompletableFuture.completedFuture(tableSize2)
-        );
 
-        SqlStatisticManagerImpl sqlStatisticManager = new 
SqlStatisticManagerImpl(tableManager, catalogManager, lowWatermark);
-        sqlStatisticManager.start();
+        SqlStatisticManagerImpl sqlStatisticManager =
+                new SqlStatisticManagerImpl(tableManager, catalogManager, 
lowWatermark, commonExecutor, statAggregator);
 
-        // Test:
-        // first call results in non-completed future, therefore default size 
is expected 
-        assertEquals(DEFAULT_TABLE_SIZE, 
sqlStatisticManager.tableSize(tableId));
+        sqlStatisticManager.start();
+        // tableSize1
+        sqlStatisticManager.forceUpdateAll();
 
-        // Allow to refresh value.
-        sqlStatisticManager.setThresholdTimeToPostponeUpdateMs(1);
+        assertEquals(tableSize1, sqlStatisticManager.tableSize(tableId));
 
-        Future<?> oldUpdateFuture = 
sqlStatisticManager.lastUpdateStatisticFuture();
+        // err call
+        sqlStatisticManager.forceUpdateAll();
 
-        // Wait till statistics will be requested one more time.
-        assertTrue(waitForCondition(() -> {
-            sqlStatisticManager.tableSize(tableId);
-            Future<?> currentUpdateFuture = 
sqlStatisticManager.lastUpdateStatisticFuture();
+        assertEquals(tableSize1, sqlStatisticManager.tableSize(tableId));
 
-            // reference comparison on purpose
-            return oldUpdateFuture != currentUpdateFuture;
-        }, 1_000));
+        // tableSize2
+        sqlStatisticManager.forceUpdateAll();
 
-        // Now we need obtain a new value of table size.
         assertEquals(tableSize2, sqlStatisticManager.tableSize(tableId));
 
-        staleResultFuture.complete(tableSize1);
+        verify(statAggregator, 
times(3)).estimatedSizeWithLastUpdate(List.of(internalTable));
+    }
 
-        // Forbid refreshing the statistics.
-        sqlStatisticManager.setThresholdTimeToPostponeUpdateMs(Long.MAX_VALUE);
+    @Test
+    void ensureStaleRequestsAreDiscarded() {
+        int tableId = ThreadLocalRandom.current().nextInt();
+        long tableSize1 = 100_000L;
+        long tableSize2 = 500_000L;
+
+        HybridTimestamp time1 = 
HybridTimestamp.MAX_VALUE.subtractPhysicalTime(1000);
+        HybridTimestamp time2 = 
HybridTimestamp.MAX_VALUE.subtractPhysicalTime(500);
+        // Preparing:
+        prepareCatalogWithTable(tableId);
+
+        when(tableManager.cachedTable(tableId)).thenReturn(tableViewInternal);
+        when(tableViewInternal.internalTable()).thenReturn(internalTable);
+        
when(statAggregator.estimatedSizeWithLastUpdate(List.of(internalTable))).thenReturn(
+                // stale result goes first
+                CompletableFuture.completedFuture(Int2ObjectMap.ofEntries(
+                        entry(tableId, new 
PartitionModificationInfo(tableSize1, time1.longValue())))),
+                CompletableFuture.completedFuture(Int2ObjectMap.ofEntries(
+                        entry(tableId, new 
PartitionModificationInfo(tableSize2, time2.longValue()))))
+        );
+
+        SqlStatisticManagerImpl sqlStatisticManager =
+                new SqlStatisticManagerImpl(tableManager, catalogManager, 
lowWatermark, commonExecutor, statAggregator);
+        sqlStatisticManager.start();
+
+        // Test:
+        // first call results in non-completed future, therefore default size 
is expected
+        assertEquals(DEFAULT_TABLE_SIZE, 
sqlStatisticManager.tableSize(tableId));
+
+        // get stale
+        sqlStatisticManager.forceUpdateAll();
+        // get tableSize2
+        sqlStatisticManager.forceUpdateAll();
+
+        long timeout = 2 * Math.max(REFRESH_PERIOD, INITIAL_DELAY);
+
+        Awaitility.await().timeout(timeout, 
TimeUnit.MILLISECONDS).untilAsserted(() ->
+                assertEquals(tableSize2, 
sqlStatisticManager.tableSize(tableId))
+        );
 
         // Stale result must be discarded.
         assertEquals(tableSize2, sqlStatisticManager.tableSize(tableId));
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/metrics/PlanningCacheMetricsTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/metrics/PlanningCacheMetricsTest.java
index 83a5d38cce6..10f436ef43f 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/metrics/PlanningCacheMetricsTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/metrics/PlanningCacheMetricsTest.java
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.util.concurrent.ScheduledExecutorService;
+import org.apache.ignite.internal.event.AbstractEventProducer;
 import org.apache.ignite.internal.hlc.ClockServiceImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.metrics.MetricManager;
@@ -39,6 +40,8 @@ import 
org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 import org.apache.ignite.internal.sql.engine.sql.ParsedResult;
 import org.apache.ignite.internal.sql.engine.sql.ParserService;
 import org.apache.ignite.internal.sql.engine.sql.ParserServiceImpl;
+import 
org.apache.ignite.internal.sql.engine.statistic.event.StatisticChangedEvent;
+import 
org.apache.ignite.internal.sql.engine.statistic.event.StatisticEventParameters;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.sql.engine.util.cache.CacheFactory;
 import org.apache.ignite.internal.sql.engine.util.cache.CaffeineCacheFactory;
@@ -77,9 +80,11 @@ public class PlanningCacheMetricsTest extends 
AbstractPlannerTest {
 
         when(clockService.currentLong()).thenReturn(new HybridTimestamp(1_000, 
500).longValue());
 
+        AbstractEventProducer<StatisticChangedEvent, StatisticEventParameters> 
producer = new AbstractEventProducer<>() {};
+
         PrepareService prepareService = new PrepareServiceImpl(
                 "test", 2, cacheFactory, null, 15_000L, 2, Integer.MAX_VALUE, 
metricManager, new PredefinedSchemaManager(schema),
-                clockService::currentLong, commonExecutor
+                clockService::currentLong, commonExecutor, producer
         );
 
         prepareService.start();
diff --git a/modules/table/build.gradle b/modules/table/build.gradle
index 86622fde1c3..b7814536d3d 100644
--- a/modules/table/build.gradle
+++ b/modules/table/build.gradle
@@ -23,6 +23,7 @@ apply from: 
"$rootDir/buildscripts/java-integration-test.gradle"
 
 dependencies {
     annotationProcessor project(':ignite-configuration-annotation-processor')
+    annotationProcessor project(':ignite-network-annotation-processor')
     annotationProcessor libs.auto.service
 
     implementation project(':ignite-api')
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java
index e9bb24446fc..59d26613f27 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java
@@ -45,12 +45,9 @@ public class PartitionModificationCounter {
             SizeSupplier partitionSizeSupplier,
             StalenessConfigurationSupplier stalenessConfigurationSupplier
     ) {
-        Objects.requireNonNull(initTimestamp, "initTimestamp");
-        Objects.requireNonNull(partitionSizeSupplier, "partitionSizeSupplier");
-        Objects.requireNonNull(stalenessConfigurationSupplier, 
"configurationProvider");
-
-        this.partitionSizeSupplier = partitionSizeSupplier;
-        this.stalenessConfigurationSupplier = stalenessConfigurationSupplier;
+        lastMilestoneReachedTimestamp = Objects.requireNonNull(initTimestamp, 
"initTimestamp");
+        this.partitionSizeSupplier = 
Objects.requireNonNull(partitionSizeSupplier, "partitionSizeSupplier");
+        this.stalenessConfigurationSupplier = 
Objects.requireNonNull(stalenessConfigurationSupplier, "configurationProvider");
 
         TableStatsStalenessConfiguration tableStatsStalenessConfiguration = 
stalenessConfigurationSupplier.get();
 
@@ -59,7 +56,6 @@ public class PartitionModificationCounter {
                 tableStatsStalenessConfiguration.staleRowsFraction(),
                 tableStatsStalenessConfiguration.minStaleRowsCount()
         );
-        lastMilestoneReachedTimestamp = initTimestamp;
     }
 
     /** Returns the current counter value. */
@@ -67,6 +63,11 @@ public class PartitionModificationCounter {
         return counter.get();
     }
 
+    /** Returns partition estimated size. */
+    public long estimatedSize() {
+        return partitionSizeSupplier.get();
+    }
+
     /**
      * Returns a timestamp representing the commit time of the
      * last transaction that caused the counter to reach a milestone.
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterFactory.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterFactory.java
index 9c4182c8a7f..1397ff0a878 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterFactory.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterFactory.java
@@ -17,18 +17,34 @@
 
 package org.apache.ignite.internal.table.distributed;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.network.InternalClusterNode;
+import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import 
org.apache.ignite.internal.table.message.GetEstimatedSizeWithLastModifiedTsRequest;
+import 
org.apache.ignite.internal.table.message.PartitionModificationInfoMessage;
+import org.apache.ignite.internal.table.message.TableMessageGroup;
+import org.apache.ignite.internal.table.message.TableMessagesFactory;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Factory for producing {@link PartitionModificationCounter}.
  */
 public class PartitionModificationCounterFactory {
-
     private final Supplier<HybridTimestamp> currentTimestampSupplier;
+    private final MessagingService messagingService;
+    private final Map<TablePartitionId, PartitionModificationCounter> 
partitionsInfo = new HashMap<>();
+    private static final TableMessagesFactory TABLE_MESSAGES_FACTORY = new 
TableMessagesFactory();
 
-    public PartitionModificationCounterFactory(Supplier<HybridTimestamp> 
currentTimestampSupplier) {
+    public PartitionModificationCounterFactory(Supplier<HybridTimestamp> 
currentTimestampSupplier, MessagingService messagingService) {
         this.currentTimestampSupplier = currentTimestampSupplier;
+        this.messagingService = messagingService;
     }
 
     /**
@@ -36,17 +52,27 @@ public class PartitionModificationCounterFactory {
      *
      * @param partitionSizeSupplier Partition size supplier.
      * @param stalenessConfigurationSupplier Partition size supplier.
+     * @param tableId Table id.
+     * @param partitionId partition id.
      * @return New partition modification counter.
      */
     public PartitionModificationCounter create(
             SizeSupplier partitionSizeSupplier,
-            StalenessConfigurationSupplier stalenessConfigurationSupplier
+            StalenessConfigurationSupplier stalenessConfigurationSupplier,
+            int tableId,
+            int partitionId
     ) {
-        return new PartitionModificationCounter(
+        PartitionModificationCounter info = new PartitionModificationCounter(
                 currentTimestampSupplier.get(),
                 partitionSizeSupplier,
                 stalenessConfigurationSupplier
         );
+
+        synchronized (this) {
+            partitionsInfo.put(new TablePartitionId(tableId, partitionId), 
info);
+        }
+
+        return info;
     }
 
     /** An interface representing supplier of current size. */
@@ -60,4 +86,41 @@ public class PartitionModificationCounterFactory {
     public interface StalenessConfigurationSupplier {
         TableStatsStalenessConfiguration get();
     }
+
+    /**
+     * Starts routine.
+     */
+    public void start() {
+        messagingService.addMessageHandler(TableMessageGroup.class, 
this::handleMessage);
+    }
+
+    private void handleMessage(NetworkMessage message, InternalClusterNode 
sender, @Nullable Long correlationId) {
+        if (message instanceof GetEstimatedSizeWithLastModifiedTsRequest) {
+            handleRequestCounter(sender);
+        }
+    }
+
+    private void handleRequestCounter(InternalClusterNode sender) {
+        List<PartitionModificationInfoMessage> modificationInfo = new 
ArrayList<>();
+
+        synchronized (this) {
+            for (Map.Entry<TablePartitionId, PartitionModificationCounter> ent 
: partitionsInfo.entrySet()) {
+                PartitionModificationCounter info = ent.getValue();
+                TablePartitionId tblPartId = ent.getKey();
+                PartitionModificationInfoMessage infoMsg = 
TABLE_MESSAGES_FACTORY.partitionModificationInfoMessage()
+                        .tableId(tblPartId.tableId())
+                        .partId(tblPartId.partitionId())
+                        .estimatedSize(info.estimatedSize())
+                        
.lastModificationCounter(info.lastMilestoneTimestamp().longValue())
+                        .build();
+
+                modificationInfo.add(infoMsg);
+            }
+        }
+
+        messagingService.send(sender, TABLE_MESSAGES_FACTORY
+                .getEstimatedSizeWithLastModifiedTsResponse()
+                .modifications(modificationInfo)
+                .build());
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index f9832a8c6c2..e3fb64f6f06 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -3192,9 +3192,10 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         GcUpdateHandler gcUpdateHandler = new 
GcUpdateHandler(partitionDataStorage, safeTimeTracker, indexUpdateHandler);
 
         SizeSupplier partSizeSupplier = () -> 
partitionDataStorage.getStorage().estimatedSize();
-        PartitionModificationCounter modificationCounter = 
partitionModificationCounterFactory.create(
-                partSizeSupplier, table::stalenessConfiguration
-        );
+
+        PartitionModificationCounter modificationCounter =
+                partitionModificationCounterFactory.create(partSizeSupplier, 
table::stalenessConfiguration, table.tableId(), partitionId);
+
         registerPartitionModificationCounterMetrics(table, partitionId, 
modificationCounter);
 
         StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler(
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/message/GetEstimatedSizeWithLastModifiedTsRequest.java
similarity index 59%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
copy to 
modules/table/src/main/java/org/apache/ignite/internal/table/message/GetEstimatedSizeWithLastModifiedTsRequest.java
index 997cbe7d606..b557c248f0a 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/message/GetEstimatedSizeWithLastModifiedTsRequest.java
@@ -15,8 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.statistic;
+package org.apache.ignite.internal.table.message;
 
-/** Statistic manager with reaction on statistic changes. */
-public interface SqlStatisticUpdateManager extends SqlStatisticManager, 
StatisticUpdatesNotifier {
+import java.util.Collection;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.annotations.Transferable;
+
+/** A message that queries a partition estimate size and last modification ts. 
*/
+@Transferable(TableMessageGroup.GET_ESTIMATED_SIZE_WITH_MODIFIED_TS_MESSAGE_REQUEST)
+public interface GetEstimatedSizeWithLastModifiedTsRequest extends 
NetworkMessage {
+    /** Table id`s to request estimated size for. */
+    Collection<Integer> tables();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/message/GetEstimatedSizeWithLastModifiedTsResponse.java
similarity index 61%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
copy to 
modules/table/src/main/java/org/apache/ignite/internal/table/message/GetEstimatedSizeWithLastModifiedTsResponse.java
index 997cbe7d606..f16f4e02d0b 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/message/GetEstimatedSizeWithLastModifiedTsResponse.java
@@ -15,8 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.statistic;
+package org.apache.ignite.internal.table.message;
 
-/** Statistic manager with reaction on statistic changes. */
-public interface SqlStatisticUpdateManager extends SqlStatisticManager, 
StatisticUpdatesNotifier {
+import java.util.List;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.annotations.Transferable;
+
+/** A response to the {@link GetEstimatedSizeWithLastModifiedTsRequest}. */
+@Transferable(TableMessageGroup.GET_ESTIMATED_SIZE_WITH_MODIFIED_TS_MESSAGE_RESPONSE)
+public interface GetEstimatedSizeWithLastModifiedTsResponse extends 
NetworkMessage {
+    List<PartitionModificationInfoMessage> modifications();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/message/PartitionModificationInfoMessage.java
similarity index 57%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
copy to 
modules/table/src/main/java/org/apache/ignite/internal/table/message/PartitionModificationInfoMessage.java
index 997cbe7d606..8d066badee2 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/message/PartitionModificationInfoMessage.java
@@ -15,8 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.statistic;
+package org.apache.ignite.internal.table.message;
 
-/** Statistic manager with reaction on statistic changes. */
-public interface SqlStatisticUpdateManager extends SqlStatisticManager, 
StatisticUpdatesNotifier {
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.annotations.Transferable;
+
+/** Message for transferring a partition modification info. */
+@Transferable(TableMessageGroup.GET_ESTIMATED_SIZE_WITH_MODIFIED_TS_MESSAGE)
+public interface PartitionModificationInfoMessage extends NetworkMessage {
+    /** Table id. */
+    int tableId();
+
+    /** Partition id. */
+    int partId();
+
+    /** Estimated size. */
+    long estimatedSize();
+
+    /** Modification counter. */
+    long lastModificationCounter();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/message/TableMessageGroup.java
similarity index 50%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
copy to 
modules/table/src/main/java/org/apache/ignite/internal/table/message/TableMessageGroup.java
index 997cbe7d606..cf5c2f8dbf0 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/message/TableMessageGroup.java
@@ -15,8 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.statistic;
+package org.apache.ignite.internal.table.message;
 
-/** Statistic manager with reaction on statistic changes. */
-public interface SqlStatisticUpdateManager extends SqlStatisticManager, 
StatisticUpdatesNotifier {
+import org.apache.ignite.internal.network.annotations.MessageGroup;
+
+/**
+ * Message group for table module.
+ */
+@MessageGroup(groupType = TableMessageGroup.GROUP_TYPE, groupName = 
"TableMessages")
+public interface TableMessageGroup {
+    /** Table message group type. */
+    short GROUP_TYPE = 17;
+
+    /** Message type for {@link GetEstimatedSizeWithLastModifiedTsRequest}. */
+    short GET_ESTIMATED_SIZE_WITH_MODIFIED_TS_MESSAGE_REQUEST = 1;
+
+    /** Message type for {@link GetEstimatedSizeWithLastModifiedTsResponse}. */
+    short GET_ESTIMATED_SIZE_WITH_MODIFIED_TS_MESSAGE_RESPONSE = 2;
+
+    /** Message type for {@link PartitionModificationInfoMessage}. */
+    short GET_ESTIMATED_SIZE_WITH_MODIFIED_TS_MESSAGE = 3;
 }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterTest.java
index e2968577009..9c48114a153 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterTest.java
@@ -19,9 +19,11 @@ package org.apache.ignite.internal.table.distributed;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
+import static org.mockito.Mockito.mock;
 
 import org.apache.ignite.internal.catalog.commands.CatalogUtils;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.junit.jupiter.api.Test;
@@ -31,14 +33,14 @@ import org.junit.jupiter.api.Test;
  */
 public class PartitionModificationCounterTest extends BaseIgniteAbstractTest {
     private final PartitionModificationCounterFactory factory =
-            new PartitionModificationCounterFactory(() -> 
HybridTimestamp.hybridTimestamp(1L));
+            new PartitionModificationCounterFactory(() -> 
HybridTimestamp.hybridTimestamp(1L), mock(MessagingService.class));
 
     @Test
     void initialValues() {
         // Empty table.
         {
             PartitionModificationCounter counter = factory.create(
-                    () -> 0L, () -> new TableStatsStalenessConfiguration(0.5, 
200)
+                    () -> 0L, () -> new TableStatsStalenessConfiguration(0.5, 
200), 0, 0
             );
 
             assertThat(counter.value(), is(0L));
@@ -49,7 +51,7 @@ public class PartitionModificationCounterTest extends 
BaseIgniteAbstractTest {
         // Table with 10k rows.
         {
             PartitionModificationCounter counter = factory.create(
-                    () -> 10_000L, () -> new 
TableStatsStalenessConfiguration(0.2, 200)
+                    () -> 10_000L, () -> new 
TableStatsStalenessConfiguration(0.2, 200), 0, 0
             );
 
             assertThat(counter.value(), is(0L));
@@ -73,7 +75,7 @@ public class PartitionModificationCounterTest extends 
BaseIgniteAbstractTest {
                 () -> rowsCount,
                 () -> new TableStatsStalenessConfiguration(
                         CatalogUtils.DEFAULT_STALE_ROWS_FRACTION, 
CatalogUtils.DEFAULT_MIN_STALE_ROWS_COUNT
-                )
+                ), 0, 0
         );
 
         assertThat(counter.lastMilestoneTimestamp().longValue(), is(1L));
@@ -102,7 +104,7 @@ public class PartitionModificationCounterTest extends 
BaseIgniteAbstractTest {
     @SuppressWarnings({"ThrowableNotThrown", 
"ResultOfObjectAllocationIgnored", "DataFlowIssue"})
     void invalidUpdateValues() {
         PartitionModificationCounter counter = factory.create(
-                () -> 0L, () -> new TableStatsStalenessConfiguration(0.2, 500)
+                () -> 0L, () -> new TableStatsStalenessConfiguration(0.2, 
500), 0, 0
         );
 
         IgniteTestUtils.assertThrows(NullPointerException.class,
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
index 2f2aa1bdd07..eef93d46a1b 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
@@ -23,6 +23,7 @@ import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutur
 import static org.apache.ignite.sql.ColumnType.INT32;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.Mockito.mock;
 
 import java.util.List;
 import org.apache.ignite.internal.catalog.CatalogCommand;
@@ -43,6 +44,7 @@ import 
org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.sql.SqlCommon;
 import 
org.apache.ignite.internal.table.distributed.PartitionModificationCounter;
 import 
org.apache.ignite.internal.table.distributed.PartitionModificationCounterFactory;
@@ -70,10 +72,13 @@ public class TableTestUtils {
 
     /** No-op partition modification counter factory. */
     public static PartitionModificationCounterFactory 
NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY =
-            new PartitionModificationCounterFactory(() -> 
HybridTimestamp.MIN_VALUE) {
+            new PartitionModificationCounterFactory(() -> 
HybridTimestamp.MIN_VALUE, mock(MessagingService.class)) {
                 @Override
                 public PartitionModificationCounter create(
-                        SizeSupplier partitionSizeSupplier, 
StalenessConfigurationSupplier configurationSupplier
+                        SizeSupplier partitionSizeSupplier,
+                        StalenessConfigurationSupplier configurationSupplier,
+                        int tableId,
+                        int partitionId
                 ) {
                     return NOOP_PARTITION_MODIFICATION_COUNTER;
                 }

Reply via email to