This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new fdd72bcd67 [IOTDB-4544] Implement tsblock split for schema query (#7532)
fdd72bcd67 is described below

commit fdd72bcd677d9e8180ea423e8f518fbef5c0fca8
Author: Marcos_Zyk <[email protected]>
AuthorDate: Fri Oct 7 14:36:24 2022 +0800

    [IOTDB-4544] Implement tsblock split for schema query (#7532)
---
 .../operator/schema/CountMergeOperator.java        | 90 ++++++++++++----------
 .../operator/schema/DevicesSchemaScanOperator.java | 16 ++--
 .../schema/LevelTimeSeriesCountOperator.java       | 50 ++++++++----
 .../schema/PathsUsingTemplateScanOperator.java     | 14 ++--
 .../operator/schema/SchemaQueryScanOperator.java   | 31 ++++----
 .../operator/schema/SchemaTsBlockUtil.java         | 57 ++++++++++++++
 .../schema/TimeSeriesSchemaScanOperator.java       | 16 ++--
 .../operator/schema/CountMergeOperatorTest.java    |  2 +
 8 files changed, 179 insertions(+), 97 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
index 7524b7c455..9dd3d15581 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java
@@ -35,21 +35,25 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
-import static com.google.common.util.concurrent.Futures.successfulAsList;
+import java.util.NoSuchElementException;
 
 public class CountMergeOperator implements ProcessOperator {
   private final PlanNodeId planNodeId;
   private final OperatorContext operatorContext;
-  private boolean isFinished;
+
+  private List<TsBlock> tsBlockList = new ArrayList<>();
+  private int currentIndex = 0;
 
   private final List<Operator> children;
 
+  private final boolean isGroupByLevel;
+
   public CountMergeOperator(
       PlanNodeId planNodeId, OperatorContext operatorContext, List<Operator> children) {
     this.planNodeId = planNodeId;
     this.operatorContext = operatorContext;
     this.children = children;
+    isGroupByLevel = children.get(0) instanceof LevelTimeSeriesCountOperator;
   }
 
   @Override
@@ -59,34 +63,46 @@ public class CountMergeOperator implements ProcessOperator {
 
   @Override
   public ListenableFuture<?> isBlocked() {
-    List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
     for (Operator child : children) {
-      ListenableFuture<?> blocked = child.isBlocked();
-      if (!blocked.isDone()) {
-        listenableFutures.add(blocked);
+      while (!child.isFinished()) {
+        ListenableFuture<?> blocked = child.isBlocked();
+        if (!blocked.isDone()) {
+          return blocked;
+        }
+        if (child.hasNext()) {
+          TsBlock tsBlock = child.next();
+          if (null != tsBlock && !tsBlock.isEmpty()) {
+            tsBlockList.add(tsBlock);
+          }
+        }
       }
     }
-    return listenableFutures.isEmpty() ? NOT_BLOCKED : successfulAsList(listenableFutures);
+
+    generateResultTsBlockList();
+
+    return NOT_BLOCKED;
   }
 
   @Override
   public TsBlock next() {
-    isFinished = true;
-    if (children.get(0) instanceof LevelTimeSeriesCountOperator) {
-      return nextWithGroupByLevel();
+    if (!hasNext()) {
+      throw new NoSuchElementException();
     }
-    return nextWithoutGroupByLevel();
+    currentIndex++;
+    return tsBlockList.get(currentIndex - 1);
   }
 
-  private TsBlock nextWithoutGroupByLevel() {
-    List<TsBlock> tsBlocks = new ArrayList<>(children.size());
-    for (Operator child : children) {
-      if (child.hasNext()) {
-        tsBlocks.add(child.next());
-      }
+  private void generateResultTsBlockList() {
+    if (isGroupByLevel) {
+      generateResultWithGroupByLevel();
+    } else {
+      generateResultWithoutGroupByLevel();
     }
+  }
+
+  private void generateResultWithoutGroupByLevel() {
     int totalCount = 0;
-    for (TsBlock tsBlock : tsBlocks) {
+    for (TsBlock tsBlock : tsBlockList) {
       int count = tsBlock.getColumn(0).getInt(0);
       totalCount += count;
     }
@@ -94,44 +110,38 @@ public class CountMergeOperator implements ProcessOperator {
     tsBlockBuilder.getTimeColumnBuilder().writeLong(0L);
     tsBlockBuilder.getColumnBuilder(0).writeInt(totalCount);
     tsBlockBuilder.declarePosition();
-    return tsBlockBuilder.build();
+    this.tsBlockList = Collections.singletonList(tsBlockBuilder.build());
   }
 
-  private TsBlock nextWithGroupByLevel() {
-    List<TsBlock> tsBlocks = new ArrayList<>(children.size());
-    for (Operator child : children) {
-      if (child.hasNext()) {
-        tsBlocks.add(child.next());
-      }
-    }
-    TsBlockBuilder tsBlockBuilder =
-        new TsBlockBuilder(Arrays.asList(TSDataType.TEXT, TSDataType.INT32));
+  private void generateResultWithGroupByLevel() {
     Map<String, Integer> countMap = new HashMap<>();
-    for (TsBlock tsBlock : tsBlocks) {
+    for (TsBlock tsBlock : tsBlockList) {
       for (int i = 0; i < tsBlock.getPositionCount(); i++) {
         String columnName = tsBlock.getColumn(0).getBinary(i).getStringValue();
         int count = tsBlock.getColumn(1).getInt(i);
         countMap.put(columnName, countMap.getOrDefault(columnName, 0) + count);
       }
     }
-    countMap.forEach(
-        (column, count) -> {
-          tsBlockBuilder.getTimeColumnBuilder().writeLong(0L);
-          tsBlockBuilder.getColumnBuilder(0).writeBinary(new Binary(column));
-          tsBlockBuilder.getColumnBuilder(1).writeInt(count);
-          tsBlockBuilder.declarePosition();
-        });
-    return tsBlockBuilder.build();
+    this.tsBlockList =
+        SchemaTsBlockUtil.transferSchemaResultToTsBlockList(
+            countMap.entrySet().iterator(),
+            Arrays.asList(TSDataType.TEXT, TSDataType.INT32),
+            (entry, tsBlockBuilder) -> {
+              tsBlockBuilder.getTimeColumnBuilder().writeLong(0L);
+              tsBlockBuilder.getColumnBuilder(0).writeBinary(new Binary(entry.getKey()));
+              tsBlockBuilder.getColumnBuilder(1).writeInt(entry.getValue());
+              tsBlockBuilder.declarePosition();
+            });
   }
 
   @Override
   public boolean hasNext() {
-    return !isFinished;
+    return currentIndex < tsBlockList.size();
   }
 
   @Override
   public boolean isFinished() {
-    return isFinished;
+    return !hasNext();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesSchemaScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesSchemaScanOperator.java
index fc8a3993e5..9eacd6e8cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesSchemaScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesSchemaScanOperator.java
@@ -57,18 +57,18 @@ public class DevicesSchemaScanOperator extends SchemaQueryScanOperator {
   }
 
   @Override
-  protected TsBlock createTsBlock() {
-    TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes);
+  protected List<TsBlock> createTsBlockList() {
     try {
-      ((SchemaDriverContext) operatorContext.getInstanceContext().getDriverContext())
-          .getSchemaRegion()
-          .getMatchedDevices(convertToPhysicalPlan())
-          .left
-          .forEach(device -> setColumns(device, builder));
+      List<ShowDevicesResult> schemaRegionResult =
+          ((SchemaDriverContext) operatorContext.getInstanceContext().getDriverContext())
+              .getSchemaRegion()
+              .getMatchedDevices(convertToPhysicalPlan())
+              .left;
+      return SchemaTsBlockUtil.transferSchemaResultToTsBlockList(
+          schemaRegionResult.iterator(), outputDataTypes, this::setColumns);
     } catch (MetadataException e) {
       throw new RuntimeException(e.getMessage(), e);
     }
-    return builder.build();
   }
 
   // ToDo @xinzhongtianxia remove this temporary converter after mpp online
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java
index aa5d865014..f1ce0e88e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java
@@ -29,11 +29,11 @@ import org.apache.iotdb.db.mpp.execution.operator.source.SourceOperator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.utils.Binary;
 
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.stream.Collectors;
 
 import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
@@ -48,7 +48,8 @@ public class LevelTimeSeriesCountOperator implements SourceOperator {
   private final String value;
   private final boolean isContains;
 
-  private boolean isFinished;
+  private List<TsBlock> tsBlockList;
+  private int currentIndex = 0;
 
   private final List<TSDataType> outputDataTypes;
 
@@ -87,8 +88,23 @@ public class LevelTimeSeriesCountOperator implements SourceOperator {
 
   @Override
   public TsBlock next() {
-    isFinished = true;
-    TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(outputDataTypes);
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+    currentIndex++;
+    return tsBlockList.get(currentIndex - 1);
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (tsBlockList == null) {
+      createTsBlockList();
+    }
+
+    return currentIndex < tsBlockList.size();
+  }
+
+  public void createTsBlockList() {
     Map<PartialPath, Integer> countMap;
     try {
       if (key != null && value != null) {
@@ -107,24 +123,24 @@ public class LevelTimeSeriesCountOperator implements SourceOperator {
     } catch (MetadataException e) {
       throw new RuntimeException(e.getMessage(), e);
     }
-    countMap.forEach(
-        (path, count) -> {
-          tsBlockBuilder.getTimeColumnBuilder().writeLong(0L);
-          tsBlockBuilder.getColumnBuilder(0).writeBinary(new Binary(path.getFullPath()));
-          tsBlockBuilder.getColumnBuilder(1).writeInt(count);
-          tsBlockBuilder.declarePosition();
-        });
-    return tsBlockBuilder.build();
-  }
 
-  @Override
-  public boolean hasNext() {
-    return !isFinished;
+    tsBlockList =
+        SchemaTsBlockUtil.transferSchemaResultToTsBlockList(
+            countMap.entrySet().iterator(),
+            outputDataTypes,
+            (entry, tsBlockBuilder) -> {
+              tsBlockBuilder.getTimeColumnBuilder().writeLong(0L);
+              tsBlockBuilder
+                  .getColumnBuilder(0)
+                  .writeBinary(new Binary(entry.getKey().getFullPath()));
+              tsBlockBuilder.getColumnBuilder(1).writeInt(entry.getValue());
+              tsBlockBuilder.declarePosition();
+            });
   }
 
   @Override
   public boolean isFinished() {
-    return isFinished;
+    return !hasNext();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/PathsUsingTemplateScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/PathsUsingTemplateScanOperator.java
index b65dade6de..0c8a9a2b4e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/PathsUsingTemplateScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/PathsUsingTemplateScanOperator.java
@@ -50,17 +50,17 @@ public class PathsUsingTemplateScanOperator extends SchemaQueryScanOperator {
   }
 
   @Override
-  protected TsBlock createTsBlock() {
-    TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes);
+  protected List<TsBlock> createTsBlockList() {
     try {
-      ((SchemaDriverContext) operatorContext.getInstanceContext().getDriverContext())
-          .getSchemaRegion()
-          .getPathsUsingTemplate(templateId)
-          .forEach(path -> setColumns(path, builder));
+      List<String> schemaRegionResult =
+          ((SchemaDriverContext) operatorContext.getInstanceContext().getDriverContext())
+              .getSchemaRegion()
+              .getPathsUsingTemplate(templateId);
+      return SchemaTsBlockUtil.transferSchemaResultToTsBlockList(
+          schemaRegionResult.iterator(), outputDataTypes, this::setColumns);
     } catch (MetadataException e) {
       throw new RuntimeException(e.getMessage(), e);
     }
-    return builder.build();
   }
 
   private void setColumns(String path, TsBlockBuilder builder) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
index 4493dc58a8..23bbcd4366 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
@@ -24,13 +24,16 @@ import org.apache.iotdb.db.mpp.execution.operator.source.SourceOperator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
+import java.util.List;
+import java.util.NoSuchElementException;
+
 import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
 public abstract class SchemaQueryScanOperator implements SourceOperator {
 
   protected OperatorContext operatorContext;
-  protected TsBlock tsBlock;
-  protected boolean isFinished = false;
+  protected List<TsBlock> tsBlockList;
+  protected int currentIndex = 0;
 
   protected int limit;
   protected int offset;
@@ -54,7 +57,7 @@ public abstract class SchemaQueryScanOperator implements SourceOperator {
     this.sourceId = sourceId;
   }
 
-  protected abstract TsBlock createTsBlock();
+  protected abstract List<TsBlock> createTsBlockList();
 
   public PartialPath getPartialPath() {
     return partialPath;
@@ -87,25 +90,19 @@ public abstract class SchemaQueryScanOperator implements SourceOperator {
 
   @Override
   public TsBlock next() {
-    isFinished = true;
-    TsBlock result = tsBlock;
-    tsBlock = null;
-    return result;
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+    currentIndex++;
+    return tsBlockList.get(currentIndex - 1);
   }
 
   @Override
   public boolean hasNext() {
-    if (isFinished) {
-      return false;
-    }
-    if (tsBlock == null) {
-      tsBlock = createTsBlock();
-      if (tsBlock.getPositionCount() == 0) {
-        isFinished = true;
-        return false;
-      }
+    if (tsBlockList == null) {
+      tsBlockList = createTsBlockList();
     }
-    return true;
+    return currentIndex < tsBlockList.size();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaTsBlockUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaTsBlockUtil.java
new file mode 100644
index 0000000000..52b11fca50
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaTsBlockUtil.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.schema;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.BiConsumer;
+
+import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
+public class SchemaTsBlockUtil {
+
+  private static final long MAX_SIZE = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
+
+  public static <T> List<TsBlock> transferSchemaResultToTsBlockList(
+      Iterator<T> schemaRegionResultIterator,
+      List<TSDataType> outputDataTypes,
+      BiConsumer<T, TsBlockBuilder> consumer) {
+    List<TsBlock> result = new ArrayList<>();
+    TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(outputDataTypes);
+    T schemaRegionResultElement;
+    while (schemaRegionResultIterator.hasNext()) {
+      schemaRegionResultElement = schemaRegionResultIterator.next();
+      consumer.accept(schemaRegionResultElement, tsBlockBuilder);
+      if (tsBlockBuilder.getRetainedSizeInBytes() >= MAX_SIZE) {
+        result.add(tsBlockBuilder.build());
+        tsBlockBuilder = new TsBlockBuilder(outputDataTypes);
+      }
+    }
+    if (!tsBlockBuilder.isEmpty()) {
+      result.add(tsBlockBuilder.build());
+    }
+    return result;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesSchemaScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesSchemaScanOperator.java
index ab8a49e1b8..7fbc42f88f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesSchemaScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesSchemaScanOperator.java
@@ -89,18 +89,18 @@ public class TimeSeriesSchemaScanOperator extends SchemaQueryScanOperator {
   }
 
   @Override
-  protected TsBlock createTsBlock() {
-    TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes);
+  protected List<TsBlock> createTsBlockList() {
     try {
-      ((SchemaDriverContext) operatorContext.getInstanceContext().getDriverContext())
-          .getSchemaRegion()
-          .showTimeseries(convertToPhysicalPlan(), operatorContext.getInstanceContext())
-          .left
-          .forEach(series -> setColumns(series, builder));
+      List<ShowTimeSeriesResult> schemaRegionResult =
+          ((SchemaDriverContext) operatorContext.getInstanceContext().getDriverContext())
+              .getSchemaRegion()
+              .showTimeseries(convertToPhysicalPlan(), operatorContext.getInstanceContext())
+              .left;
+      return SchemaTsBlockUtil.transferSchemaResultToTsBlockList(
+          schemaRegionResult.iterator(), outputDataTypes, this::setColumns);
     } catch (MetadataException e) {
       throw new RuntimeException(e.getMessage(), e);
     }
-    return builder.build();
   }
 
   // ToDo @xinzhongtianxia remove this temporary converter after mpp online
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperatorTest.java
index d0e63e49bb..97838bd851 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperatorTest.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -187,6 +188,7 @@ public class CountMergeOperatorTest {
               fragmentInstanceContext.getOperatorContexts().get(0),
               Arrays.asList(timeSeriesCountOperator1, timeSeriesCountOperator2));
       TsBlock tsBlock = null;
+      Assert.assertTrue(countMergeOperator.isBlocked().isDone());
       while (countMergeOperator.hasNext()) {
         tsBlock = countMergeOperator.next();
         assertFalse(countMergeOperator.hasNext());

Reply via email to