This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b46d1bf0b64 [FLINK-36988][table] Migrate 
`LogicalCorrelateToJoinFromTemporalTableFunctionRule` to java
b46d1bf0b64 is described below

commit b46d1bf0b649492336ec7215336342838fb343f4
Author: Jacky Lau <[email protected]>
AuthorDate: Wed Jan 14 05:29:49 2026 +0800

    [FLINK-36988][table] Migrate 
`LogicalCorrelateToJoinFromTemporalTableFunctionRule` to java
    
    ---------
    
    Co-authored-by: yongliu <[email protected]>
---
 ...rrelateToJoinFromTemporalTableFunctionRule.java | 357 +++++++++++++++++++++
 ...relateToJoinFromTemporalTableFunctionRule.scala | 238 --------------
 2 files changed, 357 insertions(+), 238 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.java
new file mode 100644
index 00000000000..d80062919e5
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.TemporalTableFunction;
+import org.apache.flink.table.functions.TemporalTableFunctionImpl;
+import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
+import org.apache.flink.table.planner.functions.utils.TableSqlFunction;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalJoin;
+import 
org.apache.flink.table.planner.plan.optimize.program.FlinkOptimizeContext;
+import org.apache.flink.table.planner.plan.utils.ExpandTableScanShuttle;
+import org.apache.flink.table.planner.plan.utils.RexDefaultVisitor;
+import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.TableFunctionScan;
+import org.apache.calcite.rel.logical.LogicalCorrelate;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.sql.SqlOperator;
+import org.immutables.value.Value;
+
+import java.util.Optional;
+
+import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isProctimeAttribute;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The initial temporal TableFunction join (LATERAL 
TemporalTableFunction(o.proctime)) is a
+ * correlate. Rewrite it into a Join with a special temporal join condition 
wraps time attribute and
+ * primary key information. The join will be translated into {@link 
StreamExecTemporalJoin} in
+ * physical.
+ */
[email protected]
+public class LogicalCorrelateToJoinFromTemporalTableFunctionRule
+        extends RelRule<
+                LogicalCorrelateToJoinFromTemporalTableFunctionRule
+                        
.LogicalCorrelateToJoinFromTemporalTableFunctionRuleConfig> {
+
+    public static final LogicalCorrelateToJoinFromTemporalTableFunctionRule 
INSTANCE =
+            LogicalCorrelateToJoinFromTemporalTableFunctionRule
+                    
.LogicalCorrelateToJoinFromTemporalTableFunctionRuleConfig.DEFAULT
+                    .toRule();
+
+    private LogicalCorrelateToJoinFromTemporalTableFunctionRule(
+            LogicalCorrelateToJoinFromTemporalTableFunctionRuleConfig config) {
+        super(config);
+    }
+
+    private String extractNameFromTimeAttribute(Expression timeAttribute) {
+        if (timeAttribute instanceof FieldReferenceExpression) {
+            FieldReferenceExpression f = (FieldReferenceExpression) 
timeAttribute;
+            if (f.getOutputDataType()
+                    .getLogicalType()
+                    .isAnyOf(
+                            LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
+                            LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)) {
+                return f.getName();
+            }
+        }
+        throw new ValidationException(
+                "Invalid timeAttribute [" + timeAttribute + "] in 
TemporalTableFunction");
+    }
+
+    private boolean isProctimeReference(TemporalTableFunctionImpl 
temporalTableFunction) {
+        FieldReferenceExpression fieldRef =
+                (FieldReferenceExpression) 
temporalTableFunction.getTimeAttribute();
+        return 
isProctimeAttribute(fieldRef.getOutputDataType().getLogicalType());
+    }
+
+    private String extractNameFromPrimaryKeyAttribute(Expression expression) {
+        if (expression instanceof FieldReferenceExpression) {
+            FieldReferenceExpression f = (FieldReferenceExpression) expression;
+            return f.getName();
+        }
+        throw new ValidationException(
+                "Unsupported expression ["
+                        + expression
+                        + "] as primary key. "
+                        + "Only top-level (not nested) field references are 
supported.");
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        LogicalCorrelate logicalCorrelate = call.rel(0);
+        RelNode leftNode = call.rel(1);
+        TableFunctionScan rightTableFunctionScan = call.rel(2);
+
+        RelOptCluster cluster = logicalCorrelate.getCluster();
+
+        Optional<TemporalTableFunctionCall> temporalTableFunctionCall =
+                new GetTemporalTableFunctionCall(cluster.getRexBuilder(), 
leftNode)
+                        .visit(rightTableFunctionScan.getCall());
+
+        if (!(temporalTableFunctionCall.isPresent()
+                && temporalTableFunctionCall.get().getTemporalTableFunction()
+                        instanceof TemporalTableFunctionImpl)) {
+            return;
+        }
+
+        TemporalTableFunctionImpl rightTemporalTableFunction =
+                (TemporalTableFunctionImpl)
+                        
temporalTableFunctionCall.get().getTemporalTableFunction();
+        RexNode leftTimeAttribute = 
temporalTableFunctionCall.get().getTimeAttribute();
+
+        // If TemporalTableFunction was found, rewrite LogicalCorrelate to 
TemporalJoin
+        QueryOperation underlyingHistoryTable =
+                rightTemporalTableFunction.getUnderlyingHistoryTable();
+        RexBuilder rexBuilder = cluster.getRexBuilder();
+
+        FlinkOptimizeContext flinkContext =
+                (FlinkOptimizeContext) 
ShortcutUtils.unwrapContext(call.getPlanner());
+        FlinkRelBuilder relBuilder = flinkContext.getFlinkRelBuilder();
+
+        RelNode temporalTable = 
relBuilder.queryOperation(underlyingHistoryTable).build();
+        // expand QueryOperationCatalogViewTable in Table Scan
+        ExpandTableScanShuttle shuttle = new ExpandTableScanShuttle();
+        RelNode rightNode = temporalTable.accept(shuttle);
+
+        RexNode rightTimeIndicatorExpression =
+                createRightExpression(
+                        rexBuilder,
+                        leftNode,
+                        rightNode,
+                        extractNameFromTimeAttribute(
+                                
rightTemporalTableFunction.getTimeAttribute()));
+
+        RexNode rightPrimaryKeyExpression =
+                createRightExpression(
+                        rexBuilder,
+                        leftNode,
+                        rightNode,
+                        extractNameFromPrimaryKeyAttribute(
+                                rightTemporalTableFunction.getPrimaryKey()));
+
+        relBuilder.push(leftNode);
+        relBuilder.push(rightNode);
+
+        RexNode condition;
+        if (isProctimeReference(rightTemporalTableFunction)) {
+            condition =
+                    TemporalJoinUtil.makeProcTimeTemporalFunctionJoinConCall(
+                            rexBuilder, leftTimeAttribute, 
rightPrimaryKeyExpression);
+        } else {
+            condition =
+                    TemporalJoinUtil.makeRowTimeTemporalFunctionJoinConCall(
+                            rexBuilder,
+                            leftTimeAttribute,
+                            rightTimeIndicatorExpression,
+                            rightPrimaryKeyExpression);
+        }
+
+        relBuilder.join(JoinRelType.INNER, condition);
+        call.transformTo(relBuilder.build());
+    }
+
+    private RexNode createRightExpression(
+            RexBuilder rexBuilder, RelNode leftNode, RelNode rightNode, String 
field) {
+        int rightReferencesOffset = leftNode.getRowType().getFieldCount();
+        RelDataTypeField rightDataTypeField = 
rightNode.getRowType().getField(field, false, false);
+        return rexBuilder.makeInputRef(
+                rightDataTypeField.getType(),
+                rightReferencesOffset + rightDataTypeField.getIndex());
+    }
+
+    /** Rule configuration. */
+    @Value.Immutable(singleton = false)
+    public interface LogicalCorrelateToJoinFromTemporalTableFunctionRuleConfig
+            extends RelRule.Config {
+        LogicalCorrelateToJoinFromTemporalTableFunctionRule
+                        
.LogicalCorrelateToJoinFromTemporalTableFunctionRuleConfig
+                DEFAULT =
+                        
ImmutableLogicalCorrelateToJoinFromTemporalTableFunctionRule
+                                
.LogicalCorrelateToJoinFromTemporalTableFunctionRuleConfig.builder()
+                                .build()
+                                .withOperandSupplier(
+                                        b0 ->
+                                                
b0.operand(LogicalCorrelate.class)
+                                                        .inputs(
+                                                                b1 ->
+                                                                        
b1.operand(RelNode.class)
+                                                                               
 .anyInputs(),
+                                                                b2 ->
+                                                                        
b2.operand(
+                                                                               
         TableFunctionScan
+                                                                               
                 .class)
+                                                                               
 .noInputs()))
+                                .withDescription(
+                                        
"LogicalCorrelateToJoinFromTemporalTableFunctionRule");
+
+        @Override
+        default LogicalCorrelateToJoinFromTemporalTableFunctionRule toRule() {
+            return new 
LogicalCorrelateToJoinFromTemporalTableFunctionRule(this);
+        }
+    }
+}
+
+/**
+ * Simple pojo class for extracted {@link TemporalTableFunction} with time 
attribute extracted from
+ * RexNode with {@link TemporalTableFunction} call.
+ */
+class TemporalTableFunctionCall {
+    private TemporalTableFunction temporalTableFunction;
+    private RexNode timeAttribute;
+
+    public TemporalTableFunctionCall(
+            TemporalTableFunction temporalTableFunction, RexNode 
timeAttribute) {
+        this.temporalTableFunction = temporalTableFunction;
+        this.timeAttribute = timeAttribute;
+    }
+
+    public TemporalTableFunction getTemporalTableFunction() {
+        return temporalTableFunction;
+    }
+
+    public RexNode getTimeAttribute() {
+        return timeAttribute;
+    }
+
+    public void setTimeAttribute(RexNode timeAttribute) {
+        this.timeAttribute = timeAttribute;
+    }
+}
+
+/**
+ * Find {@link TemporalTableFunction} call and run {@link 
CorrelatedFieldAccessRemoval} on it's
+ * operand.
+ */
+class GetTemporalTableFunctionCall extends 
RexVisitorImpl<TemporalTableFunctionCall> {
+    private final RexBuilder rexBuilder;
+    private final RelNode leftSide;
+
+    GetTemporalTableFunctionCall(RexBuilder rexBuilder, RelNode leftSide) {
+        super(false);
+        this.rexBuilder = rexBuilder;
+        this.leftSide = leftSide;
+    }
+
+    Optional<TemporalTableFunctionCall> visit(RexNode node) {
+        TemporalTableFunctionCall result = node.accept(this);
+        return result != null ? Optional.of(result) : Optional.empty();
+    }
+
+    @Override
+    public TemporalTableFunctionCall visitCall(RexCall rexCall) {
+        FunctionDefinition functionDefinition;
+        SqlOperator sqlOperator = rexCall.getOperator();
+        if (sqlOperator instanceof TableSqlFunction) {
+            functionDefinition = ((TableSqlFunction) sqlOperator).udtf();
+        } else if (sqlOperator instanceof BridgingSqlFunction) {
+            functionDefinition = ((BridgingSqlFunction) 
sqlOperator).getDefinition();
+        } else {
+            return null;
+        }
+
+        if (!(functionDefinition instanceof TemporalTableFunctionImpl)) {
+            return null;
+        }
+        TemporalTableFunctionImpl temporalTableFunction =
+                (TemporalTableFunctionImpl) functionDefinition;
+
+        checkState(
+                rexCall.getOperands().size() == 1,
+                "TemporalTableFunction call [%s] must have exactly one 
argument",
+                rexCall);
+        CorrelatedFieldAccessRemoval correlatedFieldAccessRemoval =
+                new CorrelatedFieldAccessRemoval(temporalTableFunction, 
rexBuilder, leftSide);
+        return new TemporalTableFunctionCall(
+                temporalTableFunction,
+                
rexCall.getOperands().get(0).accept(correlatedFieldAccessRemoval));
+    }
+}
+
+/**
+ * This converts field accesses like `$cor0.o_rowtime` to valid input 
references for join condition
+ * context without `$cor` reference.
+ */
+class CorrelatedFieldAccessRemoval extends RexDefaultVisitor<RexNode> {
+    private final TemporalTableFunctionImpl temporalTableFunction;
+    private final RexBuilder rexBuilder;
+    private final RelNode leftSide;
+
+    public CorrelatedFieldAccessRemoval(
+            TemporalTableFunctionImpl temporalTableFunction,
+            RexBuilder rexBuilder,
+            RelNode leftSide) {
+        this.temporalTableFunction = temporalTableFunction;
+        this.rexBuilder = rexBuilder;
+        this.leftSide = leftSide;
+    }
+
+    @Override
+    public RexNode visitFieldAccess(RexFieldAccess fieldAccess) {
+        int leftIndex = 
leftSide.getRowType().getFieldList().indexOf(fieldAccess.getField());
+        if (leftIndex < 0) {
+            throw new IllegalStateException(
+                    "Failed to find reference to field ["
+                            + fieldAccess.getField()
+                            + "] in node ["
+                            + leftSide
+                            + "]");
+        }
+        return rexBuilder.makeInputRef(leftSide, leftIndex);
+    }
+
+    @Override
+    public RexNode visitInputRef(RexInputRef inputRef) {
+        return inputRef;
+    }
+
+    @Override
+    public RexNode visitNode(RexNode rexNode) {
+        throw new ValidationException(
+                "Unsupported argument ["
+                        + rexNode
+                        + "] "
+                        + "in "
+                        + TemporalTableFunction.class.getSimpleName()
+                        + " call of "
+                        + "["
+                        + temporalTableFunction.getUnderlyingHistoryTable()
+                        + "] table");
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala
deleted file mode 100644
index 79b09d5e427..00000000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.plan.rules.logical
-
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.expressions._
-import org.apache.flink.table.functions.{TemporalTableFunction, 
TemporalTableFunctionImpl}
-import org.apache.flink.table.operations.QueryOperation
-import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction
-import org.apache.flink.table.planner.functions.utils.TableSqlFunction
-import 
org.apache.flink.table.planner.plan.optimize.program.FlinkOptimizeContext
-import org.apache.flink.table.planner.plan.utils.{ExpandTableScanShuttle, 
RexDefaultVisitor}
-import 
org.apache.flink.table.planner.plan.utils.TemporalJoinUtil.{makeProcTimeTemporalFunctionJoinConCall,
 makeRowTimeTemporalFunctionJoinConCall}
-import org.apache.flink.table.planner.utils.ShortcutUtils
-import 
org.apache.flink.table.types.logical.LogicalTypeRoot.{TIMESTAMP_WITH_LOCAL_TIME_ZONE,
 TIMESTAMP_WITHOUT_TIME_ZONE}
-import 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isProctimeAttribute
-import org.apache.flink.util.Preconditions.checkState
-
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.calcite.plan.RelOptRule.{any, none, operand, some}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.core.{JoinRelType, TableFunctionScan}
-import org.apache.calcite.rel.logical.LogicalCorrelate
-import org.apache.calcite.rex._
-
-/**
- * The initial temporal TableFunction join (LATERAL 
TemporalTableFunction(o.proctime)) is a
- * correlate. Rewrite it into a Join with a special temporal join condition 
wraps time attribute and
- * primary key information. The join will be translated into 
[[StreamExecTemporalJoin]] in physical.
- */
-class LogicalCorrelateToJoinFromTemporalTableFunctionRule
-  extends RelOptRule(
-    operand(
-      classOf[LogicalCorrelate],
-      some(operand(classOf[RelNode], any()), 
operand(classOf[TableFunctionScan], none()))),
-    "LogicalCorrelateToJoinFromTemporalTableFunctionRule") {
-
-  private def extractNameFromTimeAttribute(timeAttribute: Expression): String 
= {
-    timeAttribute match {
-      case f: FieldReferenceExpression
-          if f.getOutputDataType.getLogicalType.isAnyOf(
-            TIMESTAMP_WITHOUT_TIME_ZONE,
-            TIMESTAMP_WITH_LOCAL_TIME_ZONE) =>
-        f.getName
-      case _ =>
-        throw new ValidationException(
-          s"Invalid timeAttribute [$timeAttribute] in TemporalTableFunction")
-    }
-  }
-
-  private def isProctimeReference(temporalTableFunction: 
TemporalTableFunctionImpl): Boolean = {
-    val fieldRef = 
temporalTableFunction.getTimeAttribute.asInstanceOf[FieldReferenceExpression]
-    isProctimeAttribute(fieldRef.getOutputDataType.getLogicalType)
-  }
-
-  private def extractNameFromPrimaryKeyAttribute(expression: Expression): 
String = {
-    expression match {
-      case f: FieldReferenceExpression =>
-        f.getName
-      case _ =>
-        throw new ValidationException(
-          s"Unsupported expression [$expression] as primary key. " +
-            s"Only top-level (not nested) field references are supported.")
-    }
-  }
-
-  override def onMatch(call: RelOptRuleCall): Unit = {
-    val logicalCorrelate: LogicalCorrelate = call.rel(0)
-    val leftNode: RelNode = call.rel(1)
-    val rightTableFunctionScan: TableFunctionScan = call.rel(2)
-
-    val cluster = logicalCorrelate.getCluster
-
-    new GetTemporalTableFunctionCall(cluster.getRexBuilder, leftNode)
-      .visit(rightTableFunctionScan.getCall) match {
-      case None =>
-      // Do nothing and handle standard TableFunction
-      case Some(
-            TemporalTableFunctionCall(
-              rightTemporalTableFunction: TemporalTableFunctionImpl,
-              leftTimeAttribute)) =>
-        // If TemporalTableFunction was found, rewrite LogicalCorrelate to 
TemporalJoin
-        val underlyingHistoryTable: QueryOperation =
-          rightTemporalTableFunction.getUnderlyingHistoryTable
-        val rexBuilder = cluster.getRexBuilder
-
-        val flinkContext = ShortcutUtils
-          .unwrapContext(call.getPlanner)
-          .asInstanceOf[FlinkOptimizeContext]
-        val relBuilder = flinkContext.getFlinkRelBuilder
-
-        val temporalTable: RelNode = 
relBuilder.queryOperation(underlyingHistoryTable).build()
-        // expand QueryOperationCatalogViewTable in Table Scan
-        val shuttle = new ExpandTableScanShuttle
-        val rightNode = temporalTable.accept(shuttle)
-
-        val rightTimeIndicatorExpression = createRightExpression(
-          rexBuilder,
-          leftNode,
-          rightNode,
-          
extractNameFromTimeAttribute(rightTemporalTableFunction.getTimeAttribute))
-
-        val rightPrimaryKeyExpression = createRightExpression(
-          rexBuilder,
-          leftNode,
-          rightNode,
-          
extractNameFromPrimaryKeyAttribute(rightTemporalTableFunction.getPrimaryKey))
-
-        relBuilder.push(leftNode)
-        relBuilder.push(rightNode)
-
-        val condition =
-          if (isProctimeReference(rightTemporalTableFunction)) {
-            makeProcTimeTemporalFunctionJoinConCall(
-              rexBuilder,
-              leftTimeAttribute,
-              rightPrimaryKeyExpression)
-          } else {
-            makeRowTimeTemporalFunctionJoinConCall(
-              rexBuilder,
-              leftTimeAttribute,
-              rightTimeIndicatorExpression,
-              rightPrimaryKeyExpression)
-          }
-        relBuilder.join(JoinRelType.INNER, condition)
-
-        call.transformTo(relBuilder.build())
-    }
-  }
-
-  private def createRightExpression(
-      rexBuilder: RexBuilder,
-      leftNode: RelNode,
-      rightNode: RelNode,
-      field: String): RexNode = {
-    val rightReferencesOffset = leftNode.getRowType.getFieldCount
-    val rightDataTypeField = rightNode.getRowType.getField(field, false, false)
-    rexBuilder.makeInputRef(
-      rightDataTypeField.getType,
-      rightReferencesOffset + rightDataTypeField.getIndex)
-  }
-
-}
-
-object LogicalCorrelateToJoinFromTemporalTableFunctionRule {
-  val INSTANCE: RelOptRule = new 
LogicalCorrelateToJoinFromTemporalTableFunctionRule
-}
-
-/**
- * Simple pojo class for extracted [[TemporalTableFunction]] with time 
attribute extracted from
- * RexNode with [[TemporalTableFunction]] call.
- */
-case class TemporalTableFunctionCall(
-    var temporalTableFunction: TemporalTableFunction,
-    var timeAttribute: RexNode) {}
-
-/** Find [[TemporalTableFunction]] call and run 
[[CorrelatedFieldAccessRemoval]] on it's operand. */
-class GetTemporalTableFunctionCall(var rexBuilder: RexBuilder, var leftSide: 
RelNode)
-  extends RexVisitorImpl[TemporalTableFunctionCall](false) {
-
-  def visit(node: RexNode): Option[TemporalTableFunctionCall] = {
-    val result = node.accept(this)
-    if (result == null) {
-      return None
-    }
-    Some(result)
-  }
-
-  override def visitCall(rexCall: RexCall): TemporalTableFunctionCall = {
-    val functionDefinition = rexCall.getOperator match {
-      case tsf: TableSqlFunction => tsf.udtf
-      case bsf: BridgingSqlFunction => bsf.getDefinition
-      case _ => return null
-    }
-
-    if (!functionDefinition.isInstanceOf[TemporalTableFunction]) {
-      return null
-    }
-    val temporalTableFunction =
-      functionDefinition.asInstanceOf[TemporalTableFunctionImpl]
-
-    checkState(
-      rexCall.getOperands.size().equals(1),
-      "TemporalTableFunction call [%s] must have exactly one argument",
-      rexCall)
-    val correlatedFieldAccessRemoval =
-      new CorrelatedFieldAccessRemoval(temporalTableFunction, rexBuilder, 
leftSide)
-    TemporalTableFunctionCall(
-      temporalTableFunction,
-      rexCall.getOperands.get(0).accept(correlatedFieldAccessRemoval))
-  }
-}
-
-/**
- * This converts field accesses like `$cor0.o_rowtime` to valid input 
references for join condition
- * context without `$cor` reference.
- */
-class CorrelatedFieldAccessRemoval(
-    var temporalTableFunction: TemporalTableFunctionImpl,
-    var rexBuilder: RexBuilder,
-    var leftSide: RelNode)
-  extends RexDefaultVisitor[RexNode] {
-
-  override def visitFieldAccess(fieldAccess: RexFieldAccess): RexNode = {
-    val leftIndex = 
leftSide.getRowType.getFieldList.indexOf(fieldAccess.getField)
-    if (leftIndex < 0) {
-      throw new IllegalStateException(
-        s"Failed to find reference to field [${fieldAccess.getField}] in node 
[$leftSide]")
-    }
-    rexBuilder.makeInputRef(leftSide, leftIndex)
-  }
-
-  override def visitInputRef(inputRef: RexInputRef): RexNode = {
-    inputRef
-  }
-
-  override def visitNode(rexNode: RexNode): RexNode = {
-    throw new ValidationException(
-      s"Unsupported argument [$rexNode] " +
-        s"in ${classOf[TemporalTableFunction].getSimpleName} call of " +
-        s"[${temporalTableFunction.getUnderlyingHistoryTable}] table")
-  }
-}

Reply via email to