raminqaf opened a new pull request, #28403:
URL: https://github.com/apache/flink/pull/28403
### What is the purpose of the change
A foreground query (a bare `SELECT` collected through the anonymous result
sink, which has no primary key) over a relation that emits an upsert changelog
fails with an unhelpful error,
`Can't generate a valid execution plan for the given query`, followed by the
optimized plan and no hint about the cause. A common trigger is
`FROM_CHANGELOG` with `PARTITION BY` and an `op_mapping` that maps no
operation to `UPDATE_BEFORE`. This change appends actionable guidance to that
error. Planning behavior is unchanged; only the message is improved.
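To make the upsert-vs-retract distinction concrete, here is a minimal, self-contained Java sketch. It models a changelog mode as a set of row kinds. The `Kind` enum and the `ChangelogShape` class are hypothetical and are not Flink's `RowKind` or `ChangelogMode` API. The sketch also assumes the simplified rule stated in the new error message: a sink without a primary key cannot consume `UPDATE_AFTER` unless a matching `UPDATE_BEFORE` is also present.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-in for the four changelog row kinds; Flink's real type is
// org.apache.flink.types.RowKind, which this sketch deliberately does not use.
enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

final class ChangelogShape {
    private ChangelogShape() {}

    // Upsert changelog: UPDATE_AFTER appears without UPDATE_BEFORE, so a
    // consumer must use a key to locate and overwrite the previous row.
    static boolean isUpsert(Set<Kind> mode) {
        return mode.contains(Kind.UPDATE_AFTER) && !mode.contains(Kind.UPDATE_BEFORE);
    }

    // Simplified rule from the error message: a keyless sink has nothing to
    // upsert on, so any updates it receives must arrive as retract pairs.
    static boolean keylessSinkAccepts(Set<Kind> mode) {
        return !isUpsert(mode);
    }

    public static void main(String[] args) {
        // Row kinds emitted by FROM_CHANGELOG in the plan below: [I,UA,D].
        Set<Kind> fromChangelog = EnumSet.of(Kind.INSERT, Kind.UPDATE_AFTER, Kind.DELETE);
        // Adding UPDATE_BEFORE turns it into a retract changelog.
        Set<Kind> retract = EnumSet.allOf(Kind.class);

        System.out.println(keylessSinkAccepts(fromChangelog)); // false
        System.out.println(keylessSinkAccepts(retract));       // true
    }
}
```

The two fixes the guidance suggests correspond to the two ways out of the `false` case. One is to give the sink a key, so that upserts become acceptable. The other is to add `UPDATE_BEFORE` upstream, so that `isUpsert` no longer holds.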
### Brief change log
- `FlinkChangelogModeInferenceProgram`: when the rejected plan involves
updates, append guidance explaining the upsert-vs-retract mismatch and the two
fixes (declare a `PRIMARY KEY` on the sink, or make the upstream emit
`UPDATE_BEFORE`). Gated on `ModifyKind.UPDATE` so unrelated planning failures
keep the original message.
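The gating described above can be sketched roughly as follows. This is a hypothetical, simplified model and not the code in `FlinkChangelogModeInferenceProgram`. The `Modify` enum stands in for the planner's `ModifyKind`, and `PlanErrorMessage.build` is an illustrative helper. The guidance text is copied from the sample output in this PR.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-in for the planner's ModifyKind values.
enum Modify { INSERT, UPDATE, DELETE }

final class PlanErrorMessage {
    private PlanErrorMessage() {}

    static final String HEADER =
            "Can't generate a valid execution plan for the given query:";

    // Guidance text copied from the sample output in this PR.
    static final String GUIDANCE =
            "This usually means an upsert changelog (UPDATE_AFTER without UPDATE_BEFORE) "
                    + "reaches an operator or sink that requires a retract changelog "
                    + "(UPDATE_BEFORE and UPDATE_AFTER), for example a sink without a "
                    + "primary key. To resolve it, declare a PRIMARY KEY on the sink, "
                    + "or make the input produce UPDATE_BEFORE.";

    // Builds the error text: header, then guidance only if the rejected plan
    // involves updates, then the plan. Other failures keep the old shape.
    static String build(Set<Modify> modifyKinds, String plan) {
        StringBuilder msg = new StringBuilder(HEADER).append('\n');
        if (modifyKinds.contains(Modify.UPDATE)) {
            msg.append(GUIDANCE).append('\n');
        }
        return msg.append(plan).toString();
    }

    public static void main(String[] args) {
        String plan = "Sink(table=[*anonymous_collect$1*], fields=[customer_id, name])";
        // Rejected plan involves updates: guidance is included.
        System.out.println(build(EnumSet.of(Modify.INSERT, Modify.UPDATE, Modify.DELETE), plan));
        System.out.println();
        // Insert-only failure: original header + plan, no guidance.
        System.out.println(build(EnumSet.of(Modify.INSERT), plan));
    }
}
```

Checking for `UPDATE` rather than for any non-insert kind keeps delete-only or other unrelated planning failures on the original message. That is the intent the change log describes.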
### Verifying the output
Set up a test locally and verified the exception message. The printed plan now
also shows each operator's changelog mode; previously every node showed `NONE`.
```
Can't generate a valid execution plan for the given query:
This usually means an upsert changelog (UPDATE_AFTER without UPDATE_BEFORE)
reaches an operator or sink that requires a retract changelog (UPDATE_BEFORE
and UPDATE_AFTER), for example a sink without a primary key. To resolve it,
declare a PRIMARY KEY on the sink, or make the input produce UPDATE_BEFORE.
Sink(table=[*anonymous_collect$1*], fields=[customer_id, name], changelogMode=[NONE])
+- ProcessTableFunction(invocation=[FROM_CHANGELOG(TABLE(#0) PARTITION BY($0), DESCRIPTOR(_UTF-16LE'operation'), MAP(_UTF-16LE'Created', _UTF-16LE'INSERT':VARCHAR(12) CHARACTER SET "UTF-16LE", _UTF-16LE'Updated', _UTF-16LE'UPDATE_AFTER':VARCHAR(12) CHARACTER SET "UTF-16LE", _UTF-16LE'Deleted', _UTF-16LE'DELETE':VARCHAR(12) CHARACTER SET "UTF-16LE"), DEFAULT(), DEFAULT(), DEFAULT())], uid=[FROM_CHANGELOG], select=[customer_id,name], rowType=[RecordType(VARCHAR(2147483647) customer_id, VARCHAR(2147483647) name)], changelogMode=[I,UA,D])
   +- Exchange(distribution=[hash[customer_id]], changelogMode=[I])
      +- TableSourceScan(table=[[default_catalog, default_database, customer_events]], fields=[customer_id, name, operation], changelogMode=[I])
```
### Does this pull request potentially affect one of the following parts:
- Dependencies: no
- `@Public(Evolving)` API: no
- Serializers: no
- Runtime per-record code paths: no
- Deployment / recovery: no
- S3 connector: no
### Documentation
- New feature: no