This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch stable-mpp in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 043f5f6912380fc5227355623e4a20403f5e8f9f Author: JackieTien97 <[email protected]> AuthorDate: Fri Apr 29 11:23:55 2022 +0800 Support order by time desc --- .../db/mpp/operator/process/TimeJoinOperator.java | 17 ++- .../operator/process/merge/AscTimeComparator.java | 30 ++--- .../operator/process/merge/DescTimeComparator.java | 30 ++--- .../operator/process/merge/SingleColumnMerger.java | 37 +----- .../mpp/operator/process/merge/TimeComparator.java | 27 +---- .../db/mpp/operator/source/SeriesScanUtil.java | 6 +- .../db/mpp/sql/planner/LocalExecutionPlanner.java | 25 +++- .../query/reader/chunk/MemAlignedPageReader.java | 2 +- .../iotdb/db/query/reader/chunk/MemPageReader.java | 4 +- .../iotdb/db/mpp/execution/DataDriverTest.java | 6 +- .../iotdb/db/mpp/operator/LimitOperatorTest.java | 6 +- .../db/mpp/operator/SingleColumnMergerTest.java | 11 +- .../db/mpp/operator/TimeJoinOperatorTest.java | 132 ++++++++++++++++++++- .../iotdb/tsfile/read/common/block/TsBlock.java | 7 ++ .../read/common/block/column/BinaryColumn.java | 16 +++ .../read/common/block/column/BooleanColumn.java | 16 +++ .../tsfile/read/common/block/column/Column.java | 3 + .../read/common/block/column/DoubleColumn.java | 16 +++ .../read/common/block/column/FloatColumn.java | 16 +++ .../tsfile/read/common/block/column/IntColumn.java | 16 +++ .../read/common/block/column/LongColumn.java | 16 +++ .../block/column/RunLengthEncodedColumn.java | 5 + .../read/common/block/column/TimeColumn.java | 9 ++ .../iotdb/tsfile/read/reader/IPageReader.java | 2 +- .../tsfile/read/reader/page/AlignedPageReader.java | 2 +- .../iotdb/tsfile/read/reader/page/PageReader.java | 3 +- 26 files changed, 331 insertions(+), 129 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java index 0c0fbc96e1..b3c08c26c9 100644 --- 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.operator.process; import org.apache.iotdb.db.mpp.operator.Operator; import org.apache.iotdb.db.mpp.operator.OperatorContext; import org.apache.iotdb.db.mpp.operator.process.merge.ColumnMerger; +import org.apache.iotdb.db.mpp.operator.process.merge.TimeComparator; import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy; import org.apache.iotdb.db.utils.datastructure.TimeSelector; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -68,12 +69,15 @@ public class TimeJoinOperator implements ProcessOperator { private boolean finished; + private final TimeComparator comparator; + public TimeJoinOperator( OperatorContext operatorContext, List<Operator> children, OrderBy mergeOrder, List<TSDataType> dataTypes, - List<ColumnMerger> mergers) { + List<ColumnMerger> mergers, + TimeComparator comparator) { checkArgument( children != null && children.size() > 0, "child size of TimeJoinOperator should be larger than 0"); @@ -89,6 +93,7 @@ public class TimeJoinOperator implements ProcessOperator { this.dataTypes = dataTypes; this.tsBlockBuilder = new TsBlockBuilder(dataTypes); this.mergers = mergers; + this.comparator = comparator; } @Override @@ -112,8 +117,8 @@ public class TimeJoinOperator implements ProcessOperator { @Override public TsBlock next() { tsBlockBuilder.reset(); - // end time for returned TsBlock this time, it's the min end time among all the children - // TsBlocks + // end time for returned TsBlock this time, it's the min/max end time among all the children + // TsBlocks order by asc/desc long currentEndTime = 0; boolean init = false; for (int i = 0; i < inputCount; i++) { @@ -131,7 +136,7 @@ public class TimeJoinOperator implements ProcessOperator { if (!empty(i)) { currentEndTime = init - ? 
Math.min(currentEndTime, inputTsBlocks[i].getEndTime()) + ? comparator.getSatisfiedTime(currentEndTime, inputTsBlocks[i].getEndTime()) : inputTsBlocks[i].getEndTime(); init = true; } @@ -144,7 +149,7 @@ public class TimeJoinOperator implements ProcessOperator { } TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder(); - while (!timeSelector.isEmpty() && timeSelector.first() <= currentEndTime) { + while (!timeSelector.isEmpty() && comparator.satisfy(timeSelector.first(), currentEndTime)) { timeBuilder.writeLong(timeSelector.pollFirst()); tsBlockBuilder.declarePosition(); } @@ -199,7 +204,7 @@ public class TimeJoinOperator implements ProcessOperator { return true; } finished = true; - for (int i = 0; i < columnCount; i++) { + for (int i = 0; i < inputCount; i++) { // has more tsBlock output from children[i] or has cached tsBlock in inputTsBlocks[i] if (!noMoreTsBlocks[i] || !empty(i)) { finished = false; diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/AscTimeComparator.java similarity index 54% copy from tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/AscTimeComparator.java index 3affee8045..6456ed2bd7 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/AscTimeComparator.java @@ -16,28 +16,18 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.iotdb.tsfile.read.reader; +package org.apache.iotdb.db.mpp.operator.process.merge; -import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; -import org.apache.iotdb.tsfile.read.common.BatchData; -import org.apache.iotdb.tsfile.read.common.block.TsBlock; -import org.apache.iotdb.tsfile.read.filter.basic.Filter; +public class AscTimeComparator implements TimeComparator { -import java.io.IOException; - -public interface IPageReader { - - default BatchData getAllSatisfiedPageData() throws IOException { - return getAllSatisfiedPageData(true); + /** @return if order by time asc, return true if time <= endTime, otherwise false */ + @Override + public boolean satisfy(long time, long endTime) { + return time <= endTime; } - BatchData getAllSatisfiedPageData(boolean ascending) throws IOException; - - TsBlock getAllSatisfiedData(boolean ascending) throws IOException; - - Statistics getStatistics(); - - void setFilter(Filter filter); - - boolean isModified(); + @Override + public long getSatisfiedTime(long time1, long time2) { + return Math.min(time1, time2); + } } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/DescTimeComparator.java similarity index 54% copy from tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/DescTimeComparator.java index 3affee8045..006754fa70 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/DescTimeComparator.java @@ -16,28 +16,18 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.iotdb.tsfile.read.reader; +package org.apache.iotdb.db.mpp.operator.process.merge; -import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; -import org.apache.iotdb.tsfile.read.common.BatchData; -import org.apache.iotdb.tsfile.read.common.block.TsBlock; -import org.apache.iotdb.tsfile.read.filter.basic.Filter; +public class DescTimeComparator implements TimeComparator { -import java.io.IOException; - -public interface IPageReader { - - default BatchData getAllSatisfiedPageData() throws IOException { - return getAllSatisfiedPageData(true); + /** @return if order by time desc, return true if time >= endTime, otherwise false */ + @Override + public boolean satisfy(long time, long endTime) { + return time >= endTime; } - BatchData getAllSatisfiedPageData(boolean ascending) throws IOException; - - TsBlock getAllSatisfiedData(boolean ascending) throws IOException; - - Statistics getStatistics(); - - void setFilter(Filter filter); - - boolean isModified(); + @Override + public long getSatisfiedTime(long time1, long time2) { + return Math.max(time1, time2); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/SingleColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/SingleColumnMerger.java index 83a2e69733..5491a90bc1 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/SingleColumnMerger.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/SingleColumnMerger.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.mpp.operator.process.merge; import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.InputLocation; -import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import org.apache.iotdb.tsfile.read.common.block.column.Column; import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; @@ -29,21 +28,13 @@ import 
org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder; /** only has one input column */ public class SingleColumnMerger implements ColumnMerger { - private static final TimeComparator ASC_TIME_COMPARATOR = new AscTimeComparator(); - - private static final TimeComparator DESC_TIME_COMPARATOR = new DescTimeComparator(); - private final InputLocation location; private final TimeComparator comparator; - public SingleColumnMerger(InputLocation location, OrderBy orderBy) { + public SingleColumnMerger(InputLocation location, TimeComparator comparator) { this.location = location; - if (orderBy == OrderBy.TIMESTAMP_ASC) { - comparator = ASC_TIME_COMPARATOR; - } else { - comparator = DESC_TIME_COMPARATOR; - } + this.comparator = comparator; } @Override @@ -97,28 +88,4 @@ public class SingleColumnMerger implements ColumnMerger { updatedInputIndex[tsBlockIndex] = index; } } - - private interface TimeComparator { - - /** @return true if time is satisfied with endTime, otherwise false */ - boolean satisfy(long time, long endTime); - } - - private static class AscTimeComparator implements TimeComparator { - - /** @return if order by time asc, return true if time <= endTime, otherwise false */ - @Override - public boolean satisfy(long time, long endTime) { - return time <= endTime; - } - } - - private static class DescTimeComparator implements TimeComparator { - - /** @return if order by time desc, return true if time >= endTime, otherwise false */ - @Override - public boolean satisfy(long time, long endTime) { - return time >= endTime; - } - } } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/TimeComparator.java similarity index 54% copy from tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/TimeComparator.java index 3affee8045..ae910960ce 100644 --- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/TimeComparator.java @@ -16,28 +16,13 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.tsfile.read.reader; +package org.apache.iotdb.db.mpp.operator.process.merge; -import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; -import org.apache.iotdb.tsfile.read.common.BatchData; -import org.apache.iotdb.tsfile.read.common.block.TsBlock; -import org.apache.iotdb.tsfile.read.filter.basic.Filter; +public interface TimeComparator { -import java.io.IOException; + /** @return true if time is satisfied with endTime, otherwise false */ + boolean satisfy(long time, long endTime); -public interface IPageReader { - - default BatchData getAllSatisfiedPageData() throws IOException { - return getAllSatisfiedPageData(true); - } - - BatchData getAllSatisfiedPageData(boolean ascending) throws IOException; - - TsBlock getAllSatisfiedData(boolean ascending) throws IOException; - - Statistics getStatistics(); - - void setFilter(Filter filter); - - boolean isModified(); + /** @return min(time1, time2) if order by time asc, max(time1, time2) if order by desc */ + long getSatisfiedTime(long time1, long time2); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java index fad08b703d..c768873f05 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java @@ -1127,7 +1127,11 @@ public class SeriesScanUtil { } TsBlock getAllSatisfiedPageData(boolean ascending) throws IOException { - return data.getAllSatisfiedData(ascending); + TsBlock tsBlock = data.getAllSatisfiedData(); + if (!ascending) { + tsBlock.reverse(); + } + return tsBlock; } 
void setFilter(Filter filter) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java index ccc9332b8a..03d8f758ba 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java @@ -35,8 +35,11 @@ import org.apache.iotdb.db.mpp.operator.Operator; import org.apache.iotdb.db.mpp.operator.OperatorContext; import org.apache.iotdb.db.mpp.operator.process.LimitOperator; import org.apache.iotdb.db.mpp.operator.process.TimeJoinOperator; +import org.apache.iotdb.db.mpp.operator.process.merge.AscTimeComparator; import org.apache.iotdb.db.mpp.operator.process.merge.ColumnMerger; +import org.apache.iotdb.db.mpp.operator.process.merge.DescTimeComparator; import org.apache.iotdb.db.mpp.operator.process.merge.SingleColumnMerger; +import org.apache.iotdb.db.mpp.operator.process.merge.TimeComparator; import org.apache.iotdb.db.mpp.operator.schema.CountMergeOperator; import org.apache.iotdb.db.mpp.operator.schema.DevicesCountOperator; import org.apache.iotdb.db.mpp.operator.schema.DevicesSchemaScanOperator; @@ -100,6 +103,10 @@ public class LocalExecutionPlanner { private static final DataBlockManager DATA_BLOCK_MANAGER = DataBlockService.getInstance().getDataBlockManager(); + private static final TimeComparator ASC_TIME_COMPARATOR = new AscTimeComparator(); + + private static final TimeComparator DESC_TIME_COMPARATOR = new DescTimeComparator(); + public static LocalExecutionPlanner getInstance() { return InstanceHolder.INSTANCE; } @@ -389,11 +396,20 @@ public class LocalExecutionPlanner { context.getNextOperatorId(), node.getPlanNodeId(), TimeJoinOperator.class.getSimpleName()); + TimeComparator timeComparator = + node.getMergeOrder() == OrderBy.TIMESTAMP_ASC + ? 
ASC_TIME_COMPARATOR + : DESC_TIME_COMPARATOR; List<OutputColumn> outputColumns = generateOutputColumns(node); - List<ColumnMerger> mergers = createColumnMergers(outputColumns); + List<ColumnMerger> mergers = createColumnMergers(outputColumns, timeComparator); List<TSDataType> outputColumnTypes = getOutputColumnTypes(node, context.getTypeProvider()); return new TimeJoinOperator( - operatorContext, children, node.getMergeOrder(), outputColumnTypes, mergers); + operatorContext, + children, + node.getMergeOrder(), + outputColumnTypes, + mergers, + timeComparator); } private List<OutputColumn> generateOutputColumns(TimeJoinNode node) { @@ -402,13 +418,14 @@ public class LocalExecutionPlanner { .collect(Collectors.toList()); } - private List<ColumnMerger> createColumnMergers(List<OutputColumn> outputColumns) { + private List<ColumnMerger> createColumnMergers( + List<OutputColumn> outputColumns, TimeComparator timeComparator) { List<ColumnMerger> mergers = new ArrayList<>(outputColumns.size()); for (OutputColumn outputColumn : outputColumns) { ColumnMerger merger; // only has one input column if (outputColumn.isSingleInputColumn()) { - merger = new SingleColumnMerger(outputColumn.getInputLocation(0), OrderBy.TIMESTAMP_ASC); + merger = new SingleColumnMerger(outputColumn.getInputLocation(0), timeComparator); } else if (!outputColumn.isOverlapped()) { // has more than one input columns but time of these input columns is not overlapped throw new UnsupportedOperationException( diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java index 7d679d1f98..f0f1134d3b 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java @@ -83,7 +83,7 @@ public class MemAlignedPageReader implements IPageReader, IAlignedPageReader { } 
@Override - public TsBlock getAllSatisfiedData(boolean ascending) throws IOException { + public TsBlock getAllSatisfiedData() throws IOException { // TODO change from the row-based style to column-based style TsBlockBuilder builder = new TsBlockBuilder( diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java index f8e87d0a50..2032314ea0 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java @@ -66,10 +66,8 @@ public class MemPageReader implements IPageReader { } @Override - public TsBlock getAllSatisfiedData(boolean ascending) throws IOException { + public TsBlock getAllSatisfiedData() throws IOException { TSDataType dataType = chunkMetadata.getDataType(); - // TODO we still need to consider data type, ascending and descending here - TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(dataType)); TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder(); ColumnBuilder valueBuilder = builder.getColumnBuilder(0); diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java index 5f03cd366a..60d75c5d7e 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java @@ -33,6 +33,7 @@ import org.apache.iotdb.db.mpp.common.PlanFragmentId; import org.apache.iotdb.db.mpp.common.QueryId; import org.apache.iotdb.db.mpp.operator.process.LimitOperator; import org.apache.iotdb.db.mpp.operator.process.TimeJoinOperator; +import org.apache.iotdb.db.mpp.operator.process.merge.AscTimeComparator; import org.apache.iotdb.db.mpp.operator.process.merge.SingleColumnMerger; import 
org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator; import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId; @@ -145,8 +146,9 @@ public class DataDriverTest { OrderBy.TIMESTAMP_ASC, Arrays.asList(TSDataType.INT32, TSDataType.INT32), Arrays.asList( - new SingleColumnMerger(new InputLocation(0, 0), OrderBy.TIMESTAMP_ASC), - new SingleColumnMerger(new InputLocation(1, 0), OrderBy.TIMESTAMP_ASC))); + new SingleColumnMerger(new InputLocation(0, 0), new AscTimeComparator()), + new SingleColumnMerger(new InputLocation(1, 0), new AscTimeComparator())), + new AscTimeComparator()); LimitOperator limitOperator = new LimitOperator( diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java index 6fc0e04647..d4ba8f816b 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java @@ -31,6 +31,7 @@ import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext; import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine; import org.apache.iotdb.db.mpp.operator.process.LimitOperator; import org.apache.iotdb.db.mpp.operator.process.TimeJoinOperator; +import org.apache.iotdb.db.mpp.operator.process.merge.AscTimeComparator; import org.apache.iotdb.db.mpp.operator.process.merge.SingleColumnMerger; import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator; import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId; @@ -140,8 +141,9 @@ public class LimitOperatorTest { OrderBy.TIMESTAMP_ASC, Arrays.asList(TSDataType.INT32, TSDataType.INT32), Arrays.asList( - new SingleColumnMerger(new InputLocation(0, 0), OrderBy.TIMESTAMP_ASC), - new SingleColumnMerger(new InputLocation(1, 0), OrderBy.TIMESTAMP_ASC))); + new SingleColumnMerger(new InputLocation(0, 0), new AscTimeComparator()), + new SingleColumnMerger(new InputLocation(1, 0), 
new AscTimeComparator())), + new AscTimeComparator()); LimitOperator limitOperator = new LimitOperator( diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SingleColumnMergerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SingleColumnMergerTest.java index 38a5a550f4..4912575921 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SingleColumnMergerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SingleColumnMergerTest.java @@ -18,9 +18,10 @@ */ package org.apache.iotdb.db.mpp.operator; +import org.apache.iotdb.db.mpp.operator.process.merge.AscTimeComparator; +import org.apache.iotdb.db.mpp.operator.process.merge.DescTimeComparator; import org.apache.iotdb.db.mpp.operator.process.merge.SingleColumnMerger; import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.InputLocation; -import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; @@ -41,7 +42,7 @@ public class SingleColumnMergerTest { @Test public void mergeTest1() { SingleColumnMerger merger = - new SingleColumnMerger(new InputLocation(0, 0), OrderBy.TIMESTAMP_ASC); + new SingleColumnMerger(new InputLocation(0, 0), new AscTimeComparator()); TsBlockBuilder inputBuilder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32)); inputBuilder.getTimeColumnBuilder().writeLong(2); @@ -93,7 +94,7 @@ public class SingleColumnMergerTest { @Test public void mergeTest2() { SingleColumnMerger merger = - new SingleColumnMerger(new InputLocation(0, 0), OrderBy.TIMESTAMP_ASC); + new SingleColumnMerger(new InputLocation(0, 0), new AscTimeComparator()); TsBlock[] inputTsBlocks = new TsBlock[1]; int[] inputIndex = new int[] {0}; @@ -129,7 +130,7 @@ public class SingleColumnMergerTest { @Test public void mergeTest3() { SingleColumnMerger merger = - new 
SingleColumnMerger(new InputLocation(0, 0), OrderBy.TIMESTAMP_ASC); + new SingleColumnMerger(new InputLocation(0, 0), new AscTimeComparator()); TsBlockBuilder inputBuilder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32)); inputBuilder.getTimeColumnBuilder().writeLong(8); @@ -170,7 +171,7 @@ public class SingleColumnMergerTest { @Test public void mergeTest4() { SingleColumnMerger merger = - new SingleColumnMerger(new InputLocation(0, 0), OrderBy.TIMESTAMP_DESC); + new SingleColumnMerger(new InputLocation(0, 0), new DescTimeComparator()); TsBlockBuilder inputBuilder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32)); inputBuilder.getTimeColumnBuilder().writeLong(2); diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java index a0f4d08023..b8ad74442d 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java @@ -30,6 +30,8 @@ import org.apache.iotdb.db.mpp.common.QueryId; import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext; import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine; import org.apache.iotdb.db.mpp.operator.process.TimeJoinOperator; +import org.apache.iotdb.db.mpp.operator.process.merge.AscTimeComparator; +import org.apache.iotdb.db.mpp.operator.process.merge.DescTimeComparator; import org.apache.iotdb.db.mpp.operator.process.merge.SingleColumnMerger; import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator; import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId; @@ -137,8 +139,9 @@ public class TimeJoinOperatorTest { OrderBy.TIMESTAMP_ASC, Arrays.asList(TSDataType.INT32, TSDataType.INT32), Arrays.asList( - new SingleColumnMerger(new InputLocation(0, 0), OrderBy.TIMESTAMP_ASC), - new SingleColumnMerger(new InputLocation(1, 0), 
OrderBy.TIMESTAMP_ASC))); + new SingleColumnMerger(new InputLocation(0, 0), new AscTimeComparator()), + new SingleColumnMerger(new InputLocation(1, 0), new AscTimeComparator())), + new AscTimeComparator()); int count = 0; while (timeJoinOperator.hasNext()) { TsBlock tsBlock = timeJoinOperator.next(); @@ -251,9 +254,10 @@ public class TimeJoinOperatorTest { OrderBy.TIMESTAMP_ASC, Arrays.asList(TSDataType.INT32, TSDataType.INT32, TSDataType.INT32), Arrays.asList( - new SingleColumnMerger(new InputLocation(0, 0), OrderBy.TIMESTAMP_ASC), - new SingleColumnMerger(new InputLocation(1, 0), OrderBy.TIMESTAMP_ASC), - new SingleColumnMerger(new InputLocation(2, 0), OrderBy.TIMESTAMP_ASC))); + new SingleColumnMerger(new InputLocation(0, 0), new AscTimeComparator()), + new SingleColumnMerger(new InputLocation(1, 0), new AscTimeComparator()), + new SingleColumnMerger(new InputLocation(2, 0), new AscTimeComparator())), + new AscTimeComparator()); int count = 0; while (timeJoinOperator.hasNext()) { TsBlock tsBlock = timeJoinOperator.next(); @@ -289,4 +293,122 @@ public class TimeJoinOperatorTest { instanceNotificationExecutor.shutdown(); } } + + /** test time join with non-exist sensor and order by time desc */ + @Test + public void batchTest3() { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + try { + MeasurementPath measurementPath1 = + new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32); + Set<String> allSensors = new HashSet<>(); + allSensors.add("sensor0"); + allSensors.add("sensor1"); + allSensors.add("error_sensor"); + QueryId queryId = new QueryId("stub_query"); + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + FragmentInstanceContext fragmentInstanceContext = + 
createFragmentInstanceContext(instanceId, stateMachine); + PlanNodeId planNodeId1 = new PlanNodeId("1"); + fragmentInstanceContext.addOperatorContext( + 1, planNodeId1, SeriesScanOperator.class.getSimpleName()); + PlanNodeId planNodeId2 = new PlanNodeId("2"); + fragmentInstanceContext.addOperatorContext( + 2, planNodeId2, SeriesScanOperator.class.getSimpleName()); + PlanNodeId planNodeId3 = new PlanNodeId("3"); + fragmentInstanceContext.addOperatorContext( + 3, planNodeId3, SeriesScanOperator.class.getSimpleName()); + fragmentInstanceContext.addOperatorContext( + 4, new PlanNodeId("4"), TimeJoinOperator.class.getSimpleName()); + SeriesScanOperator seriesScanOperator1 = + new SeriesScanOperator( + planNodeId1, + measurementPath1, + allSensors, + TSDataType.INT32, + fragmentInstanceContext.getOperatorContexts().get(0), + null, + null, + false); + seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); + + MeasurementPath measurementPath2 = + new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor1", TSDataType.INT32); + SeriesScanOperator seriesScanOperator2 = + new SeriesScanOperator( + planNodeId2, + measurementPath2, + allSensors, + TSDataType.INT32, + fragmentInstanceContext.getOperatorContexts().get(1), + null, + null, + false); + seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); + + MeasurementPath measurementPath3 = + new MeasurementPath( + TIME_JOIN_OPERATOR_TEST_SG + ".device0.error_sensor", TSDataType.INT32); + SeriesScanOperator seriesScanOperator3 = + new SeriesScanOperator( + planNodeId3, + measurementPath3, + allSensors, + TSDataType.INT32, + fragmentInstanceContext.getOperatorContexts().get(2), + null, + null, + true); + seriesScanOperator3.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); + + TimeJoinOperator timeJoinOperator = + new TimeJoinOperator( + fragmentInstanceContext.getOperatorContexts().get(3), + Arrays.asList(seriesScanOperator1, 
seriesScanOperator2, seriesScanOperator3), + OrderBy.TIMESTAMP_DESC, + Arrays.asList(TSDataType.INT32, TSDataType.INT32, TSDataType.INT32), + Arrays.asList( + new SingleColumnMerger(new InputLocation(0, 0), new DescTimeComparator()), + new SingleColumnMerger(new InputLocation(1, 0), new DescTimeComparator()), + new SingleColumnMerger(new InputLocation(2, 0), new DescTimeComparator())), + new DescTimeComparator()); + int count = 25; + while (timeJoinOperator.hasNext()) { + TsBlock tsBlock = timeJoinOperator.next(); + assertEquals(3, tsBlock.getValueColumnCount()); + assertTrue(tsBlock.getColumn(0) instanceof IntColumn); + assertTrue(tsBlock.getColumn(1) instanceof IntColumn); + assertTrue(tsBlock.getColumn(2) instanceof RunLengthEncodedColumn); + assertEquals(20, tsBlock.getPositionCount()); + for (int i = 0; i < tsBlock.getPositionCount(); i++) { + long expectedTime = tsBlock.getPositionCount() - i - 1 + 20L * (count - 1); + assertEquals(expectedTime, tsBlock.getTimeByIndex(i)); + assertTrue(tsBlock.getColumn(2).isNull(i)); + if (expectedTime < 200) { + assertEquals(20000 + expectedTime, tsBlock.getColumn(0).getInt(i)); + assertEquals(20000 + expectedTime, tsBlock.getColumn(1).getInt(i)); + } else if (expectedTime < 260 + || (expectedTime >= 300 && expectedTime < 380) + || expectedTime >= 400) { + assertEquals(10000 + expectedTime, tsBlock.getColumn(0).getInt(i)); + assertEquals(10000 + expectedTime, tsBlock.getColumn(1).getInt(i)); + } else { + assertEquals(expectedTime, tsBlock.getColumn(0).getInt(i)); + assertEquals(expectedTime, tsBlock.getColumn(1).getInt(i)); + } + } + count--; + } + assertEquals(0, count); + } catch (IllegalPathException e) { + e.printStackTrace(); + fail(); + } finally { + instanceNotificationExecutor.shutdown(); + } + } } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java index f6d4b68c4d..a5657d74fe 100644 --- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java @@ -194,6 +194,13 @@ public class TsBlock { return new TsBlockSingleColumnIterator(0, columnIndex); } + public void reverse() { + timeColumn.reverse(); + for (Column valueColumn : valueColumns) { + valueColumn.reverse(); + } + } + public TsBlockRowIterator getTsBlockRowIterator() { return new TsBlockRowIterator(0); } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java index 4d7a888394..f6b2c50e66 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java @@ -123,6 +123,22 @@ public class BinaryColumn implements Column { return new BinaryColumn(positionOffset + arrayOffset, length, valueIsNull, values); } + @Override + public void reverse() { + for (int i = arrayOffset, j = arrayOffset + positionCount - 1; i < j; i++, j--) { + Binary valueTmp = values[i]; + values[i] = values[j]; + values[j] = valueTmp; + } + if (valueIsNull != null) { + for (int i = arrayOffset, j = arrayOffset + positionCount - 1; i < j; i++, j--) { + boolean isNullTmp = valueIsNull[i]; + valueIsNull[i] = valueIsNull[j]; + valueIsNull[j] = isNullTmp; + } + } + } + private void checkReadablePosition(int position) { if (position < 0 || position >= getPositionCount()) { throw new IllegalArgumentException("position is not valid"); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java index 218ce1baf8..23dd445a34 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java +++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java @@ -122,6 +122,22 @@ public class BooleanColumn implements Column { return new BooleanColumn(positionOffset + arrayOffset, length, valueIsNull, values); } + @Override + public void reverse() { + for (int i = arrayOffset, j = arrayOffset + positionCount - 1; i < j; i++, j--) { + boolean valueTmp = values[i]; + values[i] = values[j]; + values[j] = valueTmp; + } + if (valueIsNull != null) { + for (int i = arrayOffset, j = arrayOffset + positionCount - 1; i < j; i++, j--) { + boolean isNullTmp = valueIsNull[i]; + valueIsNull[i] = valueIsNull[j]; + valueIsNull[j] = isNullTmp; + } + } + } + private void checkReadablePosition(int position) { if (position < 0 || position >= getPositionCount()) { throw new IllegalArgumentException("position is not valid"); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java index ef9fb7d637..c9ad6eddb3 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java @@ -102,4 +102,7 @@ public interface Column { * also be released. If the region column is released, this block may also be released. 
*/ Column getRegion(int positionOffset, int length); + + /** Reverses the order of the positions in this column in place. NOTE(review): the array-backed implementations swap entries of their backing arrays, which getRegion appears to hand to the new column uncopied - confirm no region shares them when this is called. */ + void reverse(); } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java index 13faf135fb..a9e64e4224 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java @@ -122,6 +122,22 @@ public class DoubleColumn implements Column { return new DoubleColumn(positionOffset + arrayOffset, length, valueIsNull, values); } + @Override + public void reverse() { + for (int i = arrayOffset, j = arrayOffset + positionCount - 1; i < j; i++, j--) { + double valueTmp = values[i]; + values[i] = values[j]; + values[j] = valueTmp; + } + if (valueIsNull != null) { + for (int i = arrayOffset, j = arrayOffset + positionCount - 1; i < j; i++, j--) { + boolean isNullTmp = valueIsNull[i]; + valueIsNull[i] = valueIsNull[j]; + valueIsNull[j] = isNullTmp; + } + } + } + private void checkReadablePosition(int position) { if (position < 0 || position >= getPositionCount()) { throw new IllegalArgumentException("position is not valid"); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java index 08762164fb..efa1243028 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java @@ -121,6 +121,22 @@ public class FloatColumn implements Column { return new FloatColumn(positionOffset + arrayOffset, length, valueIsNull, values); } + @Override + public void reverse() { + for (int i = arrayOffset, j = arrayOffset + positionCount - 1; i < j; i++, j--) { + float valueTmp = values[i]; + values[i] 
= values[j]; + values[j] = valueTmp; + } + if (valueIsNull != null) { + for (int i = arrayOffset, j = arrayOffset + positionCount - 1; i < j; i++, j--) { + boolean isNullTmp = valueIsNull[i]; + valueIsNull[i] = valueIsNull[j]; + valueIsNull[j] = isNullTmp; + } + } + } + private void checkReadablePosition(int position) { if (position < 0 || position >= getPositionCount()) { throw new IllegalArgumentException("position is not valid"); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java index 7e8d67f1b3..5ba777c137 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java @@ -121,6 +121,22 @@ public class IntColumn implements Column { return new IntColumn(positionOffset + arrayOffset, length, valueIsNull, values); } + @Override + public void reverse() { + for (int i = arrayOffset, j = arrayOffset + positionCount - 1; i < j; i++, j--) { + int valueTmp = values[i]; + values[i] = values[j]; + values[j] = valueTmp; + } + if (valueIsNull != null) { + for (int i = arrayOffset, j = arrayOffset + positionCount - 1; i < j; i++, j--) { + boolean isNullTmp = valueIsNull[i]; + valueIsNull[i] = valueIsNull[j]; + valueIsNull[j] = isNullTmp; + } + } + } + private void checkReadablePosition(int position) { if (position < 0 || position >= getPositionCount()) { throw new IllegalArgumentException("position is not valid"); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java index a786918af8..4fd1fadbb2 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java @@ 
-121,6 +121,22 @@ public class LongColumn implements Column { return new LongColumn(positionOffset + arrayOffset, length, valueIsNull, values); } + @Override + public void reverse() { + for (int i = arrayOffset, j = arrayOffset + positionCount - 1; i < j; i++, j--) { + long valueTmp = values[i]; + values[i] = values[j]; + values[j] = valueTmp; + } + if (valueIsNull != null) { + for (int i = arrayOffset, j = arrayOffset + positionCount - 1; i < j; i++, j--) { + boolean isNullTmp = valueIsNull[i]; + valueIsNull[i] = valueIsNull[j]; + valueIsNull[j] = isNullTmp; + } + } + } + private void checkReadablePosition(int position) { if (position < 0 || position >= getPositionCount()) { throw new IllegalArgumentException("position is not valid"); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java index 283c374a99..d82d7cf464 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java @@ -147,6 +147,11 @@ public class RunLengthEncodedColumn implements Column { return new RunLengthEncodedColumn(value, length); } + @Override + public void reverse() { + // do nothing because the underlying column has only one value + } + private void checkReadablePosition(int position) { if (position < 0 || position >= positionCount) { throw new IllegalArgumentException("position is not valid"); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java index d8b44fd384..2f8176b699 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java +++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java @@ -104,6 +104,15 @@ public class TimeColumn implements Column { return new TimeColumn(positionOffset + arrayOffset, length, values); } + @Override + public void reverse() { + for (int i = arrayOffset, j = arrayOffset + positionCount - 1; i < j; i++, j--) { + long time = values[i]; + values[i] = values[j]; + values[j] = time; + } + } + public long getStartTime() { return values[arrayOffset]; } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java index 3affee8045..3d7db3ec21 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java @@ -33,7 +33,7 @@ public interface IPageReader { BatchData getAllSatisfiedPageData(boolean ascending) throws IOException; - TsBlock getAllSatisfiedData(boolean ascending) throws IOException; + TsBlock getAllSatisfiedData() throws IOException; Statistics getStatistics(); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java index df8eaf8a18..5dc9a466a6 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java @@ -106,7 +106,7 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader { } @Override - public TsBlock getAllSatisfiedData(boolean ascending) throws IOException { + public TsBlock getAllSatisfiedData() throws IOException { // TODO change from the row-based style to column-based style TsBlockBuilder builder = new TsBlockBuilder( diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java index f7ce9bd59a..b54278451a 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java @@ -159,8 +159,7 @@ public class PageReader implements IPageReader { } @Override - public TsBlock getAllSatisfiedData(boolean ascending) throws IOException { - // TODO we still need to consider data type, ascending and descending here + public TsBlock getAllSatisfiedData() throws IOException { TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(dataType)); TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder(); ColumnBuilder valueBuilder = builder.getColumnBuilder(0);
