This is an automated email from the ASF dual-hosted git repository.
ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new fc07fd6216e IGNITE-27266 Sql. Improved potentially flaky tests
(ItStatisticTest, SqlStatisticManagerImplTest) (#7206)
fc07fd6216e is described below
commit fc07fd6216e1cdd040ac9be2aa8de59585561685
Author: Max Zhuravkov <[email protected]>
AuthorDate: Mon Dec 15 19:03:02 2025 +0200
IGNITE-27266 Sql. Improved potentially flaky tests (ItStatisticTest,
SqlStatisticManagerImplTest) (#7206)
---
.../sql/engine/statistic/ItStatisticTest.java | 89 +++------
.../sql/engine/prepare/PrepareServiceImpl.java | 15 +-
.../engine/statistic/SqlStatisticManagerImpl.java | 10 +-
.../sql/engine/prepare/PrepareServiceImplTest.java | 206 +++++++++++++--------
.../statistic/SqlStatisticManagerImplTest.java | 163 ++++++++--------
.../sql/metrics/PlanningCacheMetricsTest.java | 17 +-
6 files changed, 267 insertions(+), 233 deletions(-)
diff --git
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/statistic/ItStatisticTest.java
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/statistic/ItStatisticTest.java
index a855dc731aa..7cc7bd1bdc6 100644
---
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/statistic/ItStatisticTest.java
+++
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/statistic/ItStatisticTest.java
@@ -46,6 +46,10 @@ public class ItStatisticTest extends BaseSqlIntegrationTest {
private static final long REFRESH_PERIOD_MILLIS = 5_000;
+ private static final Duration REFRESH_CHECK_TIMEOUT =
Duration.ofSeconds(20);
+
+ private static final Duration REFRESH_POLL_INTERVAL =
Duration.ofMillis(50);
+
@BeforeAll
void beforeAll() {
sqlStatisticManager = (SqlStatisticManagerImpl)
queryProcessor().sqlStatisticManager();
@@ -80,12 +84,8 @@ public class ItStatisticTest extends BaseSqlIntegrationTest {
AtomicInteger inc = new AtomicInteger();
- long timeout = calcStatisticUpdateIntervalMillis();
- // max 10 times cache pollution
- long pollInterval = timeout / 10;
-
- Awaitility.await().pollInterval(Duration.ofMillis(pollInterval))
- .timeout(timeout, TimeUnit.MILLISECONDS).untilAsserted(() ->
+ Awaitility.await().pollInterval(REFRESH_POLL_INTERVAL)
+ .timeout(REFRESH_CHECK_TIMEOUT).untilAsserted(() ->
assertQuery(format("select {} from t",
inc.incrementAndGet()))
.matches(nodeRowCount("TableScan", is((int)
update)))
.check()
@@ -98,20 +98,15 @@ public class ItStatisticTest extends BaseSqlIntegrationTest
{
long updates1 = insert(0L, milestone);
- long timeout = calcStatisticUpdateIntervalMillis();
-
- // max 10 times cache pollution
- long pollInterval = timeout / 10;
-
- Awaitility.await().pollInterval(Duration.ofMillis(pollInterval))
- .timeout(timeout, TimeUnit.MILLISECONDS).untilAsserted(() -> {
- sqlStatisticManager.forceUpdateAll();
- sqlStatisticManager.lastUpdateStatisticFuture().join();
+ Awaitility.await().pollInterval(REFRESH_POLL_INTERVAL)
+ .timeout(REFRESH_CHECK_TIMEOUT).untilAsserted(() -> {
+ sqlStatisticManager.forceUpdateAll();
+
sqlStatisticManager.lastUpdateStatisticFuture().join();
- assertQuery("select 1 from t")
- .matches(nodeRowCount("TableScan", is((int)
updates1)))
- .check();
- }
+ assertQuery("select 1 from t")
+ .matches(nodeRowCount("TableScan",
is((int) updates1)))
+ .check();
+ }
);
milestone = computeNextMilestone(milestone,
DEFAULT_STALE_ROWS_FRACTION, DEFAULT_MIN_STALE_ROWS_COUNT);
@@ -122,15 +117,12 @@ public class ItStatisticTest extends
BaseSqlIntegrationTest {
sqlStatisticManager.lastUpdateStatisticFuture().join();
// query not cached in plans
- Awaitility.await().pollInterval(Duration.ofMillis(pollInterval))
- .timeout(timeout, TimeUnit.MILLISECONDS).untilAsserted(() -> {
- sqlStatisticManager.forceUpdateAll();
- sqlStatisticManager.lastUpdateStatisticFuture().join();
-
- assertQuery("select 1 from t")
- .matches(nodeRowCount("TableScan", is((int)
updates2)))
- .check();
- }
+ Awaitility.await().pollInterval(REFRESH_POLL_INTERVAL)
+ .timeout(REFRESH_CHECK_TIMEOUT).untilAsserted(() -> {
+ assertQuery("select 1 from t")
+ .matches(nodeRowCount("TableScan",
is((int) updates2)))
+ .check();
+ }
);
}
@@ -150,13 +142,9 @@ public class ItStatisticTest extends
BaseSqlIntegrationTest {
String query = "SELECT /*+ DISABLE_RULE('HashJoinConverter',
'MergeJoinConverter', 'CorrelatedNestedLoopJoin') */ "
+ "j1.* FROM j2, j1 WHERE j2.id = j1.id";
- long statRefresh = calcStatisticUpdateIntervalMillis();
-
- // max 10 times cache pollution
- long pollInterval = statRefresh / 10;
-
- Awaitility.await().pollInterval(Duration.ofMillis(pollInterval))
- .timeout(statRefresh,
TimeUnit.MILLISECONDS).untilAsserted(() ->
+ Awaitility.await().pollInterval(REFRESH_POLL_INTERVAL)
+ .timeout(REFRESH_CHECK_TIMEOUT)
+ .untilAsserted(() ->
assertQuery(query)
// expecting right source has less rows
than left
.matches(QueryChecker.matches(".*TableScan.*PUBLIC.J1.*TableScan.*PUBLIC.J2.*"))
@@ -169,8 +157,8 @@ public class ItStatisticTest extends BaseSqlIntegrationTest
{
sqlStatisticManager.forceUpdateAll();
sqlStatisticManager.lastUpdateStatisticFuture().get(5,
TimeUnit.SECONDS);
- Awaitility.await().pollInterval(Duration.ofMillis(pollInterval))
- .timeout(statRefresh,
TimeUnit.MILLISECONDS).untilAsserted(() ->
+ Awaitility.await().pollInterval(REFRESH_POLL_INTERVAL)
+ .timeout(REFRESH_CHECK_TIMEOUT).untilAsserted(() ->
assertQuery(query)
// expecting right source has less rows
than left
.matches(QueryChecker.matches(".*TableScan.*PUBLIC.J2.*TableScan.*PUBLIC.J1.*"))
@@ -204,25 +192,21 @@ public class ItStatisticTest extends
BaseSqlIntegrationTest {
sqlClusterConfig.statistics().autoRefresh().staleRowsCheckIntervalSeconds()
.update((int) (newRefreshInterval / 1000)).join();
- long statRefresh =
calcStatisticUpdateIntervalMillis(newRefreshInterval);
- long pollInterval = statRefresh / 10;
-
- Awaitility.await().pollInterval(Duration.ofMillis(pollInterval))
- .timeout(statRefresh,
TimeUnit.MILLISECONDS).untilAsserted(() ->
+ Awaitility.await().pollInterval(REFRESH_POLL_INTERVAL)
+ .timeout(REFRESH_CHECK_TIMEOUT).untilAsserted(() ->
assertQuery(query)
// expecting right source has less rows
than left
.matches(QueryChecker.matches(".*TableScan.*PUBLIC.J1.*TableScan.*PUBLIC.J2.*"))
.returnNothing()
- .check()
- );
+ .check());
sql("INSERT INTO j2 SELECT x, x FROM system_range(?, ?)", 0, 3 *
DEFAULT_MIN_STALE_ROWS_COUNT);
sqlStatisticManager.forceUpdateAll();
sqlStatisticManager.lastUpdateStatisticFuture().get(5,
TimeUnit.SECONDS);
- Awaitility.await().pollInterval(Duration.ofMillis(pollInterval))
- .timeout(statRefresh,
TimeUnit.MILLISECONDS).untilAsserted(() ->
+ Awaitility.await().pollInterval(REFRESH_POLL_INTERVAL)
+ .timeout(REFRESH_CHECK_TIMEOUT).untilAsserted(() ->
assertQuery(query)
// expecting right source has less rows
than left
.matches(QueryChecker.matches(".*TableScan.*PUBLIC.J2.*TableScan.*PUBLIC.J1.*"))
@@ -235,19 +219,6 @@ public class ItStatisticTest extends
BaseSqlIntegrationTest {
}
}
- private static long calcStatisticUpdateIntervalMillis() {
- return calcStatisticUpdateIntervalMillis(REFRESH_PERIOD_MILLIS);
- }
-
- private static long calcStatisticUpdateIntervalMillis(long
refreshPeriodMillis) {
- long inc = TimeUnit.SECONDS.toMillis(2);
- // need to wait at least 2 statistic updates.
- long statisticAggregationTimeout = refreshPeriodMillis + 2 *
refreshPeriodMillis;
- long delay = refreshPeriodMillis / 2;
-
- return statisticAggregationTimeout + delay + inc;
- }
-
// copy-paste from private method:
PartitionModificationCounter#computeNextMilestone
// if implementation will changes, it need to be changed too
private static long computeNextMilestone(
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
index b337481e4f3..1beaffb7784 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
@@ -1139,8 +1139,9 @@ public class PrepareServiceImpl implements PrepareService
{
staleRowsCheckIntervalSeconds.listen(configListener);
int intervalSeconds = staleRowsCheckIntervalSeconds.value();
+ long interval = calculateInterval(intervalSeconds);
- schedule(intervalSeconds);
+ schedule(interval);
}
void stop() {
@@ -1155,16 +1156,20 @@ public class PrepareServiceImpl implements
PrepareService {
assert seconds != null;
if (!Objects.equals(seconds, value.oldValue())) {
- // To observe actual values of statistics, plan cache updates
should happen more frequently
- // (plan update interval must be less than statistics auto
refresh interval).
- int interval = Math.max(1, seconds / 2);
+ long interval = calculateInterval(seconds);
schedule(interval);
}
return nullCompletedFuture();
}
- private void schedule(int intervalSeconds) {
+ private static long calculateInterval(long value) {
+ // To observe actual values of statistics, plan cache updates
should happen more frequently
+ // (plan update interval must be less than statistics auto refresh
interval).
+ return Math.max(1, value / 2);
+ }
+
+ private void schedule(long intervalSeconds) {
if (scheduledFuture != null) {
scheduledFuture.cancel(false);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java
index 7e32791d95b..5c92b69c99c 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java
@@ -236,6 +236,10 @@ public class SqlStatisticManagerImpl extends
AbstractEventProducer<StatisticChan
catalogService.removeListener(CatalogEvent.TABLE_DROP,
dropTableEventListener);
catalogService.removeListener(CatalogEvent.TABLE_CREATE,
createTableEventListener);
staleRowsCheckIntervalSeconds.stopListen(updateRefreshIntervalListener);
+
+ if (scheduledFuture != null) {
+ scheduledFuture.cancel(false);
+ }
}
private void onTableDrop(DropTableEventParameters parameters) {
@@ -329,10 +333,4 @@ public class SqlStatisticManagerImpl extends
AbstractEventProducer<StatisticChan
public void forceUpdateAll() {
update(true);
}
-
- /** Returns a future of that task that updates statistics. */
- @TestOnly
- public ScheduledFuture<?> currentScheduledFuture() {
- return scheduledFuture;
- }
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImplTest.java
index a3a9b4a55f1..456d7ee4d70 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImplTest.java
@@ -34,6 +34,10 @@ import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
@@ -43,6 +47,7 @@ import static org.mockito.Mockito.when;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -53,13 +58,16 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import org.apache.ignite.configuration.ConfigurationValue;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.event.AbstractEventProducer;
+import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.ClockServiceImpl;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -87,9 +95,7 @@ import
org.apache.ignite.internal.sql.engine.util.cache.CacheFactory;
import org.apache.ignite.internal.sql.engine.util.cache.CaffeineCacheFactory;
import org.apache.ignite.internal.sql.engine.util.cache.StatsCounter;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
-import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
-import org.apache.ignite.internal.testframework.InjectExecutorService;
import org.apache.ignite.internal.type.NativeType;
import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.internal.util.ExceptionUtils;
@@ -100,35 +106,50 @@ import org.apache.ignite.sql.SqlException;
import org.awaitility.Awaitility;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.Mock.Strictness;
import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
/**
* Tests to verify {@link PrepareServiceImpl}.
*/
-@ExtendWith(ExecutorServiceExtension.class)
@ExtendWith(ConfigurationExtension.class)
+@ExtendWith(MockitoExtension.class)
public class PrepareServiceImplTest extends BaseIgniteAbstractTest {
- private static final List<PrepareService> createdServices = new
ArrayList<>();
- private static final int PLAN_UPDATER_REFRESH_PERIOD = 5_000;
+ private final List<PrepareService> createdServices = new ArrayList<>();
- @InjectExecutorService
- private static ScheduledExecutorService commonExecutor;
+ @Mock(strictness = Strictness.LENIENT)
+ private ScheduledExecutorService commonExecutor;
+
+ @Mock(strictness = Strictness.LENIENT)
+ private ScheduledFuture<?> taskFuture;
+
+ @Mock(strictness = Strictness.LENIENT)
+ private ClockService clockService;
@InjectConfiguration("mock.autoRefresh.staleRowsCheckIntervalSeconds=5")
private static StatisticsConfiguration statisticsConfiguration;
+ private final ArrayDeque<Runnable> scheduledTasks = new ArrayDeque<>();
+
+ @BeforeEach
+ public void initTaskScheduler() {
+ prepareTaskScheduler();
+ }
+
@AfterEach
public void stopServices() throws Exception {
for (PrepareService createdService : createdServices) {
createdService.stop();
}
-
- createdServices.clear();
}
@ParameterizedTest
@@ -317,33 +338,20 @@ public class PrepareServiceImplTest extends
BaseIgniteAbstractTest {
}
@Test
- public void timeoutedPlanShouldBeRemovedFromCache() throws
InterruptedException {
+ public void timedOutPlanShouldBeRemovedFromCache() {
IgniteTable igniteTable = TestBuilders.table()
.name("T")
.addColumn("C", NativeTypes.INT32)
.distribution(IgniteDistributions.single())
.build();
- // Create a proxy.
- IgniteTable spyTable = spy(igniteTable);
-
- // Override and slowdown a method, which is called by Planner, to
emulate long planning.
- Mockito.doAnswer(inv -> {
- try {
- Thread.sleep(300);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- // Call original method.
- return igniteTable.getRowType(inv.getArgument(0),
inv.getArgument(1));
- }).when(spyTable).getRowType(any(), any());
-
IgniteSchema schema = new IgniteSchema("PUBLIC", 0,
List.of(igniteTable));
Cache<Object, Object> cache =
CaffeineCacheFactory.INSTANCE.create(100);
CacheFactory cacheFactory = new DummyCacheFactory(cache);
- PrepareServiceImpl service = createPlannerService(schema,
cacheFactory, 100);
+ // Set the planning timeout to 1 millisecond; this value is small enough to
cause a planning timeout exception.
+ PrepareServiceImpl service = createPlannerService(schema,
cacheFactory, 1);
StringBuilder stmt = new StringBuilder();
for (int i = 0; i < 100; i++) {
@@ -368,8 +376,7 @@ public class PrepareServiceImplTest extends
BaseIgniteAbstractTest {
assertEquals(Sql.EXECUTION_CANCELLED_ERR, sqlErr.code(), "Unexpected
error: " + sqlErr);
// Cache invalidate does not immediately remove the entry, so we need
to wait some time to ensure it is removed.
- boolean empty = IgniteTestUtils.waitForCondition(() -> cache.size() ==
0, 1000);
- assertTrue(empty, "Cache is not empty: " + cache.size());
+ Awaitility.await().untilAsserted(() -> assertEquals(0, cache.size()));
}
@Test
@@ -417,19 +424,15 @@ public class PrepareServiceImplTest extends
BaseIgniteAbstractTest {
service.statisticsChanged(table.id());
- Awaitility.await()
- .atMost(Duration.ofMillis(2 * PLAN_UPDATER_REFRESH_PERIOD))
- .until(
- () ->
!selectPlan.equals(await(service.prepareAsync(parse(selectQuery),
operationContext().build())))
- );
-
- Awaitility.await()
- .atMost(Duration.ofMillis(2 * PLAN_UPDATER_REFRESH_PERIOD))
- .until(
- () ->
!insertPlan.equals(await(service.prepareAsync(parse(insertQuery),
operationContext().build())))
- );
+ // Run update plan task.
+ runScheduledTasks();
- assertThat(service.cache.size(), is(2));
+ // Planning is done in a separate thread.
+ Awaitility.await().untilAsserted(() -> {
+ assertNotSame(selectPlan,
await(service.prepareAsync(parse(selectQuery), operationContext().build())));
+ assertNotSame(insertPlan,
await(service.prepareAsync(parse(insertQuery), operationContext().build())));
+ assertThat(service.cache.size(), is(2));
+ });
}
@Test
@@ -458,8 +461,8 @@ public class PrepareServiceImplTest extends
BaseIgniteAbstractTest {
CacheKey key1 = service.cache.entrySet().iterator().next().getKey();
// different table
- String insertQuery = "SELECT * FROM test.t2 WHERE c = 1";
- QueryPlan plan2 = await(service.prepareAsync(parse(insertQuery),
operationContext().build()));
+ String anotherSelectQuery = "SELECT * FROM test.t2 WHERE c = 1";
+ QueryPlan plan2 =
await(service.prepareAsync(parse(anotherSelectQuery),
operationContext().build()));
assertThat(service.cache.size(), is(1));
CacheKey key2 = service.cache.entrySet().iterator().next().getKey();
@@ -471,11 +474,46 @@ public class PrepareServiceImplTest extends
BaseIgniteAbstractTest {
// cached table
service.statisticsChanged(table2.id());
- Awaitility.await()
- .atMost(Duration.ofMillis(2 * PLAN_UPDATER_REFRESH_PERIOD))
- .until(
- () ->
!plan2.equals(await(service.prepareAsync(parse(insertQuery),
operationContext().build())))
- );
+ // Run tasks that trigger re-planning
+ runScheduledTasks();
+
+ // Planning is done in a separate thread.
+ Awaitility.await().untilAsserted(() -> {
+ assertNotSame(plan2,
await(service.prepareAsync(parse(anotherSelectQuery),
operationContext().build())));
+ });
+ }
+
+ @Test
+ public void updateSchedulingInterval() throws Exception {
+ IgniteSchema schema = new IgniteSchema("TEST", 0, List.of());
+ ConfigurationValue<Integer> configurationValue =
statisticsConfiguration.autoRefresh().staleRowsCheckIntervalSeconds();
+ configurationValue.update(60).join();
+
+ // Starts in createPlannerService
+ PrepareServiceImpl service = (PrepareServiceImpl)
createPlannerService(schema, 1);
+
+ // Update 1 - new value
+ configurationValue.update(42).join();
+
+ // Update - same value
+ configurationValue.update(42).join();
+
+ // Cannot be less than 1
+ configurationValue.update(1).join();
+
+ service.stop();
+
+ InOrder inOrder = Mockito.inOrder(commonExecutor, taskFuture);
+ // Initial
+
inOrder.verify(commonExecutor).scheduleAtFixedRate(any(Runnable.class),
eq(30L), eq(30L), eq(TimeUnit.SECONDS));
+ // Update 1
+ inOrder.verify(taskFuture).cancel(anyBoolean());
+
inOrder.verify(commonExecutor).scheduleAtFixedRate(any(Runnable.class),
eq(21L), eq(21L), eq(TimeUnit.SECONDS));
+ // Update 2
+ inOrder.verify(taskFuture).cancel(anyBoolean());
+
inOrder.verify(commonExecutor).scheduleAtFixedRate(any(Runnable.class), eq(1L),
eq(1L), eq(TimeUnit.SECONDS));
+ // Stop
+ inOrder.verify(taskFuture).cancel(anyBoolean());
}
/** Validate that plan updates only for current catalog version. */
@@ -506,24 +544,19 @@ public class PrepareServiceImplTest extends
BaseIgniteAbstractTest {
QueryPlan plan2 = await(service.prepareAsync(parse(selectQuery),
operationContext().build()));
- Awaitility.await()
- .atMost(Duration.ofMillis(10000))
- .until(
- () -> service.cache.size() == 2
- );
+ runScheduledTasks();
+ assertEquals(2, service.cache.size());
- assertThat(service.cache.size(), is(2));
service.statisticsChanged(table1.id());
- Awaitility.await()
- .atMost(Duration.ofMillis(2 * PLAN_UPDATER_REFRESH_PERIOD))
- .until(
- () ->
!plan2.equals(await(service.prepareAsync(parse(selectQuery),
operationContext().build())))
- );
+ runScheduledTasks();
+ // Let eviction tasks run.
+ Awaitility.await().untilAsserted(() ->
+ assertNotSame(plan2,
await(service.prepareAsync(parse(selectQuery), operationContext().build()))));
// previous catalog, get cached plan
ver.set(0);
- assertEquals(plan1, await(service.prepareAsync(parse(selectQuery),
operationContext().build())));
+ assertSame(plan1, await(service.prepareAsync(parse(selectQuery),
operationContext().build())));
}
@Test
@@ -548,21 +581,16 @@ public class PrepareServiceImplTest extends
BaseIgniteAbstractTest {
await(service.prepareAsync(parse(selectQuery),
operationContext().build()));
- Awaitility.await()
- .atMost(Duration.ofMillis(2 * PLAN_UPDATER_REFRESH_PERIOD))
- .until(
- () -> service.cache.size() == 2
- );
+ runScheduledTasks();
+ assertEquals(2, service.cache.size());
service.statisticsChanged(table1.id());
Set<Entry<CacheKey, CompletableFuture<PlanInfo>>> cachedSnap = new
HashSet<>(service.cache.entrySet());
- Awaitility.await()
- .atMost(Duration.ofMillis(2 * PLAN_UPDATER_REFRESH_PERIOD))
- .until(
- () -> !new
HashSet<>(service.cache.entrySet()).equals(cachedSnap)
- );
+ runScheduledTasks();
+ // Let eviction tasks run.
+ Awaitility.await().untilAsserted(() ->
assertNotEquals(service.cache.entrySet(), cachedSnap));
// cache futures snapshot highlight only one invalidation item.
assertThat(cachedSnap.stream().filter(e ->
e.getValue().join().needInvalidate()).count(), is(1L));
@@ -842,35 +870,33 @@ public class PrepareServiceImplTest extends
BaseIgniteAbstractTest {
return new IgniteSchema("PUBLIC", 0, List.of(table));
}
- private static PrepareService createPlannerService() {
+ private PrepareService createPlannerService() {
return createPlannerService(createSchema());
}
- private static PrepareService createPlannerService(IgniteSchema schema,
int cacheSize) {
+ private PrepareService createPlannerService(IgniteSchema schema, int
cacheSize) {
return createPlannerService(schema, CaffeineCacheFactory.INSTANCE,
10000, Integer.MAX_VALUE, cacheSize);
}
- private static PrepareService createPlannerService(IgniteSchema schema) {
+ private PrepareService createPlannerService(IgniteSchema schema) {
return createPlannerService(schema, CaffeineCacheFactory.INSTANCE,
10000);
}
- private static PrepareServiceImpl createPlannerService(IgniteSchema
schemas, CacheFactory cacheFactory, int timeoutMillis) {
+ private PrepareServiceImpl createPlannerService(IgniteSchema schemas,
CacheFactory cacheFactory, int timeoutMillis) {
return createPlannerService(schemas, cacheFactory, timeoutMillis,
Integer.MAX_VALUE, 1000);
}
- private static PrepareServiceImpl createPlannerService(
+ private PrepareServiceImpl createPlannerService(
IgniteSchema schemas,
CacheFactory cacheFactory,
int timeoutMillis,
int planExpireSeconds,
int cacheSize
) {
- ClockServiceImpl clockService = mock(ClockServiceImpl.class);
when(clockService.currentLong()).thenReturn(new HybridTimestamp(1_000,
500).longValue());
- AbstractEventProducer<StatisticChangedEvent, StatisticEventParameters>
producer = new AbstractEventProducer<>() {
- };
+ AbstractEventProducer<StatisticChangedEvent, StatisticEventParameters>
producer = new AbstractEventProducer<>() {};
PrepareServiceImpl service = new PrepareServiceImpl("test", cacheSize,
cacheFactory,
mock(DdlSqlToCommandConverter.class), timeoutMillis, 2,
planExpireSeconds, mock(MetricManagerImpl.class),
@@ -885,7 +911,7 @@ public class PrepareServiceImplTest extends
BaseIgniteAbstractTest {
return service;
}
- private static PrepareServiceImpl createPlannerService(
+ private PrepareServiceImpl createPlannerService(
IgniteSchema schemas,
CacheFactory cacheFactory,
int timeoutMillis,
@@ -896,7 +922,7 @@ public class PrepareServiceImplTest extends
BaseIgniteAbstractTest {
return createPlannerService(schemas, cacheFactory, timeoutMillis,
planExpireSeconds, cacheSize, ver, commonExecutor);
}
- private static PrepareServiceImpl createPlannerService(
+ private PrepareServiceImpl createPlannerService(
IgniteSchema schemas,
CacheFactory cacheFactory,
int timeoutMillis,
@@ -909,8 +935,7 @@ public class PrepareServiceImplTest extends
BaseIgniteAbstractTest {
when(clockService.currentLong()).thenReturn(new HybridTimestamp(1_000,
500).longValue());
- AbstractEventProducer<StatisticChangedEvent, StatisticEventParameters>
producer = new AbstractEventProducer<>() {
- };
+ AbstractEventProducer<StatisticChangedEvent, StatisticEventParameters>
producer = new AbstractEventProducer<>() {};
PrepareServiceImpl service = new PrepareServiceImpl("test", cacheSize,
cacheFactory,
mock(DdlSqlToCommandConverter.class), timeoutMillis, 2,
planExpireSeconds, mock(MetricManagerImpl.class),
@@ -925,7 +950,7 @@ public class PrepareServiceImplTest extends
BaseIgniteAbstractTest {
return service;
}
- private static PrepareServiceImpl
createPlannerServiceWithInactivePlanUpdater(
+ private PrepareServiceImpl createPlannerServiceWithInactivePlanUpdater(
IgniteSchema schemas,
CacheFactory cacheFactory,
int timeoutMillis,
@@ -960,4 +985,23 @@ public class PrepareServiceImplTest extends
BaseIgniteAbstractTest {
return (Cache<K, V>) cache;
}
}
+
+ private void prepareTaskScheduler() {
+ doAnswer(invocation -> {
+ Runnable r = invocation.getArgument(0);
+ scheduledTasks.add(r);
+ return taskFuture;
+ }).when(commonExecutor).scheduleAtFixedRate(any(Runnable.class),
anyLong(), anyLong(), any(TimeUnit.class));
+
+ doAnswer(invocation -> {
+ scheduledTasks.poll();
+ return true;
+ }).when(taskFuture).cancel(anyBoolean());
+ }
+
+ private void runScheduledTasks() {
+ for (Runnable r : scheduledTasks) {
+ r.run();
+ }
+ }
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImplTest.java
index bab86989bec..3dcda8df100 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImplTest.java
@@ -20,12 +20,10 @@ package org.apache.ignite.internal.sql.engine.statistic;
import static it.unimi.dsi.fastutil.ints.Int2ObjectMap.entry;
import static
org.apache.ignite.internal.sql.engine.statistic.SqlStatisticManagerImpl.DEFAULT_TABLE_SIZE;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotSame;
-import static org.junit.jupiter.api.Assertions.assertSame;
-import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
@@ -38,6 +36,7 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.ints.IntList;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -45,7 +44,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.configuration.ConfigurationValue;
import org.apache.ignite.internal.catalog.Catalog;
@@ -67,22 +65,20 @@ import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
-import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
-import org.apache.ignite.internal.testframework.InjectExecutorService;
import org.apache.ignite.sql.ColumnType;
-import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mock.Strictness;
+import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
/**
* Tests of SqlStatisticManagerImpl.
*/
@ExtendWith(MockitoExtension.class)
-@ExtendWith(ExecutorServiceExtension.class)
@ExtendWith(ConfigurationExtension.class)
class SqlStatisticManagerImplTest extends BaseIgniteAbstractTest {
@@ -107,8 +103,11 @@ class SqlStatisticManagerImplTest extends
BaseIgniteAbstractTest {
@Mock(strictness = Strictness.LENIENT)
private StatisticAggregatorImpl statAggregator;
- @InjectExecutorService
- private ScheduledExecutorService commonExecutor;
+ @Mock(strictness = Strictness.LENIENT)
+ private ScheduledExecutorService scheduledExecutorService;
+
+ @Mock(strictness = Strictness.LENIENT)
+ private ScheduledFuture<Void> taskFuture;
@InjectConfiguration("mock.autoRefresh.staleRowsCheckIntervalSeconds=5")
private StatisticsConfiguration statisticsConfiguration;
@@ -116,11 +115,14 @@ class SqlStatisticManagerImplTest extends
BaseIgniteAbstractTest {
private static final CatalogTableColumnDescriptor pkCol =
new CatalogTableColumnDescriptor("pkCol", ColumnType.STRING,
false, 0, 0, 10, null);
+ private final ArrayDeque<Runnable> scheduledTasks = new ArrayDeque<>();
+
@Test
public void checkDefaultTableSize() {
int tableId = ThreadLocalRandom.current().nextInt();
// Preparing:
prepareEmptyCatalog();
+ prepareTaskScheduler();
when(statAggregator.estimatedSizeWithLastUpdate(List.of()))
.thenReturn(CompletableFuture.completedFuture(Int2ObjectMaps.emptyMap()));
@@ -128,6 +130,8 @@ class SqlStatisticManagerImplTest extends
BaseIgniteAbstractTest {
SqlStatisticManagerImpl sqlStatisticManager =
newSqlStatisticsManager();
sqlStatisticManager.start();
+ runScheduledTasks();
+
// Test:
// return default value for unknown table.
assertEquals(DEFAULT_TABLE_SIZE,
sqlStatisticManager.tableSize(tableId));
@@ -139,6 +143,7 @@ class SqlStatisticManagerImplTest extends
BaseIgniteAbstractTest {
long tableSize = 999_888_777L;
// Preparing:
prepareCatalogWithTable(tableId);
+ prepareTaskScheduler();
when(tableManager.cachedTable(tableId)).thenReturn(tableViewInternal);
when(tableViewInternal.internalTable()).thenReturn(internalTable);
@@ -149,11 +154,8 @@ class SqlStatisticManagerImplTest extends
BaseIgniteAbstractTest {
SqlStatisticManagerImpl sqlStatisticManager =
newSqlStatisticsManager();
sqlStatisticManager.start();
- long timeout = 2 * UPDATE_INTERVAL_SECONDS;
-
- Awaitility.await().timeout(timeout, TimeUnit.SECONDS).untilAsserted(()
->
- assertEquals(tableSize, sqlStatisticManager.tableSize(tableId))
- );
+ runScheduledTasks();
+ assertEquals(tableSize, sqlStatisticManager.tableSize(tableId));
// Second time we should obtain the same value from a cache.
assertEquals(tableSize, sqlStatisticManager.tableSize(tableId));
@@ -171,6 +173,7 @@ class SqlStatisticManagerImplTest extends
BaseIgniteAbstractTest {
HybridTimestamp time2 =
HybridTimestamp.MAX_VALUE.subtractPhysicalTime(500);
// Preparing:
prepareCatalogWithTable(tableId);
+ prepareTaskScheduler();
when(tableManager.cachedTable(tableId)).thenReturn(tableViewInternal);
when(tableViewInternal.internalTable()).thenReturn(internalTable);
@@ -186,11 +189,8 @@ class SqlStatisticManagerImplTest extends
BaseIgniteAbstractTest {
SqlStatisticManagerImpl sqlStatisticManager =
newSqlStatisticsManager();
sqlStatisticManager.start();
- long timeout = 2 * UPDATE_INTERVAL_SECONDS;
+ runScheduledTasks();
- Awaitility.await().timeout(timeout, TimeUnit.SECONDS).untilAsserted(()
->
- assertEquals(tableSize1,
sqlStatisticManager.tableSize(tableId))
- );
// The second time we should obtain the same value from a cache.
assertEquals(tableSize1, sqlStatisticManager.tableSize(tableId));
@@ -249,9 +249,13 @@ class SqlStatisticManagerImplTest extends
BaseIgniteAbstractTest {
when(statAggregator.estimatedSizeWithLastUpdate(any()))
.thenReturn(CompletableFuture.completedFuture(modifications));
+ prepareTaskScheduler();
+
SqlStatisticManagerImpl sqlStatisticManager =
newSqlStatisticsManager();
sqlStatisticManager.start();
+ runScheduledTasks();
+
sqlStatisticManager.forceUpdateAll();
sqlStatisticManager.lastUpdateStatisticFuture().get(5_000,
TimeUnit.MILLISECONDS);
@@ -275,6 +279,7 @@ class SqlStatisticManagerImplTest extends
BaseIgniteAbstractTest {
long tableSize = 999_888_777L;
// Preparing:
prepareCatalogWithTable(tableId);
+ prepareTaskScheduler();
ArgumentCaptor<EventListener<DropTableEventParameters>>
tableDropCapture = ArgumentCaptor.forClass(EventListener.class);
doNothing().when(catalogManager).listen(eq(CatalogEvent.TABLE_DROP),
tableDropCapture.capture());
@@ -292,11 +297,8 @@ class SqlStatisticManagerImplTest extends
BaseIgniteAbstractTest {
SqlStatisticManagerImpl sqlStatisticManager =
newSqlStatisticsManager();
sqlStatisticManager.start();
- long timeout = 2 * UPDATE_INTERVAL_SECONDS;
-
- Awaitility.await().timeout(timeout, TimeUnit.SECONDS).untilAsserted(()
->
- assertEquals(tableSize, sqlStatisticManager.tableSize(tableId))
- );
+ runScheduledTasks();
+ assertEquals(tableSize, sqlStatisticManager.tableSize(tableId));
int catalogVersionBeforeDrop = 1;
int catalogVersionAfterDrop = 2;
@@ -338,6 +340,8 @@ class SqlStatisticManagerImplTest extends
BaseIgniteAbstractTest {
.thenReturn(CompletableFuture.completedFuture(Int2ObjectMap.ofEntries(
entry(tableId, new
PartitionModificationInfo(tableSize, Long.MAX_VALUE)))));
+ prepareTaskScheduler();
+
SqlStatisticManagerImpl sqlStatisticManager =
newSqlStatisticsManager();
sqlStatisticManager.start();
@@ -356,12 +360,9 @@ class SqlStatisticManagerImplTest extends
BaseIgniteAbstractTest {
.storageProfile("")
.build();
tableCreateCapture.getValue().notify(new
CreateTableEventParameters(-1, 0, catalogDescriptor));
- // now table is created and size should be actual.
- long timeout = 2 * UPDATE_INTERVAL_SECONDS;
- Awaitility.await().timeout(timeout, TimeUnit.SECONDS).untilAsserted(()
->
- assertEquals(tableSize, sqlStatisticManager.tableSize(tableId))
- );
+ runScheduledTasks();
+ assertEquals(tableSize, sqlStatisticManager.tableSize(tableId));
}
@Test
@@ -387,6 +388,8 @@ class SqlStatisticManagerImplTest extends
BaseIgniteAbstractTest {
when(tableManager.cachedTable(tableId)).thenReturn(tableViewInternal);
when(tableViewInternal.internalTable()).thenReturn(internalTable);
+ prepareTaskScheduler();
+
SqlStatisticManagerImpl sqlStatisticManager =
newSqlStatisticsManager();
sqlStatisticManager.start();
@@ -395,6 +398,7 @@ class SqlStatisticManagerImplTest extends
BaseIgniteAbstractTest {
assertEquals(tableSize1, sqlStatisticManager.tableSize(tableId));
// Error was handled after the first call.
+ runScheduledTasks();
// tableSize2
sqlStatisticManager.forceUpdateAll();
@@ -424,6 +428,8 @@ class SqlStatisticManagerImplTest extends
BaseIgniteAbstractTest {
entry(tableId, new
PartitionModificationInfo(tableSize2, time2.longValue()))))
);
+ prepareTaskScheduler();
+
SqlStatisticManagerImpl sqlStatisticManager =
newSqlStatisticsManager();
sqlStatisticManager.start();
@@ -436,55 +442,45 @@ class SqlStatisticManagerImplTest extends
BaseIgniteAbstractTest {
// get tableSize2
sqlStatisticManager.forceUpdateAll();
- long timeout = 2 * UPDATE_INTERVAL_SECONDS;
-
- Awaitility.await().timeout(timeout, TimeUnit.SECONDS).untilAsserted(()
->
- assertEquals(tableSize2,
sqlStatisticManager.tableSize(tableId))
- );
+ runScheduledTasks();
+ assertEquals(tableSize2, sqlStatisticManager.tableSize(tableId));
// Stale result must be discarded.
assertEquals(tableSize2, sqlStatisticManager.tableSize(tableId));
}
@Test
- public void rescheduleWhenUpdateIntervalChanges() {
+ public void updateSchedulingInterval() {
prepareEmptyCatalog();
+ prepareTaskScheduler();
when(statAggregator.estimatedSizeWithLastUpdate(List.of()))
.thenReturn(CompletableFuture.completedFuture(Int2ObjectMaps.emptyMap()));
+ ConfigurationValue<Integer> configurationValue =
statisticsConfiguration.autoRefresh().staleRowsCheckIntervalSeconds();
+ configurationValue.update(UPDATE_INTERVAL_SECONDS).join();
+
SqlStatisticManagerImpl sqlStatisticManager =
newSqlStatisticsManager();
sqlStatisticManager.start();
- // Increase interval - reschedule
- {
- ScheduledFuture<?> f =
sqlStatisticManager.currentScheduledFuture();
-
- ConfigurationValue<Integer> interval =
statisticsConfiguration.autoRefresh().staleRowsCheckIntervalSeconds();
- interval.update(UPDATE_INTERVAL_SECONDS + 1).join();
- assertNotSame(f, sqlStatisticManager.currentScheduledFuture());
- }
-
- // Decrease the interval - reschedule
- {
- ScheduledFuture<?> f =
sqlStatisticManager.currentScheduledFuture();
-
- ConfigurationValue<Integer> interval =
statisticsConfiguration.autoRefresh().staleRowsCheckIntervalSeconds();
- interval.update(UPDATE_INTERVAL_SECONDS - 1).join();
-
- assertNotSame(f, sqlStatisticManager.currentScheduledFuture());
- }
-
- // Do not reschedule
- {
- ScheduledFuture<?> f =
sqlStatisticManager.currentScheduledFuture();
-
- ConfigurationValue<Integer> interval =
statisticsConfiguration.autoRefresh().staleRowsCheckIntervalSeconds();
- int currentValue = interval.value().intValue();
- interval.update(currentValue).join();
-
- assertSame(f, sqlStatisticManager.currentScheduledFuture());
- }
+ configurationValue.update(UPDATE_INTERVAL_SECONDS + 1).join();
+ configurationValue.update(UPDATE_INTERVAL_SECONDS - 1).join();
+ // Same value no effect.
+ configurationValue.update(UPDATE_INTERVAL_SECONDS - 1).join();
+
+ sqlStatisticManager.stop();
+
+ InOrder inOrder = Mockito.inOrder(scheduledExecutorService,
taskFuture);
+ // Start
+ inOrder.verify(scheduledExecutorService).scheduleAtFixedRate(any(Runnable.class), eq(0L), eq(5L), eq(TimeUnit.SECONDS));
+ // Update 1
+ inOrder.verify(taskFuture).cancel(anyBoolean());
+ inOrder.verify(scheduledExecutorService).scheduleAtFixedRate(any(Runnable.class), eq(6L), eq(6L), eq(TimeUnit.SECONDS));
+ // Update 2
+ inOrder.verify(taskFuture).cancel(anyBoolean());
+ inOrder.verify(scheduledExecutorService).scheduleAtFixedRate(any(Runnable.class), eq(4L), eq(4L), eq(TimeUnit.SECONDS));
+ // Stopping cancels the task.
+ inOrder.verify(taskFuture).cancel(anyBoolean());
}
@Test
@@ -493,6 +489,7 @@ class SqlStatisticManagerImplTest extends
BaseIgniteAbstractTest {
long tableSize = 1_000L;
// Preparing:
prepareCatalogWithTable(tableId);
+ prepareTaskScheduler();
when(tableManager.cachedTable(tableId)).thenReturn(tableViewInternal);
when(tableViewInternal.internalTable()).thenReturn(internalTable);
@@ -512,17 +509,12 @@ class SqlStatisticManagerImplTest extends
BaseIgniteAbstractTest {
SqlStatisticManagerImpl sqlStatisticManager =
newSqlStatisticsManager(intervalSeconds);
sqlStatisticManager.start();
- ScheduledFuture<?> f = sqlStatisticManager.currentScheduledFuture();
- // An exception does not terminate the task.
- assertThrows(TimeoutException.class, () -> f.get(intervalSeconds * 2,
TimeUnit.SECONDS));
+ // The first run fails
+ runScheduledTasks();
+ assertEquals(1, sqlStatisticManager.tableSize(tableId));
- // The statistics will eventually refresh
- Awaitility.await()
- .pollInterval(intervalSeconds / 2, TimeUnit.SECONDS)
- .untilAsserted(() -> assertEquals(tableSize,
sqlStatisticManager.tableSize(tableId)));
-
- // The task is still running
- assertFalse(f.isDone());
+ runScheduledTasks();
+ assertEquals(tableSize, sqlStatisticManager.tableSize(tableId));
}
private SqlStatisticManagerImpl newSqlStatisticsManager() {
@@ -537,7 +529,7 @@ class SqlStatisticManagerImplTest extends
BaseIgniteAbstractTest {
tableManager,
catalogManager,
lowWatermark,
- commonExecutor,
+ scheduledExecutorService,
statAggregator,
checkInterval
);
@@ -568,4 +560,23 @@ class SqlStatisticManagerImplTest extends
BaseIgniteAbstractTest {
when(catalogManager.catalog(1)).thenReturn(catalog);
when(catalog.tables()).thenReturn(List.of());
}
+
+ private void prepareTaskScheduler() {
+ doAnswer(invocation -> {
+ Runnable r = invocation.getArgument(0);
+ scheduledTasks.add(r);
+ return taskFuture;
+ }).when(scheduledExecutorService).scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class));
+
+ doAnswer(invocation -> {
+ scheduledTasks.poll();
+ return true;
+ }).when(taskFuture).cancel(anyBoolean());
+ }
+
+ private void runScheduledTasks() {
+ for (Runnable r : scheduledTasks) {
+ r.run();
+ }
+ }
}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/metrics/PlanningCacheMetricsTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/metrics/PlanningCacheMetricsTest.java
index 31c926f73b5..7e5291892e1 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/metrics/PlanningCacheMetricsTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/metrics/PlanningCacheMetricsTest.java
@@ -38,6 +38,7 @@ import
org.apache.ignite.internal.sql.engine.framework.TestBuilders;
import org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest;
import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
import org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl;
+import org.apache.ignite.internal.sql.engine.prepare.ddl.DdlSqlToCommandConverter;
import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
import org.apache.ignite.internal.sql.engine.sql.ParsedResult;
@@ -48,24 +49,28 @@ import
org.apache.ignite.internal.sql.engine.statistic.event.StatisticEventParam
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.sql.engine.util.cache.CacheFactory;
import org.apache.ignite.internal.sql.engine.util.cache.CaffeineCacheFactory;
-import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
-import org.apache.ignite.internal.testframework.InjectExecutorService;
import org.apache.ignite.internal.type.NativeTypes;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mock.Strictness;
+import org.mockito.junit.jupiter.MockitoExtension;
/**
* Test planning cache metrics.
*/
-@ExtendWith(ExecutorServiceExtension.class)
+@ExtendWith(MockitoExtension.class)
@ExtendWith(ConfigurationExtension.class)
public class PlanningCacheMetricsTest extends AbstractPlannerTest {
private final MetricManager metricManager = new MetricManagerImpl();
- @InjectExecutorService
+ @Mock(strictness = Strictness.LENIENT)
private ScheduledExecutorService commonExecutor;
+ @Mock
+ private DdlSqlToCommandConverter ddlCommandConverter;
+
@InjectConfiguration("mock.autoRefresh.staleRowsCheckIntervalSeconds=5")
private StatisticsConfiguration statisticsConfiguration;
@@ -90,8 +95,8 @@ public class PlanningCacheMetricsTest extends
AbstractPlannerTest {
AbstractEventProducer<StatisticChangedEvent, StatisticEventParameters>
producer = new AbstractEventProducer<>() {};
PrepareService prepareService = new PrepareServiceImpl(
- "test", 2, cacheFactory, null, 15_000L, 2, Integer.MAX_VALUE,
metricManager, new PredefinedSchemaManager(schema),
- clockService::currentLong, commonExecutor, producer,
+ "test", 2, cacheFactory, ddlCommandConverter, 15_000L, 2,
Integer.MAX_VALUE, metricManager,
+ new PredefinedSchemaManager(schema),
clockService::currentLong, commonExecutor, producer,
statisticsConfiguration.autoRefresh().staleRowsCheckIntervalSeconds()
);