godfreyhe commented on a change in pull request #11797:
URL: https://github.com/apache/flink/pull/11797#discussion_r411358281



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
##########
@@ -35,25 +37,29 @@
         *
         * @param currentRow latest row received by deduplicate function
         * @param generateUpdateBefore whether need to send UPDATE_BEFORE message for updates
-        * @param state state of function
+        * @param state state of function, null if generateUpdateBefore is false
         * @param out underlying collector
         */
        static void processLastRow(
                        BaseRow currentRow,
                        boolean generateUpdateBefore,
-                       ValueState<BaseRow> state,
+                       @Nullable ValueState<BaseRow> state,
                        Collector<BaseRow> out) throws Exception {
-               // Check message should be accumulate
-               Preconditions.checkArgument(BaseRowUtil.isAccumulateMsg(currentRow));
+               // check message should be insert only.
+               Preconditions.checkArgument(currentRow.getRowKind() == RowKind.INSERT);
                if (generateUpdateBefore) {
-                       // state stores complete row if generateUpdateBefore is true
+                       // state is not null when generateUpdateBefore is enabled,
+                       // the state stores complete row
                        BaseRow preRow = state.value();
                        state.update(currentRow);
                        if (preRow != null) {
-                               preRow.setHeader(BaseRowUtil.RETRACT_MSG);
+                               preRow.setRowKind(RowKind.UPDATE_BEFORE);
                                out.collect(preRow);
                        }
                }
+               // in order for better performance, we don't have state for LastRow
+               // if not generate UPDATE_BEFORE, thus, we can't produce INSERT messages for first row.

Review comment:
       Do all databases support this? I know MySQL supports "INSERT ... ON DUPLICATE KEY UPDATE".
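The trade-off described in the new code comment can be illustrated with a small, self-contained sketch. It is not Flink code: `Row`, `RowState` and the list used as a collector are hypothetical stand-ins for `BaseRow`, `ValueState<BaseRow>` and `Collector<BaseRow>`, and the local `RowKind` enum only mirrors the constant names of Flink's. Because the rest of the hunk is not shown, the sketch also assumes that with state the first row is emitted as INSERT and later rows as an UPDATE_BEFORE/UPDATE_AFTER pair, and that without state every row is emitted as UPDATE_AFTER.

```java
import java.util.ArrayList;
import java.util.List;

public class LastRowDedupSketch {

    // Local stand-in mirroring the constant names of Flink's RowKind (not the real class).
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Hypothetical immutable row: a key, a value and a change kind.
    static final class Row {
        final String key;
        final int value;
        final RowKind kind;

        Row(String key, int value, RowKind kind) {
            this.key = key;
            this.value = value;
            this.kind = kind;
        }

        // Copy with a different kind, so rows already emitted are never mutated.
        Row withKind(RowKind newKind) {
            return new Row(key, value, newKind);
        }

        @Override
        public String toString() {
            return kind + "(" + key + "," + value + ")";
        }
    }

    // Hypothetical one-slot holder standing in for ValueState<BaseRow>; null when state is disabled.
    static final class RowState {
        Row value;
    }

    static void processLastRow(Row currentRow, boolean generateUpdateBefore, RowState state, List<Row> out) {
        // Input must be insert-only, like the Preconditions check in the diff.
        if (currentRow.kind != RowKind.INSERT) {
            throw new IllegalArgumentException("expected an INSERT row, got " + currentRow.kind);
        }
        if (generateUpdateBefore) {
            // State holds the complete previous row for this key.
            Row preRow = state.value;
            state.value = currentRow;
            if (preRow == null) {
                // First row for this key (assumption): a plain INSERT.
                out.add(currentRow.withKind(RowKind.INSERT));
            } else {
                // Retract the previous row, then emit the new one.
                out.add(preRow.withKind(RowKind.UPDATE_BEFORE));
                out.add(currentRow.withKind(RowKind.UPDATE_AFTER));
            }
        } else {
            // No state: the first row cannot be recognised, so every row is an
            // UPDATE_AFTER (assumption) and the sink must apply it as an upsert.
            out.add(currentRow.withKind(RowKind.UPDATE_AFTER));
        }
    }

    public static void main(String[] args) {
        List<Row> withState = new ArrayList<>();
        RowState state = new RowState();
        processLastRow(new Row("k", 1, RowKind.INSERT), true, state, withState);
        processLastRow(new Row("k", 2, RowKind.INSERT), true, state, withState);
        System.out.println(withState); // [INSERT(k,1), UPDATE_BEFORE(k,1), UPDATE_AFTER(k,2)]

        List<Row> withoutState = new ArrayList<>();
        processLastRow(new Row("k", 1, RowKind.INSERT), false, null, withoutState);
        processLastRow(new Row("k", 2, RowKind.INSERT), false, null, withoutState);
        System.out.println(withoutState); // [UPDATE_AFTER(k,1), UPDATE_AFTER(k,2)]
    }
}
```

In the stateless branch the sink only ever sees UPDATE_AFTER rows for a key, so it has to treat each one as "insert or overwrite by key". That is the behaviour MySQL's `INSERT ... ON DUPLICATE KEY UPDATE` provides, and why the question of upsert support in other databases matters for this change.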



