This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch TableScanExtraInfo in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d6b2f7f9bcf201dffdcd12901847260030e4e0bf Author: JackieTien97 <[email protected]> AuthorDate: Wed Nov 27 15:25:56 2024 +0800 Add extra info for TableScanOperator and AggTableScanOperator in Explain Analyze --- .../execution/operator/OperatorContext.java | 4 +- .../TableAggregationTableScanOperator.java | 45 +++++++++++++--------- .../source/relational/TableScanOperator.java | 7 ++++ .../plan/planner/plan/node/PlanGraphPrinter.java | 6 ++- 4 files changed, 41 insertions(+), 21 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java index 49be9ad1cc0..99eccdcef3d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java @@ -34,6 +34,7 @@ import org.apache.tsfile.utils.RamUsageEstimator; import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; /** @@ -165,7 +166,8 @@ public class OperatorContext implements Accountable { public void recordSpecifiedInfo(String key, String value) { if (specifiedInfo == null) { - specifiedInfo = new HashMap<>(); + // explain analyze operator fetching and current operator updating may happen concurrently + specifiedInfo = new ConcurrentHashMap<>(); } specifiedInfo.put(key, value); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java index d444ca643ba..e963963bf7d 100644 --- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java @@ -67,7 +67,9 @@ import java.util.stream.Collectors; import static java.lang.String.format; import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.satisfiedTimeRange; +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.CURRENT_DEVICE_INDEX_STRING; import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.constructAlignedPath; +import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanGraphPrinter.DEVICE_NUMBER; import static org.apache.tsfile.read.common.block.TsBlockUtil.skipPointsOutOfTimeRange; public class TableAggregationTableScanOperator extends AbstractSeriesAggregationScanOperator { @@ -161,6 +163,7 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation this.columnsIndexArray = columnsIndexArray; this.deviceEntries = deviceEntries; this.deviceCount = deviceEntries.size(); + this.operatorContext.recordSpecifiedInfo(DEVICE_NUMBER, Integer.toString(this.deviceCount)); this.scanOrder = scanOrder; this.seriesScanOptions = seriesScanOptions; this.measurementColumnNames = measurementColumnNames; @@ -169,6 +172,7 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation this.measurementColumnTSDataTypes = measurementSchemas.stream().map(IMeasurementSchema::getType).collect(Collectors.toList()); this.currentDeviceIndex = 0; + this.operatorContext.recordSpecifiedInfo(CURRENT_DEVICE_INDEX_STRING, Integer.toString(0)); this.aggArguments = aggArguments; this.timeIterator = tableTimeRangeIterator; if (tableTimeRangeIterator.getType() @@ -327,7 +331,7 @@ public class 
TableAggregationTableScanOperator extends AbstractSeriesAggregation // all data of current device has been consumed updateResultTsBlock(); timeIterator.resetCurTimeRange(); - currentDeviceIndex++; + nextDevice(); } if (currentDeviceIndex < deviceCount) { @@ -800,29 +804,34 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation } private void checkIfAllAggregatorHasFinalResult() { - if (allAggregatorsHasFinalResult) { - // for SINGLE_TIME_ITERATOR, if allAggregatorsHasFinalResult, just consume next device - if (timeIterator.getType() == ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR) { - currentDeviceIndex++; - inputTsBlock = null; - - if (currentDeviceIndex < deviceCount) { - // construct AlignedSeriesScanUtil for next device - constructAlignedSeriesScanUtil(); - queryDataSource.reset(); - this.seriesScanUtil.initQueryDataSource(queryDataSource); - } + if (allAggregatorsHasFinalResult + && timeIterator.getType() + == ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR) { + nextDevice(); + inputTsBlock = null; - if (currentDeviceIndex >= deviceCount) { - // all devices have been consumed - timeIterator.setFinished(); - } + if (currentDeviceIndex < deviceCount) { + // construct AlignedSeriesScanUtil for next device + constructAlignedSeriesScanUtil(); + queryDataSource.reset(); + this.seriesScanUtil.initQueryDataSource(queryDataSource); + } - allAggregatorsHasFinalResult = false; + if (currentDeviceIndex >= deviceCount) { + // all devices have been consumed + timeIterator.setFinished(); } + + allAggregatorsHasFinalResult = false; } } + private void nextDevice() { + currentDeviceIndex++; + this.operatorContext.recordSpecifiedInfo( + CURRENT_DEVICE_INDEX_STRING, Integer.toString(currentDeviceIndex)); + } + private void resetTableAggregators() { tableAggregators.forEach(TableAggregator::reset); } diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java index 27b9244e80b..d1133b1f17a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java @@ -54,12 +54,15 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanOperator.appendDataIntoBuilder; +import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanGraphPrinter.DEVICE_NUMBER; import static org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager.getTSDataType; public class TableScanOperator extends AbstractSeriesScanOperator { private static final long INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(TableScanOperator.class); + public static final String CURRENT_DEVICE_INDEX_STRING = "CurrentDeviceIndex"; + public static final LongColumn TIME_COLUMN_TEMPLATE = new LongColumn(1, Optional.empty(), new long[] {0}); @@ -106,6 +109,7 @@ public class TableScanOperator extends AbstractSeriesScanOperator { int maxTsBlockLineNum) { this.sourceId = sourceId; this.operatorContext = context; + this.operatorContext.recordSpecifiedInfo(DEVICE_NUMBER, Integer.toString(deviceEntries.size())); this.columnSchemas = columnSchemas; this.columnsIndexArray = columnsIndexArray; this.deviceEntries = deviceEntries; @@ -118,6 +122,7 @@ public class TableScanOperator extends AbstractSeriesScanOperator { this.measurementColumnTSDataTypes = measurementSchemas.stream().map(IMeasurementSchema::getType).collect(Collectors.toList()); this.currentDeviceIndex = 0; + 
this.operatorContext.recordSpecifiedInfo(CURRENT_DEVICE_INDEX_STRING, Integer.toString(0)); this.maxReturnSize = Math.min( @@ -297,6 +302,8 @@ public class TableScanOperator extends AbstractSeriesScanOperator { // reset QueryDataSource queryDataSource.reset(); this.seriesScanUtil.initQueryDataSource(queryDataSource); + this.operatorContext.recordSpecifiedInfo( + CURRENT_DEVICE_INDEX_STRING, Integer.toString(currentDeviceIndex)); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java index d33f0b24481..4eac2da8a8a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java @@ -102,6 +102,8 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter private static final int BOX_MARGIN = 1; private static final int CONNECTION_LINE_HEIGHT = 2; + public static final String DEVICE_NUMBER = "DeviceNumber"; + @Override public List<String> visitPlan(PlanNode node, GraphContext context) { List<String> boxValue = new ArrayList<>(); @@ -616,7 +618,7 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter boxValue.add(String.format("TableScan-%s", node.getPlanNodeId().getId())); boxValue.add(String.format("QualifiedTableName: %s", node.getQualifiedObjectName().toString())); boxValue.add(String.format("OutputSymbols: %s", node.getOutputSymbols())); - boxValue.add(String.format("DeviceEntriesSize: %s", node.getDeviceEntries().size())); + boxValue.add(String.format("DeviceNumber: %s", node.getDeviceEntries().size())); boxValue.add(String.format("ScanOrder: %s", node.getScanOrder())); if (node.getTimePredicate().isPresent()) { boxValue.add(String.format("TimePredicate: 
%s", node.getTimePredicate().get())); @@ -684,7 +686,7 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter String.format("Project-Expressions: %s", node.getProjection().getMap().values())); } - boxValue.add(String.format("DeviceEntriesSize: %s", node.getDeviceEntries().size())); + boxValue.add(String.format("DeviceNumber: %s", node.getDeviceEntries().size())); boxValue.add(String.format("ScanOrder: %s", node.getScanOrder())); if (node.getPushDownPredicate() != null) { boxValue.add(String.format("PushDownPredicate: %s", node.getPushDownPredicate()));
