This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 864b94edf76 Add extra info for TableScanOperator and AggTableScanOperator in Explain Analyze
864b94edf76 is described below

commit 864b94edf763d23934de586e26932e97751235b4
Author: Jackie Tien <[email protected]>
AuthorDate: Wed Nov 27 16:30:48 2024 +0800

    Add extra info for TableScanOperator and AggTableScanOperator in Explain Analyze
---
 .../execution/operator/OperatorContext.java        |  4 +-
 .../TableAggregationTableScanOperator.java         | 45 +++++++++++++---------
 .../source/relational/TableScanOperator.java       |  7 ++++
 .../plan/planner/plan/node/PlanGraphPrinter.java   |  6 ++-
 4 files changed, 41 insertions(+), 21 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java
index 49be9ad1cc0..99eccdcef3d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java
@@ -34,6 +34,7 @@ import org.apache.tsfile.utils.RamUsageEstimator;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -165,7 +166,8 @@ public class OperatorContext implements Accountable {
 
   public void recordSpecifiedInfo(String key, String value) {
     if (specifiedInfo == null) {
-      specifiedInfo = new HashMap<>();
+      // explain analyze operator fetching and current operator updating may happen concurrently
+      specifiedInfo = new ConcurrentHashMap<>();
     }
     specifiedInfo.put(key, value);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
index d444ca643ba..e963963bf7d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
@@ -67,7 +67,9 @@ import java.util.stream.Collectors;
 
 import static java.lang.String.format;
 import static 
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.satisfiedTimeRange;
+import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.CURRENT_DEVICE_INDEX_STRING;
 import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.constructAlignedPath;
+import static 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanGraphPrinter.DEVICE_NUMBER;
 import static 
org.apache.tsfile.read.common.block.TsBlockUtil.skipPointsOutOfTimeRange;
 
 public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregationScanOperator {
@@ -161,6 +163,7 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
     this.columnsIndexArray = columnsIndexArray;
     this.deviceEntries = deviceEntries;
     this.deviceCount = deviceEntries.size();
+    this.operatorContext.recordSpecifiedInfo(DEVICE_NUMBER, 
Integer.toString(this.deviceCount));
     this.scanOrder = scanOrder;
     this.seriesScanOptions = seriesScanOptions;
     this.measurementColumnNames = measurementColumnNames;
@@ -169,6 +172,7 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
     this.measurementColumnTSDataTypes =
         
measurementSchemas.stream().map(IMeasurementSchema::getType).collect(Collectors.toList());
     this.currentDeviceIndex = 0;
+    this.operatorContext.recordSpecifiedInfo(CURRENT_DEVICE_INDEX_STRING, 
Integer.toString(0));
     this.aggArguments = aggArguments;
     this.timeIterator = tableTimeRangeIterator;
     if (tableTimeRangeIterator.getType()
@@ -327,7 +331,7 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
         // all data of current device has been consumed
         updateResultTsBlock();
         timeIterator.resetCurTimeRange();
-        currentDeviceIndex++;
+        nextDevice();
       }
 
       if (currentDeviceIndex < deviceCount) {
@@ -800,29 +804,34 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
   }
 
   private void checkIfAllAggregatorHasFinalResult() {
-    if (allAggregatorsHasFinalResult) {
-      // for SINGLE_TIME_ITERATOR, if allAggregatorsHasFinalResult, just 
consume next device
-      if (timeIterator.getType() == 
ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR) {
-        currentDeviceIndex++;
-        inputTsBlock = null;
-
-        if (currentDeviceIndex < deviceCount) {
-          // construct AlignedSeriesScanUtil for next device
-          constructAlignedSeriesScanUtil();
-          queryDataSource.reset();
-          this.seriesScanUtil.initQueryDataSource(queryDataSource);
-        }
+    if (allAggregatorsHasFinalResult
+        && timeIterator.getType()
+            == ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR) {
+      nextDevice();
+      inputTsBlock = null;
 
-        if (currentDeviceIndex >= deviceCount) {
-          // all devices have been consumed
-          timeIterator.setFinished();
-        }
+      if (currentDeviceIndex < deviceCount) {
+        // construct AlignedSeriesScanUtil for next device
+        constructAlignedSeriesScanUtil();
+        queryDataSource.reset();
+        this.seriesScanUtil.initQueryDataSource(queryDataSource);
+      }
 
-        allAggregatorsHasFinalResult = false;
+      if (currentDeviceIndex >= deviceCount) {
+        // all devices have been consumed
+        timeIterator.setFinished();
       }
+
+      allAggregatorsHasFinalResult = false;
     }
   }
 
+  private void nextDevice() {
+    currentDeviceIndex++;
+    this.operatorContext.recordSpecifiedInfo(
+        CURRENT_DEVICE_INDEX_STRING, Integer.toString(currentDeviceIndex));
+  }
+
   private void resetTableAggregators() {
     tableAggregators.forEach(TableAggregator::reset);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java
index 27b9244e80b..d1133b1f17a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java
@@ -54,12 +54,15 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static 
org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanOperator.appendDataIntoBuilder;
+import static 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanGraphPrinter.DEVICE_NUMBER;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager.getTSDataType;
 
 public class TableScanOperator extends AbstractSeriesScanOperator {
   private static final long INSTANCE_SIZE =
       RamUsageEstimator.shallowSizeOfInstance(TableScanOperator.class);
 
+  public static final String CURRENT_DEVICE_INDEX_STRING = 
"CurrentDeviceIndex";
+
   public static final LongColumn TIME_COLUMN_TEMPLATE =
       new LongColumn(1, Optional.empty(), new long[] {0});
 
@@ -106,6 +109,7 @@ public class TableScanOperator extends 
AbstractSeriesScanOperator {
       int maxTsBlockLineNum) {
     this.sourceId = sourceId;
     this.operatorContext = context;
+    this.operatorContext.recordSpecifiedInfo(DEVICE_NUMBER, 
Integer.toString(deviceEntries.size()));
     this.columnSchemas = columnSchemas;
     this.columnsIndexArray = columnsIndexArray;
     this.deviceEntries = deviceEntries;
@@ -118,6 +122,7 @@ public class TableScanOperator extends 
AbstractSeriesScanOperator {
     this.measurementColumnTSDataTypes =
         
measurementSchemas.stream().map(IMeasurementSchema::getType).collect(Collectors.toList());
     this.currentDeviceIndex = 0;
+    this.operatorContext.recordSpecifiedInfo(CURRENT_DEVICE_INDEX_STRING, 
Integer.toString(0));
 
     this.maxReturnSize =
         Math.min(
@@ -297,6 +302,8 @@ public class TableScanOperator extends 
AbstractSeriesScanOperator {
       // reset QueryDataSource
       queryDataSource.reset();
       this.seriesScanUtil.initQueryDataSource(queryDataSource);
+      this.operatorContext.recordSpecifiedInfo(
+          CURRENT_DEVICE_INDEX_STRING, Integer.toString(currentDeviceIndex));
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
index d33f0b24481..4eac2da8a8a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
@@ -102,6 +102,8 @@ public class PlanGraphPrinter extends 
PlanVisitor<List<String>, PlanGraphPrinter
   private static final int BOX_MARGIN = 1;
   private static final int CONNECTION_LINE_HEIGHT = 2;
 
+  public static final String DEVICE_NUMBER = "DeviceNumber";
+
   @Override
   public List<String> visitPlan(PlanNode node, GraphContext context) {
     List<String> boxValue = new ArrayList<>();
@@ -616,7 +618,7 @@ public class PlanGraphPrinter extends 
PlanVisitor<List<String>, PlanGraphPrinter
     boxValue.add(String.format("TableScan-%s", node.getPlanNodeId().getId()));
     boxValue.add(String.format("QualifiedTableName: %s", 
node.getQualifiedObjectName().toString()));
     boxValue.add(String.format("OutputSymbols: %s", node.getOutputSymbols()));
-    boxValue.add(String.format("DeviceEntriesSize: %s", 
node.getDeviceEntries().size()));
+    boxValue.add(String.format("DeviceNumber: %s", 
node.getDeviceEntries().size()));
     boxValue.add(String.format("ScanOrder: %s", node.getScanOrder()));
     if (node.getTimePredicate().isPresent()) {
       boxValue.add(String.format("TimePredicate: %s", 
node.getTimePredicate().get()));
@@ -684,7 +686,7 @@ public class PlanGraphPrinter extends 
PlanVisitor<List<String>, PlanGraphPrinter
           String.format("Project-Expressions: %s", 
node.getProjection().getMap().values()));
     }
 
-    boxValue.add(String.format("DeviceEntriesSize: %s", 
node.getDeviceEntries().size()));
+    boxValue.add(String.format("DeviceNumber: %s", 
node.getDeviceEntries().size()));
     boxValue.add(String.format("ScanOrder: %s", node.getScanOrder()));
     if (node.getPushDownPredicate() != null) {
       boxValue.add(String.format("PushDownPredicate: %s", 
node.getPushDownPredicate()));

Reply via email to