This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6e76889ec8c7e1d89f7525e6d2f24202ec6e2d44
Author: JackieTien97 <[email protected]>
AuthorDate: Wed Mar 30 21:58:07 2022 +0800

    add driver which implements ExecFragmentInstance interface
---
 .../storagegroup/VirtualStorageGroupProcessor.java |  31 +++
 .../org/apache/iotdb/db/mpp/execution/Driver.java  | 310 +++++++++++++++++++++
 .../iotdb/db/mpp/execution/DriverContext.java      |  59 ++++
 .../db/mpp/execution/FragmentInstanceContext.java  |   4 +
 .../org/apache/iotdb/db/mpp/operator/Operator.java |   5 +
 .../db/mpp/operator/process/AggregateOperator.java |   5 +
 .../mpp/operator/process/DeviceMergeOperator.java  |   5 +
 .../db/mpp/operator/process/FillOperator.java      |   5 +
 .../mpp/operator/process/FilterNullOperator.java   |   5 +
 .../mpp/operator/process/GroupByLevelOperator.java |   5 +
 .../db/mpp/operator/process/LimitOperator.java     |   5 +
 .../db/mpp/operator/process/OffsetOperator.java    |   5 +
 .../db/mpp/operator/process/SortOperator.java      |   5 +
 .../db/mpp/operator/process/TimeJoinOperator.java  |  21 ++
 .../db/mpp/operator/sink/FragmentSinkOperator.java |   5 +
 .../mpp/operator/source/AlignedSeriesScanUtil.java |   5 +-
 .../source/SeriesAggregateScanOperator.java        |   9 +
 .../db/mpp/operator/source/SeriesScanOperator.java |  22 +-
 .../db/mpp/operator/source/SeriesScanUtil.java     |  25 +-
 .../db/mpp/operator/source/SourceOperator.java     |   3 +
 .../db/mpp/sql/planner/LocalExecutionPlanner.java  |   4 -
 .../iotdb/db/mpp/operator/LimitOperatorTest.java   |   7 +-
 .../db/mpp/operator/SeriesScanOperatorTest.java    |   5 +-
 .../db/mpp/operator/TimeJoinOperatorTest.java      |   7 +-
 24 files changed, 531 insertions(+), 31 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
index 87d7b4e..a55ae80 100755
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
@@ -1727,6 +1727,37 @@ public class VirtualStorageGroupProcessor {
     }
   }
 
