This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 102364b99f6 Refactor SchemaScanOperator to do asynchronous fetch and 
return isBlock (#10231) (#10304)
102364b99f6 is described below

commit 102364b99f64f5361d825864b9bf7676888167d0
Author: Chen YZ <[email protected]>
AuthorDate: Mon Jun 26 13:24:57 2023 +0800

    Refactor SchemaScanOperator to do asynchronous fetch and return isBlock 
(#10231) (#10304)
---
 .../db/metadata/mtree/MTreeBelowSGCachedImpl.java  |  16 +++
 .../db/metadata/mtree/MTreeBelowSGMemoryImpl.java  |  17 +++
 .../db/metadata/query/reader/ISchemaReader.java    |  44 ++++++-
 .../reader/SchemaReaderLimitOffsetWrapper.java     |  55 ++++++--
 .../reader/TimeseriesReaderWithViewFetch.java      | 144 +++++++++++++++------
 .../db/metadata/schemaregion/ISchemaRegion.java    |   4 -
 .../apache/iotdb/db/metadata/tag/TagManager.java   |   6 +
 .../schema/CountGroupByLevelScanOperator.java      | 108 +++++++++++-----
 .../operator/schema/SchemaCountOperator.java       |  75 ++++++++---
 .../operator/schema/SchemaQueryScanOperator.java   |  89 ++++++++++---
 .../schema/source/LogicalViewSchemaSource.java     |  22 ++--
 .../schema/source/PathsUsingTemplateSource.java    |  11 ++
 .../operator/schema/SchemaOperatorTestUtil.java    |  12 +-
 13 files changed, 460 insertions(+), 143 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
index 6d5d9cdd841..d567b6c8a3e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
@@ -70,6 +70,7 @@ import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
+import com.google.common.util.concurrent.ListenableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,6 +83,7 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -1048,6 +1050,10 @@ public class MTreeBelowSGCachedImpl {
             collector.close();
           }
 
+          public ListenableFuture<?> isBlocked() {
+            return NOT_BLOCKED;
+          }
+
           public boolean hasNext() {
             while (next == null && collector.hasNext()) {
               IDeviceSchemaInfo temp = collector.next();
@@ -1059,6 +1065,9 @@ public class MTreeBelowSGCachedImpl {
           }
 
           public IDeviceSchemaInfo next() {
+            if (!hasNext()) {
+              throw new NoSuchElementException();
+            }
             IDeviceSchemaInfo result = next;
             next = null;
             return result;
@@ -1170,11 +1179,18 @@ public class MTreeBelowSGCachedImpl {
         collector.close();
       }
 
+      public ListenableFuture<?> isBlocked() {
+        return NOT_BLOCKED;
+      }
+
       public boolean hasNext() {
         return collector.hasNext();
       }
 
       public INodeSchemaInfo next() {
+        if (!hasNext()) {
+          throw new NoSuchElementException();
+        }
         return collector.next();
       }
     };
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
index dedebbdbfb8..7bfefc9b189 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
@@ -77,6 +77,8 @@ import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -86,6 +88,7 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -908,6 +911,10 @@ public class MTreeBelowSGMemoryImpl {
             collector.close();
           }
 
+          public ListenableFuture<?> isBlocked() {
+            return NOT_BLOCKED;
+          }
+
           public boolean hasNext() {
             while (next == null && collector.hasNext()) {
               IDeviceSchemaInfo temp = collector.next();
@@ -919,6 +926,9 @@ public class MTreeBelowSGMemoryImpl {
           }
 
           public IDeviceSchemaInfo next() {
+            if (!hasNext()) {
+              throw new NoSuchElementException();
+            }
             IDeviceSchemaInfo result = next;
             next = null;
             return result;
@@ -1031,11 +1041,18 @@ public class MTreeBelowSGMemoryImpl {
         collector.close();
       }
 
+      public ListenableFuture<?> isBlocked() {
+        return NOT_BLOCKED;
+      }
+
       public boolean hasNext() {
         return collector.hasNext();
       }
 
       public INodeSchemaInfo next() {
+        if (!hasNext()) {
+          throw new NoSuchElementException();
+        }
         return collector.next();
       }
     };
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/query/reader/ISchemaReader.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/query/reader/ISchemaReader.java
index 84e5a409875..5f6fc9879f0 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/query/reader/ISchemaReader.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/query/reader/ISchemaReader.java
@@ -21,9 +21,32 @@ package org.apache.iotdb.db.metadata.query.reader;
 
 import org.apache.iotdb.db.metadata.query.info.ISchemaInfo;
 
-import java.util.Iterator;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import static com.google.common.util.concurrent.Futures.immediateFuture;
+import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
+
+/**
+ * ISchemaReader is a non-blocking iterator.
+ *
+ * <ol>
+ *   <li>The isBlocked interface is used to determine if it is blocked. If 
isDone() is false, it is
+ *       blocked.
+ *   <li>The hasNext interface is responsible for determining whether the next 
result is available,
+ *       and if the current iterator is still in the blocked state, it will 
synchronously wait for
+ *       the blocked state to be lifted.
+ *   <li>The next interface is responsible for consuming the next result. If 
the current iterator's
+ *       hasNext returns false, it throws NoSuchElementException.
+ * </ol>
+ *
+ * @param <T>
+ */
+public interface ISchemaReader<T extends ISchemaInfo> extends AutoCloseable {
+
+  ListenableFuture<Boolean> NOT_BLOCKED_TRUE = immediateFuture(true);
+  ListenableFuture<Boolean> NOT_BLOCKED_FALSE = immediateFuture(false);
+  ListenableFuture<Void> NOT_BLOCKED = immediateVoidFuture();
 
-public interface ISchemaReader<T extends ISchemaInfo> extends Iterator<T>, 
AutoCloseable {
   /**
    * Determines if the iteration is successful when it completes.
    *
@@ -37,4 +60,21 @@ public interface ISchemaReader<T extends ISchemaInfo> 
extends Iterator<T>, AutoC
    * @return Throwable, null if no exception.
    */
   Throwable getFailure();
+
+  /**
+   * Returns a future that will be completed when the schemaReader becomes 
unblocked. It may be
+   * called several times before next and will return the same value.
+   *
+   * @return a future whose isDone() is false while the reader is blocked.
+   */
+  ListenableFuture<?> isBlocked();
+
+  boolean hasNext();
+
+  /**
+   * The next interface is responsible for consuming the next result.
+   *
+   * @throws java.util.NoSuchElementException if the current iterator hasNext 
is false
+   */
+  T next();
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/query/reader/SchemaReaderLimitOffsetWrapper.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/query/reader/SchemaReaderLimitOffsetWrapper.java
index 4c3ed2cb89b..2f05a032b42 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/query/reader/SchemaReaderLimitOffsetWrapper.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/query/reader/SchemaReaderLimitOffsetWrapper.java
@@ -21,8 +21,13 @@ package org.apache.iotdb.db.metadata.query.reader;
 
 import org.apache.iotdb.db.metadata.query.info.ISchemaInfo;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
 import java.util.NoSuchElementException;
 
+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
+
 public class SchemaReaderLimitOffsetWrapper<T extends ISchemaInfo> implements 
ISchemaReader<T> {
 
   private final ISchemaReader<T> schemaReader;
@@ -32,20 +37,14 @@ public class SchemaReaderLimitOffsetWrapper<T extends 
ISchemaInfo> implements IS
   private final boolean hasLimit;
 
   private int count = 0;
-  int curOffset = 0;
+  private int curOffset = 0;
+  private ListenableFuture<?> isBlocked = null;
 
   public SchemaReaderLimitOffsetWrapper(ISchemaReader<T> schemaReader, long 
limit, long offset) {
     this.schemaReader = schemaReader;
     this.limit = limit;
     this.offset = offset;
     this.hasLimit = limit > 0 || offset > 0;
-
-    if (hasLimit) {
-      while (curOffset < offset && schemaReader.hasNext()) {
-        schemaReader.next();
-        curOffset++;
-      }
-    }
   }
 
   @Override
@@ -64,11 +63,45 @@ public class SchemaReaderLimitOffsetWrapper<T extends 
ISchemaInfo> implements IS
   }
 
   @Override
-  public boolean hasNext() {
+  public ListenableFuture<?> isBlocked() {
+    if (isBlocked != null) {
+      return isBlocked;
+    }
+    isBlocked = tryGetNext();
+    return isBlocked;
+  }
+
+  private ListenableFuture<?> tryGetNext() {
     if (hasLimit) {
-      return count < limit && schemaReader.hasNext();
+      if (curOffset < offset) {
+        // first time
+        return Futures.submit(
+            () -> {
+              while (curOffset < offset && schemaReader.hasNext()) {
+                schemaReader.next();
+                curOffset++;
+              }
+              return schemaReader.hasNext();
+            },
+            directExecutor());
+      }
+      if (count >= limit) {
+        return NOT_BLOCKED;
+      } else {
+        return schemaReader.isBlocked();
+      }
     } else {
-      return schemaReader.hasNext();
+      return schemaReader.isBlocked();
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    try {
+      isBlocked().get();
+      return schemaReader.hasNext() && count < limit;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
     }
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/query/reader/TimeseriesReaderWithViewFetch.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/query/reader/TimeseriesReaderWithViewFetch.java
index c93d4daf74d..557c84130a2 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/query/reader/TimeseriesReaderWithViewFetch.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/query/reader/TimeseriesReaderWithViewFetch.java
@@ -30,12 +30,18 @@ import 
org.apache.iotdb.db.metadata.view.viewExpression.visitor.TransformToExpre
 import org.apache.iotdb.db.metadata.visitor.TimeseriesFilterVisitor;
 import org.apache.iotdb.db.mpp.common.NodeRef;
 import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
 import org.apache.iotdb.db.mpp.plan.analyze.ExpressionTypeAnalyzer;
 import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import 
org.apache.iotdb.db.mpp.plan.expression.visitor.CompleteMeasurementSchemaVisitor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -45,13 +51,20 @@ import java.util.NoSuchElementException;
 import java.util.Queue;
 
 public class TimeseriesReaderWithViewFetch implements 
ISchemaReader<ITimeSeriesSchemaInfo> {
-
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TimeseriesReaderWithViewFetch.class);
   private final Traverser<ITimeSeriesSchemaInfo, ?> traverser;
   private final Queue<ITimeSeriesSchemaInfo> cachedViewList = new 
ArrayDeque<>();
   private ITimeSeriesSchemaInfo next = null;
   private boolean consumeView = false;
   private final SchemaFilter schemaFilter;
 
+  /**
+   * If isBlocked is null, it means the next is not fetched yet. If 
isBlocked.isDone() is false, it
+   * means the next is being fetched. If isBlocked.get() is true, it means 
hasNext, otherwise, it
+   * means no more info to be fetched.
+   */
+  private ListenableFuture<Boolean> isBlocked = null;
+
   private static final int BATCH_CACHED_SIZE = 1000;
   private static final TimeseriesFilterVisitor FILTER_VISITOR = new 
TimeseriesFilterVisitor();
 
@@ -61,29 +74,83 @@ public class TimeseriesReaderWithViewFetch implements 
ISchemaReader<ITimeSeriesS
     this.schemaFilter = schemaFilter;
   }
 
+  @Override
   public boolean isSuccess() {
     return traverser.isSuccess();
   }
 
+  @Override
   public Throwable getFailure() {
     return traverser.getFailure();
   }
 
+  @Override
   public void close() {
     traverser.close();
   }
 
-  public boolean hasNext() {
-    if (next == null && !consumeView) {
-      fetchAndCacheNextResult();
+  /**
+   * Fetch ITimeSeriesSchemaInfo from the traverser and return only in the 
following three cases
+   *
+   * <ol>
+   *   <li>successfully fetched an info of normal time series. consumeView is 
false and next is not
+   *       null.
+   *   <li>successfully fetched batch info of view time series. consumeView is 
true and next is
+   *       null.
+   *   <li>no more info to be fetched. consumeView is false and next is null.
+   * </ol>
+   */
+  @Override
+  public ListenableFuture<Boolean> isBlocked() {
+    if (isBlocked != null) {
+      return isBlocked;
     }
+    ListenableFuture<Boolean> res = NOT_BLOCKED_FALSE;
     if (consumeView) {
-      return !cachedViewList.isEmpty();
+      // consume view list
+      res = NOT_BLOCKED_TRUE;
+    } else if (next == null) {
+      // get next from traverser
+      ITimeSeriesSchemaInfo temp;
+      while (traverser.hasNext()) {
+        temp = traverser.next();
+        if (temp.isLogicalView()) {
+          // view timeseries
+          cachedViewList.add(temp.snapshot());
+          if (cachedViewList.size() >= BATCH_CACHED_SIZE) {
+            res = asyncGetNext();
+            break;
+          }
+        } else if (FILTER_VISITOR.process(schemaFilter, temp)) {
+          // normal timeseries
+          next = temp;
+          res = NOT_BLOCKED_TRUE;
+          break;
+        }
+      }
+      if (res == NOT_BLOCKED_FALSE && !cachedViewList.isEmpty()) {
+        // all schema info has been fetched, but there may still be some view 
schema info in
+        // cachedViewList
+        res = asyncGetNext();
+      }
     } else {
-      return next != null;
+      // next is not null
+      res = NOT_BLOCKED_TRUE;
     }
+    isBlocked = res;
+    return res;
   }
 
+  @Override
+  public boolean hasNext() {
+    try {
+      return isBlocked().get();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
   public ITimeSeriesSchemaInfo next() {
     if (!hasNext()) {
       throw new NoSuchElementException();
@@ -93,47 +160,43 @@ public class TimeseriesReaderWithViewFetch implements 
ISchemaReader<ITimeSeriesS
       result = next;
       next = null;
     } else {
+      // it may return null if cachedViewList is empty but consumeView is true
       result = cachedViewList.poll();
       consumeView = !cachedViewList.isEmpty();
     }
+    isBlocked = null;
     return result;
   }
 
-  /**
-   * Fetch ITimeSeriesSchemaInfo from the traverser and return only in the 
following three cases
-   *
-   * <ol>
-   *   <li>successfully fetched an info of normal time series. consumeView is 
false and next is not
-   *       null.
-   *   <li>successfully fetched batch info of view time series. consumeView is 
true and next is
-   *       null.
-   *   <li>no more info to be fetched. consumeView is false and next is null.
-   * </ol>
-   */
-  private void fetchAndCacheNextResult() {
-    ITimeSeriesSchemaInfo temp;
-    while (traverser.hasNext()) {
-      temp = traverser.next();
-      if (temp.isLogicalView()) {
-        cachedViewList.add(temp.snapshot());
-        if (cachedViewList.size() >= BATCH_CACHED_SIZE) {
+  private ListenableFuture<Boolean> asyncGetNext() {
+    // enter this function only when viewList is full or all schema info has 
been fetched and
+    // viewList is not empty
+    return Futures.submit(
+        () -> {
           fetchViewTimeSeriesSchemaInfo();
           if (consumeView) {
-            break;
+            return true;
+          } else {
+            // none of the cached views is satisfied
+            while (traverser.hasNext()) {
+              ITimeSeriesSchemaInfo temp = traverser.next();
+              if (temp.isLogicalView()) {
+                cachedViewList.add(temp.snapshot());
+                if (cachedViewList.size() >= BATCH_CACHED_SIZE) {
+                  fetchViewTimeSeriesSchemaInfo();
+                  if (consumeView) {
+                    return true;
+                  }
+                }
+              } else if (FILTER_VISITOR.process(schemaFilter, temp)) {
+                next = temp;
+                return true;
+              }
+            }
+            return false;
           }
-        }
-      } else {
-        if (FILTER_VISITOR.process(schemaFilter, temp)) {
-          next = temp;
-          break;
-        }
-      }
-    }
-    if (next == null && !cachedViewList.isEmpty()) {
-      // all schema info has been fetched, but there mau be still some view 
schema info in
-      // cachedViewList
-      fetchViewTimeSeriesSchemaInfo();
-    }
+        },
+        FragmentInstanceManager.getInstance().getIntoOperationExecutor());
   }
 
   private void fetchViewTimeSeriesSchemaInfo() {
@@ -154,6 +217,7 @@ public class TimeseriesReaderWithViewFetch implements 
ISchemaReader<ITimeSeriesS
     }
     // clear cachedViewList, all cached view will be added in the last step
     cachedViewList.clear();
+
     ISchemaTree schemaTree = 
ClusterSchemaFetcher.getInstance().fetchSchema(patternTree, null);
     // process each view expression and get data type
     TransformToExpressionVisitor transformToExpressionVisitor = new 
TransformToExpressionVisitor();
@@ -179,8 +243,6 @@ public class TimeseriesReaderWithViewFetch implements 
ISchemaReader<ITimeSeriesS
         cachedViewList.add(delayedLogicalViewList.get(i));
       }
     }
-    if (!cachedViewList.isEmpty()) {
-      consumeView = true;
-    }
+    consumeView = !cachedViewList.isEmpty();
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
 
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
index 3c368e7d124..6af5159939b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
@@ -292,10 +292,6 @@ public interface ISchemaRegion {
   ISchemaReader<IDeviceSchemaInfo> getDeviceReader(IShowDevicesPlan 
showDevicesPlan)
       throws MetadataException;
 
-  /**
-   * The iterated result shall be consumed before calling reader.hasNext() or 
reader.next(). Its
-   * implementation is based on the reader's process context.
-   */
   ISchemaReader<ITimeSeriesSchemaInfo> getTimeSeriesReader(IShowTimeSeriesPlan 
showTimeSeriesPlan)
       throws MetadataException;
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java 
b/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java
index 9a2ae5ccfa8..d58ded19174 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java
@@ -34,6 +34,7 @@ import 
org.apache.iotdb.db.metadata.query.info.ITimeSeriesSchemaInfo;
 import org.apache.iotdb.db.metadata.query.reader.ISchemaReader;
 import org.apache.iotdb.tsfile.utils.Pair;
 
+import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -249,6 +250,11 @@ public class TagManager {
         // do nothing
       }
 
+      @Override
+      public ListenableFuture<?> isBlocked() {
+        return NOT_BLOCKED;
+      }
+
       @Override
       public boolean hasNext() {
         if (throwable == null) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountGroupByLevelScanOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountGroupByLevelScanOperator.java
index 49ceb468ba1..a57026e28f7 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountGroupByLevelScanOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountGroupByLevelScanOperator.java
@@ -34,6 +34,7 @@ import 
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.utils.Binary;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ListenableFuture;
 
 import java.util.Arrays;
 import java.util.HashMap;
@@ -41,6 +42,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 
+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 import static 
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
 public class CountGroupByLevelScanOperator<T extends ISchemaInfo> implements 
SourceOperator {
@@ -52,12 +54,13 @@ public class CountGroupByLevelScanOperator<T extends 
ISchemaInfo> implements Sou
 
   private final PlanNodeId sourceId;
   private final OperatorContext operatorContext;
-
   private final int level;
-
   private final ISchemaSource<T> schemaSource;
+  private final Map<PartialPath, Long> countMap;
 
   private ISchemaReader<T> schemaReader;
+  private ListenableFuture<?> isBlocked;
+  private TsBlock next;
 
   public CountGroupByLevelScanOperator(
       PlanNodeId sourceId,
@@ -68,6 +71,7 @@ public class CountGroupByLevelScanOperator<T extends 
ISchemaInfo> implements Sou
     this.operatorContext = operatorContext;
     this.level = level;
     this.schemaSource = schemaSource;
+    this.countMap = new HashMap<>();
   }
 
   @Override
@@ -80,20 +84,81 @@ public class CountGroupByLevelScanOperator<T extends 
ISchemaInfo> implements Sou
     return operatorContext;
   }
 
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    if (isBlocked == null) {
+      isBlocked = tryGetNext();
+    }
+    return isBlocked;
+  }
+
+  /**
+   * Try to get next TsBlock. If the next is not ready, return a future. After 
success, {@link
+   * CountGroupByLevelScanOperator#next} will be set.
+   */
+  private ListenableFuture<?> tryGetNext() {
+    if (schemaReader == null) {
+      schemaReader = createTimeSeriesReader();
+    }
+    while (true) {
+      try {
+        ListenableFuture<?> readerBlocked = schemaReader.isBlocked();
+        if (!readerBlocked.isDone()) {
+          readerBlocked.addListener(
+              () -> next = constructTsBlockAndClearMap(countMap), 
directExecutor());
+          return readerBlocked;
+        } else if (schemaReader.hasNext()) {
+          ISchemaInfo schemaInfo = schemaReader.next();
+          PartialPath path = schemaInfo.getPartialPath();
+          if (path.getNodeLength() < level) {
+            continue;
+          }
+          PartialPath levelPath = new 
PartialPath(Arrays.copyOf(path.getNodes(), level + 1));
+          countMap.compute(
+              levelPath,
+              (k, v) -> {
+                if (v == null) {
+                  return 1L;
+                } else {
+                  return v + 1;
+                }
+              });
+          if (countMap.size() == DEFAULT_BATCH_SIZE) {
+            next = constructTsBlockAndClearMap(countMap);
+            return NOT_BLOCKED;
+          }
+        } else {
+          if (countMap.isEmpty()) {
+            next = null;
+          } else {
+            next = constructTsBlockAndClearMap(countMap);
+          }
+          return NOT_BLOCKED;
+        }
+      } catch (Exception e) {
+        throw new SchemaExecutionException(e);
+      }
+    }
+  }
+
   @Override
   public TsBlock next() throws Exception {
     if (!hasNext()) {
       throw new NoSuchElementException();
     }
-    return generateResult();
+    TsBlock ret = next;
+    next = null;
+    isBlocked = null;
+    return ret;
   }
 
   @Override
   public boolean hasNext() throws Exception {
-    if (schemaReader == null) {
-      schemaReader = createTimeSeriesReader();
+    isBlocked().get(); // wait for the next TsBlock
+    if (!schemaReader.isSuccess()) {
+      throw new SchemaExecutionException(schemaReader.getFailure());
     }
-    return schemaReader.hasNext();
+    return next != null;
   }
 
   public ISchemaReader<T> createTimeSeriesReader() {
@@ -101,35 +166,7 @@ public class CountGroupByLevelScanOperator<T extends 
ISchemaInfo> implements Sou
         ((SchemaDriverContext) 
operatorContext.getDriverContext()).getSchemaRegion());
   }
 
-  private TsBlock generateResult() {
-    Map<PartialPath, Long> countMap = new HashMap<>();
-    T schemaInfo;
-    PartialPath path;
-    PartialPath levelPath;
-    while (schemaReader.hasNext()) {
-      schemaInfo = schemaReader.next();
-      path = schemaInfo.getPartialPath();
-      if (path.getNodeLength() < level) {
-        continue;
-      }
-      levelPath = new PartialPath(Arrays.copyOf(path.getNodes(), level + 1));
-      countMap.compute(
-          levelPath,
-          (k, v) -> {
-            if (v == null) {
-              return 1L;
-            } else {
-              return v + 1;
-            }
-          });
-      if (countMap.size() == DEFAULT_BATCH_SIZE) {
-        break;
-      }
-    }
-    if (!schemaReader.isSuccess()) {
-      throw new SchemaExecutionException(schemaReader.getFailure());
-    }
-
+  private TsBlock constructTsBlockAndClearMap(Map<PartialPath, Long> countMap) 
{
     TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(OUTPUT_DATA_TYPES);
     for (Map.Entry<PartialPath, Long> entry : countMap.entrySet()) {
       tsBlockBuilder.getTimeColumnBuilder().writeLong(0L);
@@ -137,6 +174,7 @@ public class CountGroupByLevelScanOperator<T extends 
ISchemaInfo> implements Sou
       tsBlockBuilder.getColumnBuilder(1).writeLong(entry.getValue());
       tsBlockBuilder.declarePosition();
     }
+    countMap.clear();
     return tsBlockBuilder.build();
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaCountOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaCountOperator.java
index 60e82bf6645..0f8444bc97f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaCountOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaCountOperator.java
@@ -32,6 +32,8 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 import java.util.Collections;
 import java.util.List;
 import java.util.NoSuchElementException;
@@ -47,7 +49,10 @@ public class SchemaCountOperator<T extends ISchemaInfo> 
implements SourceOperato
   private final ISchemaSource<T> schemaSource;
 
   private ISchemaReader<T> schemaReader;
+  private int count;
   private boolean isFinished;
+  private ListenableFuture<?> isBlocked;
+  private TsBlock next; // next will be set only when done
 
   public SchemaCountOperator(
       PlanNodeId sourceId, OperatorContext operatorContext, ISchemaSource<T> 
schemaSource) {
@@ -56,7 +61,7 @@ public class SchemaCountOperator<T extends ISchemaInfo> 
implements SourceOperato
     this.schemaSource = schemaSource;
   }
 
-  private final ISchemaRegion getSchemaRegion() {
+  private ISchemaRegion getSchemaRegion() {
     return ((SchemaDriverContext) 
operatorContext.getDriverContext()).getSchemaRegion();
   }
 
@@ -70,40 +75,74 @@ public class SchemaCountOperator<T extends ISchemaInfo> 
implements SourceOperato
   }
 
   @Override
-  public TsBlock next() throws Exception {
-    if (!hasNext()) {
-      throw new NoSuchElementException();
+  public ListenableFuture<?> isBlocked() {
+    if (isBlocked == null) {
+      isBlocked = tryGetNext();
     }
-    isFinished = true;
-    TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(OUTPUT_DATA_TYPES);
-    long count = 0;
+    return isBlocked;
+  }
+
+  /**
+   * Try to get next TsBlock. If the next is not ready, return a future. After 
success, {@link
+   * SchemaCountOperator#next} will be set.
+   */
+  private ListenableFuture<?> tryGetNext() {
     ISchemaRegion schemaRegion = getSchemaRegion();
     if (schemaSource.hasSchemaStatistic(schemaRegion)) {
-      count = schemaSource.getSchemaStatistic(schemaRegion);
+      next = constructTsBlock(schemaSource.getSchemaStatistic(schemaRegion));
+      return NOT_BLOCKED;
     } else {
       if (schemaReader == null) {
         schemaReader = createSchemaReader();
       }
-      while (schemaReader.hasNext()) {
-        schemaReader.next();
-        count++;
-      }
-      if (!schemaReader.isSuccess()) {
-        throw new SchemaExecutionException(schemaReader.getFailure());
+      while (true) {
+        try {
+          ListenableFuture<?> readerBlocked = schemaReader.isBlocked();
+          if (!readerBlocked.isDone()) {
+            return readerBlocked;
+          } else if (schemaReader.hasNext()) {
+            schemaReader.next();
+            count++;
+          } else {
+            next = constructTsBlock(count);
+            return NOT_BLOCKED;
+          }
+        } catch (Exception e) {
+          throw new SchemaExecutionException(e);
+        }
       }
     }
+  }
 
-    tsBlockBuilder.getTimeColumnBuilder().writeLong(0L);
-    tsBlockBuilder.getColumnBuilder(0).writeLong(count);
-    tsBlockBuilder.declarePosition();
-    return tsBlockBuilder.build();
+  @Override
+  public TsBlock next() throws Exception {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+    if (next != null) {
+      isFinished = true;
+    }
+    isBlocked = null;
+    return next;
   }
 
   @Override
   public boolean hasNext() throws Exception {
+    isBlocked().get(); // wait for the next TsBlock
+    if (schemaReader != null && !schemaReader.isSuccess()) {
+      throw new SchemaExecutionException(schemaReader.getFailure());
+    }
     return !isFinished;
   }
 
+  private TsBlock constructTsBlock(long count) {
+    TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(OUTPUT_DATA_TYPES);
+    tsBlockBuilder.getTimeColumnBuilder().writeLong(0L);
+    tsBlockBuilder.getColumnBuilder(0).writeLong(count);
+    tsBlockBuilder.declarePosition();
+    return tsBlockBuilder.build();
+  }
+
   @Override
   public boolean isFinished() throws Exception {
     return isFinished;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
index 966f48ac8b6..89cb64ce676 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
@@ -33,10 +33,13 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.stream.Collectors;
 
+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
 public class SchemaQueryScanOperator<T extends ISchemaInfo> implements SourceOperator {
@@ -60,6 +63,11 @@ public class SchemaQueryScanOperator<T extends ISchemaInfo> implements SourceOpe
 
   private ISchemaReader<T> schemaReader;
 
+  private final TsBlockBuilder tsBlockBuilder;
+  private ListenableFuture<?> isBlocked;
+  private TsBlock next;
+  private boolean isFinished;
+
   protected SchemaQueryScanOperator(
       PlanNodeId sourceId,
       OperatorContext operatorContext,
@@ -76,6 +84,7 @@ public class SchemaQueryScanOperator<T extends ISchemaInfo> implements SourceOpe
     this.sourceId = sourceId;
     this.outputDataTypes = outputDataTypes;
     this.schemaSource = null;
+    this.tsBlockBuilder = new TsBlockBuilder(outputDataTypes);
   }
 
   public SchemaQueryScanOperator(
@@ -87,6 +96,7 @@ public class SchemaQueryScanOperator<T extends ISchemaInfo> implements SourceOpe
         schemaSource.getInfoQueryColumnHeaders().stream()
             .map(ColumnHeader::getColumnType)
             .collect(Collectors.toList());
+    this.tsBlockBuilder = new TsBlockBuilder(outputDataTypes);
   }
 
   protected ISchemaReader<T> createSchemaReader() {
@@ -94,7 +104,7 @@ public class SchemaQueryScanOperator<T extends ISchemaInfo> implements SourceOpe
         ((SchemaDriverContext) operatorContext.getDriverContext()).getSchemaRegion());
   }
 
-  protected void setColumns(T element, TsBlockBuilder builder) {
+  private void setColumns(T element, TsBlockBuilder builder) {
     schemaSource.transformToTsBlockColumns(element, builder, getDatabase());
   }
 
@@ -128,36 +138,79 @@ public class SchemaQueryScanOperator<T extends ISchemaInfo> implements SourceOpe
   }
 
   @Override
-  public TsBlock next() throws Exception {
-    if (!hasNext()) {
-      throw new NoSuchElementException();
+  public ListenableFuture<?> isBlocked() {
+    if (isBlocked == null) {
+      isBlocked = tryGetNext();
     }
-    TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(outputDataTypes);
-    T element;
-    while (schemaReader.hasNext()) {
-      element = schemaReader.next();
-      setColumns(element, tsBlockBuilder);
-      if (tsBlockBuilder.getRetainedSizeInBytes() >= MAX_SIZE) {
-        break;
+    return isBlocked;
+  }
+
+  /**
+   * Try to get next TsBlock. If the next is not ready, return a future. After success, {@link
+   * SchemaQueryScanOperator#next} will be set.
+   */
+  private ListenableFuture<?> tryGetNext() {
+    if (schemaReader == null) {
+      schemaReader = createSchemaReader();
+    }
+    while (true) {
+      try {
+        ListenableFuture<?> readerBlocked = schemaReader.isBlocked();
+        if (!readerBlocked.isDone()) {
+          readerBlocked.addListener(
+              () -> {
+                next = tsBlockBuilder.build();
+                tsBlockBuilder.reset();
+              },
+              directExecutor());
+          return readerBlocked;
+        } else if (schemaReader.hasNext()) {
+          T element = schemaReader.next();
+          setColumns(element, tsBlockBuilder);
+          if (tsBlockBuilder.getRetainedSizeInBytes() >= MAX_SIZE) {
+            next = tsBlockBuilder.build();
+            tsBlockBuilder.reset();
+            return NOT_BLOCKED;
+          }
+        } else {
+          if (tsBlockBuilder.isEmpty()) {
+            next = null;
+            isFinished = true;
+          } else {
+            next = tsBlockBuilder.build();
+          }
+          tsBlockBuilder.reset();
+          return NOT_BLOCKED;
+        }
+      } catch (Exception e) {
+        throw new SchemaExecutionException(e);
       }
     }
-    if (!schemaReader.isSuccess()) {
-      throw new SchemaExecutionException(schemaReader.getFailure());
+  }
+
+  @Override
+  public TsBlock next() throws Exception {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
     }
-    return tsBlockBuilder.build();
+    TsBlock ret = next;
+    next = null;
+    isBlocked = null;
+    return ret;
   }
 
   @Override
   public boolean hasNext() throws Exception {
-    if (schemaReader == null) {
-      schemaReader = createSchemaReader();
+    isBlocked().get(); // wait for the next TsBlock
+    if (!schemaReader.isSuccess()) {
+      throw new SchemaExecutionException(schemaReader.getFailure());
     }
-    return schemaReader.hasNext();
+    return next != null;
   }
 
   @Override
   public boolean isFinished() throws Exception {
-    return !hasNextWithTimer();
+    return isFinished;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/LogicalViewSchemaSource.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/LogicalViewSchemaSource.java
index e1143e317f5..acc818cc4f3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/LogicalViewSchemaSource.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/LogicalViewSchemaSource.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.commons.schema.view.ViewType;
 import org.apache.iotdb.db.metadata.plan.schemaregion.impl.read.SchemaRegionReadPlanFactory;
 import org.apache.iotdb.db.metadata.query.info.ITimeSeriesSchemaInfo;
 import org.apache.iotdb.db.metadata.query.reader.ISchemaReader;
-import org.apache.iotdb.db.metadata.query.reader.SchemaReaderLimitOffsetWrapper;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
@@ -64,18 +63,15 @@ public class LogicalViewSchemaSource implements ISchemaSource<ITimeSeriesSchemaI
   @Override
   public ISchemaReader<ITimeSeriesSchemaInfo> getSchemaReader(ISchemaRegion schemaRegion) {
     try {
-      return new SchemaReaderLimitOffsetWrapper<>(
-          schemaRegion.getTimeSeriesReader(
-              SchemaRegionReadPlanFactory.getShowTimeSeriesPlan(
-                  pathPattern,
-                  Collections.emptyMap(),
-                  0,
-                  0,
-                  false,
-                  SchemaFilterFactory.and(
-                      schemaFilter, SchemaFilterFactory.createViewTypeFilter(ViewType.VIEW)))),
-          limit,
-          offset);
+      return schemaRegion.getTimeSeriesReader(
+          SchemaRegionReadPlanFactory.getShowTimeSeriesPlan(
+              pathPattern,
+              Collections.emptyMap(),
+              limit,
+              offset,
+              false,
+              SchemaFilterFactory.and(
+                  schemaFilter, SchemaFilterFactory.createViewTypeFilter(ViewType.VIEW))));
     } catch (MetadataException e) {
       throw new SchemaExecutionException(e.getMessage(), e);
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/PathsUsingTemplateSource.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/PathsUsingTemplateSource.java
index 2ce78e50de1..82b7a02548d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/PathsUsingTemplateSource.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/PathsUsingTemplateSource.java
@@ -30,8 +30,11 @@ import 
org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.utils.Binary;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
 
 public class PathsUsingTemplateSource implements ISchemaSource<IDeviceSchemaInfo> {
 
@@ -96,6 +99,11 @@ public class PathsUsingTemplateSource implements ISchemaSource<IDeviceSchemaInfo
       }
     }
 
+    @Override
+    public ListenableFuture<?> isBlocked() {
+      return NOT_BLOCKED;
+    }
+
     @Override
     public boolean hasNext() {
       try {
@@ -133,6 +141,9 @@ public class PathsUsingTemplateSource implements ISchemaSource<IDeviceSchemaInfo
 
     @Override
     public IDeviceSchemaInfo next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
       return currentDeviceReader.next();
     }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaOperatorTestUtil.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaOperatorTestUtil.java
index 882076f6fca..18f75e1219a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaOperatorTestUtil.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaOperatorTestUtil.java
@@ -24,9 +24,11 @@ import org.apache.iotdb.db.metadata.query.reader.ISchemaReader;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.mpp.execution.operator.schema.source.ISchemaSource;
 
+import com.google.common.util.concurrent.ListenableFuture;
 import org.mockito.Mockito;
 
 import java.util.Iterator;
+import java.util.NoSuchElementException;
 
 public class SchemaOperatorTestUtil {
   public static final String EXCEPTION_MESSAGE = "ExceptionMessage";
@@ -50,15 +52,23 @@ public class SchemaOperatorTestUtil {
               }
 
               @Override
-              public void close() {}
+              public ListenableFuture<?> isBlocked() {
+                return NOT_BLOCKED;
+              }
 
               @Override
               public boolean hasNext() {
                 return iterator.hasNext();
               }
 
+              @Override
+              public void close() {}
+
               @Override
               public T next() {
+                if (!hasNext()) {
+                  throw new NoSuchElementException();
+                }
                 return iterator.next();
               }
             });


Reply via email to