This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/TimeJoin
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ty/TimeJoin by this push:
     new 4005dfb7853 Add InnerTimeJoinOperator
4005dfb7853 is described below

commit 4005dfb7853effe77ea658d1d8663e671adba0c6
Author: JackieTien97 <[email protected]>
AuthorDate: Wed Dec 20 20:15:47 2023 +0800

    Add InnerTimeJoinOperator
---
 .../process/join/InnerTimeJoinOperator.java        | 379 +++++++++++++++++++++
 .../process/join/LeftOuterTimeJoinOperator.java    |  11 +-
 2 files changed, 385 insertions(+), 5 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/InnerTimeJoinOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/InnerTimeJoinOperator.java
new file mode 100644
index 00000000000..77d4099d7b7
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/InnerTimeJoinOperator.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process.join;
+
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.TimeComparator;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.util.concurrent.Futures.successfulAsList;
+
+public class InnerTimeJoinOperator implements ProcessOperator {
+
+  private final OperatorContext operatorContext;
+
+  private final long maxReturnSize =
+      TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
+
+  /** Start index for each input TsBlocks and size of it is equal to 
inputTsBlocks. */
+  private final int[] inputIndex;
+
+  private final List<Operator> children;
+  private final int inputOperatorsCount;
+  /** TsBlock from child operator. Only one cache now. */
+  private final TsBlock[] inputTsBlocks;
+
+  private final boolean[] canCallNext;
+
+  private final TsBlockBuilder resultBuilder;
+
+  private final TimeComparator comparator;
+
+  /** Index of the child that is currently fetching input */
+  private int currentChildIndex = 0;
+
+  /** Indicate whether we found an empty child input in one loop */
+  private boolean hasEmptyChildInput = false;
+
+  public InnerTimeJoinOperator(
+      OperatorContext operatorContext,
+      List<Operator> children,
+      List<TSDataType> dataTypes,
+      TimeComparator comparator) {
+    this.operatorContext = operatorContext;
+    this.children = children;
+    this.inputOperatorsCount = children.size();
+    this.inputTsBlocks = new TsBlock[inputOperatorsCount];
+    this.canCallNext = new boolean[inputOperatorsCount];
+    checkArgument(
+        children.size() > 1, "child size of InnerTimeJoinOperator should be 
larger than 1");
+    this.inputIndex = new int[this.inputOperatorsCount];
+    this.resultBuilder = new TsBlockBuilder(dataTypes);
+    this.comparator = comparator;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    boolean hasReadyChild = false;
+    List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      if (!isEmpty(i)) {
+        continue;
+      }
+      ListenableFuture<?> blocked = children.get(i).isBlocked();
+      if (blocked.isDone()) {
+        hasReadyChild = true;
+        canCallNext[i] = true;
+      } else {
+        listenableFutures.add(blocked);
+      }
+    }
+    return (hasReadyChild || listenableFutures.isEmpty())
+        ? NOT_BLOCKED
+        : successfulAsList(listenableFutures);
+  }
+
+  @Override
+  public TsBlock next() throws Exception {
+    // start stopwatch
+    long maxRuntime = 
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
+    long start = System.nanoTime();
+    if (!prepareInput(start, maxRuntime)) {
+      return null;
+    }
+
+    // still have time
+    if (System.nanoTime() - start < maxRuntime) {
+      // End time for returned TsBlock this time, it's the min/max end time 
among all the children
+      // TsBlocks order by asc/desc
+      long currentEndTime = 0;
+      boolean init = false;
+
+      // Get TsBlock for each input, put their time stamp into TimeSelector 
and then use the min
+      // Time
+      // among all the input TsBlock as the current output TsBlock's endTime.
+      for (int i = 0; i < inputOperatorsCount; i++) {
+        // Update the currentEndTime if the TsBlock is not empty
+        currentEndTime =
+            init
+                ? comparator.getCurrentEndTime(currentEndTime, 
inputTsBlocks[i].getEndTime())
+                : inputTsBlocks[i].getEndTime();
+        init = true;
+      }
+
+      // collect time that each child has
+      int[][] selectedRowIndexArray = buildTimeColumn(currentEndTime);
+
+      // build value columns for each child
+      int columnIndex = 0;
+      for (int i = 0; i < inputOperatorsCount; i++) {
+        columnIndex += buildValueColumns(columnIndex, i, 
selectedRowIndexArray[i]);
+      }
+    }
+
+    TsBlock res = resultBuilder.build();
+    resultBuilder.reset();
+    return res;
+  }
+
+  // return selected row index for each child's tsblock
+  private int[][] buildTimeColumn(long currentEndTime) {
+    TimeColumnBuilder timeBuilder = resultBuilder.getTimeColumnBuilder();
+    List<List<Integer>> selectedRowIndexArray = new 
ArrayList<>(inputOperatorsCount);
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      selectedRowIndexArray.add(new ArrayList<>());
+    }
+
+    int column0Size = inputTsBlocks[0].getPositionCount();
+
+    while (inputIndex[0] < column0Size
+        && comparator.canContinueInclusive(
+            inputTsBlocks[0].getTimeByIndex(inputIndex[0]), currentEndTime)) {
+      long time = inputTsBlocks[0].getTimeByIndex(inputIndex[0]);
+      inputIndex[0]++;
+      boolean allHave = true;
+      for (int i = 1; i < inputOperatorsCount; i++) {
+        int size = inputTsBlocks[i].getPositionCount();
+        updateInputIndex(i, time);
+
+        if (inputIndex[i] == size || 
inputTsBlocks[i].getTimeByIndex(inputIndex[i]) != time) {
+          allHave = false;
+          break;
+        } else {
+          inputIndex[i]++;
+        }
+      }
+      if (allHave) {
+        timeBuilder.writeLong(time);
+        appendOneSelectedRow(selectedRowIndexArray);
+      }
+    }
+
+    // update inputIndex for each child to the last index larger than 
currentEndTime
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      updateInputIndex(i, currentEndTime);
+    }
+
+    return transformListToIntArray(selectedRowIndexArray);
+  }
+
+  private void appendOneSelectedRow(List<List<Integer>> selectedRowIndexArray) 
{
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      selectedRowIndexArray.get(0).add(inputIndex[i] - 1);
+    }
+  }
+
+  private void updateInputIndex(int i, long currentEndTime) {
+    int size = inputTsBlocks[i].getPositionCount();
+    while (inputIndex[i] < size
+        && comparator.lessThan(inputTsBlocks[i].getTimeByIndex(inputIndex[i]), 
currentEndTime)) {
+      inputIndex[i]++;
+    }
+  }
+
+  private int[][] transformListToIntArray(List<List<Integer>> lists) {
+    if (lists.size() <= 1) {
+      throw new IllegalStateException(
+          "Child size of InnerTimeJoinOperator should be larger than 1.");
+    }
+    int[][] res = new int[lists.size()][lists.get(0).size()];
+    for (int i = 0; i < res.length; i++) {
+      List<Integer> list = lists.get(i);
+      int[] array = res[i];
+      if (list.size() != array.length) {
+        throw new IllegalStateException("All child should have same time 
column result!");
+      }
+      for (int j = 0; j < array.length; j++) {
+        array[j] = list.get(j);
+      }
+    }
+    return res;
+  }
+
+  private int buildValueColumns(int startColumnIndex, int childIndex, int[] 
selectedRowIndex) {
+    TsBlock tsBlock = inputTsBlocks[childIndex];
+    for (int i = 0, size = inputTsBlocks[childIndex].getValueColumnCount(); i 
< size; i++) {
+      ColumnBuilder columnBuilder = 
resultBuilder.getColumnBuilder(startColumnIndex + i);
+      Column column = inputTsBlocks[childIndex].getColumn(i);
+      if (column.mayHaveNull()) {
+        for (int rowIndex : selectedRowIndex) {
+          if (column.isNull(rowIndex)) {
+            columnBuilder.appendNull();
+          } else {
+            columnBuilder.write(column, rowIndex);
+          }
+        }
+      } else {
+        for (int rowIndex : selectedRowIndex) {
+          columnBuilder.write(column, rowIndex);
+        }
+      }
+    }
+
+    return startColumnIndex + tsBlock.getValueColumnCount();
+  }
+
+  /**
+   * Try to cache one result of each child.
+   *
+   * @return true if results of all children are ready. Return false if some 
children is blocked or
+   *     return null.
+   * @throws Exception errors happened while getting tsblock from children
+   */
+  private boolean prepareInput(long start, long maxRuntime) throws Exception {
+    while (System.nanoTime() - start < maxRuntime && currentChildIndex < 
inputOperatorsCount) {
+      if (!isEmpty(currentChildIndex)) {
+        currentChildIndex++;
+        continue;
+      }
+      if (canCallNext[currentChildIndex]) {
+        if (children.get(currentChildIndex).hasNextWithTimer()) {
+          inputIndex[currentChildIndex] = 0;
+          inputTsBlocks[currentChildIndex] = 
children.get(currentChildIndex).nextWithTimer();
+          canCallNext[currentChildIndex] = false;
+          // child operator has next but return an empty TsBlock which means 
that it may not
+          // finish calculation in given time slice.
+          // In such case, TimeJoinOperator can't go on calculating, so we 
just return null.
+          // We can also use the while loop here to continuously call the 
hasNext() and next()
+          // methods of the child operator until its hasNext() returns false 
or the next() gets
+          // the data that is not empty, but this will cause the execution 
time of the while loop
+          // to be uncontrollable and may exceed all allocated time slice
+          if (isEmpty(currentChildIndex)) {
+            hasEmptyChildInput = true;
+          }
+        } else {
+          return false;
+        }
+      } else {
+        hasEmptyChildInput = true;
+      }
+      currentChildIndex++;
+    }
+
+    if (currentChildIndex == inputOperatorsCount) {
+      // start a new loop
+      currentChildIndex = 0;
+      if (!hasEmptyChildInput) {
+        // all children are ready now
+        return true;
+      } else {
+        // In a new loop, previously empty child input could be non-empty now, 
and we can skip the
+        // children that have generated input
+        hasEmptyChildInput = false;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public boolean hasNext() throws Exception {
+    // return false if any child is consumed up.
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      if (isEmpty(i) && canCallNext[i] && !children.get(i).hasNextWithTimer()) 
{
+        return false;
+      }
+    }
+    // return true if all children still hava data
+    return true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      if (children.get(i) != null) {
+        children.get(i).close();
+      }
+    }
+  }
+
+  @Override
+  public boolean isFinished() throws Exception {
+    // return true if any child is finished.
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      if (isEmpty(i) && children.get(i).isFinished()) {
+        return true;
+      }
+    }
+    // return false if all children still hava data
+    return false;
+  }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    long maxPeekMemory = 0;
+    long childrenMaxPeekMemory = 0;
+    for (Operator child : children) {
+      childrenMaxPeekMemory =
+          Math.max(childrenMaxPeekMemory, maxPeekMemory + 
child.calculateMaxPeekMemory());
+      maxPeekMemory +=
+          (child.calculateMaxReturnSize() + 
child.calculateRetainedSizeAfterCallingNext());
+    }
+
+    maxPeekMemory += calculateMaxReturnSize();
+    return Math.max(maxPeekMemory, childrenMaxPeekMemory);
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return maxReturnSize;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    long currentRetainedSize = 0;
+    long minChildReturnSize = Long.MAX_VALUE;
+    for (Operator child : children) {
+      long maxReturnSize = child.calculateMaxReturnSize();
+      currentRetainedSize += (maxReturnSize + 
child.calculateRetainedSizeAfterCallingNext());
+      minChildReturnSize = Math.min(minChildReturnSize, maxReturnSize);
+    }
+    // max cached TsBlock
+    return currentRetainedSize - minChildReturnSize;
+  }
+
+  /**
+   * If the tsBlock of columnIndex is null or has no more data in the tsBlock, 
return true; else
+   * return false.
+   */
+  protected boolean isEmpty(int columnIndex) {
+    return inputTsBlocks[columnIndex] == null
+        || inputTsBlocks[columnIndex].getPositionCount() == 
inputIndex[columnIndex];
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/LeftOuterTimeJoinOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/LeftOuterTimeJoinOperator.java
index b997484a1e3..2dd07776d55 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/LeftOuterTimeJoinOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/LeftOuterTimeJoinOperator.java
@@ -160,16 +160,17 @@ public class LeftOuterTimeJoinOperator implements 
ProcessOperator {
   }
 
   private boolean prepareInput(long start, long maxRuntime) throws Exception {
-    if ((leftTsBlock == null || leftTsBlock.getPositionCount() == leftIndex) 
&& left.hasNext()) {
-      leftTsBlock = left.next();
+    if ((leftTsBlock == null || leftTsBlock.getPositionCount() == leftIndex)
+        && left.hasNextWithTimer()) {
+      leftTsBlock = left.nextWithTimer();
       leftIndex = 0;
     }
     // still have time and right child still have remaining data
     if ((System.nanoTime() - start < maxRuntime)
         && (!rightFinished
             && (rightTsBlock == null || rightTsBlock.getPositionCount() == 
rightIndex))) {
-      if (right.hasNext()) {
-        rightTsBlock = right.next();
+      if (right.hasNextWithTimer()) {
+        rightTsBlock = right.nextWithTimer();
         rightIndex = 0;
       } else {
         rightFinished = true;
@@ -295,7 +296,7 @@ public class LeftOuterTimeJoinOperator implements 
ProcessOperator {
 
   @Override
   public boolean hasNext() throws Exception {
-    return tsBlockIsNotEmpty(leftTsBlock, leftIndex) || left.hasNext();
+    return tsBlockIsNotEmpty(leftTsBlock, leftIndex) || 
left.hasNextWithTimer();
   }
 
   @Override

Reply via email to