This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e5f51adec1 IGNITE-22776 SQL: Fix table statistics periodical update - 
Fixes #11448.
4e5f51adec1 is described below

commit 4e5f51adec1224e394cf63d8bfebe6ec93ba60f9
Author: Vladimir Steshin <[email protected]>
AuthorDate: Mon Jul 29 11:42:39 2024 +0300

    IGNITE-22776 SQL: Fix table statistics periodical update - Fixes #11448.
    
    Signed-off-by: Aleksey Plekhanov <[email protected]>
---
 .../processors/query/GridQueryProcessor.java       |   4 +
 .../query/stat/IgniteStatisticsManagerImpl.java    |  21 ++--
 .../query/stat/StatisticsAbstractTest.java         |  15 ++-
 .../query/stat/StatisticsObsolescenceTest.java     | 133 ++++++++++++++++++++-
 4 files changed, 160 insertions(+), 13 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 208ee4f9c67..9e8897f2ef0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -2747,6 +2747,8 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
 
         if (idx != null)
             idx.store(cctx, desc, newRow, prevRow, prevRowAvailable);
+
+        statsMgr.onRowUpdated(desc.schemaName(), desc.tableName(), 
newRow.partition(), key.valueBytes(coctx));
     }
 
     /**
@@ -3614,6 +3616,8 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
 
         if (indexingEnabled())
             idx.remove(cctx, desc, row);
+
+        statsMgr.onRowUpdated(desc.schemaName(), desc.tableName(), 
row.partition(), row.key().valueBytes(cctx.cacheObjectContext()));
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java
index 66f5af303b8..1e2ceb3c593 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java
@@ -111,7 +111,7 @@ public class IgniteStatisticsManagerImpl implements 
IgniteStatisticsManager {
     private volatile boolean started;
 
     /** Schedule to process obsolescence statistics. */
-    private GridTimeoutProcessor.CancelableTask obsolescenceSchedule;
+    private volatile GridTimeoutProcessor.CancelableTask obsolescenceSchedule;
 
     /** Exchange listener. */
     private final PartitionsExchangeAware exchAwareLsnr = new 
