pnowojski commented on code in PR #26051:
URL: https://github.com/apache/flink/pull/26051#discussion_r1946598188
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeduplicate.java:
##########
@@ -339,25 +362,39 @@ OneInputStreamOperator<RowData, RowData> createDeduplicateOperator() {
                 }
             } else {
                 if (isAsyncStateEnabled()) {
-                    AsyncStateRowTimeDeduplicateFunction processFunction =
-                            new AsyncStateRowTimeDeduplicateFunction(
-                                    rowTypeInfo,
-                                    stateRetentionTime,
-                                    rowtimeIndex,
-                                    generateUpdateBefore,
-                                    generateInsert(),
-                                    keepLastRow);
-                    return new AsyncKeyedProcessOperator<>(processFunction);
+                    if (!keepLastRow && outputInsertOnly) {
+                        checkState(canBeInsertOnly(config, keepLastRow));
Review Comment:
Generating a plan and actually using it can happen quite far apart.
Yes, it doesn't make much sense to `checkState` this if we assume that both
plan generation and job graph creation happen with the same Flink version, but
that's often not the case. With those `checkState` calls I wanted to add a bit
of extra safety in case someone changes the code in the future, similar to how,
for example, the dedupe operator itself asserts that its input is insert-only.
Also, they don't hurt.
But if you have strong feelings about dropping them, I would be fine with that
as well.
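
To make the scenario concrete, here is a minimal, self-contained sketch of the kind of guard I have in mind. It is not the Flink code: `PlanGuardSketch`, `RestoredPlanNode`, `chooseOperator` and the body of `canBeInsertOnly` are hypothetical stand-ins, and the local `checkState` only mimics a Preconditions-style helper. The assumption is that the flags were persisted with the plan by one version, while the rule is evaluated by whichever version builds the job graph.

```java
// Hypothetical, simplified stand-in for the operator-selection step; not Flink code.
final class PlanGuardSketch {

    /** Flags as they might be restored from a persisted plan (names are assumptions). */
    static final class RestoredPlanNode {
        final boolean keepLastRow;
        final boolean outputInsertOnly;

        RestoredPlanNode(boolean keepLastRow, boolean outputInsertOnly) {
            this.keepLastRow = keepLastRow;
            this.outputInsertOnly = outputInsertOnly;
        }
    }

    /** Local equivalent of a Preconditions-style checkState. */
    static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    /**
     * Stand-in for the rule as implemented by the version that builds the job graph.
     * Assumption for this sketch: insert-only output needs an enabled option and keep-first-row.
     */
    static boolean canBeInsertOnly(boolean optionEnabled, boolean keepLastRow) {
        return optionEnabled && !keepLastRow;
    }

    /** Picks an operator flavour and fails fast if the restored flags contradict the current rule. */
    static String chooseOperator(RestoredPlanNode node, boolean optionEnabled) {
        if (!node.keepLastRow && node.outputInsertOnly) {
            checkState(
                    canBeInsertOnly(optionEnabled, node.keepLastRow),
                    "Plan says insert-only, but the current rule does not allow it");
            return "insert-only";
        }
        return "changelog";
    }
}
```

If the plan and the runtime agree, the check is a no-op. If they drift apart, job graph creation fails with a clear error instead of silently wiring up an operator whose output contract no longer holds.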