Dennis-Mircea commented on PR #28091:
URL: https://github.com/apache/flink/pull/28091#issuecomment-4469701268

   > featzhang
   
   Thanks for the thorough review @featzhang, appreciate the detail. I went 
through each item against the rule code and wanted to share where I landed and 
what's already pushed.
   
   ### Risk 1: Checkpoint recovery / operator UIDs
   
   This is a real concern, but it is not specific to this rule. In the Table 
API, operator UIDs are derived from `ExecNode` IDs assigned at translation 
time, so any planner-level rewrite shifts UIDs on a non-compiled-plan savepoint 
restore. `MiniBatchIntervalInferRule`, sub-query removal, join reorder, etc. 
behave the same way. Compiled plans freeze the topology at compile time, so 
this rule has no effect on restore there.
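
To make the topology argument concrete, here is a toy Java model. It assumes, purely for illustration, that each operator's UID is `<id>_<name>` with IDs handed out by a counter in translation order. `UidShiftDemo`, `assignUids`, and the made-up linear plan are hypothetical, and Flink's real UID derivation is more involved. The point it shows is that removing one operator shifts the UIDs of everything translated after it.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical, not Flink's real UID scheme): each operator gets
// "<id>_<name>", with ids taken from a counter in translation order.
class UidShiftDemo {

    static List<String> assignUids(List<String> operators) {
        List<String> uids = new ArrayList<>();
        int nextId = 1;
        for (String op : operators) {
            uids.add(nextId + "_" + op);
            nextId++;
        }
        return uids;
    }

    public static void main(String[] args) {
        // Made-up plan before the rule: the redundant assigner is still present.
        List<String> before = List.of("source", "watermark-assigner", "calc", "sink");
        // Same plan after the rule removed that assigner.
        List<String> after = List.of("source", "calc", "sink");

        System.out.println(assignUids(before)); // [1_source, 2_watermark-assigner, 3_calc, 4_sink]
        System.out.println(assignUids(after));  // [1_source, 2_calc, 3_sink]
        // "calc" and "sink" now carry different UIDs, so their savepoint state
        // would no longer be matched on a non-compiled-plan restore.
    }
}
```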
   
   A `testOperatorUidStability` comparing before-rule vs after-rule would fail 
by definition (the topology *does* change). What I did add:
   
   1. A new `table.optimizer.redundant-watermark-assigner-remove.enabled` flag 
(default `true`, streaming-only) that lets users disable the rule as a rollback 
escape hatch. This matches the precedent of 
`table.optimizer.multi-join.enabled`, `incremental-agg-enabled`, etc.
   2. A release-notes entry in `flink-2.3.md` calling out the non-compiled-plan 
savepoint impact and pointing at the new option / `--allowNonRestoredState`.
   3. A user-facing note in the `WATERMARK` section of the `CREATE TABLE` docs 
explaining the rewrite and the same savepoint caveat.
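
For readers skimming item 1, the gating can be sketched roughly as follows. Only the option key string comes from this PR. The `Map`-based config, `RuleGatingSketch`, `isRemovalEnabled`, and `extraRules` are hypothetical stand-ins, not Flink's `ConfigOption`/`ReadableConfig` API or the actual `FlinkStreamProgram` code. The sketch shows the two conditions described above: an unset flag means enabled, and the rule is only scheduled for streaming jobs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of gating an optimizer rule on a boolean option.
// The key string is from the PR; everything else is illustrative.
class RuleGatingSketch {
    static final String KEY =
            "table.optimizer.redundant-watermark-assigner-remove.enabled";

    // Unset means enabled, mirroring the documented default of true.
    static boolean isRemovalEnabled(Map<String, String> conf) {
        return Boolean.parseBoolean(conf.getOrDefault(KEY, "true"));
    }

    // The rule is only scheduled for streaming jobs with the flag on.
    static List<String> extraRules(boolean isStreaming, Map<String, String> conf) {
        List<String> rules = new ArrayList<>();
        if (isStreaming && isRemovalEnabled(conf)) {
            rules.add("RedundantWatermarkAssignerRemoveRule");
        }
        return rules;
    }

    public static void main(String[] args) {
        System.out.println(extraRules(true, Map.of()));             // [RedundantWatermarkAssignerRemoveRule]
        System.out.println(extraRules(true, Map.of(KEY, "false"))); // []
        System.out.println(extraRules(false, Map.of()));            // []
    }
}
```

From the user side, the rollback would then presumably be a `SET 'table.optimizer.redundant-watermark-assigner-remove.enabled' = 'false';` in the SQL client before resubmitting the job.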
   
   ### Risk 2: Edge cases
   
   | # | Claim | Status |
   |---|---|---|
   | 1 | Nested `CURRENT_WATERMARK` inside `CASE` is missed | Already handled. `containsCurrentWatermarkCall` calls `super.visitCall(call)`, which is the standard recursive `RexShuttle` contract that visits all operands. Added a defensive test (`testCurrentWatermarkNestedInCaseKeepsAssigner`) to lock the behavior in. |
   | 2 | `TemporalSort ORDER BY a, rt` (rowtime as 2nd key) | Cannot occur. `StreamPhysicalTemporalSortRule.canConvertToTemporalSort` only converts a `FlinkLogicalSort` to `StreamPhysicalTemporalSort` when the *first* sort key is a time indicator (`StreamPhysicalTemporalSortRule.scala:79`). If rowtime isn't first, the node stays a regular Sort, which doesn't consume watermarks. So `isRowtimeTemporalSort` checking only the first field is correct by construction. |
   | 3 | UDF reading the watermark via `getRuntimeContext().getCurrentWatermark()` | Statically undetectable, and not a documented SQL pattern (`CURRENT_WATERMARK(rt)` is the supported way). Added a "Known limitations" entry to the rule's Javadoc pointing users at `CURRENT_WATERMARK` or the new disable flag. |
   | 4 | Union with mismatched column counts | Rejected by Calcite's validator before optimization runs. `SELECT a,b,rt UNION ALL SELECT a,b,c,rt` is not legal SQL, so the rule never sees such a plan. |
   | 5 | `catch (Exception ignored)` is silent | Fair point. The existing behavior was already conservative (it returns `true` and keeps the assigner), so it was safe, but a logged warning helps debug a misbehaving `RelNode`. Replaced with `LOG.warn(...)` that names the rel type and includes the throwable. |
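
Rows 1 and 5 together can be sketched as a small Java model (Java 16+ for records). The `Expr`/`Ref`/`Call` types are a hypothetical toy tree standing in for Calcite `RexNode`s, and `java.util.logging` stands in for the SLF4J `LOG` the rule uses. It shows the two properties claimed above: a recursive visit into every operand finds `CURRENT_WATERMARK` nested inside `CASE`, and any exception during the scan is logged and answered with "keep the assigner".

```java
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

class CurrentWatermarkScan {
    // Toy expression tree standing in for Calcite RexNodes (hypothetical types).
    interface Expr {}
    record Ref(String name) implements Expr {}
    record Call(String op, List<Expr> operands) implements Expr {}

    private static final Logger LOG = Logger.getLogger(CurrentWatermarkScan.class.getName());

    // Recursive visit into every operand, so CURRENT_WATERMARK nested inside
    // CASE (or any other call) is still found.
    static boolean containsCurrentWatermark(Expr e) {
        if (!(e instanceof Call)) {
            return false;
        }
        Call call = (Call) e;
        if ("CURRENT_WATERMARK".equals(call.op())) {
            return true;
        }
        for (Expr operand : call.operands()) {
            if (containsCurrentWatermark(operand)) {
                return true;
            }
        }
        return false;
    }

    // Conservative wrapper: if the scan throws, log a warning naming the
    // expression type (with the throwable) and answer "keep the assigner".
    static boolean mustKeepAssigner(Expr e) {
        try {
            return containsCurrentWatermark(e);
        } catch (RuntimeException ex) {
            LOG.log(Level.WARNING,
                    "Scan failed on " + e.getClass().getSimpleName() + "; keeping watermark assigner", ex);
            return true;
        }
    }

    public static void main(String[] args) {
        // CASE WHEN rt > CURRENT_WATERMARK(rt) THEN a ELSE b END
        Expr nested = new Call("CASE", List.of(
                new Call(">", List.of(new Ref("rt"),
                        new Call("CURRENT_WATERMARK", List.of(new Ref("rt"))))),
                new Ref("a"),
                new Ref("b")));
        System.out.println(mustKeepAssigner(nested)); // true
        System.out.println(mustKeepAssigner(new Call("CASE", List.of(new Ref("a"))))); // false
        // A malformed node (null operands) makes the scan throw; the wrapper keeps the assigner.
        System.out.println(mustKeepAssigner(new Call("CASE", null))); // true
    }
}
```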
   
   ### Risk 3: HEP traversal overhead
   
   The rule fires once per HEP root (guarded by `isHepRoot`), so each 
optimization pays for a single `O(N)` walk rather than one walk per HEP pass. 
The short-circuits in `mapProtectedDown` (on `Aggregate` nodes and on empty 
protected sets) keep the constant factor small. I'd rather not add a 
synthetic-DAG perf benchmark unless we see an actual regression in CI; such a 
test would be performance theater.
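
The cost argument can be illustrated with a toy Java walk. This models only the complexity claim, under stated assumptions: a tree-shaped plan, and "protected" read as a set of field names a downstream operator still needs watermarks for. `ProtectedWalkSketch`, `Node`, and `walk` are hypothetical, not the rule's `mapProtectedDown`, and why the real rule stops at `Aggregate` is not modeled. Each node is visited at most once per walk, and the two short-circuits cut the walk off early. A plan with shared sub-plans would additionally need a visited set to stay linear.

```java
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the cost argument only; not the rule's code.
class ProtectedWalkSketch {
    static final class Node {
        final String kind;
        final List<Node> inputs;

        Node(String kind, Node... inputs) {
            this.kind = kind;
            this.inputs = List.of(inputs);
        }
    }

    // One top-down pass from the root; returns how many nodes were visited.
    // Each node of a tree-shaped plan is visited at most once, so the cost is O(N).
    static int walk(Node node, Set<String> protectedFields) {
        int visited = 1;
        // Short-circuits: nothing left to protect, or an Aggregate boundary.
        if (protectedFields.isEmpty() || node.kind.equals("Aggregate")) {
            return visited;
        }
        for (Node input : node.inputs) {
            visited += walk(input, protectedFields);
        }
        return visited;
    }

    public static void main(String[] args) {
        Node plan = new Node("Sink",
                new Node("Calc",
                        new Node("Aggregate",
                                new Node("Calc",
                                        new Node("Source")))));
        System.out.println(walk(plan, Set.of("rt"))); // 3: stops at Aggregate
        System.out.println(walk(plan, Set.of()));     // 1: nothing to protect
    }
}
```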
   
   ### Your questions
   
   1. **UID semantics**: position/topology-derived for non-compiled plans, 
JSON-stable for compiled plans. Covered in the release-notes entry.
   2. **TemporalSort with non-first rowtime**: cannot happen, see Risk 2.2.
   3. **Config flag escape hatch**: done. 
`table.optimizer.redundant-watermark-assigner-remove.enabled`.
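
For question 2, the "first sort key decides" condition can be sketched in Java (Java 16+ for records). `TemporalSortCheckSketch`, `TimeKind`, `SortKey`, `becomesTemporalSort`, and `consumesWatermarks` are hypothetical names modeling the behavior described for `StreamPhysicalTemporalSortRule.canConvertToTemporalSort`, not the actual Scala code. It shows why inspecting only the leading key is enough: `ORDER BY a, rt` never becomes a temporal sort, so it never consumes watermarks.

```java
import java.util.List;

// Hypothetical model of the "first sort key decides" condition.
class TemporalSortCheckSketch {
    enum TimeKind { NONE, ROWTIME, PROCTIME }

    record SortKey(String field, TimeKind time) {}

    // A sort only becomes a temporal sort if its leading key is a time indicator.
    static boolean becomesTemporalSort(List<SortKey> keys) {
        return !keys.isEmpty() && keys.get(0).time() != TimeKind.NONE;
    }

    // Only a temporal sort led by rowtime consumes watermarks; everything else
    // stays a regular sort, so looking at keys.get(0) is sufficient.
    static boolean consumesWatermarks(List<SortKey> keys) {
        return becomesTemporalSort(keys) && keys.get(0).time() == TimeKind.ROWTIME;
    }

    public static void main(String[] args) {
        SortKey a = new SortKey("a", TimeKind.NONE);
        SortKey rt = new SortKey("rt", TimeKind.ROWTIME);
        System.out.println(consumesWatermarks(List.of(rt, a))); // true: ORDER BY rt, a
        System.out.println(consumesWatermarks(List.of(a, rt))); // false: ORDER BY a, rt stays a regular sort
    }
}
```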
   
   ### Summary of changes pushed
   
   * New config option 
`TABLE_OPTIMIZER_REDUNDANT_WATERMARK_ASSIGNER_REMOVE_ENABLED` in 
`OptimizerConfigOptions.java` (default `true`).
   * Rule application in `FlinkStreamProgram.scala` gated on the flag.
   * `LOG.warn(...)` on `RexShuttle` scan failure in 
`RedundantWatermarkAssignerRemoveRule.containsCurrentWatermarkCall`.
   * Javadoc "Known limitations" extended with the UDF-runtime-context caveat.
   * New test `testCurrentWatermarkNestedInCaseKeepsAssigner` plus its golden 
plan in `RedundantWatermarkAssignerRemoveRuleTest.xml`.
   * Release-notes entry in `docs/content/release-notes/flink-2.3.md`.
   * User-facing note in the `WATERMARK` section of 
`docs/content/docs/sql/reference/ddl/create.md`.


-- 
This is an automated message from the Apache Git Service.