This is an automated email from the ASF dual-hosted git repository.
zstan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 0759c8d58a3 IGNITE-26069 Sql. Add data staleness check to statistic
manager (#6649)
0759c8d58a3 is described below
commit 0759c8d58a392cd8172d440ef8356babad280688
Author: Evgeniy Stanilovskiy <[email protected]>
AuthorDate: Fri Nov 14 08:17:35 2025 +0300
IGNITE-26069 Sql. Add data staleness check to statistic manager (#6649)
---
.../org/apache/ignite/internal/app/IgniteImpl.java | 7 +-
.../internal/sql/engine/ItAggregatesTest.java | 5 +-
.../sql/engine/statistic/ItStatisticTest.java | 218 +++++++++++++-----
.../internal/sql/engine/SqlQueryProcessor.java | 6 +-
.../sql/engine/prepare/PrepareServiceImpl.java | 144 ++++++------
...Manager.java => PartitionModificationInfo.java} | 22 +-
.../engine/statistic/SqlStatisticManagerImpl.java | 216 ++++++++++--------
.../statistic/SqlStatisticUpdateManager.java | 6 +-
...datesNotifier.java => StatisticAggregator.java} | 8 +-
.../engine/statistic/StatisticAggregatorImpl.java | 198 +++++++++++++++++
.../StatisticChangedEvent.java} | 12 +-
.../StatisticEventParameters.java} | 17 +-
.../sql/engine/exec/ExecutionServiceImplTest.java | 8 +-
.../sql/engine/framework/TestBuilders.java | 8 +-
.../sql/engine/planner/PlannerTimeoutTest.java | 7 +-
.../sql/engine/prepare/PrepareServiceImplTest.java | 31 ++-
.../statistic/SqlStatisticManagerImplTest.java | 246 +++++++++++++--------
.../sql/metrics/PlanningCacheMetricsTest.java | 7 +-
modules/table/build.gradle | 1 +
.../distributed/PartitionModificationCounter.java | 15 +-
.../PartitionModificationCounterFactory.java | 71 +++++-
.../internal/table/distributed/TableManager.java | 7 +-
...GetEstimatedSizeWithLastModifiedTsRequest.java} | 13 +-
...etEstimatedSizeWithLastModifiedTsResponse.java} | 12 +-
.../message/PartitionModificationInfoMessage.java} | 21 +-
.../internal/table/message/TableMessageGroup.java} | 22 +-
.../PartitionModificationCounterTest.java | 12 +-
.../ignite/internal/table/TableTestUtils.java | 9 +-
28 files changed, 977 insertions(+), 372 deletions(-)
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 0ac1010a265..13ee855702c 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -513,6 +513,8 @@ public class IgniteImpl implements Ignite {
private final ClockServiceMetricSource clockServiceMetricSource;
+ private final PartitionModificationCounterFactory
partitionModificationCounterFactory;
+
/**
* The Constructor.
*
@@ -1115,8 +1117,7 @@ public class IgniteImpl implements Ignite {
metricManager
);
- PartitionModificationCounterFactory
partitionModificationCounterFactory =
- new PartitionModificationCounterFactory(clockService::current);
+ partitionModificationCounterFactory = new
PartitionModificationCounterFactory(clockService::current,
clusterSvc.messagingService());
distributedTblMgr = new TableManager(
name,
@@ -1489,6 +1490,8 @@ public class IgniteImpl implements Ignite {
metricManager.registerSource(clockServiceMetricSource);
metricManager.enable(clockServiceMetricSource);
+ partitionModificationCounterFactory.start();
+
// Start the components that are required to join the cluster.
// TODO https://issues.apache.org/jira/browse/IGNITE-22570
CompletableFuture<Void> componentsStartFuture =
lifecycleManager.startComponentsAsync(
diff --git
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java
index c5f9d9f11f4..a939c769ca2 100644
---
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java
+++
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java
@@ -542,7 +542,10 @@ public class ItAggregatesTest extends
BaseSqlIntegrationTest {
public void testGroupingFunction(String[] rules) {
sql("DELETE FROM test_str_int_real_dec");
- sql("DELETE FROM test_str_int_real_dec");
+ Assumptions.assumeTrue(
+ Arrays.stream(rules).noneMatch(rule ->
rule.contains("Colocated")),
+ "https://issues.apache.org/jira/browse/IGNITE-26512"
+ );
sql("INSERT INTO test_str_int_real_dec(id, str_col, int_col) VALUES
(1, 's1', 10)");
sql("INSERT INTO test_str_int_real_dec(id, str_col, int_col) VALUES
(2, 's1', 20)");
diff --git
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/statistic/ItStatisticTest.java
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/statistic/ItStatisticTest.java
index 081e1abebee..275bbb8457c 100644
---
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/statistic/ItStatisticTest.java
+++
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/statistic/ItStatisticTest.java
@@ -17,47 +17,129 @@
package org.apache.ignite.internal.sql.engine.statistic;
+import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_MIN_STALE_ROWS_COUNT;
+import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_STALE_ROWS_FRACTION;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static
org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl.PLAN_UPDATER_INITIAL_DELAY;
+import static
org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl.PLAN_UPDATER_REFRESH_PERIOD;
+import static
org.apache.ignite.internal.sql.engine.statistic.SqlStatisticManagerImpl.INITIAL_DELAY;
+import static
org.apache.ignite.internal.sql.engine.statistic.SqlStatisticManagerImpl.REFRESH_PERIOD;
import static
org.apache.ignite.internal.sql.engine.util.QueryChecker.nodeRowCount;
import static org.hamcrest.Matchers.is;
-import java.util.concurrent.ExecutionException;
+import java.time.Duration;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
import org.apache.ignite.internal.sql.engine.util.QueryChecker;
+import org.apache.ignite.internal.util.HashCalculator;
+import org.apache.ignite.internal.util.IgniteUtils;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
/** Integration test to check SQL statistics. */
public class ItStatisticTest extends BaseSqlIntegrationTest {
private SqlStatisticManagerImpl sqlStatisticManager;
-
- private static final AtomicInteger counter = new AtomicInteger(0);
+ private static final int PARTITIONS = 3;
@BeforeAll
void beforeAll() {
sqlStatisticManager = (SqlStatisticManagerImpl)
queryProcessor().sqlStatisticManager();
- sql("CREATE TABLE t(ID INTEGER PRIMARY KEY, VAL INTEGER)");
+
+ sql(format("CREATE ZONE zone_with_repl (replicas 2, partitions {})
storage profiles ['"
+ + DEFAULT_STORAGE_PROFILE + "']", PARTITIONS));
+ sql("CREATE TABLE t(ID BIGINT PRIMARY KEY, VAL INTEGER) ZONE
zone_with_repl");
}
@AfterAll
void afterAll() {
- sql("DROP TABLE IF EXISTS t");
+ sql("DROP TABLE IF EXISTS t;");
+ }
+
+ @AfterEach
+ void tearDown() {
+ sql("DELETE FROM t;");
+ }
+
+ /** Simple case demonstrating that the table size is updated during the
statistic refresh interval. */
+ @Test
+ public void testTableSizeUpdates() {
+ long milestone1 = computeNextMilestone(0, DEFAULT_STALE_ROWS_FRACTION,
DEFAULT_MIN_STALE_ROWS_COUNT);
+
+ String selectQuery = "select * from t";
+
+ long update = insert(0, milestone1);
+
+ sql(selectQuery);
+
+ AtomicInteger inc = new AtomicInteger();
+
+ long timeout = calcStatisticUpdateTimeout();
+ // max 10 times cache pollution
+ long pollInterval = timeout / 10;
+
+ Awaitility.await().pollInterval(Duration.ofMillis(pollInterval))
+ .timeout(timeout, TimeUnit.MILLISECONDS).untilAsserted(() ->
+ assertQuery(format("select {} from t",
inc.incrementAndGet()))
+ .matches(nodeRowCount("TableScan", is((int)
update)))
+ .check()
+ );
+ }
+
+ @Test
+ public void testTableSizeUpdatesForcibly() {
+ long milestone = computeNextMilestone(0, DEFAULT_STALE_ROWS_FRACTION,
DEFAULT_MIN_STALE_ROWS_COUNT);
+
+ long updates1 = insert(0L, milestone);
+
+ long timeout = calcStatisticUpdateTimeout();
+
+ // max 10 times cache pollution
+ long pollInterval = timeout / 10;
+
+ Awaitility.await().pollInterval(Duration.ofMillis(pollInterval))
+ .timeout(timeout, TimeUnit.MILLISECONDS).untilAsserted(() -> {
+ sqlStatisticManager.forceUpdateAll();
+ sqlStatisticManager.lastUpdateStatisticFuture().join();
+
+ assertQuery("select 1 from t")
+ .matches(nodeRowCount("TableScan", is((int)
updates1)))
+ .check();
+ }
+ );
+
+ milestone = computeNextMilestone(milestone,
DEFAULT_STALE_ROWS_FRACTION, DEFAULT_MIN_STALE_ROWS_COUNT);
+
+ long updates2 = insert(updates1, milestone);
+
+ sqlStatisticManager.forceUpdateAll();
+ sqlStatisticManager.lastUpdateStatisticFuture().join();
+
+ // query not cached in plans
+ Awaitility.await().pollInterval(Duration.ofMillis(pollInterval))
+ .timeout(timeout, TimeUnit.MILLISECONDS).untilAsserted(() -> {
+ sqlStatisticManager.forceUpdateAll();
+ sqlStatisticManager.lastUpdateStatisticFuture().join();
+
+ assertQuery("select 1 from t")
+ .matches(nodeRowCount("TableScan", is((int)
updates2)))
+ .check();
+ }
+ );
}
@Test
public void statisticUpdatesChangeQueryPlans() throws Exception {
try {
-
sqlStatisticManager.setThresholdTimeToPostponeUpdateMs(Long.MAX_VALUE);
-
sqlScript(""
- + "CREATE TABLE j1(ID INTEGER PRIMARY KEY, VAL INTEGER);"
- + "CREATE TABLE j2(ID INTEGER PRIMARY KEY, VAL INTEGER);"
+ + "CREATE TABLE j1(ID INTEGER PRIMARY KEY, VAL INTEGER)
ZONE zone_with_repl;"
+ + "CREATE TABLE j2(ID INTEGER PRIMARY KEY, VAL INTEGER)
ZONE zone_with_repl;"
);
+
sql("INSERT INTO j1 SELECT x, x FROM system_range(?, ?)", 0, 10);
sqlStatisticManager.forceUpdateAll();
@@ -66,22 +148,31 @@ public class ItStatisticTest extends
BaseSqlIntegrationTest {
String query = "SELECT /*+ DISABLE_RULE('HashJoinConverter',
'MergeJoinConverter', 'CorrelatedNestedLoopJoin') */ "
+ "j1.* FROM j2, j1 WHERE j2.id = j1.id";
- assertQuery(query)
- // expecting right source has less rows than left
-
.matches(QueryChecker.matches(".*TableScan.*PUBLIC.J1.*TableScan.*PUBLIC.J2.*"))
- .returnNothing()
- .check();
+ long statRefresh = calcStatisticUpdateTimeout();
- sql("INSERT INTO j2 SELECT x, x FROM system_range(?, ?)", 0, 100);
+ // max 10 times cache pollution
+ long pollInterval = statRefresh / 10;
+
+ Awaitility.await().pollInterval(Duration.ofMillis(pollInterval))
+ .timeout(statRefresh,
TimeUnit.MILLISECONDS).untilAsserted(() ->
+ assertQuery(query)
+ // expecting the right source to have fewer rows
than the left
+
.matches(QueryChecker.matches(".*TableScan.*PUBLIC.J1.*TableScan.*PUBLIC.J2.*"))
+ .returnNothing()
+ .check()
+ );
+
+ sql("INSERT INTO j2 SELECT x, x FROM system_range(?, ?)", 0, 3 *
DEFAULT_MIN_STALE_ROWS_COUNT);
sqlStatisticManager.forceUpdateAll();
sqlStatisticManager.lastUpdateStatisticFuture().get(5,
TimeUnit.SECONDS);
- Awaitility.await().timeout(Math.max(10_000, 2 *
PLAN_UPDATER_INITIAL_DELAY), TimeUnit.MILLISECONDS).untilAsserted(() ->
- assertQuery(query)
- // expecting right source has less rows than left
-
.matches(QueryChecker.matches(".*TableScan.*PUBLIC.J2.*TableScan.*PUBLIC.J1.*"))
- .check()
+ Awaitility.await().pollInterval(Duration.ofMillis(pollInterval))
+ .timeout(statRefresh,
TimeUnit.MILLISECONDS).untilAsserted(() ->
+ assertQuery(query)
+ // expecting the right source to have fewer rows
than the left
+
.matches(QueryChecker.matches(".*TableScan.*PUBLIC.J2.*TableScan.*PUBLIC.J1.*"))
+ .check()
);
} finally {
sqlScript(""
@@ -90,46 +181,59 @@ public class ItStatisticTest extends
BaseSqlIntegrationTest {
}
}
- @Test
- public void testStatisticsRowCount() throws Exception {
- // For test we should always update statistics.
- long prevValueOfThreshold =
sqlStatisticManager.setThresholdTimeToPostponeUpdateMs(0);
- try {
- insertAndUpdateRunQuery(500);
- assertQuery(getUniqueQuery())
- .matches(nodeRowCount("TableScan", is(500)))
- .check();
-
- insertAndUpdateRunQuery(600);
- assertQuery(getUniqueQuery())
- .matches(nodeRowCount("TableScan", is(1100)))
- .check();
-
-
sqlStatisticManager.setThresholdTimeToPostponeUpdateMs(Long.MAX_VALUE);
- insertAndUpdateRunQuery(900);
-
- // Statistics shouldn't be updated despite we inserted new rows.
- assertQuery(getUniqueQuery())
- .matches(nodeRowCount("TableScan", is(1100)))
- .check();
- } finally {
-
sqlStatisticManager.setThresholdTimeToPostponeUpdateMs(prevValueOfThreshold);
- }
+ private static long calcStatisticUpdateTimeout() {
+ long inc = TimeUnit.SECONDS.toMillis(2);
+ // need to wait at least 2 statistic updates.
+ long statisticAggregationTimeout = INITIAL_DELAY + 2 * REFRESH_PERIOD;
+
+ return PLAN_UPDATER_INITIAL_DELAY > statisticAggregationTimeout
+ ? PLAN_UPDATER_INITIAL_DELAY + PLAN_UPDATER_REFRESH_PERIOD
+ // need to wait for at least one planner cache re-calculation step,
plus a small extra margin for it.
+ : statisticAggregationTimeout + PLAN_UPDATER_REFRESH_PERIOD +
inc;
}
- private void insertAndUpdateRunQuery(int numberOfRecords) throws
ExecutionException, TimeoutException, InterruptedException {
- int start = counter.get();
- int end = counter.addAndGet(numberOfRecords) - 1;
- sql("INSERT INTO t SELECT x, x FROM system_range(?, ?)", start, end);
+ // Copied from the private method
PartitionModificationCounter#computeNextMilestone.
+ // If that implementation changes, this copy needs to be changed too.
+ private static long computeNextMilestone(
+ long currentSize,
+ double staleRowsFraction,
+ long minStaleRowsCount
+ ) {
+ return Math.max((long) (currentSize * staleRowsFraction),
minStaleRowsCount);
+ }
- // run unique sql to update statistics
- sql(getUniqueQuery());
+ /**
+ * Calculates the number of rows that need to be inserted to guarantee that
{@code insertsPerPartition} is reached for every partition, and inserts them.
+ * The {@code from} bound is inclusive; the returned value is the exclusive upper bound of the inserted ids.
+ */
+ private static long insert(long from, long insertsPerPartition) {
+ long numberOfInsertions = 0;
+
+ long[] partitionUpdates = new long[PARTITIONS];
+
+ HashCalculator calc = new HashCalculator();
+
+ for (long i = from; i < Integer.MAX_VALUE; ++i) {
+ calc.appendLong(i);
+ int partition = IgniteUtils.safeAbs(calc.hash()) % PARTITIONS;
+ partitionUpdates[partition] += 1;
+ calc.reset();
+ numberOfInsertions = i;
+ boolean filled = true;
+ for (int pos = 0; pos < PARTITIONS; ++pos) {
+ if (partitionUpdates[pos] < insertsPerPartition) {
+ filled = false;
+ break;
+ }
+ }
+
+ if (filled) {
+ break;
+ }
+ }
- // wait to update statistics
- sqlStatisticManager.lastUpdateStatisticFuture().get(5,
TimeUnit.SECONDS);
- }
+ sql("INSERT INTO t SELECT x, x FROM system_range(?, ?)", from,
numberOfInsertions);
- private static String getUniqueQuery() {
- return "SELECT " + counter.incrementAndGet() + " FROM t";
+ return numberOfInsertions + 1;
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index b97a597af86..ddfd4e5f9dd 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -98,6 +98,7 @@ import
org.apache.ignite.internal.sql.engine.sql.ParserServiceImpl;
import org.apache.ignite.internal.sql.engine.statistic.SqlStatisticManager;
import org.apache.ignite.internal.sql.engine.statistic.SqlStatisticManagerImpl;
import
org.apache.ignite.internal.sql.engine.statistic.SqlStatisticUpdateManager;
+import org.apache.ignite.internal.sql.engine.statistic.StatisticAggregatorImpl;
import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContext;
import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContextImpl;
import org.apache.ignite.internal.sql.engine.util.Commons;
@@ -253,7 +254,10 @@ public class SqlQueryProcessor implements QueryProcessor,
SystemViewProvider {
this.killCommandHandler = killCommandHandler;
this.eventLog = eventLog;
- sqlStatisticManager = new SqlStatisticManagerImpl(tableManager,
catalogManager, lowWaterMark);
+ StatisticAggregatorImpl statAggregator =
+ new StatisticAggregatorImpl(() ->
logicalTopologyService.localLogicalTopology().nodes(),
+ clusterSrvc.messagingService());
+ sqlStatisticManager = new SqlStatisticManagerImpl(tableManager,
catalogManager, lowWaterMark, commonScheduler, statAggregator);
sqlSchemaManager = new SqlSchemaManagerImpl(
catalogManager,
sqlStatisticManager,
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
index 83942549bc3..aecd0463137 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
@@ -20,10 +20,12 @@ package org.apache.ignite.internal.sql.engine.prepare;
import static
org.apache.ignite.internal.metrics.sources.ThreadPoolMetricSource.THREAD_POOLS_METRICS_SOURCE_NAME;
import static
org.apache.ignite.internal.sql.engine.prepare.CacheKey.EMPTY_CLASS_ARRAY;
import static
org.apache.ignite.internal.sql.engine.prepare.PlannerHelper.optimize;
+import static
org.apache.ignite.internal.sql.engine.statistic.event.StatisticChangedEvent.STATISTIC_CHANGED;
import static
org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG;
import static
org.apache.ignite.internal.sql.engine.util.Commons.fastQueryOptimizationEnabled;
import static org.apache.ignite.internal.sql.engine.util.TypeUtils.columnType;
import static
org.apache.ignite.internal.thread.ThreadOperation.NOTHING_ALLOWED;
+import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.isCompletedSuccessfully;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.lang.ErrorGroups.Sql.EXECUTION_CANCELLED_ERR;
@@ -64,6 +66,7 @@ import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.event.EventProducer;
import org.apache.ignite.internal.lang.SqlExceptionMapperUtil;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
@@ -95,7 +98,8 @@ import
org.apache.ignite.internal.sql.engine.sql.IgniteSqlExplain;
import org.apache.ignite.internal.sql.engine.sql.IgniteSqlExplainMode;
import org.apache.ignite.internal.sql.engine.sql.IgniteSqlKill;
import org.apache.ignite.internal.sql.engine.sql.ParsedResult;
-import
org.apache.ignite.internal.sql.engine.statistic.StatisticUpdatesNotifier;
+import
org.apache.ignite.internal.sql.engine.statistic.event.StatisticChangedEvent;
+import
org.apache.ignite.internal.sql.engine.statistic.event.StatisticEventParameters;
import org.apache.ignite.internal.sql.engine.util.Cloner;
import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.sql.engine.util.TypeUtils;
@@ -138,7 +142,8 @@ public class PrepareServiceImpl implements PrepareService {
private static final String PLANNING_EXECUTOR_SOURCE_NAME =
THREAD_POOLS_METRICS_SOURCE_NAME + "sql-planning-executor";
- public static final int PLAN_UPDATER_INITIAL_DELAY = 2_000;
+ public static final int PLAN_UPDATER_INITIAL_DELAY = 5_000;
+ public static final int PLAN_UPDATER_REFRESH_PERIOD = 5_000;
private final UUID prepareServiceId = UUID.randomUUID();
private final AtomicLong planIdGen = new AtomicLong();
@@ -165,6 +170,8 @@ public class PrepareServiceImpl implements PrepareService {
private final LongSupplier currentClock;
+ private final EventProducer<StatisticChangedEvent,
StatisticEventParameters> statUpdates;
+
/**
* Factory method.
*
@@ -178,7 +185,7 @@ public class PrepareServiceImpl implements PrepareService {
* @param ddlSqlToCommandConverter Converter from SQL DDL operators to
catalog commands.
* @param currentClock Actual clock supplier.
* @param scheduler Scheduler.
- * @param updNotifier Updates notifier call-back.
+ * @param statUpdates Statistic updates notifier.
*/
public static PrepareServiceImpl create(
String nodeName,
@@ -191,9 +198,9 @@ public class PrepareServiceImpl implements PrepareService {
DdlSqlToCommandConverter ddlSqlToCommandConverter,
LongSupplier currentClock,
ScheduledExecutorService scheduler,
- StatisticUpdatesNotifier updNotifier
+ EventProducer<StatisticChangedEvent, StatisticEventParameters>
statUpdates
) {
- PrepareServiceImpl impl = new PrepareServiceImpl(
+ return new PrepareServiceImpl(
nodeName,
clusterCfg.planner().estimatedNumberOfQueries().value(),
cacheFactory,
@@ -204,12 +211,9 @@ public class PrepareServiceImpl implements PrepareService {
metricManager,
schemaManager,
currentClock,
- scheduler
+ scheduler,
+ statUpdates
);
-
- updNotifier.changesNotifier(impl::statisticsChanged);
-
- return impl;
}
/**
@@ -224,6 +228,7 @@ public class PrepareServiceImpl implements PrepareService {
* @param schemaManager Schema manager to use on validation phase to bind
identifiers in AST with particular schema objects.
* @param currentClock Actual clock supplier.
* @param scheduler Scheduler.
+ * @param statUpdates Statistic updates notifier.
*/
public PrepareServiceImpl(
String nodeName,
@@ -236,7 +241,8 @@ public class PrepareServiceImpl implements PrepareService {
MetricManager metricManager,
SqlSchemaManager schemaManager,
LongSupplier currentClock,
- ScheduledExecutorService scheduler
+ ScheduledExecutorService scheduler,
+ EventProducer<StatisticChangedEvent, StatisticEventParameters>
statUpdates
) {
this.nodeName = nodeName;
this.ddlConverter = ddlConverter;
@@ -244,6 +250,7 @@ public class PrepareServiceImpl implements PrepareService {
this.metricManager = metricManager;
this.plannerThreadCount = plannerThreadCount;
this.schemaManager = schemaManager;
+ this.statUpdates = statUpdates;
this.currentClock = currentClock;
@@ -276,6 +283,12 @@ public class PrepareServiceImpl implements PrepareService {
IgnitePlanner.warmup();
+ statUpdates.listen(STATISTIC_CHANGED, parameters -> {
+ statisticsChanged(parameters.tableId());
+
+ return falseCompletedFuture();
+ });
+
planUpdater.start();
}
@@ -1065,8 +1078,6 @@ public class PrepareServiceImpl implements PrepareService
{
private final AtomicBoolean inProgress = new AtomicBoolean();
- private volatile boolean recalculatePlans;
-
private final Cache<CacheKey, CompletableFuture<PlanInfo>> cache;
private final long plannerTimeout;
@@ -1077,6 +1088,8 @@ public class PrepareServiceImpl implements PrepareService
{
private final BiFunction<Integer, String, SchemaPlus>
defaultSchemaFunc;
+ private final Set<Integer> statPerTableChanges = new IntOpenHashSet();
+
PlanUpdater(
ScheduledExecutorService planUpdater,
Cache<CacheKey, CompletableFuture<PlanInfo>> cache,
@@ -1099,35 +1112,12 @@ public class PrepareServiceImpl implements
PrepareService {
* @param tableId Table Id statistic changed for.
*/
void statisticsChanged(int tableId) {
- Set<Entry<CacheKey, CompletableFuture<PlanInfo>>> cachedEntries =
cache.entrySet();
-
- int currentCatalogVersion = catalogVersionSupplier.getAsInt();
-
- boolean statChanged = false;
-
- for (Map.Entry<CacheKey, CompletableFuture<PlanInfo>> ent :
cachedEntries) {
- CacheKey key = ent.getKey();
- CompletableFuture<PlanInfo> fut = ent.getValue();
-
- if (currentCatalogVersion == key.catalogVersion() &&
isCompletedSuccessfully(fut)) {
- // no wait, already completed
- PlanInfo info = fut.join();
-
- if (info.sources.contains(tableId)) {
- info.invalidate();
- statChanged = true;
- }
- }
- }
-
- if (statChanged) {
- recalculatePlans = true;
- }
+ statPerTableChanges.add(tableId);
}
void start() {
planUpdater.scheduleAtFixedRate(() -> {
- if (!recalculatePlans) {
+ if (statPerTableChanges.isEmpty()) {
return;
}
@@ -1135,54 +1125,72 @@ public class PrepareServiceImpl implements
PrepareService {
return;
}
- CompletableFuture<Void> rePlanningFut = nullCompletedFuture();
-
- while (recalculatePlans) {
- recalculatePlans = false;
-
- int currentCatalogVersion =
catalogVersionSupplier.getAsInt();
+ for (int tableId : statPerTableChanges) {
+ Set<Entry<CacheKey, CompletableFuture<PlanInfo>>>
cachedEntries = cache.entrySet();
- for (Entry<CacheKey, CompletableFuture<PlanInfo>> ent :
cache.entrySet()) {
+ for (Map.Entry<CacheKey, CompletableFuture<PlanInfo>> ent
: cachedEntries) {
CacheKey key = ent.getKey();
- CompletableFuture<PlanInfo> fut = cache.get(key);
+ CompletableFuture<PlanInfo> fut = ent.getValue();
+ int currentCatalogVersion =
catalogVersionSupplier.getAsInt();
- // can be evicted
- if (fut != null && isCompletedSuccessfully(fut)) {
+ if (currentCatalogVersion == key.catalogVersion() &&
isCompletedSuccessfully(fut)) {
+ // no wait, already completed
PlanInfo info = fut.join();
- if (!info.needInvalidate()) {
- continue;
+ if (info.sources.contains(tableId)) {
+ info.invalidate();
}
+ }
+ }
- assert info.statement != null;
+ // all involved entries are processed
+ statPerTableChanges.remove(tableId);
+ }
- if (currentCatalogVersion == key.catalogVersion())
{
- SqlQueryType queryType =
info.statement.parsedResult().queryType();
+ CompletableFuture<Void> rePlanningFut = nullCompletedFuture();
- SchemaPlus defaultSchema =
defaultSchemaFunc.apply(key.catalogVersion(), key.schemaName());
+ int currentCatalogVersion = catalogVersionSupplier.getAsInt();
- PlanningContext planningContext =
PlanningContext.builder()
-
.frameworkConfig(Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
-
.defaultSchema(defaultSchema).build())
-
.query(info.statement.parsedResult().originalQuery())
- .plannerTimeout(plannerTimeout)
- .catalogVersion(key.catalogVersion())
- .defaultSchemaName(key.schemaName())
-
.parameters(Commons.arrayToMap(key.paramTypes()))
- .build();
+ for (Entry<CacheKey, CompletableFuture<PlanInfo>> ent :
cache.entrySet()) {
+ CacheKey key = ent.getKey();
+ CompletableFuture<PlanInfo> fut = cache.get(key);
- CompletableFuture<Void> newPlanFut =
- prepare.recalculatePlan(queryType,
info.statement.parsedResult, planningContext, key);
+ // can be evicted
+ if (fut != null && isCompletedSuccessfully(fut)) {
+ PlanInfo info = fut.join();
- rePlanningFut.thenCompose(v -> newPlanFut);
- }
+ if (!info.needInvalidate()) {
+ continue;
+ }
+
+ assert info.statement != null;
+
+ if (currentCatalogVersion == key.catalogVersion()) {
+ SqlQueryType queryType =
info.statement.parsedResult().queryType();
+
+ SchemaPlus defaultSchema =
defaultSchemaFunc.apply(key.catalogVersion(), key.schemaName());
+
+ PlanningContext planningContext =
PlanningContext.builder()
+
.frameworkConfig(Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
+
.defaultSchema(defaultSchema).build())
+
.query(info.statement.parsedResult().originalQuery())
+ .plannerTimeout(plannerTimeout)
+ .catalogVersion(key.catalogVersion())
+ .defaultSchemaName(key.schemaName())
+
.parameters(Commons.arrayToMap(key.paramTypes()))
+ .build();
+
+ CompletableFuture<Void> newPlanFut =
+ prepare.recalculatePlan(queryType,
info.statement.parsedResult, planningContext, key);
+
+ rePlanningFut.thenCompose(v -> newPlanFut);
}
}
}
rePlanningFut.whenComplete((k, err) -> inProgress.set(false));
- }, PLAN_UPDATER_INITIAL_DELAY, 1_000, TimeUnit.MILLISECONDS);
+ }, PLAN_UPDATER_INITIAL_DELAY, PLAN_UPDATER_REFRESH_PERIOD,
TimeUnit.MILLISECONDS);
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/PartitionModificationInfo.java
similarity index 57%
copy from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
copy to
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/PartitionModificationInfo.java
index 997cbe7d606..f39104d5b8a 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/PartitionModificationInfo.java
@@ -17,6 +17,24 @@
package org.apache.ignite.internal.sql.engine.statistic;
-/** Statistic manager with reaction on statistic changes. */
-public interface SqlStatisticUpdateManager extends SqlStatisticManager,
StatisticUpdatesNotifier {
+/** Partition modification information holder. */
+public class PartitionModificationInfo {
+ private final long estimatedSize;
+ private final long lastModificationCounter;
+
+ /** Constructor. */
+ public PartitionModificationInfo(long estimatedSize, long
lastModificationCounter) {
+ this.estimatedSize = estimatedSize;
+ this.lastModificationCounter = lastModificationCounter;
+ }
+
+ /** Returns last modification representation. */
+ long lastModificationCounter() {
+ return lastModificationCounter;
+ }
+
+ /** Returns estimated size. */
+ long getEstimatedSize() {
+ return estimatedSize;
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java
index 7ac42780636..ac6a8ad3644 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java
@@ -18,14 +18,21 @@
package org.apache.ignite.internal.sql.engine.statistic;
import static org.apache.ignite.internal.event.EventListener.fromConsumer;
-import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static
org.apache.ignite.internal.sql.engine.statistic.event.StatisticChangedEvent.STATISTIC_CHANGED;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.internal.catalog.CatalogService;
@@ -33,25 +40,29 @@ import
org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.events.CatalogEvent;
import org.apache.ignite.internal.catalog.events.CreateTableEventParameters;
import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
+import org.apache.ignite.internal.event.AbstractEventProducer;
import org.apache.ignite.internal.event.EventListener;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.lowwatermark.LowWatermark;
import
org.apache.ignite.internal.lowwatermark.event.ChangeLowWatermarkEventParameters;
import org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent;
+import
org.apache.ignite.internal.sql.engine.statistic.event.StatisticChangedEvent;
+import
org.apache.ignite.internal.sql.engine.statistic.event.StatisticEventParameters;
+import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.LongPriorityQueue;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.table.distributed.TableManager;
-import org.apache.ignite.internal.util.FastTimestamps;
import org.jetbrains.annotations.TestOnly;
/**
* Statistic manager. Provide and manage update of statistics for SQL.
*/
-public class SqlStatisticManagerImpl implements SqlStatisticUpdateManager {
+public class SqlStatisticManagerImpl extends
AbstractEventProducer<StatisticChangedEvent, StatisticEventParameters>
+ implements SqlStatisticUpdateManager {
private static final IgniteLogger LOG =
Loggers.forClass(SqlStatisticManagerImpl.class);
static final long DEFAULT_TABLE_SIZE = 1L;
- private static final ActualSize DEFAULT_VALUE = new
ActualSize(DEFAULT_TABLE_SIZE, 0L);
+ private static final ActualSize DEFAULT_VALUE = new
ActualSize(DEFAULT_TABLE_SIZE, Long.MIN_VALUE);
private final EventListener<ChangeLowWatermarkEventParameters> lwmListener
= fromConsumer(this::onLwmChanged);
private final EventListener<DropTableEventParameters>
dropTableEventListener = fromConsumer(this::onTableDrop);
@@ -66,29 +77,35 @@ public class SqlStatisticManagerImpl implements
SqlStatisticUpdateManager {
private final CatalogService catalogService;
private final LowWatermark lowWatermark;
- private final AtomicReference<StatisticUpdatesSupplier> changesSupplier =
new AtomicReference<>();
-
/* Contains all known table id's with statistics. */
- private final ConcurrentMap<Integer, ActualSize> tableSizeMap = new
ConcurrentHashMap<>();
+ final ConcurrentMap<Integer, ActualSize> tableSizeMap = new
ConcurrentHashMap<>();
+
+ /* Contains dropped tables; statistics are not updated for such tables. */
+ Set<Integer> droppedTables = Collections.newSetFromMap(new
ConcurrentHashMap<>());
+
+ private final ScheduledExecutorService scheduler;
+ private final StatisticAggregator<Collection<InternalTable>,
CompletableFuture<Int2ObjectMap<PartitionModificationInfo>>> statSupplier;
- private volatile long thresholdTimeToPostponeUpdateMs =
TimeUnit.MINUTES.toMillis(1);
+ static final long INITIAL_DELAY = 15_000;
+ static final long REFRESH_PERIOD = 15_000;
/** Constructor. */
- public SqlStatisticManagerImpl(TableManager tableManager, CatalogService
catalogService, LowWatermark lowWatermark) {
+ public SqlStatisticManagerImpl(
+ TableManager tableManager,
+ CatalogService catalogService,
+ LowWatermark lowWatermark,
+ ScheduledExecutorService scheduler,
+ StatisticAggregator<Collection<InternalTable>,
CompletableFuture<Int2ObjectMap<PartitionModificationInfo>>> statSupplier
+ ) {
this.tableManager = tableManager;
this.catalogService = catalogService;
this.lowWatermark = lowWatermark;
- }
-
- @Override
- public void changesNotifier(StatisticUpdatesSupplier updater) {
- if (!this.changesSupplier.compareAndSet(null, updater)) {
- throw new AssertionError("Statistics updater unexpected change");
- }
+ this.scheduler = scheduler;
+ this.statSupplier = statSupplier;
}
/**
- * Returns approximate number of rows in table by their id.
+ * Returns approximate number of rows in table.
*
* <p>Returns the previous known value or {@value
SqlStatisticManagerImpl#DEFAULT_TABLE_SIZE} as default value. Can start process
to
* update asked statistics in background to have updated values for future
requests.
@@ -97,65 +114,9 @@ public class SqlStatisticManagerImpl implements
SqlStatisticUpdateManager {
*/
@Override
public long tableSize(int tableId) {
- updateTableSizeStatistics(tableId, false);
-
return tableSizeMap.getOrDefault(tableId, DEFAULT_VALUE).getSize();
}
- /** Update table size statistic in the background if it required. */
- private void updateTableSizeStatistics(int tableId, boolean force) {
- TableViewInternal tableView = tableManager.cachedTable(tableId);
- if (tableView == null) {
- LOG.debug("There is no table to update statistics [id={}].",
tableId);
- return;
- }
-
- ActualSize tableSize = tableSizeMap.get(tableId);
- if (tableSize == null) {
- // has been concurrently cleaned up, no need more update statistic
for the table.
- return;
- }
-
- long currTimestamp = FastTimestamps.coarseCurrentTimeMillis();
- long lastUpdateTime = tableSize.getTimestamp();
-
- if (force || lastUpdateTime <= currTimestamp -
thresholdTimeToPostponeUpdateMs) {
- // Prevent to run update for the same table twice concurrently.
- if (!force && !tableSizeMap.replace(tableId, tableSize, new
ActualSize(tableSize.getSize(), currTimestamp - 1))) {
- return;
- }
-
- // just request new table size in background.
- CompletableFuture<Void> updateResult =
tableView.internalTable().estimatedSize()
- .thenAccept(size -> {
- // the table can be concurrently dropped and we
shouldn't put new value in this case.
- tableSizeMap.computeIfPresent(tableId, (k, v) -> {
- // Discard current computation if value in cache
is newer than current one.
- if (v.timestamp >= currTimestamp) {
- return v;
- }
-
- return new ActualSize(Math.max(size, 1),
currTimestamp);
- });
- }).handle((res, err) -> {
- if (err != null) {
- LOG.warn(format("Can't calculate size for table
[id={}].", tableId), err);
-
- return null;
- } else {
- StatisticUpdatesSupplier supplier =
changesSupplier.get();
- if (supplier != null) {
- supplier.accept(tableId);
- }
-
- return res;
- }
- });
-
- latestUpdateFut.updateAndGet(prev -> prev == null ? updateResult :
prev.thenCompose(none -> updateResult));
- }
- }
-
@Override
public void start() {
catalogService.listen(CatalogEvent.TABLE_CREATE,
createTableEventListener);
@@ -171,6 +132,62 @@ public class SqlStatisticManagerImpl implements
SqlStatisticUpdateManager {
tableSizeMap.putIfAbsent(table.id(), DEFAULT_VALUE);
}
}
+
+ scheduler.scheduleAtFixedRate(this::update, INITIAL_DELAY,
REFRESH_PERIOD, TimeUnit.MILLISECONDS);
+ }
+
+    /**
+     * Requests fresh size estimations for all known, not dropped tables and applies them to {@link #tableSizeMap}.
+     *
+     * <p>Does nothing while the previously started update is still in progress. A {@code STATISTIC_CHANGED} event is fired
+     * for every table whose received modification counter is not older than the one cached before the update.
+     */
+    private void update() {
+        CompletableFuture<Void> latestUpdate = latestUpdateFut.get();
+
+        // The reference is treated as nullable here for consistency with the null handling at the end of this method.
+        if (latestUpdate != null && !latestUpdate.isDone()) {
+            return;
+        }
+
+        Collection<InternalTable> tables = new ArrayList<>(tableSizeMap.size());
+
+        for (Map.Entry<Integer, ActualSize> ent : tableSizeMap.entrySet()) {
+            Integer tableId = ent.getKey();
+
+            if (droppedTables.contains(tableId)) {
+                continue;
+            }
+
+            TableViewInternal tableView = tableManager.cachedTable(tableId);
+
+            if (tableView == null) {
+                LOG.debug("No table found to update statistics [id={}].", tableId);
+            } else {
+                tables.add(tableView.internalTable());
+            }
+        }
+
+        CompletableFuture<Void> updateResult = statSupplier.estimatedSizeWithLastUpdate(tables)
+                .handle((infos, err) -> {
+                    // On failure there is no result to iterate over: "infos" is null in this case.
+                    if (err != null) {
+                        LOG.debug("Failed to update table statistics.", err);
+
+                        return null;
+                    }
+
+                    for (Int2ObjectMap.Entry<PartitionModificationInfo> ent : infos.int2ObjectEntrySet()) {
+                        int tableId = ent.getIntKey();
+                        PartitionModificationInfo info = ent.getValue();
+
+                        ActualSize currentSize = tableSizeMap.get(tableId);
+
+                        // The table can be concurrently dropped and cleaned up, a new value must not be put in this case.
+                        if (currentSize == null) {
+                            continue;
+                        }
+
+                        ActualSize updatedSize = new ActualSize(Math.max(info.getEstimatedSize(), DEFAULT_TABLE_SIZE),
+                                info.lastModificationCounter());
+
+                        // Replace the initial or a stale state, keep the cached value if it is newer than the received one.
+                        tableSizeMap.computeIfPresent(tableId, (k, v) ->
+                                v.modificationCounter() > info.lastModificationCounter() ? v : updatedSize);
+
+                        if (updatedSize.modificationCounter() >= currentSize.modificationCounter()) {
+                            fireEvent(STATISTIC_CHANGED, new StatisticEventParameters(tableId));
+                        }
+                    }
+
+                    return null;
+                });
+
+        latestUpdateFut.updateAndGet(prev -> prev == null ? updateResult : prev.thenCompose(none -> updateResult));
     }
@Override
@@ -185,6 +202,7 @@ public class SqlStatisticManagerImpl implements
SqlStatisticUpdateManager {
int catalogVersion = parameters.catalogVersion();
destructionEventsQueue.enqueue(new DestroyTableEvent(catalogVersion,
tableId));
+ droppedTables.add(tableId);
}
private void onTableCreate(CreateTableEventParameters parameters) {
@@ -196,25 +214,46 @@ public class SqlStatisticManagerImpl implements
SqlStatisticUpdateManager {
List<DestroyTableEvent> events =
destructionEventsQueue.drainUpTo(earliestVersion);
events.forEach(event -> tableSizeMap.remove(event.tableId()));
+ events.forEach(event -> droppedTables.remove(event.tableId()));
}
- /** Timestamped size. */
- private static class ActualSize {
- long timestamp;
+ /** Size with modification counter. */
+ static class ActualSize {
+ long modificationCounter;
long size;
- ActualSize(long size, long timestamp) {
- this.timestamp = timestamp;
+ ActualSize(long size, long modificationCounter) {
+ this.modificationCounter = modificationCounter;
this.size = size;
}
- long getTimestamp() {
- return timestamp;
+ public long modificationCounter() {
+ return modificationCounter;
}
long getSize() {
return size;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ActualSize that = ((ActualSize) o);
+
+ return modificationCounter == that.modificationCounter && size ==
that.size;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(modificationCounter, size);
+ }
}
/** Internal event. */
@@ -236,17 +275,6 @@ public class SqlStatisticManagerImpl implements
SqlStatisticUpdateManager {
}
}
- /**
- * Set threshold time to postpone update statistics.
- */
- @TestOnly
- public long setThresholdTimeToPostponeUpdateMs(long milliseconds) {
- assert milliseconds >= 0;
- long prevValue = thresholdTimeToPostponeUpdateMs;
- thresholdTimeToPostponeUpdateMs = milliseconds;
- return prevValue;
- }
-
/**
* Returns feature for the last run update statistics to have ability wait
update statistics.
*/
@@ -258,10 +286,6 @@ public class SqlStatisticManagerImpl implements
SqlStatisticUpdateManager {
/** Forcibly updates statistics for all known tables, ignoring throttling.
*/
@TestOnly
public void forceUpdateAll() {
- List<Integer> tableIds = List.copyOf(tableSizeMap.keySet());
-
- for (int tableId : tableIds) {
- updateTableSizeStatistics(tableId, true);
- }
+ update();
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
index 997cbe7d606..a4ed27af630 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
@@ -17,6 +17,10 @@
package org.apache.ignite.internal.sql.engine.statistic;
+import org.apache.ignite.internal.event.EventProducer;
+import
org.apache.ignite.internal.sql.engine.statistic.event.StatisticChangedEvent;
+import
org.apache.ignite.internal.sql.engine.statistic.event.StatisticEventParameters;
+
/** Statistic manager with reaction on statistic changes. */
-public interface SqlStatisticUpdateManager extends SqlStatisticManager,
StatisticUpdatesNotifier {
+public interface SqlStatisticUpdateManager extends SqlStatisticManager,
EventProducer<StatisticChangedEvent, StatisticEventParameters> {
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticUpdatesNotifier.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java
similarity index 84%
rename from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticUpdatesNotifier.java
rename to
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java
index c5459c939ad..5a72dab336c 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticUpdatesNotifier.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregator.java
@@ -17,9 +17,9 @@
package org.apache.ignite.internal.sql.engine.statistic;
-/** Statistic updates notifier. */
+/** Statistic aggregator. */
@FunctionalInterface
-public interface StatisticUpdatesNotifier {
- /** Changes callback. */
- void changesNotifier(StatisticUpdatesSupplier updater);
+public interface StatisticAggregator<T, R> {
+ /** Estimated size and last value update. */
+ R estimatedSizeWithLastUpdate(T t);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregatorImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregatorImpl.java
new file mode 100644
index 00000000000..3bcfb3faecf
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/StatisticAggregatorImpl.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.statistic;
+
+import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static
org.apache.ignite.internal.util.CompletableFutures.isCompletedSuccessfully;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.InternalClusterNode;
+import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.InternalTable;
+import
org.apache.ignite.internal.table.message.GetEstimatedSizeWithLastModifiedTsRequest;
+import
org.apache.ignite.internal.table.message.GetEstimatedSizeWithLastModifiedTsResponse;
+import
org.apache.ignite.internal.table.message.PartitionModificationInfoMessage;
+import org.apache.ignite.internal.table.message.TableMessageGroup;
+import org.apache.ignite.internal.table.message.TableMessagesFactory;
+import org.jetbrains.annotations.Nullable;
+
+/** Statistic aggregator. */
+public class StatisticAggregatorImpl implements
+ StatisticAggregator<Collection<InternalTable>,
CompletableFuture<Int2ObjectMap<PartitionModificationInfo>>> {
+ private static final IgniteLogger LOG =
Loggers.forClass(StatisticAggregatorImpl.class);
+ private final Supplier<Set<LogicalNode>> clusterNodes;
+ private final MessagingService messagingService;
+ private static final TableMessagesFactory TABLE_MESSAGES_FACTORY = new
TableMessagesFactory();
+ private static final long REQUEST_ESTIMATION_TIMEOUT_MILLIS =
TimeUnit.SECONDS.toMillis(5);
+ private final AtomicReference<@Nullable Map<TablePartitionId,
CompletableFuture<Object>>> requestsCompletion = new AtomicReference<>();
+
+ /**
+ * Creates the aggregator and registers it as a handler of {@link TableMessageGroup} messages.
+ *
+ * @param clusterNodes Supplier of the nodes the estimation request is sent to.
+ * @param messagingService Messaging service used to send requests and to receive responses.
+ */
+ public StatisticAggregatorImpl(
+ Supplier<Set<LogicalNode>> clusterNodes,
+ MessagingService messagingService
+ ) {
+ this.clusterNodes = clusterNodes;
+ this.messagingService = messagingService;
+
+ // Responses to the estimation requests are delivered through this handler.
+ messagingService.addMessageHandler(TableMessageGroup.class,
this::handleMessage);
+ }
+
+    /**
+     * Handles {@link GetEstimatedSizeWithLastModifiedTsResponse} messages: completes the per-partition futures of the
+     * collection round that is currently in progress. Any other message, or a response received while no round is in
+     * progress, is ignored.
+     *
+     * <p>Several responses can carry the same partition; the one with the greatest modification counter wins.
+     *
+     * @param message Received message.
+     * @param sender Node the message was sent from (not used).
+     * @param correlationId Correlation id (not used).
+     */
+    private void handleMessage(NetworkMessage message, InternalClusterNode sender, @Nullable Long correlationId) {
+        Map<TablePartitionId, CompletableFuture<Object>> completedRequests = requestsCompletion.get();
+
+        if (message instanceof GetEstimatedSizeWithLastModifiedTsResponse && completedRequests != null) {
+            GetEstimatedSizeWithLastModifiedTsResponse response = (GetEstimatedSizeWithLastModifiedTsResponse) message;
+            for (PartitionModificationInfoMessage ent : response.modifications()) {
+                TablePartitionId id = new TablePartitionId(ent.tableId(), ent.partId());
+                long estSize = ent.estimatedSize();
+                long modificationCounter = ent.lastModificationCounter();
+
+                CompletableFuture<Object> responseFut = completedRequests.get(id);
+
+                // stale response
+                if (responseFut == null) {
+                    continue;
+                }
+
+                synchronized (this) {
+                    if (isCompletedSuccessfully(responseFut)) {
+                        PartitionModificationInfo res = (PartitionModificationInfo) responseFut.join();
+                        if (modificationCounter > res.lastModificationCounter()) {
+                            // complete() has no effect on an already completed future, the value has to be reset forcibly.
+                            responseFut.obtrudeValue(new PartitionModificationInfo(estSize, modificationCounter));
+                        }
+                    } else {
+                        // No-op if the future has already been completed exceptionally, e.g. by the timeout.
+                        responseFut.complete(new PartitionModificationInfo(estSize, modificationCounter));
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Collects estimated sizes and modification counters of the given tables from the cluster nodes.
+     *
+     * <p>The request is sent to every node returned by the cluster nodes supplier. Per-partition answers are folded into one
+     * entry per table: estimated sizes are summed up and the greatest modification counter is taken. A table is omitted from
+     * the result unless all of its partitions have answered successfully. If another collection round is still in progress,
+     * an empty map is returned.
+     *
+     * @param tables Tables to collect statistics for.
+     * @return Future with a map of <em>table id</em> to aggregated {@link PartitionModificationInfo}.
+     */
+    @Override
+    public CompletableFuture<Int2ObjectMap<PartitionModificationInfo>> estimatedSizeWithLastUpdate(Collection<InternalTable> tables) {
+        // some requests are in progress
+        if (requestsCompletion.get() != null) {
+            return completedFuture(Int2ObjectMaps.emptyMap());
+        }
+
+        Collection<Integer> tablesId = tables.stream().map(InternalTable::tableId).collect(Collectors.toList());
+
+        GetEstimatedSizeWithLastModifiedTsRequest request =
+                TABLE_MESSAGES_FACTORY.getEstimatedSizeWithLastModifiedTsRequest().tables(tablesId).build();
+
+        Map<TablePartitionId, CompletableFuture<Object>> partIdRequests = new HashMap<>();
+
+        for (InternalTable t : tables) {
+            for (int p = 0; p < t.partitions(); ++p) {
+                partIdRequests.put(new TablePartitionId(t.tableId(), p),
+                        new CompletableFuture<>().orTimeout(REQUEST_ESTIMATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
+            }
+        }
+
+        // Publish only the fully populated map: the message handler reads it from other threads.
+        if (!requestsCompletion.compareAndSet(null, partIdRequests)) {
+            return completedFuture(Int2ObjectMaps.emptyMap());
+        }
+
+        List<CompletableFuture<Void>> reqFutures = new ArrayList<>();
+
+        for (LogicalNode node : clusterNodes.get()) {
+            CompletableFuture<Void> reqFut = messagingService.send(node, request);
+
+            reqFutures.add(reqFut.orTimeout(REQUEST_ESTIMATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
+        }
+
+        CompletableFuture<?>[] requests = reqFutures.toArray(CompletableFuture[]::new);
+
+        CompletableFuture<Void> allRequests = allOf(requests);
+
+        Int2ObjectMap<PartitionModificationInfo> summary = new Int2ObjectOpenHashMap<>();
+
+        for (InternalTable t : tables) {
+            Map<TablePartitionId, CompletableFuture<Object>> tableResponses = new HashMap<>();
+            for (Map.Entry<TablePartitionId, CompletableFuture<Object>> ent : partIdRequests.entrySet()) {
+                if (ent.getKey().tableId() == t.tableId()) {
+                    tableResponses.put(ent.getKey(), ent.getValue());
+                }
+            }
+
+            // Tables are processed one after another; a failure of one table must not break the chain for the next ones.
+            allRequests = allRequests
+                    .thenCompose(r -> allOf(tableResponses.values().toArray(CompletableFuture[]::new)))
+                    .handle((ret, ex) -> {
+                        if (ex != null) {
+                            LOG.debug("Can`t update statistics for table [id={}].", ex, t.tableId());
+                        }
+
+                        CompletableFuture<Void> allResponses = allOf(tableResponses.values().toArray(CompletableFuture[]::new));
+
+                        if (!isCompletedSuccessfully(allResponses)) {
+                            if (LOG.isDebugEnabled()) {
+                                for (Map.Entry<TablePartitionId, CompletableFuture<Object>> ent : tableResponses.entrySet()) {
+                                    // Report the partitions which have NOT answered successfully.
+                                    if (!isCompletedSuccessfully(ent.getValue())) {
+                                        LOG.debug("Can`t update statistics for table partition [id={}].", ent.getKey());
+                                    }
+                                }
+                            }
+                            return null;
+                        }
+
+                        for (Map.Entry<TablePartitionId, CompletableFuture<Object>> ent : tableResponses.entrySet()) {
+                            PartitionModificationInfo info = (PartitionModificationInfo) ent.getValue().join();
+                            long estSize = info.getEstimatedSize();
+                            long modificationCounter = info.lastModificationCounter();
+
+                            summary.compute(ent.getKey().tableId(), (k, v) -> v == null
+                                    ? new PartitionModificationInfo(estSize, modificationCounter)
+                                    : new PartitionModificationInfo(v.getEstimatedSize() + estSize,
+                                            Math.max(v.lastModificationCounter(), modificationCounter)));
+                        }
+
+                        return null;
+                    });
+        }
+
+        return allRequests.handle((ret, ex) -> {
+            // Always release the "in progress" marker, otherwise a single failure would block all further rounds.
+            requestsCompletion.compareAndSet(partIdRequests, null);
+
+            if (ex != null) {
+                LOG.debug("Exception during tables size estimation.", ex);
+                return Int2ObjectMaps.emptyMap();
+            }
+
+            return summary;
+        });
+    }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/event/StatisticChangedEvent.java
similarity index 74%
copy from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
copy to
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/event/StatisticChangedEvent.java
index 997cbe7d606..74cb6958ea8 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/event/StatisticChangedEvent.java
@@ -15,8 +15,14 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.sql.engine.statistic;
+package org.apache.ignite.internal.sql.engine.statistic.event;
-/** Statistic manager with reaction on statistic changes. */
-public interface SqlStatisticUpdateManager extends SqlStatisticManager,
StatisticUpdatesNotifier {
+import org.apache.ignite.internal.event.Event;
+
+/** Events produced on changes of SQL statistics. */
+public enum StatisticChangedEvent implements Event {
+ /**
+ * Fired when statistic is changed. The event is accompanied by {@link StatisticEventParameters}
+ * carrying the id of the affected table.
+ */
+ STATISTIC_CHANGED,
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/event/StatisticEventParameters.java
similarity index 66%
copy from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
copy to
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/event/StatisticEventParameters.java
index 997cbe7d606..63013148d38 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/event/StatisticEventParameters.java
@@ -15,8 +15,19 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.sql.engine.statistic;
+package org.apache.ignite.internal.sql.engine.statistic.event;
-/** Statistic manager with reaction on statistic changes. */
-public interface SqlStatisticUpdateManager extends SqlStatisticManager,
StatisticUpdatesNotifier {
+import org.apache.ignite.internal.event.EventParameters;
+
+/** Parameters of a statistic event: identify the table the event relates to. */
+public class StatisticEventParameters implements EventParameters {
+ /** Id of the table the event relates to. */
+ private final int tableId;
+
+ /**
+ * Constructor.
+ *
+ * @param tableId Id of the table the event relates to.
+ */
+ public StatisticEventParameters(int tableId) {
+ this.tableId = tableId;
+ }
+
+ /** Returns id of the table the event relates to. */
+ public int tableId() {
+ return tableId;
+ }
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 4112616c425..4738c1407af 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -78,6 +78,7 @@ import org.apache.ignite.internal.catalog.CatalogCommand;
import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite.internal.components.SystemPropertiesNodeProperties;
+import org.apache.ignite.internal.event.AbstractEventProducer;
import org.apache.ignite.internal.failure.FailureManager;
import org.apache.ignite.internal.failure.handlers.NoOpFailureHandler;
import org.apache.ignite.internal.hlc.ClockService;
@@ -140,6 +141,8 @@ import
org.apache.ignite.internal.sql.engine.schema.IgniteTable;
import org.apache.ignite.internal.sql.engine.sql.ParsedResult;
import org.apache.ignite.internal.sql.engine.sql.ParserService;
import org.apache.ignite.internal.sql.engine.sql.ParserServiceImpl;
+import
org.apache.ignite.internal.sql.engine.statistic.event.StatisticChangedEvent;
+import
org.apache.ignite.internal.sql.engine.statistic.event.StatisticEventParameters;
import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContext;
import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapper;
import org.apache.ignite.internal.sql.engine.util.Commons;
@@ -265,6 +268,8 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
when(clockService.currentLong()).thenReturn(new HybridTimestamp(1_000,
500).longValue());
+ AbstractEventProducer<StatisticChangedEvent, StatisticEventParameters>
producer = new AbstractEventProducer<>() {};
+
prepareService = new PrepareServiceImpl(
"test",
0,
@@ -276,7 +281,8 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
metricManager,
new PredefinedSchemaManager(schema),
clockService::currentLong,
- commonExecutor
+ commonExecutor,
+ producer
);
parserService = new ParserServiceImpl();
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index 67184e438e2..c3db2244f1f 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -75,6 +75,7 @@ import
org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
import
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite.internal.components.SystemPropertiesNodeProperties;
+import org.apache.ignite.internal.event.AbstractEventProducer;
import org.apache.ignite.internal.event.EventListener;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.hlc.ClockServiceImpl;
@@ -129,6 +130,8 @@ import
org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
import org.apache.ignite.internal.sql.engine.schema.TableDescriptorImpl;
import org.apache.ignite.internal.sql.engine.sql.ParserServiceImpl;
import org.apache.ignite.internal.sql.engine.statistic.SqlStatisticManager;
+import
org.apache.ignite.internal.sql.engine.statistic.event.StatisticChangedEvent;
+import
org.apache.ignite.internal.sql.engine.statistic.event.StatisticEventParameters;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.sql.engine.util.Commons;
@@ -687,6 +690,8 @@ public class TestBuilders {
IgniteThreadFactory.create("test",
"common-scheduled-executors", LOG)
);
+ AbstractEventProducer<StatisticChangedEvent,
StatisticEventParameters> producer = new AbstractEventProducer<>() {};
+
var prepareService = new PrepareServiceImpl(
clusterName,
0,
@@ -698,7 +703,8 @@ public class TestBuilders {
new NoOpMetricManager(),
schemaManager,
clockService::currentLong,
- scheduledExecutor
+ scheduledExecutor,
+ producer
);
Map<String, List<String>> systemViewsByNode = new HashMap<>();
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTimeoutTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTimeoutTest.java
index 28ab93a627f..a330bc84301 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTimeoutTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTimeoutTest.java
@@ -38,6 +38,7 @@ import org.apache.calcite.plan.volcano.VolcanoPlanner;
import org.apache.calcite.plan.volcano.VolcanoTimeoutException;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelVisitor;
+import org.apache.ignite.internal.event.AbstractEventProducer;
import org.apache.ignite.internal.metrics.MetricManagerImpl;
import org.apache.ignite.internal.sql.engine.SqlOperationContext;
import org.apache.ignite.internal.sql.engine.framework.PredefinedSchemaManager;
@@ -53,6 +54,8 @@ import
org.apache.ignite.internal.sql.engine.schema.IgniteTable;
import org.apache.ignite.internal.sql.engine.sql.ParsedResult;
import org.apache.ignite.internal.sql.engine.sql.ParserService;
import org.apache.ignite.internal.sql.engine.sql.ParserServiceImpl;
+import
org.apache.ignite.internal.sql.engine.statistic.event.StatisticChangedEvent;
+import
org.apache.ignite.internal.sql.engine.statistic.event.StatisticEventParameters;
import org.apache.ignite.internal.sql.engine.util.SqlTestUtils;
import org.apache.ignite.internal.sql.engine.util.cache.CaffeineCacheFactory;
import org.apache.ignite.internal.type.NativeTypes;
@@ -69,6 +72,7 @@ public class PlannerTimeoutTest extends AbstractPlannerTest {
long plannerTimeout = 1L;
IgniteSchema schema = createSchema(createTestTable("T1"));
SqlOperationContext ctx = operationContext();
+ AbstractEventProducer<StatisticChangedEvent, StatisticEventParameters>
producer = new AbstractEventProducer<>() {};
PrepareService prepareService = new PrepareServiceImpl(
"test",
@@ -81,7 +85,8 @@ public class PlannerTimeoutTest extends AbstractPlannerTest {
new MetricManagerImpl(),
new PredefinedSchemaManager(schema),
mock(LongSupplier.class),
- mock(ScheduledExecutorService.class)
+ mock(ScheduledExecutorService.class),
+ producer
);
prepareService.start();
try {
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImplTest.java
index c624b65e571..25af2adbdda 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImplTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.sql.engine.prepare;
import static
org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl.PLAN_UPDATER_INITIAL_DELAY;
+import static
org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl.PLAN_UPDATER_REFRESH_PERIOD;
import static
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
@@ -45,7 +46,9 @@ import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
@@ -56,6 +59,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import org.apache.ignite.internal.event.AbstractEventProducer;
import org.apache.ignite.internal.hlc.ClockServiceImpl;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -66,12 +70,15 @@ import
org.apache.ignite.internal.sql.engine.SqlOperationContext;
import org.apache.ignite.internal.sql.engine.framework.PredefinedSchemaManager;
import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
import org.apache.ignite.internal.sql.engine.framework.VersionedSchemaManager;
+import
org.apache.ignite.internal.sql.engine.prepare.PrepareServiceImpl.PlanInfo;
import
org.apache.ignite.internal.sql.engine.prepare.ddl.DdlSqlToCommandConverter;
import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Collation;
import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
import org.apache.ignite.internal.sql.engine.sql.ParsedResult;
import org.apache.ignite.internal.sql.engine.sql.ParserServiceImpl;
+import
org.apache.ignite.internal.sql.engine.statistic.event.StatisticChangedEvent;
+import
org.apache.ignite.internal.sql.engine.statistic.event.StatisticEventParameters;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.sql.engine.util.SqlTestUtils;
import org.apache.ignite.internal.sql.engine.util.cache.Cache;
@@ -524,7 +531,7 @@ public class PrepareServiceImplTest extends
BaseIgniteAbstractTest {
IgniteSchema schema = new IgniteSchema("TEST", 0, List.of(table1));
AtomicInteger ver = new AtomicInteger();
- PrepareServiceImpl service =
createPlannerServiceWithInactivePlanUpdater(schema,
CaffeineCacheFactory.INSTANCE, 10000,
+ PrepareServiceImpl service = createPlannerService(schema,
CaffeineCacheFactory.INSTANCE, 10000,
Integer.MAX_VALUE, 1000, ver);
String selectQuery = "SELECT * FROM test.t1 WHERE c1 = 1";
@@ -536,15 +543,23 @@ public class PrepareServiceImplTest extends
BaseIgniteAbstractTest {
await(service.prepareAsync(parse(selectQuery),
operationContext().build()));
Awaitility.await()
- .atMost(Duration.ofMillis(10000))
+ .atMost(Duration.ofMillis(2 * PLAN_UPDATER_REFRESH_PERIOD))
.until(
() -> service.cache.size() == 2
);
- assertThat(service.cache.size(), is(2));
service.statisticsChanged(table1.id());
- assertThat(service.cache.entrySet().stream().filter(e ->
e.getValue().join().needInvalidate()).count(), is(1L));
+ Set<Entry<CacheKey, CompletableFuture<PlanInfo>>> cachedSnap = new
HashSet<>(service.cache.entrySet());
+
+ Awaitility.await()
+ .atMost(Duration.ofMillis(2 * PLAN_UPDATER_REFRESH_PERIOD))
+ .until(
+ () -> !new
HashSet<>(service.cache.entrySet()).equals(cachedSnap)
+ );
+
+ // The cache snapshot must contain exactly one entry marked for invalidation.
+ assertThat(cachedSnap.stream().filter(e ->
e.getValue().join().needInvalidate()).count(), is(1L));
}
@Test
@@ -848,9 +863,11 @@ public class PrepareServiceImplTest extends
BaseIgniteAbstractTest {
when(clockService.currentLong()).thenReturn(new HybridTimestamp(1_000,
500).longValue());
+ AbstractEventProducer<StatisticChangedEvent, StatisticEventParameters>
producer = new AbstractEventProducer<>() {};
+
PrepareServiceImpl service = new PrepareServiceImpl("test", cacheSize,
cacheFactory,
mock(DdlSqlToCommandConverter.class), timeoutMillis, 2,
planExpireSeconds, mock(MetricManagerImpl.class),
- new PredefinedSchemaManager(schemas),
clockService::currentLong, commonExecutor);
+ new PredefinedSchemaManager(schemas),
clockService::currentLong, commonExecutor, producer);
createdServices.add(service);
@@ -883,9 +900,11 @@ public class PrepareServiceImplTest extends
BaseIgniteAbstractTest {
when(clockService.currentLong()).thenReturn(new HybridTimestamp(1_000,
500).longValue());
+ AbstractEventProducer<StatisticChangedEvent, StatisticEventParameters>
producer = new AbstractEventProducer<>() {};
+
PrepareServiceImpl service = new PrepareServiceImpl("test", cacheSize,
cacheFactory,
mock(DdlSqlToCommandConverter.class), timeoutMillis, 2,
planExpireSeconds, mock(MetricManagerImpl.class),
- new VersionedSchemaManager(schemas, ver),
clockService::currentLong, executor);
+ new VersionedSchemaManager(schemas, ver),
clockService::currentLong, executor, producer);
createdServices.add(service);
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImplTest.java
index fc526b747e3..57d6eda18cb 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImplTest.java
@@ -17,11 +17,11 @@
package org.apache.ignite.internal.sql.engine.statistic;
+import static it.unimi.dsi.fastutil.ints.Int2ObjectMap.entry;
import static
org.apache.ignite.internal.sql.engine.statistic.SqlStatisticManagerImpl.DEFAULT_TABLE_SIZE;
-import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
-import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
+import static
org.apache.ignite.internal.sql.engine.statistic.SqlStatisticManagerImpl.INITIAL_DELAY;
+import static
org.apache.ignite.internal.sql.engine.statistic.SqlStatisticManagerImpl.REFRESH_PERIOD;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
@@ -31,12 +31,15 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.ints.IntList;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogManager;
import
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
@@ -53,8 +56,10 @@ import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
-import org.apache.ignite.lang.IgniteCheckedException;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
import org.apache.ignite.sql.ColumnType;
+import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
@@ -65,6 +70,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
* Tests of SqlStatisticManagerImpl.
*/
@ExtendWith(MockitoExtension.class)
+@ExtendWith(ExecutorServiceExtension.class)
class SqlStatisticManagerImplTest extends BaseIgniteAbstractTest {
@Mock
private TableManager tableManager;
@@ -81,6 +87,12 @@ class SqlStatisticManagerImplTest extends
BaseIgniteAbstractTest {
@Mock
private LowWatermark lowWatermark;
+ @Mock
+ StatisticAggregatorImpl statAggregator;
+
+ @InjectExecutorService
+ private ScheduledExecutorService commonExecutor;
+
private static final CatalogTableColumnDescriptor pkCol =
new CatalogTableColumnDescriptor("pkCol", ColumnType.STRING,
false, 0, 0, 10, null);
@@ -89,9 +101,9 @@ class SqlStatisticManagerImplTest extends
BaseIgniteAbstractTest {
int tableId = ThreadLocalRandom.current().nextInt();
// Preparing:
when(catalogManager.catalog(anyInt())).thenReturn(mock(Catalog.class));
- when(tableManager.cachedTable(tableId)).thenReturn(null);
- SqlStatisticManagerImpl sqlStatisticManager = new
SqlStatisticManagerImpl(tableManager, catalogManager, lowWatermark);
+ SqlStatisticManagerImpl sqlStatisticManager =
+ new SqlStatisticManagerImpl(tableManager, catalogManager,
lowWatermark, commonExecutor, statAggregator);
sqlStatisticManager.start();
// Test:
@@ -108,51 +120,24 @@ class SqlStatisticManagerImplTest extends
BaseIgniteAbstractTest {
when(tableManager.cachedTable(tableId)).thenReturn(tableViewInternal);
when(tableViewInternal.internalTable()).thenReturn(internalTable);
-
when(internalTable.estimatedSize()).thenReturn(CompletableFuture.completedFuture(tableSize));
+
when(statAggregator.estimatedSizeWithLastUpdate(List.of(internalTable)))
+
.thenReturn(CompletableFuture.completedFuture(Int2ObjectMap.ofEntries(
+ entry(tableId, new
PartitionModificationInfo(tableSize, Long.MAX_VALUE)))));
- SqlStatisticManagerImpl sqlStatisticManager = new
SqlStatisticManagerImpl(tableManager, catalogManager, lowWatermark);
+ SqlStatisticManagerImpl sqlStatisticManager =
+ new SqlStatisticManagerImpl(tableManager, catalogManager,
lowWatermark, commonExecutor, statAggregator);
sqlStatisticManager.start();
- // Test:
- assertEquals(tableSize, sqlStatisticManager.tableSize(tableId));
- // The second time we should obtain the same value from a cache.
- assertEquals(tableSize, sqlStatisticManager.tableSize(tableId));
-
- verify(internalTable, times(1)).estimatedSize();
- }
+ long timeout = 2 * Math.max(REFRESH_PERIOD, INITIAL_DELAY);
- @Test
- public void testEstimationFailure() {
- int tableId = ThreadLocalRandom.current().nextInt();
-
- prepareCatalogWithTable(tableId);
-
- when(tableManager.cachedTable(tableId)).thenReturn(tableViewInternal);
- when(tableViewInternal.internalTable()).thenReturn(internalTable);
- when(internalTable.estimatedSize()).thenReturn(
- CompletableFuture.completedFuture(1L),
- CompletableFuture.completedFuture(2L),
- CompletableFuture.failedFuture(new
IgniteCheckedException(INTERNAL_ERR, "Test exception"))
+ Awaitility.await().timeout(timeout,
TimeUnit.MILLISECONDS).untilAsserted(() ->
+ assertEquals(tableSize, sqlStatisticManager.tableSize(tableId))
);
- SqlStatisticManagerImpl sqlStatisticManager = new
SqlStatisticManagerImpl(tableManager, catalogManager, lowWatermark);
- StatisticUpdatesSupplier notifier =
mock(StatisticUpdatesSupplier.class);
- sqlStatisticManager.changesNotifier(notifier);
- sqlStatisticManager.start();
-
- sqlStatisticManager.setThresholdTimeToPostponeUpdateMs(0);
-
- // table size 1
- assertEquals(1L, sqlStatisticManager.tableSize(tableId));
- verify(notifier, times(1)).accept(anyInt());
-
- // table size 2
- assertEquals(2L, sqlStatisticManager.tableSize(tableId));
+ // Second time we should obtain the same value from a cache.
+ assertEquals(tableSize, sqlStatisticManager.tableSize(tableId));
- // exceptionable table size
- assertEquals(2L, sqlStatisticManager.tableSize(tableId));
- verify(notifier, times(2)).accept(anyInt());
- verify(internalTable, times(3)).estimatedSize();
+ verify(statAggregator,
times(1)).estimatedSizeWithLastUpdate(List.of(internalTable));
}
@Test
@@ -160,35 +145,49 @@ class SqlStatisticManagerImplTest extends
BaseIgniteAbstractTest {
int tableId = ThreadLocalRandom.current().nextInt();
long tableSize1 = 999_888_777L;
long tableSize2 = 111_222_333L;
+
+ HybridTimestamp time1 =
HybridTimestamp.MAX_VALUE.subtractPhysicalTime(1000);
+ HybridTimestamp time2 =
HybridTimestamp.MAX_VALUE.subtractPhysicalTime(500);
// Preparing:
prepareCatalogWithTable(tableId);
when(tableManager.cachedTable(tableId)).thenReturn(tableViewInternal);
when(tableViewInternal.internalTable()).thenReturn(internalTable);
- when(internalTable.estimatedSize()).thenReturn(
- CompletableFuture.completedFuture(tableSize1),
- CompletableFuture.completedFuture(tableSize2));
- SqlStatisticManagerImpl sqlStatisticManager = new
SqlStatisticManagerImpl(tableManager, catalogManager, lowWatermark);
+
when(statAggregator.estimatedSizeWithLastUpdate(List.of(internalTable)))
+ .thenReturn(
+
CompletableFuture.completedFuture(Int2ObjectMap.ofEntries(
+ entry(tableId, new
PartitionModificationInfo(tableSize1, time1.longValue())))),
+
CompletableFuture.completedFuture(Int2ObjectMap.ofEntries(
+ entry(tableId, new
PartitionModificationInfo(tableSize2, time2.longValue()))))
+ );
+
+ SqlStatisticManagerImpl sqlStatisticManager =
+ new SqlStatisticManagerImpl(tableManager, catalogManager,
lowWatermark, commonExecutor, statAggregator);
sqlStatisticManager.start();
- // Test:
- assertEquals(tableSize1, sqlStatisticManager.tableSize(tableId));
+ long timeout = 2 * Math.max(REFRESH_PERIOD, INITIAL_DELAY);
+
+ Awaitility.await().timeout(timeout,
TimeUnit.MILLISECONDS).untilAsserted(() ->
+ assertEquals(tableSize1,
sqlStatisticManager.tableSize(tableId))
+ );
// The second time we should obtain the same value from a cache.
assertEquals(tableSize1, sqlStatisticManager.tableSize(tableId));
- // Allow to refresh value.
- sqlStatisticManager.setThresholdTimeToPostponeUpdateMs(0);
+ // Forcibly update.
+ sqlStatisticManager.forceUpdateAll();
// Now we need to obtain a fresh value of table size.
assertEquals(tableSize2, sqlStatisticManager.tableSize(tableId));
+ assertEquals(time2.longValue(),
sqlStatisticManager.tableSizeMap.get(tableId).modificationCounter());
- verify(internalTable, times(2)).estimatedSize();
+ verify(statAggregator,
times(2)).estimatedSizeWithLastUpdate(List.of(internalTable));
}
@Test
- public void checkLoadAllTablesOnStart() {
+ public void checkLoadAllTablesOnStart() throws Exception {
int minimumCatalogVersion = 1;
int maximumCatalogVersion = 10;
+ long tableSize = 99999L;
// Preparing:
when(catalogManager.earliestCatalogVersion()).thenReturn(minimumCatalogVersion);
@@ -223,16 +222,25 @@ class SqlStatisticManagerImplTest extends
BaseIgniteAbstractTest {
when(tableManager.cachedTable(anyInt())).thenReturn(tableViewInternal);
when(tableViewInternal.internalTable()).thenReturn(internalTable);
-
when(internalTable.estimatedSize()).thenReturn(CompletableFuture.completedFuture(99999L));
+ Int2ObjectMap<PartitionModificationInfo> modifications = new
Int2ObjectOpenHashMap<>();
+ for (int i = minimumCatalogVersion; i <= maximumCatalogVersion + 1;
++i) {
+ modifications.put(i, new PartitionModificationInfo(tableSize,
Long.MAX_VALUE));
+ }
+ when(statAggregator.estimatedSizeWithLastUpdate(any()))
+ .thenReturn(CompletableFuture.completedFuture(modifications));
- SqlStatisticManagerImpl sqlStatisticManager = new
SqlStatisticManagerImpl(tableManager, catalogManager, lowWatermark);
+ SqlStatisticManagerImpl sqlStatisticManager =
+ new SqlStatisticManagerImpl(tableManager, catalogManager,
lowWatermark, commonExecutor, statAggregator);
sqlStatisticManager.start();
+ sqlStatisticManager.forceUpdateAll();
+ sqlStatisticManager.lastUpdateStatisticFuture().get(5_000,
TimeUnit.MILLISECONDS);
+
// Test:
// For known tables we got calculated table size.
for (int i = minimumCatalogVersion; i <= maximumCatalogVersion; i++) {
- assertEquals(99999L, sqlStatisticManager.tableSize(i));
- assertEquals(99999L, sqlStatisticManager.tableSize(i + 1));
+ assertEquals(tableSize, sqlStatisticManager.tableSize(i));
+ assertEquals(tableSize, sqlStatisticManager.tableSize(i + 1));
}
// For unknown tables we got default size.
@@ -258,13 +266,19 @@ class SqlStatisticManagerImplTest extends
BaseIgniteAbstractTest {
when(tableManager.cachedTable(tableId)).thenReturn(tableViewInternal);
when(tableViewInternal.internalTable()).thenReturn(internalTable);
-
when(internalTable.estimatedSize()).thenReturn(CompletableFuture.completedFuture(tableSize));
+
when(statAggregator.estimatedSizeWithLastUpdate(List.of(internalTable)))
+
.thenReturn(CompletableFuture.completedFuture(Int2ObjectMap.ofEntries(
+ entry(tableId, new
PartitionModificationInfo(tableSize, Long.MAX_VALUE)))));
- SqlStatisticManagerImpl sqlStatisticManager = new
SqlStatisticManagerImpl(tableManager, catalogManager, lowWatermark);
+ SqlStatisticManagerImpl sqlStatisticManager =
+ new SqlStatisticManagerImpl(tableManager, catalogManager,
lowWatermark, commonExecutor, statAggregator);
sqlStatisticManager.start();
- // Test:
- assertEquals(tableSize, sqlStatisticManager.tableSize(tableId));
+ long timeout = 2 * Math.max(REFRESH_PERIOD, INITIAL_DELAY);
+
+ Awaitility.await().timeout(timeout,
TimeUnit.MILLISECONDS).untilAsserted(() ->
+ assertEquals(tableSize, sqlStatisticManager.tableSize(tableId))
+ );
int catalogVersionBeforeDrop = 1;
int catalogVersionAfterDrop = 2;
@@ -272,6 +286,7 @@ class SqlStatisticManagerImplTest extends
BaseIgniteAbstractTest {
// After table drop we still obtain the same value from a cache.
tableDropCapture.getValue().notify(new DropTableEventParameters(-1,
catalogVersionAfterDrop, tableId));
assertEquals(tableSize, sqlStatisticManager.tableSize(tableId));
+ assertEquals(1L, sqlStatisticManager.droppedTables.size());
// After LWM has been changed, data in the cache is removed and we get
the default value.
when(catalogManager.activeCatalogVersion(catalogVersionBeforeDrop)).thenReturn(catalogVersionAfterDrop);
@@ -279,6 +294,8 @@ class SqlStatisticManagerImplTest extends
BaseIgniteAbstractTest {
.notify(new
ChangeLowWatermarkEventParameters(HybridTimestamp.hybridTimestamp(catalogVersionBeforeDrop)));
assertEquals(DEFAULT_TABLE_SIZE,
sqlStatisticManager.tableSize(tableId));
+ // After the LWM change, dropped tables are also cleared.
+ assertEquals(0L, sqlStatisticManager.droppedTables.size());
}
@SuppressWarnings("PMD.UseNotifyAllInsteadOfNotify")
@@ -295,9 +312,12 @@ class SqlStatisticManagerImplTest extends
BaseIgniteAbstractTest {
when(tableManager.cachedTable(tableId)).thenReturn(tableViewInternal);
when(tableViewInternal.internalTable()).thenReturn(internalTable);
-
when(internalTable.estimatedSize()).thenReturn(CompletableFuture.completedFuture(tableSize));
+
when(statAggregator.estimatedSizeWithLastUpdate(List.of(internalTable)))
+
.thenReturn(CompletableFuture.completedFuture(Int2ObjectMap.ofEntries(
+ entry(tableId, new
PartitionModificationInfo(tableSize, Long.MAX_VALUE)))));
- SqlStatisticManagerImpl sqlStatisticManager = new
SqlStatisticManagerImpl(tableManager, catalogManager, lowWatermark);
+ SqlStatisticManagerImpl sqlStatisticManager =
+ new SqlStatisticManagerImpl(tableManager, catalogManager,
lowWatermark, commonExecutor, statAggregator);
sqlStatisticManager.start();
// Test:
@@ -316,53 +336,97 @@ class SqlStatisticManagerImplTest extends
BaseIgniteAbstractTest {
.build();
tableCreateCapture.getValue().notify(new
CreateTableEventParameters(-1, 0, catalogDescriptor));
// now table is created and size should be actual.
- assertEquals(tableSize, sqlStatisticManager.tableSize(tableId));
+ long timeout = 2 * Math.max(REFRESH_PERIOD, INITIAL_DELAY);
+
+ Awaitility.await().timeout(timeout,
TimeUnit.MILLISECONDS).untilAsserted(() ->
+ assertEquals(tableSize, sqlStatisticManager.tableSize(tableId))
+ );
}
@Test
- void ensureStaleRequestsAreDiscarded() throws InterruptedException {
+ void statisticUpdatesIfEstimationPartiallyUnavailable() {
int tableId = ThreadLocalRandom.current().nextInt();
long tableSize1 = 100_000L;
long tableSize2 = 500_000L;
+
+ HybridTimestamp time1 =
HybridTimestamp.MAX_VALUE.subtractPhysicalTime(1000);
+ HybridTimestamp time2 =
HybridTimestamp.MAX_VALUE.subtractPhysicalTime(500);
// Preparing:
prepareCatalogWithTable(tableId);
- CompletableFuture<Long> staleResultFuture = new CompletableFuture<>();
+
when(statAggregator.estimatedSizeWithLastUpdate(List.of(internalTable))).thenReturn(
+ // stale result goes first
+ CompletableFuture.completedFuture(Int2ObjectMap.ofEntries(
+ entry(tableId, new
PartitionModificationInfo(tableSize1, time1.longValue())))),
+ CompletableFuture.failedFuture(new RuntimeException("smth
wrong")),
+ CompletableFuture.completedFuture(Int2ObjectMap.ofEntries(
+ entry(tableId, new
PartitionModificationInfo(tableSize2, time2.longValue()))))
+ );
+
when(tableManager.cachedTable(tableId)).thenReturn(tableViewInternal);
when(tableViewInternal.internalTable()).thenReturn(internalTable);
- when(internalTable.estimatedSize()).thenReturn(
- staleResultFuture, // stale result goes first
- CompletableFuture.completedFuture(tableSize2)
- );
- SqlStatisticManagerImpl sqlStatisticManager = new
SqlStatisticManagerImpl(tableManager, catalogManager, lowWatermark);
- sqlStatisticManager.start();
+ SqlStatisticManagerImpl sqlStatisticManager =
+ new SqlStatisticManagerImpl(tableManager, catalogManager,
lowWatermark, commonExecutor, statAggregator);
- // Test:
- // first call results in non-completed future, therefore default size
is expected
- assertEquals(DEFAULT_TABLE_SIZE,
sqlStatisticManager.tableSize(tableId));
+ sqlStatisticManager.start();
+ // tableSize1
+ sqlStatisticManager.forceUpdateAll();
- // Allow to refresh value.
- sqlStatisticManager.setThresholdTimeToPostponeUpdateMs(1);
+ assertEquals(tableSize1, sqlStatisticManager.tableSize(tableId));
- Future<?> oldUpdateFuture =
sqlStatisticManager.lastUpdateStatisticFuture();
+ // err call
+ sqlStatisticManager.forceUpdateAll();
- // Wait till statistics will be requested one more time.
- assertTrue(waitForCondition(() -> {
- sqlStatisticManager.tableSize(tableId);
- Future<?> currentUpdateFuture =
sqlStatisticManager.lastUpdateStatisticFuture();
+ assertEquals(tableSize1, sqlStatisticManager.tableSize(tableId));
- // reference comparison on purpose
- return oldUpdateFuture != currentUpdateFuture;
- }, 1_000));
+ // tableSize2
+ sqlStatisticManager.forceUpdateAll();
- // Now we need obtain a new value of table size.
assertEquals(tableSize2, sqlStatisticManager.tableSize(tableId));
- staleResultFuture.complete(tableSize1);
+ verify(statAggregator,
times(3)).estimatedSizeWithLastUpdate(List.of(internalTable));
+ }
- // Forbid refreshing the statistics.
- sqlStatisticManager.setThresholdTimeToPostponeUpdateMs(Long.MAX_VALUE);
+ @Test
+ void ensureStaleRequestsAreDiscarded() {
+ int tableId = ThreadLocalRandom.current().nextInt();
+ long tableSize1 = 100_000L;
+ long tableSize2 = 500_000L;
+
+ HybridTimestamp time1 =
HybridTimestamp.MAX_VALUE.subtractPhysicalTime(1000);
+ HybridTimestamp time2 =
HybridTimestamp.MAX_VALUE.subtractPhysicalTime(500);
+ // Preparing:
+ prepareCatalogWithTable(tableId);
+
+ when(tableManager.cachedTable(tableId)).thenReturn(tableViewInternal);
+ when(tableViewInternal.internalTable()).thenReturn(internalTable);
+
when(statAggregator.estimatedSizeWithLastUpdate(List.of(internalTable))).thenReturn(
+ // stale result goes first
+ CompletableFuture.completedFuture(Int2ObjectMap.ofEntries(
+ entry(tableId, new
PartitionModificationInfo(tableSize1, time1.longValue())))),
+ CompletableFuture.completedFuture(Int2ObjectMap.ofEntries(
+ entry(tableId, new
PartitionModificationInfo(tableSize2, time2.longValue()))))
+ );
+
+ SqlStatisticManagerImpl sqlStatisticManager =
+ new SqlStatisticManagerImpl(tableManager, catalogManager,
lowWatermark, commonExecutor, statAggregator);
+ sqlStatisticManager.start();
+
+ // Test:
+ // no statistics update has been applied yet, therefore default size
is expected
+ assertEquals(DEFAULT_TABLE_SIZE,
sqlStatisticManager.tableSize(tableId));
+
+ // get stale
+ sqlStatisticManager.forceUpdateAll();
+ // get tableSize2
+ sqlStatisticManager.forceUpdateAll();
+
+ long timeout = 2 * Math.max(REFRESH_PERIOD, INITIAL_DELAY);
+
+ Awaitility.await().timeout(timeout,
TimeUnit.MILLISECONDS).untilAsserted(() ->
+ assertEquals(tableSize2,
sqlStatisticManager.tableSize(tableId))
+ );
// Stale result must be discarded.
assertEquals(tableSize2, sqlStatisticManager.tableSize(tableId));
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/metrics/PlanningCacheMetricsTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/metrics/PlanningCacheMetricsTest.java
index 83a5d38cce6..10f436ef43f 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/metrics/PlanningCacheMetricsTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/metrics/PlanningCacheMetricsTest.java
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.concurrent.ScheduledExecutorService;
+import org.apache.ignite.internal.event.AbstractEventProducer;
import org.apache.ignite.internal.hlc.ClockServiceImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.metrics.MetricManager;
@@ -39,6 +40,8 @@ import
org.apache.ignite.internal.sql.engine.schema.IgniteTable;
import org.apache.ignite.internal.sql.engine.sql.ParsedResult;
import org.apache.ignite.internal.sql.engine.sql.ParserService;
import org.apache.ignite.internal.sql.engine.sql.ParserServiceImpl;
+import
org.apache.ignite.internal.sql.engine.statistic.event.StatisticChangedEvent;
+import
org.apache.ignite.internal.sql.engine.statistic.event.StatisticEventParameters;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.sql.engine.util.cache.CacheFactory;
import org.apache.ignite.internal.sql.engine.util.cache.CaffeineCacheFactory;
@@ -77,9 +80,11 @@ public class PlanningCacheMetricsTest extends
AbstractPlannerTest {
when(clockService.currentLong()).thenReturn(new HybridTimestamp(1_000,
500).longValue());
+ AbstractEventProducer<StatisticChangedEvent, StatisticEventParameters>
producer = new AbstractEventProducer<>() {};
+
PrepareService prepareService = new PrepareServiceImpl(
"test", 2, cacheFactory, null, 15_000L, 2, Integer.MAX_VALUE,
metricManager, new PredefinedSchemaManager(schema),
- clockService::currentLong, commonExecutor
+ clockService::currentLong, commonExecutor, producer
);
prepareService.start();
diff --git a/modules/table/build.gradle b/modules/table/build.gradle
index 86622fde1c3..b7814536d3d 100644
--- a/modules/table/build.gradle
+++ b/modules/table/build.gradle
@@ -23,6 +23,7 @@ apply from:
"$rootDir/buildscripts/java-integration-test.gradle"
dependencies {
annotationProcessor project(':ignite-configuration-annotation-processor')
+ annotationProcessor project(':ignite-network-annotation-processor')
annotationProcessor libs.auto.service
implementation project(':ignite-api')
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java
index e9bb24446fc..59d26613f27 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounter.java
@@ -45,12 +45,9 @@ public class PartitionModificationCounter {
SizeSupplier partitionSizeSupplier,
StalenessConfigurationSupplier stalenessConfigurationSupplier
) {
- Objects.requireNonNull(initTimestamp, "initTimestamp");
- Objects.requireNonNull(partitionSizeSupplier, "partitionSizeSupplier");
- Objects.requireNonNull(stalenessConfigurationSupplier,
"configurationProvider");
-
- this.partitionSizeSupplier = partitionSizeSupplier;
- this.stalenessConfigurationSupplier = stalenessConfigurationSupplier;
+ lastMilestoneReachedTimestamp = Objects.requireNonNull(initTimestamp,
"initTimestamp");
+ this.partitionSizeSupplier =
Objects.requireNonNull(partitionSizeSupplier, "partitionSizeSupplier");
+ this.stalenessConfigurationSupplier =
Objects.requireNonNull(stalenessConfigurationSupplier, "configurationProvider");
TableStatsStalenessConfiguration tableStatsStalenessConfiguration =
stalenessConfigurationSupplier.get();
@@ -59,7 +56,6 @@ public class PartitionModificationCounter {
tableStatsStalenessConfiguration.staleRowsFraction(),
tableStatsStalenessConfiguration.minStaleRowsCount()
);
- lastMilestoneReachedTimestamp = initTimestamp;
}
/** Returns the current counter value. */
@@ -67,6 +63,11 @@ public class PartitionModificationCounter {
return counter.get();
}
+ /** Returns partition estimated size. */
+ public long estimatedSize() {
+ return partitionSizeSupplier.get();
+ }
+
/**
* Returns a timestamp representing the commit time of the
* last transaction that caused the counter to reach a milestone.
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterFactory.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterFactory.java
index 9c4182c8a7f..1397ff0a878 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterFactory.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterFactory.java
@@ -17,18 +17,34 @@
package org.apache.ignite.internal.table.distributed;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.function.Supplier;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.network.InternalClusterNode;
+import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import
org.apache.ignite.internal.table.message.GetEstimatedSizeWithLastModifiedTsRequest;
+import
org.apache.ignite.internal.table.message.PartitionModificationInfoMessage;
+import org.apache.ignite.internal.table.message.TableMessageGroup;
+import org.apache.ignite.internal.table.message.TableMessagesFactory;
+import org.jetbrains.annotations.Nullable;
/**
* Factory for producing {@link PartitionModificationCounter}.
*/
public class PartitionModificationCounterFactory {
-
private final Supplier<HybridTimestamp> currentTimestampSupplier;
+ private final MessagingService messagingService;
+ private final Map<TablePartitionId, PartitionModificationCounter>
partitionsInfo = new HashMap<>();
+ private static final TableMessagesFactory TABLE_MESSAGES_FACTORY = new
TableMessagesFactory();
- public PartitionModificationCounterFactory(Supplier<HybridTimestamp>
currentTimestampSupplier) {
+ public PartitionModificationCounterFactory(Supplier<HybridTimestamp>
currentTimestampSupplier, MessagingService messagingService) {
this.currentTimestampSupplier = currentTimestampSupplier;
+ this.messagingService = messagingService;
}
/**
@@ -36,17 +52,27 @@ public class PartitionModificationCounterFactory {
*
* @param partitionSizeSupplier Partition size supplier.
* @param stalenessConfigurationSupplier Staleness configuration supplier.
+ * @param tableId Table id.
+ * @param partitionId Partition id.
* @return New partition modification counter.
*/
public PartitionModificationCounter create(
SizeSupplier partitionSizeSupplier,
- StalenessConfigurationSupplier stalenessConfigurationSupplier
+ StalenessConfigurationSupplier stalenessConfigurationSupplier,
+ int tableId,
+ int partitionId
) {
- return new PartitionModificationCounter(
+ PartitionModificationCounter info = new PartitionModificationCounter(
currentTimestampSupplier.get(),
partitionSizeSupplier,
stalenessConfigurationSupplier
);
+
+ synchronized (this) {
+ partitionsInfo.put(new TablePartitionId(tableId, partitionId),
info);
+ }
+
+ return info;
}
/** An interface representing supplier of current size. */
@@ -60,4 +86,41 @@ public class PartitionModificationCounterFactory {
public interface StalenessConfigurationSupplier {
TableStatsStalenessConfiguration get();
}
+
+ /**
+ * Registers the handler for incoming {@link TableMessageGroup} messages.
+ */
+ public void start() {
+ messagingService.addMessageHandler(TableMessageGroup.class,
this::handleMessage);
+ }
+
+ private void handleMessage(NetworkMessage message, InternalClusterNode
sender, @Nullable Long correlationId) {
+ if (message instanceof GetEstimatedSizeWithLastModifiedTsRequest) {
+ handleRequestCounter(sender);
+ }
+ }
+
+ private void handleRequestCounter(InternalClusterNode sender) {
+ List<PartitionModificationInfoMessage> modificationInfo = new
ArrayList<>();
+
+ synchronized (this) {
+ for (Map.Entry<TablePartitionId, PartitionModificationCounter> ent
: partitionsInfo.entrySet()) {
+ PartitionModificationCounter info = ent.getValue();
+ TablePartitionId tblPartId = ent.getKey();
+ PartitionModificationInfoMessage infoMsg =
TABLE_MESSAGES_FACTORY.partitionModificationInfoMessage()
+ .tableId(tblPartId.tableId())
+ .partId(tblPartId.partitionId())
+ .estimatedSize(info.estimatedSize())
+
.lastModificationCounter(info.lastMilestoneTimestamp().longValue())
+ .build();
+
+ modificationInfo.add(infoMsg);
+ }
+ }
+
+ messagingService.send(sender, TABLE_MESSAGES_FACTORY
+ .getEstimatedSizeWithLastModifiedTsResponse()
+ .modifications(modificationInfo)
+ .build());
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index f9832a8c6c2..e3fb64f6f06 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -3192,9 +3192,10 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
GcUpdateHandler gcUpdateHandler = new
GcUpdateHandler(partitionDataStorage, safeTimeTracker, indexUpdateHandler);
SizeSupplier partSizeSupplier = () ->
partitionDataStorage.getStorage().estimatedSize();
- PartitionModificationCounter modificationCounter =
partitionModificationCounterFactory.create(
- partSizeSupplier, table::stalenessConfiguration
- );
+
+ PartitionModificationCounter modificationCounter =
+ partitionModificationCounterFactory.create(partSizeSupplier,
table::stalenessConfiguration, table.tableId(), partitionId);
+
registerPartitionModificationCounterMetrics(table, partitionId,
modificationCounter);
StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler(
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/message/GetEstimatedSizeWithLastModifiedTsRequest.java
similarity index 59%
copy from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
copy to
modules/table/src/main/java/org/apache/ignite/internal/table/message/GetEstimatedSizeWithLastModifiedTsRequest.java
index 997cbe7d606..b557c248f0a 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/message/GetEstimatedSizeWithLastModifiedTsRequest.java
@@ -15,8 +15,15 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.sql.engine.statistic;
+package org.apache.ignite.internal.table.message;
-/** Statistic manager with reaction on statistic changes. */
-public interface SqlStatisticUpdateManager extends SqlStatisticManager,
StatisticUpdatesNotifier {
+import java.util.Collection;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.annotations.Transferable;
+
+/** A message that requests the estimated size and last modification timestamp of partitions.
*/
+@Transferable(TableMessageGroup.GET_ESTIMATED_SIZE_WITH_MODIFIED_TS_MESSAGE_REQUEST)
+public interface GetEstimatedSizeWithLastModifiedTsRequest extends
NetworkMessage {
+ /** IDs of the tables to request estimated size for. */
+ Collection<Integer> tables();
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/message/GetEstimatedSizeWithLastModifiedTsResponse.java
similarity index 61%
copy from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
copy to
modules/table/src/main/java/org/apache/ignite/internal/table/message/GetEstimatedSizeWithLastModifiedTsResponse.java
index 997cbe7d606..f16f4e02d0b 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/message/GetEstimatedSizeWithLastModifiedTsResponse.java
@@ -15,8 +15,14 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.sql.engine.statistic;
+package org.apache.ignite.internal.table.message;
-/** Statistic manager with reaction on statistic changes. */
-public interface SqlStatisticUpdateManager extends SqlStatisticManager,
StatisticUpdatesNotifier {
+import java.util.List;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.annotations.Transferable;
+
+/** A response to the {@link GetEstimatedSizeWithLastModifiedTsRequest}. */
+@Transferable(TableMessageGroup.GET_ESTIMATED_SIZE_WITH_MODIFIED_TS_MESSAGE_RESPONSE)
+public interface GetEstimatedSizeWithLastModifiedTsResponse extends
NetworkMessage {
+ List<PartitionModificationInfoMessage> modifications();
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/message/PartitionModificationInfoMessage.java
similarity index 57%
copy from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
copy to
modules/table/src/main/java/org/apache/ignite/internal/table/message/PartitionModificationInfoMessage.java
index 997cbe7d606..8d066badee2 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/message/PartitionModificationInfoMessage.java
@@ -15,8 +15,23 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.sql.engine.statistic;
+package org.apache.ignite.internal.table.message;
-/** Statistic manager with reaction on statistic changes. */
-public interface SqlStatisticUpdateManager extends SqlStatisticManager,
StatisticUpdatesNotifier {
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.annotations.Transferable;
+
+/** Message for transferring a partition modification info. */
+@Transferable(TableMessageGroup.GET_ESTIMATED_SIZE_WITH_MODIFIED_TS_MESSAGE)
+public interface PartitionModificationInfoMessage extends NetworkMessage {
+ /** Table id. */
+ int tableId();
+
+ /** Partition id. */
+ int partId();
+
+ /** Estimated size. */
+ long estimatedSize();
+
+ /** Timestamp of the last modification milestone, as a {@code long} value. */
+ long lastModificationCounter();
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/message/TableMessageGroup.java
similarity index 50%
copy from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
copy to
modules/table/src/main/java/org/apache/ignite/internal/table/message/TableMessageGroup.java
index 997cbe7d606..cf5c2f8dbf0 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticUpdateManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/message/TableMessageGroup.java
@@ -15,8 +15,24 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.sql.engine.statistic;
+package org.apache.ignite.internal.table.message;
-/** Statistic manager with reaction on statistic changes. */
-public interface SqlStatisticUpdateManager extends SqlStatisticManager,
StatisticUpdatesNotifier {
+import org.apache.ignite.internal.network.annotations.MessageGroup;
+
+/**
+ * Message group for table module.
+ */
+@MessageGroup(groupType = TableMessageGroup.GROUP_TYPE, groupName =
"TableMessages")
+public interface TableMessageGroup {
+ /** Table message group type. */
+ short GROUP_TYPE = 17;
+
+ /** Message type for {@link GetEstimatedSizeWithLastModifiedTsRequest}. */
+ short GET_ESTIMATED_SIZE_WITH_MODIFIED_TS_MESSAGE_REQUEST = 1;
+
+ /** Message type for {@link GetEstimatedSizeWithLastModifiedTsResponse}. */
+ short GET_ESTIMATED_SIZE_WITH_MODIFIED_TS_MESSAGE_RESPONSE = 2;
+
+ /** Message type for {@link PartitionModificationInfoMessage}. */
+ short GET_ESTIMATED_SIZE_WITH_MODIFIED_TS_MESSAGE = 3;
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterTest.java
index e2968577009..9c48114a153 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionModificationCounterTest.java
@@ -19,9 +19,11 @@ package org.apache.ignite.internal.table.distributed;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
+import static org.mockito.Mockito.mock;
import org.apache.ignite.internal.catalog.commands.CatalogUtils;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.junit.jupiter.api.Test;
@@ -31,14 +33,14 @@ import org.junit.jupiter.api.Test;
*/
public class PartitionModificationCounterTest extends BaseIgniteAbstractTest {
private final PartitionModificationCounterFactory factory =
- new PartitionModificationCounterFactory(() ->
HybridTimestamp.hybridTimestamp(1L));
+ new PartitionModificationCounterFactory(() ->
HybridTimestamp.hybridTimestamp(1L), mock(MessagingService.class));
@Test
void initialValues() {
// Empty table.
{
PartitionModificationCounter counter = factory.create(
- () -> 0L, () -> new TableStatsStalenessConfiguration(0.5,
200)
+ () -> 0L, () -> new TableStatsStalenessConfiguration(0.5,
200), 0, 0
);
assertThat(counter.value(), is(0L));
@@ -49,7 +51,7 @@ public class PartitionModificationCounterTest extends
BaseIgniteAbstractTest {
// Table with 10k rows.
{
PartitionModificationCounter counter = factory.create(
- () -> 10_000L, () -> new
TableStatsStalenessConfiguration(0.2, 200)
+ () -> 10_000L, () -> new
TableStatsStalenessConfiguration(0.2, 200), 0, 0
);
assertThat(counter.value(), is(0L));
@@ -73,7 +75,7 @@ public class PartitionModificationCounterTest extends
BaseIgniteAbstractTest {
() -> rowsCount,
() -> new TableStatsStalenessConfiguration(
CatalogUtils.DEFAULT_STALE_ROWS_FRACTION,
CatalogUtils.DEFAULT_MIN_STALE_ROWS_COUNT
- )
+ ), 0, 0
);
assertThat(counter.lastMilestoneTimestamp().longValue(), is(1L));
@@ -102,7 +104,7 @@ public class PartitionModificationCounterTest extends
BaseIgniteAbstractTest {
@SuppressWarnings({"ThrowableNotThrown",
"ResultOfObjectAllocationIgnored", "DataFlowIssue"})
void invalidUpdateValues() {
PartitionModificationCounter counter = factory.create(
- () -> 0L, () -> new TableStatsStalenessConfiguration(0.2, 500)
+ () -> 0L, () -> new TableStatsStalenessConfiguration(0.2,
500), 0, 0
);
IgniteTestUtils.assertThrows(NullPointerException.class,
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
index 2f2aa1bdd07..eef93d46a1b 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
@@ -23,6 +23,7 @@ import static
org.apache.ignite.internal.testframework.matchers.CompletableFutur
import static org.apache.ignite.sql.ColumnType.INT32;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.Mockito.mock;
import java.util.List;
import org.apache.ignite.internal.catalog.CatalogCommand;
@@ -43,6 +44,7 @@ import
org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.sql.SqlCommon;
import
org.apache.ignite.internal.table.distributed.PartitionModificationCounter;
import
org.apache.ignite.internal.table.distributed.PartitionModificationCounterFactory;
@@ -70,10 +72,13 @@ public class TableTestUtils {
/** No-op partition modification counter factory. */
public static PartitionModificationCounterFactory
NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY =
- new PartitionModificationCounterFactory(() ->
HybridTimestamp.MIN_VALUE) {
+ new PartitionModificationCounterFactory(() ->
HybridTimestamp.MIN_VALUE, mock(MessagingService.class)) {
@Override
public PartitionModificationCounter create(
- SizeSupplier partitionSizeSupplier,
StalenessConfigurationSupplier configurationSupplier
+ SizeSupplier partitionSizeSupplier,
+ StalenessConfigurationSupplier configurationSupplier,
+ int tableId,
+ int partitionId
) {
return NOOP_PARTITION_MODIFICATION_COUNTER;
}