This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 864b94edf76 Add extra info for TableScanOperator and
AggTableScanOperator in Explain Analyze
864b94edf76 is described below
commit 864b94edf763d23934de586e26932e97751235b4
Author: Jackie Tien <[email protected]>
AuthorDate: Wed Nov 27 16:30:48 2024 +0800
Add extra info for TableScanOperator and AggTableScanOperator in Explain
Analyze
---
.../execution/operator/OperatorContext.java | 4 +-
.../TableAggregationTableScanOperator.java | 45 +++++++++++++---------
.../source/relational/TableScanOperator.java | 7 ++++
.../plan/planner/plan/node/PlanGraphPrinter.java | 6 ++-
4 files changed, 41 insertions(+), 21 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java
index 49be9ad1cc0..99eccdcef3d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java
@@ -34,6 +34,7 @@ import org.apache.tsfile.utils.RamUsageEstimator;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
@@ -165,7 +166,8 @@ public class OperatorContext implements Accountable {
public void recordSpecifiedInfo(String key, String value) {
if (specifiedInfo == null) {
- specifiedInfo = new HashMap<>();
+ // explain analyze operator fetching and current operator updating may
happen concurrently
+ specifiedInfo = new ConcurrentHashMap<>();
}
specifiedInfo.put(key, value);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
index d444ca643ba..e963963bf7d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
@@ -67,7 +67,9 @@ import java.util.stream.Collectors;
import static java.lang.String.format;
import static
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.satisfiedTimeRange;
+import static
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.CURRENT_DEVICE_INDEX_STRING;
import static
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.constructAlignedPath;
+import static
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanGraphPrinter.DEVICE_NUMBER;
import static
org.apache.tsfile.read.common.block.TsBlockUtil.skipPointsOutOfTimeRange;
public class TableAggregationTableScanOperator extends
AbstractSeriesAggregationScanOperator {
@@ -161,6 +163,7 @@ public class TableAggregationTableScanOperator extends
AbstractSeriesAggregation
this.columnsIndexArray = columnsIndexArray;
this.deviceEntries = deviceEntries;
this.deviceCount = deviceEntries.size();
+ this.operatorContext.recordSpecifiedInfo(DEVICE_NUMBER,
Integer.toString(this.deviceCount));
this.scanOrder = scanOrder;
this.seriesScanOptions = seriesScanOptions;
this.measurementColumnNames = measurementColumnNames;
@@ -169,6 +172,7 @@ public class TableAggregationTableScanOperator extends
AbstractSeriesAggregation
this.measurementColumnTSDataTypes =
measurementSchemas.stream().map(IMeasurementSchema::getType).collect(Collectors.toList());
this.currentDeviceIndex = 0;
+ this.operatorContext.recordSpecifiedInfo(CURRENT_DEVICE_INDEX_STRING,
Integer.toString(0));
this.aggArguments = aggArguments;
this.timeIterator = tableTimeRangeIterator;
if (tableTimeRangeIterator.getType()
@@ -327,7 +331,7 @@ public class TableAggregationTableScanOperator extends
AbstractSeriesAggregation
// all data of current device has been consumed
updateResultTsBlock();
timeIterator.resetCurTimeRange();
- currentDeviceIndex++;
+ nextDevice();
}
if (currentDeviceIndex < deviceCount) {
@@ -800,29 +804,34 @@ public class TableAggregationTableScanOperator extends
AbstractSeriesAggregation
}
private void checkIfAllAggregatorHasFinalResult() {
- if (allAggregatorsHasFinalResult) {
- // for SINGLE_TIME_ITERATOR, if allAggregatorsHasFinalResult, just
consume next device
- if (timeIterator.getType() ==
ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR) {
- currentDeviceIndex++;
- inputTsBlock = null;
-
- if (currentDeviceIndex < deviceCount) {
- // construct AlignedSeriesScanUtil for next device
- constructAlignedSeriesScanUtil();
- queryDataSource.reset();
- this.seriesScanUtil.initQueryDataSource(queryDataSource);
- }
+ if (allAggregatorsHasFinalResult
+ && timeIterator.getType()
+ == ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR) {
+ nextDevice();
+ inputTsBlock = null;
- if (currentDeviceIndex >= deviceCount) {
- // all devices have been consumed
- timeIterator.setFinished();
- }
+ if (currentDeviceIndex < deviceCount) {
+ // construct AlignedSeriesScanUtil for next device
+ constructAlignedSeriesScanUtil();
+ queryDataSource.reset();
+ this.seriesScanUtil.initQueryDataSource(queryDataSource);
+ }
- allAggregatorsHasFinalResult = false;
+ if (currentDeviceIndex >= deviceCount) {
+ // all devices have been consumed
+ timeIterator.setFinished();
}
+
+ allAggregatorsHasFinalResult = false;
}
}
+ private void nextDevice() {
+ currentDeviceIndex++;
+ this.operatorContext.recordSpecifiedInfo(
+ CURRENT_DEVICE_INDEX_STRING, Integer.toString(currentDeviceIndex));
+ }
+
private void resetTableAggregators() {
tableAggregators.forEach(TableAggregator::reset);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java
index 27b9244e80b..d1133b1f17a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java
@@ -54,12 +54,15 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static
org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanOperator.appendDataIntoBuilder;
+import static
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanGraphPrinter.DEVICE_NUMBER;
import static
org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager.getTSDataType;
public class TableScanOperator extends AbstractSeriesScanOperator {
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(TableScanOperator.class);
+ public static final String CURRENT_DEVICE_INDEX_STRING =
"CurrentDeviceIndex";
+
public static final LongColumn TIME_COLUMN_TEMPLATE =
new LongColumn(1, Optional.empty(), new long[] {0});
@@ -106,6 +109,7 @@ public class TableScanOperator extends
AbstractSeriesScanOperator {
int maxTsBlockLineNum) {
this.sourceId = sourceId;
this.operatorContext = context;
+ this.operatorContext.recordSpecifiedInfo(DEVICE_NUMBER,
Integer.toString(deviceEntries.size()));
this.columnSchemas = columnSchemas;
this.columnsIndexArray = columnsIndexArray;
this.deviceEntries = deviceEntries;
@@ -118,6 +122,7 @@ public class TableScanOperator extends
AbstractSeriesScanOperator {
this.measurementColumnTSDataTypes =
measurementSchemas.stream().map(IMeasurementSchema::getType).collect(Collectors.toList());
this.currentDeviceIndex = 0;
+ this.operatorContext.recordSpecifiedInfo(CURRENT_DEVICE_INDEX_STRING,
Integer.toString(0));
this.maxReturnSize =
Math.min(
@@ -297,6 +302,8 @@ public class TableScanOperator extends
AbstractSeriesScanOperator {
// reset QueryDataSource
queryDataSource.reset();
this.seriesScanUtil.initQueryDataSource(queryDataSource);
+ this.operatorContext.recordSpecifiedInfo(
+ CURRENT_DEVICE_INDEX_STRING, Integer.toString(currentDeviceIndex));
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
index d33f0b24481..4eac2da8a8a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
@@ -102,6 +102,8 @@ public class PlanGraphPrinter extends
PlanVisitor<List<String>, PlanGraphPrinter
private static final int BOX_MARGIN = 1;
private static final int CONNECTION_LINE_HEIGHT = 2;
+ public static final String DEVICE_NUMBER = "DeviceNumber";
+
@Override
public List<String> visitPlan(PlanNode node, GraphContext context) {
List<String> boxValue = new ArrayList<>();
@@ -616,7 +618,7 @@ public class PlanGraphPrinter extends
PlanVisitor<List<String>, PlanGraphPrinter
boxValue.add(String.format("TableScan-%s", node.getPlanNodeId().getId()));
boxValue.add(String.format("QualifiedTableName: %s",
node.getQualifiedObjectName().toString()));
boxValue.add(String.format("OutputSymbols: %s", node.getOutputSymbols()));
- boxValue.add(String.format("DeviceEntriesSize: %s",
node.getDeviceEntries().size()));
+ boxValue.add(String.format("DeviceNumber: %s",
node.getDeviceEntries().size()));
boxValue.add(String.format("ScanOrder: %s", node.getScanOrder()));
if (node.getTimePredicate().isPresent()) {
boxValue.add(String.format("TimePredicate: %s",
node.getTimePredicate().get()));
@@ -684,7 +686,7 @@ public class PlanGraphPrinter extends
PlanVisitor<List<String>, PlanGraphPrinter
String.format("Project-Expressions: %s",
node.getProjection().getMap().values()));
}
- boxValue.add(String.format("DeviceEntriesSize: %s",
node.getDeviceEntries().size()));
+ boxValue.add(String.format("DeviceNumber: %s",
node.getDeviceEntries().size()));
boxValue.add(String.format("ScanOrder: %s", node.getScanOrder()));
if (node.getPushDownPredicate() != null) {
boxValue.add(String.format("PushDownPredicate: %s",
node.getPushDownPredicate()));