This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch deviceMergeOperator1 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 537dfc32ce3ff27e72dc9c17ffdbad6e22caf5b5 Author: Alima777 <[email protected]> AuthorDate: Tue May 3 20:23:20 2022 +0800 part implementation --- .../operator/process/DeviceMergeOperator.java | 46 +++++++++++++++++++++- 1 file changed, 45 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java index 4122015cb5..91fea13a9b 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java @@ -21,6 +21,9 @@ package org.apache.iotdb.db.mpp.execution.operator.process; import org.apache.iotdb.db.mpp.execution.operator.Operator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; +import org.apache.iotdb.tsfile.read.common.block.TsBlock; + +import com.google.common.util.concurrent.ListenableFuture; import java.util.List; @@ -34,17 +37,58 @@ import java.util.List; * <p>The form of tsBlocks from input operators should be the same strictly, which is transferred by * DeviceViewOperator. */ -public class DeviceMergeOperator { +public class DeviceMergeOperator implements ProcessOperator { private final OperatorContext operatorContext; // The size devices and deviceOperators should be the same. private final List<String> devices; private final List<Operator> deviceOperators; + private final TsBlock[] inputTsBlocks; + private final boolean[] noMoreTsBlocks; + public DeviceMergeOperator( OperatorContext operatorContext, List<String> devices, List<Operator> deviceOperators) { this.operatorContext = operatorContext; this.devices = devices; this.deviceOperators = deviceOperators; } + + @Override + public OperatorContext getOperatorContext() { + return operatorContext; + } + + @Override + public ListenableFuture<Void> isBlocked() { + for (int i = 0; i < inputCount; i++) { + if (!noMoreTsBlocks[i] && empty(i)) { + ListenableFuture<Void> blocked = children.get(i).isBlocked(); + if (!blocked.isDone()) { + return blocked; + } + } + } + return NOT_BLOCKED; + } + + @Override + public TsBlock next() { + return null; + } + + @Override + public boolean hasNext() { + return false; + } + + @Override + public void close() throws Exception { + ProcessOperator.super.close(); + } + + @Override + public boolean isFinished() { + return false; + } }
