This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/TimeJoin
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6207e102e8e33ba7b33369eac5b2ded02d5a29c7
Author: JackieTien97 <[email protected]>
AuthorDate: Tue Dec 19 20:23:20 2023 +0800

    Add LeftOuterTimeJoinOperator
---
 .../process/join/LeftOuterTimeJoinOperator.java    | 324 +++++++++++++++++++++
 .../process/join/RowBasedTimeJoinOperator.java     |   2 +-
 .../process/join/merge/AscTimeComparator.java      |  12 +-
 .../process/join/merge/DescTimeComparator.java     |  12 +-
 .../process/join/merge/TimeComparator.java         |  10 +-
 5 files changed, 355 insertions(+), 5 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/LeftOuterTimeJoinOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/LeftOuterTimeJoinOperator.java
new file mode 100644
index 00000000000..651c47d3c2c
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/LeftOuterTimeJoinOperator.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process.join;
+
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.TimeComparator;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.util.concurrent.Futures.successfulAsList;
+
+/**
+ * Left outer join of two children on the time column.
+ *
+ * <p>The first {@code leftColumnCount} output columns are copied from the left child, the
+ * remaining ones from the right child. Every left row is emitted; when the right child has no row
+ * with the same time, the right columns of that row are filled with null. Rows that only exist in
+ * the right child are dropped. The iteration order (ascending or descending time) is defined by
+ * the supplied {@link TimeComparator}.
+ */
+public class LeftOuterTimeJoinOperator implements ProcessOperator {
+
+  private final OperatorContext operatorContext;
+
+  // total number of value columns in the output (left columns followed by right columns)
+  private final int outputColumnCount;
+
+  private final TimeComparator comparator;
+
+  private final TsBlockBuilder resultBuilder;
+
+  private final Operator left;
+  private final int leftColumnCount;
+
+  private TsBlock leftTsBlock;
+
+  // start index of leftTsBlock
+  private int leftIndex;
+
+  private final Operator right;
+
+  private TsBlock rightTsBlock;
+
+  // start index of rightTsBlock
+  private int rightIndex;
+
+  // set once right.hasNext() has returned false; from then on every left row is unmatched
+  private boolean rightFinished = false;
+
+  private final long maxReturnSize =
+      TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
+
+  /**
+   * @param operatorContext context of this operator
+   * @param leftChild child whose rows are all kept
+   * @param leftColumnCount number of value columns produced by {@code leftChild}
+   * @param rightChild child that is probed for matching times
+   * @param dataTypes data types of all output value columns, left columns first
+   * @param comparator defines the time order both children are read in
+   */
+  public LeftOuterTimeJoinOperator(
+      OperatorContext operatorContext,
+      Operator leftChild,
+      int leftColumnCount,
+      Operator rightChild,
+      List<TSDataType> dataTypes,
+      TimeComparator comparator) {
+
+    this.operatorContext = operatorContext;
+    this.resultBuilder = new TsBlockBuilder(dataTypes);
+    this.outputColumnCount = dataTypes.size();
+    this.comparator = comparator;
+    this.left = leftChild;
+    this.leftColumnCount = leftColumnCount;
+    this.right = rightChild;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  /**
+   * Returns the future of whichever child is still blocked, or a combined future when both
+   * children are blocked.
+   */
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    ListenableFuture<?> leftBlocked = left.isBlocked();
+    ListenableFuture<?> rightBlocked = right.isBlocked();
+    if (leftBlocked.isDone()) {
+      return rightBlocked;
+    } else if (rightBlocked.isDone()) {
+      return leftBlocked;
+    } else {
+      return successfulAsList(leftBlocked, rightBlocked);
+    }
+  }
+
+  /**
+   * Produces the next joined TsBlock.
+   *
+   * @return null when the inputs needed for this round are not available yet; otherwise the
+   *     (possibly empty) TsBlock built during this call
+   */
+  @Override
+  public TsBlock next() throws Exception {
+    // start stopwatch
+    long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
+    long start = System.nanoTime();
+    if (!prepareInput(start, maxRuntime)) {
+      return null;
+    }
+
+    // still have time
+    if (System.nanoTime() - start < maxRuntime) {
+      if (!tsBlockIsNotEmpty(rightTsBlock, rightIndex)) {
+        // prepareInput() only returns true without right data once the right child is
+        // finished, so rightTsBlock must not be touched and all remaining left rows are
+        // unmatched
+        appendAllLeftTableAndFillNullForRightTable();
+      } else {
+        joinWithCurrentRightTsBlock();
+      }
+    }
+    TsBlock res = resultBuilder.build();
+    resultBuilder.reset();
+    return res;
+  }
+
+  /** Joins the unconsumed part of leftTsBlock with the unconsumed part of rightTsBlock. */
+  private void joinWithCurrentRightTsBlock() {
+    long time = leftTsBlock.getTimeByIndex(leftIndex);
+
+    if (comparator.largerThan(time, rightTsBlock.getEndTime())) {
+      // all the rightTsBlock is less than current left time, just skip it
+      rightTsBlock = null;
+      rightIndex = 0;
+    } else if (comparator.lessThan(
+        leftTsBlock.getEndTime(), rightTsBlock.getTimeByIndex(rightIndex))) {
+      // all the rightTsBlock is larger than leftTsBlock, fill null for right child
+      appendAllLeftTableAndFillNullForRightTable();
+    } else {
+      // left and right are overlapped, do the left outer join row by row
+      long currentEndTime =
+          comparator.getCurrentEndTime(leftTsBlock.getEndTime(), rightTsBlock.getEndTime());
+      int leftRowSize = leftTsBlock.getPositionCount();
+      TimeColumnBuilder timeColumnBuilder = resultBuilder.getTimeColumnBuilder();
+
+      while (comparator.canContinueInclusive(time, currentEndTime)
+          && !resultBuilder.isFull()
+          && appendRightTableRow(time)) {
+        timeColumnBuilder.writeLong(time);
+        // deal with leftTsBlock
+        appendLeftTableRow();
+        // one complete output row has been written
+        resultBuilder.declarePosition();
+
+        if (leftIndex < leftRowSize) {
+          // update next row's time
+          time = leftTsBlock.getTimeByIndex(leftIndex);
+        } else { // all the leftTsBlock is consumed up
+          // clean leftTsBlock
+          leftTsBlock = null;
+          leftIndex = 0;
+          break;
+        }
+      }
+    }
+  }
+
+  /**
+   * Fetches a new TsBlock from each child whose current one is missing or consumed up.
+   *
+   * @return true if the left side has unconsumed rows and the right side either has unconsumed
+   *     rows or is finished
+   */
+  private boolean prepareInput(long start, long maxRuntime) throws Exception {
+    if ((leftTsBlock == null || leftTsBlock.getPositionCount() == leftIndex) && left.hasNext()) {
+      leftTsBlock = left.next();
+      leftIndex = 0;
+    }
+    // still have time and right child still have remaining data
+    if ((System.nanoTime() - start < maxRuntime)
+        && (!rightFinished
+            && (rightTsBlock == null || rightTsBlock.getPositionCount() == rightIndex))) {
+      if (right.hasNext()) {
+        rightTsBlock = right.next();
+        rightIndex = 0;
+      } else {
+        rightFinished = true;
+      }
+    }
+    return tsBlockIsNotEmpty(leftTsBlock, leftIndex)
+        && (rightFinished || tsBlockIsNotEmpty(rightTsBlock, rightIndex));
+  }
+
+  /** Returns true if {@code tsBlock} exists and still has rows at or after {@code index}. */
+  private boolean tsBlockIsNotEmpty(TsBlock tsBlock, int index) {
+    return tsBlock != null && index < tsBlock.getPositionCount();
+  }
+
+  /** Copies the value at {@code index}, or appends null when that position is null. */
+  private static void writeValueOrNull(ColumnBuilder columnBuilder, Column column, int index) {
+    if (column.isNull(index)) {
+      columnBuilder.appendNull();
+    } else {
+      columnBuilder.write(column, index);
+    }
+  }
+
+  /** Appends the left columns of the current left row and advances leftIndex. */
+  private void appendLeftTableRow() {
+    for (int i = 0; i < leftColumnCount; i++) {
+      writeValueOrNull(resultBuilder.getColumnBuilder(i), leftTsBlock.getColumn(i), leftIndex);
+    }
+    leftIndex++;
+  }
+
+  /**
+   * Moves rightIndex to the first right row that is not before {@code time} and appends the right
+   * columns of the output row.
+   *
+   * @param time left table's current time
+   * @return true if the right columns were appended (the matching right row, or null when the
+   *     right table has no such time); false if rightTsBlock was consumed up first, so we cannot
+   *     decide yet whether the right table contains this time. Nothing is appended in that case.
+   */
+  private boolean appendRightTableRow(long time) {
+    int rowCount = rightTsBlock.getPositionCount();
+
+    while (rightIndex < rowCount
+        && comparator.lessThan(rightTsBlock.getTimeByIndex(rightIndex), time)) {
+      rightIndex++;
+    }
+
+    if (rightIndex == rowCount) {
+      // clean up rightTsBlock
+      rightTsBlock = null;
+      rightIndex = 0;
+      return false;
+    }
+
+    if (rightTsBlock.getTimeByIndex(rightIndex) == time) {
+      // right table has this time, append right table's corresponding row
+      for (int i = leftColumnCount; i < outputColumnCount; i++) {
+        writeValueOrNull(
+            resultBuilder.getColumnBuilder(i),
+            rightTsBlock.getColumn(i - leftColumnCount),
+            rightIndex);
+      }
+      // update right Index
+      rightIndex++;
+    } else {
+      // right table doesn't have this time, just append null for right table
+      for (int i = leftColumnCount; i < outputColumnCount; i++) {
+        resultBuilder.getColumnBuilder(i).appendNull();
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Appends every unconsumed row of leftTsBlock with null in all right columns, then releases
+   * leftTsBlock.
+   */
+  private void appendAllLeftTableAndFillNullForRightTable() {
+    int rowSize = leftTsBlock.getPositionCount();
+    // append time column
+    TimeColumnBuilder timeColumnBuilder = resultBuilder.getTimeColumnBuilder();
+    TimeColumn leftTimeColumn = leftTsBlock.getTimeColumn();
+    for (int i = leftIndex; i < rowSize; i++) {
+      timeColumnBuilder.writeLong(leftTimeColumn.getLong(i));
+    }
+
+    // append value column of left table
+    appendValueColumnForLeftTable(rowSize);
+
+    // append null for each column of right table
+    appendNullForRightTable(rowSize);
+
+    // tell the builder how many rows were just written
+    resultBuilder.declarePositions(rowSize - leftIndex);
+
+    // clean leftTsBlock
+    leftTsBlock = null;
+    leftIndex = 0;
+  }
+
+  /** Copies rows [leftIndex, rowSize) of every left value column into the result. */
+  private void appendValueColumnForLeftTable(int rowSize) {
+    for (int i = 0; i < leftColumnCount; i++) {
+      ColumnBuilder columnBuilder = resultBuilder.getColumnBuilder(i);
+      Column valueColumn = leftTsBlock.getColumn(i);
+
+      if (valueColumn.mayHaveNull()) {
+        for (int rowIndex = leftIndex; rowIndex < rowSize; rowIndex++) {
+          if (valueColumn.isNull(rowIndex)) {
+            columnBuilder.appendNull();
+          } else {
+            columnBuilder.write(valueColumn, rowIndex);
+          }
+        }
+      } else {
+        // no null in current column, no need to do isNull judgement for each row in for-loop
+        for (int rowIndex = leftIndex; rowIndex < rowSize; rowIndex++) {
+          columnBuilder.write(valueColumn, rowIndex);
+        }
+      }
+    }
+  }
+
+  /** Appends {@code rowSize - leftIndex} nulls to every right value column of the result. */
+  private void appendNullForRightTable(int rowSize) {
+    int nullCount = rowSize - leftIndex;
+    for (int i = leftColumnCount; i < outputColumnCount; i++) {
+      ColumnBuilder columnBuilder = resultBuilder.getColumnBuilder(i);
+      columnBuilder.appendNull(nullCount);
+    }
+  }
+
+  /** Only the left child decides whether there is more output. */
+  @Override
+  public boolean hasNext() throws Exception {
+    return tsBlockIsNotEmpty(leftTsBlock, leftIndex) || left.hasNext();
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (left != null) {
+      left.close();
+    }
+    if (right != null) {
+      right.close();
+    }
+  }
+
+  /** Finished once the cached left rows are consumed and the left child is finished. */
+  @Override
+  public boolean isFinished() throws Exception {
+    return !tsBlockIsNotEmpty(leftTsBlock, leftIndex) && left.isFinished();
+  }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return Math.max(
+        Math.max(left.calculateMaxPeekMemory(), right.calculateMaxPeekMemory()),
+        calculateRetainedSizeAfterCallingNext() + calculateMaxReturnSize());
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return maxReturnSize;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    // leftTsBlock + leftChild.RetainedSizeAfterCallingNext + rightTsBlock +
+    // rightChild.RetainedSizeAfterCallingNext
+    return left.calculateMaxReturnSize()
+        + left.calculateRetainedSizeAfterCallingNext()
+        + right.calculateMaxReturnSize()
+        + right.calculateRetainedSizeAfterCallingNext();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/RowBasedTimeJoinOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/RowBasedTimeJoinOperator.java
index 65de39f5717..e38755dbf0c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/RowBasedTimeJoinOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/RowBasedTimeJoinOperator.java
@@ -165,7 +165,7 @@ public class RowBasedTimeJoinOperator extends 
AbstractConsumeAllOperator {
 
       prepareForTimeHeap();
 
-    } while (comparator.canContinue(currentTime, currentEndTime) && 
!timeSelector.isEmpty());
+    } while (comparator.lessThan(currentTime, currentEndTime) && 
!timeSelector.isEmpty());
 
     resultTsBlock = tsBlockBuilder.build();
     return checkTsBlockSizeAndGetResult();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/AscTimeComparator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/AscTimeComparator.java
index 1c809ba961c..74ee490c1dd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/AscTimeComparator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/AscTimeComparator.java
@@ -33,7 +33,17 @@ public class AscTimeComparator implements TimeComparator {
   }
 
   @Override
-  public boolean canContinue(long time, long endTime) {
+  public boolean lessThan(long time, long endTime) {
     return time < endTime;
   }
+
+  /** Ascending order: {@code time} is larger when it is numerically greater than endTime. */
+  @Override
+  public boolean largerThan(long time, long endTime) {
+    return time > endTime;
+  }
+
+  /** Ascending order: returns true while {@code time} has not passed endTime (inclusive). */
+  @Override
+  public boolean canContinueInclusive(long time, long endTime) {
+    return time <= endTime;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/DescTimeComparator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/DescTimeComparator.java
index 7f4e03a7cb9..0bcf2498931 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/DescTimeComparator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/DescTimeComparator.java
@@ -33,7 +33,17 @@ public class DescTimeComparator implements TimeComparator {
   }
 
   @Override
-  public boolean canContinue(long time, long endTime) {
+  public boolean lessThan(long time, long endTime) {
     return time > endTime;
   }
+
+  /** Descending order: {@code time} is larger when it is numerically smaller than endTime. */
+  @Override
+  public boolean largerThan(long time, long endTime) {
+    return time < endTime;
+  }
+
+  /** Descending order: returns true while {@code time} has not dropped below endTime. */
+  @Override
+  public boolean canContinueInclusive(long time, long endTime) {
+    return time >= endTime;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/TimeComparator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/TimeComparator.java
index 0d35e9c6c3f..9d7a71ec101 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/TimeComparator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/TimeComparator.java
@@ -24,9 +24,15 @@ public interface TimeComparator {
   /** return true if time is satisfied with endTime, otherwise false. */
   boolean satisfyCurEndTime(long time, long endTime);
 
+  /** return time < endTime if order by time asc, time > endTime if order by 
desc. */
+  boolean lessThan(long time, long endTime);
+
+  /** return time > endTime if order by time asc, time < endTime if order by 
desc. */
+  boolean largerThan(long time, long endTime);
+
   /** return min(time1, time2) if order by time asc, max(time1, time2) if 
order by desc. */
   long getCurrentEndTime(long time1, long time2);
 
-  /** return time < endTime if order by time asc, time > endTime if order by 
desc. */
-  boolean canContinue(long time, long endTime);
+  /** return time <= endTime if order by time asc, time >= endTime if order by desc. */
+  boolean canContinueInclusive(long time, long endTime);
 }

Reply via email to