This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch deviceMergeOperator1 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 087f4204985aaac1f661ad2f52ff912a2ca05d2d Author: Alima777 <[email protected]> AuthorDate: Tue May 3 20:14:12 2022 +0800 Add deviceMergeOperator --- .../operator/process/DeviceMergeOperator.java | 50 ++++++++++++++++++++++ .../operator/process/DeviceViewOperator.java | 8 ++-- .../operator/process/merge/ColumnMerger.java | 2 +- 3 files changed, 55 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java new file mode 100644 index 0000000000..4122015cb5 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.execution.operator.process; + +import org.apache.iotdb.db.mpp.execution.operator.Operator; +import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; + +import java.util.List; + +/** + * DeviceMergeOperator is responsible for merging tsBlock coming from DeviceViewOperators. + * + * <p>If the devices in different dataNodes are different, we need to output tsBlocks of each node + * in order of device. If the same device exists in different nodes, the tsBlocks need to be merged + * by time within the device. + * + * <p>The form of tsBlocks from input operators should be the same strictly, which is transferred by + * DeviceViewOperator. + */ +public class DeviceMergeOperator { + + private final OperatorContext operatorContext; + // The size devices and deviceOperators should be the same. + private final List<String> devices; + private final List<Operator> deviceOperators; + + public DeviceMergeOperator( + OperatorContext operatorContext, List<String> devices, List<Operator> deviceOperators) { + this.operatorContext = operatorContext; + this.devices = devices; + this.deviceOperators = deviceOperators; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java index a90dcfd0bb..d3165042a6 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java @@ -34,10 +34,10 @@ import com.google.common.util.concurrent.ListenableFuture; import java.util.List; /** - * Since devices have been sorted by the merge order as expected, what DeviceMergeOperator need to - * do is traversing the device child operators, get all tsBlocks of one device and transform it to - * the form we need, adding the device column and allocating value column to its expected location, - * then get the next device operator until no next device. + * Since devices have been sorted by the merge order as expected, what DeviceViewOperator need to do + * is traversing the device child operators, get all tsBlocks of one device and transform it to the + * form we need, adding the device column and allocating value column to its expected location, then + * get the next device operator until no next device. * * <p>The deviceOperators can be timeJoinOperator or seriesScanOperator that have not transformed * the result form. diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/ColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/ColumnMerger.java index 4d301c8e8a..731827aea5 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/ColumnMerger.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/ColumnMerger.java @@ -40,7 +40,7 @@ public interface ColumnMerger { /** * merge columns belonging to same series into one column * - * @param inputTsBlocks all source TsBlocks, some of which will cantain source column + * @param inputTsBlocks all source TsBlocks, some of which will contain source column * @param inputIndex start index for each source TsBlock and size of it is equal to inputTsBlocks, * we should only read from this array and not update it because others will use the start * index value in inputIndex array
