godfreyhe commented on code in PR #20393:
URL: https://github.com/apache/flink/pull/20393#discussion_r935451881


##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelTreeWriterImpl.scala:
##########
@@ -85,6 +87,19 @@ class RelTreeWriterImpl(
       case _ => // ignore
     }
 
+    if (withUpsertKey) rel match {
+      case streamRel: StreamPhysicalRel =>
+        val fmq = FlinkRelMetadataQuery.reuseOrCreate(rel.getCluster.getMetadataQuery)
+        val upsertKeys = fmq.getUpsertKeys(streamRel)
+        if (null != upsertKeys && !upsertKeys.isEmpty) {
+          printValues.add(
+            Pair.of("upsertKeys", upsertKeys.map(bitset => bitset.toString).mkString(", ")))

Review Comment:
   field names are better than indexes
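   
   e.g. a rough, untested sketch using the names from the diff above (`rel`, `upsertKeys` and `printValues` come from the surrounding `RelTreeWriterImpl` code; `asScala` assumes `scala.collection.JavaConverters` is in scope):
   
   ```scala
   // resolve each upsert-key bit index against the rel's output row type
   val fieldNames = rel.getRowType.getFieldNames
   val keysStr = upsertKeys.asScala
     .map(bitset => bitset.toArray.map(fieldNames.get).mkString("[", ", ", "]"))
     .mkString(", ")
   printValues.add(Pair.of("upsertKeys", keysStr))
   ```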



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelTreeWriterImpl.scala:
##########
@@ -85,6 +87,19 @@ class RelTreeWriterImpl(
       case _ => // ignore
     }
 
+    if (withUpsertKey) rel match {

Review Comment:
   please add some tests in `FlinkRelOptUtilTest`



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtil.scala:
##########
@@ -80,7 +81,8 @@ object FlinkRelOptUtil {
       withIdPrefix,
       withChangelogTraits,
       withRowType,
-      withTreeStyle = true)
+      withTreeStyle = true,
+      withUpsertKey)

Review Comment:
   nit: move `withUpsertKey` before `withTreeStyle`



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalIncrementalGroupAggregate.scala:
##########
@@ -62,14 +62,19 @@ class StreamPhysicalIncrementalGroupAggregate(
     inputRel: RelNode,
     val partialAggGrouping: Array[Int],
     val partialAggCalls: Array[AggregateCall],
-    val finalAggGrouping: Array[Int],

Review Comment:
   I think `finalAggGrouping` is clearer than `grouping`, because there are two kinds of grouping



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala:
##########
@@ -279,6 +280,83 @@ abstract class CommonPhysicalLookupJoin(
     constantLookupKeys.toMap[Int, LookupKey] ++ fieldRefLookupKeys.toMap[Int, LookupKey]
   }
 
+  /** Check if lookup key contains primary key, include constant lookup keys. */
+  def lookupKeyContainsPrimaryKey(): Boolean = {
+    val outputPkIdx = getOutputPrimaryKeyIndexes
+    // use allLookupKeys instead of joinInfo.rightSet because there may exists constant
+    // lookup key(s) which are not included in joinInfo.rightKeys.
+    outputPkIdx.nonEmpty && outputPkIdx.forall(index => allLookupKeys.contains(index))
+  }
+
+  /** Get final output pk indexes if exists, otherwise will get empty. */
+  def getOutputPrimaryKeyIndexes: Array[Int] = {
+    val temporalPkIdxs = getPrimaryKeyIndexesOfTemporalTable
+
+    val outputPkIdx = if (!temporalPkIdxs.isEmpty) {

Review Comment:
   nit: temporalPkIdxs.nonEmpty



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala:
##########
@@ -279,6 +280,83 @@ abstract class CommonPhysicalLookupJoin(
     constantLookupKeys.toMap[Int, LookupKey] ++ fieldRefLookupKeys.toMap[Int, LookupKey]
   }
 
+  /** Check if lookup key contains primary key, include constant lookup keys. */
+  def lookupKeyContainsPrimaryKey(): Boolean = {
+    val outputPkIdx = getOutputPrimaryKeyIndexes
+    // use allLookupKeys instead of joinInfo.rightSet because there may exists constant
+    // lookup key(s) which are not included in joinInfo.rightKeys.
+    outputPkIdx.nonEmpty && outputPkIdx.forall(index => allLookupKeys.contains(index))
+  }
+
+  /** Get final output pk indexes if exists, otherwise will get empty. */
+  def getOutputPrimaryKeyIndexes: Array[Int] = {
+    val temporalPkIdxs = getPrimaryKeyIndexesOfTemporalTable
+
+    val outputPkIdx = if (!temporalPkIdxs.isEmpty) {
+      if (calcOnTemporalTable.isDefined) {
+        val program = calcOnTemporalTable.get
+        val outputMapping = program.getProjectList.asScala.zipWithIndex
+          .map { case (ref, index) => (index, program.expandLocalRef(ref)) }
+          .map {
+            case (outIndex, ref) =>
+              ref match {
+                case inputRef: RexInputRef => (inputRef.getIndex, outIndex)
+                case _ => (-1, -1)
+              }
+          }
+          .toMap
+        val outputPk = temporalPkIdxs.forall(outputMapping.contains)
+        if (outputPk) {
+          // remapping pk index
+          temporalPkIdxs.map(outputMapping.get(_).get)
+        } else {
+          Array[Int]()
+        }
+      } else {
+        temporalPkIdxs
+      }
+    } else {
+      // temporal table has no pk, no uk produces
+      Array[Int]()
+    }
+
+    outputPkIdx
+  }
+
+  private def getPrimaryKeyIndexesOfTemporalTable: Array[Int] = {
+    // get primary key columns of lookup table if exists
+    val pkColumns = getPrimaryKeyColumnsOfTemporalTable
+    if (pkColumns.isDefined) {
+      val newSchema = temporalTable.getRowType.getFieldNames
+      pkColumns.get.toArray().map(col => newSchema.indexOf(col))
+    } else {
+      Array[Int]()
+    }
+  }
+
+  private def getPrimaryKeyColumnsOfTemporalTable: Option[util.List[String]] = {
+    val schema = temporalTable match {
+      case t: TableSourceTable =>
+        TableSchema.fromResolvedSchema(t.contextResolvedTable.getResolvedSchema)

Review Comment:
   TableSchema is a deprecated class; it's better to avoid using it



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala:
##########
@@ -279,6 +280,83 @@ abstract class CommonPhysicalLookupJoin(
     constantLookupKeys.toMap[Int, LookupKey] ++ fieldRefLookupKeys.toMap[Int, LookupKey]
   }
 
+  /** Check if lookup key contains primary key, include constant lookup keys. */
+  def lookupKeyContainsPrimaryKey(): Boolean = {
+    val outputPkIdx = getOutputPrimaryKeyIndexes
+    // use allLookupKeys instead of joinInfo.rightSet because there may exists constant
+    // lookup key(s) which are not included in joinInfo.rightKeys.
+    outputPkIdx.nonEmpty && outputPkIdx.forall(index => allLookupKeys.contains(index))
+  }
+
+  /** Get final output pk indexes if exists, otherwise will get empty. */
+  def getOutputPrimaryKeyIndexes: Array[Int] = {
+    val temporalPkIdxs = getPrimaryKeyIndexesOfTemporalTable
+
+    val outputPkIdx = if (!temporalPkIdxs.isEmpty) {
+      if (calcOnTemporalTable.isDefined) {

Review Comment:
   too many if-else branches here; we can use `match` to avoid them
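   
   e.g. an untested sketch of the `match`-based shape, reusing the names from the diff (and folding in the `outputMapping(_)` simplification from the other comment):
   
   ```scala
   def getOutputPrimaryKeyIndexes: Array[Int] = {
     (getPrimaryKeyIndexesOfTemporalTable, calcOnTemporalTable) match {
       // temporal table has no pk, no uk produced
       case (pkIdxs, _) if pkIdxs.isEmpty => Array[Int]()
       // no calc on the temporal table: pk indexes pass through unchanged
       case (pkIdxs, None) => pkIdxs
       // remap pk indexes through the calc's projection
       case (pkIdxs, Some(program)) =>
         val outputMapping = program.getProjectList.asScala.zipWithIndex
           .map { case (ref, index) => (program.expandLocalRef(ref), index) }
           .collect { case (inputRef: RexInputRef, index) => (inputRef.getIndex, index) }
           .toMap
         if (pkIdxs.forall(outputMapping.contains)) pkIdxs.map(outputMapping)
         else Array[Int]()
     }
   }
   ```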



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala:
##########
@@ -279,6 +280,83 @@ abstract class CommonPhysicalLookupJoin(
     constantLookupKeys.toMap[Int, LookupKey] ++ fieldRefLookupKeys.toMap[Int, LookupKey]
   }
 
+  /** Check if lookup key contains primary key, include constant lookup keys. */
+  def lookupKeyContainsPrimaryKey(): Boolean = {
+    val outputPkIdx = getOutputPrimaryKeyIndexes
+    // use allLookupKeys instead of joinInfo.rightSet because there may exists constant
+    // lookup key(s) which are not included in joinInfo.rightKeys.
+    outputPkIdx.nonEmpty && outputPkIdx.forall(index => allLookupKeys.contains(index))
+  }
+
+  /** Get final output pk indexes if exists, otherwise will get empty. */
+  def getOutputPrimaryKeyIndexes: Array[Int] = {
+    val temporalPkIdxs = getPrimaryKeyIndexesOfTemporalTable
+
+    val outputPkIdx = if (!temporalPkIdxs.isEmpty) {
+      if (calcOnTemporalTable.isDefined) {
+        val program = calcOnTemporalTable.get
+        val outputMapping = program.getProjectList.asScala.zipWithIndex
+          .map { case (ref, index) => (index, program.expandLocalRef(ref)) }
+          .map {
+            case (outIndex, ref) =>
+              ref match {
+                case inputRef: RexInputRef => (inputRef.getIndex, outIndex)
+                case _ => (-1, -1)
+              }
+          }
+          .toMap
+        val outputPk = temporalPkIdxs.forall(outputMapping.contains)
+        if (outputPk) {
+          // remapping pk index
+          temporalPkIdxs.map(outputMapping.get(_).get)

Review Comment:
   outputMapping.get(_).get => outputMapping(_)



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala:
##########
@@ -488,16 +488,36 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
       ignoreNulls: Boolean): util.Set[ImmutableBitSet] = {
     val left = join.getInput
     val leftUniqueKeys = mq.getUniqueKeys(left, ignoreNulls)
-    val leftType = left.getRowType
-    getJoinUniqueKeys(
-      join.joinType,
-      leftType,
-      leftUniqueKeys,
-      null,
-      mq.areColumnsUnique(left, join.joinInfo.leftSet, ignoreNulls),
-      // TODO get uniqueKeys from TableSchema of TableSource
+
+    if (leftUniqueKeys != null) {
+      val rightUniqueKeys = getUniqueKeysOfTemporalTable(join)
+
+      getJoinUniqueKeys(
+        join.joinType,
+        left.getRowType,
+        leftUniqueKeys,
+        rightUniqueKeys,
+        mq.areColumnsUnique(left, join.joinInfo.leftSet, ignoreNulls),
+        rightUniqueKeys != null)
+    } else {
       null
-    )
+    }
+  }
+
+  private[flink] def getUniqueKeysOfTemporalTable(

Review Comment:
   could this method and CommonPhysicalLookupJoin#lookupKeyContainsPrimaryKey be combined into one?



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala:
##########
@@ -228,17 +229,29 @@ class FlinkRelMdUpsertKeys private extends MetadataHandler[UpsertKeys] {
       join: CommonPhysicalLookupJoin,
       mq: RelMetadataQuery): util.Set[ImmutableBitSet] = {
     val left = join.getInput
-    val leftKeys = FlinkRelMetadataQuery.reuseOrCreate(mq).getUpsertKeys(left)
     val leftType = left.getRowType
     val leftJoinKeys = join.joinInfo.leftSet
+    // differs from regular join, here we do not filterKeys because there's no shuffle on join keys
+    // by default.
+    val leftUpsertKeys = FlinkRelMetadataQuery.reuseOrCreate(mq).getUpsertKeys(left)
+    val rightUniqueKeys = FlinkRelMdUniqueKeys.INSTANCE.getUniqueKeysOfTemporalTable(join)
+
+    val rightUpsertKeys =
+      if (
+        (join.remainingCondition.isDefined && !FlinkRexUtil.isDeterministicInStreaming(
+          join.remainingCondition.get))
+        || (join.calcOnTemporalTable.isDefined && !FlinkRexUtil.isDeterministicInStreaming(

Review Comment:
   `isDeterministicInStreaming` is introduced in the next PR, so this commit is invalid.



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala:
##########
@@ -228,17 +229,29 @@ class FlinkRelMdUpsertKeys private extends MetadataHandler[UpsertKeys] {
       join: CommonPhysicalLookupJoin,
       mq: RelMetadataQuery): util.Set[ImmutableBitSet] = {
     val left = join.getInput
-    val leftKeys = FlinkRelMetadataQuery.reuseOrCreate(mq).getUpsertKeys(left)
     val leftType = left.getRowType
     val leftJoinKeys = join.joinInfo.leftSet
+    // differs from regular join, here we do not filterKeys because there's no shuffle on join keys
+    // by default.
+    val leftUpsertKeys = FlinkRelMetadataQuery.reuseOrCreate(mq).getUpsertKeys(left)
+    val rightUniqueKeys = FlinkRelMdUniqueKeys.INSTANCE.getUniqueKeysOfTemporalTable(join)
+
+    val rightUpsertKeys =
+      if (
+        (join.remainingCondition.isDefined && !FlinkRexUtil.isDeterministicInStreaming(

Review Comment:
   The if condition is too long; assign the result to a variable and the if-else will become clearer
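   
   e.g. an untested sketch that also uses the `exists` rewrite from the other comment (the branch results are a guess, since the hunk is truncated here):
   
   ```scala
   // a non-deterministic remaining condition or calc invalidates the right side's keys
   val isNonDeterministic =
     join.remainingCondition.exists(c => !FlinkRexUtil.isDeterministicInStreaming(c)) ||
       join.calcOnTemporalTable.exists(p => !FlinkRexUtil.isDeterministicInStreaming(p))
   
   val rightUpsertKeys = if (isNonDeterministic) null else rightUniqueKeys
   ```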



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala:
##########
@@ -279,6 +280,83 @@ abstract class CommonPhysicalLookupJoin(
     constantLookupKeys.toMap[Int, LookupKey] ++ fieldRefLookupKeys.toMap[Int, LookupKey]
   }
 
+  /** Check if lookup key contains primary key, include constant lookup keys. */
+  def lookupKeyContainsPrimaryKey(): Boolean = {
+    val outputPkIdx = getOutputPrimaryKeyIndexes
+    // use allLookupKeys instead of joinInfo.rightSet because there may exists constant
+    // lookup key(s) which are not included in joinInfo.rightKeys.
+    outputPkIdx.nonEmpty && outputPkIdx.forall(index => allLookupKeys.contains(index))
+  }
+
+  /** Get final output pk indexes if exists, otherwise will get empty. */
+  def getOutputPrimaryKeyIndexes: Array[Int] = {

Review Comment:
   getOutputPrimaryKeyIndexesOfTemporalTable



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala:
##########
@@ -228,17 +229,29 @@ class FlinkRelMdUpsertKeys private extends MetadataHandler[UpsertKeys] {
       join: CommonPhysicalLookupJoin,
       mq: RelMetadataQuery): util.Set[ImmutableBitSet] = {
     val left = join.getInput
-    val leftKeys = FlinkRelMetadataQuery.reuseOrCreate(mq).getUpsertKeys(left)
     val leftType = left.getRowType
     val leftJoinKeys = join.joinInfo.leftSet
+    // differs from regular join, here we do not filterKeys because there's no shuffle on join keys
+    // by default.
+    val leftUpsertKeys = FlinkRelMetadataQuery.reuseOrCreate(mq).getUpsertKeys(left)
+    val rightUniqueKeys = FlinkRelMdUniqueKeys.INSTANCE.getUniqueKeysOfTemporalTable(join)
+
+    val rightUpsertKeys =
+      if (
+        (join.remainingCondition.isDefined && !FlinkRexUtil.isDeterministicInStreaming(

Review Comment:
   join.remainingCondition.exists(c => !FlinkRexUtil.isDeterministicInStreaming(c))



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala:
##########
@@ -488,16 +488,36 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
       ignoreNulls: Boolean): util.Set[ImmutableBitSet] = {
     val left = join.getInput
     val leftUniqueKeys = mq.getUniqueKeys(left, ignoreNulls)
-    val leftType = left.getRowType
-    getJoinUniqueKeys(
-      join.joinType,
-      leftType,
-      leftUniqueKeys,
-      null,
-      mq.areColumnsUnique(left, join.joinInfo.leftSet, ignoreNulls),
-      // TODO get uniqueKeys from TableSchema of TableSource
+
+    if (leftUniqueKeys != null) {

Review Comment:
   It's better to add tests to cover the changes in this commit



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java:
##########
@@ -142,4 +142,33 @@ public class OptimizerConfigOptions {
                     .withDescription(
                             "When it is true, the optimizer will merge the 
operators with pipelined shuffling "
                                     + "into a multiple input operator to 
reduce shuffling and improve performance. Default value is true.");
+
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<NonDeterministicUpdateHandling>
+            TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_HANDLING =
+                    key("table.optimizer.non-deterministic-update.handling")

Review Comment:
   how about `table.optimizer.non-deterministic-update-strategy`?
   
   if this config option is only used for lookup joins, the config name should contain a `lookup-join` keyword



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala:
##########
@@ -585,6 +585,50 @@ object FlinkRexUtil {
     }
     false
   }
+
+  /**
+   * Returns whether a given expression is deterministic in streaming scenario, differs from
+   * calcite's [[RexUtil]], it considers both non-deterministic and dynamic functions.
+   */
+  def isDeterministicInStreaming(e: RexNode): Boolean = try {
+    val visitor = new RexVisitorImpl[Void](true) {
+      override def visitCall(call: RexCall): Void = {
+        // dynamic function call is also non-deterministic to streaming
+        if (!call.getOperator.isDeterministic || call.getOperator.isDynamicFunction)

Review Comment:
   add {}



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalJoin.scala:
##########
@@ -80,4 +81,14 @@ abstract class CommonPhysicalJoin(
       .item("select", getRowType.getFieldNames.mkString(", "))
   }
 
+  def getUniqueKeys(input: RelNode, keys: Array[Int]): List[Array[Int]] = {

Review Comment:
   getUpsertKeys?



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala:
##########
@@ -585,6 +585,50 @@ object FlinkRexUtil {
     }
     false
   }
+
+  /**
+   * Returns whether a given expression is deterministic in streaming scenario, differs from
+   * calcite's [[RexUtil]], it considers both non-deterministic and dynamic functions.
+   */
+  def isDeterministicInStreaming(e: RexNode): Boolean = try {

Review Comment:
   Why do we emphasize `streaming`?
   
   We should add test cases for this util method
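   
   e.g. a minimal, untested sketch (the class name is illustrative; it assumes the test sits in the same package as `FlinkRexUtil`, with JUnit 4 and Calcite on the test classpath):
   
   ```scala
   import org.apache.calcite.jdbc.JavaTypeFactoryImpl
   import org.apache.calcite.rex.RexBuilder
   import org.apache.calcite.sql.fun.SqlStdOperatorTable
   import org.junit.Assert.{assertFalse, assertTrue}
   import org.junit.Test
   
   class FlinkRexUtilDeterminismTest {
   
     private val rexBuilder = new RexBuilder(new JavaTypeFactoryImpl())
   
     @Test
     def testIsDeterministicInStreaming(): Unit = {
       val one = rexBuilder.makeExactLiteral(java.math.BigDecimal.ONE)
       // plain arithmetic is deterministic
       assertTrue(FlinkRexUtil.isDeterministicInStreaming(
         rexBuilder.makeCall(SqlStdOperatorTable.PLUS, one, one)))
       // RAND() is declared non-deterministic
       assertFalse(FlinkRexUtil.isDeterministicInStreaming(
         rexBuilder.makeCall(SqlStdOperatorTable.RAND)))
       // CURRENT_TIMESTAMP is a dynamic function, so it should
       // also be treated as non-deterministic in streaming
       assertFalse(FlinkRexUtil.isDeterministicInStreaming(
         rexBuilder.makeCall(SqlStdOperatorTable.CURRENT_TIMESTAMP)))
     }
   }
   ```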



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala:
##########
@@ -585,6 +585,50 @@ object FlinkRexUtil {
     }
     false
   }
+
+  /**
+   * Returns whether a given expression is deterministic in streaming scenario, differs from
+   * calcite's [[RexUtil]], it considers both non-deterministic and dynamic functions.
+   */
+  def isDeterministicInStreaming(e: RexNode): Boolean = try {
+    val visitor = new RexVisitorImpl[Void](true) {
+      override def visitCall(call: RexCall): Void = {
+        // dynamic function call is also non-deterministic to streaming
+        if (!call.getOperator.isDeterministic || call.getOperator.isDynamicFunction)
+          throw Util.FoundOne.NULL
+        super.visitCall(call)
+      }
+    }
+    e.accept(visitor)
+    true
+  } catch {
+    case ex: Util.FoundOne =>
+      Util.swallow(ex, null)
+      false
+  }
+
+  /**
+   * Returns whether a given [[RexProgram]] is deterministic in streaming scenario, differs from
+   * calcite's [[RexUtil]], it considers both non-deterministic and dynamic functions.
+   * @return
+   *   true if any expression of the program is not deterministic in streaming
+   */
+  def isDeterministicInStreaming(rexProgram: RexProgram): Boolean = try {
+    if (null != rexProgram.getCondition) {
+      val rexCondi = rexProgram.expandLocalRef(rexProgram.getCondition)
+      if (!isDeterministicInStreaming(rexCondi)) {
+        return false
+      }
+    }
+    val projects = rexProgram.getProjectList.map(rexProgram.expandLocalRef)
+    projects.forall {
+      expr =>
+        expr match {
+          case rexNode: RexNode => isDeterministicInStreaming(rexNode)
+          case _ => true // ignore
+        }

Review Comment:
   these lines can be simplified to `projects.forall(isDeterministicInStreaming)`; after `expandLocalRef` every element is already a `RexNode`, so the `case _` branch is unreachable
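   
   e.g. (untested; `asScala` assumed in scope, and overload resolution picks the `RexNode` variant):
   
   ```scala
   val projects = rexProgram.getProjectList.asScala.map(rexProgram.expandLocalRef)
   projects.forall(isDeterministicInStreaming)
   ```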


