featzhang created FLINK-39093:
---------------------------------
Summary: [SQL] Display ChangelogMode in Execution Plan for Streaming Queries
Key: FLINK-39093
URL: https://issues.apache.org/jira/browse/FLINK-39093
Project: Flink
Issue Type: Improvement
Components: Table SQL / Planner
Reporter: featzhang
Currently, Flink's execution plan (ExecNode level) does not display the
ChangelogMode information for streaming queries. This makes it difficult for
users to understand:
- Whether the stream is append-only
- Whether it contains updates or deletes (UPDATE_BEFORE, UPDATE_AFTER, DELETE)
- The exact changelog semantics of each operator
While the physical plan (StreamPhysicalRel) already supports displaying
ChangelogMode via ExplainDetail.CHANGELOG_MODE, the execution plan (ExecNode)
does not expose this information.
This enhancement proposes adding ChangelogMode to the execution plan output,
so that users can read the streaming semantics of each operator directly from
the plan.
Motivation:
Many users are confused about:
1. Whether their query produces append-only results or retract streams
2. Which operators introduce retractions
3. The changelog semantics of intermediate results
This information is critical for:
- Understanding query behavior
- Performance tuning
- Debugging unexpected results
- Choosing appropriate sink connectors
Proposed Changes:
1. Add a method to the ExecNode interface or to StreamExecNode that returns
the node's ChangelogMode
2. Modify ExecNodePlanDumper to display ChangelogMode information
3. Format: Display as "changelogMode=[I,UB,UA,D]" where:
- I = INSERT
- UB = UPDATE_BEFORE
- UA = UPDATE_AFTER
- D = DELETE
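The format above can be sketched in a few lines. This is only an illustration, not the proposed patch: the `RowKind` enum here is a local stand-in declared so that the snippet compiles without Flink on the classpath, and `ChangelogModeFormatSketch` and `format` are hypothetical names.

```java
import java.util.EnumSet;
import java.util.Set;
import java.util.StringJoiner;

class ChangelogModeFormatSketch {
    // Local stand-in for the four row kinds listed above (assumption: not Flink's own class).
    enum RowKind {
        INSERT("I"),
        UPDATE_BEFORE("UB"),
        UPDATE_AFTER("UA"),
        DELETE("D");

        final String shortName;

        RowKind(String shortName) {
            this.shortName = shortName;
        }
    }

    // Renders a set of row kinds as e.g. "changelogMode=[I,UB,UA,D]".
    // Walking RowKind.values() keeps the order stable regardless of the Set implementation.
    static String format(Set<RowKind> kinds) {
        StringJoiner joiner = new StringJoiner(",", "changelogMode=[", "]");
        for (RowKind kind : RowKind.values()) {
            if (kinds.contains(kind)) {
                joiner.add(kind.shortName);
            }
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        System.out.println(format(EnumSet.allOf(RowKind.class)));
        System.out.println(format(EnumSet.of(RowKind.INSERT)));
    }
}
```

An append-only stream would then show up as `changelogMode=[I]`, which makes it easy to spot at a glance.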
Example Output:
Before:
```
Calc(select=[a, b])
+- TableSourceScan(table=[[my_table]])
```
After:
```
Calc(select=[a, b], changelogMode=[I,UB,UA,D])
+- TableSourceScan(table=[[my_table]], changelogMode=[I,UB,UA,D])
```
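A rough sketch of how a tree dump could append the mode to each node line is shown here. It does not use ExecNodePlanDumper or any Flink class: `PlanDumpSketch`, `Node`, and `dump` are hypothetical names, and the node values are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

class PlanDumpSketch {
    // Hypothetical plan node: NOT Flink's ExecNode, just enough structure for the sketch.
    static final class Node {
        final String name;
        final String attributes;
        final String changelogMode;
        final List<Node> inputs = new ArrayList<>();

        Node(String name, String attributes, String changelogMode) {
            this.name = name;
            this.attributes = attributes;
            this.changelogMode = changelogMode;
        }

        Node withInput(Node input) {
            inputs.add(input);
            return this;
        }
    }

    // Renders the tree top-down, one node per line, with "+- " marking each input.
    static String dump(Node root) {
        StringBuilder out = new StringBuilder();
        append(root, 0, out);
        return out.toString();
    }

    private static void append(Node node, int depth, StringBuilder out) {
        // Indent three spaces per ancestor level below the root's direct inputs.
        for (int i = 1; i < depth; i++) {
            out.append("   ");
        }
        if (depth > 0) {
            out.append("+- ");
        }
        out.append(node.name)
                .append('(')
                .append(node.attributes)
                .append(", changelogMode=[")
                .append(node.changelogMode)
                .append("])\n");
        for (Node input : node.inputs) {
            append(input, depth + 1, out);
        }
    }

    public static void main(String[] args) {
        Node scan = new Node("TableSourceScan", "table=[[my_table]]", "I,UB,UA,D");
        Node calc = new Node("Calc", "select=[a, b]", "I,UB,UA,D").withInput(scan);
        System.out.print(dump(calc));
    }
}
```

Appending the mode as the last attribute keeps existing plan lines recognizable, so plan-based tests would only see a suffix change per node.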
Implementation Location:
- ExecNode.java or StreamExecNode.java: Add getChangelogMode() method
- ExecNodePlanDumper.java: Modify to display ChangelogMode in tree/DAG output
- ExecNodeBase.java: Provide a default implementation that derives the
ChangelogMode from ModifyKindSetTrait and UpdateKindTrait
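One possible shape for the derivation mentioned in the last item is sketched here. The three enums are local stand-ins and their value names are assumptions about what the two traits carry; `ChangelogModeDerivationSketch` and `derive` are hypothetical and not part of the ExecNode API.

```java
import java.util.EnumSet;
import java.util.Set;

class ChangelogModeDerivationSketch {
    // Stand-ins declared locally; the value names are assumptions, not copied from Flink.
    enum ModifyKind { INSERT, UPDATE, DELETE }

    enum UpdateKind { NONE, ONLY_UPDATE_AFTER, BEFORE_AND_AFTER }

    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Combines the modify kinds with the update kind into the row kinds a node can emit.
    static Set<RowKind> derive(Set<ModifyKind> modifyKinds, UpdateKind updateKind) {
        EnumSet<RowKind> result = EnumSet.noneOf(RowKind.class);
        if (modifyKinds.contains(ModifyKind.INSERT)) {
            result.add(RowKind.INSERT);
        }
        if (modifyKinds.contains(ModifyKind.UPDATE)) {
            // An updating node always emits UPDATE_AFTER; UPDATE_BEFORE only when requested.
            result.add(RowKind.UPDATE_AFTER);
            if (updateKind == UpdateKind.BEFORE_AND_AFTER) {
                result.add(RowKind.UPDATE_BEFORE);
            }
        }
        if (modifyKinds.contains(ModifyKind.DELETE)) {
            result.add(RowKind.DELETE);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(derive(EnumSet.of(ModifyKind.INSERT), UpdateKind.NONE));
        System.out.println(derive(EnumSet.allOf(ModifyKind.class), UpdateKind.BEFORE_AND_AFTER));
    }
}
```

Under these assumptions, an insert-only node yields just INSERT, while an upsert-style node (ONLY_UPDATE_AFTER) omits UPDATE_BEFORE.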