This is an automated email from the ASF dual-hosted git repository.

ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new fc07fd6216e IGNITE-27266 Sql. Improved potentially flaky tests 
(ItStatisticTest, SqlStatisticManagerImplTest) (#7206)
fc07fd6216e is described below

commit fc07fd6216e1cdd040ac9be2aa8de59585561685
Author: Max Zhuravkov <[email protected]>
AuthorDate: Mon Dec 15 19:03:02 2025 +0200

    IGNITE-27266 Sql. Improved potentially flaky tests (ItStatisticTest, 
SqlStatisticManagerImplTest) (#7206)
---
 .../sql/engine/statistic/ItStatisticTest.java      |  89 +++------
 .../sql/engine/prepare/PrepareServiceImpl.java     |  15 +-
 .../engine/statistic/SqlStatisticManagerImpl.java  |  10 +-
 .../sql/engine/prepare/PrepareServiceImplTest.java | 206 +++++++++++++--------
 .../statistic/SqlStatisticManagerImplTest.java     | 163 ++++++++--------
 .../sql/metrics/PlanningCacheMetricsTest.java      |  17 +-
 6 files changed, 267 insertions(+), 233 deletions(-)

diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/statistic/ItStatisticTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/statistic/ItStatisticTest.java
index a855dc731aa..7cc7bd1bdc6 100644
--- 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/statistic/ItStatisticTest.java
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/statistic/ItStatisticTest.java
@@ -46,6 +46,10 @@ public class ItStatisticTest extends BaseSqlIntegrationTest {
 
     private static final long REFRESH_PERIOD_MILLIS = 5_000;
 
+    private static final Duration REFRESH_CHECK_TIMEOUT = 
Duration.ofSeconds(20);
+
+    private static final Duration REFRESH_POLL_INTERVAL = 
Duration.ofMillis(50);
+
     @BeforeAll
     void beforeAll() {
         sqlStatisticManager = (SqlStatisticManagerImpl) 
queryProcessor().sqlStatisticManager();
@@ -80,12 +84,8 @@ public class ItStatisticTest extends BaseSqlIntegrationTest {
 
         AtomicInteger inc = new AtomicInteger();
 
-        long timeout = calcStatisticUpdateIntervalMillis();
-        // max 10 times cache pollution
-        long pollInterval = timeout / 10;
-
-        Awaitility.await().pollInterval(Duration.ofMillis(pollInterval))
-                .timeout(timeout, TimeUnit.MILLISECONDS).untilAsserted(() ->
+        Awaitility.await().pollInterval(REFRESH_POLL_INTERVAL)
+                .timeout(REFRESH_CHECK_TIMEOUT).untilAsserted(() ->
                         assertQuery(format("select {} from t", 
inc.incrementAndGet()))
                                 .matches(nodeRowCount("TableScan", is((int) 
update)))
                                 .check()
@@ -98,20 +98,15 @@ public class ItStatisticTest extends BaseSqlIntegrationTest 
{
 
         long updates1 = insert(0L, milestone);
 
-        long timeout = calcStatisticUpdateIntervalMillis();
-
-        // max 10 times cache pollution
-        long pollInterval = timeout / 10;
-
-        Awaitility.await().pollInterval(Duration.ofMillis(pollInterval))
-                .timeout(timeout, TimeUnit.MILLISECONDS).untilAsserted(() -> {
-                    sqlStatisticManager.forceUpdateAll();
-                    sqlStatisticManager.lastUpdateStatisticFuture().join();
+        Awaitility.await().pollInterval(REFRESH_POLL_INTERVAL)
+                .timeout(REFRESH_CHECK_TIMEOUT).untilAsserted(() -> {
+                            sqlStatisticManager.forceUpdateAll();
+                            
sqlStatisticManager.lastUpdateStatisticFuture().join();
 
-                    assertQuery("select 1 from t")
-                            .matches(nodeRowCount("TableScan", is((int) 
updates1)))
-                            .check();
-                }
+                            assertQuery("select 1 from t")
+                                    .matches(nodeRowCount("TableScan", 
is((int) updates1)))
+                                    .check();
+                        }
         );
 
         milestone = computeNextMilestone(milestone, 
DEFAULT_STALE_ROWS_FRACTION, DEFAULT_MIN_STALE_ROWS_COUNT);
@@ -122,15 +117,12 @@ public class ItStatisticTest extends 
BaseSqlIntegrationTest {
         sqlStatisticManager.lastUpdateStatisticFuture().join();
 
         // query not cached in plans
-        Awaitility.await().pollInterval(Duration.ofMillis(pollInterval))
-                .timeout(timeout, TimeUnit.MILLISECONDS).untilAsserted(() -> {
-                    sqlStatisticManager.forceUpdateAll();
-                    sqlStatisticManager.lastUpdateStatisticFuture().join();
-
-                    assertQuery("select 1 from t")
-                            .matches(nodeRowCount("TableScan", is((int) 
updates2)))
-                            .check();
-                }
+        Awaitility.await().pollInterval(REFRESH_POLL_INTERVAL)
+                .timeout(REFRESH_CHECK_TIMEOUT).untilAsserted(() -> {
+                            assertQuery("select 1 from t")
+                                    .matches(nodeRowCount("TableScan", 
is((int) updates2)))
+                                    .check();
+                        }
         );
     }
 
@@ -150,13 +142,9 @@ public class ItStatisticTest extends 
BaseSqlIntegrationTest {
             String query = "SELECT /*+ DISABLE_RULE('HashJoinConverter', 
'MergeJoinConverter', 'CorrelatedNestedLoopJoin') */ "
                     + "j1.* FROM j2, j1 WHERE j2.id = j1.id";
 
-            long statRefresh = calcStatisticUpdateIntervalMillis();
-
-            // max 10 times cache pollution
-            long pollInterval = statRefresh / 10;
-
-            Awaitility.await().pollInterval(Duration.ofMillis(pollInterval))
-                    .timeout(statRefresh, 
TimeUnit.MILLISECONDS).untilAsserted(() ->
+            Awaitility.await().pollInterval(REFRESH_POLL_INTERVAL)
+                    .timeout(REFRESH_CHECK_TIMEOUT)
+                    .untilAsserted(() ->
                             assertQuery(query)
                                     // expecting right source has less rows 
than left
                                     
.matches(QueryChecker.matches(".*TableScan.*PUBLIC.J1.*TableScan.*PUBLIC.J2.*"))
@@ -169,8 +157,8 @@ public class ItStatisticTest extends BaseSqlIntegrationTest 
{
             sqlStatisticManager.forceUpdateAll();
             sqlStatisticManager.lastUpdateStatisticFuture().get(5, 
TimeUnit.SECONDS);
 
-            Awaitility.await().pollInterval(Duration.ofMillis(pollInterval))
-                    .timeout(statRefresh, 
TimeUnit.MILLISECONDS).untilAsserted(() ->
+            Awaitility.await().pollInterval(REFRESH_POLL_INTERVAL)
+                    .timeout(REFRESH_CHECK_TIMEOUT).untilAsserted(() ->
                             assertQuery(query)
                                     // expecting right source has less rows 
than left
                                     
.matches(QueryChecker.matches(".*TableScan.*PUBLIC.J2.*TableScan.*PUBLIC.J1.*"))
@@ -204,25 +192,21 @@ public class ItStatisticTest extends 
BaseSqlIntegrationTest {
             
sqlClusterConfig.statistics().autoRefresh().staleRowsCheckIntervalSeconds()
                     .update((int) (newRefreshInterval / 1000)).join();
 
-            long statRefresh = 
calcStatisticUpdateIntervalMillis(newRefreshInterval);
-            long pollInterval = statRefresh / 10;
-
-            Awaitility.await().pollInterval(Duration.ofMillis(pollInterval))
-                    .timeout(statRefresh, 
TimeUnit.MILLISECONDS).untilAsserted(() ->
+            Awaitility.await().pollInterval(REFRESH_POLL_INTERVAL)
+                    .timeout(REFRESH_CHECK_TIMEOUT).untilAsserted(() ->
                             assertQuery(query)
                                     // expecting right source has less rows 
than left
                                     
.matches(QueryChecker.matches(".*TableScan.*PUBLIC.J1.*TableScan.*PUBLIC.J2.*"))
                                     .returnNothing()
-                                    .check()
-            );
+                                    .check());
 
             sql("INSERT INTO j2 SELECT x, x FROM system_range(?, ?)", 0, 3 * 
DEFAULT_MIN_STALE_ROWS_COUNT);
 
             sqlStatisticManager.forceUpdateAll();
             sqlStatisticManager.lastUpdateStatisticFuture().get(5, 
TimeUnit.SECONDS);
 
-            Awaitility.await().pollInterval(Duration.ofMillis(pollInterval))
-                    .timeout(statRefresh, 
TimeUnit.MILLISECONDS).untilAsserted(() ->
+            Awaitility.await().pollInterval(REFRESH_POLL_INTERVAL)
+                    .timeout(REFRESH_CHECK_TIMEOUT).untilAsserted(() ->
                             assertQuery(query)
                                     // expecting right source has less rows 
than left
                                     
.matches(QueryChecker.matches(".*TableScan.*PUBLIC.J2.*TableScan.*PUBLIC.J1.*"))
@@ -235,19 +219,6 @@ public class ItStatisticTest extends 
BaseSqlIntegrationTest {
         }
     }
 
-    private static long calcStatisticUpdateIntervalMillis() {
-        return calcStatisticUpdateIntervalMillis(REFRESH_PERIOD_MILLIS);
-    }
-
-    private static long calcStatisticUpdateIntervalMillis(long 
refreshPeriodMillis) {
-        long inc = TimeUnit.SECONDS.toMillis(2);
-        // need to wait at least 2 statistic updates.
-        long statisticAggregationTimeout = refreshPeriodMillis + 2 * 
refreshPeriodMillis;
-        long delay = refreshPeriodMillis / 2;
-
-        return statisticAggregationTimeout + delay + inc;
-    }
-
     // copy-paste from private method: 
PartitionModificationCounter#computeNextMilestone
     // if implementation will changes, it need to be changed too
     private static long computeNextMilestone(
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
index b337481e4f3..1beaffb7784 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
@@ -1139,8 +1139,9 @@ public class PrepareServiceImpl implements PrepareService 
{
             staleRowsCheckIntervalSeconds.listen(configListener);
 
             int intervalSeconds = staleRowsCheckIntervalSeconds.value();
+            long interval = calculateInterval(intervalSeconds);
 
-            schedule(intervalSeconds);
+            schedule(interval);
         }
 
         void stop() {
@@ -1155,16 +1156,20 @@ public class PrepareServiceImpl implements 
PrepareService {
             assert seconds != null;
 
             if (!Objects.equals(seconds, value.oldValue())) {
-                // To observe actual values of statistics, plan cache updates 
should happen more frequently
-                // (plan update interval must be less than statistics auto 
refresh interval).
-                int interval = Math.max(1, seconds / 2);
+                long interval = calculateInterval(seconds);
                 schedule(interval);
             }
 
             return nullCompletedFuture();
         }
 
-        private void schedule(int intervalSeconds) {
+        private static long calculateInterval(long value) {
+            // To observe actual values of statistics, plan cache updates 
should happen more frequently
+            // (plan update interval must be less than statistics auto refresh 
interval).
+            return Math.max(1, value / 2);
+        }
+
+        private void schedule(long intervalSeconds) {
             if (scheduledFuture != null) {
                 scheduledFuture.cancel(false);
             }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java
index 7e32791d95b..5c92b69c99c 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java
@@ -236,6 +236,10 @@ public class SqlStatisticManagerImpl extends 
AbstractEventProducer<StatisticChan
         catalogService.removeListener(CatalogEvent.TABLE_DROP, 
dropTableEventListener);
         catalogService.removeListener(CatalogEvent.TABLE_CREATE, 
createTableEventListener);
         
staleRowsCheckIntervalSeconds.stopListen(updateRefreshIntervalListener);
+
+        if (scheduledFuture != null) {
+            scheduledFuture.cancel(false);
+        }
     }
 
     private void onTableDrop(DropTableEventParameters parameters) {
@@ -329,10 +333,4 @@ public class SqlStatisticManagerImpl extends 
AbstractEventProducer<StatisticChan
     public void forceUpdateAll() {
         update(true);
     }
-
-    /** Returns a future of that task that updates statistics. */
-    @TestOnly
-    public ScheduledFuture<?> currentScheduledFuture() {
-        return scheduledFuture;
-    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImplTest.java
index a3a9b4a55f1..456d7ee4d70 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImplTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImplTest.java
@@ -34,6 +34,10 @@ import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
@@ -43,6 +47,7 @@ import static org.mockito.Mockito.when;
 import java.time.Duration;
 import java.time.Instant;
 import java.time.ZoneId;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -53,13 +58,16 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import org.apache.ignite.configuration.ConfigurationValue;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.event.AbstractEventProducer;
+import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.ClockServiceImpl;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -87,9 +95,7 @@ import 
org.apache.ignite.internal.sql.engine.util.cache.CacheFactory;
 import org.apache.ignite.internal.sql.engine.util.cache.CaffeineCacheFactory;
 import org.apache.ignite.internal.sql.engine.util.cache.StatsCounter;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
-import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
-import org.apache.ignite.internal.testframework.InjectExecutorService;
 import org.apache.ignite.internal.type.NativeType;
 import org.apache.ignite.internal.type.NativeTypes;
 import org.apache.ignite.internal.util.ExceptionUtils;
@@ -100,35 +106,50 @@ import org.apache.ignite.sql.SqlException;
 import org.awaitility.Awaitility;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.Mock.Strictness;
 import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
 
 /**
  * Tests to verify {@link PrepareServiceImpl}.
  */
-@ExtendWith(ExecutorServiceExtension.class)
 @ExtendWith(ConfigurationExtension.class)
+@ExtendWith(MockitoExtension.class)
 public class PrepareServiceImplTest extends BaseIgniteAbstractTest {
-    private static final List<PrepareService> createdServices = new 
ArrayList<>();
-    private static final int PLAN_UPDATER_REFRESH_PERIOD = 5_000;
+    private final List<PrepareService> createdServices = new ArrayList<>();
 
-    @InjectExecutorService
-    private static ScheduledExecutorService commonExecutor;
+    @Mock(strictness = Strictness.LENIENT)
+    private ScheduledExecutorService commonExecutor;
+
+    @Mock(strictness = Strictness.LENIENT)
+    private ScheduledFuture<?> taskFuture;
+
+    @Mock(strictness = Strictness.LENIENT)
+    private ClockService clockService;
 
     @InjectConfiguration("mock.autoRefresh.staleRowsCheckIntervalSeconds=5")
     private static StatisticsConfiguration statisticsConfiguration;
 
+    private final ArrayDeque<Runnable> scheduledTasks = new ArrayDeque<>();
+
+    @BeforeEach
+    public void initTaskScheduler() {
+        prepareTaskScheduler();
+    }
+
     @AfterEach
     public void stopServices() throws Exception {
         for (PrepareService createdService : createdServices) {
             createdService.stop();
         }
-
-        createdServices.clear();
     }
 
     @ParameterizedTest
@@ -317,33 +338,20 @@ public class PrepareServiceImplTest extends 
BaseIgniteAbstractTest {
     }
 
     @Test
-    public void timeoutedPlanShouldBeRemovedFromCache() throws 
InterruptedException {
+    public void timedOutPlanShouldBeRemovedFromCache()  {
         IgniteTable igniteTable = TestBuilders.table()
                 .name("T")
                 .addColumn("C", NativeTypes.INT32)
                 .distribution(IgniteDistributions.single())
                 .build();
 
-        // Create a proxy.
-        IgniteTable spyTable = spy(igniteTable);
-
-        // Override and slowdown a method, which is called by Planner, to 
emulate long planning.
-        Mockito.doAnswer(inv -> {
-            try {
-                Thread.sleep(300);
-            } catch (InterruptedException e) {
-                throw new RuntimeException(e);
-            }
-            // Call original method.
-            return igniteTable.getRowType(inv.getArgument(0), 
inv.getArgument(1));
-        }).when(spyTable).getRowType(any(), any());
-
         IgniteSchema schema = new IgniteSchema("PUBLIC", 0, 
List.of(igniteTable));
         Cache<Object, Object> cache = 
CaffeineCacheFactory.INSTANCE.create(100);
 
         CacheFactory cacheFactory = new DummyCacheFactory(cache);
 
-        PrepareServiceImpl service = createPlannerService(schema, 
cacheFactory, 100);
+        // Set the planning timeout to 1 millisecond; this value is small enough to 
cause a planning timeout exception.
+        PrepareServiceImpl service = createPlannerService(schema, 
cacheFactory, 1);
 
         StringBuilder stmt = new StringBuilder();
         for (int i = 0; i < 100; i++) {
@@ -368,8 +376,7 @@ public class PrepareServiceImplTest extends 
BaseIgniteAbstractTest {
         assertEquals(Sql.EXECUTION_CANCELLED_ERR, sqlErr.code(), "Unexpected 
error: " + sqlErr);
 
         // Cache invalidate does not immediately remove the entry, so we need 
to wait some time to ensure it is removed.
-        boolean empty = IgniteTestUtils.waitForCondition(() -> cache.size() == 
0, 1000);
-        assertTrue(empty, "Cache is not empty: " + cache.size());
+        Awaitility.await().untilAsserted(() -> assertEquals(0, cache.size()));
     }
 
     @Test
@@ -417,19 +424,15 @@ public class PrepareServiceImplTest extends 
BaseIgniteAbstractTest {
 
         service.statisticsChanged(table.id());
 
-        Awaitility.await()
-                .atMost(Duration.ofMillis(2 * PLAN_UPDATER_REFRESH_PERIOD))
-                .until(
-                        () -> 
!selectPlan.equals(await(service.prepareAsync(parse(selectQuery), 
operationContext().build())))
-                );
-
-        Awaitility.await()
-                .atMost(Duration.ofMillis(2 * PLAN_UPDATER_REFRESH_PERIOD))
-                .until(
-                        () -> 
!insertPlan.equals(await(service.prepareAsync(parse(insertQuery), 
operationContext().build())))
-                );
+        // Run update plan task.
+        runScheduledTasks();
 
-        assertThat(service.cache.size(), is(2));
+        // Planning is done in a separate thread.
+        Awaitility.await().untilAsserted(() -> {
+            assertNotSame(selectPlan, 
await(service.prepareAsync(parse(selectQuery), operationContext().build())));
+            assertNotSame(insertPlan, 
await(service.prepareAsync(parse(insertQuery), operationContext().build())));
+            assertThat(service.cache.size(), is(2));
+        });
     }
 
     @Test
@@ -458,8 +461,8 @@ public class PrepareServiceImplTest extends 
BaseIgniteAbstractTest {
         CacheKey key1 = service.cache.entrySet().iterator().next().getKey();
 
         // different table
-        String insertQuery = "SELECT * FROM test.t2 WHERE c = 1";
-        QueryPlan plan2 = await(service.prepareAsync(parse(insertQuery), 
operationContext().build()));
+        String anotherSelectQuery = "SELECT * FROM test.t2 WHERE c = 1";
+        QueryPlan plan2 = 
await(service.prepareAsync(parse(anotherSelectQuery), 
operationContext().build()));
         assertThat(service.cache.size(), is(1));
         CacheKey key2 = service.cache.entrySet().iterator().next().getKey();
 
@@ -471,11 +474,46 @@ public class PrepareServiceImplTest extends 
BaseIgniteAbstractTest {
         // cached table
         service.statisticsChanged(table2.id());
 
-        Awaitility.await()
-                .atMost(Duration.ofMillis(2 * PLAN_UPDATER_REFRESH_PERIOD))
-                .until(
-                        () -> 
!plan2.equals(await(service.prepareAsync(parse(insertQuery), 
operationContext().build())))
-                );
+        // Run tasks that trigger re-planning
+        runScheduledTasks();
+
+        // Planning is done in a separate thread.
+        Awaitility.await().untilAsserted(() -> {
+            assertNotSame(plan2, 
await(service.prepareAsync(parse(anotherSelectQuery), 
operationContext().build())));
+        });
+    }
+
+    @Test
+    public void updateSchedulingInterval() throws Exception {
+        IgniteSchema schema = new IgniteSchema("TEST", 0, List.of());
+        ConfigurationValue<Integer> configurationValue = 
statisticsConfiguration.autoRefresh().staleRowsCheckIntervalSeconds();
+        configurationValue.update(60).join();
+
+        // Starts in createPlannerService
+        PrepareServiceImpl service = (PrepareServiceImpl) 
createPlannerService(schema, 1);
+
+        // Initial values
+        configurationValue.update(42).join();
+
+        // Update - same value
+        configurationValue.update(42).join();
+
+        // Cannot be less than 1
+        configurationValue.update(1).join();
+
+        service.stop();
+
+        InOrder inOrder = Mockito.inOrder(commonExecutor, taskFuture);
+        // Initial
+        
inOrder.verify(commonExecutor).scheduleAtFixedRate(any(Runnable.class), 
eq(30L), eq(30L), eq(TimeUnit.SECONDS));
+        // Update 1
+        inOrder.verify(taskFuture).cancel(anyBoolean());
+        
inOrder.verify(commonExecutor).scheduleAtFixedRate(any(Runnable.class), 
eq(21L), eq(21L), eq(TimeUnit.SECONDS));
+        // Update 2
+        inOrder.verify(taskFuture).cancel(anyBoolean());
+        
inOrder.verify(commonExecutor).scheduleAtFixedRate(any(Runnable.class), eq(1L), 
eq(1L), eq(TimeUnit.SECONDS));
+        // Stop
+        inOrder.verify(taskFuture).cancel(anyBoolean());
     }
 
     /** Validate that plan updates only for current catalog version. */
@@ -506,24 +544,19 @@ public class PrepareServiceImplTest extends 
BaseIgniteAbstractTest {
 
         QueryPlan plan2 = await(service.prepareAsync(parse(selectQuery), 
operationContext().build()));
 
-        Awaitility.await()
-                .atMost(Duration.ofMillis(10000))
-                .until(
-                        () -> service.cache.size() == 2
-                );
+        runScheduledTasks();
+        assertEquals(2, service.cache.size());
 
-        assertThat(service.cache.size(), is(2));
         service.statisticsChanged(table1.id());
 
-        Awaitility.await()
-                .atMost(Duration.ofMillis(2 * PLAN_UPDATER_REFRESH_PERIOD))
-                .until(
-                        () -> 
!plan2.equals(await(service.prepareAsync(parse(selectQuery), 
operationContext().build())))
-                );
+        runScheduledTasks();
+        // Let eviction tasks run.
+        Awaitility.await().untilAsserted(() -> 
+                assertNotSame(plan2, 
await(service.prepareAsync(parse(selectQuery), operationContext().build()))));
 
         // previous catalog, get cached plan
         ver.set(0);
-        assertEquals(plan1, await(service.prepareAsync(parse(selectQuery), 
operationContext().build())));
+        assertSame(plan1, await(service.prepareAsync(parse(selectQuery), 
operationContext().build())));
     }
 
     @Test
@@ -548,21 +581,16 @@ public class PrepareServiceImplTest extends 
BaseIgniteAbstractTest {
 
         await(service.prepareAsync(parse(selectQuery), 
operationContext().build()));
 
-        Awaitility.await()
-                .atMost(Duration.ofMillis(2 * PLAN_UPDATER_REFRESH_PERIOD))
-                .until(
-                        () -> service.cache.size() == 2
-                );
+        runScheduledTasks();
+        assertEquals(2, service.cache.size());
 
         service.statisticsChanged(table1.id());
 
         Set<Entry<CacheKey, CompletableFuture<PlanInfo>>> cachedSnap = new 
HashSet<>(service.cache.entrySet());
 
-        Awaitility.await()
-                .atMost(Duration.ofMillis(2 * PLAN_UPDATER_REFRESH_PERIOD))
-                .until(
-                        () -> !new 
HashSet<>(service.cache.entrySet()).equals(cachedSnap)
-                );
+        runScheduledTasks();
+        // Let eviction tasks run.
+        Awaitility.await().untilAsserted(() -> 
assertNotEquals(service.cache.entrySet(), cachedSnap));
 
         // cache futures snapshot highlight only one invalidation item.
         assertThat(cachedSnap.stream().filter(e -> 
e.getValue().join().needInvalidate()).count(), is(1L));
@@ -842,35 +870,33 @@ public class PrepareServiceImplTest extends 
BaseIgniteAbstractTest {
         return new IgniteSchema("PUBLIC", 0, List.of(table));
     }
 
-    private static PrepareService createPlannerService() {
+    private PrepareService createPlannerService() {
         return createPlannerService(createSchema());
     }
 
-    private static PrepareService createPlannerService(IgniteSchema schema, 
int cacheSize) {
+    private PrepareService createPlannerService(IgniteSchema schema, int 
cacheSize) {
         return createPlannerService(schema, CaffeineCacheFactory.INSTANCE, 
10000, Integer.MAX_VALUE, cacheSize);
     }
 
-    private static PrepareService createPlannerService(IgniteSchema schema) {
+    private PrepareService createPlannerService(IgniteSchema schema) {
         return createPlannerService(schema, CaffeineCacheFactory.INSTANCE, 
10000);
     }
 
-    private static PrepareServiceImpl createPlannerService(IgniteSchema 
schemas, CacheFactory cacheFactory, int timeoutMillis) {
+    private PrepareServiceImpl createPlannerService(IgniteSchema schemas, 
CacheFactory cacheFactory, int timeoutMillis) {
         return createPlannerService(schemas, cacheFactory, timeoutMillis, 
Integer.MAX_VALUE, 1000);
     }
 
-    private static PrepareServiceImpl createPlannerService(
+    private PrepareServiceImpl createPlannerService(
             IgniteSchema schemas,
             CacheFactory cacheFactory,
             int timeoutMillis,
             int planExpireSeconds,
             int cacheSize
     ) {
-        ClockServiceImpl clockService = mock(ClockServiceImpl.class);
 
         when(clockService.currentLong()).thenReturn(new HybridTimestamp(1_000, 
500).longValue());
 
-        AbstractEventProducer<StatisticChangedEvent, StatisticEventParameters> 
producer = new AbstractEventProducer<>() {
-        };
+        AbstractEventProducer<StatisticChangedEvent, StatisticEventParameters> 
producer = new AbstractEventProducer<>() {};
 
         PrepareServiceImpl service = new PrepareServiceImpl("test", cacheSize, 
cacheFactory,
                 mock(DdlSqlToCommandConverter.class), timeoutMillis, 2, 
planExpireSeconds, mock(MetricManagerImpl.class),
@@ -885,7 +911,7 @@ public class PrepareServiceImplTest extends 
BaseIgniteAbstractTest {
         return service;
     }
 
-    private static PrepareServiceImpl createPlannerService(
+    private PrepareServiceImpl createPlannerService(
             IgniteSchema schemas,
             CacheFactory cacheFactory,
             int timeoutMillis,
@@ -896,7 +922,7 @@ public class PrepareServiceImplTest extends 
BaseIgniteAbstractTest {
         return createPlannerService(schemas, cacheFactory, timeoutMillis, 
planExpireSeconds, cacheSize, ver, commonExecutor);
     }
 
-    private static PrepareServiceImpl createPlannerService(
+    private PrepareServiceImpl createPlannerService(
             IgniteSchema schemas,
             CacheFactory cacheFactory,
             int timeoutMillis,
@@ -909,8 +935,7 @@ public class PrepareServiceImplTest extends 
BaseIgniteAbstractTest {
 
         when(clockService.currentLong()).thenReturn(new HybridTimestamp(1_000, 
500).longValue());
 
-        AbstractEventProducer<StatisticChangedEvent, StatisticEventParameters> 
producer = new AbstractEventProducer<>() {
-        };
+        AbstractEventProducer<StatisticChangedEvent, StatisticEventParameters> 
producer = new AbstractEventProducer<>() {};
 
         PrepareServiceImpl service = new PrepareServiceImpl("test", cacheSize, 
cacheFactory,
                 mock(DdlSqlToCommandConverter.class), timeoutMillis, 2, 
planExpireSeconds, mock(MetricManagerImpl.class),
@@ -925,7 +950,7 @@ public class PrepareServiceImplTest extends 
BaseIgniteAbstractTest {
         return service;
     }
 
-    private static PrepareServiceImpl 
createPlannerServiceWithInactivePlanUpdater(
+    private PrepareServiceImpl createPlannerServiceWithInactivePlanUpdater(
             IgniteSchema schemas,
             CacheFactory cacheFactory,
             int timeoutMillis,
@@ -960,4 +985,23 @@ public class PrepareServiceImplTest extends 
BaseIgniteAbstractTest {
             return (Cache<K, V>) cache;
         }
     }
+
+    private void prepareTaskScheduler() {
+        doAnswer(invocation -> {
+            Runnable r = invocation.getArgument(0);
+            scheduledTasks.add(r);
+            return taskFuture;
+        }).when(commonExecutor).scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class));
+
+        doAnswer(invocation -> {
+            scheduledTasks.poll();
+            return true;
+        }).when(taskFuture).cancel(anyBoolean());
+    }
+
+    private void runScheduledTasks() {
+        for (Runnable r : scheduledTasks) {
+            r.run();
+        }
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImplTest.java
index bab86989bec..3dcda8df100 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImplTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImplTest.java
@@ -20,12 +20,10 @@ package org.apache.ignite.internal.sql.engine.statistic;
 import static it.unimi.dsi.fastutil.ints.Int2ObjectMap.entry;
 import static 
org.apache.ignite.internal.sql.engine.statistic.SqlStatisticManagerImpl.DEFAULT_TABLE_SIZE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotSame;
-import static org.junit.jupiter.api.Assertions.assertSame;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
@@ -38,6 +36,7 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import it.unimi.dsi.fastutil.ints.IntList;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -45,7 +44,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.configuration.ConfigurationValue;
 import org.apache.ignite.internal.catalog.Catalog;
@@ -67,22 +65,20 @@ import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.TableViewInternal;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
-import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
-import org.apache.ignite.internal.testframework.InjectExecutorService;
 import org.apache.ignite.sql.ColumnType;
-import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
 import org.mockito.Mock;
 import org.mockito.Mock.Strictness;
+import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
 
 /**
  * Tests of SqlStatisticManagerImpl.
  */
 @ExtendWith(MockitoExtension.class)
-@ExtendWith(ExecutorServiceExtension.class)
 @ExtendWith(ConfigurationExtension.class)
 class SqlStatisticManagerImplTest extends BaseIgniteAbstractTest {
 
@@ -107,8 +103,11 @@ class SqlStatisticManagerImplTest extends 
BaseIgniteAbstractTest {
     @Mock(strictness = Strictness.LENIENT)
     private StatisticAggregatorImpl statAggregator;
 
-    @InjectExecutorService
-    private ScheduledExecutorService commonExecutor;
+    @Mock(strictness = Strictness.LENIENT)
+    private ScheduledExecutorService scheduledExecutorService;
+
+    @Mock(strictness = Strictness.LENIENT)
+    private ScheduledFuture<Void> taskFuture;
 
     @InjectConfiguration("mock.autoRefresh.staleRowsCheckIntervalSeconds=5")
     private StatisticsConfiguration statisticsConfiguration;
@@ -116,11 +115,14 @@ class SqlStatisticManagerImplTest extends 
BaseIgniteAbstractTest {
     private static final CatalogTableColumnDescriptor pkCol =
             new CatalogTableColumnDescriptor("pkCol", ColumnType.STRING, 
false, 0, 0, 10, null);
 
+    private final ArrayDeque<Runnable> scheduledTasks = new ArrayDeque<>();
+
     @Test
     public void checkDefaultTableSize() {
         int tableId = ThreadLocalRandom.current().nextInt();
         // Preparing:
         prepareEmptyCatalog();
+        prepareTaskScheduler();
 
         when(statAggregator.estimatedSizeWithLastUpdate(List.of()))
                 
.thenReturn(CompletableFuture.completedFuture(Int2ObjectMaps.emptyMap()));
@@ -128,6 +130,8 @@ class SqlStatisticManagerImplTest extends 
BaseIgniteAbstractTest {
         SqlStatisticManagerImpl sqlStatisticManager = 
newSqlStatisticsManager();
         sqlStatisticManager.start();
 
+        runScheduledTasks();
+
         // Test:
         // return default value for unknown table.
         assertEquals(DEFAULT_TABLE_SIZE, 
sqlStatisticManager.tableSize(tableId));
@@ -139,6 +143,7 @@ class SqlStatisticManagerImplTest extends 
BaseIgniteAbstractTest {
         long tableSize = 999_888_777L;
         // Preparing:
         prepareCatalogWithTable(tableId);
+        prepareTaskScheduler();
 
         when(tableManager.cachedTable(tableId)).thenReturn(tableViewInternal);
         when(tableViewInternal.internalTable()).thenReturn(internalTable);
@@ -149,11 +154,8 @@ class SqlStatisticManagerImplTest extends 
BaseIgniteAbstractTest {
         SqlStatisticManagerImpl sqlStatisticManager = 
newSqlStatisticsManager();
         sqlStatisticManager.start();
 
-        long timeout = 2 * UPDATE_INTERVAL_SECONDS;
-
-        Awaitility.await().timeout(timeout, TimeUnit.SECONDS).untilAsserted(() 
->
-                assertEquals(tableSize, sqlStatisticManager.tableSize(tableId))
-        );
+        runScheduledTasks();
+        assertEquals(tableSize, sqlStatisticManager.tableSize(tableId));
 
         // Second time we should obtain the same value from a cache.
         assertEquals(tableSize, sqlStatisticManager.tableSize(tableId));
@@ -171,6 +173,7 @@ class SqlStatisticManagerImplTest extends 
BaseIgniteAbstractTest {
         HybridTimestamp time2 = 
HybridTimestamp.MAX_VALUE.subtractPhysicalTime(500);
         // Preparing:
         prepareCatalogWithTable(tableId);
+        prepareTaskScheduler();
 
         when(tableManager.cachedTable(tableId)).thenReturn(tableViewInternal);
         when(tableViewInternal.internalTable()).thenReturn(internalTable);
@@ -186,11 +189,8 @@ class SqlStatisticManagerImplTest extends 
BaseIgniteAbstractTest {
         SqlStatisticManagerImpl sqlStatisticManager = 
newSqlStatisticsManager();
         sqlStatisticManager.start();
 
-        long timeout = 2 * UPDATE_INTERVAL_SECONDS;
+        runScheduledTasks();
 
-        Awaitility.await().timeout(timeout, TimeUnit.SECONDS).untilAsserted(() 
->
-                assertEquals(tableSize1, 
sqlStatisticManager.tableSize(tableId))
-        );
         // The second time we should obtain the same value from a cache.
         assertEquals(tableSize1, sqlStatisticManager.tableSize(tableId));
 
@@ -249,9 +249,13 @@ class SqlStatisticManagerImplTest extends 
BaseIgniteAbstractTest {
         when(statAggregator.estimatedSizeWithLastUpdate(any()))
                 .thenReturn(CompletableFuture.completedFuture(modifications));
 
+        prepareTaskScheduler();
+
         SqlStatisticManagerImpl sqlStatisticManager = 
newSqlStatisticsManager();
         sqlStatisticManager.start();
 
+        runScheduledTasks();
+
         sqlStatisticManager.forceUpdateAll();
         sqlStatisticManager.lastUpdateStatisticFuture().get(5_000, 
TimeUnit.MILLISECONDS);
 
@@ -275,6 +279,7 @@ class SqlStatisticManagerImplTest extends 
BaseIgniteAbstractTest {
         long tableSize = 999_888_777L;
         // Preparing:
         prepareCatalogWithTable(tableId);
+        prepareTaskScheduler();
 
         ArgumentCaptor<EventListener<DropTableEventParameters>> 
tableDropCapture = ArgumentCaptor.forClass(EventListener.class);
         doNothing().when(catalogManager).listen(eq(CatalogEvent.TABLE_DROP), 
tableDropCapture.capture());
@@ -292,11 +297,8 @@ class SqlStatisticManagerImplTest extends 
BaseIgniteAbstractTest {
         SqlStatisticManagerImpl sqlStatisticManager = 
newSqlStatisticsManager();
         sqlStatisticManager.start();
 
-        long timeout = 2 * UPDATE_INTERVAL_SECONDS;
-
-        Awaitility.await().timeout(timeout, TimeUnit.SECONDS).untilAsserted(() 
->
-                assertEquals(tableSize, sqlStatisticManager.tableSize(tableId))
-        );
+        runScheduledTasks();
+        assertEquals(tableSize, sqlStatisticManager.tableSize(tableId));
 
         int catalogVersionBeforeDrop = 1;
         int catalogVersionAfterDrop = 2;
@@ -338,6 +340,8 @@ class SqlStatisticManagerImplTest extends 
BaseIgniteAbstractTest {
                 
.thenReturn(CompletableFuture.completedFuture(Int2ObjectMap.ofEntries(
                         entry(tableId, new 
PartitionModificationInfo(tableSize, Long.MAX_VALUE)))));
 
+        prepareTaskScheduler();
+
         SqlStatisticManagerImpl sqlStatisticManager = 
newSqlStatisticsManager();
         sqlStatisticManager.start();
 
@@ -356,12 +360,9 @@ class SqlStatisticManagerImplTest extends 
BaseIgniteAbstractTest {
                 .storageProfile("")
                 .build();
         tableCreateCapture.getValue().notify(new 
CreateTableEventParameters(-1, 0, catalogDescriptor));
-        // now table is created and size should be actual.
-        long timeout = 2 * UPDATE_INTERVAL_SECONDS;
 
-        Awaitility.await().timeout(timeout, TimeUnit.SECONDS).untilAsserted(() 
->
-                assertEquals(tableSize, sqlStatisticManager.tableSize(tableId))
-        );
+        runScheduledTasks();
+        assertEquals(tableSize, sqlStatisticManager.tableSize(tableId));
     }
 
     @Test
@@ -387,6 +388,8 @@ class SqlStatisticManagerImplTest extends 
BaseIgniteAbstractTest {
         when(tableManager.cachedTable(tableId)).thenReturn(tableViewInternal);
         when(tableViewInternal.internalTable()).thenReturn(internalTable);
 
+        prepareTaskScheduler();
+
         SqlStatisticManagerImpl sqlStatisticManager = 
newSqlStatisticsManager();
         sqlStatisticManager.start();
 
@@ -395,6 +398,7 @@ class SqlStatisticManagerImplTest extends 
BaseIgniteAbstractTest {
         assertEquals(tableSize1, sqlStatisticManager.tableSize(tableId));
 
         // Error was handled after the first call.
+        runScheduledTasks();
 
         // tableSize2
         sqlStatisticManager.forceUpdateAll();
@@ -424,6 +428,8 @@ class SqlStatisticManagerImplTest extends 
BaseIgniteAbstractTest {
                         entry(tableId, new 
PartitionModificationInfo(tableSize2, time2.longValue()))))
         );
 
+        prepareTaskScheduler();
+
         SqlStatisticManagerImpl sqlStatisticManager = 
newSqlStatisticsManager();
         sqlStatisticManager.start();
 
@@ -436,55 +442,45 @@ class SqlStatisticManagerImplTest extends 
BaseIgniteAbstractTest {
         // get tableSize2
         sqlStatisticManager.forceUpdateAll();
 
-        long timeout = 2 * UPDATE_INTERVAL_SECONDS;
-
-        Awaitility.await().timeout(timeout, TimeUnit.SECONDS).untilAsserted(() 
->
-                assertEquals(tableSize2, 
sqlStatisticManager.tableSize(tableId))
-        );
+        runScheduledTasks();
+        assertEquals(tableSize2, sqlStatisticManager.tableSize(tableId));
 
         // Stale result must be discarded.
         assertEquals(tableSize2, sqlStatisticManager.tableSize(tableId));
     }
 
     @Test
-    public void rescheduleWhenUpdateIntervalChanges() {
+    public void updateSchedulingInterval() {
         prepareEmptyCatalog();
+        prepareTaskScheduler();
 
         when(statAggregator.estimatedSizeWithLastUpdate(List.of()))
                 
.thenReturn(CompletableFuture.completedFuture(Int2ObjectMaps.emptyMap()));
 
+        ConfigurationValue<Integer> configurationValue = 
statisticsConfiguration.autoRefresh().staleRowsCheckIntervalSeconds();
+        configurationValue.update(UPDATE_INTERVAL_SECONDS).join();
+
         SqlStatisticManagerImpl sqlStatisticManager = 
newSqlStatisticsManager();
         sqlStatisticManager.start();
 
-        // Increase interval - reschedule
-        {
-            ScheduledFuture<?> f = 
sqlStatisticManager.currentScheduledFuture();
-
-            ConfigurationValue<Integer> interval = 
statisticsConfiguration.autoRefresh().staleRowsCheckIntervalSeconds();
-            interval.update(UPDATE_INTERVAL_SECONDS + 1).join();
-            assertNotSame(f, sqlStatisticManager.currentScheduledFuture());
-        }
-
-        // Decrease the interval - reschedule
-        {
-            ScheduledFuture<?> f = 
sqlStatisticManager.currentScheduledFuture();
-
-            ConfigurationValue<Integer> interval = 
statisticsConfiguration.autoRefresh().staleRowsCheckIntervalSeconds();
-            interval.update(UPDATE_INTERVAL_SECONDS - 1).join();
-
-            assertNotSame(f, sqlStatisticManager.currentScheduledFuture());
-        }
-
-        // Do not reschedule
-        {
-            ScheduledFuture<?> f = 
sqlStatisticManager.currentScheduledFuture();
-
-            ConfigurationValue<Integer> interval = 
statisticsConfiguration.autoRefresh().staleRowsCheckIntervalSeconds();
-            int currentValue = interval.value().intValue();
-            interval.update(currentValue).join();
-
-            assertSame(f, sqlStatisticManager.currentScheduledFuture());
-        }
+        configurationValue.update(UPDATE_INTERVAL_SECONDS + 1).join();
+        configurationValue.update(UPDATE_INTERVAL_SECONDS - 1).join();
+        // Updating to the same value has no effect.
+        configurationValue.update(UPDATE_INTERVAL_SECONDS - 1).join();
+
+        sqlStatisticManager.stop();
+
+        InOrder inOrder = Mockito.inOrder(scheduledExecutorService, 
taskFuture);
+        // Start
+        inOrder.verify(scheduledExecutorService).scheduleAtFixedRate(any(Runnable.class), eq(0L), eq(5L), eq(TimeUnit.SECONDS));
+        // Update 1
+        inOrder.verify(taskFuture).cancel(anyBoolean());
+        inOrder.verify(scheduledExecutorService).scheduleAtFixedRate(any(Runnable.class), eq(6L), eq(6L), eq(TimeUnit.SECONDS));
+        // Update 2
+        inOrder.verify(taskFuture).cancel(anyBoolean());
+        inOrder.verify(scheduledExecutorService).scheduleAtFixedRate(any(Runnable.class), eq(4L), eq(4L), eq(TimeUnit.SECONDS));
+        // Stopping cancels the task.
+        inOrder.verify(taskFuture).cancel(anyBoolean());
     }
 
     @Test
@@ -493,6 +489,7 @@ class SqlStatisticManagerImplTest extends 
BaseIgniteAbstractTest {
         long tableSize = 1_000L;
         // Preparing:
         prepareCatalogWithTable(tableId);
+        prepareTaskScheduler();
 
         when(tableManager.cachedTable(tableId)).thenReturn(tableViewInternal);
         when(tableViewInternal.internalTable()).thenReturn(internalTable);
@@ -512,17 +509,12 @@ class SqlStatisticManagerImplTest extends 
BaseIgniteAbstractTest {
         SqlStatisticManagerImpl sqlStatisticManager = 
newSqlStatisticsManager(intervalSeconds);
         sqlStatisticManager.start();
 
-        ScheduledFuture<?> f = sqlStatisticManager.currentScheduledFuture();
-        // An exception does not terminate the task.
-        assertThrows(TimeoutException.class, () -> f.get(intervalSeconds * 2, 
TimeUnit.SECONDS));
+        // The first run fails
+        runScheduledTasks();
+        assertEquals(1, sqlStatisticManager.tableSize(tableId));
 
-        // The statistics will eventually refresh
-        Awaitility.await()
-                .pollInterval(intervalSeconds / 2, TimeUnit.SECONDS)
-                .untilAsserted(() -> assertEquals(tableSize, 
sqlStatisticManager.tableSize(tableId)));
-
-        // The task is still running
-        assertFalse(f.isDone());
+        runScheduledTasks();
+        assertEquals(tableSize, sqlStatisticManager.tableSize(tableId));
     }
 
     private SqlStatisticManagerImpl newSqlStatisticsManager() {
@@ -537,7 +529,7 @@ class SqlStatisticManagerImplTest extends 
BaseIgniteAbstractTest {
                 tableManager,
                 catalogManager,
                 lowWatermark,
-                commonExecutor,
+                scheduledExecutorService,
                 statAggregator,
                 checkInterval
         );
@@ -568,4 +560,23 @@ class SqlStatisticManagerImplTest extends 
BaseIgniteAbstractTest {
         when(catalogManager.catalog(1)).thenReturn(catalog);
         when(catalog.tables()).thenReturn(List.of());
     }
+
+    private void prepareTaskScheduler() {
+        doAnswer(invocation -> {
+            Runnable r = invocation.getArgument(0);
+            scheduledTasks.add(r);
+            return taskFuture;
+        }).when(scheduledExecutorService).scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class));
+
+        doAnswer(invocation -> {
+            scheduledTasks.poll();
+            return true;
+        }).when(taskFuture).cancel(anyBoolean());
+    }
+
+    private void runScheduledTasks() {
+        for (Runnable r : scheduledTasks) {
+            r.run();
+        }
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/metrics/PlanningCacheMetricsTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/metrics/PlanningCacheMetricsTest.java
index 31c926f73b5..7e5291892e1 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/metrics/PlanningCacheMetricsTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/metrics/PlanningCacheMetricsTest.java
@@ -38,6 +38,7 @@ import 
org.apache.ignite.internal.sql.engine.framework.TestBuilders;
 import org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest;
 import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
 import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl;
+import 
org.apache.ignite.internal.sql.engine.prepare.ddl.DdlSqlToCommandConverter;
 import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
 import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 import org.apache.ignite.internal.sql.engine.sql.ParsedResult;
@@ -48,24 +49,28 @@ import 
org.apache.ignite.internal.sql.engine.statistic.event.StatisticEventParam
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.sql.engine.util.cache.CacheFactory;
 import org.apache.ignite.internal.sql.engine.util.cache.CaffeineCacheFactory;
-import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
-import org.apache.ignite.internal.testframework.InjectExecutorService;
 import org.apache.ignite.internal.type.NativeTypes;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mock.Strictness;
+import org.mockito.junit.jupiter.MockitoExtension;
 
 /**
  * Test planning cache metrics.
  */
-@ExtendWith(ExecutorServiceExtension.class)
+@ExtendWith(MockitoExtension.class)
 @ExtendWith(ConfigurationExtension.class)
 public class PlanningCacheMetricsTest extends AbstractPlannerTest {
 
     private final MetricManager metricManager = new MetricManagerImpl();
 
-    @InjectExecutorService
+    @Mock(strictness = Strictness.LENIENT)
     private ScheduledExecutorService commonExecutor;
 
+    @Mock
+    private DdlSqlToCommandConverter ddlCommandConverter;
+
     @InjectConfiguration("mock.autoRefresh.staleRowsCheckIntervalSeconds=5")
     private StatisticsConfiguration statisticsConfiguration;
 
@@ -90,8 +95,8 @@ public class PlanningCacheMetricsTest extends 
AbstractPlannerTest {
         AbstractEventProducer<StatisticChangedEvent, StatisticEventParameters> 
producer = new AbstractEventProducer<>() {};
 
         PrepareService prepareService = new PrepareServiceImpl(
-                "test", 2, cacheFactory, null, 15_000L, 2, Integer.MAX_VALUE, 
metricManager, new PredefinedSchemaManager(schema),
-                clockService::currentLong, commonExecutor, producer,
+                "test", 2, cacheFactory, ddlCommandConverter, 15_000L, 2, 
Integer.MAX_VALUE, metricManager,
+                new PredefinedSchemaManager(schema), 
clockService::currentLong, commonExecutor, producer,
                 
statisticsConfiguration.autoRefresh().staleRowsCheckIntervalSeconds()
         );
 

Reply via email to