This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 98e5e375b36 IGNITE-21031 SQL Calcite: Fix performance statistics 
failure on nested scans - Fixes #11081.
98e5e375b36 is described below

commit 98e5e375b36c8976da96d8546652b1fc39e71a74
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Thu Dec 7 12:55:22 2023 +0300

    IGNITE-21031 SQL Calcite: Fix performance statistics failure on nested 
scans - Fixes #11081.
    
    Signed-off-by: Aleksey Plekhanov <[email protected]>
---
 .../query/calcite/exec/rel/ScanStorageNode.java    |  7 ++-
 .../query/calcite/exec/tracker/IoTracker.java      |  8 ++-
 .../query/calcite/exec/tracker/NoOpIoTracker.java  |  4 +-
 .../tracker/PerformanceStatisticsIoTracker.java    | 15 ++++-
 .../integration/SqlDiagnosticIntegrationTest.java  | 66 ++++++++++++++++++++++
 5 files changed, 91 insertions(+), 9 deletions(-)

diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanStorageNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanStorageNode.java
index 48c98dbd2b4..2608b8716f4 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanStorageNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanStorageNode.java
@@ -64,9 +64,9 @@ public class ScanStorageNode<Row> extends ScanNode<Row> {
 
     /** {@inheritDoc} */
     @Override protected int processNextBatch() throws Exception {
-        try {
-            context().ioTracker().startTracking();
+        boolean trackingStarted = context().ioTracker().startTracking();
 
+        try {
             int processed = super.processNextBatch();
 
             if (processedRowsCntr != null)
@@ -75,7 +75,8 @@ public class ScanStorageNode<Row> extends ScanNode<Row> {
             return processed;
         }
         finally {
-            context().ioTracker().stopTracking();
+            if (trackingStarted)
+                context().ioTracker().stopTracking();
         }
     }
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/IoTracker.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/IoTracker.java
index a246cb175b2..9a3b1b9aa06 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/IoTracker.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/IoTracker.java
@@ -24,8 +24,12 @@ import org.jetbrains.annotations.Nullable;
  * I/O operations tracker interface.
  */
 public interface IoTracker {
-    /** Start tracking of I/O operations performed by current thread. */
-    public void startTracking();
+    /**
+     * Start tracking of I/O operations performed by current thread.
+     *
+     * @return {@code True} if tracking is started and wasn't started before.
+     */
+    public boolean startTracking();
 
     /** Stop tracking and save result. */
     public void stopTracking();
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/NoOpIoTracker.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/NoOpIoTracker.java
index 4cc26d1967b..72fda5f5a7b 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/NoOpIoTracker.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/NoOpIoTracker.java
@@ -28,8 +28,8 @@ public class NoOpIoTracker implements IoTracker {
     public static final IoTracker INSTANCE = new NoOpIoTracker();
 
     /** {@inheritDoc} */
-    @Override public void startTracking() {
-        // No-op.
+    @Override public boolean startTracking() {
+        return false;
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/PerformanceStatisticsIoTracker.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/PerformanceStatisticsIoTracker.java
index 2978e19cf28..f5c130070a4 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/PerformanceStatisticsIoTracker.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/PerformanceStatisticsIoTracker.java
@@ -20,6 +20,7 @@ package 
org.apache.ignite.internal.processors.query.calcite.exec.tracker;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.internal.metric.IoStatisticsHolder;
 import org.apache.ignite.internal.metric.IoStatisticsQueryHelper;
@@ -46,6 +47,9 @@ public class PerformanceStatisticsIoTracker implements 
IoTracker {
     /** */
     private final AtomicLong physicalReads = new AtomicLong();
 
+    /** */
+    private final AtomicBoolean started = new AtomicBoolean();
+
     /** */
     private final List<T2<String, AtomicLong>> cntrs = new 
CopyOnWriteArrayList<>();
 
@@ -61,8 +65,13 @@ public class PerformanceStatisticsIoTracker implements 
IoTracker {
     }
 
     /** {@inheritDoc} */
-    @Override public void startTracking() {
-        IoStatisticsQueryHelper.startGatheringQueryStatistics();
+    @Override public boolean startTracking() {
+        if (started.compareAndSet(false, true)) {
+            IoStatisticsQueryHelper.startGatheringQueryStatistics();
+            return true;
+        }
+        else
+            return false;
     }
 
     /** {@inheritDoc} */
@@ -71,6 +80,8 @@ public class PerformanceStatisticsIoTracker implements 
IoTracker {
 
         logicalReads.addAndGet(stat.logicalReads());
         physicalReads.addAndGet(stat.physicalReads());
+
+        started.compareAndSet(true, false);
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
index 2274741bbc9..70c3af0e449 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
@@ -490,6 +490,72 @@ public class SqlDiagnosticIntegrationTest extends 
AbstractBasicIntegrationTest {
         assertTrue(hasPlan.get());
     }
 
+    /** */
+    @Test
+    public void testPerformanceStatisticsNestedScan() throws Exception {
+        sql(grid(0), "CREATE TABLE test_perf_stat_nested (a INT) WITH 
template=REPLICATED");
+        sql(grid(0), "INSERT INTO test_perf_stat_nested VALUES (0), (1), (2), 
(3), (4)");
+
+        cleanPerformanceStatisticsDir();
+        startCollectStatistics();
+
+        AtomicInteger finishQryCnt = new AtomicInteger();
+        
grid(0).context().query().runningQueryManager().registerQueryFinishedListener(q 
-> finishQryCnt.incrementAndGet());
+
+        sql(grid(0), "SELECT * FROM test_perf_stat_nested UNION ALL SELECT * 
FROM test_perf_stat_nested");
+
+        assertTrue(GridTestUtils.waitForCondition(() -> finishQryCnt.get() == 
1, 1_000L));
+
+        AtomicInteger qryCnt = new AtomicInteger();
+        AtomicInteger readsCnt = new AtomicInteger();
+        AtomicLong rowsCnt = new AtomicLong();
+
+        stopCollectStatisticsAndRead(new 
AbstractPerformanceStatisticsTest.TestHandler() {
+            @Override public void query(
+                UUID nodeId,
+                GridCacheQueryType type,
+                String text,
+                long id,
+                long qryStartTime,
+                long duration,
+                boolean success
+            ) {
+                qryCnt.incrementAndGet();
+                assertTrue(success);
+            }
+
+            @Override public void queryReads(
+                UUID nodeId,
+                GridCacheQueryType type,
+                UUID qryNodeId,
+                long id,
+                long logicalReads,
+                long physicalReads
+            ) {
+                readsCnt.incrementAndGet();
+                assertTrue(logicalReads > 0);
+            }
+
+            @Override public void queryRows(
+                UUID nodeId,
+                GridCacheQueryType type,
+                UUID qryNodeId,
+                long id,
+                String action,
+                long rows
+            ) {
+                if ("Fetched".equals(action))
+                    rowsCnt.addAndGet(rows);
+            }
+        });
+
+        assertEquals(1, qryCnt.get());
+        // The second scan is executed inside the first scan 
processNextBatch() method,
+        // after the first scan invoke downstream().end(), so here we have 
only one read record.
+        assertEquals(1, readsCnt.get());
+        assertEquals(10, rowsCnt.get());
+    }
+
     /** */
     @Test
     public void testSqlEvents() {

Reply via email to