This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch IOTDB-4006 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 102b86b7d0488eeb63231cff90caeb5b59e490bf Author: JackieTien97 <[email protected]> AuthorDate: Mon Aug 1 15:18:56 2022 +0800 Improve the performance of Raw Query Without ValueFilter for nonAligned --- .../operator/process/DeviceMergeOperator.java | 2 +- .../RowBasedTimeJoinOperator.java} | 62 ++++++++++++---------- .../process/{ => join}/TimeJoinOperator.java | 7 +-- .../{ => join}/merge/AscTimeComparator.java | 2 +- .../process/{ => join}/merge/ColumnMerger.java | 24 ++++++++- .../{ => join}/merge/DescTimeComparator.java | 2 +- .../{ => join}/merge/MultiColumnMerger.java | 56 ++++++++++++++++++- .../merge/NonOverlappedMultiColumnMerger.java | 25 +++++++-- .../{ => join}/merge/SingleColumnMerger.java | 44 ++++++++++++++- .../process/{ => join}/merge/TimeComparator.java | 2 +- .../db/mpp/plan/planner/OperatorTreeGenerator.java | 19 +++---- .../iotdb/db/mpp/execution/DataDriverTest.java | 6 +-- .../operator/AlignedSeriesScanOperatorTest.java | 8 +-- .../operator/DeviceMergeOperatorTest.java | 2 +- .../mpp/execution/operator/LimitOperatorTest.java | 6 +-- .../execution/operator/MultiColumnMergerTest.java | 2 +- .../NonOverlappedMultiColumnMergerTest.java | 4 +- .../mpp/execution/operator/OffsetOperatorTest.java | 6 +-- .../operator/RawDataAggregationOperatorTest.java | 6 +-- .../execution/operator/SingleColumnMergerTest.java | 6 +-- .../execution/operator/TimeJoinOperatorTest.java | 8 +-- 21 files changed, 220 insertions(+), 79 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java index d16d47aa28..e0ccbe86ba 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java @@ -21,7 +21,7 @@ package org.apache.iotdb.db.mpp.execution.operator.process; import org.apache.iotdb.db.mpp.execution.operator.Operator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; -import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator; +import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator; import org.apache.iotdb.db.utils.datastructure.TimeSelector; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.block.TsBlock; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java similarity index 85% copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java index 6775c66447..f026ef35c4 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java @@ -16,12 +16,13 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.mpp.execution.operator.process; +package org.apache.iotdb.db.mpp.execution.operator.process.join; import org.apache.iotdb.db.mpp.execution.operator.Operator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; -import org.apache.iotdb.db.mpp.execution.operator.process.merge.ColumnMerger; -import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator; +import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator; +import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.ColumnMerger; +import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator; import org.apache.iotdb.db.mpp.plan.statement.component.Ordering; import org.apache.iotdb.db.utils.datastructure.TimeSelector; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -37,7 +38,7 @@ import java.util.List; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.util.concurrent.Futures.successfulAsList; -public class TimeJoinOperator implements ProcessOperator { +public class RowBasedTimeJoinOperator implements ProcessOperator { private final OperatorContext operatorContext; @@ -79,7 +80,7 @@ public class TimeJoinOperator implements ProcessOperator { private final TimeComparator comparator; - public TimeJoinOperator( + public RowBasedTimeJoinOperator( OperatorContext operatorContext, List<Operator> children, Ordering mergeOrder, @@ -139,10 +140,7 @@ public class TimeJoinOperator implements ProcessOperator { inputIndex[i] = 0; inputTsBlocks[i] = children.get(i).next(); if (!empty(i)) { - int rowSize = inputTsBlocks[i].getPositionCount(); - for (int row = 0; row < rowSize; row++) { - timeSelector.add(inputTsBlocks[i].getTimeByIndex(row)); - } + updateTimeSelector(i); } else { // child operator has next but return an empty TsBlock which means that it may not // finish calculation in given time slice. @@ -175,26 +173,30 @@ public class TimeJoinOperator implements ProcessOperator { } TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder(); - while (!timeSelector.isEmpty() - && comparator.satisfyCurEndTime(timeSelector.first(), currentEndTime)) { - timeBuilder.writeLong(timeSelector.pollFirst()); - tsBlockBuilder.declarePosition(); - } - - for (int i = 0; i < outputColumnCount; i++) { - ColumnMerger merger = mergers.get(i); - merger.mergeColumn( - inputTsBlocks, - inputIndex, - shadowInputIndex, - timeBuilder, - currentEndTime, - tsBlockBuilder.getColumnBuilder(i)); - } - - // update inputIndex using shadowInputIndex - System.arraycopy(shadowInputIndex, 0, inputIndex, 0, inputOperatorsCount); + long currentTime; + do { + currentTime = timeSelector.pollFirst(); + timeBuilder.writeLong(currentTime); + for (int i = 0; i < outputColumnCount; i++) { + ColumnMerger merger = mergers.get(i); + merger.mergeColumn( + inputTsBlocks, + inputIndex, + shadowInputIndex, + currentTime, + tsBlockBuilder.getColumnBuilder(i)); + } + for (int i = 0; i < inputOperatorsCount; i++) { + if (inputIndex[i] != shadowInputIndex[i]) { + inputIndex[i] = shadowInputIndex[i]; + if (!empty(i)) { + updateTimeSelector(i); + } + } + } + tsBlockBuilder.declarePosition(); + } while (currentTime < currentEndTime && !timeSelector.isEmpty()); return tsBlockBuilder.build(); } @@ -242,6 +244,10 @@ public class TimeJoinOperator implements ProcessOperator { return finished; } + private void updateTimeSelector(int index) { + timeSelector.add(inputTsBlocks[index].getTimeByIndex(inputIndex[index])); + } + /** * If the tsBlock of columnIndex is null or has no more data in the tsBlock, return true; else * return false; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java similarity index 96% rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java index 6775c66447..eb185e09ff 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java @@ -16,12 +16,13 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.mpp.execution.operator.process; +package org.apache.iotdb.db.mpp.execution.operator.process.join; import org.apache.iotdb.db.mpp.execution.operator.Operator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; -import org.apache.iotdb.db.mpp.execution.operator.process.merge.ColumnMerger; -import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator; +import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator; +import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.ColumnMerger; +import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator; import org.apache.iotdb.db.mpp.plan.statement.component.Ordering; import org.apache.iotdb.db.utils.datastructure.TimeSelector; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/AscTimeComparator.java similarity index 94% rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/AscTimeComparator.java index 95b7316844..96d9f86f59 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/AscTimeComparator.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.mpp.execution.operator.process.merge; +package org.apache.iotdb.db.mpp.execution.operator.process.join.merge; public class AscTimeComparator implements TimeComparator { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/ColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/ColumnMerger.java similarity index 70% rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/ColumnMerger.java rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/ColumnMerger.java index 8df6c53c55..7a00f2adf3 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/ColumnMerger.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/ColumnMerger.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.mpp.execution.operator.process.merge; +package org.apache.iotdb.db.mpp.execution.operator.process.join.merge; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; @@ -38,7 +38,8 @@ public interface ColumnMerger { } /** - * merge columns belonging to same series into one column + * merge columns belonging to same series into one column, merge until each input column's time is + * larger than currentEndTime * * @param inputTsBlocks all source TsBlocks, some of which will contain source column * @param inputIndex start index for each source TsBlock and size of it is equal to inputTsBlocks, @@ -57,4 +58,23 @@ public interface ColumnMerger { TimeColumnBuilder timeBuilder, long currentEndTime, ColumnBuilder columnBuilder); + + /** + * merge columns belonging to same series into one column, merge just one row whose time is equal + * to currentTime + * + * @param inputTsBlocks all source TsBlocks, some of which will contain source column + * @param inputIndex start index for each source TsBlock and size of it is equal to inputTsBlocks, + * we should only read from this array and not update it because others will use the start + * index value in inputIndex array + * @param updatedInputIndex current index for each source TsBlock after merging + * @param currentTime merge just one row whose time is equal to currentTime + * @param columnBuilder used to write merged value into + */ + void mergeColumn( + TsBlock[] inputTsBlocks, + int[] inputIndex, + int[] updatedInputIndex, + long currentTime, + ColumnBuilder columnBuilder); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/DescTimeComparator.java similarity index 94% rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/DescTimeComparator.java index f53c97d8fe..67c0112dab 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/DescTimeComparator.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.mpp.execution.operator.process.merge; +package org.apache.iotdb.db.mpp.execution.operator.process.join.merge; public class DescTimeComparator implements TimeComparator { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/MultiColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MultiColumnMerger.java similarity index 63% rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/MultiColumnMerger.java rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MultiColumnMerger.java index ef9f7ceb00..79a1367ca7 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/MultiColumnMerger.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MultiColumnMerger.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.mpp.execution.operator.process.merge; +package org.apache.iotdb.db.mpp.execution.operator.process.join.merge; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.tsfile.read.common.block.TsBlock; @@ -93,4 +93,58 @@ public class MultiColumnMerger implements ColumnMerger { } } } + + @Override + public void mergeColumn( + TsBlock[] inputTsBlocks, + int[] inputIndex, + int[] updatedInputIndex, + long currentTime, + ColumnBuilder columnBuilder) { + + // init startIndex for each input locations + for (InputLocation inputLocation : inputLocations) { + int tsBlockIndex = inputLocation.getTsBlockIndex(); + updatedInputIndex[tsBlockIndex] = inputIndex[tsBlockIndex]; + } + + // record whether current row already has value to be appended + boolean appendValue = false; + // we don't use MinHeap here to choose the right column, because inputLocations.size() won't + // be very large. + // Assuming inputLocations.size() will be less than 5, performance of for-loop may be better + // than PriorityQueue. + for (InputLocation location : inputLocations) { + int tsBlockIndex = location.getTsBlockIndex(); + int columnIndex = location.getValueColumnIndex(); + int index = updatedInputIndex[tsBlockIndex]; + + // current location's input column is not empty + if (!ColumnMerger.empty(tsBlockIndex, inputTsBlocks, updatedInputIndex)) { + TimeColumn timeColumn = inputTsBlocks[tsBlockIndex].getTimeColumn(); + Column valueColumn = inputTsBlocks[tsBlockIndex].getColumn(columnIndex); + // time of current location's input column is equal to current row's time + if (timeColumn.getLong(index) == currentTime) { + // value of current location's input column is not null + if (!valueColumn.isNull(index)) { + columnBuilder.write(valueColumn, index); + appendValue = true; + } + // increase the index + index++; + // update the index after merging + updatedInputIndex[tsBlockIndex] = index; + // we can safely set appendValue to true and then break the loop, because these input + // columns' time is not overlapped + if (appendValue) { + break; + } + } + } + } + // all input columns are null at current row, so just append a null + if (!appendValue) { + columnBuilder.appendNull(); + } + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/NonOverlappedMultiColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/NonOverlappedMultiColumnMerger.java similarity index 82% rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/NonOverlappedMultiColumnMerger.java rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/NonOverlappedMultiColumnMerger.java index a22c31faf5..c338130b8a 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/NonOverlappedMultiColumnMerger.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/NonOverlappedMultiColumnMerger.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.mpp.execution.operator.process.merge; +package org.apache.iotdb.db.mpp.execution.operator.process.join.merge; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.tsfile.read.common.block.TsBlock; @@ -25,8 +25,6 @@ import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder; import java.util.List; -import static org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger.mergeOneColumn; - /** has more than one input column, but these columns' time is not overlapped */ public class NonOverlappedMultiColumnMerger implements ColumnMerger { @@ -62,7 +60,7 @@ public class NonOverlappedMultiColumnMerger implements ColumnMerger { // move to next InputLocation if current InputLocation's column has been consumed up moveToNextIfNecessary(inputTsBlocks); // merge current column - mergeOneColumn( + SingleColumnMerger.mergeOneColumn( inputTsBlocks, inputIndex, updatedInputIndex, @@ -73,6 +71,25 @@ public class NonOverlappedMultiColumnMerger implements ColumnMerger { comparator); } + @Override + public void mergeColumn( + TsBlock[] inputTsBlocks, + int[] inputIndex, + int[] updatedInputIndex, + long currentTime, + ColumnBuilder columnBuilder) { + // move to next InputLocation if current InputLocation's column has been consumed up + moveToNextIfNecessary(inputTsBlocks); + // merge current column + SingleColumnMerger.mergeOneColumn( + inputTsBlocks, + inputIndex, + updatedInputIndex, + currentTime, + columnBuilder, + inputLocations.get(index)); + } + private void moveToNextIfNecessary(TsBlock[] inputTsBlocks) { // if it is already at the last index, don't need to move if (index == inputLocations.size() - 1) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/SingleColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/SingleColumnMerger.java similarity index 74% rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/SingleColumnMerger.java rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/SingleColumnMerger.java index f621f59d99..0c9f7e60e7 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/SingleColumnMerger.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/SingleColumnMerger.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.mpp.execution.operator.process.merge; +package org.apache.iotdb.db.mpp.execution.operator.process.join.merge; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.tsfile.read.common.block.TsBlock; @@ -110,4 +110,46 @@ public class SingleColumnMerger implements ColumnMerger { // update the index after merging updatedInputIndex[tsBlockIndex] = index; } + + @Override + public void mergeColumn( + TsBlock[] inputTsBlocks, + int[] inputIndex, + int[] updatedInputIndex, + long currentTime, + ColumnBuilder columnBuilder) { + mergeOneColumn( + inputTsBlocks, inputIndex, updatedInputIndex, currentTime, columnBuilder, location); + } + + public static void mergeOneColumn( + TsBlock[] inputTsBlocks, + int[] inputIndex, + int[] updatedInputIndex, + long currentTime, + ColumnBuilder columnBuilder, + InputLocation location) { + int tsBlockIndex = location.getTsBlockIndex(); + int columnIndex = location.getValueColumnIndex(); + + int index = inputIndex[tsBlockIndex]; + // input column is empty or current time of input column is already larger than currentEndTime + // just appendNull + if (ColumnMerger.empty(tsBlockIndex, inputTsBlocks, inputIndex) + || inputTsBlocks[tsBlockIndex].getTimeByIndex(index) != currentTime) { + columnBuilder.appendNull(); + } else { + // read from input column and write it into columnBuilder + Column valueColumn = inputTsBlocks[tsBlockIndex].getColumn(columnIndex); + + if (valueColumn.isNull(index)) { + columnBuilder.appendNull(); + } else { + columnBuilder.write(valueColumn, index); + } + index++; + } + // update the index after merging + updatedInputIndex[tsBlockIndex] = index; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/TimeComparator.java similarity index 94% rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/TimeComparator.java index db017f6186..47eff4fe08 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/TimeComparator.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.mpp.execution.operator.process.merge; +package org.apache.iotdb.db.mpp.execution.operator.process.join.merge; public interface TimeComparator { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java index 099d3d5594..a981c9e631 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java @@ -46,7 +46,6 @@ import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator; import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator; import org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOperator; import org.apache.iotdb.db.mpp.execution.operator.process.SlidingWindowAggregationOperator; -import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator; import org.apache.iotdb.db.mpp.execution.operator.process.TransformOperator; import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill; import org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill; @@ -68,19 +67,21 @@ import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.DoublePr import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.FloatPreviousFill; import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.IntPreviousFill; import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.LongPreviousFill; +import org.apache.iotdb.db.mpp.execution.operator.process.join.RowBasedTimeJoinOperator; +import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator; +import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator; +import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.ColumnMerger; +import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.DescTimeComparator; +import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.MultiColumnMerger; +import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.NonOverlappedMultiColumnMerger; +import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger; +import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator; import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryCollectOperator; import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryMergeOperator; import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryOperator; import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQuerySortOperator; import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryUtil; import org.apache.iotdb.db.mpp.execution.operator.process.last.UpdateLastCacheOperator; -import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator; -import org.apache.iotdb.db.mpp.execution.operator.process.merge.ColumnMerger; -import org.apache.iotdb.db.mpp.execution.operator.process.merge.DescTimeComparator; -import org.apache.iotdb.db.mpp.execution.operator.process.merge.MultiColumnMerger; -import org.apache.iotdb.db.mpp.execution.operator.process.merge.NonOverlappedMultiColumnMerger; -import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger; -import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator; import org.apache.iotdb.db.mpp.execution.operator.schema.CountMergeOperator; import org.apache.iotdb.db.mpp.execution.operator.schema.DevicesCountOperator; import org.apache.iotdb.db.mpp.execution.operator.schema.DevicesSchemaScanOperator; @@ -1146,7 +1147,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP List<TSDataType> outputColumnTypes = getOutputColumnTypes(node, context.getTypeProvider()); context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); - return new TimeJoinOperator( + return new RowBasedTimeJoinOperator( operatorContext, children, node.getMergeOrder(), diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java index dc13943bef..c71db0a64d 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java @@ -38,9 +38,9 @@ import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext; import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState; import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine; import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator; -import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator; -import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator; -import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger; +import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator; +import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator; +import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger; import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation; diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesScanOperatorTest.java index b45a11f668..e1a6bc5eb6 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesScanOperatorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesScanOperatorTest.java @@ -30,10 +30,10 @@ import org.apache.iotdb.db.mpp.common.PlanFragmentId; import org.apache.iotdb.db.mpp.common.QueryId; import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext; import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine; -import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator; -import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator; -import org.apache.iotdb.db.mpp.execution.operator.process.merge.DescTimeComparator; -import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger; +import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator; +import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator; +import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.DescTimeComparator; +import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger; import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator; import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceMergeOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceMergeOperatorTest.java index 9aa09f18a5..18c00627e9 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceMergeOperatorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceMergeOperatorTest.java @@ -32,7 +32,7 @@ import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext; import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine; import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator; import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator; -import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator; +import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator; import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil; diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LimitOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LimitOperatorTest.java index 6625f296e6..32701ac2b0 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LimitOperatorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LimitOperatorTest.java @@ -30,9 +30,9 @@ import org.apache.iotdb.db.mpp.common.QueryId; import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext; import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine; import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator; -import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator; -import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator; -import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger; +import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator; +import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator; +import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger; import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation; diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MultiColumnMergerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MultiColumnMergerTest.java index 16e03c71e7..4668fb67f6 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MultiColumnMergerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MultiColumnMergerTest.java @@ -18,7 +18,7 @@ */ package org.apache.iotdb.db.mpp.execution.operator; -import org.apache.iotdb.db.mpp.execution.operator.process.merge.MultiColumnMerger; +import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.MultiColumnMerger; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.block.TsBlock; diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/NonOverlappedMultiColumnMergerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/NonOverlappedMultiColumnMergerTest.java index b84f779060..76216233a1 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/NonOverlappedMultiColumnMergerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/NonOverlappedMultiColumnMergerTest.java @@ -18,8 +18,8 @@ */ package org.apache.iotdb.db.mpp.execution.operator; -import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator; -import org.apache.iotdb.db.mpp.execution.operator.process.merge.NonOverlappedMultiColumnMerger; +import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator; +import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.NonOverlappedMultiColumnMerger; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.block.TsBlock; diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java index 9256c852c9..e05cad8041 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java @@ -31,9 +31,9 @@ import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext; import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine; import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator; import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator; -import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator; -import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator; -import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger; +import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator; +import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator; +import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger; import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation; diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java index 9b9b81a856..6a9902f24b 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java @@ -34,9 +34,9 @@ import org.apache.iotdb.db.mpp.common.QueryId; import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext; import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine; import org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOperator; -import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator; -import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator; -import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger; +import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator; +import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator; +import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger; import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep; diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SingleColumnMergerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SingleColumnMergerTest.java index 3b12082c6e..f2043c281a 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SingleColumnMergerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SingleColumnMergerTest.java @@ -18,9 +18,9 @@ */ package org.apache.iotdb.db.mpp.execution.operator; -import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator; -import org.apache.iotdb.db.mpp.execution.operator.process.merge.DescTimeComparator; -import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger; +import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator; +import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.DescTimeComparator; +import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.block.TsBlock; diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java index 6030229577..aabfdb7eb2 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java @@ -29,10 +29,10 @@ import org.apache.iotdb.db.mpp.common.PlanFragmentId; import org.apache.iotdb.db.mpp.common.QueryId; import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext; import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine; -import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator; -import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator; -import org.apache.iotdb.db.mpp.execution.operator.process.merge.DescTimeComparator; -import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger; +import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator; +import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator; +import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.DescTimeComparator; +import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger; import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
