Dennis-Mircea commented on PR #28091: URL: https://github.com/apache/flink/pull/28091#issuecomment-4469701268
Thanks for the thorough review @featzhang, appreciate the detail. I went through each item against the rule code and wanted to share where I landed and what's already pushed.

### Risk 1: Checkpoint recovery / operator UIDs

This is a real concern but it is not specific to this rule. In the Table API, operator UIDs are derived from `ExecNode` IDs assigned at translation time, so any planner-level rewrite shifts UIDs on a non-compiled-plan savepoint restore. `MiniBatchIntervalInferRule`, sub-query removal, join reorder etc. behave the same way. Compiled plans freeze the topology at compile time, so this rule is invisible to restore there. A `testOperatorUidStability` comparing before-rule vs after-rule would fail by definition (the topology *does* change).

What I did add:

1. A new `table.optimizer.redundant-watermark-assigner-remove.enabled` flag (default `true`, streaming-only) that lets users disable the rule as a rollback escape hatch. This matches the precedent of `table.optimizer.multi-join.enabled`, `incremental-agg-enabled`, etc.
2. A release-notes entry in `flink-2.3.md` calling out the non-compiled-plan savepoint impact and pointing at the new option / `--allowNonRestoredState`.
3. A user-facing note in the `WATERMARK` section of the `CREATE TABLE` docs explaining the rewrite and the same savepoint caveat.

### Risk 2: Edge cases

| # | Claim | Status |
|---|---|---|
| 1 | Nested `CURRENT_WATERMARK` inside `CASE` is missed | Already handled. `containsCurrentWatermarkCall` calls `super.visitCall(call)`, which is the standard recursive `RexShuttle` contract that visits all operands. Added a defensive test (`testCurrentWatermarkNestedInCaseKeepsAssigner`) to lock the behavior in. |
| 2 | `TemporalSort ORDER BY a, rt` (rowtime as 2nd key) | Cannot occur. `StreamPhysicalTemporalSortRule.canConvertToTemporalSort` only converts a `FlinkLogicalSort` to `StreamPhysicalTemporalSort` when the *first* sort key is a time-indicator (file: `StreamPhysicalTemporalSortRule.scala:79`). If rowtime isn't first, it stays a regular Sort, which doesn't consume watermarks. So `isRowtimeTemporalSort` checking only the first field is correct by construction. |
| 3 | UDF reading watermark via `getRuntimeContext().getCurrentWatermark()` | Statically undetectable, and not a documented SQL pattern (`CURRENT_WATERMARK(rt)` is the supported way). Added a "Known limitations" entry in the rule's Javadoc pointing users at `CURRENT_WATERMARK` or the new disable flag. |
| 4 | Union with mismatched column counts | This is rejected by Calcite's validator at parse time. `SELECT a,b,rt UNION ALL SELECT a,b,c,rt` is not legal SQL, so the rule never sees such a plan. |
| 5 | `catch (Exception ignored)` is silent | Fair point. Current behavior was already conservative (returns `true`, keeps assigner) so it was safe, but a logged warning helps debug a misbehaving `RelNode`. Replaced with `LOG.warn(...)` that names the rel type and includes the throwable. |

### Risk 3: HEP traversal overhead

The rule fires once per HEP root (guarded by `isHepRoot`), so each optimization sees `O(N)` walks, not per-pass. The `mapProtectedDown` short-circuits on `Aggregate` and on empty protected sets keep the constant factor small. I'd rather not add a synthetic-DAG perf benchmark unless we see an actual regression in CI; that test would be performance theater.

### Your questions

1. **UID semantics**: position/topology-derived for non-compiled plans, JSON-stable for compiled plans. Covered in the release-notes entry.
2. **TemporalSort with non-first rowtime**: cannot happen, see Risk 2.2.
3. **Config flag escape hatch**: done. `table.optimizer.redundant-watermark-assigner-remove.enabled`.

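
To make the argument in Risk 2, item 1 concrete, here is a minimal, self-contained sketch of why a recursive operand walk finds a `CURRENT_WATERMARK` call nested inside a `CASE`. It is not the rule's code and does not use Calcite: `WatermarkScanSketch`, `Expr`, and `containsCurrentWatermark` are hypothetical names standing in for a `RexNode` tree and the shuttle-based scan.

```java
import java.util.Arrays;
import java.util.List;

/** Simplified stand-in for a Calcite RexNode tree; every name here is hypothetical. */
public class WatermarkScanSketch {

    /** An expression node: a leaf has no operands, a call has one or more. */
    static final class Expr {
        final String name;
        final List<Expr> operands;

        Expr(String name, Expr... operands) {
            this.name = name;
            this.operands = Arrays.asList(operands);
        }
    }

    /** Returns true if this node or any nested operand is a CURRENT_WATERMARK call. */
    static boolean containsCurrentWatermark(Expr expr) {
        if ("CURRENT_WATERMARK".equals(expr.name)) {
            return true;
        }
        // Recurse into every operand, mirroring a visitor that descends into all children.
        for (Expr operand : expr.operands) {
            if (containsCurrentWatermark(operand)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // CASE WHEN a > 0 THEN CURRENT_WATERMARK(rt) ELSE NULL END
        Expr nested = new Expr("CASE",
                new Expr(">", new Expr("a"), new Expr("0")),
                new Expr("CURRENT_WATERMARK", new Expr("rt")),
                new Expr("NULL"));
        // CASE WHEN a > 0 THEN b ELSE NULL END
        Expr plain = new Expr("CASE",
                new Expr(">", new Expr("a"), new Expr("0")),
                new Expr("b"),
                new Expr("NULL"));

        System.out.println(containsCurrentWatermark(nested));
        System.out.println(containsCurrentWatermark(plain));
    }
}
```

The walk reports a hit regardless of how deep the call sits, which is the property the new `testCurrentWatermarkNestedInCaseKeepsAssigner` test is meant to lock in for the real rule.
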
### Summary of changes pushed

* New config option `TABLE_OPTIMIZER_REDUNDANT_WATERMARK_ASSIGNER_REMOVE_ENABLED` in `OptimizerConfigOptions.java` (default `true`).
* Rule application in `FlinkStreamProgram.scala` gated on the flag.
* `LOG.warn(...)` on `RexShuttle` scan failure in `RedundantWatermarkAssignerRemoveRule.containsCurrentWatermarkCall`.
* Javadoc "Known limitations" extended with the UDF-runtime-context caveat.
* New test `testCurrentWatermarkNestedInCaseKeepsAssigner` plus its golden plan in `RedundantWatermarkAssignerRemoveRuleTest.xml`.
* Release-notes entry in `docs/content/release-notes/flink-2.3.md`.
* User-facing note in the `WATERMARK` section of `docs/content/docs/sql/reference/ddl/create.md`.
