wuchong commented on a change in pull request #13331:
URL: https://github.com/apache/flink/pull/13331#discussion_r515793467
##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
##########
@@ -147,6 +150,148 @@ static void processFirstRow(
out.collect(currentRow);
}
+ /**
+ * Processes an element to deduplicate on keys with row time semantics, sends the current
+ * element if it is the first or last row, and retracts the previous element if needed.
+ *
+ * @param state state of function
+ * @param currentRow latest row received by deduplicate function
+ * @param out underlying collector
+ * @param generateUpdateBefore flag to generate UPDATE_BEFORE message or not
+ * @param generateInsert flag to generate INSERT message or not
+ * @param orderFunctionProvider provider that provides an order function to judge first or last
+ */
+ public static void deduplicateOnRowTime(
+ ValueState<RowData> state,
+ RowData currentRow,
+ Collector<RowData> out,
+ boolean generateUpdateBefore,
+ boolean generateInsert,
+ DeduplicateOrderFunctionProvider orderFunctionProvider) throws Exception {
+
+ checkInsertOnly(currentRow);
+ RowData preRow = state.value();
+ if (!orderFunctionProvider.matches(preRow, currentRow)) {
+ return;
+ }
+
+ collectRetractResult(
+ generateUpdateBefore,
+ generateInsert,
+ preRow,
+ currentRow,
+ out,
+ null
+ );
+ state.update(currentRow);
+ }
+
+ /**
+ * Processes buffered elements to deduplicate on keys with row time semantics, sends an
+ * element if it is the first or last row, and retracts the previous element if needed.
+ *
+ * @param state state of function
+ * @param bufferedRows rows buffered by the mini-batch deduplicate function
+ * @param serializer serializer to serialize the data
+ * @param out underlying collector
+ * @param generateUpdateBefore flag to generate UPDATE_BEFORE message or not
+ * @param generateInsert flag to generate INSERT message or not
+ * @param orderFunctionProvider provider that provides an order function to judge first or last
+ */
+ public static void miniBatchDeduplicateOnRowTime(
Review comment:
It is only used in `RowTimeMiniBatchDeduplicateFunction`. We don't need
to put it in the util class.
##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
##########
@@ -147,6 +150,148 @@ static void processFirstRow(
out.collect(currentRow);
}
+ /**
+ * Processes an element to deduplicate on keys with row time semantics, sends the current
+ * element if it is the first or last row, and retracts the previous element if needed.
+ *
+ * @param state state of function
+ * @param currentRow latest row received by deduplicate function
+ * @param out underlying collector
+ * @param generateUpdateBefore flag to generate UPDATE_BEFORE message or not
+ * @param generateInsert flag to generate INSERT message or not
+ * @param orderFunctionProvider provider that provides an order function to judge first or last
+ */
+ public static void deduplicateOnRowTime(
+ ValueState<RowData> state,
+ RowData currentRow,
+ Collector<RowData> out,
+ boolean generateUpdateBefore,
+ boolean generateInsert,
+ DeduplicateOrderFunctionProvider orderFunctionProvider) throws Exception {
+
+ checkInsertOnly(currentRow);
+ RowData preRow = state.value();
+ if (!orderFunctionProvider.matches(preRow, currentRow)) {
+ return;
+ }
+
+ collectRetractResult(
+ generateUpdateBefore,
+ generateInsert,
+ preRow,
+ currentRow,
+ out,
+ null
+ );
+ state.update(currentRow);
+ }
+
+ /**
+ * Processes buffered elements to deduplicate on keys with row time semantics, sends an
+ * element if it is the first or last row, and retracts the previous element if needed.
+ *
+ * @param state state of function
+ * @param bufferedRows rows buffered by the mini-batch deduplicate function
+ * @param serializer serializer to serialize the data
+ * @param out underlying collector
+ * @param generateUpdateBefore flag to generate UPDATE_BEFORE message or not
+ * @param generateInsert flag to generate INSERT message or not
+ * @param orderFunctionProvider provider that provides an order function to judge first or last
+ */
+ public static void miniBatchDeduplicateOnRowTime(
+ ValueState<RowData> state,
+ List<RowData> bufferedRows,
+ TypeSerializer<RowData> serializer,
+ Collector<RowData> out,
+ boolean generateUpdateBefore,
+ boolean generateInsert,
+ DeduplicateOrderFunctionProvider orderFunctionProvider) throws Exception {
+ if (bufferedRows == null) {
Review comment:
I think `bufferedRows` would never be null, but it may be empty.
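A guard along those lines might look like this (a sketch, assuming the bundle always hands over a non-null list):

```java
// bufferedRows comes from the mini-batch bundle and may be empty, but not null
if (bufferedRows.isEmpty()) {
    return;
}
```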
##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/MiniBatchDeduplicateFunctionBase.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate;
+
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.runtime.context.ExecutionContext;
+import org.apache.flink.table.runtime.operators.bundle.MapBundleFunction;
+
+import static org.apache.flink.table.runtime.util.StateTtlConfigUtil.createTtlConfig;
+
+/**
+ * Base class for miniBatch deduplicate function.
+ * @param <T> The type of the value in the state.
+ * @param <K> The type of the key in the bundle map.
+ * @param <V> The type of the value in the bundle map.
+ * @param <IN> Type of the input elements.
+ * @param <OUT> Type of the returned elements.
+ */
+abstract class MiniBatchDeduplicateFunctionBase<T, K, V, IN, OUT> extends MapBundleFunction<K, V, IN, OUT> {
+
+ protected final TypeInformation<T> typeInfo;
+ protected final TypeSerializer<OUT> serializer;
+ protected final long minRetentionTime;
+ // state stores previous message under the key.
+ protected ValueState<T> state;
+
+ public MiniBatchDeduplicateFunctionBase(
+ TypeInformation<T> typeInfo,
+ TypeSerializer<OUT> serializer,
Review comment:
It would be better to remove this `serializer`, as it is never used in
the base class. It obscures what the field is used for, and it's error-prone
because it's nullable.
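A sketch of the constructor with the unused `serializer` dropped (illustrative only; a subclass that actually needs a serializer would declare its own field):

```java
public MiniBatchDeduplicateFunctionBase(
        TypeInformation<T> typeInfo,
        long minRetentionTime) {
    this.typeInfo = typeInfo;
    this.minRetentionTime = minRetentionTime;
}
```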
##########
File path: flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTest.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate;
+
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+
+/**
+ * Tests for {@link RowTimeDeduplicateFunction}.
+ */
+@RunWith(Parameterized.class)
+public class RowTimeDeduplicateFunctionTest extends RowTimeDeduplicateFunctionTestBase {
+
+ private final boolean generateUpdateBefore;
+ private final boolean generateInsert;
+
+ public RowTimeDeduplicateFunctionTest(boolean generateUpdateBefore, boolean generateInsert) {
+ this.generateUpdateBefore = generateUpdateBefore;
+ this.generateInsert = generateInsert;
+ }
+
+ @Test
+ public void testRowTimeDeduplicateKeepFirstRow() throws Exception {
+ final boolean keepLastRow = false;
+ RowTimeDeduplicateFunction func = new RowTimeDeduplicateFunction(
+ inputRowType,
+ minTtlTime.toMilliseconds(),
+ rowTimeIndex,
+ generateUpdateBefore,
+ generateInsert,
+ keepLastRow);
+ KeyedProcessOperator<RowData, RowData, RowData> operator = new KeyedProcessOperator<>(func);
+ OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(operator);
+
+ testHarness.open();
+
+ testHarness.processElement(insertRecord("key1", 13, 99L));
+ testHarness.processElement(insertRecord("key1", 13, 99L));
+ testHarness.processElement(insertRecord("key1", 12, 100L));
+ testHarness.processElement(insertRecord("key2", 11, 101L));
+
+ // test 1: keep first row with row time
+ testHarness.processWatermark(new Watermark(102));
+ assertor.assertOutputEqualsSorted("output wrong.", getExpectOutput(keepLastRow, 1), testHarness.getOutput());
+
+ // do a snapshot, close and restore again
+ OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0);
+ testHarness.close();
+
+ testHarness = createTestHarness(operator);
+ testHarness.setup();
+ testHarness.initializeState(snapshot);
+ testHarness.open();
+
+ testHarness.processElement(insertRecord("key1", 12, 300L));
+ testHarness.processElement(insertRecord("key2", 11, 301L));
+ testHarness.processElement(insertRecord("key3", 5, 299L));
+
+ // test 2: load snapshot state
+ testHarness.processWatermark(new Watermark(302));
+
+ assertor.assertOutputEqualsSorted("output wrong.", getExpectOutput(keepLastRow, 2), testHarness.getOutput());
+
+ // test 3: expire the state
+ testHarness.setStateTtlProcessingTime(minTtlTime.toMilliseconds() + 1);
+ testHarness.processElement(insertRecord("key1", 12, 400L));
+ testHarness.processElement(insertRecord("key2", 11, 401L));
+ testHarness.processWatermark(402);
+
+ // ("key1", 13, 99L) and ("key2", 11, 101L) have expired, thus output ("key1", 12, 400L) and ("key2", 11, 401L)
+ assertor.assertOutputEqualsSorted("output wrong.", getExpectOutput(keepLastRow, 3), testHarness.getOutput());
+ testHarness.close();
+ }
+
+ public void testRowTimeDeduplicateKeepLastRow() throws Exception {
Review comment:
Missing `@Test`?
##########
File path: flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTest.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate;
+
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+
+/**
+ * Tests for {@link RowTimeDeduplicateFunction}.
+ */
+@RunWith(Parameterized.class)
+public class RowTimeDeduplicateFunctionTest extends RowTimeDeduplicateFunctionTestBase {
+
+ private final boolean generateUpdateBefore;
+ private final boolean generateInsert;
+
+ public RowTimeDeduplicateFunctionTest(boolean generateUpdateBefore, boolean generateInsert) {
+ this.generateUpdateBefore = generateUpdateBefore;
+ this.generateInsert = generateInsert;
+ }
+
+ @Test
+ public void testRowTimeDeduplicateKeepFirstRow() throws Exception {
+ final boolean keepLastRow = false;
+ RowTimeDeduplicateFunction func = new RowTimeDeduplicateFunction(
+ inputRowType,
+ minTtlTime.toMilliseconds(),
+ rowTimeIndex,
+ generateUpdateBefore,
+ generateInsert,
+ keepLastRow);
+ KeyedProcessOperator<RowData, RowData, RowData> operator = new KeyedProcessOperator<>(func);
+ OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(operator);
+
+ testHarness.open();
+
+ testHarness.processElement(insertRecord("key1", 13, 99L));
+ testHarness.processElement(insertRecord("key1", 13, 99L));
+ testHarness.processElement(insertRecord("key1", 12, 100L));
+ testHarness.processElement(insertRecord("key2", 11, 101L));
+
+ // test 1: keep first row with row time
+ testHarness.processWatermark(new Watermark(102));
+ assertor.assertOutputEqualsSorted("output wrong.", getExpectOutput(keepLastRow, 1), testHarness.getOutput());
+
+ // do a snapshot, close and restore again
+ OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0);
+ testHarness.close();
+
+ testHarness = createTestHarness(operator);
+ testHarness.setup();
+ testHarness.initializeState(snapshot);
+ testHarness.open();
+
+ testHarness.processElement(insertRecord("key1", 12, 300L));
+ testHarness.processElement(insertRecord("key2", 11, 301L));
+ testHarness.processElement(insertRecord("key3", 5, 299L));
+
+ // test 2: load snapshot state
+ testHarness.processWatermark(new Watermark(302));
+
+ assertor.assertOutputEqualsSorted("output wrong.", getExpectOutput(keepLastRow, 2), testHarness.getOutput());
+
+ // test 3: expire the state
+ testHarness.setStateTtlProcessingTime(minTtlTime.toMilliseconds() + 1);
+ testHarness.processElement(insertRecord("key1", 12, 400L));
+ testHarness.processElement(insertRecord("key2", 11, 401L));
+ testHarness.processWatermark(402);
+
+ // ("key1", 13, 99L) and ("key2", 11, 101L) have expired, thus output ("key1", 12, 400L) and ("key2", 11, 401L)
+ assertor.assertOutputEqualsSorted("output wrong.", getExpectOutput(keepLastRow, 3), testHarness.getOutput());
+ testHarness.close();
+ }
+
+ public void testRowTimeDeduplicateKeepLastRow() throws Exception {
+ final boolean keepLastRow = true;
+ RowTimeDeduplicateFunction func = new RowTimeDeduplicateFunction(
+ inputRowType,
+ minTtlTime.toMilliseconds(),
+ rowTimeIndex,
+ generateUpdateBefore,
+ generateInsert,
+ keepLastRow);
+ KeyedProcessOperator<RowData, RowData, RowData> operator = new KeyedProcessOperator<>(func);
+ OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(operator);
+ testHarness.open();
+
+ testHarness.processElement(insertRecord("key1", 13, 99L));
+ testHarness.processElement(insertRecord("key1", 12, 100L));
+ testHarness.processElement(insertRecord("key2", 11, 101L));
+
+ // test 1: keep last row with row time
+ testHarness.processWatermark(new Watermark(102));
+ assertor.assertOutputEqualsSorted("output wrong.", getExpectOutput(keepLastRow, 1), testHarness.getOutput());
+
+ // do a snapshot, close and restore again
+ OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0);
+ testHarness.close();
+
+ testHarness = createTestHarness(operator);
+ testHarness.setup();
+ testHarness.initializeState(snapshot);
+ testHarness.open();
+
+ testHarness.processElement(insertRecord("key1", 12, 300L));
+ testHarness.processElement(insertRecord("key2", 11, 301L));
+ testHarness.processElement(insertRecord("key3", 5, 299L));
+
+ // test 2: load snapshot state
+ testHarness.processWatermark(new Watermark(302));
+ assertor.assertOutputEqualsSorted("output wrong.", getExpectOutput(keepLastRow, 2), testHarness.getOutput());
+
+ // test 3: expire the state
+ testHarness.setStateTtlProcessingTime(minTtlTime.toMilliseconds() + 1);
+ testHarness.processElement(insertRecord("key1", 12, 400L));
+ testHarness.processElement(insertRecord("key2", 11, 401L));
+ testHarness.processWatermark(402);
+
+ // all state has expired, so the records ("key1", 12, 400L) and ("key2", 11, 401L) will be INSERT messages
+ assertor.assertOutputEqualsSorted("output wrong.", getExpectOutput(keepLastRow, 3), testHarness.getOutput());
Review comment:
It's super hard to understand what the expected result is.
I think if the results are different, we shouldn't parameterize them into one
test. We can follow the approach of `TemporalRowTimeJoinOperatorTest`, which
reuses the same test data in a common private method but declares different
expected output in different test methods (see the sketch below).
Besides, I don't think we can share the expected output between minibatch and
non-minibatch mode.
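A minimal illustration of the suggested layout, as it could sit inside the existing test class (method names and expected values are hypothetical):

```java
@Test
public void testRowTimeDeduplicateKeepFirstRow() throws Exception {
    // each test spells out its own expected changelog explicitly
    List<String> expected = Arrays.asList("+I(key1,13,99)", "+I(key2,11,101)");
    runRowTimeDeduplicateTest(false, expected);
}

@Test
public void testRowTimeDeduplicateKeepLastRow() throws Exception {
    List<String> expected = Arrays.asList("+I(key1,12,100)", "+I(key2,11,101)");
    runRowTimeDeduplicateTest(true, expected);
}

// shared driver: builds the function with keepLastRow, feeds the common
// test data, and asserts the harness output against the caller's expected list
private void runRowTimeDeduplicateTest(boolean keepLastRow, List<String> expected) throws Exception {
    // same input records for both modes; only `expected` differs
}
```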
##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
##########
@@ -147,6 +150,148 @@ static void processFirstRow(
out.collect(currentRow);
}
+ /**
+ * Processes an element to deduplicate on keys with row time semantics, sends the current
+ * element if it is the first or last row, and retracts the previous element if needed.
+ *
+ * @param state state of function
+ * @param currentRow latest row received by deduplicate function
+ * @param out underlying collector
+ * @param generateUpdateBefore flag to generate UPDATE_BEFORE message or not
+ * @param generateInsert flag to generate INSERT message or not
+ * @param orderFunctionProvider provider that provides an order function to judge first or last
+ */
+ public static void deduplicateOnRowTime(
+ ValueState<RowData> state,
+ RowData currentRow,
+ Collector<RowData> out,
+ boolean generateUpdateBefore,
+ boolean generateInsert,
+ DeduplicateOrderFunctionProvider orderFunctionProvider) throws Exception {
+
+ checkInsertOnly(currentRow);
+ RowData preRow = state.value();
+ if (!orderFunctionProvider.matches(preRow, currentRow)) {
+ return;
+ }
+
+ collectRetractResult(
+ generateUpdateBefore,
+ generateInsert,
+ preRow,
+ currentRow,
+ out,
+ null
+ );
+ state.update(currentRow);
+ }
+
+ /**
+ * Processes buffered elements to deduplicate on keys with row time semantics, sends an
+ * element if it is the first or last row, and retracts the previous element if needed.
+ *
+ * @param state state of function
+ * @param bufferedRows rows buffered by the mini-batch deduplicate function
+ * @param serializer serializer to serialize the data
+ * @param out underlying collector
+ * @param generateUpdateBefore flag to generate UPDATE_BEFORE message or not
+ * @param generateInsert flag to generate INSERT message or not
+ * @param orderFunctionProvider provider that provides an order function to judge first or last
+ */
+ public static void miniBatchDeduplicateOnRowTime(
+ ValueState<RowData> state,
+ List<RowData> bufferedRows,
+ TypeSerializer<RowData> serializer,
+ Collector<RowData> out,
+ boolean generateUpdateBefore,
+ boolean generateInsert,
+ DeduplicateOrderFunctionProvider orderFunctionProvider) throws Exception {
+ if (bufferedRows == null) {
+ return;
+ }
+
+ RowData preRow = state.value();
+ for (RowData currentRow : bufferedRows) {
+ checkInsertOnly(currentRow);
+ if (!orderFunctionProvider.matches(preRow, currentRow)) {
+ continue;
+ }
+ collectRetractResult(
+ generateUpdateBefore,
+ generateInsert,
+ preRow,
+ currentRow,
+ out,
+ serializer
+ );
+ preRow = currentRow;
+ }
+ state.update(preRow);
+ }
+
+ private static void collectRetractResult(
+ boolean generateUpdateBefore,
+ boolean generateInsert,
+ RowData preRow,
+ RowData currentRow,
+ Collector<RowData> out,
+ TypeSerializer<RowData> serializer) {
+ if (generateUpdateBefore || generateInsert) {
+ if (preRow == null) {
+ // the first row, send INSERT message
+ currentRow.setRowKind(RowKind.INSERT);
+ out.collect(currentRow);
+ } else {
+ if (generateUpdateBefore) {
+ RowData copyRow;
+ // when miniBatch enabled, do a copy here, the serializer is not null
+ if (serializer != null) {
+ copyRow = serializer.copy(preRow);
Review comment:
If this copying can be removed, the implementations of
`collectRetractResult` and `processLastRowOnProcTime` are exactly the same. I
also feel that the name `collectRetractResult` is not suitable, because it does
not always generate retractions.
What about combining them into the following new method?
```java
static void updateDeduplicateResult(
        RowData currentRow, // new row result of deduplicate under current key
        ValueState<RowData> state, // the state stores the previous collected row under current key
        boolean generateUpdateBefore,
        boolean generateInsert,
        Collector<RowData> out) throws Exception {
```
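A possible body for that method, mirroring the `RowKind` handling of the quoted code (a sketch only; it assumes the no-changelog branch emits `UPDATE_AFTER`, as the existing proc-time path does):

```java
static void updateDeduplicateResult(
        RowData currentRow,
        ValueState<RowData> state,
        boolean generateUpdateBefore,
        boolean generateInsert,
        Collector<RowData> out) throws Exception {
    if (generateUpdateBefore || generateInsert) {
        RowData preRow = state.value();
        if (preRow == null) {
            // the first row under the current key, send INSERT message
            currentRow.setRowKind(RowKind.INSERT);
            out.collect(currentRow);
        } else {
            if (generateUpdateBefore) {
                preRow.setRowKind(RowKind.UPDATE_BEFORE);
                out.collect(preRow);
            }
            currentRow.setRowKind(RowKind.UPDATE_AFTER);
            out.collect(currentRow);
        }
    } else {
        // no changelog header requested, emit the bare update
        currentRow.setRowKind(RowKind.UPDATE_AFTER);
        out.collect(currentRow);
    }
    state.update(currentRow);
}
```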
##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/MiniBatchDeduplicateFunctionBase.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate;
+
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.runtime.context.ExecutionContext;
+import org.apache.flink.table.runtime.operators.bundle.MapBundleFunction;
+
+import static org.apache.flink.table.runtime.util.StateTtlConfigUtil.createTtlConfig;
+
+/**
+ * Base class for miniBatch deduplicate function.
+ * @param <T> The type of the value in the state.
+ * @param <K> The type of the key in the bundle map.
+ * @param <V> The type of the value in the bundle map.
+ * @param <IN> Type of the input elements.
+ * @param <OUT> Type of the returned elements.
+ */
+abstract class MiniBatchDeduplicateFunctionBase<T, K, V, IN, OUT> extends MapBundleFunction<K, V, IN, OUT> {
Review comment:
Add `serialVersionUID`.
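For example:

```java
private static final long serialVersionUID = 1L;
```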
##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/MiniBatchDeduplicateFunctionBase.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate;
+
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.runtime.context.ExecutionContext;
+import org.apache.flink.table.runtime.operators.bundle.MapBundleFunction;
+
+import static org.apache.flink.table.runtime.util.StateTtlConfigUtil.createTtlConfig;
+
+/**
+ * Base class for miniBatch deduplicate function.
+ * @param <T> The type of the value in the state.
+ * @param <K> The type of the key in the bundle map.
+ * @param <V> The type of the value in the bundle map.
+ * @param <IN> Type of the input elements.
+ * @param <OUT> Type of the returned elements.
+ */
+abstract class MiniBatchDeduplicateFunctionBase<T, K, V, IN, OUT> extends MapBundleFunction<K, V, IN, OUT> {
+
+ protected final TypeInformation<T> typeInfo;
+ protected final TypeSerializer<OUT> serializer;
+ protected final long minRetentionTime;
+ // state stores previous message under the key.
+ protected ValueState<T> state;
+
+ public MiniBatchDeduplicateFunctionBase(
+ TypeInformation<T> typeInfo,
Review comment:
`typeInfo` -> `stateType`, to be more specific.
##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
##########
@@ -147,6 +150,148 @@ static void processFirstRow(
out.collect(currentRow);
}
+ /**
+ * Processes an element to deduplicate on keys with row time semantics, sends the current
+ * element if it is the first or last row, and retracts the previous element if needed.
+ *
+ * @param state state of function
+ * @param currentRow latest row received by deduplicate function
+ * @param out underlying collector
+ * @param generateUpdateBefore flag to generate UPDATE_BEFORE message or not
+ * @param generateInsert flag to generate INSERT message or not
+ * @param orderFunctionProvider provider that provides an order function to judge first or last
+ */
+ public static void deduplicateOnRowTime(
Review comment:
ditto.
##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
##########
@@ -147,6 +150,148 @@ static void processFirstRow(
out.collect(currentRow);
}
+ /**
+ * Processes an element to deduplicate on keys with row time semantics, sends the current
+ * element if it is the first or last row, and retracts the previous element if needed.
+ *
+ * @param state state of function
+ * @param currentRow latest row received by deduplicate function
+ * @param out underlying collector
+ * @param generateUpdateBefore flag to generate UPDATE_BEFORE message or not
+ * @param generateInsert flag to generate INSERT message or not
+ * @param orderFunctionProvider provider that provides an order function to judge first or last
+ */
+ public static void deduplicateOnRowTime(
+ ValueState<RowData> state,
+ RowData currentRow,
+ Collector<RowData> out,
+ boolean generateUpdateBefore,
+ boolean generateInsert,
+ DeduplicateOrderFunctionProvider orderFunctionProvider) throws Exception {
+
+ checkInsertOnly(currentRow);
+ RowData preRow = state.value();
+ if (!orderFunctionProvider.matches(preRow, currentRow)) {
+ return;
+ }
+
+ collectRetractResult(
+ generateUpdateBefore,
+ generateInsert,
+ preRow,
+ currentRow,
+ out,
+ null
+ );
+ state.update(currentRow);
+ }
+
+ /**
+ * Processes buffered elements to deduplicate on keys with row time semantics, sends an
+ * element if it is the first or last row, and retracts the previous element if needed.
+ *
+ * @param state state of function
+ * @param bufferedRows rows buffered by the mini-batch deduplicate function
+ * @param serializer serializer to serialize the data
+ * @param out underlying collector
+ * @param generateUpdateBefore flag to generate UPDATE_BEFORE message or not
+ * @param generateInsert flag to generate INSERT message or not
+ * @param orderFunctionProvider provider that provides an order function to judge first or last
+ */
+ public static void miniBatchDeduplicateOnRowTime(
+ ValueState<RowData> state,
+ List<RowData> bufferedRows,
+ TypeSerializer<RowData> serializer,
+ Collector<RowData> out,
+ boolean generateUpdateBefore,
+ boolean generateInsert,
+ DeduplicateOrderFunctionProvider orderFunctionProvider) throws Exception {
+ if (bufferedRows == null) {
+ return;
+ }
+
+ RowData preRow = state.value();
+ for (RowData currentRow : bufferedRows) {
+ checkInsertOnly(currentRow);
+ if (!orderFunctionProvider.matches(preRow, currentRow)) {
+ continue;
+ }
+ collectRetractResult(
+ generateUpdateBefore,
+ generateInsert,
+ preRow,
+ currentRow,
+ out,
+ serializer
+ );
+ preRow = currentRow;
+ }
+ state.update(preRow);
+ }
+
+ private static void collectRetractResult(
+ boolean generateUpdateBefore,
+ boolean generateInsert,
+ RowData preRow,
+ RowData currentRow,
+ Collector<RowData> out,
+ TypeSerializer<RowData> serializer) {
+ if (generateUpdateBefore || generateInsert) {
+ if (preRow == null) {
+ // the first row, send INSERT message
+ currentRow.setRowKind(RowKind.INSERT);
+ out.collect(currentRow);
+ } else {
+ if (generateUpdateBefore) {
+ RowData copyRow;
+ // when miniBatch enabled, do a copy here, the serializer is not null
+ if (serializer != null) {
+ copyRow = serializer.copy(preRow);
Review comment:
Why do we have to do a deep copy here?
If it is to avoid mutating the original `preRow`, we can set the `RowKind` back
after collecting `preRow` (non-minibatch mode also has this problem).
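A sketch of that alternative (illustrative only: save the kind, emit, then restore it):

```java
// emit preRow as UPDATE_BEFORE without a deep copy, then restore its
// original RowKind so the reference kept in state/buffer is unchanged
RowKind preRowKind = preRow.getRowKind();
preRow.setRowKind(RowKind.UPDATE_BEFORE);
out.collect(preRow);
preRow.setRowKind(preRowKind);
```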
##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
##########
@@ -147,6 +150,148 @@ static void processFirstRow(
out.collect(currentRow);
}
+ /**
+ * Processes an element to deduplicate on keys with row time semantics, sends the current
+ * element if it is the first or last row, and retracts the previous element if needed.
+ *
+ * @param state state of function
+ * @param currentRow latest row received by deduplicate function
+ * @param out underlying collector
+ * @param generateUpdateBefore flag to generate UPDATE_BEFORE message or not
+ * @param generateInsert flag to generate INSERT message or not
+ * @param orderFunctionProvider provider that provides an order function to judge first or last
+ */
+ public static void deduplicateOnRowTime(
+ ValueState<RowData> state,
+ RowData currentRow,
+ Collector<RowData> out,
+ boolean generateUpdateBefore,
+ boolean generateInsert,
+ DeduplicateOrderFunctionProvider orderFunctionProvider) throws Exception {
+
+ checkInsertOnly(currentRow);
+ RowData preRow = state.value();
+ if (!orderFunctionProvider.matches(preRow, currentRow)) {
+ return;
+ }
+
+ collectRetractResult(
+ generateUpdateBefore,
+ generateInsert,
+ preRow,
+ currentRow,
+ out,
+ null
+ );
+ state.update(currentRow);
+ }
+
+ /**
+ * Processes buffered elements to deduplicate on keys with row time semantics, sends an
+ * element if it is the first or last row, and retracts the previous element if needed.
+ *
+ * @param state state of function
+ * @param bufferedRows rows buffered by the mini-batch deduplicate function
+ * @param serializer serializer to serialize the data
+ * @param out underlying collector
+ * @param generateUpdateBefore flag to generate UPDATE_BEFORE message or not
+ * @param generateInsert flag to generate INSERT message or not
+ * @param orderFunctionProvider provider that provides an order function to judge first or last
+ */
+ public static void miniBatchDeduplicateOnRowTime(
+ ValueState<RowData> state,
+ List<RowData> bufferedRows,
+ TypeSerializer<RowData> serializer,
+ Collector<RowData> out,
+ boolean generateUpdateBefore,
+ boolean generateInsert,
+ DeduplicateOrderFunctionProvider orderFunctionProvider) throws Exception {
Review comment:
If this method is moved into `RowTimeMiniBatchDeduplicateFunction`, I don't
think we need the special `DeduplicateOrderFunctionProvider` parameter anymore,
because we already have the `keepLastRow` flag.
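With the flag, the check could be as simple as this sketch (assuming `rowtimeIndex` points at the rowtime field; the method name is illustrative):

```java
// returns true if currentRow should replace preRow under the given mode
private static boolean shouldKeep(
        RowData preRow, RowData currentRow, int rowtimeIndex, boolean keepLastRow) {
    if (preRow == null) {
        return true; // no previous row under this key yet
    }
    long preTime = preRow.getLong(rowtimeIndex);
    long curTime = currentRow.getLong(rowtimeIndex);
    // keep-last prefers an equal-or-newer rowtime, keep-first a strictly older one
    return keepLastRow ? curTime >= preTime : curTime < preTime;
}
```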
##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeduplicateITCase.scala
##########
@@ -137,4 +152,93 @@ class DeduplicateITCase(miniBatch: MiniBatchMode, mode: StateBackendMode)
assertEquals(expected.sorted, sink.getRetractResults.sorted)
}
+ @Test
+ def testFirstRowOnRowtime(): Unit = {
+ val t = env.fromCollection(rowtimeTestData)
+ .assignTimestampsAndWatermarks(new RowtimeExtractor)
+ .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime())
+ tEnv.registerTable("T", t)
+ createSinkTable("rowtime_sink")
+
+ val sql =
+ """
+ |INSERT INTO rowtime_sink
+ | SELECT a, b, c, rowtime
+ | FROM (
+ | SELECT *,
+ | ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime) as rowNum
+ | FROM T
+ | )
+ | WHERE rowNum = 1
+ """.stripMargin
+
+ tEnv.executeSql(sql).await()
+ val rawResult = TestValuesTableFactory.getRawResults("rowtime_sink")
+
+ val expected = List(
+ "+I(1,1,Hi,1970-01-01T00:00:00.001)",
+ "+I(2,3,I am fine.,1970-01-01T00:00:00.003)",
+ "+I(3,5,Comment#2,1970-01-01T00:00:00.005)",
+ "+I(4,4,Comment#3,1970-01-01T00:00:00.004)",
+ "+U(3,4,Comment#2,1970-01-01T00:00:00.004)",
+ "-U(3,5,Comment#2,1970-01-01T00:00:00.005)")
+ assertEquals(expected.sorted, rawResult.sorted)
Review comment:
If we want to verify the changelog result, I would suggest using a single
parallelism and not sorting the result. Otherwise, we don't actually test the
changelog order; for example, it's hard to understand why `-U` comes after `+U`
in the above test.
Alternatively, we can just verify the merged result and keep the default
parallelism.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]