This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/TimeJoin in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 6207e102e8e33ba7b33369eac5b2ded02d5a29c7 Author: JackieTien97 <[email protected]> AuthorDate: Tue Dec 19 20:23:20 2023 +0800 Add LeftOuterTimeJoinOperator --- .../process/join/LeftOuterTimeJoinOperator.java | 324 +++++++++++++++++++++ .../process/join/RowBasedTimeJoinOperator.java | 2 +- .../process/join/merge/AscTimeComparator.java | 12 +- .../process/join/merge/DescTimeComparator.java | 12 +- .../process/join/merge/TimeComparator.java | 10 +- 5 files changed, 355 insertions(+), 5 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/LeftOuterTimeJoinOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/LeftOuterTimeJoinOperator.java new file mode 100644 index 00000000000..651c47d3c2c --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/LeftOuterTimeJoinOperator.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process.join;
+
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator;
+import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.TimeComparator;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.util.concurrent.Futures.successfulAsList;
+
+/**
+ * Left outer join of two child operators on their time column.
+ *
+ * <p>Every row produced by the left child is emitted once. The columns contributed by the right
+ * child are filled from the right row carrying the same timestamp, or with null when the right
+ * child has no such row. Output value columns {@code [0, leftColumnCount)} come from the left
+ * child and the remaining ones from the right child.
+ *
+ * <p>The merge logic assumes both children return rows sorted in the order defined by {@code
+ * comparator}.
+ */
+public class LeftOuterTimeJoinOperator implements ProcessOperator {
+
+  private final OperatorContext operatorContext;
+
+  // total number of output value columns (left columns followed by right columns)
+  private final int outputColumnCount;
+
+  private final TimeComparator comparator;
+
+  private final TsBlockBuilder resultBuilder;
+
+  private final Operator left;
+  // number of output value columns taken from the left child; they occupy the leading positions
+  private final int leftColumnCount;
+
+  // current input block of the left child; may be null
+  private TsBlock leftTsBlock;
+
+  // start index of leftTsBlock
+  private int leftIndex;
+
+  private final Operator right;
+
+  // current input block of the right child; may be null
+  private TsBlock rightTsBlock;
+
+  // start index of rightTsBlock
+  private int rightIndex;
+
+  // set once right.hasNext() returned false; from then on every left row is joined with nulls
+  private boolean rightFinished = false;
+
+  private final long maxReturnSize =
+      TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
+
+  /**
+   * @param operatorContext context of this operator
+   * @param leftChild child whose rows are all kept
+   * @param leftColumnCount number of value columns produced by {@code leftChild}
+   * @param rightChild child whose values are attached to left rows with the same time
+   * @param dataTypes types of all output value columns, left columns first
+   * @param comparator time ordering used to merge both children
+   */
+  public LeftOuterTimeJoinOperator(
+      OperatorContext operatorContext,
+      Operator leftChild,
+      int leftColumnCount,
+      Operator rightChild,
+      List<TSDataType> dataTypes,
+      TimeComparator comparator) {
+    this.operatorContext = operatorContext;
+    this.resultBuilder = new TsBlockBuilder(dataTypes);
+    this.outputColumnCount = dataTypes.size();
+    this.comparator = comparator;
+    this.left = leftChild;
+    this.leftColumnCount = leftColumnCount;
+    this.right = rightChild;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  /** Blocked until both children are unblocked; returns whichever future is still pending. */
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    ListenableFuture<?> leftBlocked = left.isBlocked();
+    ListenableFuture<?> rightBlocked = right.isBlocked();
+    if (leftBlocked.isDone()) {
+      return rightBlocked;
+    } else if (rightBlocked.isDone()) {
+      return leftBlocked;
+    } else {
+      return successfulAsList(leftBlocked, rightBlocked);
+    }
+  }
+
+  /**
+   * Produces the next batch of joined rows.
+   *
+   * <p>Returns {@code null} when input is not ready (a child yielded no block, or the time slice
+   * ran out before the right child could be polled). The returned block may be empty, e.g. when a
+   * right block lying entirely before the current left row was only skipped.
+   */
+  @Override
+  public TsBlock next() throws Exception {
+    // start stopwatch
+    long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
+    long start = System.nanoTime();
+    if (!prepareInput(start, maxRuntime)) {
+      return null;
+    }
+
+    // still have time
+    if (System.nanoTime() - start < maxRuntime) {
+      if (rightFinished) {
+        // the right child is exhausted: rightTsBlock is null or fully consumed and must not be
+        // read, and every remaining left row is joined with nulls
+        appendAllLeftTableAndFillNullForRightTable();
+      } else {
+        long time = leftTsBlock.getTimeByIndex(leftIndex);
+        if (comparator.largerThan(time, rightTsBlock.getEndTime())) {
+          // all the rightTsBlock is less than leftTsBlock, just skip it
+          rightTsBlock = null;
+          rightIndex = 0;
+        } else if (comparator.lessThan(
+            leftTsBlock.getEndTime(), rightTsBlock.getTimeByIndex(rightIndex))) {
+          // all the rightTsBlock is larger than leftTsBlock, fill null for right child
+          appendAllLeftTableAndFillNullForRightTable();
+        } else {
+          // left and right are overlapped, do the left outer join row by row
+          appendOverlappedRows(time);
+        }
+      }
+    }
+    TsBlock res = resultBuilder.build();
+    resultBuilder.reset();
+    return res;
+  }
+
+  /**
+   * Makes sure a non-empty left block is buffered and, unless the right child is finished, a
+   * non-empty right block as well.
+   *
+   * @return true if next() can join rows now
+   */
+  private boolean prepareInput(long start, long maxRuntime) throws Exception {
+    if ((leftTsBlock == null || leftTsBlock.getPositionCount() == leftIndex) && left.hasNext()) {
+      leftTsBlock = left.next();
+      leftIndex = 0;
+    }
+    // still have time and right child still have remaining data
+    if ((System.nanoTime() - start < maxRuntime)
+        && (!rightFinished
+            && (rightTsBlock == null || rightTsBlock.getPositionCount() == rightIndex))) {
+      if (right.hasNext()) {
+        rightTsBlock = right.next();
+        rightIndex = 0;
+      } else {
+        rightFinished = true;
+      }
+    }
+    return tsBlockIsNotEmpty(leftTsBlock, leftIndex)
+        && (rightFinished || tsBlockIsNotEmpty(rightTsBlock, rightIndex));
+  }
+
+  /** Returns true if {@code tsBlock} still has rows at or after {@code index}. */
+  private boolean tsBlockIsNotEmpty(TsBlock tsBlock, int index) {
+    return tsBlock != null && index < tsBlock.getPositionCount();
+  }
+
+  /**
+   * Joins rows while both current blocks overlap in time. Stops at whichever block end time comes
+   * first in comparator order, when the result builder is full, or when either block is consumed.
+   *
+   * @param firstTime time of the left row at {@code leftIndex}
+   */
+  private void appendOverlappedRows(long firstTime) {
+    long currentEndTime =
+        comparator.getCurrentEndTime(leftTsBlock.getEndTime(), rightTsBlock.getEndTime());
+    int leftRowSize = leftTsBlock.getPositionCount();
+    TimeColumnBuilder timeColumnBuilder = resultBuilder.getTimeColumnBuilder();
+
+    long time = firstTime;
+    while (comparator.canContinueInclusive(time, currentEndTime)
+        && !resultBuilder.isFull()
+        && appendRightTableRow(time)) {
+      timeColumnBuilder.writeLong(time);
+      // deal with leftTsBlock
+      appendLeftTableRow();
+      // register the completed row with the builder; its row count (used by isFull() and
+      // build()) only advances through declarePosition(s)
+      resultBuilder.declarePosition();
+
+      if (leftIndex < leftRowSize) {
+        // update next row's time
+        time = leftTsBlock.getTimeByIndex(leftIndex);
+      } else {
+        // all the leftTsBlock is consumed up, clean it
+        leftTsBlock = null;
+        leftIndex = 0;
+        break;
+      }
+    }
+  }
+
+  /** Copies the left row at {@code leftIndex} into the left output columns and advances. */
+  private void appendLeftTableRow() {
+    for (int i = 0; i < leftColumnCount; i++) {
+      appendValueOrNull(resultBuilder.getColumnBuilder(i), leftTsBlock.getColumn(i), leftIndex);
+    }
+    leftIndex++;
+  }
+
+  /**
+   * Fills the right-hand columns of the output row whose time is {@code time}.
+   *
+   * <p>Right rows ordered before {@code time} are skipped. If the right row at the resulting index
+   * has exactly {@code time}, its values are copied and it is consumed; otherwise nulls are
+   * written.
+   *
+   * @param time left table's current time
+   * @return true if the right columns were written; false if rightTsBlock was exhausted before a
+   *     row at or after {@code time} was found. In that case the block is released, nothing is
+   *     written, and the decision waits for the next right block.
+   */
+  private boolean appendRightTableRow(long time) {
+    int rowCount = rightTsBlock.getPositionCount();
+
+    while (rightIndex < rowCount
+        && comparator.lessThan(rightTsBlock.getTimeByIndex(rightIndex), time)) {
+      rightIndex++;
+    }
+
+    if (rightIndex == rowCount) {
+      // clean up rightTsBlock
+      rightTsBlock = null;
+      rightIndex = 0;
+      return false;
+    }
+
+    if (rightTsBlock.getTimeByIndex(rightIndex) == time) {
+      // right table has this time, append right table's corresponding row
+      for (int i = leftColumnCount; i < outputColumnCount; i++) {
+        appendValueOrNull(
+            resultBuilder.getColumnBuilder(i),
+            rightTsBlock.getColumn(i - leftColumnCount),
+            rightIndex);
+      }
+      // update right Index
+      rightIndex++;
+    } else {
+      // right table doesn't have this time, just append null for right table
+      for (int i = leftColumnCount; i < outputColumnCount; i++) {
+        resultBuilder.getColumnBuilder(i).appendNull();
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Copies {@code column[index]} into {@code builder}, appending null for a null entry, the same
+   * way the bulk copy in appendValueColumnForLeftTable treats nulls.
+   */
+  private static void appendValueOrNull(ColumnBuilder builder, Column column, int index) {
+    if (column.isNull(index)) {
+      builder.appendNull();
+    } else {
+      builder.write(column, index);
+    }
+  }
+
+  /** Appends all remaining left rows and fills every right column with null. */
+  private void appendAllLeftTableAndFillNullForRightTable() {
+    int rowSize = leftTsBlock.getPositionCount();
+    // append time column
+    TimeColumnBuilder timeColumnBuilder = resultBuilder.getTimeColumnBuilder();
+    TimeColumn leftTimeColumn = leftTsBlock.getTimeColumn();
+    for (int i = leftIndex; i < rowSize; i++) {
+      timeColumnBuilder.writeLong(leftTimeColumn.getLong(i));
+    }
+
+    // append value column of left table
+    appendValueColumnForLeftTable(rowSize);
+
+    // append null for each column of right table
+    appendNullForRightTable(rowSize);
+
+    // register the appended rows with the builder; must happen before leftIndex is reset
+    resultBuilder.declarePositions(rowSize - leftIndex);
+
+    // clean leftTsBlock
+    leftTsBlock = null;
+    leftIndex = 0;
+  }
+
+  /** Copies left value columns for rows {@code [leftIndex, rowSize)}. */
+  private void appendValueColumnForLeftTable(int rowSize) {
+    for (int i = 0; i < leftColumnCount; i++) {
+      ColumnBuilder columnBuilder = resultBuilder.getColumnBuilder(i);
+      Column valueColumn = leftTsBlock.getColumn(i);
+
+      if (valueColumn.mayHaveNull()) {
+        for (int rowIndex = leftIndex; rowIndex < rowSize; rowIndex++) {
+          appendValueOrNull(columnBuilder, valueColumn, rowIndex);
+        }
+      } else {
+        // no null in current column, no need to do isNull judgement for each row in for-loop
+        for (int rowIndex = leftIndex; rowIndex < rowSize; rowIndex++) {
+          columnBuilder.write(valueColumn, rowIndex);
+        }
+      }
+    }
+  }
+
+  /** Appends one null per row in {@code [leftIndex, rowSize)} to every right output column. */
+  private void appendNullForRightTable(int rowSize) {
+    int nullCount = rowSize - leftIndex;
+    for (int i = leftColumnCount; i < outputColumnCount; i++) {
+      ColumnBuilder columnBuilder = resultBuilder.getColumnBuilder(i);
+      columnBuilder.appendNull(nullCount);
+    }
+  }
+
+  /** Output exists while the left side has rows; the right child alone never keeps this alive. */
+  @Override
+  public boolean hasNext() throws Exception {
+    return tsBlockIsNotEmpty(leftTsBlock, leftIndex) || left.hasNext();
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (left != null) {
+      left.close();
+    }
+    if (right != null) {
+      right.close();
+    }
+  }
+
+  @Override
+  public boolean isFinished() throws Exception {
+    return !tsBlockIsNotEmpty(leftTsBlock, leftIndex) && left.isFinished();
+  }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return Math.max(
+        Math.max(left.calculateMaxPeekMemory(), right.calculateMaxPeekMemory()),
+        calculateRetainedSizeAfterCallingNext() + calculateMaxReturnSize());
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return maxReturnSize;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    // leftTsBlock + leftChild.RetainedSizeAfterCallingNext + rightTsBlock +
+    // rightChild.RetainedSizeAfterCallingNext
+    return left.calculateMaxReturnSize()
+        + left.calculateRetainedSizeAfterCallingNext()
+        + right.calculateMaxReturnSize()
+        + right.calculateRetainedSizeAfterCallingNext();
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/RowBasedTimeJoinOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/RowBasedTimeJoinOperator.java
index 65de39f5717..e38755dbf0c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/RowBasedTimeJoinOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/RowBasedTimeJoinOperator.java @@ -165,7 +165,7 @@ public class RowBasedTimeJoinOperator extends AbstractConsumeAllOperator { prepareForTimeHeap(); - } while (comparator.canContinue(currentTime, currentEndTime) && !timeSelector.isEmpty()); + } while (comparator.lessThan(currentTime, currentEndTime) && !timeSelector.isEmpty()); resultTsBlock = tsBlockBuilder.build(); return checkTsBlockSizeAndGetResult(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/AscTimeComparator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/AscTimeComparator.java index 1c809ba961c..74ee490c1dd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/AscTimeComparator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/AscTimeComparator.java @@ -33,7 +33,17 @@ public class AscTimeComparator implements TimeComparator { } @Override - public boolean canContinue(long time, long endTime) { + public boolean lessThan(long time, long endTime) { return time < endTime; } + + @Override + public boolean largerThan(long time, long endTime) { + return time > endTime; + } + + @Override + public boolean canContinueInclusive(long time, long endTime) { + return time <= endTime; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/DescTimeComparator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/DescTimeComparator.java index 7f4e03a7cb9..0bcf2498931 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/DescTimeComparator.java +++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/DescTimeComparator.java @@ -33,7 +33,17 @@ public class DescTimeComparator implements TimeComparator { } @Override - public boolean canContinue(long time, long endTime) { + public boolean lessThan(long time, long endTime) { return time > endTime; } + + @Override + public boolean largerThan(long time, long endTime) { + return time < endTime; + } + + @Override + public boolean canContinueInclusive(long time, long endTime) { + return time >= endTime; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/TimeComparator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/TimeComparator.java index 0d35e9c6c3f..9d7a71ec101 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/TimeComparator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/TimeComparator.java @@ -24,9 +24,15 @@ public interface TimeComparator { /** return true if time is satisfied with endTime, otherwise false. */ boolean satisfyCurEndTime(long time, long endTime); + /** return time < endTime if order by time asc, time > endTime if order by desc. */ + boolean lessThan(long time, long endTime); + + /** return time > endTime if order by time asc, time < endTime if order by desc. */ + boolean largerThan(long time, long endTime); + /** return min(time1, time2) if order by time asc, max(time1, time2) if order by desc. */ long getCurrentEndTime(long time1, long time2); - /** return time < endTime if order by time asc, time > endTime if order by desc. */ - boolean canContinue(long time, long endTime); + /** return time <= endTime if order by time asc, time >= endTime if order by desc. */ + boolean canContinueInclusive(long time, long endTime); }
