This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 767cb9b8a1 ASOF JOIN (#15630)
767cb9b8a1 is described below

commit 767cb9b8a1fe7386e00f11705a3430b538bd274d
Author: Yash Mayya <yash.ma...@gmail.com>
AuthorDate: Wed May 28 05:42:46 2025 +0100

    ASOF JOIN (#15630)
---
 pinot-common/src/main/proto/plan.proto             |   4 +
 .../rel/rules/PinotJoinExchangeNodeInsertRule.java |  11 +-
 .../planner/logical/PlanNodeToRelConverter.java    |   8 +-
 .../planner/logical/RelToPlanNodeConverter.java    |  49 +++
 .../pinot/query/planner/plannode/JoinNode.java     |  26 +-
 .../query/planner/serde/PlanNodeDeserializer.java  |  10 +-
 .../query/planner/serde/PlanNodeSerializer.java    |  17 +-
 .../query/runtime/operator/AsofJoinOperator.java   | 171 +++++++++
 .../query/runtime/operator/BaseJoinOperator.java   |  46 ++-
 .../query/runtime/operator/HashJoinOperator.java   |  46 +--
 .../runtime/operator/NonEquiJoinOperator.java      |  44 +--
 .../query/runtime/plan/PlanNodeToOpChain.java      |  25 +-
 .../src/test/resources/queries/AsOfJoin.json       | 418 +++++++++++++++++++++
 13 files changed, 776 insertions(+), 99 deletions(-)

diff --git a/pinot-common/src/main/proto/plan.proto b/pinot-common/src/main/proto/plan.proto
index 5e3d733e45..4e4fbb1684 100644
--- a/pinot-common/src/main/proto/plan.proto
+++ b/pinot-common/src/main/proto/plan.proto
@@ -84,11 +84,14 @@ enum JoinType {
   FULL = 3;
   SEMI = 4;
   ANTI = 5;
+  ASOF = 6;
+  LEFT_ASOF = 7;
 }
 
 enum JoinStrategy {
   HASH = 0;
   LOOKUP = 1;
+  AS_OF = 2;
 }
 
 message JoinNode {
@@ -97,6 +100,7 @@ message JoinNode {
   repeated int32 rightKeys = 3;
   repeated Expression nonEquiConditions = 4;
   JoinStrategy joinStrategy = 5;
+  Expression matchCondition = 6;
 }
 
 enum ExchangeType {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java
index 5ecbedb8a5..6ce1c83767 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java
@@ -28,6 +28,7 @@ import org.apache.calcite.rel.RelDistributions;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.logical.LogicalAsofJoin;
 import org.apache.calcite.tools.RelBuilderFactory;
 import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
 import org.apache.pinot.calcite.rel.logical.PinotLogicalExchange;
@@ -97,8 +98,14 @@ public class PinotJoinExchangeNodeInsertRule extends RelOptRule {
     }
 
     // TODO: Consider creating different JOIN Rel for each join strategy
-    call.transformTo(join.copy(join.getTraitSet(), join.getCondition(), 
newLeft, newRight, join.getJoinType(),
-        join.isSemiJoinDone()));
+    if (join instanceof LogicalAsofJoin) {
+      // Note that we don't use the MATCH_CONDITION in an ASOF JOIN to determine the distribution, only the join keys
+      // in the ON clause of the ASOF JOIN.
+      call.transformTo(((LogicalAsofJoin) join).copy(join.getTraitSet(), 
List.of(newLeft, newRight)));
+    } else {
+      call.transformTo(join.copy(join.getTraitSet(), join.getCondition(), 
newLeft, newRight, join.getJoinType(),
+          join.isSemiJoinDone()));
+    }
   }
 
   private static PinotLogicalExchange 
createExchangeForLookupJoin(PinotHintOptions.DistributionType distributionType,
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanNodeToRelConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanNodeToRelConverter.java
index f19e9c4e6a..9ead0c3c62 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanNodeToRelConverter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PlanNodeToRelConverter.java
@@ -30,6 +30,7 @@ import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelDistributions;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.core.SetOp;
 import org.apache.calcite.rel.core.Window;
 import org.apache.calcite.rel.logical.LogicalIntersect;
@@ -154,7 +155,12 @@ public final class PlanNodeToRelConverter {
           conditions.add(RexExpressionUtils.toRexNode(_builder, 
nonEquiCondition));
         }
 
-        _builder.join(node.getJoinType(), conditions);
+        if (node.getJoinType() == JoinRelType.ASOF || node.getJoinType() == 
JoinRelType.LEFT_ASOF) {
+          RexNode matchCondition = RexExpressionUtils.toRexNode(_builder, 
node.getMatchCondition());
+          _builder.asofJoin(node.getJoinType(), _builder.and(conditions), 
matchCondition);
+        } else {
+          _builder.join(node.getJoinType(), conditions);
+        }
       } catch (RuntimeException e) {
         LOGGER.warn("Failed to convert join node: {}", node, e);
         _builder.push(new PinotExplainedRelNode(_builder.getCluster(), 
"UnknownJoin", Collections.emptyMap(),
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
index b0db847d5a..c9526fa488 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
@@ -37,6 +37,7 @@ import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.SetOp;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.logical.LogicalAsofJoin;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.logical.LogicalProject;
@@ -123,6 +124,13 @@ public final class RelToPlanNodeConverter {
         _joinFound = true;
       }
       result = convertLogicalJoin((LogicalJoin) node);
+    } else if (node instanceof LogicalAsofJoin) {
+      _brokerMetrics.addMeteredGlobalValue(BrokerMeter.JOIN_COUNT, 1);
+      if (!_joinFound) {
+        _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERIES_WITH_JOINS, 
1);
+        _joinFound = true;
+      }
+      result = convertLogicalAsofJoin((LogicalAsofJoin) node);
     } else if (node instanceof LogicalWindow) {
       _brokerMetrics.addMeteredGlobalValue(BrokerMeter.WINDOW_COUNT, 1);
       if (!_windowFunctionFound) {
@@ -358,6 +366,47 @@ public final class RelToPlanNodeConverter {
         joinStrategy);
   }
 
+  private JoinNode convertLogicalAsofJoin(LogicalAsofJoin join) {
+    JoinInfo joinInfo = join.analyzeCondition();
+    DataSchema dataSchema = toDataSchema(join.getRowType());
+    List<PlanNode> inputs = convertInputs(join.getInputs());
+    JoinRelType joinType = join.getJoinType();
+
+    // Basic validations
+    Preconditions.checkState(inputs.size() == 2, "Join should have exactly 2 
inputs, got: %s", inputs.size());
+    Preconditions.checkState(joinInfo.nonEquiConditions.isEmpty(),
+        "Non-equi conditions are not supported for ASOF join, got: %s", 
joinInfo.nonEquiConditions);
+    Preconditions.checkState(joinType == JoinRelType.ASOF || joinType == 
JoinRelType.LEFT_ASOF,
+        "Join type should be ASOF or LEFT_ASOF, got: %s", joinType);
+
+    PlanNode left = inputs.get(0);
+    PlanNode right = inputs.get(1);
+    int numLeftColumns = left.getDataSchema().size();
+    int numResultColumns = dataSchema.size();
+    int numRightColumns = right.getDataSchema().size();
+    Preconditions.checkState(numLeftColumns + numRightColumns == 
numResultColumns,
+        "Invalid number of columns for join type: %s, left: %s, right: %s, 
result: %s", joinType, numLeftColumns,
+        numRightColumns, numResultColumns);
+
+    RexExpression matchCondition = 
RexExpressionUtils.fromRexNode(join.getMatchCondition());
+    Preconditions.checkState(matchCondition != null, "ASOF_JOIN must have a 
match condition");
+    Preconditions.checkState(matchCondition instanceof 
RexExpression.FunctionCall,
+        "ASOF JOIN only supports function call match condition, got: %s", 
matchCondition);
+
+    List<RexExpression> matchKeys = ((RexExpression.FunctionCall) 
matchCondition).getFunctionOperands();
+    // TODO: Add support for MATCH_CONDITION containing two columns of different types. In that case, there would be
+    //       a CAST RexExpression.FunctionCall on top of the RexExpression.InputRef, and the physical ASOF join operator
+    //       can't currently handle that.
+    Preconditions.checkState(
+        matchKeys.size() == 2 && matchKeys.get(0) instanceof 
RexExpression.InputRef
+            && matchKeys.get(1) instanceof RexExpression.InputRef,
+        "ASOF_JOIN only supports match conditions with a comparison between 
two columns of the same type");
+
+    return new JoinNode(DEFAULT_STAGE_ID, dataSchema, 
NodeHint.fromRelHints(join.getHints()), inputs, joinType,
+        joinInfo.leftKeys, joinInfo.rightKeys, 
RexExpressionUtils.fromRexNodes(joinInfo.nonEquiConditions),
+        JoinNode.JoinStrategy.ASOF, 
RexExpressionUtils.fromRexNode(join.getMatchCondition()));
+  }
+
   private List<PlanNode> convertInputs(List<RelNode> inputs) {
     // NOTE: Inputs can be modified in place. Do not create immutable List here.
     int numInputs = inputs.size();
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java
index c07392c298..83fc50d37f 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.planner.plannode;
 
 import java.util.List;
 import java.util.Objects;
+import javax.annotation.Nullable;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.planner.logical.RexExpression;
@@ -31,16 +32,25 @@ public class JoinNode extends BasePlanNode {
   private final List<Integer> _rightKeys;
   private final List<RexExpression> _nonEquiConditions;
   private final JoinStrategy _joinStrategy;
+  @Nullable
+  private final RexExpression _matchCondition;
 
   public JoinNode(int stageId, DataSchema dataSchema, NodeHint nodeHint, 
List<PlanNode> inputs, JoinRelType joinType,
       List<Integer> leftKeys, List<Integer> rightKeys, List<RexExpression> 
nonEquiConditions,
       JoinStrategy joinStrategy) {
+    this(stageId, dataSchema, nodeHint, inputs, joinType, leftKeys, rightKeys, 
nonEquiConditions, joinStrategy, null);
+  }
+
+  public JoinNode(int stageId, DataSchema dataSchema, NodeHint nodeHint, 
List<PlanNode> inputs, JoinRelType joinType,
+      List<Integer> leftKeys, List<Integer> rightKeys, List<RexExpression> 
nonEquiConditions,
+      JoinStrategy joinStrategy, RexExpression matchCondition) {
     super(stageId, dataSchema, nodeHint, inputs);
     _joinType = joinType;
     _leftKeys = leftKeys;
     _rightKeys = rightKeys;
     _nonEquiConditions = nonEquiConditions;
     _joinStrategy = joinStrategy;
+    _matchCondition = matchCondition;
   }
 
   public JoinRelType getJoinType() {
@@ -63,9 +73,14 @@ public class JoinNode extends BasePlanNode {
     return _joinStrategy;
   }
 
+  @Nullable
+  public RexExpression getMatchCondition() {
+    return _matchCondition;
+  }
+
   @Override
   public String explain() {
-    return "JOIN";
+    return _joinStrategy == JoinStrategy.ASOF ? "ASOF JOIN" : "JOIN";
   }
 
   @Override
@@ -76,7 +91,7 @@ public class JoinNode extends BasePlanNode {
   @Override
   public PlanNode withInputs(List<PlanNode> inputs) {
     return new JoinNode(_stageId, _dataSchema, _nodeHint, inputs, _joinType, 
_leftKeys, _rightKeys, _nonEquiConditions,
-        _joinStrategy);
+        _joinStrategy, _matchCondition);
   }
 
   @Override
@@ -93,15 +108,16 @@ public class JoinNode extends BasePlanNode {
     JoinNode joinNode = (JoinNode) o;
     return _joinType == joinNode._joinType && Objects.equals(_leftKeys, 
joinNode._leftKeys) && Objects.equals(
         _rightKeys, joinNode._rightKeys) && Objects.equals(_nonEquiConditions, 
joinNode._nonEquiConditions)
-        && _joinStrategy == joinNode._joinStrategy;
+        && _joinStrategy == joinNode._joinStrategy && 
Objects.equals(_matchCondition, joinNode._matchCondition);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), _joinType, _leftKeys, _rightKeys, 
_nonEquiConditions, _joinStrategy);
+    return Objects.hash(super.hashCode(), _joinType, _leftKeys, _rightKeys, 
_nonEquiConditions, _joinStrategy,
+        _matchCondition);
   }
 
   public enum JoinStrategy {
-    HASH, LOOKUP
+    HASH, LOOKUP, ASOF
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java
index 7ea9d0d16b..9cf5cd8000 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeDeserializer.java
@@ -102,7 +102,9 @@ public class PlanNodeDeserializer {
     return new JoinNode(protoNode.getStageId(), extractDataSchema(protoNode), 
extractNodeHint(protoNode),
         extractInputs(protoNode), 
convertJoinType(protoJoinNode.getJoinType()), protoJoinNode.getLeftKeysList(),
         protoJoinNode.getRightKeysList(), 
convertExpressions(protoJoinNode.getNonEquiConditionsList()),
-        convertJoinStrategy(protoJoinNode.getJoinStrategy()));
+        convertJoinStrategy(protoJoinNode.getJoinStrategy()),
+        protoJoinNode.hasMatchCondition() ? 
ProtoExpressionToRexExpression.convertExpression(
+            protoJoinNode.getMatchCondition()) : null);
   }
 
   private static MailboxReceiveNode 
deserializeMailboxReceiveNode(Plan.PlanNode protoNode) {
@@ -284,6 +286,10 @@ public class PlanNodeDeserializer {
         return JoinRelType.SEMI;
       case ANTI:
         return JoinRelType.ANTI;
+      case ASOF:
+        return JoinRelType.ASOF;
+      case LEFT_ASOF:
+        return JoinRelType.LEFT_ASOF;
       default:
         throw new IllegalStateException("Unsupported JoinType: " + joinType);
     }
@@ -295,6 +301,8 @@ public class PlanNodeDeserializer {
         return JoinNode.JoinStrategy.HASH;
       case LOOKUP:
         return JoinNode.JoinStrategy.LOOKUP;
+      case AS_OF:
+        return JoinNode.JoinStrategy.ASOF;
       default:
         throw new IllegalStateException("Unsupported JoinStrategy: " + 
joinStrategy);
     }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java
index bea6042d02..359b3895ee 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/serde/PlanNodeSerializer.java
@@ -116,14 +116,17 @@ public class PlanNodeSerializer {
 
     @Override
     public Void visitJoin(JoinNode node, Plan.PlanNode.Builder builder) {
-      Plan.JoinNode joinNode = Plan.JoinNode.newBuilder()
+      Plan.JoinNode.Builder joinNode = Plan.JoinNode.newBuilder()
           .setJoinType(convertJoinType(node.getJoinType()))
           .addAllLeftKeys(node.getLeftKeys())
           .addAllRightKeys(node.getRightKeys())
           
.addAllNonEquiConditions(convertExpressions(node.getNonEquiConditions()))
-          .setJoinStrategy(convertJoinStrategy(node.getJoinStrategy()))
-          .build();
-      builder.setJoinNode(joinNode);
+          .setJoinStrategy(convertJoinStrategy(node.getJoinStrategy()));
+
+      if (node.getMatchCondition() != null) {
+        
joinNode.setMatchCondition(RexExpressionToProtoExpression.convertExpression(node.getMatchCondition()));
+      }
+      builder.setJoinNode(joinNode.build());
       return null;
     }
 
@@ -289,6 +292,10 @@ public class PlanNodeSerializer {
           return Plan.JoinType.SEMI;
         case ANTI:
           return Plan.JoinType.ANTI;
+        case ASOF:
+          return Plan.JoinType.ASOF;
+        case LEFT_ASOF:
+          return Plan.JoinType.LEFT_ASOF;
         default:
           throw new IllegalStateException("Unsupported JoinRelType: " + 
joinType);
       }
@@ -300,6 +307,8 @@ public class PlanNodeSerializer {
           return Plan.JoinStrategy.HASH;
         case LOOKUP:
           return Plan.JoinStrategy.LOOKUP;
+        case ASOF:
+          return Plan.JoinStrategy.AS_OF;
         default:
           throw new IllegalStateException("Unsupported JoinStrategy: " + 
joinStrategy);
       }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AsofJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AsofJoinOperator.java
new file mode 100644
index 0000000000..5c98a9ce29
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AsofJoinOperator.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.planner.partitioning.KeySelector;
+import org.apache.pinot.query.planner.partitioning.KeySelectorFactory;
+import org.apache.pinot.query.planner.plannode.JoinNode;
+import org.apache.pinot.query.runtime.blocks.MseBlock;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+
+
+public class AsofJoinOperator extends BaseJoinOperator {
+  private static final String EXPLAIN_NAME = "ASOF_JOIN";
+
+  // The right table is a map from the hash key (columns in the ON join condition) to a sorted map of match key
+  // (column in the MATCH_CONDITION) to rows.
+  private final Map<Object, NavigableMap<Comparable<?>, Object[]>> _rightTable;
+  private final KeySelector<?> _leftKeySelector;
+  private final KeySelector<?> _rightKeySelector;
+  private final MatchConditionType _matchConditionType;
+  private final int _leftMatchKeyIndex;
+  private final int _rightMatchKeyIndex;
+
+  public AsofJoinOperator(OpChainExecutionContext context, MultiStageOperator 
leftInput, DataSchema leftSchema,
+      MultiStageOperator rightInput, JoinNode node) {
+    super(context, leftInput, leftSchema, rightInput, node);
+    _rightTable = new HashMap<>();
+    _leftKeySelector = KeySelectorFactory.getKeySelector(node.getLeftKeys());
+    _rightKeySelector = KeySelectorFactory.getKeySelector(node.getRightKeys());
+
+    RexExpression matchCondition = node.getMatchCondition();
+    try {
+      _matchConditionType =
+          MatchConditionType.valueOf(((RexExpression.FunctionCall) 
matchCondition).getFunctionName().toUpperCase());
+    } catch (IllegalArgumentException e) {
+      throw new UnsupportedOperationException("Unsupported match condition: " 
+ matchCondition);
+    }
+
+    List<RexExpression> matchKeys = ((RexExpression.FunctionCall) 
matchCondition).getFunctionOperands();
+    _leftMatchKeyIndex = ((RexExpression.InputRef) 
matchKeys.get(0)).getIndex();
+    _rightMatchKeyIndex = ((RexExpression.InputRef) 
matchKeys.get(1)).getIndex() - leftSchema.size();
+  }
+
+  @Override
+  protected void addRowsToRightTable(List<Object[]> rows) {
+    for (Object[] row : rows) {
+      Comparable<?> matchKey = (Comparable<?>) row[_rightMatchKeyIndex];
+      if (matchKey == null) {
+        // Skip rows with null match keys because they cannot be matched with any left rows
+        continue;
+      }
+      Object hashKey = _rightKeySelector.getKey(row);
+      // Results need not be deterministic if there are "ties" based on the match key in an ASOF JOIN, so it's okay to
+      // only keep the last row with the same hash key and match key.
+      _rightTable.computeIfAbsent(hashKey, k -> new TreeMap<>()).put(matchKey, 
row);
+    }
+  }
+
+  @Override
+  protected void finishBuildingRightTable() {
+    // no-op
+  }
+
+  @Override
+  protected List<Object[]> buildJoinedRows(MseBlock.Data leftBlock) {
+    List<Object[]> rows = new ArrayList<>();
+    for (Object[] leftRow : leftBlock.asRowHeap().getRows()) {
+      Comparable<?> matchKey = (Comparable<?>) leftRow[_leftMatchKeyIndex];
+      if (matchKey == null) {
+        // Rows with null match keys cannot be matched with any right rows
+        if (needUnmatchedLeftRows()) {
+          rows.add(joinRow(leftRow, null));
+        }
+        continue;
+      }
+      Object hashKey = _leftKeySelector.getKey(leftRow);
+      NavigableMap<Comparable<?>, Object[]> rightRows = 
_rightTable.get(hashKey);
+      if (rightRows == null) {
+        if (needUnmatchedLeftRows()) {
+          rows.add(joinRow(leftRow, null));
+        }
+      } else {
+        Object[] rightRow = closestMatch(matchKey, rightRows);
+        if (rightRow == null) {
+          if (needUnmatchedLeftRows()) {
+            rows.add(joinRow(leftRow, null));
+          }
+        } else {
+          rows.add(joinRow(leftRow, rightRow));
+        }
+      }
+    }
+    return rows;
+  }
+
+  @Nullable
+  private Object[] closestMatch(Comparable<?> matchKey, 
NavigableMap<Comparable<?>, Object[]> rightRows) {
+    switch (_matchConditionType) {
+      case GREATER_THAN: {
+        // Find the closest right row that is less than the left row (compared by their match keys from the match
+        // condition).
+        Map.Entry<Comparable<?>, Object[]> closestMatch = 
rightRows.lowerEntry(matchKey);
+        return closestMatch == null ? null : closestMatch.getValue();
+      }
+      case GREATER_THAN_OR_EQUAL: {
+        // Find the closest right row that is less than or equal to the left row (compared by their match keys from
+        // the match condition).
+        Map.Entry<Comparable<?>, Object[]> closestMatch = 
rightRows.floorEntry(matchKey);
+        return closestMatch == null ? null : closestMatch.getValue();
+      }
+      case LESS_THAN: {
+        // Find the closest right row that is greater than the left row (compared by their match keys from the match
+        // condition).
+        Map.Entry<Comparable<?>, Object[]> closestMatch = 
rightRows.higherEntry(matchKey);
+        return closestMatch == null ? null : closestMatch.getValue();
+      }
+      case LESS_THAN_OR_EQUAL: {
+        // Find the closest right row that is greater than or equal to the left row (compared by their match keys from
+        // the match condition).
+        Map.Entry<Comparable<?>, Object[]> closestMatch = 
rightRows.ceilingEntry(matchKey);
+        return closestMatch == null ? null : closestMatch.getValue();
+      }
+      default:
+        throw new IllegalArgumentException("Unsupported match condition type: 
" + _matchConditionType);
+    }
+  }
+
+  @Override
+  protected List<Object[]> buildNonMatchRightRows() {
+    // There's only ASOF JOIN and LEFT ASOF JOIN; RIGHT ASOF JOIN is not a thing
+    throw new UnsupportedOperationException("ASOF JOIN does not support 
unmatched right rows");
+  }
+
+  @Nullable
+  @Override
+  public String toExplainString() {
+    return EXPLAIN_NAME;
+  }
+
+  private enum MatchConditionType {
+    GREATER_THAN,
+    GREATER_THAN_OR_EQUAL,
+    LESS_THAN,
+    LESS_THAN_OR_EQUAL
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java
index 8fee3d0e9e..5040bae620 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java
@@ -172,7 +172,49 @@ public abstract class BaseJoinOperator extends MultiStageOperator {
     return mseBlock;
   }
 
-  protected abstract void buildRightTable();
+  protected void buildRightTable() {
+    LOGGER.trace("Building right table for join operator");
+    long startTime = System.currentTimeMillis();
+    int numRows = 0;
+    MseBlock rightBlock = _rightInput.nextBlock();
+    while (rightBlock.isData()) {
+      List<Object[]> rows = ((MseBlock.Data) rightBlock).asRowHeap().getRows();
+      // Row based overflow check.
+      if (rows.size() + numRows > _maxRowsInJoin) {
+        if (_joinOverflowMode == JoinOverFlowMode.THROW) {
+          throwForJoinRowLimitExceeded(
+              "Cannot build in memory hash table for join operator, reached 
number of rows limit: " + _maxRowsInJoin);
+        } else {
+          // Just fill up the buffer.
+          int remainingRows = _maxRowsInJoin - numRows;
+          rows = rows.subList(0, remainingRows);
+          _statMap.merge(StatKey.MAX_ROWS_IN_JOIN_REACHED, true);
+          // setting only the rightTableOperator to be early terminated and awaits EOS block next.
+          _rightInput.earlyTerminate();
+        }
+      }
+
+      addRowsToRightTable(rows);
+      numRows += rows.size();
+      sampleAndCheckInterruption();
+      rightBlock = _rightInput.nextBlock();
+    }
+
+    MseBlock.Eos eosBlock = (MseBlock.Eos) rightBlock;
+    if (eosBlock.isError()) {
+      _eos = eosBlock;
+    } else {
+      _isRightTableBuilt = true;
+      finishBuildingRightTable();
+    }
+
+    _statMap.merge(StatKey.TIME_BUILDING_HASH_TABLE_MS, 
System.currentTimeMillis() - startTime);
+    LOGGER.trace("Finished building right table for join operator");
+  }
+
+  protected abstract void addRowsToRightTable(List<Object[]> rows);
+
+  protected abstract void finishBuildingRightTable();
 
   protected MseBlock buildJoinedDataBlock() {
     LOGGER.trace("Building joined data block for join operator");
@@ -240,7 +282,7 @@ public abstract class BaseJoinOperator extends MultiStageOperator {
   }
 
   protected boolean needUnmatchedLeftRows() {
-    return _joinType == JoinRelType.LEFT || _joinType == JoinRelType.FULL;
+    return _joinType == JoinRelType.LEFT || _joinType == JoinRelType.FULL || 
_joinType == JoinRelType.LEFT_ASOF;
   }
 
   protected void earlyTerminateLeftInput() {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index 5d4294546c..0cb1032367 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -36,7 +36,6 @@ import org.apache.pinot.query.runtime.operator.join.LongLookupTable;
 import org.apache.pinot.query.runtime.operator.join.LookupTable;
 import org.apache.pinot.query.runtime.operator.join.ObjectLookupTable;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-import 
org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.JoinOverFlowMode;
 
 
 /**
@@ -93,44 +92,15 @@ public class HashJoinOperator extends BaseJoinOperator {
   }
 
   @Override
-  protected void buildRightTable() {
-    LOGGER.trace("Building hash table for join operator");
-    long startTime = System.currentTimeMillis();
-    int numRows = 0;
-    MseBlock rightBlock = _rightInput.nextBlock();
-    while (rightBlock.isData()) {
-      MseBlock.Data dataBlock = (MseBlock.Data) rightBlock;
-      List<Object[]> rows = dataBlock.asRowHeap().getRows();
-      // Row based overflow check.
-      if (rows.size() + numRows > _maxRowsInJoin) {
-        if (_joinOverflowMode == JoinOverFlowMode.THROW) {
-          throwForJoinRowLimitExceeded(
-              "Cannot build in memory hash table for join operator, reached 
number of rows limit: " + _maxRowsInJoin);
-        } else {
-          // Just fill up the buffer.
-          int remainingRows = _maxRowsInJoin - numRows;
-          rows = rows.subList(0, remainingRows);
-          _statMap.merge(StatKey.MAX_ROWS_IN_JOIN_REACHED, true);
-          // setting only the rightTableOperator to be early terminated and awaits EOS block next.
-          _rightInput.earlyTerminate();
-        }
-      }
-      for (Object[] row : rows) {
-        _rightTable.addRow(_rightKeySelector.getKey(row), row);
-      }
-      numRows += rows.size();
-      sampleAndCheckInterruption();
-      rightBlock = _rightInput.nextBlock();
+  protected void addRowsToRightTable(List<Object[]> rows) {
+    for (Object[] row : rows) {
+      _rightTable.addRow(_rightKeySelector.getKey(row), row);
     }
-    MseBlock.Eos eosBlock = (MseBlock.Eos) rightBlock;
-    if (eosBlock.isError()) {
-      _eos = eosBlock;
-    } else {
-      _rightTable.finish();
-      _isRightTableBuilt = true;
-    }
-    _statMap.merge(StatKey.TIME_BUILDING_HASH_TABLE_MS, 
System.currentTimeMillis() - startTime);
-    LOGGER.trace("Finished building hash table for join operator");
+  }
+
+  @Override
+  protected void finishBuildingRightTable() {
+    _rightTable.finish();
   }
 
   @Override
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NonEquiJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NonEquiJoinOperator.java
index 9ffcad83c1..34c3e99287 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NonEquiJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NonEquiJoinOperator.java
@@ -27,7 +27,6 @@ import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.planner.plannode.JoinNode;
 import org.apache.pinot.query.runtime.blocks.MseBlock;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-import 
org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.JoinOverFlowMode;
 
 
 /**
@@ -57,42 +56,15 @@ public class NonEquiJoinOperator extends BaseJoinOperator {
   }
 
   @Override
-  protected void buildRightTable() {
-    LOGGER.trace("Building right table for join operator");
-    long startTime = System.currentTimeMillis();
-    MseBlock rightBlock = _rightInput.nextBlock();
-    while (rightBlock.isData()) {
-      List<Object[]> rows = ((MseBlock.Data) rightBlock).asRowHeap().getRows();
-      int numRowsInRightTable = _rightTable.size();
-      // Row based overflow check.
-      if (rows.size() + numRowsInRightTable > _maxRowsInJoin) {
-        if (_joinOverflowMode == JoinOverFlowMode.THROW) {
-          throwForJoinRowLimitExceeded(
-              "Cannot build in memory right table for join operator, reached number of rows limit: " + _maxRowsInJoin);
-        } else {
-          // Just fill up the buffer.
-          int remainingRows = _maxRowsInJoin - numRowsInRightTable;
-          rows = rows.subList(0, remainingRows);
-          _statMap.merge(StatKey.MAX_ROWS_IN_JOIN_REACHED, true);
-          // setting only the rightTableOperator to be early terminated and awaits EOS block next.
-          _rightInput.earlyTerminate();
-        }
-      }
-      _rightTable.addAll(rows);
-      sampleAndCheckInterruption();
-      rightBlock = _rightInput.nextBlock();
-    }
-    MseBlock.Eos eosBlock = (MseBlock.Eos) rightBlock;
-    if (eosBlock.isError()) {
-      _eos = eosBlock;
-    } else {
-      _isRightTableBuilt = true;
-      if (needUnmatchedRightRows()) {
-        _matchedRightRows = new BitSet(_rightTable.size());
-      }
+  protected void addRowsToRightTable(List<Object[]> rows) {
+    _rightTable.addAll(rows);
+  }
+
+  @Override
+  protected void finishBuildingRightTable() {
+    if (needUnmatchedRightRows()) {
+      _matchedRightRows = new BitSet(_rightTable.size());
     }
-    _statMap.merge(StatKey.TIME_BUILDING_HASH_TABLE_MS, System.currentTimeMillis() - startTime);
-    LOGGER.trace("Finished building right table for join operator");
   }
 
   @Override
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java
index 5dd8be81d3..c08cb89359 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java
@@ -37,6 +37,7 @@ import org.apache.pinot.query.planner.plannode.TableScanNode;
 import org.apache.pinot.query.planner.plannode.ValueNode;
 import org.apache.pinot.query.planner.plannode.WindowNode;
 import org.apache.pinot.query.runtime.operator.AggregateOperator;
+import org.apache.pinot.query.runtime.operator.AsofJoinOperator;
 import org.apache.pinot.query.runtime.operator.FilterOperator;
 import org.apache.pinot.query.runtime.operator.HashJoinOperator;
 import org.apache.pinot.query.runtime.operator.IntersectAllOperator;
@@ -180,16 +181,20 @@ public class PlanNodeToOpChain {
       PlanNode right = inputs.get(1);
       MultiStageOperator rightOperator = visit(right, context);
       JoinNode.JoinStrategy joinStrategy = node.getJoinStrategy();
-      if (joinStrategy == JoinNode.JoinStrategy.HASH) {
-        if (node.getLeftKeys().isEmpty()) {
-          // TODO: Consider adding non-equi as a separate join strategy.
-          return new NonEquiJoinOperator(context, leftOperator, left.getDataSchema(), rightOperator, node);
-        } else {
-          return new HashJoinOperator(context, leftOperator, left.getDataSchema(), rightOperator, node);
-        }
-      } else {
-        assert joinStrategy == JoinNode.JoinStrategy.LOOKUP;
-        return new LookupJoinOperator(context, leftOperator, rightOperator, node);
+      switch (joinStrategy) {
+        case HASH:
+          if (node.getLeftKeys().isEmpty()) {
+            // TODO: Consider adding non-equi as a separate join strategy.
+            return new NonEquiJoinOperator(context, leftOperator, left.getDataSchema(), rightOperator, node);
+          } else {
+            return new HashJoinOperator(context, leftOperator, left.getDataSchema(), rightOperator, node);
+          }
+        case LOOKUP:
+          return new LookupJoinOperator(context, leftOperator, rightOperator, node);
+        case ASOF:
+          return new AsofJoinOperator(context, leftOperator, left.getDataSchema(), rightOperator, node);
+        default:
+          throw new IllegalStateException("Unsupported JoinStrategy: " + joinStrategy);
       }
     }
 
diff --git a/pinot-query-runtime/src/test/resources/queries/AsOfJoin.json b/pinot-query-runtime/src/test/resources/queries/AsOfJoin.json
new file mode 100644
index 0000000000..a2d9686e44
--- /dev/null
+++ b/pinot-query-runtime/src/test/resources/queries/AsOfJoin.json
@@ -0,0 +1,418 @@
+{
+  "as_of_join_queries": {
+    "tables": {
+      "t1": {
+        "schema": [
+          {"name": "key_col", "type": "STRING"},
+          {"name": "asof_col", "type": "INT"}
+        ],
+        "inputs": [
+          ["a", 1],
+          ["b", 2],
+          ["c", 3],
+          ["d", 4],
+          ["e", 5]
+        ]
+      },
+      "t2": {
+        "schema": [
+          {"name": "key_col", "type": "STRING"},
+          {"name": "asof_col", "type": "INT"}
+        ],
+        "inputs": [
+          ["b", 2],
+          ["a", 1],
+          ["c", 3],
+          ["a", 2],
+          ["c", 1],
+          ["b", 3],
+          ["d", 5]
+        ]
+      }
+    },
+    "queries": [
+      {
+        "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.asof_col) ON {t1}.key_col = {t2}.key_col",
+        "outputs": [
+          ["c", 3, "c", 1]
+        ]
+      },
+      {
+        "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col >= {t2}.asof_col) ON {t1}.key_col = {t2}.key_col",
+        "outputs": [
+          ["a", 1, "a", 1],
+          ["b", 2, "b", 2],
+          ["c", 3, "c", 3]
+        ]
+      },
+      {
+        "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col < {t2}.asof_col) ON {t1}.key_col = {t2}.key_col",
+        "outputs": [
+          ["a", 1, "a", 2],
+          ["b", 2, "b", 3],
+          ["d", 4, "d", 5]
+        ]
+      },
+      {
+        "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col <= {t2}.asof_col) ON {t1}.key_col = {t2}.key_col",
+        "outputs": [
+          ["a", 1, "a", 1],
+          ["b", 2, "b", 2],
+          ["c", 3, "c", 3],
+          ["d", 4, "d", 5]
+        ]
+      },
+      {
+        "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} LEFT ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.asof_col) ON {t1}.key_col = {t2}.key_col",
+        "outputs": [
+          ["a", 1, null, null],
+          ["b", 2, null, null],
+          ["c", 3, "c", 1],
+          ["d", 4, null, null],
+          ["e", 5, null, null]
+        ]
+      },
+      {
+        "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} LEFT ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col >= {t2}.asof_col) ON {t1}.key_col = {t2}.key_col",
+        "outputs": [
+          ["a", 1, "a", 1],
+          ["b", 2, "b", 2],
+          ["c", 3, "c", 3],
+          ["d", 4, null, null],
+          ["e", 5, null, null]
+        ]
+      },
+      {
+        "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} LEFT ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col < {t2}.asof_col) ON {t1}.key_col = {t2}.key_col",
+        "outputs": [
+          ["a", 1, "a", 2],
+          ["b", 2, "b", 3],
+          ["c", 3, null, null],
+          ["d", 4, "d", 5],
+          ["e", 5, null, null]
+        ]
+      },
+      {
+        "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} LEFT ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col <= {t2}.asof_col) ON {t1}.key_col = {t2}.key_col",
+        "outputs": [
+          ["a", 1, "a", 1],
+          ["b", 2, "b", 2],
+          ["c", 3, "c", 3],
+          ["d", 4, "d", 5],
+          ["e", 5, null, null]
+        ]
+      }
+    ]
+  },
+  "as_of_join_queries_without_hash_key_join": {
+    "tables": {
+      "t1": {
+        "schema": [
+          {"name": "key_col", "type": "STRING"},
+          {"name": "asof_col", "type": "INT"}
+        ],
+        "inputs": [
+          ["a", 1],
+          ["b", 2],
+          ["c", 3],
+          ["d", 4],
+          ["e", 5]
+        ]
+      },
+      "t2": {
+        "schema": [
+          {"name": "key_col", "type": "STRING"},
+          {"name": "asof_col", "type": "INT"}
+        ],
+        "inputs": [
+          ["b", 2],
+          ["a", 1],
+          ["c", 3],
+          ["a", 4],
+          ["c", 7],
+          ["b", 6],
+          ["d", 5]
+        ]
+      }
+    },
+    "queries": [
+      {
+        "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.asof_col) ON true",
+        "outputs": [
+          ["b", 2, "a", 1],
+          ["c", 3, "b", 2],
+          ["d", 4, "c", 3],
+          ["e", 5, "a", 4]
+        ]
+      },
+      {
+        "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} LEFT ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.asof_col) ON true",
+        "outputs": [
+          ["a", 1, null, null],
+          ["b", 2, "a", 1],
+          ["c", 3, "b", 2],
+          ["d", 4, "c", 3],
+          ["e", 5, "a", 4]
+        ]
+      },
+      {
+        "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col >= {t2}.asof_col) ON true",
+        "outputs": [
+          ["a", 1, "a", 1],
+          ["b", 2, "b", 2],
+          ["c", 3, "c", 3],
+          ["d", 4, "a", 4],
+          ["e", 5, "d", 5]
+        ]
+      },
+      {
+        "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col < {t2}.asof_col) ON true",
+        "outputs": [
+          ["a", 1, "b", 2],
+          ["b", 2, "c", 3],
+          ["c", 3, "a", 4],
+          ["d", 4, "d", 5],
+          ["e", 5, "b", 6]
+        ]
+      },
+      {
+        "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col <= {t2}.asof_col) ON true",
+        "outputs": [
+          ["a", 1, "a", 1],
+          ["b", 2, "b", 2],
+          ["c", 3, "c", 3],
+          ["d", 4, "a", 4],
+          ["e", 5, "d", 5]
+        ]
+      }
+    ]
+  },
+  "as_of_join_queries_with_nulls": {
+    "tables": {
+      "t1": {
+        "schema": [
+          {"name": "key_col", "type": "STRING"},
+          {"name": "asof_col", "type": "INT"}
+        ],
+        "inputs": [
+          ["a", 1],
+          ["a", 5],
+          ["b", 3],
+          ["c", null],
+          ["d", 4],
+          ["e", 7],
+          ["f", 10],
+          ["g", 12]
+        ]
+      },
+      "t2": {
+        "schema": [
+          {"name": "key_col", "type": "STRING"},
+          {"name": "asof_col", "type": "INT"}
+        ],
+        "inputs": [
+          ["a", 0],
+          ["a", 2],
+          ["a", null],
+          ["b", 2],
+          ["b", null],
+          ["c", 5],
+          ["d", 4],
+          ["d", 6],
+          ["f", null],
+          ["f", 11],
+          ["g", null],
+          ["h", 9]
+        ]
+      }
+    },
+    "queries": [
+      {
+        "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.asof_col) ON {t1}.key_col = {t2}.key_col",
+        "outputs": [
+          ["a", 1, "a", 0],
+          ["a", 5, "a", 2],
+          ["b", 3, "b", 2]
+        ]
+      },
+      {
+        "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col >= {t2}.asof_col) ON {t1}.key_col = {t2}.key_col",
+        "outputs": [
+          ["a", 1, "a", 0],
+          ["a", 5, "a", 2],
+          ["b", 3, "b", 2],
+          ["d", 4, "d", 4]
+        ]
+      },
+      {
+        "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col < {t2}.asof_col) ON {t1}.key_col = {t2}.key_col",
+        "outputs": [
+          ["a", 1, "a", 2],
+          ["d", 4, "d", 6],
+          ["f", 10, "f", 11]
+        ]
+      },
+      {
+        "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col <= {t2}.asof_col) ON {t1}.key_col = {t2}.key_col",
+        "outputs": [
+          ["a", 1, "a", 2],
+          ["d", 4, "d", 4],
+          ["f", 10, "f", 11]
+        ]
+      },
+      {
+        "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} LEFT ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.asof_col) ON {t1}.key_col = {t2}.key_col",
+        "outputs": [
+          ["a", 1, "a", 0],
+          ["a", 5, "a", 2],
+          ["b", 3, "b", 2],
+          ["c", null, null, null],
+          ["d", 4, null, null],
+          ["e", 7, null, null],
+          ["f", 10, null, null],
+          ["g", 12, null, null]
+        ]
+      },
+      {
+        "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} LEFT ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col >= {t2}.asof_col) ON {t1}.key_col = {t2}.key_col",
+        "outputs": [
+          ["a", 1, "a", 0],
+          ["a", 5, "a", 2],
+          ["b", 3, "b", 2],
+          ["c", null, null, null],
+          ["d", 4, "d", 4],
+          ["e", 7, null, null],
+          ["f", 10, null, null],
+          ["g", 12, null, null]
+        ]
+      },
+      {
+        "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} LEFT ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col < {t2}.asof_col) ON {t1}.key_col = {t2}.key_col",
+        "outputs": [
+          ["a", 1, "a", 2],
+          ["a", 5, null, null],
+          ["b", 3, null, null],
+          ["c", null, null, null],
+          ["d", 4, "d", 6],
+          ["e", 7, null, null],
+          ["f", 10, "f", 11],
+          ["g", 12, null, null]
+        ]
+      },
+      {
+        "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} LEFT ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col <= {t2}.asof_col) ON {t1}.key_col = {t2}.key_col",
+        "outputs": [
+          ["a", 1, "a", 2],
+          ["a", 5, null, null],
+          ["b", 3, null, null],
+          ["c", null, null, null],
+          ["d", 4, "d", 4],
+          ["e", 7, null, null],
+          ["f", 10, "f", 11],
+          ["g", 12, null, null]
+        ]
+      }
+    ]
+  },
+  "as_of_join_unsupported_scenarios": {
+    "tables": {
+      "t1": {
+        "schema": [
+          {"name": "key_col", "type": "STRING"},
+          {"name": "asof_col", "type": "INT"}
+        ],
+        "inputs": [
+          ["a", 1],
+          ["b", 2],
+          ["c", 3],
+          ["d", 4],
+          ["e", 5]
+        ]
+      },
+      "t2": {
+        "schema": [
+          {"name": "key_col", "type": "STRING"},
+          {"name": "asof_col", "type": "INT"}
+        ],
+        "inputs": [
+          ["b", 2],
+          ["a", 1],
+          ["c", 3],
+          ["a", 2],
+          ["c", 1],
+          ["b", 3],
+          ["d", 5]
+        ]
+      }
+    },
+    "queries": [
+      {
+        "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.asof_col)",
+        "expectedException": ".*exception while parsing query.*",
+        "comment": "Calcite currently doesn't support ASOF JOINs without an ON clause. This isn't just a parser limitation, since the assumption is also built into the validator."
+      },
+      {
+        "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} LEFT ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.asof_col)",
+        "expectedException": ".*exception while parsing query.*",
+        "comment": "Calcite currently doesn't support ASOF JOINs without an ON clause. This isn't just a parser limitation, since the assumption is also built into the validator."
+      },
+      {
+        "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.asof_col) ON {t1}.key_col = {t2}.key_col AND {t1}.asof_col > 0",
+        "expectedException": ".*ASOF JOIN condition must be a conjunction of equality comparisons.*"
+      },
+      {
+        "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.asof_col) ON {t1}.key_col != {t2}.key_col",
+        "expectedException": ".*ASOF JOIN condition must be a conjunction of equality comparisons.*"
+      },
+      {
+        "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.asof_col) ON {t1}.key_col = {t2}.key_col OR {t1}.asof_col = {t2}.asof_col",
+        "expectedException": ".*ASOF JOIN condition must be a conjunction of equality comparisons.*"
+      },
+      {
+        "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} ON {t1}.key_col = {t2}.key_col",
+        "expectedException": ".*exception while parsing query.*",
+        "comment": "MATCH_CONDITION is required for ASOF JOINs"
+      },
+      {
+        "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} LEFT ASOF JOIN {t2} ON {t1}.key_col = {t2}.key_col",
+        "expectedException": ".*exception while parsing query.*",
+        "comment": "MATCH_CONDITION is required for ASOF JOINs"
+      },
+      {
+        "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col != {t2}.asof_col) ON {t1}.key_col = {t2}.key_col",
+        "expectedException": ".*ASOF JOIN MATCH_CONDITION must be a comparison between columns from the two inputs.*",
+        "comment": "MATCH_CONDITION only supports a single predicate comparing two columns that is one out of: (>, >=, <, <=)"
+      },
+      {
+        "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col = {t2}.asof_col) ON {t1}.key_col = {t2}.key_col",
+        "expectedException": ".*ASOF JOIN MATCH_CONDITION must be a comparison between columns from the two inputs.*",
+        "comment": "MATCH_CONDITION only supports a single predicate comparing two columns that is one out of: (>, >=, <, <=)"
+      },
+      {
+        "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.asof_col OR {t1}.key_col > {t1}.key_col) ON {t1}.key_col = {t2}.key_col",
+        "expectedException": ".*ASOF JOIN MATCH_CONDITION must be a comparison between columns from the two inputs.*",
+        "comment": "MATCH_CONDITION only supports a single predicate comparing two columns that is one out of: (>, >=, <, <=)"
+      },
+      {
+        "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.asof_col AND {t1}.key_col > {t1}.key_col) ON {t1}.key_col = {t2}.key_col",
+        "expectedException": ".*ASOF JOIN MATCH_CONDITION must be a comparison between columns from the two inputs.*",
+        "comment": "MATCH_CONDITION only supports a single predicate comparing two columns that is one out of: (>, >=, <, <=)"
+      },
+      {
+        "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > 0) ON {t1}.key_col = {t2}.key_col",
+        "expectedException": ".*ASOF JOIN MATCH_CONDITION must be a comparison between columns from the two inputs.*",
+        "comment": "MATCH_CONDITION only supports a single predicate comparing two columns that is one out of: (>, >=, <, <=)"
+      },
+      {
+        "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t1}.key_col) ON {t1}.key_col = {t2}.key_col",
+        "expectedException": ".*ASOF JOIN MATCH_CONDITION must be a comparison between columns from the two inputs.*",
+        "comment": "MATCH_CONDITION only supports a single predicate comparing two columns that is one out of: (>, >=, <, <=)"
+      },
+      {
+        "sql": "SELECT {t1}.key_col, {t1}.asof_col, {t2}.key_col, {t2}.asof_col FROM {t1} ASOF JOIN {t2} MATCH_CONDITION({t1}.asof_col > {t2}.key_col) ON {t1}.key_col = {t2}.key_col",
+        "expectedException": ".*ASOF_JOIN only supports match conditions with a comparison between two columns of the same type.*",
+        "comment": "We currently don't support MATCH_CONDITION comparing columns of different types"
+      }
+    ]
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to