This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch deviceMergeOperator1 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8e055c2f1b16eda84a525c377ecdf3701b3e41da Author: Alima777 <[email protected]> AuthorDate: Wed May 4 15:13:18 2022 +0800 add new method in localExecutionPlanner --- .../operator/process/DeviceMergeOperator.java | 223 ++++++++++++++++++++- .../operator/process/TimeJoinOperator.java | 5 +- .../operator/process/merge/AscTimeComparator.java | 4 +- .../operator/process/merge/DescTimeComparator.java | 4 +- .../operator/process/merge/SingleColumnMerger.java | 5 +- .../operator/process/merge/TimeComparator.java | 4 +- .../db/mpp/plan/planner/LocalExecutionPlanner.java | 44 +++- .../iotdb/tsfile/read/common/block/TsBlock.java | 9 + 8 files changed, 280 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java index 91fea13a9b..4791b379f9 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java @@ -21,10 +21,19 @@ package org.apache.iotdb.db.mpp.execution.operator.process; import org.apache.iotdb.db.mpp.execution.operator.Operator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; +import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator; +import org.apache.iotdb.db.utils.datastructure.TimeSelector; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.block.TsBlock; +import org.apache.iotdb.tsfile.read.common.block.TsBlock.TsBlockSingleColumnIterator; +import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; +import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; +import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder; import com.google.common.util.concurrent.ListenableFuture; +import java.util.Arrays; +import java.util.LinkedList; import java.util.List; /** @@ -43,15 +52,39 @@ public class DeviceMergeOperator implements ProcessOperator { // The size devices and deviceOperators should be the same. private final List<String> devices; private final List<Operator> deviceOperators; + private TSDataType[] dataTypes; + private TsBlockBuilder tsBlockBuilder; + private final int inputOperatorsCount; private final TsBlock[] inputTsBlocks; + // device name of inputTsBlocks[], e.g. d1 in tsBlock1, d2 in tsBlock2 + private final String[] deviceOfInputTsBlocks; private final boolean[] noMoreTsBlocks; + private int curDeviceIndex; + // the index of curDevice in inputTsBlocks + private LinkedList<Integer> curDeviceTsBlockIndexList = new LinkedList<>(); + + private boolean finished; + + private final TimeSelector timeSelector; + private final TimeComparator comparator; + public DeviceMergeOperator( - OperatorContext operatorContext, List<String> devices, List<Operator> deviceOperators) { + OperatorContext operatorContext, + List<String> devices, + List<Operator> deviceOperators, + TimeSelector selector, + TimeComparator comparator) { this.operatorContext = operatorContext; this.devices = devices; this.deviceOperators = deviceOperators; + this.inputOperatorsCount = deviceOperators.size(); + this.inputTsBlocks = new TsBlock[inputOperatorsCount]; + this.deviceOfInputTsBlocks = new String[inputOperatorsCount]; + this.noMoreTsBlocks = new boolean[inputOperatorsCount]; + this.timeSelector = selector; + this.comparator = comparator; } @Override @@ -61,9 +94,9 @@ public class DeviceMergeOperator implements ProcessOperator { @Override public ListenableFuture<Void> isBlocked() { - for (int i = 0; i < inputCount; i++) { - if (!noMoreTsBlocks[i] && empty(i)) { - ListenableFuture<Void> blocked = children.get(i).isBlocked(); + for (int i = 0; i < inputOperatorsCount; i++) { + if (!noMoreTsBlocks[i] && isTsBlockEmpty(i)) { + ListenableFuture<Void> blocked = deviceOperators.get(i).isBlocked(); if (!blocked.isDone()) { return blocked; } @@ -74,21 +107,197 @@ public class DeviceMergeOperator implements ProcessOperator { @Override public TsBlock next() { - return null; + // get new input TsBlock + for (int i = 0; i < inputOperatorsCount; i++) { + if (!noMoreTsBlocks[i] && isTsBlockEmpty(i) && deviceOperators.get(i).hasNext()) { + inputTsBlocks[i] = deviceOperators.get(i).next(); + deviceOfInputTsBlocks[i] = getDeviceNameFromTsBlock(inputTsBlocks[i]); + tryToAddCurDeviceTsBlockList(i); + } + } + // move to next device + while (curDeviceTsBlockIndexList.isEmpty() && curDeviceIndex + 1 < devices.size()) { + getNextDeviceTsBlocks(); + } + // process the curDeviceTsBlocks + if (curDeviceTsBlockIndexList.size() == 1) { + TsBlock resultTsBlock = inputTsBlocks[curDeviceTsBlockIndexList.get(0)]; + inputTsBlocks[curDeviceTsBlockIndexList.get(0)] = null; + curDeviceTsBlockIndexList.clear(); + return resultTsBlock; + } else { + if (tsBlockBuilder == null) { + initTsBlockBuilderFromTsBlock(inputTsBlocks[curDeviceTsBlockIndexList.get(0)]); + } else { + tsBlockBuilder.reset(); + } + int tsBlockSizeOfCurDevice = curDeviceTsBlockIndexList.size(); + TsBlock[] deviceTsBlocks = new TsBlock[tsBlockSizeOfCurDevice]; + TsBlockSingleColumnIterator[] tsBlockIterators = + new TsBlockSingleColumnIterator[tsBlockSizeOfCurDevice]; + for (int i = 0; i < tsBlockSizeOfCurDevice; i++) { + deviceTsBlocks[i] = inputTsBlocks[curDeviceTsBlockIndexList.get(i)]; + tsBlockIterators[i] = deviceTsBlocks[i].getTsBlockSingleColumnIterator(); + } + // Use the min end time of all tsBlocks as the end time of result tsBlock + // i.e. only one tsBlock will be consumed totally + long currentEndTime = deviceTsBlocks[0].getEndTime(); + for (int i = 1; i < tsBlockSizeOfCurDevice; i++) { + currentEndTime = + comparator.getCurrentEndTime(currentEndTime, inputTsBlocks[i].getEndTime()); + } + + TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder(); + ColumnBuilder[] valueColumnBuilders = tsBlockBuilder.getValueColumnBuilders(); + while (!timeSelector.isEmpty() + && comparator.satisfyCurEndTime(timeSelector.first(), currentEndTime)) { + long timestamp = timeSelector.pollFirst(); + timeBuilder.writeLong(timestamp); + // TODO process by column + // Try to find the tsBlock that timestamp belongs to + for (int i = 0; i < tsBlockSizeOfCurDevice; i++) { + if (tsBlockIterators[i].hasNext() && tsBlockIterators[i].currentTime() == timestamp) { + int rowIndex = tsBlockIterators[i].getRowIndex(); + for (int j = 0; j < valueColumnBuilders.length; j++) { + // the jth column of rowIndex of ith tsBlock + if (deviceTsBlocks[i].getColumn(j).isNull(rowIndex)) { + valueColumnBuilders[j].appendNull(); + continue; + } + switch (dataTypes[j]) { + case BOOLEAN: + valueColumnBuilders[j].writeBoolean( + deviceTsBlocks[i].getColumn(j).getBoolean(rowIndex)); + break; + case INT32: + valueColumnBuilders[j].writeInt(deviceTsBlocks[i].getColumn(j).getInt(rowIndex)); + break; + case INT64: + valueColumnBuilders[j].writeLong( + deviceTsBlocks[i].getColumn(j).getLong(rowIndex)); + break; + case FLOAT: + valueColumnBuilders[j].writeFloat( + deviceTsBlocks[i].getColumn(j).getFloat(rowIndex)); + break; + case DOUBLE: + valueColumnBuilders[j].writeDouble( + deviceTsBlocks[i].getColumn(j).getDouble(rowIndex)); + break; + case TEXT: + valueColumnBuilders[j].writeBinary( + deviceTsBlocks[i].getColumn(j).getBinary(rowIndex)); + break; + } + } + tsBlockIterators[i].next(); + break; + } + } + tsBlockBuilder.declarePosition(); + } + // update tsBlock after consuming + for (int i = 0; i < tsBlockSizeOfCurDevice; i++) { + if (tsBlockIterators[i].hasNext()) { + int rowIndex = tsBlockIterators[i].getRowIndex(); + inputTsBlocks[curDeviceTsBlockIndexList.get(i)] = + inputTsBlocks[curDeviceTsBlockIndexList.get(i)].subTsBlock(rowIndex); + } else { + inputTsBlocks[curDeviceTsBlockIndexList.get(i)] = null; + curDeviceTsBlockIndexList.remove(i); + } + } + return tsBlockBuilder.build(); + } } @Override public boolean hasNext() { + if (finished) { + return false; + } + for (int i = 0; i < inputOperatorsCount; i++) { + if (!isTsBlockEmpty(i)) { + return true; + } else if (!noMoreTsBlocks[i]) { + if (deviceOperators.get(i).hasNext()) { + return true; + } else { + noMoreTsBlocks[i] = true; + inputTsBlocks[i] = null; + } + } + } return false; } @Override public void close() throws Exception { - ProcessOperator.super.close(); + for (Operator deviceOperator : deviceOperators) { + deviceOperator.close(); + } } @Override public boolean isFinished() { - return false; + if (finished) { + return true; + } + finished = true; + + for (int i = 0; i < inputOperatorsCount; i++) { + // has more tsBlock output from children[i] or has cached tsBlock in inputTsBlocks[i] + if (!noMoreTsBlocks[i] || !isTsBlockEmpty(i)) { + finished = false; + break; + } + } + return finished; + } + + private void initTsBlockBuilderFromTsBlock(TsBlock tsBlock) { + dataTypes = tsBlock.getValueDataTypes(); + tsBlockBuilder = new TsBlockBuilder(Arrays.asList(dataTypes)); + } + + /** DeviceColumn must be the first value column of tsBlock transferred by DeviceViewOperator. */ + private String getDeviceNameFromTsBlock(TsBlock tsBlock) { + if (tsBlock.getColumn(0).isNull(0)) { + return null; + } + return tsBlock.getColumn(0).getBinary(0).toString(); + } + + private String getCurDeviceName() { + return devices.get(curDeviceIndex); + } + + private void getNextDeviceTsBlocks() { + curDeviceIndex++; + for (int i = 0; i < inputOperatorsCount; i++) { + tryToAddCurDeviceTsBlockList(i); + } + } + + private void tryToAddCurDeviceTsBlockList(int tsBlockIndex) { + if (deviceOfInputTsBlocks[tsBlockIndex] != null + && deviceOfInputTsBlocks[tsBlockIndex].equals(getCurDeviceName())) { + // add tsBlock of curDevice to a list + curDeviceTsBlockIndexList.add(tsBlockIndex); + // add all timestamp of curDevice to timeSelector + int rowSize = inputTsBlocks[tsBlockIndex].getPositionCount(); + for (int row = 0; row < rowSize; row++) { + timeSelector.add(inputTsBlocks[tsBlockIndex].getTimeByIndex(row)); + } + } + } + + /** + * If the tsBlock of tsBlockIndex is null or has no more data in the tsBlock, return true; else + * return false; + */ + private boolean isTsBlockEmpty(int tsBlockIndex) { + return inputTsBlocks[tsBlockIndex] == null + || inputTsBlocks[tsBlockIndex].getPositionCount() == 0; } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java index 0fe2bb2873..1ca4053049 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java @@ -143,7 +143,7 @@ public class TimeJoinOperator implements ProcessOperator { if (!empty(i)) { currentEndTime = init - ? comparator.getSatisfiedTime(currentEndTime, inputTsBlocks[i].getEndTime()) + ? comparator.getCurrentEndTime(currentEndTime, inputTsBlocks[i].getEndTime()) : inputTsBlocks[i].getEndTime(); init = true; } @@ -156,7 +156,8 @@ public class TimeJoinOperator implements ProcessOperator { } TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder(); - while (!timeSelector.isEmpty() && comparator.satisfy(timeSelector.first(), currentEndTime)) { + while (!timeSelector.isEmpty() + && comparator.satisfyCurEndTime(timeSelector.first(), currentEndTime)) { timeBuilder.writeLong(timeSelector.pollFirst()); tsBlockBuilder.declarePosition(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java index 932262aab7..95b7316844 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java @@ -22,12 +22,12 @@ public class AscTimeComparator implements TimeComparator { /** @return if order by time asc, return true if time <= endTime, otherwise false */ @Override - public boolean satisfy(long time, long endTime) { + public boolean satisfyCurEndTime(long time, long endTime) { return time <= endTime; } @Override - public long getSatisfiedTime(long time1, long time2) { + public long getCurrentEndTime(long time1, long time2) { return Math.min(time1, time2); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java index 85e0e7b572..f53c97d8fe 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java @@ -22,12 +22,12 @@ public class DescTimeComparator implements TimeComparator { /** @return if order by time desc, return true if time >= endTime, otherwise false */ @Override - public boolean satisfy(long time, long endTime) { + public boolean satisfyCurEndTime(long time, long endTime) { return time >= endTime; } @Override - public long getSatisfiedTime(long time1, long time2) { + public long getCurrentEndTime(long time1, long time2) { return Math.max(time1, time2); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/SingleColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/SingleColumnMerger.java index d7279ed875..92cc2f3a94 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/SingleColumnMerger.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/SingleColumnMerger.java @@ -53,7 +53,8 @@ public class SingleColumnMerger implements ColumnMerger { // input column is empty or current time of input column is already larger than currentEndTime // just appendNull rowCount null if (empty(tsBlockIndex, inputTsBlocks, inputIndex) - || !comparator.satisfy(inputTsBlocks[tsBlockIndex].getTimeByIndex(index), currentEndTime)) { + || !comparator.satisfyCurEndTime( + inputTsBlocks[tsBlockIndex].getTimeByIndex(index), currentEndTime)) { columnBuilder.appendNull(rowCount); } else { // read from input column and write it into columnBuilder @@ -63,7 +64,7 @@ public class SingleColumnMerger implements ColumnMerger { // current index reaches the size of input column or current time of input column is already // larger than currentEndTime, use null column to fill the remaining if (timeColumn.getPositionCount() == index - || !comparator.satisfy( + || !comparator.satisfyCurEndTime( inputTsBlocks[tsBlockIndex].getTimeByIndex(index), currentEndTime)) { columnBuilder.appendNull(rowCount - i); break; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java index 8587df43dc..db017f6186 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java @@ -21,8 +21,8 @@ package org.apache.iotdb.db.mpp.execution.operator.process.merge; public interface TimeComparator { /** @return true if time is satisfied with endTime, otherwise false */ - boolean satisfy(long time, long endTime); + boolean satisfyCurEndTime(long time, long endTime); /** @return min(time1, time2) if order by time asc, max(time1, time2) if order by desc */ - long getSatisfiedTime(long time1, long time2); + long getCurrentEndTime(long time1, long time2); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java index 9f127c883c..20191ee949 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java @@ -33,6 +33,8 @@ import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext; import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext; import org.apache.iotdb.db.mpp.execution.operator.Operator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; +import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator; +import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator; import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator; import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator; import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator; @@ -66,6 +68,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SeriesSchema import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesCountNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceMergeNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode; @@ -82,6 +85,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OutputColumn; import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy; +import org.apache.iotdb.db.utils.datastructure.TimeSelector; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.filter.basic.Filter; @@ -335,7 +339,45 @@ public class LocalExecutionPlanner { @Override public Operator visitDeviceView(DeviceViewNode node, LocalExecutionPlanContext context) { - return super.visitDeviceView(node, context); + OperatorContext operatorContext = + context.instanceContext.addOperatorContext( + context.getNextOperatorId(), + node.getPlanNodeId(), + DeviceViewNode.class.getSimpleName()); + List<Operator> children = + node.getChildren().stream() + .map(child -> child.accept(this, context)) + .collect(Collectors.toList()); + return new DeviceViewOperator(operatorContext, node.getDevices(), children, null, null); + } + + @Override + public Operator visitDeviceMerge(DeviceMergeNode node, LocalExecutionPlanContext context) { + OperatorContext operatorContext = + context.instanceContext.addOperatorContext( + context.getNextOperatorId(), + node.getPlanNodeId(), + DeviceViewNode.class.getSimpleName()); + List<Operator> children = + node.getChildren().stream() + .map(child -> child.accept(this, context)) + .collect(Collectors.toList()); + TimeSelector selector = null; + TimeComparator timeComparator = null; + for (OrderBy orderBy : node.getMergeOrders()) { + switch (orderBy) { + case TIMESTAMP_ASC: + selector = new TimeSelector(node.getChildren().size() << 1, true); + timeComparator = ASC_TIME_COMPARATOR; + break; + case TIMESTAMP_DESC: + selector = new TimeSelector(node.getChildren().size() << 1, false); + timeComparator = DESC_TIME_COMPARATOR; + break; + } + } + return new DeviceMergeOperator( + operatorContext, node.getDevices(), children, selector, timeComparator); } @Override diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java index 8034bfb189..dedd92a19c 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.tsfile.read.common.block; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.read.common.IBatchDataIterator; import org.apache.iotdb.tsfile.read.common.block.column.Column; @@ -213,6 +214,14 @@ public class TsBlock { return columns; } + public TSDataType[] getValueDataTypes() { + TSDataType[] dataTypes = new TSDataType[valueColumns.length]; + for (int i = 0; i < valueColumns.length; i++) { + dataTypes[i] = valueColumns[i].getDataType(); + } + return dataTypes; + } + public TsBlockSingleColumnIterator getTsBlockSingleColumnIterator() { return new TsBlockSingleColumnIterator(0); }
