wuchong commented on a change in pull request #13331:
URL: https://github.com/apache/flink/pull/13331#discussion_r515793467



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
##########
@@ -147,6 +150,148 @@ static void processFirstRow(
                out.collect(currentRow);
        }
 
+       /**
+        * Processes element to deduplicate on keys with row time semantics, sends the current element if it is
+        * the last or first row, retracts the previous element if needed.
+        *
+        * @param state                 state of function
+        * @param currentRow            latest row received by deduplicate function
+        * @param out                   underlying collector
+        * @param generateUpdateBefore  flag to generate UPDATE_BEFORE message or not
+        * @param generateInsert        flag to generate INSERT message or not
+        * @param orderFunctionProvider provider that provides an order function to judge first or last
+        */
+       public static void deduplicateOnRowTime(
+                       ValueState<RowData> state,
+                       RowData currentRow,
+                       Collector<RowData> out,
+                       boolean generateUpdateBefore,
+                       boolean generateInsert,
+                       DeduplicateOrderFunctionProvider orderFunctionProvider) throws Exception {
+
+               checkInsertOnly(currentRow);
+               RowData preRow = state.value();
+               if (!orderFunctionProvider.matches(preRow, currentRow)) {
+                       return;
+               }
+
+               collectRetractResult(
+                               generateUpdateBefore,
+                               generateInsert,
+                               preRow,
+                               currentRow,
+                               out,
+                               null
+               );
+               state.update(currentRow);
+       }
+
+       /**
+        * Processes buffered elements to deduplicate on keys with row time semantics, sends the current element
+        * if it is the last or first row, retracts the previous element if needed.
+        *
+        * @param state                 state of function
+        * @param bufferedRows          latest rows buffered by the deduplicate function
+        * @param serializer            serializer to serialize the data
+        * @param out                   underlying collector
+        * @param generateUpdateBefore  flag to generate UPDATE_BEFORE message or not
+        * @param generateInsert        flag to generate INSERT message or not
+        * @param orderFunctionProvider provider that provides an order function to judge first or last
+        */
+       public static void miniBatchDeduplicateOnRowTime(

Review comment:
       It is only used in `RowTimeMiniBatchDeduplicateFunction`. We don't need 
to put it in the util class.

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
##########
@@ -147,6 +150,148 @@ static void processFirstRow(
                out.collect(currentRow);
        }
 
+       /**
+        * Processes element to deduplicate on keys with row time semantics, sends the current element if it is
+        * the last or first row, retracts the previous element if needed.
+        *
+        * @param state                 state of function
+        * @param currentRow            latest row received by deduplicate function
+        * @param out                   underlying collector
+        * @param generateUpdateBefore  flag to generate UPDATE_BEFORE message or not
+        * @param generateInsert        flag to generate INSERT message or not
+        * @param orderFunctionProvider provider that provides an order function to judge first or last
+        */
+       public static void deduplicateOnRowTime(
+                       ValueState<RowData> state,
+                       RowData currentRow,
+                       Collector<RowData> out,
+                       boolean generateUpdateBefore,
+                       boolean generateInsert,
+                       DeduplicateOrderFunctionProvider orderFunctionProvider) throws Exception {
+
+               checkInsertOnly(currentRow);
+               RowData preRow = state.value();
+               if (!orderFunctionProvider.matches(preRow, currentRow)) {
+                       return;
+               }
+
+               collectRetractResult(
+                               generateUpdateBefore,
+                               generateInsert,
+                               preRow,
+                               currentRow,
+                               out,
+                               null
+               );
+               state.update(currentRow);
+       }
+
+       /**
+        * Processes buffered elements to deduplicate on keys with row time semantics, sends the current element
+        * if it is the last or first row, retracts the previous element if needed.
+        *
+        * @param state                 state of function
+        * @param bufferedRows          latest rows buffered by the deduplicate function
+        * @param serializer            serializer to serialize the data
+        * @param out                   underlying collector
+        * @param generateUpdateBefore  flag to generate UPDATE_BEFORE message or not
+        * @param generateInsert        flag to generate INSERT message or not
+        * @param orderFunctionProvider provider that provides an order function to judge first or last
+        */
+       public static void miniBatchDeduplicateOnRowTime(
+                       ValueState<RowData> state,
+                       List<RowData> bufferedRows,
+                       TypeSerializer<RowData> serializer,
+                       Collector<RowData> out,
+                       boolean generateUpdateBefore,
+                       boolean generateInsert,
+                       DeduplicateOrderFunctionProvider orderFunctionProvider) throws Exception {
+               if (bufferedRows == null) {

Review comment:
       I think `bufferedRows` would never be null, but it may be empty.
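
   For example, a minimal guard (a sketch, assuming the caller never passes null):

   ```java
   // bufferedRows is never null in practice, but the bundle may be empty
   if (bufferedRows.isEmpty()) {
       return;
   }
   ```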

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/MiniBatchDeduplicateFunctionBase.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate;
+
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.runtime.context.ExecutionContext;
+import org.apache.flink.table.runtime.operators.bundle.MapBundleFunction;
+
+import static org.apache.flink.table.runtime.util.StateTtlConfigUtil.createTtlConfig;
+
+/**
+ * Base class for miniBatch deduplicate function.
+ * @param <T>   The type of the value in the state.
+ * @param <K>   The type of the key in the bundle map.
+ * @param <V>   The type of the value in the bundle map.
+ * @param <IN>  Type of the input elements.
+ * @param <OUT> Type of the returned elements.
+ */
+abstract class MiniBatchDeduplicateFunctionBase<T, K, V, IN, OUT> extends MapBundleFunction<K, V, IN, OUT> {
+
+       protected final TypeInformation<T> typeInfo;
+       protected final TypeSerializer<OUT> serializer;
+       protected final long minRetentionTime;
+       // state stores previous message under the key.
+       protected ValueState<T> state;
+
+       public MiniBatchDeduplicateFunctionBase(
+                       TypeInformation<T> typeInfo,
+                       TypeSerializer<OUT> serializer,

Review comment:
       It would be better to remove this `serializer`, as it is never used in the base class. It is confusing what it is used for, and it's error-prone because it's nullable.
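
   A minimal sketch of the base class without the `serializer` field, using the imports already shown in this file (subclasses that actually need a serializer would declare their own):

   ```java
   abstract class MiniBatchDeduplicateFunctionBase<T, K, V, IN, OUT> extends MapBundleFunction<K, V, IN, OUT> {

       private static final long serialVersionUID = 1L;

       protected final TypeInformation<T> typeInfo;
       protected final long minRetentionTime;
       // state stores the previous message under the key.
       protected ValueState<T> state;

       public MiniBatchDeduplicateFunctionBase(TypeInformation<T> typeInfo, long minRetentionTime) {
           this.typeInfo = typeInfo;
           this.minRetentionTime = minRetentionTime;
       }
   }
   ```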

##########
File path: flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTest.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate;
+
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+
+/**
+ * Tests for {@link RowTimeDeduplicateFunction}.
+ */
+@RunWith(Parameterized.class)
+public class RowTimeDeduplicateFunctionTest extends RowTimeDeduplicateFunctionTestBase {
+
+       private final boolean generateUpdateBefore;
+       private final boolean generateInsert;
+
+       public RowTimeDeduplicateFunctionTest(boolean generateUpdateBefore, boolean generateInsert) {
+               this.generateUpdateBefore = generateUpdateBefore;
+               this.generateInsert = generateInsert;
+       }
+
+       @Test
+       public void testRowTimeDeduplicateKeepFirstRow() throws Exception {
+               final boolean keepLastRow = false;
+               RowTimeDeduplicateFunction func = new RowTimeDeduplicateFunction(
+                               inputRowType,
+                               minTtlTime.toMilliseconds(),
+                               rowTimeIndex,
+                               generateUpdateBefore,
+                               generateInsert,
+                               keepLastRow);
+               KeyedProcessOperator<RowData, RowData, RowData> operator = new KeyedProcessOperator<>(func);
+               OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(operator);
+
+               testHarness.open();
+
+               testHarness.processElement(insertRecord("key1", 13, 99L));
+               testHarness.processElement(insertRecord("key1", 13, 99L));
+               testHarness.processElement(insertRecord("key1", 12, 100L));
+               testHarness.processElement(insertRecord("key2", 11, 101L));
+
+               // test 1: keep first row with row time
+               testHarness.processWatermark(new Watermark(102));
+               assertor.assertOutputEqualsSorted("output wrong.", getExpectOutput(keepLastRow, 1), testHarness.getOutput());
+
+               // do a snapshot, close and restore again
+               OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0);
+               testHarness.close();
+
+               testHarness = createTestHarness(operator);
+               testHarness.setup();
+               testHarness.initializeState(snapshot);
+               testHarness.open();
+
+               testHarness.processElement(insertRecord("key1", 12, 300L));
+               testHarness.processElement(insertRecord("key2", 11, 301L));
+               testHarness.processElement(insertRecord("key3", 5, 299L));
+
+               // test 2: load snapshot state
+               testHarness.processWatermark(new Watermark(302));
+
+               assertor.assertOutputEqualsSorted("output wrong.", getExpectOutput(keepLastRow, 2), testHarness.getOutput());
+
+               // test 3: expire the state
+               testHarness.setStateTtlProcessingTime(minTtlTime.toMilliseconds() + 1);
+               testHarness.processElement(insertRecord("key1", 12, 400L));
+               testHarness.processElement(insertRecord("key2", 11, 401L));
+               testHarness.processWatermark(402);
+
+               // ("key1", 13, 99L) and ("key2", 11, 101L) have expired, thus output ("key1", 12, 400L), ("key2", 11, 401L)
+               assertor.assertOutputEqualsSorted("output wrong.", getExpectOutput(keepLastRow, 3), testHarness.getOutput());
+               testHarness.close();
+       }
+
+       public void testRowTimeDeduplicateKeepLastRow() throws Exception {

Review comment:
       Missing `@Test`?

##########
File path: flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTest.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate;
+
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+
+/**
+ * Tests for {@link RowTimeDeduplicateFunction}.
+ */
+@RunWith(Parameterized.class)
+public class RowTimeDeduplicateFunctionTest extends RowTimeDeduplicateFunctionTestBase {
+
+       private final boolean generateUpdateBefore;
+       private final boolean generateInsert;
+
+       public RowTimeDeduplicateFunctionTest(boolean generateUpdateBefore, boolean generateInsert) {
+               this.generateUpdateBefore = generateUpdateBefore;
+               this.generateInsert = generateInsert;
+       }
+
+       @Test
+       public void testRowTimeDeduplicateKeepFirstRow() throws Exception {
+               final boolean keepLastRow = false;
+               RowTimeDeduplicateFunction func = new RowTimeDeduplicateFunction(
+                               inputRowType,
+                               minTtlTime.toMilliseconds(),
+                               rowTimeIndex,
+                               generateUpdateBefore,
+                               generateInsert,
+                               keepLastRow);
+               KeyedProcessOperator<RowData, RowData, RowData> operator = new KeyedProcessOperator<>(func);
+               OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(operator);
+
+               testHarness.open();
+
+               testHarness.processElement(insertRecord("key1", 13, 99L));
+               testHarness.processElement(insertRecord("key1", 13, 99L));
+               testHarness.processElement(insertRecord("key1", 12, 100L));
+               testHarness.processElement(insertRecord("key2", 11, 101L));
+
+               // test 1: keep first row with row time
+               testHarness.processWatermark(new Watermark(102));
+               assertor.assertOutputEqualsSorted("output wrong.", getExpectOutput(keepLastRow, 1), testHarness.getOutput());
+
+               // do a snapshot, close and restore again
+               OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0);
+               testHarness.close();
+
+               testHarness = createTestHarness(operator);
+               testHarness.setup();
+               testHarness.initializeState(snapshot);
+               testHarness.open();
+
+               testHarness.processElement(insertRecord("key1", 12, 300L));
+               testHarness.processElement(insertRecord("key2", 11, 301L));
+               testHarness.processElement(insertRecord("key3", 5, 299L));
+
+               // test 2: load snapshot state
+               testHarness.processWatermark(new Watermark(302));
+
+               assertor.assertOutputEqualsSorted("output wrong.", getExpectOutput(keepLastRow, 2), testHarness.getOutput());
+
+               // test 3: expire the state
+               testHarness.setStateTtlProcessingTime(minTtlTime.toMilliseconds() + 1);
+               testHarness.processElement(insertRecord("key1", 12, 400L));
+               testHarness.processElement(insertRecord("key2", 11, 401L));
+               testHarness.processWatermark(402);
+
+               // ("key1", 13, 99L) and ("key2", 11, 101L) have expired, thus output ("key1", 12, 400L), ("key2", 11, 401L)
+               assertor.assertOutputEqualsSorted("output wrong.", getExpectOutput(keepLastRow, 3), testHarness.getOutput())
+               testHarness.close();
+       }
+
+       public void testRowTimeDeduplicateKeepLastRow() throws Exception {
+               final boolean keepLastRow = true;
+               RowTimeDeduplicateFunction func = new RowTimeDeduplicateFunction(
+                               inputRowType,
+                               minTtlTime.toMilliseconds(),
+                               rowTimeIndex,
+                               generateUpdateBefore,
+                               generateInsert,
+                               true);
+               KeyedProcessOperator<RowData, RowData, RowData> operator = new KeyedProcessOperator<>(func);
+               OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(operator);
+               testHarness.open();
+
+               testHarness.processElement(insertRecord("key1", 13, 99L));
+               testHarness.processElement(insertRecord("key1", 12, 100L));
+               testHarness.processElement(insertRecord("key2", 11, 101L));
+
+               // test 1: keep last row with row time
+               testHarness.processWatermark(new Watermark(102));
+               assertor.assertOutputEqualsSorted("output wrong.", getExpectOutput(keepLastRow, 1), testHarness.getOutput());
+
+               // do a snapshot, close and restore again
+               OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0);
+               testHarness.close();
+
+               testHarness = createTestHarness(operator);
+               testHarness.setup();
+               testHarness.initializeState(snapshot);
+               testHarness.open();
+
+               testHarness.processElement(insertRecord("key1", 12, 300L));
+               testHarness.processElement(insertRecord("key2", 11, 301L));
+               testHarness.processElement(insertRecord("key3", 5, 299L));
+
+               // test 2: load snapshot state
+               testHarness.processWatermark(new Watermark(302));
+               assertor.assertOutputEqualsSorted("output wrong.", getExpectOutput(keepLastRow, 2), testHarness.getOutput());
+
+               // test 3: expire the state
+               testHarness.setStateTtlProcessingTime(minTtlTime.toMilliseconds() + 1);
+               testHarness.processElement(insertRecord("key1", 12, 400L));
+               testHarness.processElement(insertRecord("key2", 11, 401L));
+               testHarness.processWatermark(402);
+
+               // all state has expired, so the records ("key1", 12, 400L), ("key2", 11, 401L) will be INSERT messages
+               assertor.assertOutputEqualsSorted("output wrong.", getExpectOutput(keepLastRow, 3), testHarness.getOutput());

Review comment:
       It's super hard to understand what the expected result is.
   
   I think if the results are different, then we shouldn't parameterize them into one test. We can follow the way of `TemporalRowTimeJoinOperatorTest`, which reuses the same test data in a common private method but declares different expected output in different test methods.
   
   Besides, I think we can't share the expected output between minibatch and non-minibatch mode?
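
   A rough sketch of that structure (helper names like `createHarness` are illustrative; the row kinds in the expected lists assume the plain insert-only case):

   ```java
   // shared input data, reused by both tests
   private void processCommonRows(OneInputStreamOperatorTestHarness<RowData, RowData> harness) throws Exception {
       harness.processElement(insertRecord("key1", 13, 99L));
       harness.processElement(insertRecord("key1", 12, 100L));
       harness.processElement(insertRecord("key2", 11, 101L));
       harness.processWatermark(new Watermark(102));
   }

   @Test
   public void testKeepFirstRow() throws Exception {
       OneInputStreamOperatorTestHarness<RowData, RowData> harness = createHarness(false); // keepLastRow = false
       processCommonRows(harness);
       // expected output declared inline, specific to first-row semantics
       List<Object> expected = Arrays.asList(
               insertRecord("key1", 13, 99L),
               insertRecord("key2", 11, 101L));
       assertor.assertOutputEqualsSorted("output wrong.", expected, harness.getOutput());
       harness.close();
   }

   @Test
   public void testKeepLastRow() throws Exception {
       OneInputStreamOperatorTestHarness<RowData, RowData> harness = createHarness(true); // keepLastRow = true
       processCommonRows(harness);
       // expected output declared inline, specific to last-row semantics
       List<Object> expected = Arrays.asList(
               insertRecord("key1", 12, 100L),
               insertRecord("key2", 11, 101L));
       assertor.assertOutputEqualsSorted("output wrong.", expected, harness.getOutput());
       harness.close();
   }
   ```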

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
##########
@@ -147,6 +150,148 @@ static void processFirstRow(
                out.collect(currentRow);
        }
 
+       /**
+        * Processes element to deduplicate on keys with row time semantics, sends the current element if it is
+        * the last or first row, retracts the previous element if needed.
+        *
+        * @param state                 state of function
+        * @param currentRow            latest row received by deduplicate function
+        * @param out                   underlying collector
+        * @param generateUpdateBefore  flag to generate UPDATE_BEFORE message or not
+        * @param generateInsert        flag to generate INSERT message or not
+        * @param orderFunctionProvider provider that provides an order function to judge first or last
+        */
+       public static void deduplicateOnRowTime(
+                       ValueState<RowData> state,
+                       RowData currentRow,
+                       Collector<RowData> out,
+                       boolean generateUpdateBefore,
+                       boolean generateInsert,
+                       DeduplicateOrderFunctionProvider orderFunctionProvider) throws Exception {
+
+               checkInsertOnly(currentRow);
+               RowData preRow = state.value();
+               if (!orderFunctionProvider.matches(preRow, currentRow)) {
+                       return;
+               }
+
+               collectRetractResult(
+                               generateUpdateBefore,
+                               generateInsert,
+                               preRow,
+                               currentRow,
+                               out,
+                               null
+               );
+               state.update(currentRow);
+       }
+
+       /**
+        * Processes buffered elements to deduplicate on keys with row time semantics, sends the current element
+        * if it is the last or first row, retracts the previous element if needed.
+        *
+        * @param state                 state of function
+        * @param bufferedRows          latest rows buffered by the deduplicate function
+        * @param serializer            serializer to serialize the data
+        * @param out                   underlying collector
+        * @param generateUpdateBefore  flag to generate UPDATE_BEFORE message or not
+        * @param generateInsert        flag to generate INSERT message or not
+        * @param orderFunctionProvider provider that provides an order function to judge first or last
+        */
+       public static void miniBatchDeduplicateOnRowTime(
+                       ValueState<RowData> state,
+                       List<RowData> bufferedRows,
+                       TypeSerializer<RowData> serializer,
+                       Collector<RowData> out,
+                       boolean generateUpdateBefore,
+                       boolean generateInsert,
+                       DeduplicateOrderFunctionProvider orderFunctionProvider) throws Exception {
+               if (bufferedRows == null) {
+                       return;
+               }
+
+               RowData preRow = state.value();
+               for (RowData currentRow : bufferedRows) {
+                       checkInsertOnly(currentRow);
+                       if (!orderFunctionProvider.matches(preRow, currentRow)) {
+                               continue;
+                       }
+                       collectRetractResult(
+                                       generateUpdateBefore,
+                                       generateInsert,
+                                       preRow,
+                                       currentRow,
+                                       out,
+                                       serializer
+                       );
+                       preRow = currentRow;
+               }
+               state.update(preRow);
+       }
+
+       private static void collectRetractResult(
+                       boolean generateUpdateBefore,
+                       boolean generateInsert,
+                       RowData preRow,
+                       RowData currentRow,
+                       Collector<RowData> out,
+                       TypeSerializer<RowData> serializer) {
+               if (generateUpdateBefore || generateInsert) {
+                       if (preRow == null) {
+                               // the first row, send INSERT message
+                               currentRow.setRowKind(RowKind.INSERT);
+                               out.collect(currentRow);
+                       } else {
+                               if (generateUpdateBefore) {
+                                       RowData copyRow;
+                                       // when miniBatch enabled, do a copy here, the serializer is not null
+                                       if (serializer != null) {
+                                               copyRow = serializer.copy(preRow);

Review comment:
       If this copying can be removed, the implementations of `collectRetractResult` and `processLastRowOnProcTime` are exactly the same. I feel that the name `collectRetractResult` is not suitable, because it does not always generate retractions.
   
   What about combining them into the following new method?
   
   ```java
   static void updateDeduplicateResult(
                        RowData currentRow,  // new row result of deduplicate under current key
                        ValueState<RowData> state, // the state stores the previous collected row under current key
                        boolean generateUpdateBefore,           
                        boolean generateInsert,
                        Collector<RowData> out) throws Exception {
   ```
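
   A possible body for that method, merging the two code paths (a sketch only; setting the `RowKind` back assumes the input is insert-only, which `checkInsertOnly` enforces, and it uses the existing imports of `DeduplicateFunctionHelper`):

   ```java
   static void updateDeduplicateResult(
           RowData currentRow,
           ValueState<RowData> state,
           boolean generateUpdateBefore,
           boolean generateInsert,
           Collector<RowData> out) throws Exception {
       RowData preRow = state.value();
       if (generateUpdateBefore || generateInsert) {
           if (preRow == null) {
               // the first row under the current key, send INSERT message
               currentRow.setRowKind(RowKind.INSERT);
               out.collect(currentRow);
           } else {
               if (generateUpdateBefore) {
                   preRow.setRowKind(RowKind.UPDATE_BEFORE);
                   out.collect(preRow);
                   // set the kind back so the previous row object stays unchanged
                   preRow.setRowKind(RowKind.INSERT);
               }
               currentRow.setRowKind(RowKind.UPDATE_AFTER);
               out.collect(currentRow);
           }
       } else {
           currentRow.setRowKind(RowKind.UPDATE_AFTER);
           out.collect(currentRow);
       }
       state.update(currentRow);
   }
   ```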

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/MiniBatchDeduplicateFunctionBase.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate;
+
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.runtime.context.ExecutionContext;
+import org.apache.flink.table.runtime.operators.bundle.MapBundleFunction;
+
+import static org.apache.flink.table.runtime.util.StateTtlConfigUtil.createTtlConfig;
+
+/**
+ * Base class for miniBatch deduplicate function.
+ * @param <T>   The type of the value in the state.
+ * @param <K>   The type of the key in the bundle map.
+ * @param <V>   The type of the value in the bundle map.
+ * @param <IN>  Type of the input elements.
+ * @param <OUT> Type of the returned elements.
+ */
+abstract class MiniBatchDeduplicateFunctionBase<T, K, V, IN, OUT> extends MapBundleFunction<K, V, IN, OUT> {

Review comment:
       Add `serialVersionUID`.
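
   For example (the value is just a placeholder):

   ```java
   private static final long serialVersionUID = 1L;
   ```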

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/MiniBatchDeduplicateFunctionBase.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate;
+
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.runtime.context.ExecutionContext;
+import org.apache.flink.table.runtime.operators.bundle.MapBundleFunction;
+
+import static org.apache.flink.table.runtime.util.StateTtlConfigUtil.createTtlConfig;
+
+/**
+ * Base class for miniBatch deduplicate function.
+ * @param <T>   The type of the value in the state.
+ * @param <K>   The type of the key in the bundle map.
+ * @param <V>   The type of the value in the bundle map.
+ * @param <IN>  Type of the input elements.
+ * @param <OUT> Type of the returned elements.
+ */
+abstract class MiniBatchDeduplicateFunctionBase<T, K, V, IN, OUT> extends MapBundleFunction<K, V, IN, OUT> {
+
+       protected final TypeInformation<T> typeInfo;
+       protected final TypeSerializer<OUT> serializer;
+       protected final long minRetentionTime;
+       // state stores previous message under the key.
+       protected ValueState<T> state;
+
+       public MiniBatchDeduplicateFunctionBase(
+                       TypeInformation<T> typeInfo,

Review comment:
       `typeInfo` -> `stateType` to be more specific.

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
##########
@@ -147,6 +150,148 @@ static void processFirstRow(
                out.collect(currentRow);
        }
 
+       /**
+        * Processes element to deduplicate on keys with row time semantics, sends the current element if it is
+        * the last or first row, retracts the previous element if needed.
+        *
+        * @param state                 state of function
+        * @param currentRow            latest row received by deduplicate function
+        * @param out                   underlying collector
+        * @param generateUpdateBefore  flag to generate UPDATE_BEFORE message or not
+        * @param generateInsert        flag to generate INSERT message or not
+        * @param orderFunctionProvider provider that provides an order function to judge first or last
+        */
+       public static void deduplicateOnRowTime(

Review comment:
       ditto.

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
##########
@@ -147,6 +150,148 @@ static void processFirstRow(
                out.collect(currentRow);
        }
 
+       /**
+        * Processes element to deduplicate on keys with row time semantics, sends the current element if it is
+        * the last or first row, retracts the previous element if needed.
+        *
+        * @param state                 state of function
+        * @param currentRow            latest row received by deduplicate function
+        * @param out                   underlying collector
+        * @param generateUpdateBefore  flag to generate UPDATE_BEFORE message or not
+        * @param generateInsert        flag to generate INSERT message or not
+        * @param orderFunctionProvider provider that provides an order function to judge first or last
+        */
+       public static void deduplicateOnRowTime(
+                       ValueState<RowData> state,
+                       RowData currentRow,
+                       Collector<RowData> out,
+                       boolean generateUpdateBefore,
+                       boolean generateInsert,
+                       DeduplicateOrderFunctionProvider orderFunctionProvider) throws Exception {
+
+               checkInsertOnly(currentRow);
+               RowData preRow = state.value();
+               if (!orderFunctionProvider.matches(preRow, currentRow)) {
+                       return;
+               }
+
+               collectRetractResult(
+                               generateUpdateBefore,
+                               generateInsert,
+                               preRow,
+                               currentRow,
+                               out,
+                               null
+               );
+               state.update(currentRow);
+       }
+
+       /**
+        * Processes buffered elements to deduplicate on keys with row time semantics, sends the current element
+        * if it is the last or first row, retracts the previous element if needed.
+        *
+        * @param state                 state of function
+        * @param bufferedRows          latest rows buffered by the deduplicate function
+        * @param serializer            serializer to serialize the data
+        * @param out                   underlying collector
+        * @param generateUpdateBefore  flag to generate UPDATE_BEFORE message or not
+        * @param generateInsert        flag to generate INSERT message or not
+        * @param orderFunctionProvider provider that provides an order function to judge first or last
+        */
+       public static void miniBatchDeduplicateOnRowTime(
+                       ValueState<RowData> state,
+                       List<RowData> bufferedRows,
+                       TypeSerializer<RowData> serializer,
+                       Collector<RowData> out,
+                       boolean generateUpdateBefore,
+                       boolean generateInsert,
+                       DeduplicateOrderFunctionProvider orderFunctionProvider) throws Exception {
+               if (bufferedRows == null) {
+                       return;
+               }
+
+               RowData preRow = state.value();
+               for (RowData currentRow : bufferedRows) {
+                       checkInsertOnly(currentRow);
+                       if (!orderFunctionProvider.matches(preRow, currentRow)) {
+                               continue;
+                       }
+                       collectRetractResult(
+                                       generateUpdateBefore,
+                                       generateInsert,
+                                       preRow,
+                                       currentRow,
+                                       out,
+                                       serializer
+                       );
+                       preRow = currentRow;
+               }
+               state.update(preRow);
+       }
+
+       private static void collectRetractResult(
+                       boolean generateUpdateBefore,
+                       boolean generateInsert,
+                       RowData preRow,
+                       RowData currentRow,
+                       Collector<RowData> out,
+                       TypeSerializer<RowData> serializer) {
+               if (generateUpdateBefore || generateInsert) {
+                       if (preRow == null) {
+                               // the first row, send INSERT message
+                               currentRow.setRowKind(RowKind.INSERT);
+                               out.collect(currentRow);
+                       } else {
+                               if (generateUpdateBefore) {
+                                       RowData copyRow;
+                                       // when miniBatch enabled, do a copy here, the serializer is not null
+                                       if (serializer != null) {
+                                               copyRow = serializer.copy(preRow);

Review comment:
       Why do we have to do a deep copy here?
   If it is to avoid mutating the original `preRow`, we can set the `RowKind` back after collecting `preRow` (non-minibatch mode also has this problem).
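
   A sketch of the set-back approach, which avoids the copy entirely (it assumes the stored row's kind is INSERT, which `checkInsertOnly` guarantees for the input):

   ```java
   if (generateUpdateBefore) {
       preRow.setRowKind(RowKind.UPDATE_BEFORE);
       out.collect(preRow);
       // restore the original kind so preRow is unchanged for later reads
       preRow.setRowKind(RowKind.INSERT);
   }
   ```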

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
##########
@@ -147,6 +150,148 @@ static void processFirstRow(
                out.collect(currentRow);
        }
 
+       /**
+        * Processes element to deduplicate on keys with row time semantics, sends the current element if it is
+        * the last or first row, retracts the previous element if needed.
+        *
+        * @param state                 state of function
+        * @param currentRow            latest row received by deduplicate function
+        * @param out                   underlying collector
+        * @param generateUpdateBefore  flag to generate UPDATE_BEFORE message or not
+        * @param generateInsert        flag to generate INSERT message or not
+        * @param orderFunctionProvider provider that provides an order function to judge first or last
+        */
+       public static void deduplicateOnRowTime(
+                       ValueState<RowData> state,
+                       RowData currentRow,
+                       Collector<RowData> out,
+                       boolean generateUpdateBefore,
+                       boolean generateInsert,
+                       DeduplicateOrderFunctionProvider orderFunctionProvider) throws Exception {
+
+               checkInsertOnly(currentRow);
+               RowData preRow = state.value();
+               if (!orderFunctionProvider.matches(preRow, currentRow)) {
+                       return;
+               }
+
+               collectRetractResult(
+                               generateUpdateBefore,
+                               generateInsert,
+                               preRow,
+                               currentRow,
+                               out,
+                               null
+               );
+               state.update(currentRow);
+       }
+
+       /**
+        * Processes buffered elements to deduplicate on keys with row time semantics, sends the current element
+        * if it is the last or first row, retracts the previous element if needed.
+        *
+        * @param state                 state of function
+        * @param bufferedRows          latest rows buffered by the deduplicate function
+        * @param serializer            serializer to serialize the data
+        * @param out                   underlying collector
+        * @param generateUpdateBefore  flag to generate UPDATE_BEFORE message or not
+        * @param generateInsert        flag to generate INSERT message or not
+        * @param orderFunctionProvider provider that provides an order function to judge first or last
+        */
+       public static void miniBatchDeduplicateOnRowTime(
+                       ValueState<RowData> state,
+                       List<RowData> bufferedRows,
+                       TypeSerializer<RowData> serializer,
+                       Collector<RowData> out,
+                       boolean generateUpdateBefore,
+                       boolean generateInsert,
+                       DeduplicateOrderFunctionProvider orderFunctionProvider) throws Exception {

Review comment:
       If this method can be moved into `RowTimeMiniBatchDeduplicateFunction`, I think we won't need the special `DeduplicateOrderFunctionProvider` parameter, because we have the `keepLastRow` flag.
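
   A sketch of the order check inlined with the flag (`getRowtime` is a hypothetical helper reading the row-time field at `rowTimeIndex`):

   ```java
   private boolean shouldKeepCurrentRow(RowData preRow, RowData currentRow) {
       if (preRow == null) {
           return true;
       }
       long preTime = getRowtime(preRow, rowTimeIndex);
       long curTime = getRowtime(currentRow, rowTimeIndex);
       // keep-last accepts a newer (or equal) row time, keep-first only a strictly older one
       return keepLastRow ? curTime >= preTime : curTime < preTime;
   }
   ```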

##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeduplicateITCase.scala
##########
@@ -137,4 +152,93 @@ class DeduplicateITCase(miniBatch: MiniBatchMode, mode: StateBackendMode)
     assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
 
+  @Test
+  def testFirstRowOnRowtime(): Unit = {
+    val t = env.fromCollection(rowtimeTestData)
+      .assignTimestampsAndWatermarks(new RowtimeExtractor)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime())
+    tEnv.registerTable("T", t)
+    createSinkTable("rowtime_sink")
+
+    val sql =
+      """
+        |INSERT INTO rowtime_sink
+        | SELECT a, b, c, rowtime
+        | FROM (
+        |   SELECT *,
+        |     ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime) as rowNum
+        |   FROM T
+        | )
+        | WHERE rowNum = 1
+      """.stripMargin
+
+    tEnv.executeSql(sql).await()
+    val rawResult = TestValuesTableFactory.getRawResults("rowtime_sink")
+
+    val expected = List(
+      "+I(1,1,Hi,1970-01-01T00:00:00.001)",
+      "+I(2,3,I am fine.,1970-01-01T00:00:00.003)",
+      "+I(3,5,Comment#2,1970-01-01T00:00:00.005)",
+      "+I(4,4,Comment#3,1970-01-01T00:00:00.004)",
+      "+U(3,4,Comment#2,1970-01-01T00:00:00.004)",
+      "-U(3,5,Comment#2,1970-01-01T00:00:00.005)")
+    assertEquals(expected.sorted, rawResult.sorted)

Review comment:
       If we want to verify the changelog result, I would suggest using a single parallelism and not sorting the result. Otherwise, we don't actually test the changelog order. For example, it's hard to understand why `-U` comes after `+U` in the above test.
   
   Alternatively, we can just verify the merged result and use the default parallelism.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

