This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c6e110e28f [IOTDB-6337] Refine the count calculation in RegionScan framework
4c6e110e28f is described below

commit 4c6e110e28fce314ba23d107fe45ae96b6862024
Author: YangCaiyin <[email protected]>
AuthorDate: Thu Jun 6 11:00:07 2024 +0800

    [IOTDB-6337] Refine the count calculation in RegionScan framework
---
 .../db/it/regionscan/IoTDBActiveRegionScanIT.java  | 23 ++++++++++++-
 .../process/ActiveRegionScanMergeOperator.java     | 37 +++++++++++++-------
 .../AbstractRegionScanDataSourceOperator.java      | 19 ++++++++--
 .../source/ActiveDeviceRegionScanOperator.java     | 40 ++++++++++++++--------
 .../source/ActiveTimeSeriesRegionScanOperator.java | 34 +++++++++++++++---
 .../queryengine/plan/analyze/AnalyzeVisitor.java   | 23 +++++--------
 .../plan/planner/OperatorTreeGenerator.java        |  8 +++--
 .../plan/planner/distribution/SourceRewriter.java  |  1 +
 8 files changed, 133 insertions(+), 52 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/regionscan/IoTDBActiveRegionScanIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/regionscan/IoTDBActiveRegionScanIT.java
index bfbba1b0f6b..c608bf3e24c 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/regionscan/IoTDBActiveRegionScanIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/regionscan/IoTDBActiveRegionScanIT.java
@@ -285,6 +285,28 @@ public class IoTDBActiveRegionScanIT {
     basicShowActiveDeviceTest(sql, SHOW_DEVICES_COLUMN_NAMES, retArray);
   }
 