PartitionsExchangeAware() {
@@ -236,16 +236,23 @@ public class IgniteStatisticsManagerImpl implements 
IgniteStatisticsManager {
 
         tryStart();
 
-        if (serverNode) {
-            // Use mgmt pool to work with statistics repository in busy lock 
to schedule some tasks.
-            obsolescenceSchedule = ctx.timeout().schedule(() -> {
-                obsolescenceBusyExecutor.execute(() -> processObsolescence());
-            }, OBSOLESCENCE_INTERVAL * 1000, OBSOLESCENCE_INTERVAL * 1000);
-        }
+        if (serverNode)
+            scheduleObsolescence(OBSOLESCENCE_INTERVAL);
 
         
ctx.cache().context().exchange().registerExchangeAwareComponent(exchAwareLsnr);
     }
 
+    /** */
+    void scheduleObsolescence(int seconds) {
+        assert seconds >= 1;
+
+        if (obsolescenceSchedule != null)
+            obsolescenceSchedule.close();
+
+        obsolescenceSchedule = ctx.timeout().schedule(() -> 
obsolescenceBusyExecutor.execute(this::processObsolescence),
+            seconds * 1000, seconds * 1000);
+    }
+
     /**
      * Check all preconditions and stop if started and have reason to stop.
      */
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsAbstractTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsAbstractTest.java
index 8c87170a466..a631e997237 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsAbstractTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsAbstractTest.java
@@ -261,12 +261,23 @@ public abstract class StatisticsAbstractTest extends 
GridCommonAbstractTest {
     }
 
     /**
-     * Create SQL table with the given index.
+     * Creates SQL table with the given index and fills with small amount of 
data.
      *
      * @param suffix Table idx, if {@code null} - name "SMALL" without index 
will be used.
      * @return Table name.
      */
     protected String createSmallTable(String suffix) {
+        return createSmallTable(SMALL_SIZE, suffix);
+    }
+
+    /**
+     * Creates SQL table with the given index and fills with data of the 
passed amount.
+     *
+     * @param preloadCnt Records cnt to load after creation.
+     * @param suffix Table idx, if {@code null} - name "SMALL" without index 
will be used.
+     * @return Table name.
+     */
+    protected String createSmallTable(int preloadCnt, String suffix) {
         String tblName = "small" + ((suffix != null) ? suffix : "");
 
         sql("DROP TABLE IF EXISTS " + tblName);
@@ -279,7 +290,7 @@ public abstract class StatisticsAbstractTest extends 
GridCommonAbstractTest {
 
         sql(String.format("CREATE INDEX %s_c ON %s(c)", tblName, tblName));
 
-        for (int i = 0; i < SMALL_SIZE; i++)
+        for (int i = 0; i < preloadCnt; i++)
             sql(String.format("INSERT INTO %s(a, b, c) VALUES(%d, %d, %d)", 
tblName, i, i, i % 10));
 
         return tblName;
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java
index 1baa806d1a2..417f8374317 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsObsolescenceTest.java
@@ -18,22 +18,147 @@
 package org.apache.ignite.internal.processors.query.stat;
 
 import java.util.Map;
-
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cluster.ClusterState;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.util.collection.IntMap;
+import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.junit.Test;
 
 import static 
org.apache.ignite.internal.processors.query.stat.IgniteStatisticsHelper.buildDefaultConfigurations;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 
 /**
  * Test for statistics obsolescence.
  */
 public class StatisticsObsolescenceTest extends StatisticsAbstractTest {
+    /** */
+    @Test
+    public void testObsolescenceWithInsert() throws Exception {
+        doTestObsolescenceUnderLoad(false, 1,
+            key -> sql(String.format("insert into SMALL(A, B, C) values(%d, 
%d, %d)", key, key, key)));
+    }
+
+    /** */
+    @Test
+    public void testObsolescenceWithUpdate() throws Exception {
+        doTestObsolescenceUnderLoad(true, 0, key -> sql("update SMALL set 
B=B+1 where A=" + key));
+    }
+
+    /** */
+    @Test
+    public void testObsolescenceWithDelete() throws Exception {
+        doTestObsolescenceUnderLoad(true, -1, key -> sql("delete from SMALL 
where A=" + key));
+    }
+
+    /** */
+    private void doTestObsolescenceUnderLoad(boolean preload, int rowCntCmp, 
Consumer<Long> op) throws Exception {
+        // Keep enough data to touch every partition. The statistics 
collection is sensitive to a partition's empty rows num
+        // and is able to reassemble in this case. This would give 
false-positive result.
+        int workingRowsNum = RendezvousAffinityFunction.DFLT_PARTITION_COUNT * 
10;
+        int preloadCnt = preload ? workingRowsNum : 0;
+
+        int osbInterval = 7;
+
+        CyclicBarrier barrier = new CyclicBarrier(2);
+
+        try {
+            startGridsMultiThreaded(2);
+
+            for (Ignite ig : G.allGrids())
+                
((IgniteStatisticsManagerImpl)((IgniteEx)ig).context().query().statsManager()).scheduleObsolescence(osbInterval);
+
+            createSmallTable(preloadCnt, null);
+
+            statisticsMgr(0).usageState(StatisticsUsageState.ON);
+            
statisticsMgr(0).collectStatistics(buildDefaultConfigurations(SMALL_TARGET));
+
+            // Initialized statistics.
+            assertTrue(waitForCondition(() -> 
statisticsMgr(0).getLocalStatistics(SMALL_KEY) != null, osbInterval * 1000));
+            assertTrue(waitForCondition(() -> 
statisticsMgr(1).getLocalStatistics(SMALL_KEY) != null, osbInterval * 1000));
+
+            ObjectStatisticsImpl initStat1 = 
(ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY);
+            ObjectStatisticsImpl initStat2 = 
(ObjectStatisticsImpl)statisticsMgr(1).getLocalStatistics(SMALL_KEY);
+
+            assertEquals(preloadCnt, initStat1.rowCount() + 
initStat2.rowCount());
+
+            GridTestUtils.runAsync(() -> {
+                AtomicLong key = new AtomicLong(1L);
+
+                long opCnt = 0;
+
+                while (!barrier.isBroken()) {
+                    op.accept(key.getAndIncrement());
+
+                    // Enough updates to trigger the statistics.
+                    if (++opCnt == workingRowsNum / 3) {
+                        opCnt = 0;
+
+                        barrier.await();
+                        barrier.await();
+                    }
+                }
+            });
+
+            barrier.await();
+
+            waitForStatsUpdates(initStat1, osbInterval * 2);
+
+            ObjectStatisticsImpl updatedStat = 
(ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY);
+
+            assertTrue(rowCntCmp > 0 ? updatedStat.rowCount() > 
initStat1.rowCount() :
+                (rowCntCmp < 0 ? updatedStat.rowCount() < initStat1.rowCount() 
: updatedStat.rowCount() == initStat1.rowCount()));
+
+            barrier.await();
+            barrier.await();
+
+            // Continuing data loading, the table is being updated. Since the 
row count is inreasing, we must obtain a
+            // new statistics, greather than {@code firstNotEmpty}.
+            waitForStatsUpdates(updatedStat, osbInterval * 2);
+
+            ObjectStatisticsImpl finalStat = 
(ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY);
+
+            assertTrue(rowCntCmp > 0 ? finalStat.rowCount() > 
updatedStat.rowCount() :
+                (rowCntCmp < 0 ? finalStat.rowCount() < updatedStat.rowCount() 
: finalStat.rowCount() == updatedStat.rowCount()));
+        }
+        finally {
+            barrier.reset();
+        }
+    }
+
+    /** */
+    private void waitForStatsUpdates(ObjectStatisticsImpl compareTo, long 
timeoutSec) throws IgniteInterruptedCheckedException {
+        assertTrue(waitForCondition(() -> {
+            ObjectStatisticsImpl updatedStat = 
(ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY);
+
+            if (updatedStat == null)
+                return false;
+
+            AtomicBoolean passed = new AtomicBoolean(true);
+
+            updatedStat.columnsStatistics().forEach((col, stat) -> {
+                ColumnStatistics compared = compareTo.columnStatistics(col);
+
+                assert compared != null;
+
+                if (compared.createdAt() >= stat.createdAt())
+                    passed.set(false);
+            });
+
+            return passed.get();
+        }, timeoutSec * 1000));
+    }
+
     /**
      * Test statistics refreshing after significant changes of base table:
      * 1) Create and populate small table
@@ -51,7 +176,7 @@ public class StatisticsObsolescenceTest extends 
StatisticsAbstractTest {
 
         
statisticsMgr(0).collectStatistics(buildDefaultConfigurations(SMALL_TARGET));
 
-        assertTrue(GridTestUtils.waitForCondition(() -> 
statisticsMgr(0).getLocalStatistics(SMALL_KEY) != null, TIMEOUT));
+        assertTrue(waitForCondition(() -> 
statisticsMgr(0).getLocalStatistics(SMALL_KEY) != null, TIMEOUT));
 
         ObjectStatisticsImpl stat1 = 
(ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY);
 
@@ -62,7 +187,7 @@ public class StatisticsObsolescenceTest extends 
StatisticsAbstractTest {
 
         statisticsMgr(0).processObsolescence();
 
-        assertTrue(GridTestUtils.waitForCondition(() -> {
+        assertTrue(waitForCondition(() -> {
             ObjectStatisticsImpl stat2 = 
(ObjectStatisticsImpl)statisticsMgr(0).getLocalStatistics(SMALL_KEY);
 
             return stat2 != null && stat2.rowCount() > stat1.rowCount();
@@ -103,7 +228,7 @@ public class StatisticsObsolescenceTest extends 
StatisticsAbstractTest {
 
         ignite.cluster().state(ClusterState.ACTIVE);
 
-        assertTrue(GridTestUtils.waitForCondition(() -> 
statObs.get(SMALL_KEY).size() > oldSize, TIMEOUT));
+        assertTrue(waitForCondition(() -> statObs.get(SMALL_KEY).size() > 
oldSize, TIMEOUT));
     }
 
     /** {@inheritDoc} */

Reply via email to