This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4c6e110e28f [IOTDB-6337] Refine the count calculation in RegionScan framework
4c6e110e28f is described below
commit 4c6e110e28fce314ba23d107fe45ae96b6862024
Author: YangCaiyin <[email protected]>
AuthorDate: Thu Jun 6 11:00:07 2024 +0800
[IOTDB-6337] Refine the count calculation in RegionScan framework
---
.../db/it/regionscan/IoTDBActiveRegionScanIT.java | 23 ++++++++++++-
.../process/ActiveRegionScanMergeOperator.java | 37 +++++++++++++-------
.../AbstractRegionScanDataSourceOperator.java | 19 ++++++++--
.../source/ActiveDeviceRegionScanOperator.java | 40 ++++++++++++++--------
.../source/ActiveTimeSeriesRegionScanOperator.java | 34 +++++++++++++++---
.../queryengine/plan/analyze/AnalyzeVisitor.java | 23 +++++--------
.../plan/planner/OperatorTreeGenerator.java | 8 +++--
.../plan/planner/distribution/SourceRewriter.java | 1 +
8 files changed, 133 insertions(+), 52 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/regionscan/IoTDBActiveRegionScanIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/regionscan/IoTDBActiveRegionScanIT.java
index bfbba1b0f6b..c608bf3e24c 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/regionscan/IoTDBActiveRegionScanIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/regionscan/IoTDBActiveRegionScanIT.java
@@ -285,6 +285,28 @@ public class IoTDBActiveRegionScanIT {
basicShowActiveDeviceTest(sql, SHOW_DEVICES_COLUMN_NAMES, retArray);
}
+ @Test
+ public void showActiveDeviceEmptyTest() {
+ String sql = "show devices root.empty where time < 50";
+ String[] retArray = new String[] {};
+ basicShowActiveDeviceTest(sql, SHOW_DEVICES_COLUMN_NAMES, retArray);
+
+ sql = "count devices root.empty where time < 50";
+ long value = 0;
+ basicCountActiveDeviceTest(sql, COUNT_DEVICES_COLUMN_NAMES, value);
+ }
+
+ @Test
+ public void showActiveTimeseriesEmptyTest() {
+ String sql = "show timeseries root.empty where time < 50";
+ String[] retArray = new String[] {};
+ basicShowActiveDeviceTest(sql, SHOW_TIMESERIES_COLUMN_NAMES, retArray);
+
+ sql = "count timeseries root.empty where time < 50";
+ long value = 0;
+ basicCountActiveDeviceTest(sql, COUNT_TIMESERIES_COLUMN_NAMES, value);
+ }
+
@Test
public void showActiveTimeseriesTest() {
String sql = "show timeseries where time = 4";
@@ -461,7 +483,6 @@ public class IoTDBActiveRegionScanIT {
try (ResultSet resultSet = statement.executeQuery(sql)) {
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
- Map<String, Integer> map = new HashMap<>();
assertEquals(1, resultSetMetaData.getColumnCount());
assertEquals(columnName, resultSetMetaData.getColumnName(1));
int cnt = 0;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/ActiveRegionScanMergeOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/ActiveRegionScanMergeOperator.java
index 8efeeecce42..1749b61b760 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/ActiveRegionScanMergeOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/ActiveRegionScanMergeOperator.java
@@ -117,26 +117,37 @@ public class ActiveRegionScanMergeOperator extends
AbstractConsumeAllOperator {
}
}
- TimeColumnBuilder timeColumnBuilder =
tsBlockBuilder.getTimeColumnBuilder();
- ColumnBuilder[] valueColumnBuilders =
tsBlockBuilder.getValueColumnBuilders();
- int curTsBlockRowIndex;
- for (int i = 0; i < inputOperatorsCount; i++) {
- if (inputTsBlocks[i] == null) {
- continue;
+ if (!needMergeBeforeCount) {
+ for (int i = 0; i < inputOperatorsCount; i++) {
+ if (inputTsBlocks[i] == null) {
+ continue;
+ }
+ for (int row = 0; row < maxRowCanBuild; row++) {
+ long childCount =
inputTsBlocks[i].getValueColumns()[0].getLong(inputIndex[i] + row);
+ count += childCount;
+ inputIndex[i] += maxRowCanBuild;
+ }
}
- curTsBlockRowIndex = inputIndex[i];
- for (int row = 0; row < maxRowCanBuild; row++) {
- String id =
- inputTsBlocks[i].getValueColumns()[0].getBinary(curTsBlockRowIndex
+ row).toString();
- if (!outputCount || needMergeBeforeCount) {
+ } else {
+ TimeColumnBuilder timeColumnBuilder =
tsBlockBuilder.getTimeColumnBuilder();
+ ColumnBuilder[] valueColumnBuilders =
tsBlockBuilder.getValueColumnBuilders();
+ int curTsBlockRowIndex;
+ for (int i = 0; i < inputOperatorsCount; i++) {
+ if (inputTsBlocks[i] == null) {
+ continue;
+ }
+ curTsBlockRowIndex = inputIndex[i];
+ for (int row = 0; row < maxRowCanBuild; row++) {
+ String id =
+
inputTsBlocks[i].getValueColumns()[0].getBinary(curTsBlockRowIndex +
row).toString();
if (deduplicatedSet.contains(id)) {
continue;
}
deduplicatedSet.add(id);
+ buildOneRow(i, curTsBlockRowIndex + row, timeColumnBuilder,
valueColumnBuilders);
}
- buildOneRow(i, curTsBlockRowIndex + row, timeColumnBuilder,
valueColumnBuilders);
+ inputIndex[i] += maxRowCanBuild;
}
- inputIndex[i] += maxRowCanBuild;
}
return outputCount ? returnResultIfNoMoreData() : tsBlockBuilder.build();
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractRegionScanDataSourceOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractRegionScanDataSourceOperator.java
index b9cd4d442d7..0f20259d5dc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractRegionScanDataSourceOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractRegionScanDataSourceOperator.java
@@ -37,6 +37,9 @@ public abstract class AbstractRegionScanDataSourceOperator
extends AbstractSourc
protected boolean finished = false;
+ protected boolean outputCount;
+ protected long count = 0;
+
protected AbstractRegionScanForActiveDataUtil regionScanUtil;
protected TsBlockBuilder resultTsBlockBuilder;
@@ -97,16 +100,28 @@ public abstract class AbstractRegionScanDataSourceOperator
extends AbstractSourc
} while (System.nanoTime() - start < maxRuntime &&
!resultTsBlockBuilder.isFull());
finished =
- resultTsBlockBuilder.isEmpty()
+ (resultTsBlockBuilder.isEmpty())
&& ((!regionScanUtil.hasMoreData() &&
regionScanUtil.isCurrentTsFileFinished())
|| isAllDataChecked());
- return !finished;
+ boolean hasCachedCountValue = buildCountResultIfNeed();
+ return !finished || hasCachedCountValue;
} catch (IOException e) {
throw new IOException("Error occurs when scanning active time series.",
e);
}
}
+ private boolean buildCountResultIfNeed() {
+ if (!outputCount || !finished || count == -1) {
+ return false;
+ }
+ resultTsBlockBuilder.getTimeColumnBuilder().writeLong(-1);
+ resultTsBlockBuilder.getValueColumnBuilders()[0].writeLong(count);
+ resultTsBlockBuilder.declarePosition();
+ count = -1;
+ return true;
+ }
+
@Override
public void close() throws Exception {
// do nothing
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ActiveDeviceRegionScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ActiveDeviceRegionScanOperator.java
index b2bdb66ba1c..873352b8b74 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ActiveDeviceRegionScanOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ActiveDeviceRegionScanOperator.java
@@ -51,7 +51,9 @@ public class ActiveDeviceRegionScanOperator extends
AbstractRegionScanDataSource
OperatorContext operatorContext,
PlanNodeId sourceId,
Map<IDeviceID, Boolean> deviceToAlignedMap,
- Filter timeFilter) {
+ Filter timeFilter,
+ boolean outputCount) {
+ this.outputCount = outputCount;
this.sourceId = sourceId;
this.operatorContext = operatorContext;
this.deviceToAlignedMap = deviceToAlignedMap;
@@ -70,26 +72,36 @@ public class ActiveDeviceRegionScanOperator extends
AbstractRegionScanDataSource
@Override
protected void updateActiveData() {
- TimeColumnBuilder timeColumnBuilder =
resultTsBlockBuilder.getTimeColumnBuilder();
- ColumnBuilder[] columnBuilders =
resultTsBlockBuilder.getValueColumnBuilders();
-
List<IDeviceID> activeDevices =
((RegionScanForActiveDeviceUtil) regionScanUtil).getActiveDevices();
- for (IDeviceID deviceID : activeDevices) {
- timeColumnBuilder.writeLong(-1);
- columnBuilders[0].writeBinary(new Binary(deviceID.getBytes()));
- columnBuilders[1].writeBinary(
- new Binary(
- String.valueOf(deviceToAlignedMap.get(deviceID)),
TSFileConfig.STRING_CHARSET));
- columnBuilders[2].appendNull();
- columnBuilders[3].appendNull();
- resultTsBlockBuilder.declarePosition();
- deviceToAlignedMap.remove(deviceID);
+
+ if (this.outputCount) {
+ count += activeDevices.size();
+ activeDevices.forEach(deviceToAlignedMap.keySet()::remove);
+ } else {
+ TimeColumnBuilder timeColumnBuilder =
resultTsBlockBuilder.getTimeColumnBuilder();
+ ColumnBuilder[] columnBuilders =
resultTsBlockBuilder.getValueColumnBuilders();
+ for (IDeviceID deviceID : activeDevices) {
+ timeColumnBuilder.writeLong(-1);
+ columnBuilders[0].writeBinary(new Binary(deviceID.getBytes()));
+ columnBuilders[1].writeBinary(
+ new Binary(
+ String.valueOf(deviceToAlignedMap.get(deviceID)),
TSFileConfig.STRING_CHARSET));
+ columnBuilders[2].appendNull();
+ columnBuilders[3].appendNull();
+ resultTsBlockBuilder.declarePosition();
+ deviceToAlignedMap.remove(deviceID);
+ }
}
}
@Override
protected List<TSDataType> getResultDataTypes() {
+ if (outputCount) {
+ return ColumnHeaderConstant.countDevicesColumnHeaders.stream()
+ .map(ColumnHeader::getColumnType)
+ .collect(Collectors.toList());
+ }
return ColumnHeaderConstant.showDevicesColumnHeaders.stream()
.map(ColumnHeader::getColumnType)
.collect(Collectors.toList());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ActiveTimeSeriesRegionScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ActiveTimeSeriesRegionScanOperator.java
index 34e082b27c5..e3385f92ecd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ActiveTimeSeriesRegionScanOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ActiveTimeSeriesRegionScanOperator.java
@@ -54,7 +54,9 @@ public class ActiveTimeSeriesRegionScanOperator extends
AbstractRegionScanDataSo
OperatorContext operatorContext,
PlanNodeId sourceId,
Map<IDeviceID, Map<String, TimeseriesSchemaInfo>>
timeSeriesToSchemasInfo,
- Filter timeFilter) {
+ Filter timeFilter,
+ boolean isOutputCount) {
+ this.outputCount = isOutputCount;
this.operatorContext = operatorContext;
this.sourceId = sourceId;
this.timeSeriesToSchemasInfo = timeSeriesToSchemasInfo;
@@ -95,6 +97,16 @@ public class ActiveTimeSeriesRegionScanOperator extends
AbstractRegionScanDataSo
Map<IDeviceID, List<String>> activeTimeSeries =
((RegionScanForActiveTimeSeriesUtil)
regionScanUtil).getActiveTimeSeries();
+
+ if (outputCount) {
+ for (Map.Entry<IDeviceID, List<String>> entry :
activeTimeSeries.entrySet()) {
+ List<String> timeSeriesList = entry.getValue();
+ count += timeSeriesList.size();
+ removeTimeseriesListFromDevice(entry.getKey(), timeSeriesList);
+ }
+ return;
+ }
+
for (Map.Entry<IDeviceID, List<String>> entry :
activeTimeSeries.entrySet()) {
IDeviceID deviceID = entry.getKey();
String deviceStr = ((PlainDeviceID) deviceID).toStringID();
@@ -117,11 +129,18 @@ public class ActiveTimeSeriesRegionScanOperator extends
AbstractRegionScanDataSo
checkAndAppend(schemaInfo.getDeadbandParameters(), columnBuilders[9]);
// DeadbandParameters
columnBuilders[10].writeBinary(VIEW_TYPE); // ViewType
resultTsBlockBuilder.declarePosition();
- timeSeriesInfo.remove(timeSeries);
- }
- if (timeSeriesInfo.isEmpty()) {
- timeSeriesToSchemasInfo.remove(deviceID);
}
+ removeTimeseriesListFromDevice(deviceID, timeSeriesList);
+ }
+ }
+
+ private void removeTimeseriesListFromDevice(IDeviceID deviceID, List<String>
timeSeriesList) {
+ Map<String, TimeseriesSchemaInfo> timeSeriesInfo =
timeSeriesToSchemasInfo.get(deviceID);
+ for (String timeSeries : timeSeriesList) {
+ timeSeriesInfo.remove(timeSeries);
+ }
+ if (timeSeriesInfo.isEmpty()) {
+ timeSeriesToSchemasInfo.remove(deviceID);
}
}
@@ -131,6 +150,11 @@ public class ActiveTimeSeriesRegionScanOperator extends
AbstractRegionScanDataSo
@Override
protected List<TSDataType> getResultDataTypes() {
+ if (outputCount) {
+ return ColumnHeaderConstant.countTimeSeriesColumnHeaders.stream()
+ .map(ColumnHeader::getColumnType)
+ .collect(Collectors.toList());
+ }
return ColumnHeaderConstant.showTimeSeriesColumnHeaders.stream()
.map(ColumnHeader::getColumnType)
.collect(Collectors.toList());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 064685bcb6b..038514ee02f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -2928,6 +2928,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
analyzeTimeseriesRegionScan(
showTimeSeriesStatement.getTimeCondition(), patternTree,
analysis, context);
if (!hasSchema) {
+
analysis.setRespDatasetHeader(DatasetHeaderFactory.getShowTimeSeriesHeader());
return analysis;
}
} catch (IllegalPathException e) {
@@ -3007,7 +3008,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
schemaTree.removeLogicalView();
}
- private boolean analyzeDeviceRegionScan(
+ private void analyzeDeviceRegionScan(
WhereCondition timeCondition,
PathPatternTree patternTree,
Analysis analysis,
@@ -3019,7 +3020,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
ISchemaTree schemaTree =
schemaFetcher.fetchSchemaInDeviceLevel(patternTree, context);
if (schemaTree.isEmpty()) {
analysis.setFinishQueryAfterAnalyze(true);
- return false;
+ return;
}
// fetch Data partition
@@ -3037,7 +3038,6 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
schemaTree,
context);
analysis.setDataPartitionInfo(dataPartition);
- return true;
}
@Override
@@ -3051,12 +3051,8 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
showDevicesStatement.getPathPattern().concatNode(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD));
if (showDevicesStatement.hasTimeCondition()) {
- boolean hasSchema =
- analyzeDeviceRegionScan(
- showDevicesStatement.getTimeCondition(), patternTree, analysis,
context);
- if (!hasSchema) {
- return analysis;
- }
+ analyzeDeviceRegionScan(
+ showDevicesStatement.getTimeCondition(), patternTree, analysis,
context);
} else {
SchemaPartition schemaPartitionInfo =
partitionFetcher.getSchemaPartition(patternTree);
analysis.setSchemaPartitionInfo(schemaPartitionInfo);
@@ -3117,12 +3113,8 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
patternTree.appendPathPattern(
countDevicesStatement.getPathPattern().concatNode(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD));
if (countDevicesStatement.hasTimeCondition()) {
- boolean hasSchema =
- analyzeDeviceRegionScan(
- countDevicesStatement.getTimeCondition(), patternTree, analysis,
context);
- if (!hasSchema) {
- return analysis;
- }
+ analyzeDeviceRegionScan(
+ countDevicesStatement.getTimeCondition(), patternTree, analysis,
context);
} else {
SchemaPartition schemaPartitionInfo =
partitionFetcher.getSchemaPartition(patternTree);
analysis.setSchemaPartitionInfo(schemaPartitionInfo);
@@ -3147,6 +3139,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
analyzeTimeseriesRegionScan(
countTimeSeriesStatement.getTimeCondition(), patternTree,
analysis, context);
if (!hasSchema) {
+
analysis.setRespDatasetHeader(DatasetHeaderFactory.getCountTimeSeriesHeader());
return analysis;
}
} catch (IllegalPathException e) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index 7269d1cdd09..99dba242ef8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -3540,7 +3540,7 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
}
ActiveDeviceRegionScanOperator regionScanOperator =
new ActiveDeviceRegionScanOperator(
- operatorContext, node.getPlanNodeId(), deviceIDToAligned, filter);
+ operatorContext, node.getPlanNodeId(), deviceIDToAligned, filter,
node.isOutputCount());
DataDriverContext dataDriverContext = (DataDriverContext)
context.getDriverContext();
dataDriverContext.addSourceOperator(regionScanOperator);
@@ -3573,7 +3573,11 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
}
ActiveTimeSeriesRegionScanOperator regionScanOperator =
new ActiveTimeSeriesRegionScanOperator(
- operatorContext, node.getPlanNodeId(), timeseriesToSchemaInfo,
filter);
+ operatorContext,
+ node.getPlanNodeId(),
+ timeseriesToSchemaInfo,
+ filter,
+ node.isOutputCount());
dataDriverContext.addSourceOperator(regionScanOperator);
dataDriverContext.setQueryDataSourceType(QueryDataSourceType.TIME_SERIES_REGION_SCAN);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
index 8efc80f3fce..63d59ccaec7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
@@ -774,6 +774,7 @@ public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext>
RegionScanNode regionScanNode = (RegionScanNode)
node.clone();
regionScanNode.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
regionScanNode.setRegionReplicaSet(dataRegion);
+ regionScanNode.setOutputCount(node.isOutputCount());
regionScanNode.clearPath();
return regionScanNode;
})