This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 50552c6c0f [IOTDB-3598] Fix queue has been destroyed bug (#6395)
50552c6c0f is described below
commit 50552c6c0f38363d47af630d5695578a38f32028
Author: Jackie Tien <[email protected]>
AuthorDate: Thu Jun 23 20:52:09 2022 +0800
[IOTDB-3598] Fix queue has been destroyed bug (#6395)
---
.../operator/schema/SchemaFetchMergeOperator.java | 52 +++++++++-------------
.../db/mpp/plan/planner/LocalExecutionPlanner.java | 2 +-
.../node/metedata/read/SchemaFetchMergeNode.java | 2 +-
3 files changed, 22 insertions(+), 34 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
index a599b83bfe..4c862fee6e 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.mpp.execution.operator.schema;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
@@ -31,18 +30,17 @@ import java.util.List;
public class SchemaFetchMergeOperator implements ProcessOperator {
- private final PlanNodeId planNodeId;
private final OperatorContext operatorContext;
- private final boolean[] noMoreTsBlocks;
-
private final List<Operator> children;
- public SchemaFetchMergeOperator(
- PlanNodeId planNodeId, OperatorContext operatorContext, List<Operator>
children) {
- this.planNodeId = planNodeId;
+ private final int childrenCount;
+ private int currentIndex;
+
+ public SchemaFetchMergeOperator(OperatorContext operatorContext,
List<Operator> children) {
this.operatorContext = operatorContext;
this.children = children;
- noMoreTsBlocks = new boolean[children.size()];
+ this.childrenCount = children.size();
+ this.currentIndex = 0;
}
@Override
@@ -52,43 +50,33 @@ public class SchemaFetchMergeOperator implements
ProcessOperator {
@Override
public TsBlock next() {
- for (int i = 0; i < children.size(); i++) {
- if (!noMoreTsBlocks[i]) {
- TsBlock tsBlock = children.get(i).next();
- if (!children.get(i).hasNext()) {
- noMoreTsBlocks[i] = true;
- }
- return tsBlock;
- }
+ if (children.get(currentIndex).hasNext()) {
+ return children.get(currentIndex).next();
+ } else {
+ currentIndex++;
+ return null;
}
- return null;
}
@Override
public boolean hasNext() {
- for (int i = 0; i < children.size(); i++) {
- if (!noMoreTsBlocks[i] && children.get(i).hasNext()) {
- return true;
- }
- }
- return false;
+ return currentIndex < childrenCount;
}
@Override
public ListenableFuture<Void> isBlocked() {
- for (int i = 0; i < children.size(); i++) {
- if (!noMoreTsBlocks[i]) {
- ListenableFuture<Void> blocked = children.get(i).isBlocked();
- if (!blocked.isDone()) {
- return blocked;
- }
- }
- }
- return NOT_BLOCKED;
+ return children.get(currentIndex).isBlocked();
}
@Override
public boolean isFinished() {
return !hasNext();
}
+
+ @Override
+ public void close() throws Exception {
+ for (Operator child : children) {
+ child.close();
+ }
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 17406785cd..82fd67af9a 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -1112,7 +1112,7 @@ public class LocalExecutionPlanner {
context.getNextOperatorId(),
node.getPlanNodeId(),
SchemaFetchMergeOperator.class.getSimpleName());
- return new SchemaFetchMergeOperator(node.getPlanNodeId(),
operatorContext, children);
+ return new SchemaFetchMergeOperator(operatorContext, children);
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchMergeNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchMergeNode.java
index 2552c4540f..8d1b999ef9 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchMergeNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchMergeNode.java
@@ -61,6 +61,6 @@ public class SchemaFetchMergeNode extends
AbstractSchemaMergeNode {
}
public String toString() {
- return String.format("SchemaFetchNode-%s", getPlanNodeId());
+ return String.format("SchemaFetchMergeNode-%s", getPlanNodeId());
}
}