snuyanzin commented on code in PR #27959:
URL: https://github.com/apache/flink/pull/27959#discussion_r3309353794
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/RelNodeBlock.scala:
##########
@@ -412,10 +412,26 @@ object RelNodeBlockPlanBuilder {
return relNodes
}
- // reuse sub-plan with same digest in input RelNode trees.
+ // The reuse lookup uses the original trees, while the rewrite runs on normalized
+ // trees. This keeps existing reuse unchanged: normalization
+ // does not change subtrees without correlation variables, so they still reuse as before. Subtrees with
+ // correlation variables (e.g., CROSS JOIN UNNEST or decorrelated sub-queries)
+ // used to have different digests in each view expansion, so they were not reused.
+ //
+ // Reusing those newly matching correlated subtrees is not safe yet. If such a
+ // subtree is shared, it can become a separate RelNodeBlock and be optimized without
+ // seeing its parent operators. During that local optimization, ROWTIME output fields
+ // may be converted to regular TIMESTAMP_LTZ fields. The parents still refer to the
+ // old ROWTIME-typed fields, so replacing the child can fail validation with a
+ // ROWTIME/plain timestamp mismatch.
val context = new SubplanReuseContext(true, relNodes: _*)
val reuseShuttle = new SubplanReuseShuttle(context)
- relNodes.map(_.accept(reuseShuttle))
+
+ relNodes
+ // Normalize correlation variable ids per node so structurally equivalent
+ // subplans will share digests.
+ .map(n => n.accept(new CorrelVariableNormalizerShuttle(n.getCluster.getRexBuilder)))
Review Comment:
do we need to create a shuttle per node here?
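A minimal, hypothetical Scala sketch of why per-node construction might matter. It assumes the normalizer renumbers correlation ids in first-encounter order and keeps that mapping as per-instance state; the diff does not show `CorrelVariableNormalizerShuttle`, so this is an assumption. The toy `Plan`, `Scan`, `Unnest`, `Correlate`, `digest` and `CorrelNormalizer` names are invented for illustration and are not Calcite or Flink APIs.

```scala
import scala.collection.mutable

// Hypothetical toy plan model for illustration only; it is NOT Calcite's
// RelNode / RexCorrelVariable API and not Flink's CorrelVariableNormalizerShuttle.
object CorrelNormalizationSketch {
  sealed trait Plan
  final case class Scan(table: String) extends Plan
  // Stands in for an UNNEST that reads a field of the correlated outer row.
  final case class Unnest(corrId: Int, field: String) extends Plan
  // Stands in for a correlate (e.g. CROSS JOIN UNNEST) that defines corrId.
  final case class Correlate(left: Plan, right: Plan, corrId: Int) extends Plan

  // Toy digest: equal strings mean the subplans would be treated as reusable.
  def digest(p: Plan): String = p match {
    case Scan(t)            => s"Scan($t)"
    case Unnest(c, f)       => s"Unnest(cor$c.$f)"
    case Correlate(l, r, c) => s"Correlate(${digest(l)}, ${digest(r)}, cor$c)"
  }

  // Renumbers correlation ids in first-encounter order. The mapping is
  // per-instance state (assumed), so one instance normalizes exactly one tree.
  final class CorrelNormalizer {
    private val mapping = mutable.HashMap.empty[Int, Int]

    private def norm(id: Int): Int = mapping.get(id) match {
      case Some(n) => n
      case None =>
        val n = mapping.size
        mapping.update(id, n)
        n
    }

    def apply(p: Plan): Plan = p match {
      case s: Scan      => s
      case Unnest(c, f) => Unnest(norm(c), f)
      case Correlate(l, r, c) =>
        val nc = norm(c) // number the defining node before its children
        Correlate(apply(l), apply(r), nc)
    }
  }

  def main(args: Array[String]): Unit = {
    // Two expansions of the same view that received different correlation ids.
    val a = Correlate(Scan("orders"), Unnest(3, "items"), 3)
    val b = Correlate(Scan("orders"), Unnest(8, "items"), 8)
    println(digest(a) == digest(b)) // false

    // Fresh normalizer per tree: both ids become cor0, so the digests match.
    val fresh = digest(new CorrelNormalizer().apply(a)) == digest(new CorrelNormalizer().apply(b))
    println(fresh) // true

    // One shared normalizer: 3 -> 0, then 8 -> 1, so the digests still differ.
    val shared = new CorrelNormalizer()
    println(digest(shared(a)) == digest(shared(b))) // false
  }
}
```

Under that assumption, a single shared instance would number the second tree's ids after the first tree's, so hoisting the shuttle out of the `map` would need a per-node reset. If the real shuttle turns out to be stateless, one instance (for example built from the shared cluster's `RexBuilder`) could be created once instead.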