This is an automated email from the ASF dual-hosted git repository. lancelly pushed a commit to branch support_exists_and_correlate in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 45f99a79449f43feadcf58a120928ab2d6c8d2fc Author: lancelly <[email protected]> AuthorDate: Fri Jan 10 21:26:26 2025 +0800 add leftjoin operator --- .../relational/MergeSortLeftJoinOperator.java | 163 +++++++++++++++++++++ .../plan/planner/TableOperatorGenerator.java | 19 +++ 2 files changed, 182 insertions(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MergeSortLeftJoinOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MergeSortLeftJoinOperator.java new file mode 100644 index 00000000000..be6cb2f3a51 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MergeSortLeftJoinOperator.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.source.relational; + +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; +import org.apache.iotdb.db.queryengine.execution.operator.Operator; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.comparator.JoinKeyComparator; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.util.List; + +public class MergeSortLeftJoinOperator extends AbstractMergeSortJoinOperator { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(MergeSortLeftJoinOperator.class); + + public MergeSortLeftJoinOperator( + OperatorContext operatorContext, + Operator leftChild, + int[] leftJoinKeyPositions, + int[] leftOutputSymbolIdx, + Operator rightChild, + int[] rightJoinKeyPositions, + int[] rightOutputSymbolIdx, + List<JoinKeyComparator> joinKeyComparators, + List<TSDataType> dataTypes) { + super( + operatorContext, + leftChild, + leftJoinKeyPositions, + leftOutputSymbolIdx, + rightChild, + rightJoinKeyPositions, + rightOutputSymbolIdx, + joinKeyComparators, + dataTypes); + } + + @Override + public boolean hasNext() throws Exception { + if (retainedTsBlock != null) { + return true; + } + + return !leftFinished; + } + + @Override + protected boolean prepareInput() throws Exception { + gotCandidateBlocks(); + if (rightFinished) { + return leftBlockNotEmpty(); + } + return leftBlockNotEmpty() && rightBlockNotEmpty() && gotNextRightBlock(); + } + + @Override + protected boolean processFinished() { + if (rightFinished) { + appendLeftWithEmptyRight(); + return true; + } + + // skip all NULL values in right, because NULL value will not match the left value + while (currentRightHasNullValue()) { + if (rightFinishedWithIncIndex()) { + return true; + } + } + + // all the join keys in rightTsBlock are less than leftTsBlock, just skip right + if (allRightLessThanLeft()) { + resetRightBlockList(); + return true; + } + + // all the join Keys in leftTsBlock are less than rightTsBlock, just append the left value + if (allLeftLessThanRight()) { + appendLeftWithEmptyRight(); + resetLeftBlock(); + return true; + } + + // continue right < left, until right >= left + while (lessThan( + rightBlockList.get(rightBlockListIdx), + rightJoinKeyPositions, + rightIndex, + leftBlock, + leftJoinKeyPositions, + leftIndex)) { + if (rightFinishedWithIncIndex()) { + return true; + } + } + if (currentRoundNeedStop()) { + return true; + } + + // if current left is null, append null to result + while (currentLeftHasNullValue()) { + appendOneLeftRowWithEmptyRight(); + if (leftFinishedWithIncIndex()) { + return true; + } + } + + // continue left < right, until left >= right + while (lessThan( + leftBlock, + leftJoinKeyPositions, + leftIndex, + rightBlockList.get(rightBlockListIdx), + rightJoinKeyPositions, + rightIndex)) { + appendOneLeftRowWithEmptyRight(); + if (leftFinishedWithIncIndex()) { + return true; + } + } + if (currentRoundNeedStop()) { + return true; + } + + // has right value equals to current left, append to join result, inc leftIndex + return hasMatchedRightValueToProbeLeft() && leftFinishedWithIncIndex(); + } + + @Override + protected void recordsWhenDataMatches() { + // do nothing + } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(leftChild) + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(rightChild) + + RamUsageEstimator.sizeOf(leftOutputSymbolIdx) + + RamUsageEstimator.sizeOf(rightOutputSymbolIdx) + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext) + + resultBuilder.getRetainedSizeInBytes(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index 1d99562e5ed..efd8323af07 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -84,6 +84,7 @@ import org.apache.iotdb.db.queryengine.execution.operator.source.relational.Info import org.apache.iotdb.db.queryengine.execution.operator.source.relational.LastQueryAggTableScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.MergeSortFullOuterJoinOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.MergeSortInnerJoinOperator; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.MergeSortLeftJoinOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.MergeSortSemiJoinOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.TreeAlignedDeviceViewAggregationScanOperator; @@ -1467,6 +1468,24 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution JoinKeyComparatorFactory.getComparators(joinKeyTypes, true), dataTypes, joinKeyTypes.stream().map(this::buildUpdateLastRowFunction).collect(Collectors.toList())); + } else if (requireNonNull(node.getJoinType()) == JoinNode.JoinType.LEFT) { + OperatorContext operatorContext = + context + .getDriverContext() + .addOperatorContext( + context.getNextOperatorId(), + node.getPlanNodeId(), + MergeSortLeftJoinOperator.class.getSimpleName()); + return new MergeSortLeftJoinOperator( + operatorContext, + leftChild, + leftJoinKeyPositions, + leftOutputSymbolIdx, + rightChild, + rightJoinKeyPositions, + rightOutputSymbolIdx, + JoinKeyComparatorFactory.getComparators(joinKeyTypes, true), + dataTypes); } throw new IllegalStateException("Unsupported join type: " + node.getJoinType());
