JingsongLi commented on a change in pull request #18199:
URL: https://github.com/apache/flink/pull/18199#discussion_r776172010
##########
File path:
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
##########
@@ -796,7 +796,9 @@ class FlinkRelMdHandlerTestBase {
cluster,
streamPhysicalTraits,
streamExchange,
- key)
+ key,
Review comment:
We can new some testing objects.
##########
File path:
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalChangelogNormalize.scala
##########
@@ -20,26 +20,32 @@ package
org.apache.flink.table.planner.plan.nodes.physical.stream
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecChangelogNormalize
-import org.apache.flink.table.planner.plan.nodes.exec.{InputProperty, ExecNode}
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
import org.apache.flink.table.planner.plan.utils.ChangelogPlanUtils
-
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.table.catalog.{ObjectIdentifier, ResolvedCatalogTable}
import java.util
+import javax.annotation.Nullable
/**
* Stream physical RelNode which normalizes a changelog stream which maybe an
upsert stream or
* a changelog stream containing duplicate events. This node normalize such
stream into a regular
* changelog stream that contains INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE
records without
* duplication.
+ *
+ * Note: tableIdentifier and table are not null if and only if it is
generated directly by
+ *
[[org.apache.flink.table.planner.plan.rules.physical.stream.StreamPhysicalTableSourceScanRule]]
*/
class StreamPhysicalChangelogNormalize(
cluster: RelOptCluster,
traitSet: RelTraitSet,
input: RelNode,
- val uniqueKeys: Array[Int])
+ val uniqueKeys: Array[Int],
+ @Nullable val tableIdentifier: ObjectIdentifier,
Review comment:
Why nullable? It is only generated by `StreamPhysicalTableSourceScanRule`
##########
File path:
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
##########
@@ -630,6 +628,24 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
}
case normalize: StreamPhysicalChangelogNormalize =>
+ val tableIdentifier = normalize.tableIdentifier
+ if (tableIdentifier != null && requiredTrait ==
UpdateKindTrait.ONLY_UPDATE_AFTER) {
+ val catalogName = tableIdentifier.getCatalogName
+ val catalog =
context.getCatalogManager.getCatalog(catalogName).orElse(null)
+ val catalogTable = normalize.table
+ if (ManagedTableListener.isManagedTable(catalog, catalogTable)) {
+ // if requiredTrait is ONLY_UPDATE_AFTER and table is ManagedTable,
+ // we can eliminate current normalize stage,
+ // cuz ManagedTable has preserved complete delete messages.
+ val input =
if(normalize.getInput.isInstanceOf[StreamPhysicalExchange]) {
+ normalize.getInput.asInstanceOf[StreamPhysicalExchange].getInput
Review comment:
Removing `StreamPhysicalExchange` is something dangerous, If there are
two exchanges together, any problems?
##########
File path:
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
##########
@@ -45,14 +45,13 @@ import scala.collection.JavaConversions._
*/
class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOptimizeContext] {
- private val SATISFY_MODIFY_KIND_SET_TRAIT_VISITOR = new
SatisfyModifyKindSetTraitVisitor
- private val SATISFY_UPDATE_KIND_TRAIT_VISITOR = new
SatisfyUpdateKindTraitVisitor
-
override def optimize(
root: RelNode,
context: StreamOptimizeContext): RelNode = {
// step1: satisfy ModifyKindSet trait
val physicalRoot = root.asInstanceOf[StreamPhysicalRel]
+ val SATISFY_MODIFY_KIND_SET_TRAIT_VISITOR = new
SatisfyModifyKindSetTraitVisitor
Review comment:
No field. Just `new SatisfyModifyKindSetTraitVisitor().visit` is ok.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]