JingsongLi commented on a change in pull request #18199:
URL: https://github.com/apache/flink/pull/18199#discussion_r776172010



##########
File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
##########
@@ -796,7 +796,9 @@ class FlinkRelMdHandlerTestBase {
       cluster,
       streamPhysicalTraits,
       streamExchange,
-      key)
+      key,

Review comment:
       We can new some testing objects.

##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalChangelogNormalize.scala
##########
@@ -20,26 +20,32 @@ package 
org.apache.flink.table.planner.plan.nodes.physical.stream
 
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecChangelogNormalize
-import org.apache.flink.table.planner.plan.nodes.exec.{InputProperty, ExecNode}
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
 import org.apache.flink.table.planner.plan.utils.ChangelogPlanUtils
-
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.table.catalog.{ObjectIdentifier, ResolvedCatalogTable}
 
 import java.util
+import javax.annotation.Nullable
 
 /**
  * Stream physical RelNode which normalizes a changelog stream which maybe an 
upsert stream or
  * a changelog stream containing duplicate events. This node normalize such 
stream into a regular
  * changelog stream that contains INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE 
records without
  * duplication.
+  *
+  * Note: tableIdentifier and table are not null if and only if it is 
generated directly by
+  * 
[[org.apache.flink.table.planner.plan.rules.physical.stream.StreamPhysicalTableSourceScanRule]]
  */
 class StreamPhysicalChangelogNormalize(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     input: RelNode,
-    val uniqueKeys: Array[Int])
+    val uniqueKeys: Array[Int],
+    @Nullable val tableIdentifier: ObjectIdentifier,

Review comment:
       Why nullable? It is only generated by `StreamPhysicalTableSourceScanRule`

##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
##########
@@ -630,6 +628,24 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
         }
 
       case normalize: StreamPhysicalChangelogNormalize =>
+        val tableIdentifier = normalize.tableIdentifier
+        if (tableIdentifier != null && requiredTrait == 
UpdateKindTrait.ONLY_UPDATE_AFTER) {
+          val catalogName = tableIdentifier.getCatalogName
+          val catalog = 
context.getCatalogManager.getCatalog(catalogName).orElse(null)
+          val catalogTable = normalize.table
+          if (ManagedTableListener.isManagedTable(catalog, catalogTable)) {
+            // if requiredTrait is ONLY_UPDATE_AFTER and table is ManagedTable,
+            // we can eliminate current normalize stage,
+            // cuz ManagedTable has preserved complete delete messages.
+            val input = 
if(normalize.getInput.isInstanceOf[StreamPhysicalExchange]) {
+              normalize.getInput.asInstanceOf[StreamPhysicalExchange].getInput

Review comment:
       Removing `StreamPhysicalExchange` is something dangerous, If there are 
two exchanges together, any problems?

##########
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
##########
@@ -45,14 +45,13 @@ import scala.collection.JavaConversions._
  */
 class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOptimizeContext] {
 
-  private val SATISFY_MODIFY_KIND_SET_TRAIT_VISITOR = new 
SatisfyModifyKindSetTraitVisitor
-  private val SATISFY_UPDATE_KIND_TRAIT_VISITOR = new 
SatisfyUpdateKindTraitVisitor
-
   override def optimize(
       root: RelNode,
       context: StreamOptimizeContext): RelNode = {
     // step1: satisfy ModifyKindSet trait
     val physicalRoot = root.asInstanceOf[StreamPhysicalRel]
+    val SATISFY_MODIFY_KIND_SET_TRAIT_VISITOR = new 
SatisfyModifyKindSetTraitVisitor

Review comment:
       No field. Just `new SatisfyModifyKindSetTraitVisitor().visit` is ok.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to