This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 4e5f51adec1 IGNITE-22776 SQL: Fix table statistics periodical update -
Fixes #11448.
4e5f51adec1 is described below
commit 4e5f51adec1224e394cf63d8bfebe6ec93ba60f9
Author: Vladimir Steshin <[email protected]>
AuthorDate: Mon Jul 29 11:42:39 2024 +0300
IGNITE-22776 SQL: Fix table statistics periodical update - Fixes #11448.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../processors/query/GridQueryProcessor.java | 4 +
.../query/stat/IgniteStatisticsManagerImpl.java | 21 ++--
.../query/stat/StatisticsAbstractTest.java | 15 ++-
.../query/stat/StatisticsObsolescenceTest.java | 133 ++++++++++++++++++++-
4 files changed, 160 insertions(+), 13 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 208ee4f9c67..9e8897f2ef0 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -2747,6 +2747,8 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
if (idx != null)
idx.store(cctx, desc, newRow, prevRow, prevRowAvailable);
+
+ statsMgr.onRowUpdated(desc.schemaName(), desc.tableName(),
newRow.partition(), key.valueBytes(coctx));
}
/**
@@ -3614,6 +3616,8 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
if (indexingEnabled())
idx.remove(cctx, desc, row);
+
+ statsMgr.onRowUpdated(desc.schemaName(), desc.tableName(),
row.partition(), row.key().valueBytes(cctx.cacheObjectContext()));
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java
index 66f5af303b8..1e2ceb3c593 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java
@@ -111,7 +111,7 @@ public class IgniteStatisticsManagerImpl implements
IgniteStatisticsManager {
private volatile boolean started;
/** Schedule to process obsolescence statistics. */
- private GridTimeoutProcessor.CancelableTask obsolescenceSchedule;
+ private volatile GridTimeoutProcessor.CancelableTask obsolescenceSchedule;
/** Exchange listener. */
private final PartitionsExchangeAware exchAwareLsnr = new
PartitionsExchangeAware() {
@@ -236,16 +236,23 @@ public class IgniteStatisticsManagerImpl implements
IgniteStatisticsManager {
tryStart();
- if (serverNode) {
- // Use mgmt pool to work with statistics repository in busy lock
to schedule some tasks.
- obsolescenceSchedule = ctx.timeout().schedule(() -> {
- obsolescenceBusyExecutor.execute(() -> processObsolescence());
- }, OBSOLESCENCE_INTERVAL * 1000, OBSOLESCENCE_INTERVAL * 1000);
- }
+ if (serverNode)
+ scheduleObsolescence(OBSOLESCENCE_INTERVAL);
ctx.cache().context().exchange().registerExchangeAwareComponent(exchAwareLsnr);
}
+ /** */
+ void scheduleObsolescence(int seconds) {
+ assert seconds >= 1;
+
+ if (obsolescenceSchedule != null)
+ obsolescenceSchedule.close();
+
+ obsolescenceSchedule = ctx.timeout().schedule(() ->
obsolescenceBusyExecutor.execute(this::processObsolescence),
+ seconds * 1000, seconds * 1000);
+ }
+
/**
* Check all preconditions and stop if started and have reason to stop.
*/
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsAbstractTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsAbstractTest.java
index 8c87170a466..a631e997237 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsAbstractTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsAbstractTest.java
@@ -261,12 +261,23 @@ public abstract class StatisticsAbstractTest extends
GridCommonAbstractTest {
}
/**
- * Create SQL table with the given index.
+ * Creates SQL table with the given index and fills it with a small amount of
data.
*
* @param suffix Table idx, if {@code null} - name "SMALL" without index
will be used.
* @return Table name.
*/
protected String createSmallTable(String suffix) {
+ return createSmallTable(SMALL_SIZE, suffix);
+ }
+
+ /**
+ * Creates SQL table with the given index and fills it with the
given number of records.
+ *
+ * @param preloadCnt Records cnt to load after creation.
+ * @param suffix Table idx, if {@code null} - name "SMALL" without index
will be used.
+ * @return Table name.
+ */
+ protected String createSmallTable(int preloadCnt, String suffix) {
String tblName = "small" + ((suffix != null) ? suffix : "");
sql("DROP TABLE IF EXISTS " + tblName);
@@ -279,7 +290,7 @@ public abstract class StatisticsAbstractTest extends
GridCommonAbstractTest {
sql(String.format("CREATE INDEX %s_c ON %s(c)", tblName, tblName));
- for (int i = 0; i < SMALL_SIZE; i++)
+ for (int i = 0; i < preloadCnt; i++)
sql(String.format("INSERT INTO %s(a, b, c) VALUES(%d, %d, %d)",
tblName, i, i, i % 10));
return tblName;
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java
index 1baa806d1a2..417f8374317 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java
@@ -18,22 +18,147 @@
package org.apache.ignite.internal.processors.query.stat;
import java.util.Map;
-
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.collection.IntMap;
+import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;
import static
org.apache.ignite.internal.processors.query.stat.IgniteStatisticsHelper.buildDefaultConfigurations;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
/**
* Test for statistics obsolescence.
*/
public class StatisticsObsolescenceTest extends StatisticsAbstractTest {
+ /** */
+ @Test
+ public void testObsolescenceWithInsert() throws Exception {
+ doTestObsolescenceUnderLoad(false, 1,
+ key -> sql(String.format("insert into SMALL(A, B, C) values(%d,
%d, %d)", key, key, key)));
+ }
+
+ /** */
+ @Test
+ public void testObsolescenceWithUpdate() throws Exception {
+ doTestObsolescenceUnderLoad(true, 0, key -> sql("update SMALL set
B=B+1 where A=" + key));
+ }
+
+ /** */
+ @Test
+ public void testObsolescenceWithDelete() throws Exception {
+ doTestObsolescenceUnderLoad(true, -1, key -> sql("delete from SMALL
where A=" + key));
+ }
+
+ /** */
+ private void doTestObsolescenceUnderLoad(boolean preload, int rowCntCmp,
Consumer<Long> op) throws Exception {
+ // Keep enough data to touch every partition. The statistics
collection is sensitive to a partition's empty rows num
+ // and is able to reassemble in this case. This would give
false-positive result.
+ int workingRowsNum = RendezvousAffinityFunction.DFLT_PARTITION_COUNT *
10;
+ int preloadCnt = preload ? workingRowsNum : 0;
+
+ int osbInterval = 7;
+
+ CyclicBarrier barrier = new CyclicBarrier(2);
+
+ try {
+ startGridsMultiThreaded(2);
+
+ for (Ignite ig : G.allGrids())
+
((IgniteStatisticsManagerImpl)((IgniteEx)ig).context().query().statsManager()).scheduleObsolescence(osbInterval);
+
+ createSmallTable(preloadCnt, null);
+
+ statisticsMgr(0).usageState(StatisticsUsageState.ON);
+
statisticsMgr(0).collectStatistics(buildDefaultConfigurations(SMALL_TARGET));
+
+ // Initialized statistics.
+ assertTrue(waitForCondition(() ->
statisticsMgr(0).getLocalStatistics(SMALL_KEY) != null, osbInterval * 1000));
+ assertTrue(waitForCondition(() ->
statisticsMgr(1).getLocalStatistics(SMALL_KEY) != null, osbInterval * 1000));
+
+ ObjectStatisticsImpl initStat1 =
(ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY);
+ ObjectStatisticsImpl initStat2 =
(ObjectStatisticsImpl)statisticsMgr(1).getLocalStatistics(SMALL_KEY);
+
+ assertEquals(preloadCnt, initStat1.rowCount() +
initStat2.rowCount());
+
+ GridTestUtils.runAsync(() -> {
+ AtomicLong key = new AtomicLong(1L);
+
+ long opCnt = 0;
+
+ while (!barrier.isBroken()) {
+ op.accept(key.getAndIncrement());
+
+ // Enough updates to trigger the statistics.
+ if (++opCnt == workingRowsNum / 3) {
+ opCnt = 0;
+
+ barrier.await();
+ barrier.await();
+ }
+ }
+ });
+
+ barrier.await();
+
+ waitForStatsUpdates(initStat1, osbInterval * 2);
+
+ ObjectStatisticsImpl updatedStat =
(ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY);
+
+ assertTrue(rowCntCmp > 0 ? updatedStat.rowCount() >
initStat1.rowCount() :
+ (rowCntCmp < 0 ? updatedStat.rowCount() < initStat1.rowCount()
: updatedStat.rowCount() == initStat1.rowCount()));
+
+ barrier.await();
+ barrier.await();
+
+ // Continuing data loading, the table is being updated. Since the
table keeps changing, we must obtain
+ // new statistics, newer than {@code updatedStat}.
+ waitForStatsUpdates(updatedStat, osbInterval * 2);
+
+ ObjectStatisticsImpl finalStat =
(ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY);
+
+ assertTrue(rowCntCmp > 0 ? finalStat.rowCount() >
updatedStat.rowCount() :
+ (rowCntCmp < 0 ? finalStat.rowCount() < updatedStat.rowCount()
: finalStat.rowCount() == updatedStat.rowCount()));
+ }
+ finally {
+ barrier.reset();
+ }
+ }
+
+ /** */
+ private void waitForStatsUpdates(ObjectStatisticsImpl compareTo, long
timeoutSec) throws IgniteInterruptedCheckedException {
+ assertTrue(waitForCondition(() -> {
+ ObjectStatisticsImpl updatedStat =
(ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY);
+
+ if (updatedStat == null)
+ return false;
+
+ AtomicBoolean passed = new AtomicBoolean(true);
+
+ updatedStat.columnsStatistics().forEach((col, stat) -> {
+ ColumnStatistics compared = compareTo.columnStatistics(col);
+
+ assert compared != null;
+
+ if (compared.createdAt() >= stat.createdAt())
+ passed.set(false);
+ });
+
+ return passed.get();
+ }, timeoutSec * 1000));
+ }
+
/**
* Test statistics refreshing after significant changes of base table:
* 1) Create and populate small table
@@ -51,7 +176,7 @@ public class StatisticsObsolescenceTest extends
StatisticsAbstractTest {
statisticsMgr(0).collectStatistics(buildDefaultConfigurations(SMALL_TARGET));
- assertTrue(GridTestUtils.waitForCondition(() ->
statisticsMgr(0).getLocalStatistics(SMALL_KEY) != null, TIMEOUT));
+ assertTrue(waitForCondition(() ->
statisticsMgr(0).getLocalStatistics(SMALL_KEY) != null, TIMEOUT));
ObjectStatisticsImpl stat1 =
(ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY);
@@ -62,7 +187,7 @@ public class StatisticsObsolescenceTest extends
StatisticsAbstractTest {
statisticsMgr(0).processObsolescence();
- assertTrue(GridTestUtils.waitForCondition(() -> {
+ assertTrue(waitForCondition(() -> {
ObjectStatisticsImpl stat2 =
(ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY);
return stat2 != null && stat2.rowCount() > stat1.rowCount();
@@ -103,7 +228,7 @@ public class StatisticsObsolescenceTest extends
StatisticsAbstractTest {
ignite.cluster().state(ClusterState.ACTIVE);
- assertTrue(GridTestUtils.waitForCondition(() ->
statObs.get(SMALL_KEY).size() > oldSize, TIMEOUT));
+ assertTrue(waitForCondition(() -> statObs.get(SMALL_KEY).size() >
oldSize, TIMEOUT));
}
/** {@inheritDoc} */