+  @Test
+  public void showActiveDeviceEmptyTest() {
+    String sql = "show devices root.empty where time < 50";
+    String[] retArray = new String[] {};
+    basicShowActiveDeviceTest(sql, SHOW_DEVICES_COLUMN_NAMES, retArray);
+
+    sql = "count devices root.empty where time < 50";
+    long value = 0;
+    basicCountActiveDeviceTest(sql, COUNT_DEVICES_COLUMN_NAMES, value);
+  }
+
+  @Test
+  public void showActiveTimeseriesEmptyTest() {
+    String sql = "show timeseries root.empty where time < 50";
+    String[] retArray = new String[] {};
+    basicShowActiveDeviceTest(sql, SHOW_TIMESERIES_COLUMN_NAMES, retArray);
+
+    sql = "count timeseries root.empty where time < 50";
+    long value = 0;
+    basicCountActiveDeviceTest(sql, COUNT_TIMESERIES_COLUMN_NAMES, value);
+  }
+
   @Test
   public void showActiveTimeseriesTest() {
     String sql = "show timeseries where time = 4";
@@ -461,7 +483,6 @@ public class IoTDBActiveRegionScanIT {
 
       try (ResultSet resultSet = statement.executeQuery(sql)) {
         ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
-        Map<String, Integer> map = new HashMap<>();
         assertEquals(1, resultSetMetaData.getColumnCount());
         assertEquals(columnName, resultSetMetaData.getColumnName(1));
         int cnt = 0;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/ActiveRegionScanMergeOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/ActiveRegionScanMergeOperator.java
index 8efeeecce42..1749b61b760 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/ActiveRegionScanMergeOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/ActiveRegionScanMergeOperator.java
@@ -117,26 +117,37 @@ public class ActiveRegionScanMergeOperator extends AbstractConsumeAllOperator {
       }
     }
 
-    TimeColumnBuilder timeColumnBuilder = 
tsBlockBuilder.getTimeColumnBuilder();
-    ColumnBuilder[] valueColumnBuilders = 
tsBlockBuilder.getValueColumnBuilders();
-    int curTsBlockRowIndex;
-    for (int i = 0; i < inputOperatorsCount; i++) {
-      if (inputTsBlocks[i] == null) {
-        continue;
+    if (!needMergeBeforeCount) {
+      for (int i = 0; i < inputOperatorsCount; i++) {
+        if (inputTsBlocks[i] == null) {
+          continue;
+        }
+        for (int row = 0; row < maxRowCanBuild; row++) {
+          long childCount = 
inputTsBlocks[i].getValueColumns()[0].getLong(inputIndex[i] + row);
+          count += childCount;
+          inputIndex[i] += maxRowCanBuild;
+        }
       }
-      curTsBlockRowIndex = inputIndex[i];
-      for (int row = 0; row < maxRowCanBuild; row++) {
-        String id =
-            inputTsBlocks[i].getValueColumns()[0].getBinary(curTsBlockRowIndex 
+ row).toString();
-        if (!outputCount || needMergeBeforeCount) {
+    } else {
+      TimeColumnBuilder timeColumnBuilder = 
tsBlockBuilder.getTimeColumnBuilder();
+      ColumnBuilder[] valueColumnBuilders = 
tsBlockBuilder.getValueColumnBuilders();
+      int curTsBlockRowIndex;
+      for (int i = 0; i < inputOperatorsCount; i++) {
+        if (inputTsBlocks[i] == null) {
+          continue;
+        }
+        curTsBlockRowIndex = inputIndex[i];
+        for (int row = 0; row < maxRowCanBuild; row++) {
+          String id =
+              
inputTsBlocks[i].getValueColumns()[0].getBinary(curTsBlockRowIndex + 
row).toString();
           if (deduplicatedSet.contains(id)) {
             continue;
           }
           deduplicatedSet.add(id);
+          buildOneRow(i, curTsBlockRowIndex + row, timeColumnBuilder, 
valueColumnBuilders);
         }
-        buildOneRow(i, curTsBlockRowIndex + row, timeColumnBuilder, 
valueColumnBuilders);
+        inputIndex[i] += maxRowCanBuild;
       }
-      inputIndex[i] += maxRowCanBuild;
     }
     return outputCount ? returnResultIfNoMoreData() : tsBlockBuilder.build();
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractRegionScanDataSourceOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractRegionScanDataSourceOperator.java
index b9cd4d442d7..0f20259d5dc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractRegionScanDataSourceOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractRegionScanDataSourceOperator.java
@@ -37,6 +37,9 @@ public abstract class AbstractRegionScanDataSourceOperator extends AbstractSourc
 
   protected boolean finished = false;
 
+  protected boolean outputCount;
+  protected long count = 0;
+
   protected AbstractRegionScanForActiveDataUtil regionScanUtil;
   protected TsBlockBuilder resultTsBlockBuilder;
 
@@ -97,16 +100,28 @@ public abstract class AbstractRegionScanDataSourceOperator 
extends AbstractSourc
       } while (System.nanoTime() - start < maxRuntime && 
!resultTsBlockBuilder.isFull());
 
       finished =
-          resultTsBlockBuilder.isEmpty()
+          (resultTsBlockBuilder.isEmpty())
               && ((!regionScanUtil.hasMoreData() && 
regionScanUtil.isCurrentTsFileFinished())
                   || isAllDataChecked());
 
-      return !finished;
+      boolean hasCachedCountValue = buildCountResultIfNeed();
+      return !finished || hasCachedCountValue;
     } catch (IOException e) {
       throw new IOException("Error occurs when scanning active time series.", 
e);
     }
   }
 
+  private boolean buildCountResultIfNeed() {
+    if (!outputCount || !finished || count == -1) {
+      return false;
+    }
+    resultTsBlockBuilder.getTimeColumnBuilder().writeLong(-1);
+    resultTsBlockBuilder.getValueColumnBuilders()[0].writeLong(count);
+    resultTsBlockBuilder.declarePosition();
+    count = -1;
+    return true;
+  }
+
   @Override
   public void close() throws Exception {
     // do nothing
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ActiveDeviceRegionScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ActiveDeviceRegionScanOperator.java
index b2bdb66ba1c..873352b8b74 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ActiveDeviceRegionScanOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ActiveDeviceRegionScanOperator.java
@@ -51,7 +51,9 @@ public class ActiveDeviceRegionScanOperator extends AbstractRegionScanDataSource
       OperatorContext operatorContext,
       PlanNodeId sourceId,
       Map<IDeviceID, Boolean> deviceToAlignedMap,
-      Filter timeFilter) {
+      Filter timeFilter,
+      boolean outputCount) {
+    this.outputCount = outputCount;
     this.sourceId = sourceId;
     this.operatorContext = operatorContext;
     this.deviceToAlignedMap = deviceToAlignedMap;
@@ -70,26 +72,36 @@ public class ActiveDeviceRegionScanOperator extends 
AbstractRegionScanDataSource
 
   @Override
   protected void updateActiveData() {
-    TimeColumnBuilder timeColumnBuilder = 
resultTsBlockBuilder.getTimeColumnBuilder();
-    ColumnBuilder[] columnBuilders = 
resultTsBlockBuilder.getValueColumnBuilders();
-
     List<IDeviceID> activeDevices =
         ((RegionScanForActiveDeviceUtil) regionScanUtil).getActiveDevices();
-    for (IDeviceID deviceID : activeDevices) {
-      timeColumnBuilder.writeLong(-1);
-      columnBuilders[0].writeBinary(new Binary(deviceID.getBytes()));
-      columnBuilders[1].writeBinary(
-          new Binary(
-              String.valueOf(deviceToAlignedMap.get(deviceID)), 
TSFileConfig.STRING_CHARSET));
-      columnBuilders[2].appendNull();
-      columnBuilders[3].appendNull();
-      resultTsBlockBuilder.declarePosition();
-      deviceToAlignedMap.remove(deviceID);
+
+    if (this.outputCount) {
+      count += activeDevices.size();
+      activeDevices.forEach(deviceToAlignedMap.keySet()::remove);
+    } else {
+      TimeColumnBuilder timeColumnBuilder = 
resultTsBlockBuilder.getTimeColumnBuilder();
+      ColumnBuilder[] columnBuilders = 
resultTsBlockBuilder.getValueColumnBuilders();
+      for (IDeviceID deviceID : activeDevices) {
+        timeColumnBuilder.writeLong(-1);
+        columnBuilders[0].writeBinary(new Binary(deviceID.getBytes()));
+        columnBuilders[1].writeBinary(
+            new Binary(
+                String.valueOf(deviceToAlignedMap.get(deviceID)), 
TSFileConfig.STRING_CHARSET));
+        columnBuilders[2].appendNull();
+        columnBuilders[3].appendNull();
+        resultTsBlockBuilder.declarePosition();
+        deviceToAlignedMap.remove(deviceID);
+      }
     }
   }
 
   @Override
   protected List<TSDataType> getResultDataTypes() {
+    if (outputCount) {
+      return ColumnHeaderConstant.countDevicesColumnHeaders.stream()
+          .map(ColumnHeader::getColumnType)
+          .collect(Collectors.toList());
+    }
     return ColumnHeaderConstant.showDevicesColumnHeaders.stream()
         .map(ColumnHeader::getColumnType)
         .collect(Collectors.toList());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ActiveTimeSeriesRegionScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ActiveTimeSeriesRegionScanOperator.java
index 34e082b27c5..e3385f92ecd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ActiveTimeSeriesRegionScanOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/ActiveTimeSeriesRegionScanOperator.java
@@ -54,7 +54,9 @@ public class ActiveTimeSeriesRegionScanOperator extends AbstractRegionScanDataSo
       OperatorContext operatorContext,
       PlanNodeId sourceId,
       Map<IDeviceID, Map<String, TimeseriesSchemaInfo>> 
timeSeriesToSchemasInfo,
-      Filter timeFilter) {
+      Filter timeFilter,
+      boolean isOutputCount) {
+    this.outputCount = isOutputCount;
     this.operatorContext = operatorContext;
     this.sourceId = sourceId;
     this.timeSeriesToSchemasInfo = timeSeriesToSchemasInfo;
@@ -95,6 +97,16 @@ public class ActiveTimeSeriesRegionScanOperator extends 
AbstractRegionScanDataSo
 
     Map<IDeviceID, List<String>> activeTimeSeries =
         ((RegionScanForActiveTimeSeriesUtil) 
regionScanUtil).getActiveTimeSeries();
+
+    if (outputCount) {
+      for (Map.Entry<IDeviceID, List<String>> entry : 
activeTimeSeries.entrySet()) {
+        List<String> timeSeriesList = entry.getValue();
+        count += timeSeriesList.size();
+        removeTimeseriesListFromDevice(entry.getKey(), timeSeriesList);
+      }
+      return;
+    }
+
     for (Map.Entry<IDeviceID, List<String>> entry : 
activeTimeSeries.entrySet()) {
       IDeviceID deviceID = entry.getKey();
       String deviceStr = ((PlainDeviceID) deviceID).toStringID();
@@ -117,11 +129,18 @@ public class ActiveTimeSeriesRegionScanOperator extends 
AbstractRegionScanDataSo
         checkAndAppend(schemaInfo.getDeadbandParameters(), columnBuilders[9]); 
// DeadbandParameters
         columnBuilders[10].writeBinary(VIEW_TYPE); // ViewType
         resultTsBlockBuilder.declarePosition();
-        timeSeriesInfo.remove(timeSeries);
-      }
-      if (timeSeriesInfo.isEmpty()) {
-        timeSeriesToSchemasInfo.remove(deviceID);
       }
+      removeTimeseriesListFromDevice(deviceID, timeSeriesList);
+    }
+  }
+
+  private void removeTimeseriesListFromDevice(IDeviceID deviceID, List<String> 
timeSeriesList) {
+    Map<String, TimeseriesSchemaInfo> timeSeriesInfo = 
timeSeriesToSchemasInfo.get(deviceID);
+    for (String timeSeries : timeSeriesList) {
+      timeSeriesInfo.remove(timeSeries);
+    }
+    if (timeSeriesInfo.isEmpty()) {
+      timeSeriesToSchemasInfo.remove(deviceID);
     }
   }
 
@@ -131,6 +150,11 @@ public class ActiveTimeSeriesRegionScanOperator extends 
AbstractRegionScanDataSo
 
   @Override
   protected List<TSDataType> getResultDataTypes() {
+    if (outputCount) {
+      return ColumnHeaderConstant.countTimeSeriesColumnHeaders.stream()
+          .map(ColumnHeader::getColumnType)
+          .collect(Collectors.toList());
+    }
     return ColumnHeaderConstant.showTimeSeriesColumnHeaders.stream()
         .map(ColumnHeader::getColumnType)
         .collect(Collectors.toList());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 064685bcb6b..038514ee02f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -2928,6 +2928,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
             analyzeTimeseriesRegionScan(
                 showTimeSeriesStatement.getTimeCondition(), patternTree, 
analysis, context);
         if (!hasSchema) {
+          
analysis.setRespDatasetHeader(DatasetHeaderFactory.getShowTimeSeriesHeader());
           return analysis;
         }
       } catch (IllegalPathException e) {
@@ -3007,7 +3008,7 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     schemaTree.removeLogicalView();
   }
 
-  private boolean analyzeDeviceRegionScan(
+  private void analyzeDeviceRegionScan(
       WhereCondition timeCondition,
       PathPatternTree patternTree,
       Analysis analysis,
@@ -3019,7 +3020,7 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     ISchemaTree schemaTree = 
schemaFetcher.fetchSchemaInDeviceLevel(patternTree, context);
     if (schemaTree.isEmpty()) {
       analysis.setFinishQueryAfterAnalyze(true);
-      return false;
+      return;
     }
 
     // fetch Data partition
@@ -3037,7 +3038,6 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
             schemaTree,
             context);
     analysis.setDataPartitionInfo(dataPartition);
-    return true;
   }
 
   @Override
@@ -3051,12 +3051,8 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
         
showDevicesStatement.getPathPattern().concatNode(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD));
 
     if (showDevicesStatement.hasTimeCondition()) {
-      boolean hasSchema =
-          analyzeDeviceRegionScan(
-              showDevicesStatement.getTimeCondition(), patternTree, analysis, 
context);
-      if (!hasSchema) {
-        return analysis;
-      }
+      analyzeDeviceRegionScan(
+          showDevicesStatement.getTimeCondition(), patternTree, analysis, 
context);
     } else {
       SchemaPartition schemaPartitionInfo = 
partitionFetcher.getSchemaPartition(patternTree);
       analysis.setSchemaPartitionInfo(schemaPartitionInfo);
@@ -3117,12 +3113,8 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     patternTree.appendPathPattern(
         
countDevicesStatement.getPathPattern().concatNode(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD));
     if (countDevicesStatement.hasTimeCondition()) {
-      boolean hasSchema =
-          analyzeDeviceRegionScan(
-              countDevicesStatement.getTimeCondition(), patternTree, analysis, 
context);
-      if (!hasSchema) {
-        return analysis;
-      }
+      analyzeDeviceRegionScan(
+          countDevicesStatement.getTimeCondition(), patternTree, analysis, 
context);
     } else {
       SchemaPartition schemaPartitionInfo = 
partitionFetcher.getSchemaPartition(patternTree);
       analysis.setSchemaPartitionInfo(schemaPartitionInfo);
@@ -3147,6 +3139,7 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
             analyzeTimeseriesRegionScan(
                 countTimeSeriesStatement.getTimeCondition(), patternTree, 
analysis, context);
         if (!hasSchema) {
+          
analysis.setRespDatasetHeader(DatasetHeaderFactory.getCountTimeSeriesHeader());
           return analysis;
         }
       } catch (IllegalPathException e) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index 7269d1cdd09..99dba242ef8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -3540,7 +3540,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     }
     ActiveDeviceRegionScanOperator regionScanOperator =
         new ActiveDeviceRegionScanOperator(
-            operatorContext, node.getPlanNodeId(), deviceIDToAligned, filter);
+            operatorContext, node.getPlanNodeId(), deviceIDToAligned, filter, 
node.isOutputCount());
 
     DataDriverContext dataDriverContext = (DataDriverContext) 
context.getDriverContext();
     dataDriverContext.addSourceOperator(regionScanOperator);
@@ -3573,7 +3573,11 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
     }
     ActiveTimeSeriesRegionScanOperator regionScanOperator =
         new ActiveTimeSeriesRegionScanOperator(
-            operatorContext, node.getPlanNodeId(), timeseriesToSchemaInfo, 
filter);
+            operatorContext,
+            node.getPlanNodeId(),
+            timeseriesToSchemaInfo,
+            filter,
+            node.isOutputCount());
 
     dataDriverContext.addSourceOperator(regionScanOperator);
     
dataDriverContext.setQueryDataSourceType(QueryDataSourceType.TIME_SERIES_REGION_SCAN);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
index 8efc80f3fce..63d59ccaec7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
@@ -774,6 +774,7 @@ public class SourceRewriter extends BaseSourceRewriter<DistributionPlanContext>
                   RegionScanNode regionScanNode = (RegionScanNode) 
node.clone();
                   
regionScanNode.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
                   regionScanNode.setRegionReplicaSet(dataRegion);
+                  regionScanNode.setOutputCount(node.isOutputCount());
                   regionScanNode.clearPath();
                   return regionScanNode;
                 })

Reply via email to