This is an automated email from the ASF dual-hosted git repository.

mbudiu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/calcite.git

commit b390359a955cab9818a135543c056f7827419f7f
Author: Mihai Budiu <[email protected]>
AuthorDate: Wed Aug 14 11:47:30 2024 -0700

    [CALCITE-6372] Add ASOF join to the Calcite sql2rel converter
    
    Signed-off-by: Mihai Budiu <[email protected]>
---
 .../java/org/apache/calcite/plan/RelOptUtil.java   |  14 +-
 .../java/org/apache/calcite/rel/core/AsofJoin.java |  63 ++++++++
 .../java/org/apache/calcite/rel/core/Join.java     |   2 +-
 .../org/apache/calcite/rel/core/JoinRelType.java   |  27 +++-
 .../org/apache/calcite/rel/core/RelFactories.java  |  39 +++++
 .../calcite/rel/logical/LogicalAsofJoin.java       | 160 +++++++++++++++++++++
 .../apache/calcite/rel/logical/LogicalJoin.java    |   4 -
 .../calcite/rel/metadata/RelMdCollation.java       |   2 +
 .../calcite/rel/metadata/RelMdPredicates.java      |   2 +
 .../org/apache/calcite/rel/metadata/RelMdUtil.java |   4 +
 .../apache/calcite/rel/rel2sql/SqlImplementor.java |   4 +
 .../rel/rules/AbstractJoinExtractFilterRule.java   |   3 +-
 .../calcite/rel/rules/JoinExtractFilterRule.java   |   3 +-
 .../apache/calcite/sql2rel/RelFieldTrimmer.java    |  16 +++
 .../sql2rel/RelStructuredTypeFlattener.java        |  12 ++
 .../apache/calcite/sql2rel/SqlToRelConverter.java  |  77 ++++++++--
 .../java/org/apache/calcite/tools/RelBuilder.java  |  35 +++++
 .../apache/calcite/test/SqlToRelConverterTest.java |   7 +
 .../apache/calcite/test/SqlToRelConverterTest.xml  |  16 +++
 19 files changed, 465 insertions(+), 25 deletions(-)

diff --git a/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java 
b/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java
index ffd1252e56..5042bc04ca 100644
--- a/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java
+++ b/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java
@@ -47,6 +47,7 @@ import org.apache.calcite.rel.hint.HintStrategyTable;
 import org.apache.calcite.rel.hint.Hintable;
 import org.apache.calcite.rel.hint.RelHint;
 import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalAsofJoin;
 import org.apache.calcite.rel.logical.LogicalCalc;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalJoin;
