This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 102364b99f6 Refactor SchemaScanOperator to do asynchronous fetch and
return isBlock (#10231) (#10304)
102364b99f6 is described below
commit 102364b99f64f5361d825864b9bf7676888167d0
Author: Chen YZ <[email protected]>
AuthorDate: Mon Jun 26 13:24:57 2023 +0800
Refactor SchemaScanOperator to do asynchronous fetch and return isBlock
(#10231) (#10304)
---
.../db/metadata/mtree/MTreeBelowSGCachedImpl.java | 16 +++
.../db/metadata/mtree/MTreeBelowSGMemoryImpl.java | 17 +++
.../db/metadata/query/reader/ISchemaReader.java | 44 ++++++-
.../reader/SchemaReaderLimitOffsetWrapper.java | 55 ++++++--
.../reader/TimeseriesReaderWithViewFetch.java | 144 +++++++++++++++------
.../db/metadata/schemaregion/ISchemaRegion.java | 4 -
.../apache/iotdb/db/metadata/tag/TagManager.java | 6 +
.../schema/CountGroupByLevelScanOperator.java | 108 +++++++++++-----
.../operator/schema/SchemaCountOperator.java | 75 ++++++++---
.../operator/schema/SchemaQueryScanOperator.java | 89 ++++++++++---
.../schema/source/LogicalViewSchemaSource.java | 22 ++--
.../schema/source/PathsUsingTemplateSource.java | 11 ++
.../operator/schema/SchemaOperatorTestUtil.java | 12 +-
13 files changed, 460 insertions(+), 143 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
index 6d5d9cdd841..d567b6c8a3e 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
@@ -70,6 +70,7 @@ import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,6 +83,7 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -1048,6 +1050,10 @@ public class MTreeBelowSGCachedImpl {
collector.close();
}
+ public ListenableFuture<?> isBlocked() {
+ return NOT_BLOCKED;
+ }
+
public boolean hasNext() {
while (next == null && collector.hasNext()) {
IDeviceSchemaInfo temp = collector.next();
@@ -1059,6 +1065,9 @@ public class MTreeBelowSGCachedImpl {
}
public IDeviceSchemaInfo next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
IDeviceSchemaInfo result = next;
next = null;
return result;
@@ -1170,11 +1179,18 @@ public class MTreeBelowSGCachedImpl {
collector.close();
}
+ public ListenableFuture<?> isBlocked() {
+ return NOT_BLOCKED;
+ }
+
public boolean hasNext() {
return collector.hasNext();
}
public INodeSchemaInfo next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
return collector.next();
}
};
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
index dedebbdbfb8..7bfefc9b189 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
@@ -77,6 +77,8 @@ import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import com.google.common.util.concurrent.ListenableFuture;
+
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -86,6 +88,7 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -908,6 +911,10 @@ public class MTreeBelowSGMemoryImpl {
collector.close();
}
+ public ListenableFuture<?> isBlocked() {
+ return NOT_BLOCKED;
+ }
+
public boolean hasNext() {
while (next == null && collector.hasNext()) {
IDeviceSchemaInfo temp = collector.next();
@@ -919,6 +926,9 @@ public class MTreeBelowSGMemoryImpl {
}
public IDeviceSchemaInfo next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
IDeviceSchemaInfo result = next;
next = null;
return result;
@@ -1031,11 +1041,18 @@ public class MTreeBelowSGMemoryImpl {
collector.close();
}
+ public ListenableFuture<?> isBlocked() {
+ return NOT_BLOCKED;
+ }
+
public boolean hasNext() {
return collector.hasNext();
}
public INodeSchemaInfo next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
return collector.next();
}
};
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/query/reader/ISchemaReader.java
b/server/src/main/java/org/apache/iotdb/db/metadata/query/reader/ISchemaReader.java
index 84e5a409875..5f6fc9879f0 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/query/reader/ISchemaReader.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/query/reader/ISchemaReader.java
@@ -21,9 +21,32 @@ package org.apache.iotdb.db.metadata.query.reader;
import org.apache.iotdb.db.metadata.query.info.ISchemaInfo;
-import java.util.Iterator;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import static com.google.common.util.concurrent.Futures.immediateFuture;
+import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
+
+/**
+ * ISchemaReader is a non-blocking iterator.
+ *
+ * <ol>
+ * <li>The isBlock interface is used to determine if it is blocking. If
isDone() is false, it is
+ * blocking.
+ * <li>The hasNext interface is responsible for determining whether the next
result is available,
+ * and if the current iterator is still in the isBlock state, it will
synchronously wait for
+ * the isBlock state to be lifted.
+ * <li>The next interface is responsible for consuming the next result, if
the current iterator
+ * hasNext returns false, throw NoSuchElementException.
+ * </ol>
+ *
+ * @param <T>
+ */
+public interface ISchemaReader<T extends ISchemaInfo> extends AutoCloseable {
+
+ ListenableFuture<Boolean> NOT_BLOCKED_TRUE = immediateFuture(true);
+ ListenableFuture<Boolean> NOT_BLOCKED_FALSE = immediateFuture(false);
+ ListenableFuture<Void> NOT_BLOCKED = immediateVoidFuture();
-public interface ISchemaReader<T extends ISchemaInfo> extends Iterator<T>,
AutoCloseable {
/**
* Determines if the iteration is successful when it completes.
*
@@ -37,4 +60,21 @@ public interface ISchemaReader<T extends ISchemaInfo>
extends Iterator<T>, AutoC
* @return Throwable, null if no exception.
*/
Throwable getFailure();
+
+ /**
+ * Returns a future that will be completed when the schemaReader becomes
unblocked. It may be
+ * called several times before next and will return the same value.
+ *
+ * @return isDone is false if is Blocked.
+ */
+ ListenableFuture<?> isBlocked();
+
+ boolean hasNext();
+
+ /**
+ * The next interface is responsible for consuming the next result.
+ *
+ * @throws java.util.NoSuchElementException if the current iterator hasNext
is false
+ */
+ T next();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/query/reader/SchemaReaderLimitOffsetWrapper.java
b/server/src/main/java/org/apache/iotdb/db/metadata/query/reader/SchemaReaderLimitOffsetWrapper.java
index 4c3ed2cb89b..2f05a032b42 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/query/reader/SchemaReaderLimitOffsetWrapper.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/query/reader/SchemaReaderLimitOffsetWrapper.java
@@ -21,8 +21,13 @@ package org.apache.iotdb.db.metadata.query.reader;
import org.apache.iotdb.db.metadata.query.info.ISchemaInfo;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
import java.util.NoSuchElementException;
+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
+
public class SchemaReaderLimitOffsetWrapper<T extends ISchemaInfo> implements
ISchemaReader<T> {
private final ISchemaReader<T> schemaReader;
@@ -32,20 +37,14 @@ public class SchemaReaderLimitOffsetWrapper<T extends
ISchemaInfo> implements IS
private final boolean hasLimit;
private int count = 0;
- int curOffset = 0;
+ private int curOffset = 0;
+ private ListenableFuture<?> isBlocked = null;
public SchemaReaderLimitOffsetWrapper(ISchemaReader<T> schemaReader, long
limit, long offset) {
this.schemaReader = schemaReader;
this.limit = limit;
this.offset = offset;
this.hasLimit = limit > 0 || offset > 0;
-
- if (hasLimit) {
- while (curOffset < offset && schemaReader.hasNext()) {
- schemaReader.next();
- curOffset++;
- }
- }
}
@Override
@@ -64,11 +63,45 @@ public class SchemaReaderLimitOffsetWrapper<T extends
ISchemaInfo> implements IS
}
@Override
- public boolean hasNext() {
+ public ListenableFuture<?> isBlocked() {
+ if (isBlocked != null) {
+ return isBlocked;
+ }
+ isBlocked = tryGetNext();
+ return isBlocked;
+ }
+
+ private ListenableFuture<?> tryGetNext() {
if (hasLimit) {
- return count < limit && schemaReader.hasNext();
+ if (curOffset < offset) {
+ // first time
+ return Futures.submit(
+ () -> {
+ while (curOffset < offset && schemaReader.hasNext()) {
+ schemaReader.next();
+ curOffset++;
+ }
+ return schemaReader.hasNext();
+ },
+ directExecutor());
+ }
+ if (count >= limit) {
+ return NOT_BLOCKED;
+ } else {
+ return schemaReader.isBlocked();
+ }
} else {
- return schemaReader.hasNext();
+ return schemaReader.isBlocked();
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ try {
+ isBlocked().get();
+ return schemaReader.hasNext() && count < limit;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/query/reader/TimeseriesReaderWithViewFetch.java
b/server/src/main/java/org/apache/iotdb/db/metadata/query/reader/TimeseriesReaderWithViewFetch.java
index c93d4daf74d..557c84130a2 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/query/reader/TimeseriesReaderWithViewFetch.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/query/reader/TimeseriesReaderWithViewFetch.java
@@ -30,12 +30,18 @@ import
org.apache.iotdb.db.metadata.view.viewExpression.visitor.TransformToExpre
import org.apache.iotdb.db.metadata.visitor.TimeseriesFilterVisitor;
import org.apache.iotdb.db.mpp.common.NodeRef;
import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.plan.analyze.ExpressionTypeAnalyzer;
import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
import org.apache.iotdb.db.mpp.plan.expression.Expression;
import
org.apache.iotdb.db.mpp.plan.expression.visitor.CompleteMeasurementSchemaVisitor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
@@ -45,13 +51,20 @@ import java.util.NoSuchElementException;
import java.util.Queue;
public class TimeseriesReaderWithViewFetch implements
ISchemaReader<ITimeSeriesSchemaInfo> {
-
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TimeseriesReaderWithViewFetch.class);
private final Traverser<ITimeSeriesSchemaInfo, ?> traverser;
private final Queue<ITimeSeriesSchemaInfo> cachedViewList = new
ArrayDeque<>();
private ITimeSeriesSchemaInfo next = null;
private boolean consumeView = false;
private final SchemaFilter schemaFilter;
+ /**
+ * If isBlocked is null, it means the next is not fetched yet. If
isBlocked.isDone() is false, it
+ * means the next is being fetched. If isBlocked.get() is true, it means
hasNext, otherwise, it
+ * means no more info to be fetched.
+ */
+ private ListenableFuture<Boolean> isBlocked = null;
+
private static final int BATCH_CACHED_SIZE = 1000;
private static final TimeseriesFilterVisitor FILTER_VISITOR = new
TimeseriesFilterVisitor();
@@ -61,29 +74,83 @@ public class TimeseriesReaderWithViewFetch implements
ISchemaReader<ITimeSeriesS
this.schemaFilter = schemaFilter;
}
+ @Override
public boolean isSuccess() {
return traverser.isSuccess();
}
+ @Override
public Throwable getFailure() {
return traverser.getFailure();
}
+ @Override
public void close() {
traverser.close();
}
- public boolean hasNext() {
- if (next == null && !consumeView) {
- fetchAndCacheNextResult();
+ /**
+ * Fetch ITimeSeriesSchemaInfo from the traverser and return only in the
following three cases
+ *
+ * <ol>
+ * <li>successfully fetched an info of normal time series. consumeView is
false and next is not
+ * null.
+ * <li>successfully fetched batch info of view time series. consumeView is
true and next is
+ * null.
+ * <li>no more info to be fetched. consumeView is false and next is null.
+ * </ol>
+ */
+ @Override
+ public ListenableFuture<Boolean> isBlocked() {
+ if (isBlocked != null) {
+ return isBlocked;
}
+ ListenableFuture<Boolean> res = NOT_BLOCKED_FALSE;
if (consumeView) {
- return !cachedViewList.isEmpty();
+ // consume view list
+ res = NOT_BLOCKED_TRUE;
+ } else if (next == null) {
+ // get next from traverser
+ ITimeSeriesSchemaInfo temp;
+ while (traverser.hasNext()) {
+ temp = traverser.next();
+ if (temp.isLogicalView()) {
+ // view timeseries
+ cachedViewList.add(temp.snapshot());
+ if (cachedViewList.size() >= BATCH_CACHED_SIZE) {
+ res = asyncGetNext();
+ break;
+ }
+ } else if (FILTER_VISITOR.process(schemaFilter, temp)) {
+ // normal timeseries
+ next = temp;
+ res = NOT_BLOCKED_TRUE;
+ break;
+ }
+ }
+ if (res == NOT_BLOCKED_FALSE && !cachedViewList.isEmpty()) {
+ // all schema info has been fetched, but there mau be still some view
schema info in
+ // cachedViewList
+ res = asyncGetNext();
+ }
} else {
- return next != null;
+ // next is not null
+ res = NOT_BLOCKED_TRUE;
}
+ isBlocked = res;
+ return res;
}
+ @Override
+ public boolean hasNext() {
+ try {
+ return isBlocked().get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
public ITimeSeriesSchemaInfo next() {
if (!hasNext()) {
throw new NoSuchElementException();
@@ -93,47 +160,43 @@ public class TimeseriesReaderWithViewFetch implements
ISchemaReader<ITimeSeriesS
result = next;
next = null;
} else {
+ // it may return null if cachedViewList is empty but consumeView is true
result = cachedViewList.poll();
consumeView = !cachedViewList.isEmpty();
}
+ isBlocked = null;
return result;
}
- /**
- * Fetch ITimeSeriesSchemaInfo from the traverser and return only in the
following three cases
- *
- * <ol>
- * <li>successfully fetched an info of normal time series. consumeView is
false and next is not
- * null.
- * <li>successfully fetched batch info of view time series. consumeView is
true and next is
- * null.
- * <li>no more info to be fetched. consumeView is false and next is null.
- * </ol>
- */
- private void fetchAndCacheNextResult() {
- ITimeSeriesSchemaInfo temp;
- while (traverser.hasNext()) {
- temp = traverser.next();
- if (temp.isLogicalView()) {
- cachedViewList.add(temp.snapshot());
- if (cachedViewList.size() >= BATCH_CACHED_SIZE) {
+ private ListenableFuture<Boolean> asyncGetNext() {
+ // enter this function only when viewList is full or all schema info has
been fetched and
+ // viewList is not empty
+ return Futures.submit(
+ () -> {
fetchViewTimeSeriesSchemaInfo();
if (consumeView) {
- break;
+ return true;
+ } else {
+ // all cache view is no satisfied
+ while (traverser.hasNext()) {
+ ITimeSeriesSchemaInfo temp = traverser.next();
+ if (temp.isLogicalView()) {
+ cachedViewList.add(temp.snapshot());
+ if (cachedViewList.size() >= BATCH_CACHED_SIZE) {
+ fetchViewTimeSeriesSchemaInfo();
+ if (consumeView) {
+ return true;
+ }
+ }
+ } else if (FILTER_VISITOR.process(schemaFilter, temp)) {
+ next = temp;
+ return true;
+ }
+ }
+ return false;
}
- }
- } else {
- if (FILTER_VISITOR.process(schemaFilter, temp)) {
- next = temp;
- break;
- }
- }
- }
- if (next == null && !cachedViewList.isEmpty()) {
- // all schema info has been fetched, but there mau be still some view
schema info in
- // cachedViewList
- fetchViewTimeSeriesSchemaInfo();
- }
+ },
+ FragmentInstanceManager.getInstance().getIntoOperationExecutor());
}
private void fetchViewTimeSeriesSchemaInfo() {
@@ -154,6 +217,7 @@ public class TimeseriesReaderWithViewFetch implements
ISchemaReader<ITimeSeriesS
}
// clear cachedViewList, all cached view will be added in the last step
cachedViewList.clear();
+
ISchemaTree schemaTree =
ClusterSchemaFetcher.getInstance().fetchSchema(patternTree, null);
// process each view expression and get data type
TransformToExpressionVisitor transformToExpressionVisitor = new
TransformToExpressionVisitor();
@@ -179,8 +243,6 @@ public class TimeseriesReaderWithViewFetch implements
ISchemaReader<ITimeSeriesS
cachedViewList.add(delayedLogicalViewList.get(i));
}
}
- if (!cachedViewList.isEmpty()) {
- consumeView = true;
- }
+ consumeView = !cachedViewList.isEmpty();
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
index 3c368e7d124..6af5159939b 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
@@ -292,10 +292,6 @@ public interface ISchemaRegion {
ISchemaReader<IDeviceSchemaInfo> getDeviceReader(IShowDevicesPlan
showDevicesPlan)
throws MetadataException;
- /**
- * The iterated result shall be consumed before calling reader.hasNext() or
reader.next(). Its
- * implementation is based on the reader's process context.
- */
ISchemaReader<ITimeSeriesSchemaInfo> getTimeSeriesReader(IShowTimeSeriesPlan
showTimeSeriesPlan)
throws MetadataException;
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java
b/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java
index 9a2ae5ccfa8..d58ded19174 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java
@@ -34,6 +34,7 @@ import
org.apache.iotdb.db.metadata.query.info.ITimeSeriesSchemaInfo;
import org.apache.iotdb.db.metadata.query.reader.ISchemaReader;
import org.apache.iotdb.tsfile.utils.Pair;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -249,6 +250,11 @@ public class TagManager {
// do nothing
}
+ @Override
+ public ListenableFuture<?> isBlocked() {
+ return NOT_BLOCKED;
+ }
+
@Override
public boolean hasNext() {
if (throwable == null) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountGroupByLevelScanOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountGroupByLevelScanOperator.java
index 49ceb468ba1..a57026e28f7 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountGroupByLevelScanOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountGroupByLevelScanOperator.java
@@ -34,6 +34,7 @@ import
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.utils.Binary;
import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ListenableFuture;
import java.util.Arrays;
import java.util.HashMap;
@@ -41,6 +42,7 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
public class CountGroupByLevelScanOperator<T extends ISchemaInfo> implements
SourceOperator {
@@ -52,12 +54,13 @@ public class CountGroupByLevelScanOperator<T extends
ISchemaInfo> implements Sou
private final PlanNodeId sourceId;
private final OperatorContext operatorContext;
-
private final int level;
-
private final ISchemaSource<T> schemaSource;
+ private final Map<PartialPath, Long> countMap;
private ISchemaReader<T> schemaReader;
+ private ListenableFuture<?> isBlocked;
+ private TsBlock next;
public CountGroupByLevelScanOperator(
PlanNodeId sourceId,
@@ -68,6 +71,7 @@ public class CountGroupByLevelScanOperator<T extends
ISchemaInfo> implements Sou
this.operatorContext = operatorContext;
this.level = level;
this.schemaSource = schemaSource;
+ this.countMap = new HashMap<>();
}
@Override
@@ -80,20 +84,81 @@ public class CountGroupByLevelScanOperator<T extends
ISchemaInfo> implements Sou
return operatorContext;
}
+ @Override
+ public ListenableFuture<?> isBlocked() {
+ if (isBlocked == null) {
+ isBlocked = tryGetNext();
+ }
+ return isBlocked;
+ }
+
+ /**
+ * Try to get next TsBlock. If the next is not ready, return a future. After
success, {@link
+ * CountGroupByLevelScanOperator#next} will be set.
+ */
+ private ListenableFuture<?> tryGetNext() {
+ if (schemaReader == null) {
+ schemaReader = createTimeSeriesReader();
+ }
+ while (true) {
+ try {
+ ListenableFuture<?> readerBlocked = schemaReader.isBlocked();
+ if (!readerBlocked.isDone()) {
+ readerBlocked.addListener(
+ () -> next = constructTsBlockAndClearMap(countMap),
directExecutor());
+ return readerBlocked;
+ } else if (schemaReader.hasNext()) {
+ ISchemaInfo schemaInfo = schemaReader.next();
+ PartialPath path = schemaInfo.getPartialPath();
+ if (path.getNodeLength() < level) {
+ continue;
+ }
+ PartialPath levelPath = new
PartialPath(Arrays.copyOf(path.getNodes(), level + 1));
+ countMap.compute(
+ levelPath,
+ (k, v) -> {
+ if (v == null) {
+ return 1L;
+ } else {
+ return v + 1;
+ }
+ });
+ if (countMap.size() == DEFAULT_BATCH_SIZE) {
+ next = constructTsBlockAndClearMap(countMap);
+ return NOT_BLOCKED;
+ }
+ } else {
+ if (countMap.isEmpty()) {
+ next = null;
+ } else {
+ next = constructTsBlockAndClearMap(countMap);
+ }
+ return NOT_BLOCKED;
+ }
+ } catch (Exception e) {
+ throw new SchemaExecutionException(e);
+ }
+ }
+ }
+
@Override
public TsBlock next() throws Exception {
if (!hasNext()) {
throw new NoSuchElementException();
}
- return generateResult();
+ TsBlock ret = next;
+ next = null;
+ isBlocked = null;
+ return ret;
}
@Override
public boolean hasNext() throws Exception {
- if (schemaReader == null) {
- schemaReader = createTimeSeriesReader();
+ isBlocked().get(); // wait for the next TsBlock
+ if (!schemaReader.isSuccess()) {
+ throw new SchemaExecutionException(schemaReader.getFailure());
}
- return schemaReader.hasNext();
+ return next != null;
}
public ISchemaReader<T> createTimeSeriesReader() {
@@ -101,35 +166,7 @@ public class CountGroupByLevelScanOperator<T extends
ISchemaInfo> implements Sou
((SchemaDriverContext)
operatorContext.getDriverContext()).getSchemaRegion());
}
- private TsBlock generateResult() {
- Map<PartialPath, Long> countMap = new HashMap<>();
- T schemaInfo;
- PartialPath path;
- PartialPath levelPath;
- while (schemaReader.hasNext()) {
- schemaInfo = schemaReader.next();
- path = schemaInfo.getPartialPath();
- if (path.getNodeLength() < level) {
- continue;
- }
- levelPath = new PartialPath(Arrays.copyOf(path.getNodes(), level + 1));
- countMap.compute(
- levelPath,
- (k, v) -> {
- if (v == null) {
- return 1L;
- } else {
- return v + 1;
- }
- });
- if (countMap.size() == DEFAULT_BATCH_SIZE) {
- break;
- }
- }
- if (!schemaReader.isSuccess()) {
- throw new SchemaExecutionException(schemaReader.getFailure());
- }
-
+ private TsBlock constructTsBlockAndClearMap(Map<PartialPath, Long> countMap)
{
TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(OUTPUT_DATA_TYPES);
for (Map.Entry<PartialPath, Long> entry : countMap.entrySet()) {
tsBlockBuilder.getTimeColumnBuilder().writeLong(0L);
@@ -137,6 +174,7 @@ public class CountGroupByLevelScanOperator<T extends
ISchemaInfo> implements Sou
tsBlockBuilder.getColumnBuilder(1).writeLong(entry.getValue());
tsBlockBuilder.declarePosition();
}
+ countMap.clear();
return tsBlockBuilder.build();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaCountOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaCountOperator.java
index 60e82bf6645..0f8444bc97f 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaCountOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaCountOperator.java
@@ -32,6 +32,8 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import com.google.common.util.concurrent.ListenableFuture;
+
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
@@ -47,7 +49,10 @@ public class SchemaCountOperator<T extends ISchemaInfo>
implements SourceOperato
private final ISchemaSource<T> schemaSource;
private ISchemaReader<T> schemaReader;
+ private int count;
private boolean isFinished;
+ private ListenableFuture<?> isBlocked;
+ private TsBlock next; // next will be set only when done
public SchemaCountOperator(
PlanNodeId sourceId, OperatorContext operatorContext, ISchemaSource<T>
schemaSource) {
@@ -56,7 +61,7 @@ public class SchemaCountOperator<T extends ISchemaInfo>
implements SourceOperato
this.schemaSource = schemaSource;
}
- private final ISchemaRegion getSchemaRegion() {
+ private ISchemaRegion getSchemaRegion() {
return ((SchemaDriverContext)
operatorContext.getDriverContext()).getSchemaRegion();
}
@@ -70,40 +75,74 @@ public class SchemaCountOperator<T extends ISchemaInfo>
implements SourceOperato
}
@Override
- public TsBlock next() throws Exception {
- if (!hasNext()) {
- throw new NoSuchElementException();
+ public ListenableFuture<?> isBlocked() {
+ if (isBlocked == null) {
+ isBlocked = tryGetNext();
}
- isFinished = true;
- TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(OUTPUT_DATA_TYPES);
- long count = 0;
+ return isBlocked;
+ }
+
+ /**
+ * Try to get next TsBlock. If the next is not ready, return a future. After
success, {@link
+ * SchemaCountOperator#next} will be set.
+ */
+ private ListenableFuture<?> tryGetNext() {
ISchemaRegion schemaRegion = getSchemaRegion();
if (schemaSource.hasSchemaStatistic(schemaRegion)) {
- count = schemaSource.getSchemaStatistic(schemaRegion);
+ next = constructTsBlock(schemaSource.getSchemaStatistic(schemaRegion));
+ return NOT_BLOCKED;
} else {
if (schemaReader == null) {
schemaReader = createSchemaReader();
}
- while (schemaReader.hasNext()) {
- schemaReader.next();
- count++;
- }
- if (!schemaReader.isSuccess()) {
- throw new SchemaExecutionException(schemaReader.getFailure());
+ while (true) {
+ try {
+ ListenableFuture<?> readerBlocked = schemaReader.isBlocked();
+ if (!readerBlocked.isDone()) {
+ return readerBlocked;
+ } else if (schemaReader.hasNext()) {
+ schemaReader.next();
+ count++;
+ } else {
+ next = constructTsBlock(count);
+ return NOT_BLOCKED;
+ }
+ } catch (Exception e) {
+ throw new SchemaExecutionException(e);
+ }
}
}
+ }
- tsBlockBuilder.getTimeColumnBuilder().writeLong(0L);
- tsBlockBuilder.getColumnBuilder(0).writeLong(count);
- tsBlockBuilder.declarePosition();
- return tsBlockBuilder.build();
+ @Override
+ public TsBlock next() throws Exception {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ if (next != null) {
+ isFinished = true;
+ }
+ isBlocked = null;
+ return next;
}
@Override
public boolean hasNext() throws Exception {
+ isBlocked().get(); // wait for the next TsBlock
+ if (schemaReader != null && !schemaReader.isSuccess()) {
+ throw new SchemaExecutionException(schemaReader.getFailure());
+ }
return !isFinished;
}
+ private TsBlock constructTsBlock(long count) {
+ TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(OUTPUT_DATA_TYPES);
+ tsBlockBuilder.getTimeColumnBuilder().writeLong(0L);
+ tsBlockBuilder.getColumnBuilder(0).writeLong(count);
+ tsBlockBuilder.declarePosition();
+ return tsBlockBuilder.build();
+ }
+
@Override
public boolean isFinished() throws Exception {
return isFinished;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
index 966f48ac8b6..89cb64ce676 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java
@@ -33,10 +33,13 @@ import
org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import com.google.common.util.concurrent.ListenableFuture;
+
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static
org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
public class SchemaQueryScanOperator<T extends ISchemaInfo> implements
SourceOperator {
@@ -60,6 +63,11 @@ public class SchemaQueryScanOperator<T extends ISchemaInfo>
implements SourceOpe
private ISchemaReader<T> schemaReader;
+ private final TsBlockBuilder tsBlockBuilder;
+ private ListenableFuture<?> isBlocked;
+ private TsBlock next;
+ private boolean isFinished;
+
protected SchemaQueryScanOperator(
PlanNodeId sourceId,
OperatorContext operatorContext,
@@ -76,6 +84,7 @@ public class SchemaQueryScanOperator<T extends ISchemaInfo>
implements SourceOpe
this.sourceId = sourceId;
this.outputDataTypes = outputDataTypes;
this.schemaSource = null;
+ this.tsBlockBuilder = new TsBlockBuilder(outputDataTypes);
}
public SchemaQueryScanOperator(
@@ -87,6 +96,7 @@ public class SchemaQueryScanOperator<T extends ISchemaInfo>
implements SourceOpe
schemaSource.getInfoQueryColumnHeaders().stream()
.map(ColumnHeader::getColumnType)
.collect(Collectors.toList());
+ this.tsBlockBuilder = new TsBlockBuilder(outputDataTypes);
}
protected ISchemaReader<T> createSchemaReader() {
@@ -94,7 +104,7 @@ public class SchemaQueryScanOperator<T extends ISchemaInfo>
implements SourceOpe
((SchemaDriverContext)
operatorContext.getDriverContext()).getSchemaRegion());
}
- protected void setColumns(T element, TsBlockBuilder builder) {
+ private void setColumns(T element, TsBlockBuilder builder) {
schemaSource.transformToTsBlockColumns(element, builder, getDatabase());
}
@@ -128,36 +138,79 @@ public class SchemaQueryScanOperator<T extends
ISchemaInfo> implements SourceOpe
}
@Override
- public TsBlock next() throws Exception {
- if (!hasNext()) {
- throw new NoSuchElementException();
+ public ListenableFuture<?> isBlocked() {
+ if (isBlocked == null) {
+ isBlocked = tryGetNext();
}
- TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(outputDataTypes);
- T element;
- while (schemaReader.hasNext()) {
- element = schemaReader.next();
- setColumns(element, tsBlockBuilder);
- if (tsBlockBuilder.getRetainedSizeInBytes() >= MAX_SIZE) {
- break;
+ return isBlocked;
+ }
+
+ /**
+ * Try to get next TsBlock. If the next is not ready, return a future. After
success, {@link
+ * SchemaQueryScanOperator#next} will be set.
+ */
+ private ListenableFuture<?> tryGetNext() {
+ if (schemaReader == null) {
+ schemaReader = createSchemaReader();
+ }
+ while (true) {
+ try {
+ ListenableFuture<?> readerBlocked = schemaReader.isBlocked();
+ if (!readerBlocked.isDone()) {
+ readerBlocked.addListener(
+ () -> {
+ next = tsBlockBuilder.build();
+ tsBlockBuilder.reset();
+ },
+ directExecutor());
+ return readerBlocked;
+ } else if (schemaReader.hasNext()) {
+ T element = schemaReader.next();
+ setColumns(element, tsBlockBuilder);
+ if (tsBlockBuilder.getRetainedSizeInBytes() >= MAX_SIZE) {
+ next = tsBlockBuilder.build();
+ tsBlockBuilder.reset();
+ return NOT_BLOCKED;
+ }
+ } else {
+ if (tsBlockBuilder.isEmpty()) {
+ next = null;
+ isFinished = true;
+ } else {
+ next = tsBlockBuilder.build();
+ }
+ tsBlockBuilder.reset();
+ return NOT_BLOCKED;
+ }
+ } catch (Exception e) {
+ throw new SchemaExecutionException(e);
}
}
- if (!schemaReader.isSuccess()) {
- throw new SchemaExecutionException(schemaReader.getFailure());
+ }
+
+ @Override
+ public TsBlock next() throws Exception {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
}
- return tsBlockBuilder.build();
+ TsBlock ret = next;
+ next = null;
+ isBlocked = null;
+ return ret;
}
@Override
public boolean hasNext() throws Exception {
- if (schemaReader == null) {
- schemaReader = createSchemaReader();
+ isBlocked().get(); // wait for the next TsBlock
+ if (!schemaReader.isSuccess()) {
+ throw new SchemaExecutionException(schemaReader.getFailure());
}
- return schemaReader.hasNext();
+ return next != null;
}
@Override
public boolean isFinished() throws Exception {
- return !hasNextWithTimer();
+ return isFinished;
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/LogicalViewSchemaSource.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/LogicalViewSchemaSource.java
index e1143e317f5..acc818cc4f3 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/LogicalViewSchemaSource.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/LogicalViewSchemaSource.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.commons.schema.view.ViewType;
import
org.apache.iotdb.db.metadata.plan.schemaregion.impl.read.SchemaRegionReadPlanFactory;
import org.apache.iotdb.db.metadata.query.info.ITimeSeriesSchemaInfo;
import org.apache.iotdb.db.metadata.query.reader.ISchemaReader;
-import
org.apache.iotdb.db.metadata.query.reader.SchemaReaderLimitOffsetWrapper;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
@@ -64,18 +63,15 @@ public class LogicalViewSchemaSource implements
ISchemaSource<ITimeSeriesSchemaI
@Override
public ISchemaReader<ITimeSeriesSchemaInfo> getSchemaReader(ISchemaRegion
schemaRegion) {
try {
- return new SchemaReaderLimitOffsetWrapper<>(
- schemaRegion.getTimeSeriesReader(
- SchemaRegionReadPlanFactory.getShowTimeSeriesPlan(
- pathPattern,
- Collections.emptyMap(),
- 0,
- 0,
- false,
- SchemaFilterFactory.and(
- schemaFilter,
SchemaFilterFactory.createViewTypeFilter(ViewType.VIEW)))),
- limit,
- offset);
+ return schemaRegion.getTimeSeriesReader(
+ SchemaRegionReadPlanFactory.getShowTimeSeriesPlan(
+ pathPattern,
+ Collections.emptyMap(),
+ limit,
+ offset,
+ false,
+ SchemaFilterFactory.and(
+ schemaFilter,
SchemaFilterFactory.createViewTypeFilter(ViewType.VIEW))));
} catch (MetadataException e) {
throw new SchemaExecutionException(e.getMessage(), e);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/PathsUsingTemplateSource.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/PathsUsingTemplateSource.java
index 2ce78e50de1..82b7a02548d 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/PathsUsingTemplateSource.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/PathsUsingTemplateSource.java
@@ -30,8 +30,11 @@ import
org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.utils.Binary;
+import com.google.common.util.concurrent.ListenableFuture;
+
import java.util.Iterator;
import java.util.List;
+import java.util.NoSuchElementException;
public class PathsUsingTemplateSource implements
ISchemaSource<IDeviceSchemaInfo> {
@@ -96,6 +99,11 @@ public class PathsUsingTemplateSource implements
ISchemaSource<IDeviceSchemaInfo
}
}
+ @Override
+ public ListenableFuture<?> isBlocked() {
+ return NOT_BLOCKED;
+ }
+
@Override
public boolean hasNext() {
try {
@@ -133,6 +141,9 @@ public class PathsUsingTemplateSource implements
ISchemaSource<IDeviceSchemaInfo
@Override
public IDeviceSchemaInfo next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
return currentDeviceReader.next();
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaOperatorTestUtil.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaOperatorTestUtil.java
index 882076f6fca..18f75e1219a 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaOperatorTestUtil.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaOperatorTestUtil.java
@@ -24,9 +24,11 @@ import
org.apache.iotdb.db.metadata.query.reader.ISchemaReader;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.mpp.execution.operator.schema.source.ISchemaSource;
+import com.google.common.util.concurrent.ListenableFuture;
import org.mockito.Mockito;
import java.util.Iterator;
+import java.util.NoSuchElementException;
public class SchemaOperatorTestUtil {
public static final String EXCEPTION_MESSAGE = "ExceptionMessage";
@@ -50,15 +52,23 @@ public class SchemaOperatorTestUtil {
}
@Override
- public void close() {}
+ public ListenableFuture<?> isBlocked() {
+ return NOT_BLOCKED;
+ }
@Override
public boolean hasNext() {
return iterator.hasNext();
}
+ @Override
+ public void close() {}
+
@Override
public T next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
return iterator.next();
}
});