+  /**
+   * Builds the {@link QueryDataSource} for the mpp execution engine: resolves the sequence and
+   * unsequence file resources for the given paths and attaches this processor's data TTL.
+   *
+   * @param pathList paths being queried
+   * @param singleDeviceId forwarded unchanged to the file-resource lookup; callers pass null when
+   *     the paths do not share one device
+   * @param context query context forwarded to the file-resource lookup
+   * @param timeFilter time filter forwarded to the file-resource lookup
+   * @return a data source holding the resolved sequence and unsequence resources
+   * @throws QueryProcessException wrapping any MetadataException raised while resolving files
+   */
+  public QueryDataSource query(
+      List<PartialPath> pathList, String singleDeviceId, QueryContext context, Filter timeFilter)
+      throws QueryProcessException {
+    try {
+      // sequence files first, then unsequence files, exactly as two independent lookups
+      List<TsFileResource> sequenceFiles =
+          getFileResourceListForQuery(
+              tsFileManager.getTsFileList(true),
+              upgradeSeqFileList,
+              pathList,
+              singleDeviceId,
+              context,
+              timeFilter,
+              true);
+      List<TsFileResource> unsequenceFiles =
+          getFileResourceListForQuery(
+              tsFileManager.getTsFileList(false),
+              upgradeUnseqFileList,
+              pathList,
+              singleDeviceId,
+              context,
+              timeFilter,
+              false);
+
+      QueryDataSource result = new QueryDataSource(sequenceFiles, unsequenceFiles);
+      result.setDataTTL(dataTTL);
+      return result;
+    } catch (MetadataException metadataFailure) {
+      throw new QueryProcessException(metadataFailure);
+    }
+  }
+
   /** lock the read lock of the insert lock */
   public void readLock() {
     // apply read lock for SG insert lock to prevent inconsistent with 
concurrently writing memtable
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java
new file mode 100644
index 0000000..91623a4
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Driver.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution;
+
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.idtable.IDTable;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.buffer.ISinkHandle;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.operator.Operator;
+import org.apache.iotdb.db.mpp.operator.source.SourceOperator;
+import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import io.airlift.units.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
+import static org.apache.iotdb.db.mpp.operator.Operator.NOT_BLOCKED;
+
+@NotThreadSafe
+public class Driver implements ExecFragmentInstance {
+
+  private static final Logger logger = LoggerFactory.getLogger(Driver.class);
+
+  private final Operator root;
+  private final ISinkHandle sinkHandle;
+  private final DriverContext driverContext;
+
+  private boolean init;
+  private boolean closed;
+
+  /** closed tsfile used in this fragment instance */
+  private Set<TsFileResource> closedFilePaths;
+  /** unClosed tsfile used in this fragment instance */
+  private Set<TsFileResource> unClosedFilePaths;
+
+  private final AtomicReference<SettableFuture<Void>> driverBlockedFuture = 
new AtomicReference<>();
+
+  /**
+   * Creates a driver that pulls TsBlocks from {@code root} and pushes them into {@code
+   * sinkHandle}.
+   *
+   * @param root root operator of the fragment instance
+   * @param sinkHandle destination for the TsBlocks produced by {@code root}
+   * @param driverContext context providing the instance id, paths, filter and data region
+   */
+  public Driver(Operator root, ISinkHandle sinkHandle, DriverContext driverContext) {
+    this.root = root;
+    this.sinkHandle = sinkHandle;
+    this.driverContext = driverContext;
+
+    // the file-tracking sets are dereferenced by addFilePathToMap() and
+    // removeUsedFilesForQuery(), so they must exist before initialization or close()
+    this.closedFilePaths = new HashSet<>();
+    this.unClosedFilePaths = new HashSet<>();
+
+    // start in the "not blocked" state so that the first processFor() call finds a completed
+    // future instead of the AtomicReference's initial null
+    SettableFuture<Void> notBlocked = SettableFuture.create();
+    notBlocked.set(null);
+    this.driverBlockedFuture.set(notBlocked);
+  }
+
+  /**
+   * Reports whether this driver has nothing left to do: either it has been closed, or the root
+   * operator reports that it is finished. If asking the root operator fails, the failure is
+   * logged, the driver is closed and it is treated as finished.
+   */
+  @Override
+  public boolean isFinished() {
+    if (!closed) {
+      try {
+        if (root == null) {
+          return false;
+        }
+        return root.isFinished();
+      } catch (Throwable t) {
+        logger.error("Failed to query whether the driver {} is finished", driverContext.getId(), t);
+        close();
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Runs this driver for roughly {@code duration}: lazily initializes the source operators on the
+   * first call, then repeatedly moves one TsBlock from the root operator to the sink until the
+   * time budget is used up, the root operator is finished, or the driver becomes blocked.
+   *
+   * @param duration time budget for this call
+   * @return a future that is not yet done while the driver is blocked, or {@code NOT_BLOCKED}
+   *     otherwise (including after a failure, in which case the driver has been closed)
+   */
+  @Override
+  public ListenableFuture<Void> processFor(Duration duration) {
+
+    // initialization may be time-consuming, so we keep it in the processFor method
+    // in normal case, it won't cause deadlock and should finish soon, otherwise it will be a
+    // critical bug
+    if (!init) {
+      try {
+        initialize();
+      } catch (Throwable t) {
+        logger.error(
+            "Failed to do the initialization for fragment instance {} ", driverContext.getId(), t);
+        close();
+        return NOT_BLOCKED;
+      }
+    }
+
+    // if the driver is blocked we don't need to continue
+    // the reference may still be empty (null) when no blocked future has been published yet,
+    // which simply means "not blocked"
+    SettableFuture<Void> blockedFuture = driverBlockedFuture.get();
+    if (blockedFuture != null && !blockedFuture.isDone()) {
+      return blockedFuture;
+    }
+
+    long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);
+
+    long start = System.nanoTime();
+    try {
+      do {
+        ListenableFuture<Void> future = processInternal();
+        if (!future.isDone()) {
+          return updateDriverBlockedFuture(future);
+        }
+      } while (System.nanoTime() - start < maxRuntime && !root.isFinished());
+    } catch (Throwable t) {
+      logger.error("Failed to execute fragment instance {}", driverContext.getId(), t);
+      close();
+    }
+    return NOT_BLOCKED;
+  }
+
+  /** Returns the fragment instance id obtained from the driver context. */
+  @Override
+  public FragmentInstanceId getInfo() {
+    return driverContext.getId();
+  }
+
+  /**
+   * Closes the root operator and the sink handle and releases the file-reader references held by
+   * this driver. Failures while closing are logged, not rethrown. Calling this method more than
+   * once is harmless: only the first call does any work.
+   */
+  @Override
+  public void close() {
+    Operator root;
+    ISinkHandle sinkHandle;
+    synchronized (this) {
+      if (closed) {
+        // processFor() and isFinished() both call close() on failure, so a repeated call must
+        // not close the operators or release the file references a second time
+        return;
+      }
+      closed = true;
+      root = this.root;
+      sinkHandle = this.sinkHandle;
+    }
+
+    try {
+      try {
+        if (root != null) {
+          root.close();
+        }
+      } finally {
+        // the sink must be closed even when closing the root operator fails
+        if (sinkHandle != null) {
+          sinkHandle.close();
+        }
+      }
+    } catch (Throwable t) {
+      logger.error("Failed to closed driver {}", driverContext.getId(), t);
+    } finally {
+      removeUsedFilesForQuery();
+    }
+  }
+
+  /**
+   * Resolves the seq and unseq file lists once and hands every source operator its own
+   * QueryDataSource built from those lists, then marks the driver as initialized.
+   *
+   * <p>TODO we should change all the blocked lock operation into tryLock
+   *
+   * @throws QueryProcessException if resolving the file lists fails
+   */
+  private void initialize() throws QueryProcessException {
+    List<SourceOperator> sourceOperators = driverContext.getSourceOperators();
+    boolean hasSources = sourceOperators != null && !sourceOperators.isEmpty();
+    if (hasSources) {
+      QueryDataSource shared = initQueryDataSourceCache();
+      for (SourceOperator sourceOperator : sourceOperators) {
+        // each source operator receives a separate QueryDataSource object over the same lists
+        QueryDataSource perOperator =
+            new QueryDataSource(shared.getSeqResources(), shared.getUnseqResources());
+        perOperator.setDataTTL(shared.getDataTTL());
+        sourceOperator.initQueryDataSource(perOperator);
+      }
+    }
+
+    this.init = true;
+  }
+
+  /**
+   * Resolves, while holding the data region's read lock, the QueryDataSource for all paths of
+   * this driver and registers every returned file as used by this fragment instance.
+   *
+   * @return the data source covering all paths of this driver
+   * @throws QueryProcessException if the data region fails to resolve the files
+   */
+  public QueryDataSource initQueryDataSourceCache() throws QueryProcessException {
+    VirtualStorageGroupProcessor dataRegion = driverContext.getDataRegion();
+    dataRegion.readLock();
+    try {
+      List<PartialPath> pathList =
+          driverContext.getPaths().stream()
+              .map(IDTable::translateQueryPath)
+              .collect(Collectors.toList());
+      // when all the selected series are under the same device, the QueryDataSource will be
+      // filtered according to timeIndex
+      Set<String> selectedDeviceIdSet =
+          pathList.stream().map(PartialPath::getDevice).collect(Collectors.toSet());
+
+      QueryDataSource dataSource =
+          dataRegion.query(
+              pathList,
+              selectedDeviceIdSet.size() == 1 ? selectedDeviceIdSet.iterator().next() : null,
+              driverContext.getFragmentInstanceContext(),
+              driverContext.getTimeFilter());
+
+      // used files should be added before the read lock is released, or they may be deleted by
+      // running merge
+      addUsedFilesForQuery(dataSource);
+
+      return dataSource;
+    } finally {
+      // release the lock on the very instance that was locked above rather than re-fetching it
+      // from the context
+      dataRegion.readUnlock();
+    }
+  }
+
+  /**
+   * Registers every sequence and unsequence file of {@code dataSource} as used by this fragment
+   * instance, recording each one in closedFilePaths or unClosedFilePaths.
+   */
+  private void addUsedFilesForQuery(QueryDataSource dataSource) {
+
+    // sequence data
+    addUsedFilesForQuery(dataSource.getSeqResources());
+
+    // unsequence data
+    addUsedFilesForQuery(dataSource.getUnseqResources());
+  }
+
+  /**
+   * Registers each resource in {@code resources} as used by this fragment instance. A resource
+   * that turns out to be deleted is unregistered again and removed from {@code resources}.
+   */
+  private void addUsedFilesForQuery(List<TsFileResource> resources) {
+    for (Iterator<TsFileResource> it = resources.iterator(); it.hasNext(); ) {
+      TsFileResource resource = it.next();
+      boolean closedFile = resource.isClosed();
+      addFilePathToMap(resource, closedFile);
+
+      // this file may be deleted just before we lock it
+      if (!resource.isDeleted()) {
+        continue;
+      }
+
+      Set<TsFileResource> tracked = closedFile ? closedFilePaths : unClosedFilePaths;
+      // This resource may be removed by other threads of this query.
+      boolean wasTracked = tracked.remove(resource);
+      if (wasTracked) {
+        FileReaderManager.getInstance().decreaseFileReaderReference(resource, closedFile);
+      }
+      it.remove();
+    }
+  }
+
+  /**
+   * Releases the file-reader reference of every file used by this fragment instance and drops the
+   * tracking sets. Closed files are released with {@code isClosed = true} and unclosed files with
+   * {@code isClosed = false}, mirroring how they were acquired in addFilePathToMap. Safe to call
+   * when the sets are absent (never created or already released).
+   */
+  private void removeUsedFilesForQuery() {
+    if (closedFilePaths != null) {
+      for (TsFileResource tsFile : closedFilePaths) {
+        FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, true);
+      }
+      closedFilePaths = null;
+    }
+    if (unClosedFilePaths != null) {
+      for (TsFileResource tsFile : unClosedFilePaths) {
+        // must match the isClosed flag used when the reference was increased
+        FileReaderManager.getInstance().decreaseFileReaderReference(tsFile, false);
+      }
+      unClosedFilePaths = null;
+    }
+  }
+
+  /**
+   * Records {@code tsFile} as used by this fragment instance and, only the first time the file is
+   * recorded, increases its file-reader reference.
+   *
+   * @param tsFile the file being used
+   * @param isClosed selects the tracking set (closedFilePaths when true, unClosedFilePaths
+   *     otherwise) and is forwarded to the FileReaderManager
+   */
+  void addFilePathToMap(TsFileResource tsFile, boolean isClosed) {
+    Set<TsFileResource> tracked;
+    if (isClosed) {
+      tracked = closedFilePaths;
+    } else {
+      tracked = unClosedFilePaths;
+    }
+    // Set#add reports whether the element was newly inserted
+    boolean newlyTracked = tracked.add(tsFile);
+    if (newlyTracked) {
+      FileReaderManager.getInstance().increaseFileReaderReference(tsFile, isClosed);
+    }
+  }
+
+  /**
+   * Performs one processing step: if neither the root operator is blocked nor the sink is full,
+   * takes the next TsBlock from the root operator (if any) and sends it to the sink when it is
+   * non-null and non-empty.
+   *
+   * @return the pending future of whichever side is blocked, or {@code NOT_BLOCKED}
+   * @throws IOException propagated from the operator or sink calls
+   */
+  private ListenableFuture<Void> processInternal() throws IOException {
+    ListenableFuture<Void> rootBlocked = root.isBlocked();
+    if (!rootBlocked.isDone()) {
+      return rootBlocked;
+    }
+
+    ListenableFuture<Void> sinkFull = sinkHandle.isFull();
+    if (!sinkFull.isDone()) {
+      return sinkFull;
+    }
+
+    if (!root.hasNext()) {
+      return NOT_BLOCKED;
+    }
+
+    TsBlock block = root.next();
+    boolean hasData = block != null && !block.isEmpty();
+    if (hasData) {
+      sinkHandle.send(block);
+    }
+    return NOT_BLOCKED;
+  }
+
+  /**
+   * Publishes a fresh driver-level blocked future and completes it as soon as {@code
+   * sourceBlockedFuture} completes.
+   *
+   * @param sourceBlockedFuture the future the driver is currently waiting on
+   * @return the newly published driver-level future
+   */
+  private ListenableFuture<Void> updateDriverBlockedFuture(
+      ListenableFuture<Void> sourceBlockedFuture) {
+    SettableFuture<Void> pending = SettableFuture.create();
+    driverBlockedFuture.set(pending);
+
+    // the listener runs on whichever thread completes the source future
+    Runnable completePending = () -> pending.set(null);
+    sourceBlockedFuture.addListener(completePending, directExecutor());
+
+    // TODO Although we don't have memory management for operator now, we should consider it for
+    // future: a memory revoking request for some operator may arrive before the future above is
+    // published, so that case would have to be checked here before returning.
+
+    return pending;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
new file mode 100644
index 0000000..aa3a265
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DriverContext.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution;
+
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.operator.source.SourceOperator;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+import java.util.List;
+
+/**
+ * Read-only holder for what a Driver needs: the fragment instance context, the queried paths,
+ * the time filter, the data region and the source operators.
+ *
+ * <p>NOTE(review): nothing in this class assigns its fields (no constructor, no setters), so as
+ * written every getter returns null and {@link #getId()} would throw a NullPointerException.
+ * Presumably the means of populating it is added in a follow-up change; confirm.
+ */
+public class DriverContext {
+  private FragmentInstanceContext fragmentInstanceContext;
+  private List<PartialPath> paths;
+  private Filter timeFilter;
+  private VirtualStorageGroupProcessor dataRegion;
+  private List<SourceOperator> sourceOperators;
+
+  /** Returns the id of the fragment instance, taken from the fragment instance context. */
+  public FragmentInstanceId getId() {
+    return fragmentInstanceContext.getId();
+  }
+
+  /** Returns the fragment instance context. */
+  public FragmentInstanceContext getFragmentInstanceContext() {
+    return fragmentInstanceContext;
+  }
+
+  /** Returns the paths held by this context. */
+  public List<PartialPath> getPaths() {
+    return paths;
+  }
+
+  /** Returns the time filter held by this context. */
+  public Filter getTimeFilter() {
+    return timeFilter;
+  }
+
+  /** Returns the data region held by this context. */
+  public VirtualStorageGroupProcessor getDataRegion() {
+    return dataRegion;
+  }
+
+  /** Returns the source operators held by this context. */
+  public List<SourceOperator> getSourceOperators() {
+    return sourceOperators;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
index 015212f..f9333da 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
@@ -69,4 +69,8 @@ public class FragmentInstanceContext extends QueryContext {
   public List<OperatorContext> getOperatorContexts() {
     return operatorContexts;
   }
+
+  /** Returns the id of this fragment instance. */
+  public FragmentInstanceId getId() {
+    return id;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
index df89218..0821424 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
@@ -48,4 +48,9 @@ public interface Operator extends AutoCloseable {
   /** This method will always be called before releasing the Operator 
reference. */
   @Override
   default void close() throws Exception {}
+
+  /**
+   * Returns whether this operator has completely finished processing, i.e. no more output TsBlock
+   * will be produced.
+   *
+   * @return true if the operator will produce no further output
+   * @throws IOException if an implementation fails while determining its state
+   */
+  boolean isFinished() throws IOException;
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
index a08fb68..0eda188 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
@@ -49,4 +49,9 @@ public class AggregateOperator implements ProcessOperator {
   public void close() throws Exception {
     ProcessOperator.super.close();
   }
+
+  /**
+   * Placeholder: always reports that this operator is not finished.
+   *
+   * <p>NOTE(review): Driver relies on the root operator's isFinished() to detect completion, so
+   * this presumably needs a real implementation before the operator is used; confirm.
+   */
+  @Override
+  public boolean isFinished() {
+    return false;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java
index ebfb54e..b2439b1 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java
@@ -48,4 +48,9 @@ public class DeviceMergeOperator implements ProcessOperator {
   public void close() throws Exception {
     ProcessOperator.super.close();
   }
+
+  /**
+   * Placeholder: always reports that this operator is not finished.
+   *
+   * <p>NOTE(review): Driver relies on the root operator's isFinished() to detect completion, so
+   * this presumably needs a real implementation before the operator is used; confirm.
+   */
+  @Override
+  public boolean isFinished() {
+    return false;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FillOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FillOperator.java
index 52a3fba..cfbe1e0 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FillOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FillOperator.java
@@ -48,4 +48,9 @@ public class FillOperator implements ProcessOperator {
   public void close() throws Exception {
     ProcessOperator.super.close();
   }
+
+  /**
+   * Placeholder: always reports that this operator is not finished.
+   *
+   * <p>NOTE(review): Driver relies on the root operator's isFinished() to detect completion, so
+   * this presumably needs a real implementation before the operator is used; confirm.
+   */
+  @Override
+  public boolean isFinished() {
+    return false;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FilterNullOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FilterNullOperator.java
index 0083ae0..a8c28e6 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FilterNullOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FilterNullOperator.java
@@ -49,4 +49,9 @@ public class FilterNullOperator implements ProcessOperator {
   public void close() throws Exception {
     ProcessOperator.super.close();
   }
+
+  /**
+   * Placeholder: always reports that this operator is not finished.
+   *
+   * <p>NOTE(review): Driver relies on the root operator's isFinished() to detect completion, so
+   * this presumably needs a real implementation before the operator is used; confirm.
+   */
+  @Override
+  public boolean isFinished() {
+    return false;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java
index 285427a..5f87d42 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java
@@ -49,4 +49,9 @@ public class GroupByLevelOperator implements ProcessOperator {
   public void close() throws Exception {
     ProcessOperator.super.close();
   }
+
+  /**
+   * Placeholder: always reports that this operator is not finished.
+   *
+   * <p>NOTE(review): Driver relies on the root operator's isFinished() to detect completion, so
+   * this presumably needs a real implementation before the operator is used; confirm.
+   */
+  @Override
+  public boolean isFinished() {
+    return false;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
index ec2995f..ef54668 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
@@ -74,4 +74,9 @@ public class LimitOperator implements ProcessOperator {
   public void close() throws Exception {
     child.close();
   }
+
+  /**
+   * Reports this operator as finished once the remaining limit has reached zero, and otherwise
+   * defers to the child operator.
+   *
+   * @throws IOException propagated from the child's isFinished()
+   */
+  @Override
+  public boolean isFinished() throws IOException {
+    if (remainingLimit == 0) {
+      // limit exhausted: the child is deliberately not consulted
+      return true;
+    }
+    return child.isFinished();
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/OffsetOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/OffsetOperator.java
index a22985c..7cc8102 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/OffsetOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/OffsetOperator.java
@@ -49,4 +49,9 @@ public class OffsetOperator implements ProcessOperator {
   public void close() throws Exception {
     ProcessOperator.super.close();
   }
+
+  /**
+   * Placeholder: always reports that this operator is not finished.
+   *
+   * <p>NOTE(review): Driver relies on the root operator's isFinished() to detect completion, so
+   * this presumably needs a real implementation before the operator is used; confirm.
+   */
+  @Override
+  public boolean isFinished() {
+    return false;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/SortOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/SortOperator.java
index 5e2f0ce..744e5b9 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/SortOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/SortOperator.java
@@ -49,4 +49,9 @@ public class SortOperator implements ProcessOperator {
   public void close() throws Exception {
     ProcessOperator.super.close();
   }
+
+  /**
+   * Placeholder: always reports that this operator is not finished.
+   *
+   * <p>NOTE(review): Driver relies on the root operator's isFinished() to detect completion, so
+   * this presumably needs a real implementation before the operator is used; confirm.
+   */
+  @Override
+  public boolean isFinished() {
+    return false;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
index 1020e41..0d0feb6 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
@@ -59,6 +59,8 @@ public class TimeJoinOperator implements ProcessOperator {
    */
   private final List<TSDataType> dataTypes;
 
+  private boolean finished;
+
   public TimeJoinOperator(
       OperatorContext operatorContext,
       List<Operator> children,
@@ -154,6 +156,9 @@ public class TimeJoinOperator implements ProcessOperator {
 
   @Override
   public boolean hasNext() throws IOException {
+    if (finished) {
+      return false;
+    }
     for (int i = 0; i < inputCount; i++) {
       if (!empty(i)) {
         return true;
@@ -175,6 +180,22 @@ public class TimeJoinOperator implements ProcessOperator {
     }
   }
 
+  /**
+   * Reports whether this join is finished: every input has signalled that it has no more
+   * TsBlocks and no cached TsBlock of any input still holds unread rows. Once true, the result is
+   * remembered in {@code finished}.
+   */
+  @Override
+  public boolean isFinished() {
+    if (finished) {
+      return true;
+    }
+    // iterate over the inputs (as hasNext() does), not over the output columns: noMoreTsBlocks
+    // and the cached blocks checked by empty(i) are tracked per input
+    for (int i = 0; i < inputCount; i++) {
+      // has more tsBlock output from children[i] or has cached tsBlock in inputTsBlocks[i]
+      if (!noMoreTsBlocks[i] || !empty(i)) {
+        return false;
+      }
+    }
+    finished = true;
+    return true;
+  }
+
   private boolean empty(int columnIndex) {
     return inputTsBlocks[columnIndex] == null
         || inputTsBlocks[columnIndex].getPositionCount() == 
inputIndex[columnIndex];
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/FragmentSinkOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/FragmentSinkOperator.java
index 7bbccbf..9b67c6c 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/FragmentSinkOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/FragmentSinkOperator.java
@@ -51,6 +51,11 @@ public class FragmentSinkOperator implements SinkOperator {
   }
 
   @Override
+  // Placeholder: always reports "not finished".
+  // NOTE(review): presumably to be implemented together with the rest of this sink; confirm.
+  public boolean isFinished() {
+    return false;
+  }
+
+  @Override
   public void send(TsBlock tsBlock) {}
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/AlignedSeriesScanUtil.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/AlignedSeriesScanUtil.java
index 9fd48c7..58cb25e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/AlignedSeriesScanUtil.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/AlignedSeriesScanUtil.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.db.mpp.operator.source;
 
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.metadata.path.PartialPath;
@@ -48,12 +47,10 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil {
       Set<String> allSensors,
       TSDataType dataType,
       FragmentInstanceContext context,
-      QueryDataSource dataSource,
       Filter timeFilter,
       Filter valueFilter,
       boolean ascending) {
-    super(
-        seriesPath, allSensors, dataType, context, dataSource, timeFilter, 
valueFilter, ascending);
+    super(seriesPath, allSensors, dataType, context, timeFilter, valueFilter, 
ascending);
     dataTypes =
         ((AlignedPath) seriesPath)
             
.getSchemaList().stream().map(IMeasurementSchema::getType).collect(Collectors.toList());
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
index 87151ba..07aa5a4 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.operator.source;
 
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -51,7 +52,15 @@ public class SeriesAggregateScanOperator implements 
SourceOperator {
   }
 
   @Override
+  // Placeholder: always reports "not finished".
+  // NOTE(review): Driver relies on the root operator's isFinished() to detect completion, so this
+  // presumably needs a real implementation before the operator is used; confirm.
+  public boolean isFinished() {
+    return false;
+  }
+
+  @Override
   public PlanNodeId getSourceId() {
     return null;
   }
+
+  /**
+   * Placeholder: the supplied data source is currently ignored.
+   *
+   * <p>NOTE(review): presumably this operator will need to keep {@code dataSource} in order to
+   * read data once it is implemented; confirm.
+   */
+  @Override
+  public void initQueryDataSource(QueryDataSource dataSource) {}
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
index e84e649..acb40ba 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
@@ -20,8 +20,8 @@ package org.apache.iotdb.db.mpp.operator.source;
 
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.mpp.operator.Operator;
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -29,19 +29,19 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import java.io.IOException;
 import java.util.Set;
 
-public class SeriesScanOperator implements Operator {
+public class SeriesScanOperator implements SourceOperator {
 
   private final OperatorContext operatorContext;
   private final SeriesScanUtil seriesScanUtil;
   private TsBlock tsBlock;
   private boolean hasCachedTsBlock = false;
+  private boolean finished = false;
 
   public SeriesScanOperator(
       PartialPath seriesPath,
       Set<String> allSensors,
       TSDataType dataType,
       OperatorContext context,
-      QueryDataSource dataSource,
       Filter timeFilter,
       Filter valueFilter,
       boolean ascending) {
@@ -52,7 +52,6 @@ public class SeriesScanOperator implements Operator {
             allSensors,
             dataType,
             context.getInstanceContext(),
-            dataSource,
             timeFilter,
             valueFilter,
             ascending);
@@ -107,6 +106,11 @@ public class SeriesScanOperator implements Operator {
     return hasCachedTsBlock;
   }
 
+  /**
+   * Reports whether this scan has produced all of its output, i.e. {@link #hasNext()} has nothing
+   * more to offer. Once the scan is found to be finished the result is remembered, so
+   * {@code hasNext()} is not consulted again.
+   *
+   * @throws IOException propagated from {@link #hasNext()}
+   */
+  @Override
+  public boolean isFinished() throws IOException {
+    // finished means "no more data"; hasNext() must therefore be negated
+    return finished || (finished = !hasNext());
+  }
+
   private boolean readChunkData() throws IOException {
     while (seriesScanUtil.hasNextChunk()) {
       if (readPageData()) {
@@ -129,4 +133,14 @@ public class SeriesScanOperator implements Operator {
   private boolean isEmpty(TsBlock tsBlock) {
     return tsBlock == null || tsBlock.isEmpty();
   }
+
+  /**
+   * Placeholder: currently always returns null.
+   *
+   * <p>NOTE(review): presumably should return the id of the plan node this operator was built
+   * from; confirm.
+   */
+  @Override
+  public PlanNodeId getSourceId() {
+    return null;
+  }
+
+  /** Hands the data source to the underlying SeriesScanUtil. */
+  @Override
+  public void initQueryDataSource(QueryDataSource dataSource) {
+    seriesScanUtil.initQueryDataSource(dataSource);
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java
index 3a1041d..80e0a67 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.reader.universal.DescPriorityMergeReader;
 import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
 import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.db.utils.QueryUtils;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.AlignedTimeSeriesMetadata;
@@ -73,7 +74,7 @@ public class SeriesScanUtil {
   private final Filter timeFilter;
   private final Filter valueFilter;
 
-  private final QueryDataSource dataSource;
+  private QueryDataSource dataSource;
 
   /*
    * file index
@@ -117,7 +118,6 @@ public class SeriesScanUtil {
       Set<String> allSensors,
       TSDataType dataType,
       FragmentInstanceContext context,
-      QueryDataSource dataSource,
       Filter timeFilter,
       Filter valueFilter,
       boolean ascending) {
@@ -125,18 +125,15 @@ public class SeriesScanUtil {
     this.allSensors = allSensors;
     this.dataType = dataType;
     this.context = context;
-    this.dataSource = dataSource;
     this.timeFilter = timeFilter;
     this.valueFilter = valueFilter;
     if (ascending) {
       this.orderUtils = new AscTimeOrderUtils();
       mergeReader = getPriorityMergeReader();
-      this.curSeqFileIndex = 0;
       this.curUnseqFileIndex = 0;
     } else {
       this.orderUtils = new DescTimeOrderUtils();
       mergeReader = getDescPriorityMergeReader();
-      this.curSeqFileIndex = dataSource.getSeqResourcesSize() - 1;
       this.curUnseqFileIndex = 0;
     }
 
@@ -154,6 +151,12 @@ public class SeriesScanUtil {
                 versionPageReader -> 
orderUtils.getOrderTime(versionPageReader.getStatistics())));
   }
 
+  public void initQueryDataSource(QueryDataSource dataSource) {
+    QueryUtils.fillOrderIndexes(dataSource, seriesPath.getDevice(), 
orderUtils.getAscending());
+    this.dataSource = dataSource;
+    orderUtils.setCurSeqFileIndex(dataSource);
+  }
+
   protected PriorityMergeReader getPriorityMergeReader() {
     return new PriorityMergeReader();
   }
@@ -1162,6 +1165,8 @@ public class SeriesScanUtil {
     TsFileResource getNextSeqFileResource(boolean isDelete);
 
     TsFileResource getNextUnseqFileResource(boolean isDelete);
+
+    void setCurSeqFileIndex(QueryDataSource dataSource);
   }
 
   class DescTimeOrderUtils implements TimeOrderUtils {
@@ -1273,6 +1278,11 @@ public class SeriesScanUtil {
       }
       return tsFileResource;
     }
+
+    @Override
+    public void setCurSeqFileIndex(QueryDataSource dataSource) {
+      curSeqFileIndex = dataSource.getSeqResourcesSize() - 1; // start at the last seq resource
+    }
   }
 
   class AscTimeOrderUtils implements TimeOrderUtils {
@@ -1384,5 +1394,10 @@ public class SeriesScanUtil {
       }
       return tsFileResource;
     }
+
+    @Override
+    public void setCurSeqFileIndex(QueryDataSource dataSource) {
+      curSeqFileIndex = 0; // start at the first seq resource; dataSource is not read here
+    }
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SourceOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SourceOperator.java
index 8454fd6..2e81002 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SourceOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SourceOperator.java
@@ -18,10 +18,13 @@
  */
 package org.apache.iotdb.db.mpp.operator.source;
 
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.mpp.operator.Operator;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 
 public interface SourceOperator extends Operator {
 
   PlanNodeId getSourceId();
+
+  void initQueryDataSource(QueryDataSource dataSource);
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
index 72f8544..b3c60d7 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner;
 
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.filter.QueryFilter;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
@@ -66,14 +65,11 @@ public class LocalExecutionPlanner {
       OperatorContext operatorContext =
           context.taskContext.addOperatorContext(
               context.getNextOperatorId(), node.getId(), 
SeriesScanOperator.class.getSimpleName());
-      // TODO should create QueryDataSource in SeriesScanOperator's runtime
-      QueryDataSource dataSource = null;
       return new SeriesScanOperator(
           seriesPath,
           node.getAllSensors(),
           seriesPath.getSeriesType(),
           operatorContext,
-          dataSource,
           node.getTimeFilter(),
           node.getValueFilter(),
           ascending);
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java 
b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
index 14cbeec..b9b439c 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
@@ -33,7 +33,6 @@ import 
org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
 import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
-import org.apache.iotdb.db.utils.QueryUtils;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -95,18 +94,16 @@ public class LimitOperatorTest {
           3, new PlanNodeId("3"), TimeJoinOperator.class.getSimpleName());
       fragmentInstanceContext.addOperatorContext(
           4, new PlanNodeId("4"), LimitOperator.class.getSimpleName());
-      QueryDataSource dataSource = new QueryDataSource(seqResources, 
unSeqResources);
-      QueryUtils.fillOrderIndexes(dataSource, measurementPath1.getDevice(), 
true);
       SeriesScanOperator seriesScanOperator1 =
           new SeriesScanOperator(
               measurementPath1,
               allSensors,
               TSDataType.INT32,
               fragmentInstanceContext.getOperatorContexts().get(0),
-              dataSource,
               null,
               null,
               true);
+      seriesScanOperator1.initQueryDataSource(new 
QueryDataSource(seqResources, unSeqResources));
 
       MeasurementPath measurementPath2 =
           new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor1", 
TSDataType.INT32);
@@ -116,10 +113,10 @@ public class LimitOperatorTest {
               allSensors,
               TSDataType.INT32,
               fragmentInstanceContext.getOperatorContexts().get(1),
-              dataSource,
               null,
               null,
               true);
+      seriesScanOperator2.initQueryDataSource(new 
QueryDataSource(seqResources, unSeqResources));
 
       TimeJoinOperator timeJoinOperator =
           new TimeJoinOperator(
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
index c5c3d15..1fd496e 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
@@ -30,7 +30,6 @@ import 
org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
-import org.apache.iotdb.db.utils.QueryUtils;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -83,18 +82,16 @@ public class SeriesScanOperatorTest {
               new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance"));
       fragmentInstanceContext.addOperatorContext(
           1, new PlanNodeId("1"), SeriesScanOperator.class.getSimpleName());
-      QueryDataSource dataSource = new QueryDataSource(seqResources, 
unSeqResources);
-      QueryUtils.fillOrderIndexes(dataSource, measurementPath.getDevice(), 
true);
       SeriesScanOperator seriesScanOperator =
           new SeriesScanOperator(
               measurementPath,
               allSensors,
               TSDataType.INT32,
               fragmentInstanceContext.getOperatorContexts().get(0),
-              dataSource,
               null,
               null,
               true);
+      seriesScanOperator.initQueryDataSource(new QueryDataSource(seqResources, 
unSeqResources));
       int count = 0;
       while (seriesScanOperator.hasNext()) {
         TsBlock tsBlock = seriesScanOperator.next();
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
index d1722ca..5548b79 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
@@ -32,7 +32,6 @@ import 
org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
 import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
-import org.apache.iotdb.db.utils.QueryUtils;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -89,18 +88,16 @@ public class TimeJoinOperatorTest {
           2, new PlanNodeId("2"), SeriesScanOperator.class.getSimpleName());
       fragmentInstanceContext.addOperatorContext(
           3, new PlanNodeId("3"), TimeJoinOperator.class.getSimpleName());
-      QueryDataSource dataSource = new QueryDataSource(seqResources, 
unSeqResources);
-      QueryUtils.fillOrderIndexes(dataSource, measurementPath1.getDevice(), 
true);
       SeriesScanOperator seriesScanOperator1 =
           new SeriesScanOperator(
               measurementPath1,
               allSensors,
               TSDataType.INT32,
               fragmentInstanceContext.getOperatorContexts().get(0),
-              dataSource,
               null,
               null,
               true);
+      seriesScanOperator1.initQueryDataSource(new 
QueryDataSource(seqResources, unSeqResources));
 
       MeasurementPath measurementPath2 =
           new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor1", 
TSDataType.INT32);
@@ -110,10 +107,10 @@ public class TimeJoinOperatorTest {
               allSensors,
               TSDataType.INT32,
               fragmentInstanceContext.getOperatorContexts().get(1),
-              dataSource,
               null,
               null,
               true);
+      seriesScanOperator2.initQueryDataSource(new 
QueryDataSource(seqResources, unSeqResources));
 
       TimeJoinOperator timeJoinOperator =
           new TimeJoinOperator(

Reply via email to