@@ -3848,9 +3849,16 @@ public abstract class RelOptUtil {
 
     final RelNode right = relBuilder.build();
     final RelNode left = relBuilder.build();
-    relBuilder.push(
-        originalJoin.copy(originalJoin.getTraitSet(),
-            joinCond, left, right, joinType, originalJoin.isSemiJoinDone()));
+    if (joinType == JoinRelType.ASOF || joinType == JoinRelType.LEFT_ASOF) {
+      LogicalAsofJoin ljoin = (LogicalAsofJoin) originalJoin;
+      RelNode copy =
+          ljoin.copy(originalJoin.getTraitSet(), joinCond, 
ljoin.getMatchCondition(), left, right);
+      relBuilder.push(copy);
+    } else {
+      relBuilder.push(
+          originalJoin.copy(originalJoin.getTraitSet(),
+              joinCond, left, right, joinType, originalJoin.isSemiJoinDone()));
+    }
     if (!extraLeftExprs.isEmpty() || !extraRightExprs.isEmpty()) {
       final int totalFields = joinType.projectsRight()
           ? leftCount + extraLeftExprs.size() + rightCount + 
extraRightExprs.size()
diff --git a/core/src/main/java/org/apache/calcite/rel/core/AsofJoin.java 
b/core/src/main/java/org/apache/calcite/rel/core/AsofJoin.java
new file mode 100644
index 0000000000..f5682b66c5
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rel/core/AsofJoin.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.core;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rex.RexNode;
+
+import java.util.List;
+import java.util.Set;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Base class for various ASOF JOIN representations.
+ */
+public abstract class AsofJoin extends Join {
+  /** Compared to standard joins, ASOF joins have an additional condition for 
comparing
+   * columns that usually contain timestamp values (however, the data type of 
these columns
+   * can be any type that supports comparisons, and is not restricted to be 
TIMESTAMP).
+   */
+  protected final RexNode matchCondition;
+
+  protected AsofJoin(
+      RelOptCluster cluster,
+      RelTraitSet traitSet,
+      List<RelHint> hints,
+      RelNode left,
+      RelNode right,
+      RexNode condition,
+      RexNode matchCondition,
+      Set<CorrelationId> variablesSet,
+      JoinRelType joinType) {
+    super(cluster, traitSet, hints, left, right, condition, variablesSet, 
joinType);
+    this.matchCondition = requireNonNull(matchCondition, "matchCondition");
+  }
+
+  public RexNode getMatchCondition() {
+    return matchCondition;
+  }
+
+  @Override public RelWriter explainTerms(RelWriter pw) {
+    return super.explainTerms(pw)
+        .item("matchCondition", matchCondition);
+  }
+}
diff --git a/core/src/main/java/org/apache/calcite/rel/core/Join.java 
b/core/src/main/java/org/apache/calcite/rel/core/Join.java
index dc99bfbfe0..23d2f923a8 100644
--- a/core/src/main/java/org/apache/calcite/rel/core/Join.java
+++ b/core/src/main/java/org/apache/calcite/rel/core/Join.java
@@ -309,7 +309,7 @@ public abstract class Join extends BiRel implements 
Hintable {
         fieldNameList, systemFieldList);
   }
 
-  @Override public final Join copy(RelTraitSet traitSet, List<RelNode> inputs) 
{
+  @Override public Join copy(RelTraitSet traitSet, List<RelNode> inputs) {
     assert inputs.size() == 2;
     return copy(traitSet, getCondition(), inputs.get(0), inputs.get(1),
         joinType, isSemiJoinDone());
diff --git a/core/src/main/java/org/apache/calcite/rel/core/JoinRelType.java 
b/core/src/main/java/org/apache/calcite/rel/core/JoinRelType.java
index eef436c9bf..dbb2891ce2 100644
--- a/core/src/main/java/org/apache/calcite/rel/core/JoinRelType.java
+++ b/core/src/main/java/org/apache/calcite/rel/core/JoinRelType.java
@@ -70,7 +70,28 @@ public enum JoinRelType {
    *     WHERE DEPT.DEPTNO = EMP.DEPTNO)</pre>
    * </blockquote>
    */
-  ANTI;
+  ANTI,
+
+  /**
+   * An ASOF JOIN operation combines rows from two tables based on comparable 
timestamp values.
+   * For each row in the left table, the join finds at most one row in the 
right table that has the
+   * "closest" timestamp value. The matched row on the right side is the 
closest match,
+   * which could less than or equal or greater than or equal in the timestamp 
column,
+   * as specified by the comparison operator.
+   *
+   * <p>Example:
+   * <blockquote><pre>
+   * FROM left_table ASOF JOIN right_table
+   *   MATCH_CONDITION ( left_table.timecol &le; right_table.timecol )
+   *   ON left_table.col = right_table.col</pre>
+   * </blockquote>
+   */
+  ASOF,
+
+  /**
+   * The left version of an ASOF join, where each row from the left table is 
part of the output.
+   */
+  LEFT_ASOF;
 
   /** Lower-case name. */
   public final String lowerName = name().toLowerCase(Locale.ROOT);
@@ -80,7 +101,7 @@ public enum JoinRelType {
    * right-hand side.
    */
   public boolean generatesNullsOnRight() {
-    return (this == LEFT) || (this == FULL);
+    return (this == LEFT) || (this == FULL) || (this == LEFT_ASOF);
   }
 
   /**
@@ -96,7 +117,7 @@ public enum JoinRelType {
    * generate NULL values, either on the left-hand side or right-hand side.
    */
   public boolean isOuterJoin() {
-    return (this == LEFT) || (this == RIGHT) || (this == FULL);
+    return (this == LEFT) || (this == RIGHT) || (this == FULL) || (this == 
LEFT_ASOF);
   }
 
   /**
diff --git a/core/src/main/java/org/apache/calcite/rel/core/RelFactories.java 
b/core/src/main/java/org/apache/calcite/rel/core/RelFactories.java
index a00171ef76..a8e53d9b74 100644
--- a/core/src/main/java/org/apache/calcite/rel/core/RelFactories.java
+++ b/core/src/main/java/org/apache/calcite/rel/core/RelFactories.java
@@ -28,6 +28,7 @@ import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.hint.RelHint;
 import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalAsofJoin;
 import org.apache.calcite.rel.logical.LogicalCorrelate;
 import org.apache.calcite.rel.logical.LogicalExchange;
 import org.apache.calcite.rel.logical.LogicalFilter;
@@ -85,6 +86,8 @@ public class RelFactories {
 
   public static final JoinFactory DEFAULT_JOIN_FACTORY = new JoinFactoryImpl();
 
+  public static final AsofJoinFactory DEFAULT_ASOFJOIN_FACTORY = new 
AsofJoinFactoryImpl();
+
   public static final CorrelateFactory DEFAULT_CORRELATE_FACTORY =
       new CorrelateFactoryImpl();
 
@@ -136,6 +139,7 @@ public class RelFactories {
           DEFAULT_SORT_EXCHANGE_FACTORY,
           DEFAULT_SET_OP_FACTORY,
           DEFAULT_JOIN_FACTORY,
+          DEFAULT_ASOFJOIN_FACTORY,
           DEFAULT_CORRELATE_FACTORY,
           DEFAULT_VALUES_FACTORY,
           DEFAULT_TABLE_SCAN_FACTORY,
@@ -398,6 +402,24 @@ public class RelFactories {
         boolean semiJoinDone);
   }
 
+  /**
+   * Creates ASOF join of the appropriate type for a rule's calling convention.
+   */
+  public interface AsofJoinFactory {
+    /**
+     * Creates an ASOF join.
+     *
+     * @param left             Left input
+     * @param right            Right input
+     * @param hints            Hints
+     * @param condition        Join condition
+     * @param matchCondition   ASOF join match condition
+     * @param joinType         Type of join (ASOF or LEFT_ASOF)
+     */
+    RelNode createAsofJoin(RelNode left, RelNode right, List<RelHint> hints,
+        RexNode condition, RexNode matchCondition, JoinRelType joinType);
+  }
+
   /**
    * Implementation of {@link JoinFactory} that returns a vanilla
    * {@link org.apache.calcite.rel.logical.LogicalJoin}.
@@ -411,6 +433,18 @@ public class RelFactories {
     }
   }
 
+  /**
+   * Implementation of {@link AsofJoinFactory} that returns a vanilla
+   * {@link LogicalAsofJoin}.
+   */
+  private static class AsofJoinFactoryImpl implements AsofJoinFactory {
+    @Override public RelNode createAsofJoin(RelNode left, RelNode right, 
List<RelHint> hints,
+        RexNode condition, RexNode matchCondition, JoinRelType joinType) {
+      return LogicalAsofJoin.create(left, right, hints,
+          condition, matchCondition, joinType, ImmutableList.of());
+    }
+  }
+
   /**
    * Can create a correlate of the appropriate type for a rule's calling
    * convention.
@@ -680,6 +714,7 @@ public class RelFactories {
     public final SortExchangeFactory sortExchangeFactory;
     public final SetOpFactory setOpFactory;
     public final JoinFactory joinFactory;
+    public final AsofJoinFactory asofJoinFactory;
     public final CorrelateFactory correlateFactory;
     public final ValuesFactory valuesFactory;
     public final TableScanFactory scanFactory;
@@ -698,6 +733,7 @@ public class RelFactories {
         SortExchangeFactory sortExchangeFactory,
         SetOpFactory setOpFactory,
         JoinFactory joinFactory,
+        AsofJoinFactory asofJoinFactory,
         CorrelateFactory correlateFactory,
         ValuesFactory valuesFactory,
         TableScanFactory scanFactory,
@@ -715,6 +751,7 @@ public class RelFactories {
       this.sortExchangeFactory = requireNonNull(sortExchangeFactory, 
"sortExchangeFactory");
       this.setOpFactory = requireNonNull(setOpFactory, "setOpFactory");
       this.joinFactory = requireNonNull(joinFactory, "joinFactory");
+      this.asofJoinFactory = requireNonNull(asofJoinFactory, 
"asofJoinFactory");
       this.correlateFactory = requireNonNull(correlateFactory, 
"correlateFactory");
       this.valuesFactory = requireNonNull(valuesFactory, "valuesFactory");
       this.scanFactory = requireNonNull(scanFactory, "scanFactory");
@@ -749,6 +786,8 @@ public class RelFactories {
               .orElse(DEFAULT_SET_OP_FACTORY),
           context.maybeUnwrap(JoinFactory.class)
               .orElse(DEFAULT_JOIN_FACTORY),
+          context.maybeUnwrap(AsofJoinFactory.class)
+              .orElse(DEFAULT_ASOFJOIN_FACTORY),
           context.maybeUnwrap(CorrelateFactory.class)
               .orElse(DEFAULT_CORRELATE_FACTORY),
           context.maybeUnwrap(ValuesFactory.class)
diff --git 
a/core/src/main/java/org/apache/calcite/rel/logical/LogicalAsofJoin.java 
b/core/src/main/java/org/apache/calcite/rel/logical/LogicalAsofJoin.java
new file mode 100644
index 0000000000..4f68408694
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rel/logical/LogicalAsofJoin.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.logical;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttle;
+import org.apache.calcite.rel.core.AsofJoin;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexNode;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Sub-class of {@link AsofJoin} encoding ASOF joins.
+ * Adapted from the {@link LogicalJoin} implementation.
+ */
+public final class LogicalAsofJoin extends AsofJoin {
+  //~ Instance fields --------------------------------------------------------
+
+  private final ImmutableList<RelDataTypeField> systemFieldList;
+
+  //~ Constructors -----------------------------------------------------------
+
+  /**
+   * Creates a LogicalAsofJoin.
+   *
+   * <p>Use {@link #create} unless you know what you're doing.
+   *
+   * @param cluster          Cluster
+   * @param traitSet         Trait set
+   * @param hints            Hints
+   * @param left             Left input
+   * @param right            Right input
+   * @param condition        Join condition
+   * @param matchCondition   Temporal condition
+   * @param systemFieldList  List of system fields that will be prefixed to
+   *                         output row type; typically empty but must not be 
null
+   */
+  public LogicalAsofJoin(
+      RelOptCluster cluster,
+      RelTraitSet traitSet,
+      List<RelHint> hints,
+      RelNode left,
+      RelNode right,
+      RexNode condition,
+      RexNode matchCondition,
+      JoinRelType joinType,
+      ImmutableList<RelDataTypeField> systemFieldList) {
+    super(cluster, traitSet, hints, left, right,
+        condition, matchCondition, ImmutableSet.of(), joinType);
+    this.systemFieldList = requireNonNull(systemFieldList, "systemFieldList");
+  }
+
+  /**
+   * Creates a LogicalAsofJoin by parsing serialized output.
+   */
+  public LogicalAsofJoin(RelInput input) {
+    this(input.getCluster(), input.getCluster().traitSetOf(Convention.NONE),
+        new ArrayList<>(),
+        input.getInputs().get(0), input.getInputs().get(1),
+        requireNonNull(input.getExpression("condition"), "condition"),
+        requireNonNull(input.getExpression("matchCondition"), 
"matchCondition"),
+        requireNonNull(input.getEnum("joinType", JoinRelType.class), 
"joinType"),
+        ImmutableList.of());
+  }
+
+  /** Creates a LogicalAsofJoin. */
+  public static LogicalAsofJoin create(RelNode left, RelNode right, 
List<RelHint> hints,
+      RexNode condition, RexNode matchCondition,
+      JoinRelType joinType,
+      ImmutableList<RelDataTypeField> systemFieldList) {
+    final RelOptCluster cluster = left.getCluster();
+    final RelTraitSet traitSet = cluster.traitSetOf(Convention.NONE);
+    return new LogicalAsofJoin(cluster, traitSet, hints, left, right, 
condition, matchCondition,
+        joinType, systemFieldList);
+  }
+
+  //~ Methods ----------------------------------------------------------------
+
+  public LogicalAsofJoin copy(
+      RelTraitSet traitSet, RexNode conditionExpr, RexNode matchConditionExpr,
+      RelNode left, RelNode right) {
+    assert traitSet.containsIfApplicable(Convention.NONE);
+    return new LogicalAsofJoin(getCluster(),
+        getCluster().traitSetOf(Convention.NONE), hints, left, right, 
conditionExpr,
+        matchConditionExpr, joinType, systemFieldList);
+  }
+
+  @Override public RelNode accept(RelShuttle shuttle) {
+    return shuttle.visit(this);
+  }
+
+  @Override public boolean deepEquals(@Nullable Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    LogicalAsofJoin asofObj = (LogicalAsofJoin) obj;
+    assert asofObj != null;
+    return deepEquals0(obj)
+        && matchCondition.equals(asofObj.matchCondition)
+        && systemFieldList.equals(asofObj.systemFieldList);
+  }
+
+  @Override public int deepHashCode() {
+    return Objects.hash(deepHashCode0(), systemFieldList);
+  }
+
+  @Override public ImmutableList<RelDataTypeField> getSystemFieldList() {
+    return systemFieldList;
+  }
+
+  @Override public Join copy(
+      RelTraitSet traitSet, RexNode conditionExpr, RelNode left, RelNode right,
+      JoinRelType joinType, boolean semiJoinDone) {
+    // This method does not provide the matchCondition as an argument, so it 
should never be called
+    throw new RuntimeException("This method should not be called");
+  }
+
+  @Override public Join copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    assert inputs.size() == 2;
+    return new LogicalAsofJoin(getCluster(), traitSet, hints,
+        inputs.get(0), inputs.get(1),
+        getCondition(), getMatchCondition(), joinType, systemFieldList);
+  }
+
+  @Override public RelNode withHints(List<RelHint> hintList) {
+    return new LogicalAsofJoin(getCluster(), traitSet, hintList,
+        left, right, condition, matchCondition, joinType, systemFieldList);
+  }
+}
diff --git a/core/src/main/java/org/apache/calcite/rel/logical/LogicalJoin.java 
b/core/src/main/java/org/apache/calcite/rel/logical/LogicalJoin.java
index c369fa8e04..a0e8b0ffa5 100644
--- a/core/src/main/java/org/apache/calcite/rel/logical/LogicalJoin.java
+++ b/core/src/main/java/org/apache/calcite/rel/logical/LogicalJoin.java
@@ -52,10 +52,6 @@ import static java.util.Objects.requireNonNull;
  * <li>{@link org.apache.calcite.rel.rules.JoinExtractFilterRule} converts an
  * {@link LogicalJoin inner join} to a {@link LogicalFilter filter} on top of a
  * {@link LogicalJoin cartesian inner join}.
- *
- * <li>{@code net.sf.farrago.fennel.rel.FennelCartesianJoinRule}
- * implements a LogicalJoin as a cartesian product.
- *
  * </ul>
  */
 public final class LogicalJoin extends Join {
diff --git 
a/core/src/main/java/org/apache/calcite/rel/metadata/RelMdCollation.java 
b/core/src/main/java/org/apache/calcite/rel/metadata/RelMdCollation.java
index 9c728384f4..ffe02f9e82 100644
--- a/core/src/main/java/org/apache/calcite/rel/metadata/RelMdCollation.java
+++ b/core/src/main/java/org/apache/calcite/rel/metadata/RelMdCollation.java
@@ -572,6 +572,8 @@ public class RelMdCollation
     case ANTI:
     case INNER:
     case LEFT:
+    case ASOF:
+    case LEFT_ASOF:
       return leftCollations;
     case RIGHT:
     case FULL:
diff --git 
a/core/src/main/java/org/apache/calcite/rel/metadata/RelMdPredicates.java 
b/core/src/main/java/org/apache/calcite/rel/metadata/RelMdPredicates.java
index 7473293d7a..7cc9aadbd3 100644
--- a/core/src/main/java/org/apache/calcite/rel/metadata/RelMdPredicates.java
+++ b/core/src/main/java/org/apache/calcite/rel/metadata/RelMdPredicates.java
@@ -753,6 +753,8 @@ public class RelMdPredicates
       case INNER:
       case LEFT:
       case ANTI:
+      case ASOF:
+      case LEFT_ASOF:
         infer(leftChildPredicates, allExprs, inferredPredicates,
             includeEqualityInference,
             joinType == JoinRelType.LEFT ? rightFieldsBitSet
diff --git a/core/src/main/java/org/apache/calcite/rel/metadata/RelMdUtil.java 
b/core/src/main/java/org/apache/calcite/rel/metadata/RelMdUtil.java
index 11121db1cf..1b1fad4571 100644
--- a/core/src/main/java/org/apache/calcite/rel/metadata/RelMdUtil.java
+++ b/core/src/main/java/org/apache/calcite/rel/metadata/RelMdUtil.java
@@ -831,6 +831,10 @@ public class RelMdUtil {
     }
     double innerRowCount = left * right * selectivity;
     switch (join.getJoinType()) {
+    case ASOF:
+      return left * selectivity;
+    case LEFT_ASOF:
+      return left;
     case INNER:
       return innerRowCount;
     case LEFT:
diff --git 
a/core/src/main/java/org/apache/calcite/rel/rel2sql/SqlImplementor.java 
b/core/src/main/java/org/apache/calcite/rel/rel2sql/SqlImplementor.java
index 63335d12f3..b50e4e259d 100644
--- a/core/src/main/java/org/apache/calcite/rel/rel2sql/SqlImplementor.java
+++ b/core/src/main/java/org/apache/calcite/rel/rel2sql/SqlImplementor.java
@@ -386,6 +386,10 @@ public abstract class SqlImplementor {
       return JoinType.INNER;
     case FULL:
       return JoinType.FULL;
+    case ASOF:
+      return JoinType.ASOF;
+    case LEFT_ASOF:
+      return JoinType.LEFT_ASOF;
     default:
       throw new AssertionError(joinType);
     }
diff --git 
a/core/src/main/java/org/apache/calcite/rel/rules/AbstractJoinExtractFilterRule.java
 
b/core/src/main/java/org/apache/calcite/rel/rules/AbstractJoinExtractFilterRule.java
index ad3c3a6a8a..fdb28df5bc 100644
--- 
a/core/src/main/java/org/apache/calcite/rel/rules/AbstractJoinExtractFilterRule.java
+++ 
b/core/src/main/java/org/apache/calcite/rel/rules/AbstractJoinExtractFilterRule.java
@@ -30,8 +30,7 @@ import org.apache.calcite.tools.RelBuilder;
  * {@link org.apache.calcite.rel.core.Join cartesian inner join}.
  *
  * <p>One benefit of this transformation is that after it, the join condition
- * can be combined with conditions and expressions above the join. It also 
makes
- * the <code>FennelCartesianJoinRule</code> applicable.
+ * can be combined with conditions and expressions above the join.
  *
  * <p>The constructor is parameterized to allow any sub-class of
  * {@link org.apache.calcite.rel.core.Join}.
diff --git 
a/core/src/main/java/org/apache/calcite/rel/rules/JoinExtractFilterRule.java 
b/core/src/main/java/org/apache/calcite/rel/rules/JoinExtractFilterRule.java
index 2251c0d321..5e71dc7ef8 100644
--- a/core/src/main/java/org/apache/calcite/rel/rules/JoinExtractFilterRule.java
+++ b/core/src/main/java/org/apache/calcite/rel/rules/JoinExtractFilterRule.java
@@ -29,8 +29,7 @@ import org.immutables.value.Value;
  * {@link org.apache.calcite.rel.logical.LogicalJoin cartesian inner join}.
  *
  * <p>One benefit of this transformation is that after it, the join condition
- * can be combined with conditions and expressions above the join. It also 
makes
- * the <code>FennelCartesianJoinRule</code> applicable.
+ * can be combined with conditions and expressions above the join.
  *
  * <p>Can be configured to match any sub-class of
  * {@link org.apache.calcite.rel.core.Join}, not just
diff --git a/core/src/main/java/org/apache/calcite/sql2rel/RelFieldTrimmer.java 
b/core/src/main/java/org/apache/calcite/sql2rel/RelFieldTrimmer.java
index a8d99126ea..0669747c08 100644
--- a/core/src/main/java/org/apache/calcite/sql2rel/RelFieldTrimmer.java
+++ b/core/src/main/java/org/apache/calcite/sql2rel/RelFieldTrimmer.java
@@ -26,6 +26,7 @@ import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.AsofJoin;
 import org.apache.calcite.rel.core.Calc;
 import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.Exchange;
@@ -84,6 +85,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 
 /**
@@ -786,6 +788,9 @@ public class RelFieldTrimmer implements ReflectiveVisitor {
         + join.getLeft().getRowType().getFieldCount()
         + join.getRight().getRowType().getFieldCount();
     final RexNode conditionExpr = join.getCondition();
+    final RexNode matchConditionExpr = (join instanceof AsofJoin)
+        ? ((AsofJoin) join).getMatchCondition()
+        : null;
     final int systemFieldCount = join.getSystemFieldList().size();
 
     // Add in fields used in the condition.
@@ -794,6 +799,9 @@ public class RelFieldTrimmer implements ReflectiveVisitor {
     RelOptUtil.InputFinder inputFinder =
         new RelOptUtil.InputFinder(combinedInputExtraFields, fieldsUsed);
     conditionExpr.accept(inputFinder);
+    if (matchConditionExpr != null) {
+      matchConditionExpr.accept(inputFinder);
+    }
     final ImmutableBitSet fieldsUsedPlus = inputFinder.build();
 
     // If no system fields are used, we can remove them.
@@ -887,6 +895,8 @@ public class RelFieldTrimmer implements ReflectiveVisitor {
             mapping, newInputs.get(0), newInputs.get(1));
     RexNode newConditionExpr =
         conditionExpr.accept(shuttle);
+    RexNode newMatchConditionExpr =
+        matchConditionExpr != null ? matchConditionExpr.accept(shuttle) : null;
 
     relBuilder.push(newInputs.get(0));
     relBuilder.push(newInputs.get(1));
@@ -914,8 +924,14 @@ public class RelFieldTrimmer implements ReflectiveVisitor {
         mapping.set(pair.source + offset, pair.target + newOffset);
       }
       break;
+    case ASOF:
+    case LEFT_ASOF:
+      relBuilder.asofJoin(join.getJoinType(), newConditionExpr,
+          Objects.requireNonNull(newMatchConditionExpr, 
"newMatchConditionExpr"));
+      break;
     default:
       relBuilder.join(join.getJoinType(), newConditionExpr);
+      break;
     }
     return result(relBuilder.build(), mapping, join);
   }
diff --git 
a/core/src/main/java/org/apache/calcite/sql2rel/RelStructuredTypeFlattener.java 
b/core/src/main/java/org/apache/calcite/sql2rel/RelStructuredTypeFlattener.java
index f9a0d14504..120d63686a 100644
--- 
a/core/src/main/java/org/apache/calcite/sql2rel/RelStructuredTypeFlattener.java
+++ 
b/core/src/main/java/org/apache/calcite/sql2rel/RelStructuredTypeFlattener.java
@@ -33,6 +33,7 @@ import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.core.Uncollect;
 import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalAsofJoin;
 import org.apache.calcite.rel.logical.LogicalCalc;
 import org.apache.calcite.rel.logical.LogicalCorrelate;
 import org.apache.calcite.rel.logical.LogicalExchange;
@@ -472,6 +473,17 @@ public class RelStructuredTypeFlattener implements 
ReflectiveVisitor {
     setNewForOldRel(rel, newRel);
   }
 
+  public void rewriteRel(LogicalAsofJoin rel) {
+    final LogicalAsofJoin newRel =
+        LogicalAsofJoin.create(getNewForOldRel(rel.getLeft()),
+            getNewForOldRel(rel.getRight()),
+            rel.getHints(),
+            rel.getCondition().accept(new RewriteRexShuttle()),
+            rel.getMatchCondition().accept(new RewriteRexShuttle()),
+            rel.getJoinType(), rel.getSystemFieldList());
+    setNewForOldRel(rel, newRel);
+  }
+
   public void rewriteRel(LogicalJoin rel) {
     final LogicalJoin newRel =
         LogicalJoin.create(getNewForOldRel(rel.getLeft()),
diff --git 
a/core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java 
b/core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
index 5bb6b50e0a..44f264758a 100644
--- a/core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
+++ b/core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
@@ -106,6 +106,7 @@ import org.apache.calcite.schema.Wrapper;
 import org.apache.calcite.sql.JoinConditionType;
 import org.apache.calcite.sql.JoinType;
 import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlAsofJoin;
 import org.apache.calcite.sql.SqlBasicCall;
 import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlCallBinding;
@@ -3067,6 +3068,43 @@ public class SqlToRelConverter {
     return node;
   }
 
+  protected RelNode createAsofJoin(
+      SqlParserPos pos,
+      Blackboard bb,
+      RelNode leftRel,
+      RelNode rightRel,
+      @Nullable RexNode joinCond,
+      RexNode matchCondition,
+      JoinRelType joinType) {
+    assert joinCond != null;
+    assert matchCondition != null;
+
+    final CorrelationUse p = getCorrelationUse(bb, rightRel);
+    if (p != null) {
+      // Ideally this should be checked by the validator, but the correlation 
information
+      // is not know at that point.
+      throw SqlUtil.newContextException(pos, 
RESOURCE.asofCannotBeCorrelated());
+    }
+
+    final RelNode node =
+        relBuilder.push(leftRel)
+            .push(rightRel)
+            .asofJoin(joinType, joinCond, matchCondition)
+            .build();
+
+    // If join conditions are pushed down, update the leaves.
+    if (node instanceof Project) {
+      final Join newJoin = (Join) node.getInputs().get(0);
+      if (leaves.containsKey(leftRel)) {
+        leaves.put(newJoin.getLeft(), leaves.get(leftRel));
+      }
+      if (leaves.containsKey(rightRel)) {
+        leaves.put(newJoin.getRight(), leaves.get(rightRel));
+      }
+    }
+    return node;
+  }
+
   private @Nullable CorrelationUse getCorrelationUse(Blackboard bb, final 
RelNode r0) {
     final Set<CorrelationId> correlatedVariables =
         RelOptUtil.getVariablesUsed(r0);
@@ -3244,6 +3282,7 @@ public class SqlToRelConverter {
 
     SqlNode left = join.getLeft();
     SqlNode right = join.getRight();
+    JoinType joinType = join.getJoinType();
     final SqlValidatorScope leftScope = validator.getJoinScope(left);
     final Blackboard leftBlackboard =
         createBlackboard(leftScope, null, false);
@@ -3257,7 +3296,7 @@ public class SqlToRelConverter {
 
     final JoinConditionType conditionType = join.getConditionType();
     final RexNode condition;
-    final RelNode rightRel;
+    RelNode rightRel;
     if (join.isNatural()) {
       condition =
           convertNaturalCondition(getNamespace(left), getNamespace(right));
@@ -3275,8 +3314,11 @@ public class SqlToRelConverter {
         rightRel = tempRightRel;
         break;
       case ON:
+        SqlNode sqlCondition =
+            requireNonNull(join.getCondition(),
+                () -> "getCondition for join " + join);
         Pair<RexNode, RelNode> conditionAndRightNode =
-            convertOnCondition(fromBlackboard, join, leftRel, tempRightRel);
+            convertOnCondition(fromBlackboard, sqlCondition, leftRel, 
tempRightRel);
         condition = conditionAndRightNode.left;
         rightRel = conditionAndRightNode.right;
         break;
@@ -3284,9 +3326,24 @@ public class SqlToRelConverter {
         throw Util.unexpected(conditionType);
       }
     }
-    final RelNode joinRel =
-        createJoin(fromBlackboard, leftRel, rightRel, condition,
-            convertJoinType(join.getJoinType()));
+    final RelNode joinRel;
+    if (joinType == JoinType.ASOF || joinType == JoinType.LEFT_ASOF) {
+      SqlNode sqlMatchCondition =
+          requireNonNull(((SqlAsofJoin) join).getMatchCondition(),
+              () -> "getCondition for join " + join);
+      Pair<RexNode, RelNode> conditionAndRightNode =
+          convertOnCondition(fromBlackboard, sqlMatchCondition, leftRel, 
tempRightRel);
+      RexNode matchCondition = conditionAndRightNode.left;
+      rightRel = Objects.requireNonNull(conditionAndRightNode.right);
+      joinRel =
+          createAsofJoin(join.getParserPosition(), fromBlackboard,
+              leftRel, rightRel, condition, matchCondition,
+              convertJoinType(joinType));
+    } else {
+      joinRel =
+          createJoin(fromBlackboard, leftRel, rightRel, condition,
+              convertJoinType(joinType));
+    }
     relBuilder.push(joinRel);
     relBuilder.project(relBuilder.fields());
     bb.setRoot(relBuilder.build(), false);
@@ -3341,13 +3398,9 @@ public class SqlToRelConverter {
    */
   private Pair<RexNode, RelNode> convertOnCondition(
       Blackboard bb,
-      SqlJoin join,
+      SqlNode condition,
       RelNode leftRel,
       RelNode rightRel) {
-    SqlNode condition =
-        requireNonNull(join.getCondition(),
-            () -> "getCondition for join " + join);
-
     bb.setRoot(ImmutableList.of(leftRel, rightRel));
     replaceSubQueries(bb, condition, RelOptUtil.Logic.UNKNOWN_AS_FALSE);
     final RelNode newRightRel =
@@ -3415,6 +3468,10 @@ public class SqlToRelConverter {
       return JoinRelType.LEFT;
     case RIGHT:
       return JoinRelType.RIGHT;
+    case ASOF:
+      return JoinRelType.ASOF;
+    case LEFT_ASOF:
+      return JoinRelType.LEFT_ASOF;
     default:
       throw Util.unexpected(joinType);
     }
diff --git a/core/src/main/java/org/apache/calcite/tools/RelBuilder.java 
b/core/src/main/java/org/apache/calcite/tools/RelBuilder.java
index 908915e48b..69f7c43d58 100644
--- a/core/src/main/java/org/apache/calcite/tools/RelBuilder.java
+++ b/core/src/main/java/org/apache/calcite/tools/RelBuilder.java
@@ -59,6 +59,7 @@ import org.apache.calcite.rel.core.Union;
 import org.apache.calcite.rel.core.Values;
 import org.apache.calcite.rel.hint.Hintable;
 import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rel.logical.LogicalAsofJoin;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.metadata.RelColumnMapping;
@@ -2990,6 +2991,38 @@ public class RelBuilder {
     return push(repeatUnion);
   }
 
+  /** Creates a {@link LogicalAsofJoin} with the specified conditions. */
+  public RelBuilder asofJoin(JoinRelType joinType, RexNode condition, RexNode 
matchCondition) {
+    // Implementation based on the 'join' method
+    assert joinType == JoinRelType.ASOF || joinType == JoinRelType.LEFT_ASOF;
+    final Frame right = stack.pop();
+    final Frame left = stack.pop();
+    if (config.simplify()) {
+      // Normalize expanded versions IS NOT DISTINCT FROM so that simplifier 
does not
+      // transform the expression to something unrecognizable
+      if (condition instanceof RexCall) {
+        condition =
+            RelOptUtil.collapseExpandedIsNotDistinctFromExpr((RexCall) 
condition,
+                getRexBuilder());
+      }
+      condition = simplifier.simplifyUnknownAsFalse(condition);
+    }
+    final RelNode join;
+    RelNode join0 =
+        struct.asofJoinFactory.createAsofJoin(left.rel, right.rel,
+            ImmutableList.of(), condition, matchCondition, joinType);
+    if (join0 instanceof Join && config.pushJoinCondition()) {
+      join = RelOptUtil.pushDownJoinConditions((Join) join0, this);
+    } else {
+      join = join0;
+    }
+    final PairList<ImmutableSet<String>, RelDataTypeField> fields =
+        PairList.of();
+    fields.addAll(left.fields);
+    fields.addAll(right.fields);
+    stack.push(new Frame(join, fields));
+    return this;
+  }
 
   /** Creates a {@link Join} with an array of conditions. */
   public RelBuilder join(JoinRelType joinType, RexNode condition0,
@@ -4158,6 +4191,8 @@ public class RelBuilder {
           + " must not be used by left input to correlation");
     }
     switch (joinType) {
+    case LEFT_ASOF:
+    case ASOF:
     case RIGHT:
     case FULL:
       throw new IllegalArgumentException("Correlated " + joinType + " join is 
not supported");
diff --git 
a/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java 
b/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
index 94c76d9178..aba9bb1da2 100644
--- a/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
+++ b/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
@@ -278,6 +278,13 @@ class SqlToRelConverterTest extends SqlToRelTestBase {
     sql(sql).ok();
   }
 
+  @Test void testAsOfJoin() {
+    final String sql = "select emp.empno from emp asof join dept\n"
+        + "match_condition emp.deptno <= dept.deptno\n"
+        + "on ename = name";
+    sql(sql).ok();
+  }
+
   @Test void testJoinOnInSubQuery() {
     final String sql = "select * from emp left join dept\n"
         + "on emp.empno = 1\n"
diff --git 
a/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml 
b/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
index dd0c66731c..4b08621046 100644
--- a/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
+++ b/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
@@ -521,6 +521,22 @@ LogicalProject(EXPR$0=[ROW(ITEM(ITEM(ITEM(ITEM($3, 0), 
'detail'), 'skills'), 0).
       <![CDATA[
 LogicalProject(EXPR$0=[ITEM(ITEM($3, 1).DETAIL.SKILLS, +(2, 3)).DESC])
   LogicalTableScan(table=[[CATALOG, SALES, DEPT_NESTED]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testAsOfJoin">
+    <Resource name="sql">
+      <![CDATA[select emp.empno from emp asof join dept
+match_condition emp.deptno <= dept.deptno
+on ename = name]]>
+    </Resource>
+    <Resource name="plan">
+      <![CDATA[
+LogicalProject(EMPNO=[$0])
+  LogicalAsofJoin(condition=[=($1, $11)], joinType=[asof], 
matchCondition=[<=($7, $9)])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+    LogicalProject(DEPTNO=[$0], NAME=[$1], NAME0=[CAST($1):VARCHAR(20) NOT 
NULL])
+      LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
 ]]>
     </Resource>
   </TestCase>


Reply via email to