godfreyhe commented on code in PR #20262:
URL: https://github.com/apache/flink/pull/20262#discussion_r920056880


##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/CommonSubGraphBasedOptimizer.scala:
##########
@@ -79,9 +82,23 @@ abstract class CommonSubGraphBasedOptimizer extends 
Optimizer {
         require(plan != null)
         plan
     }
-    expandIntermediateTableScan(optimizedPlan)
+    val expanded = expandIntermediateTableScan(optimizedPlan)
+
+    val postOptimizedPlan = postOptimize(expanded)
+
+    // Rewrite same rel object to different rel objects
+    // in order to get the correct dag (dag reuse is based on object not 
digest)
+    val shuttle = new SameRelObjectShuttle()
+    val relsWithoutSameObj = postOptimizedPlan.map(_.accept(shuttle))
+
+    // reuse subplan
+    val tableConfig = roots.head.getTable.unwrap(classOf[TableConfig])
+    SubplanReuser.reuseDuplicatedSubplan(relsWithoutSameObj, tableConfig)
   }
 
+  /** Post process for the physical [[RelNode]] dag, e.g., validation or 
rewriting purpose. */
+  protected def postOptimize(expanded: Seq[RelNode]): Seq[RelNode]

Review Comment:
   nit: give the default implementation ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to