This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 98e5e375b36 IGNITE-21031 SQL Calcite: Fix performance statistics
failure on nested scans - Fixes #11081.
98e5e375b36 is described below
commit 98e5e375b36c8976da96d8546652b1fc39e71a74
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Thu Dec 7 12:55:22 2023 +0300
IGNITE-21031 SQL Calcite: Fix performance statistics failure on nested
scans - Fixes #11081.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../query/calcite/exec/rel/ScanStorageNode.java | 7 ++-
.../query/calcite/exec/tracker/IoTracker.java | 8 ++-
.../query/calcite/exec/tracker/NoOpIoTracker.java | 4 +-
.../tracker/PerformanceStatisticsIoTracker.java | 15 ++++-
.../integration/SqlDiagnosticIntegrationTest.java | 66 ++++++++++++++++++++++
5 files changed, 91 insertions(+), 9 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanStorageNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanStorageNode.java
index 48c98dbd2b4..2608b8716f4 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanStorageNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanStorageNode.java
@@ -64,9 +64,9 @@ public class ScanStorageNode<Row> extends ScanNode<Row> {
/** {@inheritDoc} */
@Override protected int processNextBatch() throws Exception {
- try {
- context().ioTracker().startTracking();
+ boolean trackingStarted = context().ioTracker().startTracking();
+ try {
int processed = super.processNextBatch();
if (processedRowsCntr != null)
@@ -75,7 +75,8 @@ public class ScanStorageNode<Row> extends ScanNode<Row> {
return processed;
}
finally {
- context().ioTracker().stopTracking();
+ if (trackingStarted)
+ context().ioTracker().stopTracking();
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/IoTracker.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/IoTracker.java
index a246cb175b2..9a3b1b9aa06 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/IoTracker.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/IoTracker.java
@@ -24,8 +24,12 @@ import org.jetbrains.annotations.Nullable;
* I/O operations tracker interface.
*/
public interface IoTracker {
- /** Start tracking of I/O operations performed by current thread. */
- public void startTracking();
+ /**
+ * Start tracking of I/O operations performed by current thread.
+ *
+ * @return {@code True} if tracking is started and wasn't started before.
+ */
+ public boolean startTracking();
/** Stop tracking and save result. */
public void stopTracking();
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/NoOpIoTracker.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/NoOpIoTracker.java
index 4cc26d1967b..72fda5f5a7b 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/NoOpIoTracker.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/NoOpIoTracker.java
@@ -28,8 +28,8 @@ public class NoOpIoTracker implements IoTracker {
public static final IoTracker INSTANCE = new NoOpIoTracker();
/** {@inheritDoc} */
- @Override public void startTracking() {
- // No-op.
+ @Override public boolean startTracking() {
+ return false;
}
/** {@inheritDoc} */
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/PerformanceStatisticsIoTracker.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/PerformanceStatisticsIoTracker.java
index 2978e19cf28..f5c130070a4 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/PerformanceStatisticsIoTracker.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/PerformanceStatisticsIoTracker.java
@@ -20,6 +20,7 @@ package
org.apache.ignite.internal.processors.query.calcite.exec.tracker;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.internal.metric.IoStatisticsHolder;
import org.apache.ignite.internal.metric.IoStatisticsQueryHelper;
@@ -46,6 +47,9 @@ public class PerformanceStatisticsIoTracker implements
IoTracker {
/** */
private final AtomicLong physicalReads = new AtomicLong();
+ /** */
+ private final AtomicBoolean started = new AtomicBoolean();
+
/** */
private final List<T2<String, AtomicLong>> cntrs = new
CopyOnWriteArrayList<>();
@@ -61,8 +65,13 @@ public class PerformanceStatisticsIoTracker implements
IoTracker {
}
/** {@inheritDoc} */
- @Override public void startTracking() {
- IoStatisticsQueryHelper.startGatheringQueryStatistics();
+ @Override public boolean startTracking() {
+ if (started.compareAndSet(false, true)) {
+ IoStatisticsQueryHelper.startGatheringQueryStatistics();
+ return true;
+ }
+ else
+ return false;
}
/** {@inheritDoc} */
@@ -71,6 +80,8 @@ public class PerformanceStatisticsIoTracker implements
IoTracker {
logicalReads.addAndGet(stat.logicalReads());
physicalReads.addAndGet(stat.physicalReads());
+
+ started.compareAndSet(true, false);
}
/** {@inheritDoc} */
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
index 2274741bbc9..70c3af0e449 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
@@ -490,6 +490,72 @@ public class SqlDiagnosticIntegrationTest extends
AbstractBasicIntegrationTest {
assertTrue(hasPlan.get());
}
+ /** */
+ @Test
+ public void testPerformanceStatisticsNestedScan() throws Exception {
+ sql(grid(0), "CREATE TABLE test_perf_stat_nested (a INT) WITH
template=REPLICATED");
+ sql(grid(0), "INSERT INTO test_perf_stat_nested VALUES (0), (1), (2),
(3), (4)");
+
+ cleanPerformanceStatisticsDir();
+ startCollectStatistics();
+
+ AtomicInteger finishQryCnt = new AtomicInteger();
+
grid(0).context().query().runningQueryManager().registerQueryFinishedListener(q
-> finishQryCnt.incrementAndGet());
+
+ sql(grid(0), "SELECT * FROM test_perf_stat_nested UNION ALL SELECT *
FROM test_perf_stat_nested");
+
+ assertTrue(GridTestUtils.waitForCondition(() -> finishQryCnt.get() ==
1, 1_000L));
+
+ AtomicInteger qryCnt = new AtomicInteger();
+ AtomicInteger readsCnt = new AtomicInteger();
+ AtomicLong rowsCnt = new AtomicLong();
+
+ stopCollectStatisticsAndRead(new
AbstractPerformanceStatisticsTest.TestHandler() {
+ @Override public void query(
+ UUID nodeId,
+ GridCacheQueryType type,
+ String text,
+ long id,
+ long qryStartTime,
+ long duration,
+ boolean success
+ ) {
+ qryCnt.incrementAndGet();
+ assertTrue(success);
+ }
+
+ @Override public void queryReads(
+ UUID nodeId,
+ GridCacheQueryType type,
+ UUID qryNodeId,
+ long id,
+ long logicalReads,
+ long physicalReads
+ ) {
+ readsCnt.incrementAndGet();
+ assertTrue(logicalReads > 0);
+ }
+
+ @Override public void queryRows(
+ UUID nodeId,
+ GridCacheQueryType type,
+ UUID qryNodeId,
+ long id,
+ String action,
+ long rows
+ ) {
+ if ("Fetched".equals(action))
+ rowsCnt.addAndGet(rows);
+ }
+ });
+
+ assertEquals(1, qryCnt.get());
+ // The second scan is executed inside the first scan's
processNextBatch() method,
+ // after the first scan invokes downstream().end(), so here we have
only one read record.
+ assertEquals(1, readsCnt.get());
+ assertEquals(10, rowsCnt.get());
+ }
+
/** */
@Test
public void testSqlEvents() {