This is an automated email from the ASF dual-hosted git repository. yashmayya pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 767cb9b8a1 ASOF JOIN (#15630) 767cb9b8a1 is described below commit 767cb9b8a1fe7386e00f11705a3430b538bd274d Author: Yash Mayya <yash.ma...@gmail.com> AuthorDate: Wed May 28 05:42:46 2025 +0100 ASOF JOIN (#15630) --- pinot-common/src/main/proto/plan.proto | 4 + .../rel/rules/PinotJoinExchangeNodeInsertRule.java | 11 +- .../planner/logical/PlanNodeToRelConverter.java | 8 +- .../planner/logical/RelToPlanNodeConverter.java | 49 +++ .../pinot/query/planner/plannode/JoinNode.java | 26 +- .../query/planner/serde/PlanNodeDeserializer.java | 10 +- .../query/planner/serde/PlanNodeSerializer.java | 17 +- .../query/runtime/operator/AsofJoinOperator.java | 171 +++++++++ .../query/runtime/operator/BaseJoinOperator.java | 46 ++- .../query/runtime/operator/HashJoinOperator.java | 46 +-- .../runtime/operator/NonEquiJoinOperator.java | 44 +-- .../query/runtime/plan/PlanNodeToOpChain.java | 25 +- .../src/test/resources/queries/AsOfJoin.json | 418 +++++++++++++++++++++ 13 files changed, 776 insertions(+), 99 deletions(-) diff --git a/pinot-common/src/main/proto/plan.proto b/pinot-common/src/main/proto/plan.proto index 5e3d733e45..4e4fbb1684 100644 --- a/pinot-common/src/main/proto/plan.proto +++ b/pinot-common/src/main/proto/plan.proto @@ -84,11 +84,14 @@ enum JoinType { FULL = 3; SEMI = 4; ANTI = 5; + ASOF = 6; + LEFT_ASOF = 7; } enum JoinStrategy { HASH = 0; LOOKUP = 1; + AS_OF = 2; } message JoinNode { @@ -97,6 +100,7 @@ message JoinNode { repeated int32 rightKeys = 3; repeated Expression nonEquiConditions = 4; JoinStrategy joinStrategy = 5; + Expression matchCondition = 6; } enum ExchangeType { diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java index 5ecbedb8a5..6ce1c83767 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java @@ -28,6 +28,7 @@ import org.apache.calcite.rel.RelDistributions; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Join; import org.apache.calcite.rel.core.JoinInfo; +import org.apache.calcite.rel.logical.LogicalAsofJoin; import org.apache.calcite.tools.RelBuilderFactory; import org.apache.pinot.calcite.rel.hint.PinotHintOptions; import org.apache.pinot.calcite.rel.logical.PinotLogicalExchange; @@ -97,8 +98,14 @@ public class PinotJoinExchangeNodeInsertRule extends RelOptRule { } // TODO: Consider creating different JOIN Rel for each join strategy - call.transformTo(join.copy(join.getTraitSet(), join.getCondition(), newLeft, newRight, join.getJoinType(), - join.isSemiJoinDone())); + if (join instanceof LogicalAsofJoin) { + // Note that we don't use the MATCH_CONDITION in an ASOF JOIN to determine the distribution, only the join keys + // in the ON clause of the ASOF JOIN. + call.transformTo(((LogicalAsofJoin) join).copy(join.getTraitSet(), List.of(newLeft, newRight))); + } else { + call.transformTo(join.copy(join.getTraitSet(), join.getCondition(), newLeft, newRight, join.getJoinType(), + join.isSemiJoinDone())); + } } private static PinotLogicalExchange createExchangeForLookupJoin(PinotHintOptions.DistributionType distributionType, diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanNodeToRelConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanNodeToRelConverter.java index f19e9c4e6a..9ead0c3c62 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanNodeToRelConverter.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanNodeToRelConverter.java @@ -30,6 +30,7 @@ import org.apache.calcite.rel.RelCollations; import org.apache.calcite.rel.RelDistribution; import org.apache.calcite.rel.RelDistributions; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rel.core.SetOp; import org.apache.calcite.rel.core.Window; import org.apache.calcite.rel.logical.LogicalIntersect; @@ -154,7 +155,12 @@ public final class PlanNodeToRelConverter { conditions.add(RexExpressionUtils.toRexNode(_builder, nonEquiCondition)); } - _builder.join(node.getJoinType(), conditions); + if (node.getJoinType() == JoinRelType.ASOF || node.getJoinType() == JoinRelType.LEFT_ASOF) { + RexNode matchCondition = RexExpressionUtils.toRexNode(_builder, node.getMatchCondition()); + _builder.asofJoin(node.getJoinType(), _builder.and(conditions), matchCondition); + } else { + _builder.join(node.getJoinType(), conditions); + } } catch (RuntimeException e) { LOGGER.warn("Failed to convert join node: {}", node, e); _builder.push(new PinotExplainedRelNode(_builder.getCluster(), "UnknownJoin", Collections.emptyMap(), diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java index b0db847d5a..c9526fa488 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java @@ -37,6 +37,7 @@ import org.apache.calcite.rel.core.Project; import org.apache.calcite.rel.core.SetOp; import org.apache.calcite.rel.core.TableScan; import org.apache.calcite.rel.core.Window; +import org.apache.calcite.rel.logical.LogicalAsofJoin; import org.apache.calcite.rel.logical.LogicalFilter; import org.apache.calcite.rel.logical.LogicalJoin; import org.apache.calcite.rel.logical.LogicalProject; @@ -123,6 +124,13 @@ public final class RelToPlanNodeConverter { _joinFound = true; } result = convertLogicalJoin((LogicalJoin) node); + } else if (node instanceof LogicalAsofJoin) { + _brokerMetrics.addMeteredGlobalValue(BrokerMeter.JOIN_COUNT, 1); + if (!_joinFound) { + _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERIES_WITH_JOINS, 1); + _joinFound = true; + } + result = convertLogicalAsofJoin((LogicalAsofJoin) node); } else if (node instanceof LogicalWindow) { _brokerMetrics.addMeteredGlobalValue(BrokerMeter.WINDOW_COUNT, 1); if (!_windowFunctionFound) { @@ -358,6 +366,47 @@ public final class RelToPlanNodeConverter { joinStrategy); } + private JoinNode convertLogicalAsofJoin(LogicalAsofJoin join) { + JoinInfo joinInfo = join.analyzeCondition(); + DataSchema dataSchema = toDataSchema(join.getRowType()); + List<PlanNode> inputs = convertInputs(join.getInputs()); + JoinRelType joinType = join.getJoinType(); + + // Basic validations + Preconditions.checkState(inputs.size() == 2, "Join should have exactly 2 inputs, got: %s", inputs.size()); + Preconditions.checkState(joinInfo.nonEquiConditions.isEmpty(), + "Non-equi conditions are not supported for ASOF join, got: %s", joinInfo.nonEquiConditions); + Preconditions.checkState(joinType == JoinRelType.ASOF || joinType == JoinRelType.LEFT_ASOF, + "Join type should be ASOF or LEFT_ASOF, got: %s", joinType); + + PlanNode left = inputs.get(0); + PlanNode right = inputs.get(1); + int numLeftColumns = left.getDataSchema().size(); + int numResultColumns = dataSchema.size(); + int numRightColumns = right.getDataSchema().size(); + Preconditions.checkState(numLeftColumns + numRightColumns == numResultColumns, + "Invalid number of columns for join type: %s, left: %s, right: %s, result: %s", joinType, numLeftColumns, + numRightColumns, numResultColumns); + + RexExpression matchCondition = RexExpressionUtils.fromRexNode(join.getMatchCondition()); + Preconditions.checkState(matchCondition != null, "ASOF_JOIN must have a match condition"); + Preconditions.checkState(matchCondition instanceof RexExpression.FunctionCall, + "ASOF JOIN only supports function call match condition, got: %s", matchCondition); + + List<RexExpression> matchKeys = ((RexExpression.FunctionCall) matchCondition).getFunctionOperands(); + // TODO: Add support for MATCH_CONDITION containing two columns of different types. In that case, there would be + // a CAST RexExpression.FunctionCall on top of the RexExpression.InputRef, and the physical ASOF join operator + // can't currently handle that. + Preconditions.checkState( + matchKeys.size() == 2 && matchKeys.get(0) instanceof RexExpression.InputRef + && matchKeys.get(1) instanceof RexExpression.InputRef, + "ASOF_JOIN only supports match conditions with a comparison between two columns of the same type"); + + return new JoinNode(DEFAULT_STAGE_ID, dataSchema, NodeHint.fromRelHints(join.getHints()), inputs, joinType, + joinInfo.leftKeys, joinInfo.rightKeys, RexExpressionUtils.fromRexNodes(joinInfo.nonEquiConditions), + JoinNode.JoinStrategy.ASOF, RexExpressionUtils.fromRexNode(join.getMatchCondition())); + } + private List<PlanNode> convertInputs(List<RelNode> inputs) { // NOTE: Inputs can be modified in place. Do not create immutable List here. int numInputs = inputs.size(); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java index c07392c298..83fc50d37f 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java @@ -20,6 +20,7 @@ package org.apache.pinot.query.planner.plannode; import java.util.List; import java.util.Objects; +import javax.annotation.Nullable; import org.apache.calcite.rel.core.JoinRelType; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.query.planner.logical.RexExpression; @@ -31,16 +32,25 @@ public class JoinNode extends BasePlanNode { private final List<Integer> _rightKeys; private final List<RexExpression> _nonEquiConditions; private final JoinStrategy _joinStrategy; + @Nullable + private final RexExpression _matchCondition; public JoinNode(int stageId, DataSchema dataSchema, NodeHint nodeHint, List<PlanNode> inputs, JoinRelType joinType, List<Integer> leftKeys, List<Integer> rightKeys, List<RexExpression> nonEquiConditions, JoinStrategy joinStrategy) { + this(stageId, dataSchema, nodeHint, inputs, joinType, leftKeys, rightKeys, nonEquiConditions, joinStrategy, null); + } + + public JoinNode(int stageId, DataSchema dataSchema, NodeHint nodeHint, List<PlanNode> inputs, JoinRelType joinType, + List<Integer> leftKeys, List<Integer> rightKeys, List<RexExpression> nonEquiConditions, + JoinStrategy joinStrategy, RexExpression matchCondition) { super(stageId, dataSchema, nodeHint, inputs); _joinType = joinType; _leftKeys = leftKeys; _rightKeys = rightKeys; _nonEquiConditions = nonEquiConditions; _joinStrategy = joinStrategy; + _matchCondition = matchCondition; } public JoinRelType getJoinType() { @@ -63,9 +73,14 @@ public class JoinNode extends BasePlanNode { return _joinStrategy; } + @Nullable + public RexExpression getMatchCondition() { + return _matchCondition; + } + @Override public String explain() { - return "JOIN"; + return _joinStrategy == JoinStrategy.ASOF ? "ASOF JOIN" : "JOIN"; } @Override @@ -76,7 +91,7 @@ public class JoinNode extends BasePlanNode { @Override public PlanNode withInputs(List<PlanNode> inputs) { return new JoinNode(_stageId, _dataSchema, _nodeHint, inputs, _joinType, _leftKeys, _rightKeys, _nonEquiConditions, - _joinStrategy); + _joinStrategy, _matchCondition); } @Override @@ -93,15 +108,16 @@ public class JoinNode extends BasePlanNode { JoinNode joinNode = (JoinNode) o; return _joinType == joinNode._joinType && Objects.equals(_leftKeys, joinNode._leftKeys) && Objects.equals( _rightKeys, joinNode._rightKeys) && Objects.equals(_nonEquiConditions, joinNode._nonEquiConditions) - && _joinStrategy == joinNode._joinStrategy; + && _joinStrategy == joinNode._joinStrategy && Objects.equals(_matchCondition, joinNode._matchCondition); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), _joinType, _leftKeys, _rightKeys, _nonEquiConditions, _joinStrategy); + return Objects.hash(super.hashCode(), _joinType, _leftKeys, _rightKeys, _nonEquiConditions, _joinStrategy, + _matchCondition); } public enum JoinStrategy { - HASH, LOOKUP + HASH, LOOKUP, ASOF } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java index 7ea9d0d16b..9cf5cd8000 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java @@ -102,7 +102,9 @@ public class PlanNodeDeserializer { return new JoinNode(protoNode.getStageId(), extractDataSchema(protoNode), extractNodeHint(protoNode), extractInputs(protoNode), convertJoinType(protoJoinNode.getJoinType()), protoJoinNode.getLeftKeysList(), protoJoinNode.getRightKeysList(), convertExpressions(protoJoinNode.getNonEquiConditionsList()), - convertJoinStrategy(protoJoinNode.getJoinStrategy())); + convertJoinStrategy(protoJoinNode.getJoinStrategy()), + protoJoinNode.hasMatchCondition() ? ProtoExpressionToRexExpression.convertExpression( + protoJoinNode.getMatchCondition()) : null); } private static MailboxReceiveNode deserializeMailboxReceiveNode(Plan.PlanNode protoNode) { @@ -284,6 +286,10 @@ public class PlanNodeDeserializer { return JoinRelType.SEMI; case ANTI: return JoinRelType.ANTI; + case ASOF: + return JoinRelType.ASOF; + case LEFT_ASOF: + return JoinRelType.LEFT_ASOF; default: throw new IllegalStateException("Unsupported JoinType: " + joinType); } @@ -295,6 +301,8 @@ public class PlanNodeDeserializer { return JoinNode.JoinStrategy.HASH; case LOOKUP: return JoinNode.JoinStrategy.LOOKUP; + case AS_OF: + return JoinNode.JoinStrategy.ASOF; default: throw new IllegalStateException("Unsupported JoinStrategy: " + joinStrategy); } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java index bea6042d02..359b3895ee 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java @@ -116,14 +116,17 @@ public class PlanNodeSerializer { @Override public Void visitJoin(JoinNode node, Plan.PlanNode.Builder builder) { - Plan.JoinNode joinNode = Plan.JoinNode.newBuilder() + Plan.JoinNode.Builder joinNode = Plan.JoinNode.newBuilder() .setJoinType(convertJoinType(node.getJoinType())) .addAllLeftKeys(node.getLeftKeys()) .addAllRightKeys(node.getRightKeys()) .addAllNonEquiConditions(convertExpressions(node.getNonEquiConditions())) - .setJoinStrategy(convertJoinStrategy(node.getJoinStrategy())) - .build(); - builder.setJoinNode(joinNode); + .setJoinStrategy(convertJoinStrategy(node.getJoinStrategy())); + + if (node.getMatchCondition() != null) { + joinNode.setMatchCondition(RexExpressionToProtoExpression.convertExpression(node.getMatchCondition())); + } + builder.setJoinNode(joinNode.build()); return null; } @@ -289,6 +292,10 @@ public class PlanNodeSerializer { return Plan.JoinType.SEMI; case ANTI: return Plan.JoinType.ANTI; + case ASOF: + return Plan.JoinType.ASOF; + case LEFT_ASOF: + return Plan.JoinType.LEFT_ASOF; default: throw new IllegalStateException("Unsupported JoinRelType: " + joinType); } @@ -300,6 +307,8 @@ public class PlanNodeSerializer { return Plan.JoinStrategy.HASH; case LOOKUP: return Plan.JoinStrategy.LOOKUP; + case ASOF: + return Plan.JoinStrategy.AS_OF; default: throw new IllegalStateException("Unsupported JoinStrategy: " + joinStrategy); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AsofJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AsofJoinOperator.java new file mode 100644 index 0000000000..5c98a9ce29 --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AsofJoinOperator.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.runtime.operator; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; +import javax.annotation.Nullable; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.query.planner.logical.RexExpression; +import org.apache.pinot.query.planner.partitioning.KeySelector; +import org.apache.pinot.query.planner.partitioning.KeySelectorFactory; +import org.apache.pinot.query.planner.plannode.JoinNode; +import org.apache.pinot.query.runtime.blocks.MseBlock; +import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; + + +public class AsofJoinOperator extends BaseJoinOperator { + private static final String EXPLAIN_NAME = "ASOF_JOIN"; + + // The right table is a map from the hash key (columns in the ON join condition) to a sorted map of match key + // (column in the MATCH_CONDITION) to rows. + private final Map<Object, NavigableMap<Comparable<?>, Object[]>> _rightTable; + private final KeySelector<?> _leftKeySelector; + private final KeySelector<?> _rightKeySelector; + private final MatchConditionType _matchConditionType; + private final int _leftMatchKeyIndex; + private final int _rightMatchKeyIndex; + + public AsofJoinOperator(OpChainExecutionContext context, MultiStageOperator leftInput, DataSchema leftSchema, + MultiStageOperator rightInput, JoinNode node) { + super(context, leftInput, leftSchema, rightInput, node); + _rightTable = new HashMap<>(); + _leftKeySelector = KeySelectorFactory.getKeySelector(node.getLeftKeys()); + _rightKeySelector = KeySelectorFactory.getKeySelector(node.getRightKeys()); + + RexExpression matchCondition = node.getMatchCondition(); + try { + _matchConditionType = + MatchConditionType.valueOf(((RexExpression.FunctionCall) matchCondition).getFunctionName().toUpperCase()); + } catch (IllegalArgumentException e) { + throw new UnsupportedOperationException("Unsupported match condition: " + matchCondition); + } + + List<RexExpression> matchKeys = ((RexExpression.FunctionCall) matchCondition).getFunctionOperands(); + _leftMatchKeyIndex = ((RexExpression.InputRef) matchKeys.get(0)).getIndex(); + _rightMatchKeyIndex = ((RexExpression.InputRef) matchKeys.get(1)).getIndex() - leftSchema.size(); + } + + @Override + protected void addRowsToRightTable(List<Object[]> rows) { + for (Object[] row : rows) { + Comparable<?> matchKey = (Comparable<?>) row[_rightMatchKeyIndex]; + if (matchKey == null) { + // Skip rows with null match keys because they cannot be matched with any left rows + continue; + } + Object hashKey = _rightKeySelector.getKey(row); + // Results need not be deterministic if there are "ties" based on the match key in an ASOF JOIN, so it's okay to + // only keep the last row with the same hash key and match key. + _rightTable.computeIfAbsent(hashKey, k -> new TreeMap<>()).put(matchKey, row); + } + } + + @Override + protected void finishBuildingRightTable() { + // no-op + } + + @Override + protected List<Object[]> buildJoinedRows(MseBlock.Data leftBlock) { + List<Object[]> rows = new ArrayList<>(); + for (Object[] leftRow : leftBlock.asRowHeap().getRows()) { + Comparable<?> matchKey = (Comparable<?>) leftRow[_leftMatchKeyIndex]; + if (matchKey == null) { + // Rows with null match keys cannot be matched with any right rows + if (needUnmatchedLeftRows()) { + rows.add(joinRow(leftRow, null)); + } + continue; + } + Object hashKey = _leftKeySelector.getKey(leftRow); + NavigableMap<Comparable<?>, Object[]> rightRows = _rightTable.get(hashKey); + if (rightRows == null) { + if (needUnmatchedLeftRows()) { + rows.add(joinRow(leftRow, null)); + } + } else { + Object[] rightRow = closestMatch(matchKey, rightRows); + if (rightRow == null) { + if (needUnmatchedLeftRows()) { + rows.add(joinRow(leftRow, null)); + } + } else { + rows.add(joinRow(leftRow, rightRow)); + } + } + } + return rows; + } + + @Nullable + private Object[] closestMatch(Comparable<?> matchKey, NavigableMap<Comparable<?>, Object[]> rightRows) { + switch (_matchConditionType) { + case GREATER_THAN: { + // Find the closest right row that is less than the left row (compared by their match keys from the match + // condition). + Map.Entry<Comparable<?>, Object[]> closestMatch = rightRows.lowerEntry(matchKey); + return closestMatch == null ? null : closestMatch.getValue(); + } + case GREATER_THAN_OR_EQUAL: { + // Find the closest right row that is less than or equal to the left row (compared by their match keys from + // the match condition). + Map.Entry<Comparable<?>, Object[]> closestMatch = rightRows.floorEntry(matchKey); + return closestMatch == null ? null : closestMatch.getValue(); + } + case LESS_THAN: { + // Find the closest right row that is greater than the left row (compared by their match keys from the match + // condition). + Map.Entry<Comparable<?>, Object[]> closestMatch = rightRows.higherEntry(matchKey); + return closestMatch == null ? null : closestMatch.getValue(); + } + case LESS_THAN_OR_EQUAL: { + // Find the closest right row that is greater than or equal to the left row (compared by their match keys from + // the match condition). + Map.Entry<Comparable<?>, Object[]> closestMatch = rightRows.ceilingEntry(matchKey); + return closestMatch == null ? null : closestMatch.getValue(); + } + default: + throw new IllegalArgumentException("Unsupported match condition type: " + _matchConditionType); + } + } + + @Override + protected List<Object[]> buildNonMatchRightRows() { + // There's only ASOF JOIN and LEFT ASOF JOIN; RIGHT ASOF JOIN is not a thing + throw new UnsupportedOperationException("ASOF JOIN does not support unmatched right rows"); + } + + @Nullable + @Override + public String toExplainString() { + return EXPLAIN_NAME; + } + + private enum MatchConditionType { + GREATER_THAN, + GREATER_THAN_OR_EQUAL, + LESS_THAN, + LESS_THAN_OR_EQUAL + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java index 8fee3d0e9e..5040bae620 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java @@ -172,7 +172,49 @@ public abstract class BaseJoinOperator extends MultiStageOperator { return mseBlock; } - protected abstract void buildRightTable(); + protected void buildRightTable() { + LOGGER.trace("Building right table for join operator"); + long startTime = System.currentTimeMillis(); + int numRows = 0; + MseBlock rightBlock = _rightInput.nextBlock(); + while (rightBlock.isData()) { + List<Object[]> rows = ((MseBlock.Data) rightBlock).asRowHeap().getRows(); + // Row based overflow check. + if (rows.size() + numRows > _maxRowsInJoin) { + if (_joinOverflowMode == JoinOverFlowMode.THROW) { + throwForJoinRowLimitExceeded( + "Cannot build in memory hash table for join operator, reached number of rows limit: " + _maxRowsInJoin); + } else { + // Just fill up the buffer. + int remainingRows = _maxRowsInJoin - numRows; + rows = rows.subList(0, remainingRows); + _statMap.merge(StatKey.MAX_ROWS_IN_JOIN_REACHED, true); + // setting only the rightTableOperator to be early terminated and awaits EOS block next. + _rightInput.earlyTerminate(); + } + } + + addRowsToRightTable(rows); + numRows += rows.size(); + sampleAndCheckInterruption(); + rightBlock = _rightInput.nextBlock(); + } + + MseBlock.Eos eosBlock = (MseBlock.Eos) rightBlock; + if (eosBlock.isError()) { + _eos = eosBlock; + } else { + _isRightTableBuilt = true; + finishBuildingRightTable(); + } + + _statMap.merge(StatKey.TIME_BUILDING_HASH_TABLE_MS, System.currentTimeMillis() - startTime); + LOGGER.trace("Finished building right table for join operator"); + } + + protected abstract void addRowsToRightTable(List<Object[]> rows); + + protected abstract void finishBuildingRightTable(); protected MseBlock buildJoinedDataBlock() { LOGGER.trace("Building joined data block for join operator"); @@ -240,7 +282,7 @@ public abstract class BaseJoinOperator extends MultiStageOperator { } protected boolean needUnmatchedLeftRows() { - return _joinType == JoinRelType.LEFT || _joinType == JoinRelType.FULL; + return _joinType == JoinRelType.LEFT || _joinType == JoinRelType.FULL || _joinType == JoinRelType.LEFT_ASOF; } protected void earlyTerminateLeftInput() { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java index 5d4294546c..0cb1032367 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java @@ -36,7 +36,6 @@ import org.apache.pinot.query.runtime.operator.join.LongLookupTable; import org.apache.pinot.query.runtime.operator.join.LookupTable; import org.apache.pinot.query.runtime.operator.join.ObjectLookupTable; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; -import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.JoinOverFlowMode; /** @@ -93,44 +92,15 @@ public class HashJoinOperator extends BaseJoinOperator { } @Override - protected void buildRightTable() { - LOGGER.trace("Building hash table for join operator"); - long startTime = System.currentTimeMillis(); - int numRows = 0; - MseBlock rightBlock = _rightInput.nextBlock(); - while (rightBlock.isData()) { - MseBlock.Data dataBlock = (MseBlock.Data) rightBlock; - List<Object[]> rows = dataBlock.asRowHeap().getRows(); - // Row based overflow check. - if (rows.size() + numRows > _maxRowsInJoin) { - if (_joinOverflowMode == JoinOverFlowMode.THROW) { - throwForJoinRowLimitExceeded( - "Cannot build in memory hash table for join operator, reached number of rows limit: " + _maxRowsInJoin); - } else { - // Just fill up the buffer. - int remainingRows = _maxRowsInJoin - numRows; - rows = rows.subList(0, remainingRows); - _statMap.merge(StatKey.MAX_ROWS_IN_JOIN_REACHED, true); - // setting only the rightTableOperator to be early terminated and awaits EOS block next. - _rightInput.earlyTerminate(); - } - } - for (Object[] row : rows) { - _rightTable.addRow(_rightKeySelector.getKey(row), row); - } - numRows += rows.size(); - sampleAndCheckInterruption(); - rightBlock = _rightInput.nextBlock(); + protected void addRowsToRightTable(List<Object[]> rows) { + for (Object[] row : rows) { + _rightTable.addRow(_rightKeySelector.getKey(row), row); } - MseBlock.Eos eosBlock = (MseBlock.Eos) rightBlock; - if (eosBlock.isError()) { - _eos = eosBlock; - } else { - _rightTable.finish(); - _isRightTableBuilt = true; - } - _statMap.merge(StatKey.TIME_BUILDING_HASH_TABLE_MS, System.currentTimeMillis() - startTime); - LOGGER.trace("Finished building hash table for join operator"); + } + + @Override + protected void finishBuildingRightTable() { + _rightTable.finish(); } @Override diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NonEquiJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NonEquiJoinOperator.java index 9ffcad83c1..34c3e99287 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NonEquiJoinOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NonEquiJoinOperator.java @@ -27,7 +27,6 @@ import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.query.planner.plannode.JoinNode; import org.apache.pinot.query.runtime.blocks.MseBlock; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; -import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.JoinOverFlowMode; /** @@ -57,42 +56,15 @@ public class NonEquiJoinOperator extends BaseJoinOperator { } @Override - protected void buildRightTable() { - LOGGER.trace("Building right table for join operator"); - long startTime = System.currentTimeMillis(); - MseBlock rightBlock = _rightInput.nextBlock(); - while (rightBlock.isData()) { - List<Object[]> rows = ((MseBlock.Data) rightBlock).asRowHeap().getRows(); - int numRowsInRightTable = _rightTable.size(); - // Row based overflow check. - if (rows.size() + numRowsInRightTable > _maxRowsInJoin) { - if (_joinOverflowMode == JoinOverFlowMode.THROW) { - throwForJoinRowLimitExceeded( - "Cannot build in memory right table for join operator, reached number of rows limit: " + _maxRowsInJoin); - } else { - // Just fill up the buffer. - int remainingRows = _maxRowsInJoin - numRowsInRightTable; - rows = rows.subList(0, remainingRows); - _statMap.merge(StatKey.MAX_ROWS_IN_JOIN_REACHED, true); - // setting only the rightTableOperator to be early terminated and awaits EOS block next. - _rightInput.earlyTerminate(); - } - } - _rightTable.addAll(rows); - sampleAndCheckInterruption(); - rightBlock = _rightInput.nextBlock(); - } - MseBlock.Eos eosBlock = (MseBlock.Eos) rightBlock; - if (eosBlock.isError()) { - _eos = eosBlock; - } else { - _isRightTableBuilt = true; - if (needUnmatchedRightRows()) { - _matchedRightRows = new BitSet(_rightTable.size()); - } + protected void addRowsToRightTable(List<Object[]> rows) { + _rightTable.addAll(rows); + } + + @Override + protected void finishBuildingRightTable() { + if (needUnmatchedRightRows()) { + _matchedRightRows = new BitSet(_rightTable.size()); } - _statMap.merge(StatKey.TIME_BUILDING_HASH_TABLE_MS, System.currentTimeMillis() - startTime); - LOGGER.trace("Finished building right table for join operator"); } @Override diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java index 5dd8be81d3..c08cb89359 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java @@ -37,6 +37,7 @@ import org.apache.pinot.query.planner.plannode.TableScanNode; import org.apache.pinot.query.planner.plannode.ValueNode; import org.apache.pinot.query.planner.plannode.WindowNode; import org.apache.pinot.query.runtime.operator.AggregateOperator; +import org.apache.pinot.query.runtime.operator.AsofJoinOperator; import org.apache.pinot.query.runtime.operator.FilterOperator; import org.apache.pinot.query.runtime.operator.HashJoinOperator; import org.apache.pinot.query.runtime.operator.IntersectAllOperator; @@ -180,16 +181,20 @@ public class PlanNodeToOpChain { PlanNode right = inputs.get(1); MultiStageOperator rightOperator = visit(right, context); JoinNode.JoinStrategy joinStrategy = node.getJoinStrategy(); - if (joinStrategy == JoinNode.JoinStrategy.HASH) { - if (node.getLeftKeys().isEmpty()) { - // TODO: Consider adding non-equi as a separate join strategy. - return new NonEquiJoinOperator(context, leftOperator, left.getDataSchema(), rightOperator, node); - } else { - return new HashJoinOperator(context, leftOperator, left.getDataSchema(), rightOperator, node); - } - } else { - assert joinStrategy == JoinNode.JoinStrategy.LOOKUP; - return new LookupJoinOperator(context, leftOperator, rightOperator, node); + switch (joinStrategy) { + case HASH: + if (node.getLeftKeys().isEmpty()) { + // TODO: Consider adding non-equi as a separate join strategy. + return new NonEquiJoinOperator(context, leftOperator, left.getDataSchema(), rightOperator, node); + } else { + return new HashJoinOperator(context, leftOperator, left.getDataSchema(), rightOperator, node); + } + case LOOKUP: + return new LookupJoinOperator(context, leftOperator, rightOperator, node); + case ASOF: + return new AsofJoinOperator(context, leftOperator, left.getDataSchema(), rightOperator, node); + default: + throw new IllegalStateException("Unsupported JoinStrategy: " + joinStrategy); } } diff --git a/pinot-query-runtime/src/test/resources/queries/AsOfJoin.json b/pinot-query-runtime/src/test/resources/queries/AsOfJoin.json new file mode 100644 index 0000000000..a2d9686e44 --- /dev/null +++ b/pinot-query-runtime/src/test/resources/queries/AsOfJoin.json @@ -0,0 +1,418 @@ +{ + "as_of_join_queries": { + "tables": { + "t1": { + "schema": [ + {"name": "key_col", "type": "STRING"}, + {"name": "asof_col", "type": "INT"} + ], + "inputs": [ + ["a", 1], + ["b", 2], + ["c", 3], + ["d", 4], + ["e", 5] + ] + }, + "t2": { + "schema": [ + {"name": "key_col", "type": "STRING"}, + {"name": "asof_col", "type": "INT"} + ], + "inputs": [ + ["b", 2], + ["a", 1], + ["c", 3], + ["a", 2], + ["c", 1], + ["b", 3], + ["d", 5] + ] + } + }, + "queries": [ + { + "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.asof_col) ON {t1}.key_col = {t2}.key_col", + "outputs": [ + ["c", 3, "c", 1] + ] + }, + { + "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col >= {t2}.asof_col) ON {t1}.key_col = {t2}.key_col", + "outputs": [ + ["a", 1, "a", 1], + ["b", 2, "b", 2], + ["c", 3, "c", 3] + ] + }, + { + "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col < {t2}.asof_col) ON {t1}.key_col = {t2}.key_col", + "outputs": [ + ["a", 1, "a", 2], + ["b", 2, "b", 3], + ["d", 4, "d", 5] + ] + }, + { + "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col <= {t2}.asof_col) ON {t1}.key_col = {t2}.key_col", + "outputs": [ + ["a", 1, "a", 1], + ["b", 2, "b", 2], + ["c", 3, "c", 3], + ["d", 4, "d", 5] + ] + }, + { + "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} LEFT ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.asof_col) ON {t1}.key_col = {t2}.key_col", + "outputs": [ + ["a", 1, null, null], + ["b", 2, null, null], + ["c", 3, "c", 1], + ["d", 4, null, null], + ["e", 5, null, null] + ] + }, + { + "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} LEFT ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col >= {t2}.asof_col) ON {t1}.key_col = {t2}.key_col", + "outputs": [ + ["a", 1, "a", 1], + ["b", 2, "b", 2], + ["c", 3, "c", 3], + ["d", 4, null, null], + ["e", 5, null, null] + ] + }, + { + "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} LEFT ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col < {t2}.asof_col) ON {t1}.key_col = {t2}.key_col", + "outputs": [ + ["a", 1, "a", 2], + ["b", 2, "b", 3], + ["c", 3, null, null], + ["d", 4, "d", 5], + ["e", 5, null, null] + ] + }, + { + "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} LEFT ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col <= {t2}.asof_col) ON {t1}.key_col = {t2}.key_col", + "outputs": [ + ["a", 1, "a", 1], + ["b", 2, "b", 2], + ["c", 3, "c", 3], + ["d", 4, "d", 5], + ["e", 5, null, null] + ] + } + ] + }, + "as_of_join_queries_without_hash_key_join": { + "tables": { + "t1": { + "schema": [ + {"name": "key_col", "type": "STRING"}, + {"name": "asof_col", "type": "INT"} + ], + "inputs": [ + ["a", 1], + ["b", 2], + ["c", 3], + ["d", 4], + ["e", 5] + ] + }, + "t2": { + "schema": [ + {"name": "key_col", "type": "STRING"}, + {"name": "asof_col", "type": "INT"} + ], + "inputs": [ + ["b", 2], + ["a", 1], + ["c", 3], + ["a", 4], + ["c", 7], + ["b", 6], + ["d", 5] + ] + } + }, + "queries": [ + { + "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.asof_col) ON true", + "outputs": [ + ["b", 2, "a", 1], + ["c", 3, "b", 2], + ["d", 4, "c", 3], + ["e", 5, "a", 4] + ] + }, + { + "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} LEFT ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.asof_col) ON true", + "outputs": [ + ["a", 1, null, null], + ["b", 2, "a", 1], + ["c", 3, "b", 2], + ["d", 4, "c", 3], + ["e", 5, "a", 4] + ] + }, + { + "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col >= {t2}.asof_col) ON true", + "outputs": [ + ["a", 1, "a", 1], + ["b", 2, "b", 2], + ["c", 3, "c", 3], + ["d", 4, "a", 4], + ["e", 5, "d", 5] + ] + }, + { + "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col < {t2}.asof_col) ON true", + "outputs": [ + ["a", 1, "b", 2], + ["b", 2, "c", 3], + ["c", 3, "a", 4], + ["d", 4, "d", 5], + ["e", 5, "b", 6] + ] + }, + { + "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col <= {t2}.asof_col) ON true", + "outputs": [ + ["a", 1, "a", 1], + ["b", 2, "b", 2], + ["c", 3, "c", 3], + ["d", 4, "a", 4], + ["e", 5, "d", 5] + ] + } + ] + }, + "as_of_join_queries_with_nulls": { + "tables": { + "t1": { + "schema": [ + {"name": "key_col", "type": "STRING"}, + {"name": "asof_col", "type": "INT"} + ], + "inputs": [ + ["a", 1], + ["a", 5], + ["b", 3], + ["c", null], + ["d", 4], + ["e", 7], + ["f", 10], + ["g", 12] + ] + }, + "t2": { + "schema": [ + {"name": "key_col", "type": "STRING"}, + {"name": "asof_col", "type": "INT"} + ], + "inputs": [ + ["a", 0], + ["a", 2], + ["a", null], + ["b", 2], + ["b", null], + ["c", 5], + ["d", 4], + ["d", 6], + ["f", null], + ["f", 11], + ["g", null], + ["h", 9] + ] + } + }, + "queries": [ + { + "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.asof_col) ON {t1}.key_col = {t2}.key_col", + "outputs": [ + ["a", 1, "a", 0], + ["a", 5, "a", 2], + ["b", 3, "b", 2] + ] + }, + { + "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col >= {t2}.asof_col) ON {t1}.key_col = {t2}.key_col", + "outputs": [ + ["a", 1, "a", 0], + ["a", 5, "a", 2], + ["b", 3, "b", 2], + ["d", 4, "d", 4] + ] + }, + { + "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col < {t2}.asof_col) ON {t1}.key_col = {t2}.key_col", + "outputs": [ + ["a", 1, "a", 2], + ["d", 4, "d", 6], + ["f", 10, "f", 11] + ] + }, + { + "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col <= {t2}.asof_col) ON {t1}.key_col = {t2}.key_col", + "outputs": [ + ["a", 1, "a", 2], + ["d", 4, "d", 4], + ["f", 10, "f", 11] + ] + }, + { + "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} LEFT ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.asof_col) ON {t1}.key_col = {t2}.key_col", + "outputs": [ + ["a", 1, "a", 0], + ["a", 5, "a", 2], + ["b", 3, "b", 2], + ["c", null, null, null], + ["d", 4, null, null], + ["e", 7, null, null], + ["f", 10, null, null], + ["g", 12, null, null] + ] + }, + { + "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} LEFT ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col >= {t2}.asof_col) ON {t1}.key_col = {t2}.key_col", + "outputs": [ + ["a", 1, "a", 0], + ["a", 5, "a", 2], + ["b", 3, "b", 2], + ["c", null, null, null], + ["d", 4, "d", 4], + ["e", 7, null, null], + ["f", 10, null, null], + ["g", 12, null, null] + ] + }, + { + "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} LEFT ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col < {t2}.asof_col) ON {t1}.key_col = {t2}.key_col", + "outputs": [ + ["a", 1, "a", 2], + ["a", 5, null, null], + ["b", 3, null, null], + ["c", null, null, null], + ["d", 4, "d", 6], + ["e", 7, null, null], + ["f", 10, "f", 11], + ["g", 12, null, null] + ] + }, + { + "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} LEFT ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col <= {t2}.asof_col) ON {t1}.key_col = {t2}.key_col", + "outputs": [ + ["a", 1, "a", 2], + ["a", 5, null, null], + ["b", 3, null, null], + ["c", null, null, null], + ["d", 4, "d", 4], + ["e", 7, null, null], + ["f", 10, "f", 11], + ["g", 12, null, null] + ] + } + ] + }, + "as_of_join_unsupported_scenarios": { + "tables": { + "t1": { + "schema": [ + {"name": "key_col", "type": "STRING"}, + {"name": "asof_col", "type": "INT"} + ], + "inputs": [ + ["a", 1], + ["b", 2], + ["c", 3], + ["d", 4], + ["e", 5] + ] + }, + "t2": { + "schema": [ + {"name": "key_col", "type": "STRING"}, + {"name": "asof_col", "type": "INT"} + ], + "inputs": [ + ["b", 2], + ["a", 1], + ["c", 3], + ["a", 2], + ["c", 1], + ["b", 3], + ["d", 5] + ] + } + }, + "queries": [ + { + "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.asof_col)", + "expectedException": ".*exception while parsing query.*", + "comment": "Calcite currently doesn't support ASOF JOINs without an ON clause. This isn't just a parser limitation, since the assumption is also built into the validator." + }, + { + "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} LEFT ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.asof_col)", + "expectedException": ".*exception while parsing query.*", + "comment": "Calcite currently doesn't support ASOF JOINs without an ON clause. This isn't just a parser limitation, since the assumption is also built into the validator." + }, + { + "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.asof_col) ON {t1}.key_col = {t2}.key_col AND {t1}.asof_col > 0", + "expectedException": ".*ASOF JOIN condition must be a conjunction of equality comparisons.*" + }, + { + "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.asof_col) ON {t1}.key_col != {t2}.key_col", + "expectedException": ".*ASOF JOIN condition must be a conjunction of equality comparisons.*" + }, + { + "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.asof_col) ON {t1}.key_col = {t2}.key_col OR {t1}.asof_col = {t2}.asof_col", + "expectedException": ".*ASOF JOIN condition must be a conjunction of equality comparisons.*" + }, + { + "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} ON {t1}.key_col = {t2}.key_col", + "expectedException": ".*exception while parsing query.*", + "comment": "MATCH_CONDITION is required for ASOF JOINs" + }, + { + "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} LEFT ASOF JOIN {t2} ON {t1}.key_col = {t2}.key_col", + "expectedException": ".*exception while parsing query.*", + "comment": "MATCH_CONDITION is required for ASOF JOINs" + }, + { + "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col != {t2}.asof_col) ON {t1}.key_col = {t2}.key_col", + "expectedException": ".*ASOF JOIN MATCH_CONDITION must be a comparison between columns from the two inputs.*", + "comment": "MATCH_CONDITION only supports a single predicate comparing two columns that is one out of: (>, >=, <, <=)" + }, + { + "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col = {t2}.asof_col) ON {t1}.key_col = {t2}.key_col", + "expectedException": ".*ASOF JOIN MATCH_CONDITION must be a comparison between columns from the two inputs.*", + "comment": "MATCH_CONDITION only supports a single predicate comparing two columns that is one out of: (>, >=, <, <=)" + }, + { + "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.asof_col OR {t1}.key_col > {t1}.key_col) ON {t1}.key_col = {t2}.key_col", + "expectedException": ".*ASOF JOIN MATCH_CONDITION must be a comparison between columns from the two inputs.*", + "comment": "MATCH_CONDITION only supports a single predicate comparing two columns that is one out of: (>, >=, <, <=)" + }, + { + "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.asof_col AND {t1}.key_col > {t1}.key_col) ON {t1}.key_col = {t2}.key_col", + "expectedException": ".*ASOF JOIN MATCH_CONDITION must be a comparison between columns from the two inputs.*", + "comment": "MATCH_CONDITION only supports a single predicate comparing two columns that is one out of: (>, >=, <, <=)" + }, + { + "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > 0) ON {t1}.key_col = {t2}.key_col", + "expectedException": ".*ASOF JOIN MATCH_CONDITION must be a comparison between columns from the two inputs.*", + "comment": "MATCH_CONDITION only supports a single predicate comparing two columns that is one out of: (>, >=, <, <=)" + }, + { + "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t1}.key_col) ON {t1}.key_col = {t2}.key_col", + "expectedException": ".*ASOF JOIN MATCH_CONDITION must be a comparison between columns from the two inputs.*", + "comment": "MATCH_CONDITION only supports a single predicate comparing two columns that is one out of: (>, >=, <, <=)" + }, + { + "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.key_col) ON {t1}.key_col = {t2}.key_col", + "expectedException": ".*ASOF_JOIN only supports match conditions with a comparison between two columns of the same type.*", + "comment": "We currently don't support MATCH_CONDITION comparing columns of different types" + } + ] + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org