This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 55dc9fa6922aacab3258159affe5867934771c6e
Author: liuyongvs <[email protected]>
AuthorDate: Tue Apr 9 15:23:22 2024 +0800

    [FLINK-35054][table] Migrate TemporalJoinRewriteWithUniqueKeyRule to java
    
    This closes #24635
---
 .../TemporalJoinRewriteWithUniqueKeyRule.java      | 304 +++++++++++++++++++++
 .../TemporalJoinRewriteWithUniqueKeyRule.scala     | 201 --------------
 2 files changed, 304 insertions(+), 201 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.java
new file mode 100644
index 00000000000..342cafd2526
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRel;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSnapshot;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDeduplicate;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRank;
+import 
org.apache.flink.table.planner.plan.rules.common.CommonTemporalTableJoinRule;
+import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.immutables.value.Value;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import scala.collection.JavaConverters;
+
+/**
+ * Planner rule that rewrites temporal join with extracted primary key, 
Event-time temporal table
+ * join requires primary key and row time attribute of versioned table. The 
versioned table could be
+ * a table source or a view only if it contains the unique key and time 
attribute.
+ *
+ * <p>Flink supports extract the primary key and row time attribute from the 
view if the view comes
+ * from {@link StreamPhysicalRank} node which can convert to a {@link 
StreamPhysicalDeduplicate}
+ * node.
+ */
[email protected]
+public class TemporalJoinRewriteWithUniqueKeyRule
+        extends RelRule<
+                
TemporalJoinRewriteWithUniqueKeyRule.TemporalJoinRewriteWithUniqueKeyRuleConfig>
+        implements CommonTemporalTableJoinRule {
+
+    public static final TemporalJoinRewriteWithUniqueKeyRule INSTANCE =
+            
TemporalJoinRewriteWithUniqueKeyRule.TemporalJoinRewriteWithUniqueKeyRuleConfig.DEFAULT
+                    .toRule();
+
+    private TemporalJoinRewriteWithUniqueKeyRule(
+            TemporalJoinRewriteWithUniqueKeyRuleConfig config) {
+        super(config);
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+        FlinkLogicalJoin join = call.rel(0);
+        FlinkLogicalSnapshot snapshot = call.rel(2);
+        FlinkLogicalRel snapshotInput = call.rel(3);
+
+        boolean isTemporalJoin = matches(snapshot);
+        boolean canConvertToLookup = canConvertToLookupJoin(snapshot, 
snapshotInput);
+        List<JoinRelType> supportedJoinTypes = 
Arrays.asList(JoinRelType.INNER, JoinRelType.LEFT);
+
+        return isTemporalJoin
+                && !canConvertToLookup
+                && supportedJoinTypes.contains(join.getJoinType());
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        FlinkLogicalJoin join = call.rel(0);
+        FlinkLogicalRel leftInput = call.rel(1);
+        FlinkLogicalSnapshot snapshot = call.rel(2);
+        FlinkLogicalRel snapshotInput = call.rel(3);
+
+        RexNode joinCondition = join.getCondition();
+        RexNode newJoinCondition =
+                joinCondition.accept(
+                        new RexShuttle() {
+                            @Override
+                            public RexNode visitCall(RexCall call) {
+                                if (call.getOperator()
+                                        .equals(
+                                                TemporalJoinUtil
+                                                        
.INITIAL_TEMPORAL_JOIN_CONDITION())) {
+                                    RexNode snapshotTimeInputRef;
+                                    List<RexNode> leftJoinKey;
+                                    List<RexNode> rightJoinKey;
+
+                                    if 
(TemporalJoinUtil.isInitialRowTimeTemporalTableJoin(call)) {
+                                        snapshotTimeInputRef = 
call.operands.get(0);
+                                        leftJoinKey =
+                                                ((RexCall) 
call.operands.get(2)).getOperands();
+                                        rightJoinKey =
+                                                ((RexCall) 
call.operands.get(3)).getOperands();
+                                    } else {
+                                        snapshotTimeInputRef = 
call.operands.get(0);
+                                        leftJoinKey =
+                                                ((RexCall) 
call.operands.get(1)).getOperands();
+                                        rightJoinKey =
+                                                ((RexCall) 
call.operands.get(2)).getOperands();
+                                    }
+
+                                    RexBuilder rexBuilder = 
join.getCluster().getRexBuilder();
+                                    Optional<List<RexNode>> 
primaryKeyInputRefs =
+                                            extractPrimaryKeyInputRefs(
+                                                    leftInput, snapshot, 
snapshotInput, rexBuilder);
+                                    validateRightPrimaryKey(
+                                            join, rightJoinKey, 
primaryKeyInputRefs);
+
+                                    if 
(TemporalJoinUtil.isInitialRowTimeTemporalTableJoin(call)) {
+                                        RexNode rightTimeInputRef = 
call.operands.get(1);
+                                        return 
TemporalJoinUtil.makeRowTimeTemporalTableJoinConCall(
+                                                rexBuilder,
+                                                snapshotTimeInputRef,
+                                                rightTimeInputRef,
+                                                
JavaConverters.asScalaBufferConverter(
+                                                                
primaryKeyInputRefs.get())
+                                                        .asScala(),
+                                                
JavaConverters.asScalaBufferConverter(leftJoinKey)
+                                                        .asScala(),
+                                                
JavaConverters.asScalaBufferConverter(rightJoinKey)
+                                                        .asScala());
+                                    } else {
+                                        return TemporalJoinUtil
+                                                
.makeProcTimeTemporalTableJoinConCall(
+                                                        rexBuilder,
+                                                        snapshotTimeInputRef,
+                                                        
JavaConverters.asScalaBufferConverter(
+                                                                        
primaryKeyInputRefs.get())
+                                                                .asScala(),
+                                                        
JavaConverters.asScalaBufferConverter(
+                                                                        
leftJoinKey)
+                                                                .asScala(),
+                                                        
JavaConverters.asScalaBufferConverter(
+                                                                        
rightJoinKey)
+                                                                .asScala());
+                                    }
+                                } else {
+                                    return super.visitCall(call);
+                                }
+                            }
+                        });
+        RelNode rewriteJoin =
+                FlinkLogicalJoin.create(
+                        leftInput, snapshot, newJoinCondition, 
join.getHints(), join.getJoinType());
+        call.transformTo(rewriteJoin);
+    }
+
+    private void validateRightPrimaryKey(
+            FlinkLogicalJoin join,
+            List<RexNode> rightJoinKeyExpressions,
+            Optional<List<RexNode>> rightPrimaryKeyInputRefs) {
+
+        if (!rightPrimaryKeyInputRefs.isPresent()) {
+            throw new ValidationException(
+                    "Temporal Table Join requires primary key in versioned 
table, "
+                            + "but no primary key can be found. "
+                            + "The physical plan is:\n"
+                            + RelOptUtil.toString(join)
+                            + "\n");
+        }
+
+        List<Integer> rightJoinKeyRefIndices =
+                rightJoinKeyExpressions.stream()
+                        .map(rex -> ((RexInputRef) rex).getIndex())
+                        .collect(Collectors.toList());
+
+        List<Integer> rightPrimaryKeyRefIndices =
+                rightPrimaryKeyInputRefs.get().stream()
+                        .map(rex -> ((RexInputRef) rex).getIndex())
+                        .collect(Collectors.toList());
+
+        boolean primaryKeyContainedInJoinKey =
+                rightPrimaryKeyRefIndices.stream()
+                        .allMatch(pk -> rightJoinKeyRefIndices.contains(pk));
+
+        if (!primaryKeyContainedInJoinKey) {
+            List<String> joinFieldNames = join.getRowType().getFieldNames();
+            List<String> joinLeftFieldNames = 
join.getLeft().getRowType().getFieldNames();
+            List<String> joinRightFieldNames = 
join.getRight().getRowType().getFieldNames();
+
+            String primaryKeyNames =
+                    rightPrimaryKeyRefIndices.stream()
+                            .map(i -> joinFieldNames.get(i))
+                            .collect(Collectors.joining(","));
+
+            String joinEquiInfo =
+                    join.analyzeCondition().pairs().stream()
+                            .map(
+                                    pair ->
+                                            joinLeftFieldNames.get(pair.source)
+                                                    + "="
+                                                    + 
joinRightFieldNames.get(pair.target))
+                            .collect(Collectors.joining(","));
+
+            throw new ValidationException(
+                    "Temporal table's primary key ["
+                            + primaryKeyNames
+                            + "] must be included in the "
+                            + "equivalence condition of temporal join, but 
current temporal join condition is ["
+                            + joinEquiInfo
+                            + "].");
+        }
+    }
+
+    private Optional<List<RexNode>> extractPrimaryKeyInputRefs(
+            RelNode leftInput,
+            FlinkLogicalSnapshot snapshot,
+            FlinkLogicalRel snapshotInput,
+            RexBuilder rexBuilder) {
+        List<RelDataTypeField> rightFields = 
snapshot.getRowType().getFieldList();
+        FlinkRelMetadataQuery fmq =
+                
FlinkRelMetadataQuery.reuseOrCreate(snapshot.getCluster().getMetadataQuery());
+
+        Set<ImmutableBitSet> upsertKeySet = fmq.getUpsertKeys(snapshotInput);
+        List<RelDataTypeField> fields = snapshot.getRowType().getFieldList();
+
+        if (upsertKeySet != null && !upsertKeySet.isEmpty()) {
+            int leftFieldCnt = leftInput.getRowType().getFieldCount();
+            List<List<RexNode>> upsertKeySetInputRefs =
+                    upsertKeySet.stream()
+                            .filter(bitSet -> !bitSet.isEmpty())
+                            .map(
+                                    bitSet ->
+                                            Arrays.stream(bitSet.toArray())
+                                                    .mapToObj(index -> 
fields.get(index))
+                                                    // build InputRef of 
upsert key in snapshot
+                                                    .map(
+                                                            f ->
+                                                                    (RexNode)
+                                                                            
rexBuilder.makeInputRef(
+                                                                               
     f.getType(),
+                                                                               
     leftFieldCnt
+                                                                               
             + rightFields
+                                                                               
                     .indexOf(
+                                                                               
                             f)))
+                                                    
.collect(Collectors.toList()))
+                            .collect(Collectors.toList());
+            // select shortest upsert key as primary key
+            return upsertKeySetInputRefs.stream()
+                    .sorted(Comparator.comparingInt(List::size))
+                    .findFirst();
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    /** Rule configuration. */
+    @Value.Immutable(singleton = false)
+    public interface TemporalJoinRewriteWithUniqueKeyRuleConfig extends 
RelRule.Config {
+        
TemporalJoinRewriteWithUniqueKeyRule.TemporalJoinRewriteWithUniqueKeyRuleConfig 
DEFAULT =
+                ImmutableTemporalJoinRewriteWithUniqueKeyRule
+                        .TemporalJoinRewriteWithUniqueKeyRuleConfig.builder()
+                        .build()
+                        .withOperandSupplier(
+                                b0 ->
+                                        b0.operand(FlinkLogicalJoin.class)
+                                                .inputs(
+                                                        b1 ->
+                                                                
b1.operand(FlinkLogicalRel.class)
+                                                                        
.anyInputs(),
+                                                        b2 ->
+                                                                b2.operand(
+                                                                               
 FlinkLogicalSnapshot
+                                                                               
         .class)
+                                                                        
.oneInput(
+                                                                               
 b3 ->
+                                                                               
         b3.operand(
+                                                                               
                         FlinkLogicalRel
+                                                                               
                                 .class)
+                                                                               
                 .anyInputs())))
+                        
.withDescription("TemporalJoinRewriteWithUniqueKeyRule");
+
+        @Override
+        default TemporalJoinRewriteWithUniqueKeyRule toRule() {
+            return new TemporalJoinRewriteWithUniqueKeyRule(this);
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.scala
deleted file mode 100644
index 74a17e5f55e..00000000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/TemporalJoinRewriteWithUniqueKeyRule.scala
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.plan.rules.logical
-
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
-import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalJoin, 
FlinkLogicalRel, FlinkLogicalSnapshot}
-import 
org.apache.flink.table.planner.plan.rules.common.CommonTemporalTableJoinRule
-import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil
-
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptUtil}
-import org.apache.calcite.plan.RelOptRule.{any, operand}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.core.JoinRelType
-import org.apache.calcite.rex._
-
-import scala.collection.JavaConversions._
-
-/**
- * Planner rule that rewrites temporal join with extracted primary key, 
Event-time temporal table
- * join requires primary key and row time attribute of versioned table. The 
versioned table could be
- * a table source or a view only if it contains the unique key and time 
attribute.
- *
- * <p> Flink supports extract the primary key and row time attribute from the 
view if the view comes
- * from [[LogicalRank]] node which can convert to a [[Deduplicate]] node.
- */
-class TemporalJoinRewriteWithUniqueKeyRule
-  extends RelOptRule(
-    operand(
-      classOf[FlinkLogicalJoin],
-      operand(classOf[FlinkLogicalRel], any()),
-      operand(classOf[FlinkLogicalSnapshot], operand(classOf[FlinkLogicalRel], 
any()))),
-    "TemporalJoinRewriteWithUniqueKeyRule")
-  with CommonTemporalTableJoinRule {
-
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val join = call.rel[FlinkLogicalJoin](0)
-    val snapshot = call.rel[FlinkLogicalSnapshot](2)
-    val snapshotInput = call.rel[FlinkLogicalRel](3)
-
-    val isTemporalJoin = matches(snapshot)
-    val canConvertToLookup = canConvertToLookupJoin(snapshot, snapshotInput)
-    val supportedJoinTypes = Seq(JoinRelType.INNER, JoinRelType.LEFT)
-
-    isTemporalJoin && !canConvertToLookup && 
supportedJoinTypes.contains(join.getJoinType)
-  }
-
-  override def onMatch(call: RelOptRuleCall): Unit = {
-    val join = call.rel[FlinkLogicalJoin](0)
-    val leftInput = call.rel[FlinkLogicalRel](1)
-    val snapshot = call.rel[FlinkLogicalSnapshot](2)
-    val snapshotInput = call.rel[FlinkLogicalRel](3)
-
-    val joinCondition = join.getCondition
-
-    val newJoinCondition = joinCondition.accept(new RexShuttle {
-      override def visitCall(call: RexCall): RexNode = {
-        if (call.getOperator == 
TemporalJoinUtil.INITIAL_TEMPORAL_JOIN_CONDITION) {
-          val (snapshotTimeInputRef, leftJoinKey, rightJoinKey) =
-            if (TemporalJoinUtil.isInitialRowTimeTemporalTableJoin(call)) {
-              val snapshotTimeInputRef = call.operands(0)
-              val leftJoinKey = call.operands(2).asInstanceOf[RexCall].operands
-              val rightJoinKey = 
call.operands(3).asInstanceOf[RexCall].operands
-              (snapshotTimeInputRef, leftJoinKey, rightJoinKey)
-            } else {
-              val snapshotTimeInputRef = call.operands(0)
-              val leftJoinKey = call.operands(1).asInstanceOf[RexCall].operands
-              val rightJoinKey = 
call.operands(2).asInstanceOf[RexCall].operands
-              (snapshotTimeInputRef, leftJoinKey, rightJoinKey)
-            }
-
-          val rexBuilder = join.getCluster.getRexBuilder
-          val primaryKeyInputRefs =
-            extractPrimaryKeyInputRefs(leftInput, snapshot, snapshotInput, 
rexBuilder)
-          validateRightPrimaryKey(join, rightJoinKey, primaryKeyInputRefs)
-
-          if (TemporalJoinUtil.isInitialRowTimeTemporalTableJoin(call)) {
-            val rightTimeInputRef = call.operands(1)
-            TemporalJoinUtil.makeRowTimeTemporalTableJoinConCall(
-              rexBuilder,
-              snapshotTimeInputRef,
-              rightTimeInputRef,
-              primaryKeyInputRefs.get,
-              leftJoinKey,
-              rightJoinKey)
-          } else {
-            TemporalJoinUtil.makeProcTimeTemporalTableJoinConCall(
-              rexBuilder,
-              snapshotTimeInputRef,
-              primaryKeyInputRefs.get,
-              leftJoinKey,
-              rightJoinKey)
-          }
-        } else {
-          super.visitCall(call)
-        }
-      }
-    })
-    val rewriteJoin =
-      FlinkLogicalJoin.create(
-        leftInput,
-        snapshot,
-        newJoinCondition,
-        join.getHints,
-        join.getJoinType)
-    call.transformTo(rewriteJoin)
-  }
-
-  private def validateRightPrimaryKey(
-      join: FlinkLogicalJoin,
-      rightJoinKeyExpression: Seq[RexNode],
-      rightPrimaryKeyInputRefs: Option[Seq[RexNode]]): Unit = {
-
-    if (rightPrimaryKeyInputRefs.isEmpty) {
-      throw new ValidationException(
-        "Temporal Table Join requires primary key in versioned table, " +
-          s"but no primary key can be found. " +
-          s"The physical plan is:\n${RelOptUtil.toString(join)}\n")
-    }
-
-    val rightJoinKeyRefIndices = rightJoinKeyExpression
-      .map(rex => rex.asInstanceOf[RexInputRef].getIndex)
-      .toArray
-
-    val rightPrimaryKeyRefIndices = rightPrimaryKeyInputRefs.get
-      .map(rex => rex.asInstanceOf[RexInputRef].getIndex)
-      .toArray
-
-    val primaryKeyContainedInJoinKey = rightPrimaryKeyRefIndices
-      .forall(pk => rightJoinKeyRefIndices.contains(pk))
-
-    if (!primaryKeyContainedInJoinKey) {
-      val joinFieldNames = join.getRowType.getFieldNames
-      val joinLeftFieldNames = join.getLeft.getRowType.getFieldNames
-      val joinRightFieldNamess = join.getRight.getRowType.getFieldNames
-      val primaryKeyNames = rightPrimaryKeyRefIndices
-        .map(i => joinFieldNames.get(i))
-        .toList
-        .mkString(",")
-      val joinEquiInfo = join.analyzeCondition
-        .pairs()
-        .map {
-          pair => joinLeftFieldNames.get(pair.source) + "=" + 
joinRightFieldNamess.get(pair.target)
-        }
-        .toList
-        .mkString(",")
-      throw new ValidationException(
-        s"Temporal table's primary key [$primaryKeyNames] must be included in 
the equivalence " +
-          s"condition of temporal join, but current temporal join condition is 
[$joinEquiInfo].")
-    }
-  }
-
-  private def extractPrimaryKeyInputRefs(
-      leftInput: RelNode,
-      snapshot: FlinkLogicalSnapshot,
-      snapshotInput: FlinkLogicalRel,
-      rexBuilder: RexBuilder): Option[Seq[RexNode]] = {
-    val rightFields = snapshot.getRowType.getFieldList
-    val fmq = 
FlinkRelMetadataQuery.reuseOrCreate(snapshot.getCluster.getMetadataQuery)
-
-    val upsertKeySet = fmq.getUpsertKeys(snapshotInput)
-    val fields = snapshot.getRowType.getFieldList
-
-    if (upsertKeySet != null && upsertKeySet.size() > 0) {
-      val leftFieldCnt = leftInput.getRowType.getFieldCount
-      val upsertKeySetInputRefs = upsertKeySet
-        .filter(_.nonEmpty)
-        .map(
-          _.toArray
-            .map(fields)
-            // build InputRef of upsert key in snapshot
-            .map(f => rexBuilder.makeInputRef(f.getType, leftFieldCnt + 
rightFields.indexOf(f)))
-            .toSeq)
-      // select shortest upsert key as primary key
-      upsertKeySetInputRefs.toArray
-        .sortBy(_.length)
-        .headOption
-    } else {
-      None
-    }
-  }
-}
-
-object TemporalJoinRewriteWithUniqueKeyRule {
-  val INSTANCE: TemporalJoinRewriteWithUniqueKeyRule = new 
TemporalJoinRewriteWithUniqueKeyRule
-}

Reply via email to