godfreyhe commented on a change in pull request #11797:
URL: https://github.com/apache/flink/pull/11797#discussion_r411358281
##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
##########
@@ -35,25 +37,29 @@
*
* @param currentRow latest row received by deduplicate function
* @param generateUpdateBefore whether need to send UPDATE_BEFORE message for updates
- * @param state state of function
+ * @param state state of function, null if generateUpdateBefore is false
* @param out underlying collector
*/
static void processLastRow(
BaseRow currentRow,
boolean generateUpdateBefore,
- ValueState<BaseRow> state,
+ @Nullable ValueState<BaseRow> state,
Collector<BaseRow> out) throws Exception {
- // Check message should be accumulate
- Preconditions.checkArgument(BaseRowUtil.isAccumulateMsg(currentRow));
+ // check message should be insert only.
+ Preconditions.checkArgument(currentRow.getRowKind() == RowKind.INSERT);
if (generateUpdateBefore) {
- // state stores complete row if generateUpdateBefore is true
+ // state is not null when generateUpdateBefore is enabled,
+ // the state stores complete row
BaseRow preRow = state.value();
state.update(currentRow);
if (preRow != null) {
- preRow.setHeader(BaseRowUtil.RETRACT_MSG);
+ preRow.setRowKind(RowKind.UPDATE_BEFORE);
out.collect(preRow);
}
}
+ // in order for better performance, we don't have state for LastRow
+ // if not generate UPDATE_BEFORE, thus, we can't produce INSERT messages for first row.
Review comment:
Do all databases support this, i.e. receiving an update for a key that was never sent as INSERT? I know MySQL supports "INSERT ... ON DUPLICATE KEY UPDATE".
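The diff is truncated after the new comment, so the following is only a minimal, self-contained Java sketch under stated assumptions, not the PR's code. Row, LastRowDedupSketch and applyUpsert are hypothetical names, RowKind is a local stand-in for Flink's enum, state is a plain per-key Map instead of ValueState, and the current row is assumed to be emitted as UPDATE_AFTER in both paths. It illustrates the concern in the review comment: when the first row for a key never arrives as INSERT, the sink has to apply INSERT and UPDATE_AFTER as "insert or overwrite by key", which is the effect MySQL's INSERT ... ON DUPLICATE KEY UPDATE provides.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LastRowDedupSketch {

    // Local stand-in for Flink's RowKind enum (not the Flink class itself).
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Hypothetical immutable row: primary key, payload, and change kind.
    static final class Row {
        final String key;
        final String value;
        final RowKind kind;

        Row(String key, String value, RowKind kind) {
            this.key = key;
            this.value = value;
            this.kind = kind;
        }

        Row withKind(RowKind newKind) {
            return new Row(key, value, newKind);
        }

        @Override
        public String toString() {
            return kind + "(" + key + "=" + value + ")";
        }
    }

    // Keep-last-row sketch. 'state' maps key -> latest row and is null when
    // generateUpdateBefore is false, mirroring the @Nullable state in the diff.
    static void processLastRow(Row current, boolean generateUpdateBefore,
                               Map<String, Row> state, List<Row> out) {
        // Input must be insert-only, as in the diff's checkArgument.
        if (current.kind != RowKind.INSERT) {
            throw new IllegalArgumentException("expected INSERT, got " + current.kind);
        }
        if (generateUpdateBefore) {
            // Store the complete row and retract the previous one, if any.
            Row previous = state.put(current.key, current);
            if (previous != null) {
                out.add(previous.withKind(RowKind.UPDATE_BEFORE));
            }
        }
        // Assumption: the current row is emitted as UPDATE_AFTER in both
        // paths, so the first row for a key never arrives as INSERT.
        out.add(current.withKind(RowKind.UPDATE_AFTER));
    }

    // Hypothetical upsert sink: INSERT and UPDATE_AFTER become "insert or
    // overwrite by key", like MySQL's INSERT ... ON DUPLICATE KEY UPDATE.
    static void applyUpsert(Map<String, String> table, Row row) {
        switch (row.kind) {
            case INSERT:
            case UPDATE_AFTER:
                table.put(row.key, row.value);
                break;
            case DELETE:
                table.remove(row.key);
                break;
            case UPDATE_BEFORE:
            default:
                // Ignored: the following UPDATE_AFTER overwrites the same key.
                break;
        }
    }

    public static void main(String[] args) {
        // generateUpdateBefore = false: no state is kept at all.
        List<Row> stateless = new ArrayList<>();
        processLastRow(new Row("k1", "a", RowKind.INSERT), false, null, stateless);
        processLastRow(new Row("k1", "b", RowKind.INSERT), false, null, stateless);
        System.out.println(stateless); // [UPDATE_AFTER(k1=a), UPDATE_AFTER(k1=b)]

        // generateUpdateBefore = true: per-key state enables UPDATE_BEFORE.
        List<Row> stateful = new ArrayList<>();
        Map<String, Row> state = new HashMap<>();
        processLastRow(new Row("k1", "a", RowKind.INSERT), true, state, stateful);
        processLastRow(new Row("k1", "b", RowKind.INSERT), true, state, stateful);
        System.out.println(stateful);
        // [UPDATE_AFTER(k1=a), UPDATE_BEFORE(k1=a), UPDATE_AFTER(k1=b)]

        Map<String, String> table = new HashMap<>();
        for (Row r : stateless) {
            applyUpsert(table, r);
        }
        System.out.println(table); // {k1=b}
    }
}
```

With generateUpdateBefore = false the sink never sees an INSERT for k1, yet the table still ends as {k1=b} because every message overwrites by key. A sink that could only issue a plain INSERT would typically hit a duplicate-key error on the second message instead.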
----------------------------------------------------------------
This is an automated message from the Apache Git Service.