This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 50552c6c0f [IOTDB-3598] Fix queue has been destroyed bug (#6395)
50552c6c0f is described below

commit 50552c6c0f38363d47af630d5695578a38f32028
Author: Jackie Tien <[email protected]>
AuthorDate: Thu Jun 23 20:52:09 2022 +0800

    [IOTDB-3598] Fix queue has been destroyed bug (#6395)
---
 .../operator/schema/SchemaFetchMergeOperator.java  | 52 +++++++++-------------
 .../db/mpp/plan/planner/LocalExecutionPlanner.java |  2 +-
 .../node/metedata/read/SchemaFetchMergeNode.java   |  2 +-
 3 files changed, 22 insertions(+), 34 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
index a599b83bfe..4c862fee6e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.mpp.execution.operator.schema;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
@@ -31,18 +30,17 @@ import java.util.List;
 
 public class SchemaFetchMergeOperator implements ProcessOperator {
 
-  private final PlanNodeId planNodeId;
   private final OperatorContext operatorContext;
-  private final boolean[] noMoreTsBlocks;
-
   private final List<Operator> children;
 
-  public SchemaFetchMergeOperator(
-      PlanNodeId planNodeId, OperatorContext operatorContext, List<Operator> 
children) {
-    this.planNodeId = planNodeId;
+  private final int childrenCount;
+  private int currentIndex;
+
+  public SchemaFetchMergeOperator(OperatorContext operatorContext, 
List<Operator> children) {
     this.operatorContext = operatorContext;
     this.children = children;
-    noMoreTsBlocks = new boolean[children.size()];
+    this.childrenCount = children.size();
+    this.currentIndex = 0;
   }
 
   @Override
@@ -52,43 +50,33 @@ public class SchemaFetchMergeOperator implements 
ProcessOperator {
 
   @Override
   public TsBlock next() {
-    for (int i = 0; i < children.size(); i++) {
-      if (!noMoreTsBlocks[i]) {
-        TsBlock tsBlock = children.get(i).next();
-        if (!children.get(i).hasNext()) {
-          noMoreTsBlocks[i] = true;
-        }
-        return tsBlock;
-      }
+    if (children.get(currentIndex).hasNext()) {
+      return children.get(currentIndex).next();
+    } else {
+      currentIndex++;
+      return null;
     }
-    return null;
   }
 
   @Override
   public boolean hasNext() {
-    for (int i = 0; i < children.size(); i++) {
-      if (!noMoreTsBlocks[i] && children.get(i).hasNext()) {
-        return true;
-      }
-    }
-    return false;
+    return currentIndex < childrenCount;
   }
 
   @Override
   public ListenableFuture<Void> isBlocked() {
-    for (int i = 0; i < children.size(); i++) {
-      if (!noMoreTsBlocks[i]) {
-        ListenableFuture<Void> blocked = children.get(i).isBlocked();
-        if (!blocked.isDone()) {
-          return blocked;
-        }
-      }
-    }
-    return NOT_BLOCKED;
+    return children.get(currentIndex).isBlocked();
   }
 
   @Override
   public boolean isFinished() {
     return !hasNext();
   }
+
+  @Override
+  public void close() throws Exception {
+    for (Operator child : children) {
+      child.close();
+    }
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 17406785cd..82fd67af9a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -1112,7 +1112,7 @@ public class LocalExecutionPlanner {
               context.getNextOperatorId(),
               node.getPlanNodeId(),
               SchemaFetchMergeOperator.class.getSimpleName());
-      return new SchemaFetchMergeOperator(node.getPlanNodeId(), 
operatorContext, children);
+      return new SchemaFetchMergeOperator(operatorContext, children);
     }
 
     @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchMergeNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchMergeNode.java
index 2552c4540f..8d1b999ef9 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchMergeNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchMergeNode.java
@@ -61,6 +61,6 @@ public class SchemaFetchMergeNode extends 
AbstractSchemaMergeNode {
   }
 
   public String toString() {
-    return String.format("SchemaFetchNode-%s", getPlanNodeId());
+    return String.format("SchemaFetchMergeNode-%s", getPlanNodeId());
   }
 }

Reply via email to