This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/joinOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/beyyes/joinOperator by this 
push:
     new bf2ef59b245 perfect join operator impl
bf2ef59b245 is described below

commit bf2ef59b245ce9e64944c94f37759b2844bb9987
Author: Beyyes <[email protected]>
AuthorDate: Fri Sep 6 10:38:32 2024 +0800

    perfect join operator impl
---
 .../source/relational/InnerJoinOperator.java       | 303 +++++++++++++++++++++
 .../plan/planner/OperatorTreeGenerator.java        |   2 +-
 .../plan/planner/TableOperatorGenerator.java       |  41 ++-
 .../optimizations/PushPredicateIntoTableScan.java  |  17 +-
 4 files changed, 355 insertions(+), 8 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InnerJoinOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InnerJoinOperator.java
new file mode 100644
index 00000000000..f080dcc187e
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InnerJoinOperator.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
+
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.TimeComparator;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.util.concurrent.Futures.successfulAsList;
+import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE;
+
+/**
+ * Merge-style inner join of two child operators on their time columns.
+ *
+ * <p>The operator keeps one {@link TsBlock} per side together with a read cursor into it. The
+ * algorithm assumes both children deliver rows ordered on the join time column consistently with
+ * the supplied {@link TimeComparator}: whole blocks are discarded purely by looking at their last
+ * time value, and the right cursor only ever moves forward.
+ *
+ * <p>Output columns are the left columns selected by {@code leftOutputSymbolIdx} followed by the
+ * right columns selected by {@code rightOutputSymbolIdx}.
+ */
+public class InnerJoinOperator implements ProcessOperator {
+
+  private final OperatorContext operatorContext;
+
+  private final Operator leftChild;
+  private TsBlock leftTsBlock;
+  private int leftIndex; // start index of leftTsBlock
+  private final int leftTimeColumnPosition;
+  private final int[] leftOutputSymbolIdx;
+
+  private final Operator rightChild;
+  private TsBlock rightTsBlock;
+  private int rightIndex; // start index of rightTsBlock
+  private final int rightTimeColumnPosition;
+  private final int[] rightOutputSymbolIdx;
+
+  private final TimeComparator comparator;
+  private final TsBlockBuilder resultBuilder;
+
+  private final long maxReturnSize =
+      TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
+
+  /**
+   * Creates an inner join over two children.
+   *
+   * @param operatorContext context of this operator, also the source of the max run time
+   * @param leftChild operator producing the left (probe) side
+   * @param leftTimeColumnPosition index of the join time column inside left blocks
+   * @param leftOutputSymbolIdx indexes of the left columns copied to the output, in output order
+   * @param rightChild operator producing the right side
+   * @param rightTimeColumnPosition index of the join time column inside right blocks
+   * @param rightOutputSymbolIdx indexes of the right columns copied to the output, placed after
+   *     all left output columns
+   * @param timeComparator comparator defining the time order of both inputs
+   * @param dataTypes data types of the output columns, used to create the result builder
+   */
+  public InnerJoinOperator(
+      OperatorContext operatorContext,
+      Operator leftChild,
+      int leftTimeColumnPosition,
+      int[] leftOutputSymbolIdx,
+      Operator rightChild,
+      int rightTimeColumnPosition,
+      int[] rightOutputSymbolIdx,
+      TimeComparator timeComparator,
+      List<TSDataType> dataTypes) {
+    this.operatorContext = operatorContext;
+    this.leftChild = leftChild;
+    this.leftTimeColumnPosition = leftTimeColumnPosition;
+    this.leftOutputSymbolIdx = leftOutputSymbolIdx;
+    this.rightChild = rightChild;
+    this.rightTimeColumnPosition = rightTimeColumnPosition;
+    this.rightOutputSymbolIdx = rightOutputSymbolIdx;
+
+    this.comparator = timeComparator;
+    this.resultBuilder = new TsBlockBuilder(dataTypes);
+  }
+
+  /**
+   * Returns the future of whichever child is still blocked, or a combined future when both are.
+   */
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    ListenableFuture<?> leftBlocked = leftChild.isBlocked();
+    ListenableFuture<?> rightBlocked = rightChild.isBlocked();
+    if (leftBlocked.isDone()) {
+      return rightBlocked;
+    } else if (rightBlocked.isDone()) {
+      return leftBlocked;
+    } else {
+      return successfulAsList(leftBlocked, rightBlocked);
+    }
+  }
+
+  /** An inner join can only produce rows while both sides still have, or can fetch, data. */
+  @Override
+  public boolean hasNext() throws Exception {
+    return (tsBlockIsNotEmpty(leftTsBlock, leftIndex) || leftChild.hasNextWithTimer())
+        && (tsBlockIsNotEmpty(rightTsBlock, rightIndex) || rightChild.hasNextWithTimer());
+  }
+
+  /**
+   * Produces the next batch of joined rows.
+   *
+   * @return {@code null} when input is not ready yet or when a whole input block was discarded in
+   *     this call; otherwise a block with the rows matched in this call (it may contain zero
+   *     positions when no left row found a partner)
+   */
+  @Override
+  public TsBlock next() throws Exception {
+    // start stopwatch
+    long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
+    long start = System.nanoTime();
+    if (!prepareInput(start, maxRuntime)) {
+      return null;
+    }
+
+    // the whole rightTsBlock lies before the current left row, just skip it
+    if (comparator.largerThan(getCurrentLeftTime(), getRightEndTime())) {
+      // clean rightTsBlock
+      rightTsBlock = null;
+      rightIndex = 0;
+      return null;
+    }
+
+    // the whole leftTsBlock lies before the current right row, just skip it
+    if (comparator.largerThan(getCurrentRightTime(), getLeftEndTime())) {
+      // clean leftTsBlock
+      leftTsBlock = null;
+      leftIndex = 0;
+      return null;
+    }
+
+    long leftProbeTime = getCurrentLeftTime();
+    long currentEndTime = comparator.getCurrentEndTime(getLeftEndTime(), getRightEndTime());
+    while (!resultBuilder.isFull()
+        && comparator.canContinueInclusive(leftProbeTime, currentEndTime)) {
+      appendTableRows(leftProbeTime);
+      leftIndex++;
+
+      // all left tsblock has been consumed
+      if (leftIndex >= leftTsBlock.getPositionCount()) {
+        leftTsBlock = null;
+        leftIndex = 0;
+        break;
+      }
+
+      // Probe with the time of the row the cursor now points at; without this refresh every
+      // following left row would be matched against the first row's timestamp.
+      leftProbeTime = getCurrentLeftTime();
+    }
+
+    return buildResult();
+  }
+
+  /** Wraps the rows accumulated in {@code resultBuilder} into a block and resets the builder. */
+  private TsBlock buildResult() {
+    ColumnBuilder[] builders = resultBuilder.getValueColumnBuilders();
+    Column[] valueColumns = new Column[builders.length];
+
+    int declaredPositions = resultBuilder.getPositionCount();
+    for (int i = 0; i < valueColumns.length; ++i) {
+      valueColumns[i] = builders[i].build();
+      if (valueColumns[i].getPositionCount() != declaredPositions) {
+        throw new IllegalStateException(
+            String.format(
+                "Declared positions (%s) does not match column %s's number of entries (%s)",
+                declaredPositions, i, valueColumns[i].getPositionCount()));
+      }
+    }
+
+    TsBlock result =
+        TsBlock.wrapBlocksWithoutCopy(
+            declaredPositions, new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, 1), valueColumns);
+    resultBuilder.reset();
+    return result;
+  }
+
+  private long getCurrentLeftTime() {
+    return leftTsBlock.getColumn(leftTimeColumnPosition).getLong(leftIndex);
+  }
+
+  private long getCurrentRightTime() {
+    return rightTsBlock.getColumn(rightTimeColumnPosition).getLong(rightIndex);
+  }
+
+  private long getRightTime(int idx) {
+    return rightTsBlock.getColumn(rightTimeColumnPosition).getLong(idx);
+  }
+
+  private long getLeftEndTime() {
+    return leftTsBlock
+        .getColumn(leftTimeColumnPosition)
+        .getLong(leftTsBlock.getPositionCount() - 1);
+  }
+
+  private long getRightEndTime() {
+    return rightTsBlock
+        .getColumn(rightTimeColumnPosition)
+        .getLong(rightTsBlock.getPositionCount() - 1);
+  }
+
+  /**
+   * Emits one output row for every right row whose time equals {@code leftTime}, combined with the
+   * left row at {@code leftIndex}.
+   *
+   * <p>The right cursor is first advanced past all rows ordered before {@code leftTime}. It is
+   * deliberately not advanced past the matching rows, so that a following left row carrying the
+   * same time is joined with them again.
+   */
+  private void appendTableRows(long leftTime) {
+    final int rightSize = rightTsBlock.getPositionCount();
+    while (rightIndex < rightSize && comparator.lessThan(getCurrentRightTime(), leftTime)) {
+      rightIndex++;
+    }
+
+    // Failures are left to propagate: swallowing them here would return a partially written block.
+    for (int idx = rightIndex; idx < rightSize && getRightTime(idx) == leftTime; idx++) {
+      appendColumns(leftTsBlock, leftOutputSymbolIdx, leftIndex, 0);
+      appendColumns(rightTsBlock, rightOutputSymbolIdx, idx, leftOutputSymbolIdx.length);
+      resultBuilder.declarePosition();
+    }
+  }
+
+  /**
+   * Copies the selected columns of one source row into consecutive result column builders,
+   * starting at {@code builderOffset}.
+   */
+  private void appendColumns(TsBlock source, int[] columnIdx, int position, int builderOffset) {
+    for (int i = 0; i < columnIdx.length; i++) {
+      Column column = source.getColumn(columnIdx[i]);
+      ColumnBuilder columnBuilder = resultBuilder.getColumnBuilder(builderOffset + i);
+      if (column.isNull(position)) {
+        columnBuilder.appendNull();
+      } else {
+        columnBuilder.write(column, position);
+      }
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (leftChild != null) {
+      leftChild.close();
+    }
+    if (rightChild != null) {
+      rightChild.close();
+    }
+  }
+
+  /** Finished once both cached blocks are drained and both children report finished. */
+  @Override
+  public boolean isFinished() throws Exception {
+    return !tsBlockIsNotEmpty(leftTsBlock, leftIndex)
+        && leftChild.isFinished()
+        && !tsBlockIsNotEmpty(rightTsBlock, rightIndex)
+        && rightChild.isFinished();
+  }
+
+  /**
+   * Refills whichever cached block is missing or fully consumed.
+   *
+   * <p>The right side is only fetched while the time budget given by {@code maxRuntime} has not
+   * been used up since {@code start}.
+   *
+   * @return {@code true} when both sides have at least one unread row
+   */
+  private boolean prepareInput(long start, long maxRuntime) throws Exception {
+    if ((leftTsBlock == null || leftTsBlock.getPositionCount() == leftIndex)
+        && leftChild.hasNextWithTimer()) {
+      leftTsBlock = leftChild.nextWithTimer();
+      leftIndex = 0;
+    }
+
+    if ((System.nanoTime() - start < maxRuntime)
+        && (rightTsBlock == null || rightTsBlock.getPositionCount() == rightIndex)) {
+      if (rightChild.hasNextWithTimer()) {
+        rightTsBlock = rightChild.nextWithTimer();
+        rightIndex = 0;
+      }
+    }
+    return tsBlockIsNotEmpty(leftTsBlock, leftIndex) && tsBlockIsNotEmpty(rightTsBlock, rightIndex);
+  }
+
+  private boolean tsBlockIsNotEmpty(TsBlock tsBlock, int index) {
+    return tsBlock != null && index < tsBlock.getPositionCount();
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return Math.max(
+        Math.max(
+            leftChild.calculateMaxPeekMemoryWithCounter(),
+            rightChild.calculateMaxPeekMemoryWithCounter()),
+        calculateRetainedSizeAfterCallingNext() + calculateMaxReturnSize());
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return maxReturnSize;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    // leftTsBlock + leftChild.RetainedSizeAfterCallingNext + rightTsBlock +
+    // rightChild.RetainedSizeAfterCallingNext
+    return leftChild.calculateMaxReturnSize()
+        + leftChild.calculateRetainedSizeAfterCallingNext()
+        + rightChild.calculateMaxReturnSize()
+        + rightChild.calculateRetainedSizeAfterCallingNext();
+  }
+
+  @Override
+  public long ramBytesUsed() {
+    return 0;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index 93c6b6a1f3b..cf9a3dbba65 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -327,7 +327,7 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
   private static final DataNodeSchemaCache DATA_NODE_SCHEMA_CACHE =
       DataNodeSchemaCache.getInstance();
 
-  private static final TimeComparator ASC_TIME_COMPARATOR = new 
AscTimeComparator();
+  public static final TimeComparator ASC_TIME_COMPARATOR = new 
AscTimeComparator();
 
   private static final TimeComparator DESC_TIME_COMPARATOR = new 
DescTimeComparator();
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index c4c7bd5f2ad..d37073a0452 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -49,6 +49,7 @@ import 
org.apache.iotdb.db.queryengine.execution.operator.schema.source.SchemaSo
 import 
org.apache.iotdb.db.queryengine.execution.operator.sink.IdentitySinkOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.ExchangeOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.InnerJoinOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator;
 import 
org.apache.iotdb.db.queryengine.execution.relational.ColumnTransformerBuilder;
 import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
@@ -115,6 +116,7 @@ import static 
org.apache.iotdb.db.queryengine.common.DataNodeEndPoints.isSameNod
 import static 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.MergeSortComparator.getComparatorForTable;
 import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.constructAlignedPath;
 import static 
org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils.convertPredicateToFilter;
+import static 
org.apache.iotdb.db.queryengine.plan.planner.OperatorTreeGenerator.ASC_TIME_COMPARATOR;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager.getTSDataType;
 
 /** This Visitor is responsible for transferring Table PlanNode Tree to Table 
Operator Tree. */
@@ -740,7 +742,44 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
 
   @Override
   public Operator visitJoin(JoinNode node, LocalExecutionPlanContext context) {
-    throw new IllegalStateException("JoinOperator is not implemented currently.");
+    // Reject unsupported plans before any operator is created, so that no child operator tree is
+    // built only to be abandoned.
+    if (requireNonNull(node.getJoinType()) != JoinNode.JoinType.INNER) {
+      throw new IllegalStateException("Unsupported join type: " + node.getJoinType());
+    }
+    if (node.getCriteria().isEmpty()) {
+      throw new IllegalStateException("Join without equi-join criteria is not supported.");
+    }
+
+    OperatorContext operatorContext =
+        context
+            .getDriverContext()
+            .addOperatorContext(
+                context.getNextOperatorId(), node.getPlanNodeId(), JoinNode.class.getSimpleName());
+    List<TSDataType> dataTypes = getOutputColumnTypes(node, context.getTypeProvider());
+
+    Operator leftChild = node.getLeftChild().accept(this, context);
+    Operator rightChild = node.getRightChild().accept(this, context);
+
+    // Only the first equi-join clause is used; its two sides are the join time columns.
+    int leftTimeColumnIdx =
+        joinColumnIndex(
+            node.getLeftChild().getOutputSymbols(), node.getCriteria().get(0).getLeft(), "left");
+    int rightTimeColumnIdx =
+        joinColumnIndex(
+            node.getRightChild().getOutputSymbols(),
+            node.getCriteria().get(0).getRight(),
+            "right");
+    int[] leftOutputSymbolIdx = new int[node.getLeftOutputSymbols().size()];
+    for (int i = 0; i < leftOutputSymbolIdx.length; i++) {
+      leftOutputSymbolIdx[i] =
+          joinColumnIndex(
+              node.getLeftChild().getOutputSymbols(), node.getLeftOutputSymbols().get(i), "left");
+    }
+    int[] rightOutputSymbolIdx = new int[node.getRightOutputSymbols().size()];
+    for (int i = 0; i < rightOutputSymbolIdx.length; i++) {
+      rightOutputSymbolIdx[i] =
+          joinColumnIndex(
+              node.getRightChild().getOutputSymbols(),
+              node.getRightOutputSymbols().get(i),
+              "right");
+    }
+
+    return new InnerJoinOperator(
+        operatorContext,
+        leftChild,
+        leftTimeColumnIdx,
+        leftOutputSymbolIdx,
+        rightChild,
+        rightTimeColumnIdx,
+        rightOutputSymbolIdx,
+        ASC_TIME_COMPARATOR,
+        dataTypes);
   }
+
+  /**
+   * Returns the position of {@code symbol} inside {@code childOutputs}.
+   *
+   * @throws IllegalStateException if the child does not produce the symbol; passing the resulting
+   *     {@code -1} on would only fail later with an index error inside the join operator
+   */
+  private static int joinColumnIndex(List<?> childOutputs, Object symbol, String side) {
+    int index = childOutputs.indexOf(symbol);
+    if (index < 0) {
+      throw new IllegalStateException(
+          String.format("Join symbol %s is not produced by the %s child", symbol, side));
+    }
+    return index;
+  }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
index ca3cf3f854c..3faf5a59d99 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
@@ -597,23 +597,28 @@ public class PushPredicateIntoTableScan implements 
PlanOptimizer {
                 newJoinFilter,
                 node.isSpillable());
       }
-      Symbol timeSymbol = Symbol.of("time");
-      OrderingScheme orderingScheme =
+
+      JoinNode.EquiJoinClause joinCriteria = node.getCriteria().get(0);
+      OrderingScheme leftOrderingScheme =
+          new OrderingScheme(
+              Collections.singletonList(joinCriteria.getLeft()),
+              Collections.singletonMap(joinCriteria.getLeft(), 
ASC_NULLS_LAST));
+      OrderingScheme rightOrderingScheme =
           new OrderingScheme(
-              Collections.singletonList(timeSymbol),
-              Collections.singletonMap(timeSymbol, ASC_NULLS_LAST));
+              Collections.singletonList(joinCriteria.getRight()),
+              Collections.singletonMap(joinCriteria.getRight(), 
ASC_NULLS_LAST));
       SortNode leftSortNode =
           new SortNode(
               queryId.genPlanNodeId(),
               ((JoinNode) output).getLeftChild(),
-              orderingScheme,
+              leftOrderingScheme,
               false,
               false);
       SortNode rightSortNode =
           new SortNode(
               queryId.genPlanNodeId(),
               ((JoinNode) output).getRightChild(),
-              orderingScheme,
+              rightOrderingScheme,
               false,
               false);
       ((JoinNode) output).setLeftChild(leftSortNode);

Reply via email to