This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit df8de4ac45884bd3134abaf87baeb65c31060b3a Author: Jark Wu <[email protected]> AuthorDate: Thu Nov 19 16:50:42 2020 +0800 [hotfix][table-blink] Improve state cleanup harness tests for group aggregation --- .../stream/StreamExecGlobalGroupAggregate.scala | 2 +- .../physical/stream/StreamExecGroupAggregate.scala | 4 +- .../stream/StreamExecGroupTableAggregate.scala | 2 +- .../StreamExecIncrementalGroupAggregate.scala | 2 +- .../harness/GroupAggregateHarnessTest.scala | 127 ++++++++++++++- .../harness/TableAggregateHarnessTest.scala | 7 +- .../runtime/dataview/PerKeyStateDataViewStore.java | 3 + .../table/runtime/dataview/StateListView.java | 9 +- .../operators/aggregate/GroupAggFunction.java | 2 - .../aggregate/MiniBatchGroupAggFunction.java | 17 ++ .../deduplicate/DeduplicateFunctionBase.java | 10 +- .../ProcTimeDeduplicateKeepFirstRowFunction.java | 4 +- .../ProcTimeDeduplicateKeepLastRowFunction.java | 4 +- ...meMiniBatchDeduplicateKeepFirstRowFunction.java | 6 +- ...imeMiniBatchDeduplicateKeepLastRowFunction.java | 4 +- .../operators/aggregate/GroupAggFunctionTest.java | 98 ----------- .../aggregate/GroupAggFunctionTestBase.java | 180 --------------------- .../aggregate/MiniBatchGroupAggFunctionTest.java | 98 ----------- 18 files changed, 173 insertions(+), 406 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGlobalGroupAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGlobalGroupAggregate.scala index ee007ef..6020e72 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGlobalGroupAggregate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGlobalGroupAggregate.scala @@ -163,7 +163,7 @@ class StreamExecGlobalGroupAggregate( globalAccTypes, indexOfCountStar, generateUpdateBefore, - tableConfig.getMinIdleStateRetentionTime) + tableConfig.getIdleStateRetention.toMillis) new KeyedMapBundleOperator( aggFunction, diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupAggregate.scala index 5d1d249..cece6e8 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupAggregate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupAggregate.scala @@ -158,7 +158,7 @@ class StreamExecGroupAggregate( inputRowType, inputCountIndex, generateUpdateBefore, - tableConfig.getMinIdleStateRetentionTime) + tableConfig.getIdleStateRetention.toMillis) new KeyedMapBundleOperator( aggFunction, @@ -170,7 +170,7 @@ class StreamExecGroupAggregate( accTypes, inputCountIndex, generateUpdateBefore, - tableConfig.getMinIdleStateRetentionTime) + tableConfig.getIdleStateRetention.toMillis) val operator = new KeyedProcessOperator[RowData, RowData, RowData](aggFunction) operator diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupTableAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupTableAggregate.scala index d808aa8..ef276fa 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupTableAggregate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupTableAggregate.scala @@ -142,7 +142,7 @@ class StreamExecGroupTableAggregate( accTypes, inputCountIndex, generateUpdateBefore, - tableConfig.getMinIdleStateRetentionTime) + tableConfig.getIdleStateRetention.toMillis) val operator = new KeyedProcessOperator[RowData, RowData, RowData](aggFunction) val selector = KeySelectorUtil.getRowDataSelector( diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecIncrementalGroupAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecIncrementalGroupAggregate.scala index 2d00216..aa69619 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecIncrementalGroupAggregate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecIncrementalGroupAggregate.scala @@ -167,7 +167,7 @@ class StreamExecIncrementalGroupAggregate( partialAggsHandler, finalAggsHandler, finalKeySelector, - config.getMinIdleStateRetentionTime) + config.getIdleStateRetention.toMillis) val operator = new KeyedMapBundleOperator( aggFunction, diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala index 462d5a1..0b8ada6 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala @@ -18,28 +18,35 @@ package org.apache.flink.table.planner.runtime.harness -import org.apache.flink.api.common.time.Time import org.apache.flink.api.scala._ -import org.apache.flink.table.api._ +import org.apache.flink.table.api.{EnvironmentSettings, _} import org.apache.flink.table.api.bridge.scala._ import org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl -import org.apache.flink.table.api.EnvironmentSettings -import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode +import org.apache.flink.table.api.config.ExecutionConfigOptions.{TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, TABLE_EXEC_MINIBATCH_ENABLED, TABLE_EXEC_MINIBATCH_SIZE} +import org.apache.flink.table.api.config.OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY +import org.apache.flink.table.planner.runtime.utils.StreamingWithMiniBatchTestBase.{MiniBatchMode, MiniBatchOff, MiniBatchOn} +import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode} +import org.apache.flink.table.planner.runtime.utils.UserDefinedFunctionTestUtils.CountNullNonNull import org.apache.flink.table.runtime.util.RowDataHarnessAssertor import org.apache.flink.table.runtime.util.StreamRecordUtils.binaryRecord import org.apache.flink.types.Row import org.apache.flink.types.RowKind._ + import org.junit.runner.RunWith import org.junit.runners.Parameterized import org.junit.{Before, Test} import java.lang.{Long => JLong} +import java.time.Duration import java.util.concurrent.ConcurrentLinkedQueue +import java.util.{Collection => JCollection} +import scala.collection.JavaConversions._ import scala.collection.mutable @RunWith(classOf[Parameterized]) -class GroupAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(mode) { +class GroupAggregateHarnessTest(mode: StateBackendMode, miniBatch: MiniBatchMode) + extends HarnessTestBase(mode) { @Before override def before(): Unit = { @@ -47,6 +54,19 @@ class GroupAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase( val setting = EnvironmentSettings.newInstance().inStreamingMode().build() val config = new TestTableConfig this.tEnv = StreamTableEnvironmentImpl.create(env, setting, config) + // set mini batch + val tableConfig = tEnv.getConfig + miniBatch match { + case MiniBatchOn => + tableConfig.getConfiguration.setBoolean(TABLE_EXEC_MINIBATCH_ENABLED, true) + tableConfig.getConfiguration.set(TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1)) + // trigger every record for easier harness test + tableConfig.getConfiguration.setLong(TABLE_EXEC_MINIBATCH_SIZE, 1L) + // disable local-global to only test the MiniBatchGroupAggFunction + tableConfig.getConfiguration.setString(TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "ONE_PHASE") + case MiniBatchOff => + tableConfig.getConfiguration.removeConfig(TABLE_EXEC_MINIBATCH_ALLOW_LATENCY) + } } @Test @@ -65,7 +85,7 @@ class GroupAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase( """.stripMargin val t1 = tEnv.sqlQuery(sql) - tEnv.getConfig.setIdleStateRetentionTime(Time.seconds(2), Time.seconds(3)) + tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(2)) val testHarness = createHarnessTester(t1.toRetractStream[Row], "GroupAggregate") val assertor = new RowDataHarnessAssertor( Array( @@ -138,4 +158,99 @@ class GroupAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase( testHarness.close() } + @Test + def testAggregationWithDistinct(): Unit = { + val data = new mutable.MutableList[(String, String, Long)] + val t = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c) + tEnv.createTemporaryView("T", t) + tEnv.createTemporarySystemFunction("CntNullNonNull", new CountNullNonNull) + + val sql = + """ + |SELECT a, COUNT(DISTINCT b), CntNullNonNull(DISTINCT b), COUNT(*), SUM(c) + |FROM T + |GROUP BY a + """.stripMargin + val t1 = tEnv.sqlQuery(sql) + + tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(2)) + val testHarness = createHarnessTester(t1.toRetractStream[Row], "GroupAggregate") + val assertor = new RowDataHarnessAssertor( + Array( + DataTypes.STRING().getLogicalType, + DataTypes.BIGINT().getLogicalType, + DataTypes.STRING().getLogicalType, + DataTypes.BIGINT().getLogicalType, + DataTypes.BIGINT().getLogicalType)) + + testHarness.open() + + val expectedOutput = new ConcurrentLinkedQueue[Object]() + + // set ttl processing time to 1 + testHarness.setStateTtlProcessingTime(1) + + // insertion + testHarness.processElement(binaryRecord(INSERT,"aaa", "a1", 1L: JLong)) + expectedOutput.add(binaryRecord(INSERT, "aaa", 1L: JLong, "1|0", 1L: JLong, 1L: JLong)) + + // insertion + testHarness.processElement(binaryRecord(INSERT, "bbb", "b1", 2L: JLong)) + expectedOutput.add(binaryRecord(INSERT, "bbb", 1L: JLong, "1|0", 1L: JLong, 2L: JLong)) + + // advance ttl processing time + testHarness.setStateTtlProcessingTime(1000) + + // update for insertion + testHarness.processElement(binaryRecord(INSERT, "aaa", "a2", 2L: JLong)) + expectedOutput.add(binaryRecord(UPDATE_BEFORE, "aaa", 1L: JLong, "1|0", 1L: JLong, 1L: JLong)) + expectedOutput.add(binaryRecord(UPDATE_AFTER, "aaa", 2L: JLong, "2|0", 2L: JLong, 3L: JLong)) + + // this should expire "bbb" state + testHarness.setStateTtlProcessingTime(2001) + + // accumulate from initial state + testHarness.processElement(binaryRecord(INSERT, "bbb", "b3", 3L: JLong)) + expectedOutput.add(binaryRecord(INSERT, "bbb", 1L: JLong, "1|0", 1L: JLong, 3L: JLong)) + // "aaa" is not expired + testHarness.processElement(binaryRecord(INSERT, "aaa", "a2", 3L: JLong)) + expectedOutput.add(binaryRecord(UPDATE_BEFORE, "aaa", 2L: JLong, "2|0", 2L: JLong, 3L: JLong)) + expectedOutput.add(binaryRecord(UPDATE_AFTER, "aaa", 2L: JLong, "2|0", 3L: JLong, 6L: JLong)) + // test null key + testHarness.processElement(binaryRecord(INSERT, "aaa", null, 4L: JLong)) + expectedOutput.add(binaryRecord(UPDATE_BEFORE, "aaa", 2L: JLong, "2|0", 3L: JLong, 6L: JLong)) + expectedOutput.add(binaryRecord(UPDATE_AFTER, "aaa", 2L: JLong, "2|1", 4L: JLong, 10L: JLong)) + + // this should expire "aaa" state + testHarness.setStateTtlProcessingTime(5001) + + // accumulate from initial state + testHarness.processElement(binaryRecord(INSERT, "aaa", null, 4L: JLong)) + expectedOutput.add(binaryRecord(INSERT, "aaa", 0L: JLong, "0|1", 1L: JLong, 4L: JLong)) + testHarness.processElement(binaryRecord(INSERT, "aaa", "a2", 2L: JLong)) + expectedOutput.add(binaryRecord(UPDATE_BEFORE, "aaa", 0L: JLong, "0|1", 1L: JLong, 4L: JLong)) + expectedOutput.add(binaryRecord(UPDATE_AFTER, "aaa", 1L: JLong, "1|1", 2L: JLong, 6L: JLong)) + testHarness.processElement(binaryRecord(INSERT, "bbb", "b4", 4L: JLong)) + expectedOutput.add(binaryRecord(INSERT, "bbb", 1L: JLong, "1|0", 1L: JLong, 4L: JLong)) + + val result = testHarness.getOutput + + assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, result) + + testHarness.close() + } + +} + +object GroupAggregateHarnessTest { + + @Parameterized.Parameters(name = "StateBackend={0}, MiniBatch={1}") + def parameters(): JCollection[Array[java.lang.Object]] = { + Seq[Array[AnyRef]]( + Array(HEAP_BACKEND, MiniBatchOff), + Array(HEAP_BACKEND, MiniBatchOn), + Array(ROCKSDB_BACKEND, MiniBatchOff), + Array(ROCKSDB_BACKEND, MiniBatchOn) + ) + } } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala index b6add51..32b2688 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala @@ -20,7 +20,6 @@ package org.apache.flink.table.planner.runtime.harness import java.lang.{Integer => JInt} import java.util.concurrent.ConcurrentLinkedQueue -import org.apache.flink.api.common.time.Time import org.apache.flink.api.scala._ import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala._ @@ -35,6 +34,8 @@ import org.junit.runner.RunWith import org.junit.runners.Parameterized import org.junit.{Before, Test} +import java.time.Duration + import scala.collection.mutable @RunWith(classOf[Parameterized]) @@ -60,7 +61,7 @@ class TableAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase( .flatAggregate(top3('b) as ('b1, 'b2)) .select('a, 'b1, 'b2) - tEnv.getConfig.setIdleStateRetentionTime(Time.seconds(2), Time.seconds(2)) + tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(2)) val testHarness = createHarnessTester( resultTable.toRetractStream[Row], "GroupTableAggregate") val assertor = new RowDataHarnessAssertor( @@ -125,7 +126,7 @@ class TableAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase( .flatAggregate(top3('b) as ('b1, 'b2)) .select('b1, 'b2) - tEnv.getConfig.setIdleStateRetentionTime(Time.seconds(2), Time.seconds(2)) + tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(2)) val testHarness = createHarnessTester( resultTable.toRetractStream[Row], "GroupTableAggregate") val assertor = new RowDataHarnessAssertor( diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/dataview/PerKeyStateDataViewStore.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/dataview/PerKeyStateDataViewStore.java index 5b592b4..93b0462 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/dataview/PerKeyStateDataViewStore.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/dataview/PerKeyStateDataViewStore.java @@ -70,6 +70,9 @@ public final class PerKeyStateDataViewStore implements StateDataViewStore { final ValueStateDescriptor<EV> nullStateDescriptor = new ValueStateDescriptor<>( stateName + NULL_STATE_POSTFIX, valueSerializer); + if (stateTtlConfig.isEnabled()) { + nullStateDescriptor.enableTimeToLive(stateTtlConfig); + } final ValueState<EV> nullState = ctx.getState(nullStateDescriptor); return new StateMapView.KeyedStateMapViewWithKeysNullable<>(mapState, nullState); } else { diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/dataview/StateListView.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/dataview/StateListView.java index 782fa06..1970546 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/dataview/StateListView.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/dataview/StateListView.java @@ -77,7 +77,14 @@ public abstract class StateListView<N, EE> extends ListView<EE> implements State @Override public boolean remove(EE value) throws Exception { - Iterator<EE> iterator = getListState().get().iterator(); + Iterable<EE> iterable = getListState().get(); + if (iterable == null) { + // ListState.get() may return null according to the Javadoc. + return false; + } + // the getListState().get() not always returns List object + // copy values to ArrayList for removing + Iterator<EE> iterator = iterable.iterator(); List<EE> list = new ArrayList<>(); while (iterator.hasNext()) { EE it = iterator.next(); diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java index 247ae47..9f35472 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java @@ -23,10 +23,8 @@ import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; -import org.apache.flink.table.data.JoinedRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.utils.JoinedRowData; -import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.table.runtime.dataview.PerKeyStateDataViewStore; import org.apache.flink.table.runtime.generated.AggsHandleFunction; import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction; diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java index 79bdd4a..5a6953c 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java @@ -41,10 +41,12 @@ import org.apache.flink.util.Collector; import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.Map; import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg; +import static org.apache.flink.table.data.util.RowDataUtil.isRetractMsg; import static org.apache.flink.table.runtime.util.StateTtlConfigUtil.createTtlConfig; /** @@ -185,6 +187,21 @@ public class MiniBatchGroupAggFunction extends MapBundleFunction<RowData, List<R ctx.setCurrentKey(currentKey); RowData acc = accState.value(); if (acc == null) { + // Don't create a new accumulator for a retraction message. This + // might happen if the retraction message is the first message for the + // key or after a state clean up. + Iterator<RowData> inputIter = inputRows.iterator(); + while (inputIter.hasNext()) { + RowData current = inputIter.next(); + if (isRetractMsg(current)) { + inputIter.remove(); // remove all the beginning retraction messages + } else { + break; + } + } + if (inputRows.isEmpty()) { + return; + } acc = function.createAccumulators(); firstRow = true; } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionBase.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionBase.java index 1138c18..5d06c81 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionBase.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionBase.java @@ -37,9 +37,11 @@ import static org.apache.flink.table.runtime.util.StateTtlConfigUtil.createTtlCo */ abstract class DeduplicateFunctionBase<T, K, IN, OUT> extends KeyedProcessFunction<K, IN, OUT> { + private static final long serialVersionUID = 1L; + // the TypeInformation of the values in the state. protected final TypeInformation<T> typeInfo; - protected final long minRetentionTime; + protected final long stateRetentionTime; protected final TypeSerializer<OUT> serializer; // state stores previous message under the key. protected ValueState<T> state; @@ -47,9 +49,9 @@ abstract class DeduplicateFunctionBase<T, K, IN, OUT> extends KeyedProcessFuncti public DeduplicateFunctionBase( TypeInformation<T> typeInfo, TypeSerializer<OUT> serializer, - long minRetentionTime) { + long stateRetentionTime) { this.typeInfo = typeInfo; - this.minRetentionTime = minRetentionTime; + this.stateRetentionTime = stateRetentionTime; this.serializer = serializer; } @@ -57,7 +59,7 @@ abstract class DeduplicateFunctionBase<T, K, IN, OUT> extends KeyedProcessFuncti public void open(Configuration configure) throws Exception { super.open(configure); ValueStateDescriptor<T> stateDesc = new ValueStateDescriptor<>("deduplicate-state", typeInfo); - StateTtlConfig ttlConfig = createTtlConfig(minRetentionTime); + StateTtlConfig ttlConfig = createTtlConfig(stateRetentionTime); if (ttlConfig.isEnabled()) { stateDesc.enableTimeToLive(ttlConfig); } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepFirstRowFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepFirstRowFunction.java index 9c88e0c..9046969 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepFirstRowFunction.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepFirstRowFunction.java @@ -33,8 +33,8 @@ public class ProcTimeDeduplicateKeepFirstRowFunction private static final long serialVersionUID = 5865777137707602549L; // state stores a boolean flag to indicate whether key appears before. - public ProcTimeDeduplicateKeepFirstRowFunction(long minRetentionTime) { - super(Types.BOOLEAN, null, minRetentionTime); + public ProcTimeDeduplicateKeepFirstRowFunction(long stateRetentionTime) { + super(Types.BOOLEAN, null, stateRetentionTime); } @Override diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepLastRowFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepLastRowFunction.java index a17a6e4..a378708 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepLastRowFunction.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepLastRowFunction.java @@ -38,11 +38,11 @@ public class ProcTimeDeduplicateKeepLastRowFunction public ProcTimeDeduplicateKeepLastRowFunction( InternalTypeInfo<RowData> typeInfo, - long minRetentionTime, + long stateRetentionTime, boolean generateUpdateBefore, boolean generateInsert, boolean inputInsertOnly) { - super(typeInfo, null, minRetentionTime); + super(typeInfo, null, stateRetentionTime); this.generateUpdateBefore = generateUpdateBefore; this.generateInsert = generateInsert; this.inputIsInsertOnly = inputInsertOnly; diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepFirstRowFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepFirstRowFunction.java index 9763e51..eb2d73d 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepFirstRowFunction.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepFirstRowFunction.java @@ -38,12 +38,12 @@ public class ProcTimeMiniBatchDeduplicateKeepFirstRowFunction extends MiniBatchDeduplicateFunctionBase<Boolean, RowData, RowData, RowData, RowData> { private static final long serialVersionUID = -7994602893547654994L; - private TypeSerializer<RowData> serializer; + private final TypeSerializer<RowData> serializer; public ProcTimeMiniBatchDeduplicateKeepFirstRowFunction( TypeSerializer<RowData> serializer, - long minRetentionTime) { - super(Types.BOOLEAN, minRetentionTime); + long stateRetentionTime) { + super(Types.BOOLEAN, stateRetentionTime); this.serializer = serializer; } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepLastRowFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepLastRowFunction.java index 1087b27..bdd17da 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepLastRowFunction.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepLastRowFunction.java @@ -45,11 +45,11 @@ public class ProcTimeMiniBatchDeduplicateKeepLastRowFunction public ProcTimeMiniBatchDeduplicateKeepLastRowFunction( InternalTypeInfo<RowData> typeInfo, TypeSerializer<RowData> serializer, - long minRetentionTime, + long stateRetentionTime, boolean generateUpdateBefore, boolean generateInsert, boolean inputInsertOnly) { - super(typeInfo, minRetentionTime); + super(typeInfo, stateRetentionTime); this.serializer = serializer; this.generateUpdateBefore = generateUpdateBefore; this.generateInsert = generateInsert; diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunctionTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunctionTest.java deleted file mode 100644 index 5910752..0000000 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunctionTest.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.runtime.operators.aggregate; - -import org.apache.flink.streaming.api.operators.KeyedProcessOperator; -import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; -import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import org.apache.flink.table.data.RowData; - -import org.junit.Test; - -import java.util.ArrayList; -import java.util.List; - -import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord; -import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord; - -/** - * Tests for {@link GroupAggFunction}. - */ -public class GroupAggFunctionTest extends GroupAggFunctionTestBase { - - private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness( - GroupAggFunction aggFunction) throws Exception { - KeyedProcessOperator<RowData, RowData, RowData> operator = new KeyedProcessOperator<>(aggFunction); - return new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, keyType); - } - - private GroupAggFunction createFunction(boolean generateUpdateBefore) { - return new GroupAggFunction( - function, - equaliser, - accTypes, - -1, - generateUpdateBefore, - minTime.toMilliseconds()); - } - - @Test - public void testGroupAggWithStateTtl() throws Exception { - GroupAggFunction groupAggFunction = createFunction(false); - OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(groupAggFunction); - testHarness.open(); - testHarness.setup(); - - testHarness.processElement(insertRecord("key1", 1, 20L)); - testHarness.processElement(insertRecord("key1", 2, 0L)); - testHarness.processElement(insertRecord("key1", 3, 999L)); - - testHarness.processElement(insertRecord("key2", 1, 3999L)); - testHarness.processElement(insertRecord("key2", 2, 3000L)); - testHarness.processElement(insertRecord("key2", 3, 1000L)); - - //trigger expired state cleanup - testHarness.setStateTtlProcessingTime(20); - testHarness.processElement(insertRecord("key1", 4, 1020L)); - testHarness.processElement(insertRecord("key1", 5, 1290L)); - testHarness.processElement(insertRecord("key1", 6, 1290L)); - - testHarness.processElement(insertRecord("key2", 4, 4999L)); - testHarness.processElement(insertRecord("key2", 5, 6000L)); - testHarness.processElement(insertRecord("key2", 6, 2000L)); - - List<Object> expectedOutput = new ArrayList<>(); - expectedOutput.add(insertRecord("key1", 1L, 1L)); - expectedOutput.add(updateAfterRecord("key1", 3L, 2L)); - expectedOutput.add(updateAfterRecord("key1", 6L, 3L)); - expectedOutput.add(insertRecord("key2", 1L, 1L)); - expectedOutput.add(updateAfterRecord("key2", 3L, 2L)); - expectedOutput.add(updateAfterRecord("key2", 6L, 3L)); - //result doesn`t contain expired record with the same key - expectedOutput.add(insertRecord("key1", 4L, 1L)); - expectedOutput.add(updateAfterRecord("key1", 9L, 2L)); - expectedOutput.add(updateAfterRecord("key1", 15L, 3L)); - expectedOutput.add(insertRecord("key2", 4L, 1L)); - expectedOutput.add(updateAfterRecord("key2", 9L, 2L)); - expectedOutput.add(updateAfterRecord("key2", 15L, 3L)); - - assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput()); - testHarness.close(); - } -} diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunctionTestBase.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunctionTestBase.java deleted file mode 100644 index 03aafe8..0000000 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunctionTestBase.java +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.runtime.operators.aggregate; - -import org.apache.flink.api.common.time.Time; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.table.data.GenericRowData; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.runtime.dataview.StateDataViewStore; -import org.apache.flink.table.runtime.generated.AggsHandleFunction; -import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction; -import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser; -import org.apache.flink.table.runtime.generated.RecordEqualiser; -import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; -import org.apache.flink.table.runtime.util.BinaryRowDataKeySelector; -import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator; -import org.apache.flink.table.runtime.util.RowDataHarnessAssertor; -import org.apache.flink.table.runtime.util.RowDataRecordEqualiser; -import org.apache.flink.table.types.logical.BigIntType; -import org.apache.flink.table.types.logical.IntType; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.VarCharType; - -/** - * Base class of tests for all kinds of GroupAgg. - */ -abstract class GroupAggFunctionTestBase { - - Time minTime = Time.milliseconds(10); - - LogicalType[] inputFieldTypes = new LogicalType[] { - new VarCharType(VarCharType.MAX_LENGTH), - new IntType(), - new BigIntType() }; - - InternalTypeInfo<RowData> outputType = InternalTypeInfo.ofFields( - new VarCharType(VarCharType.MAX_LENGTH), - new BigIntType(), - new BigIntType()); - - LogicalType[] accTypes = new LogicalType[] { new BigIntType(), new BigIntType() }; - BinaryRowDataKeySelector keySelector = new BinaryRowDataKeySelector(new int[]{0}, inputFieldTypes); - TypeInformation<RowData> keyType = keySelector.getProducedType(); - GeneratedRecordEqualiser equaliser = new GeneratedRecordEqualiser("", "", new Object[0]) { - - private static final long serialVersionUID = 1532460173848746788L; - - @Override - public RecordEqualiser newInstance(ClassLoader classLoader) { - return new RowDataRecordEqualiser(); - } - }; - - GeneratedAggsHandleFunction function = - new GeneratedAggsHandleFunction("Function", "", new Object[0]) { - @Override - public AggsHandleFunction newInstance(ClassLoader classLoader) { - return new SumAndCountAgg(); - } - }; - - RowDataHarnessAssertor assertor = new RowDataHarnessAssertor( - outputType.toRowFieldTypes(), - new GenericRowRecordSortComparator(0, new VarCharType(VarCharType.MAX_LENGTH))); - - static final class SumAndCountAgg implements AggsHandleFunction { - private long sum; - private boolean sumIsNull; - private long count; - private boolean countIsNull; - - @Override - public void open(StateDataViewStore store) throws Exception { - } - - @Override - public void setAccumulators(RowData acc) throws Exception { - sumIsNull = acc.isNullAt(0); - if (!sumIsNull) { - sum = acc.getLong(0); - } - - countIsNull = acc.isNullAt(1); - if (!countIsNull) { - count = acc.getLong(1); - } - } - - @Override - public void accumulate(RowData inputRow) throws Exception { - boolean inputIsNull = inputRow.isNullAt(1); - if (!inputIsNull) { - sum += inputRow.getInt(1); - count += 1; - } - } - - @Override - public void retract(RowData inputRow) throws Exception { - boolean inputIsNull = inputRow.isNullAt(1); - if (!inputIsNull) { - sum -= inputRow.getInt(1); - count -= 1; - } - } - - @Override - public void merge(RowData otherAcc) throws Exception { - boolean sumIsNullOther = otherAcc.isNullAt(0); - if (!sumIsNullOther) { - sum += otherAcc.getLong(0); - } - - boolean countIsNullOther = otherAcc.isNullAt(1); - if (!countIsNullOther) { - count += otherAcc.getLong(1); - } - } - - @Override - public void resetAccumulators() throws Exception { - sum = 0L; - count = 0L; - } - - @Override - public RowData getAccumulators() throws Exception { - GenericRowData acc = new GenericRowData(2); - if (!sumIsNull) { - acc.setField(0, sum); - } - - if (!countIsNull) { - acc.setField(1, count); - } - - return acc; - } - - @Override - public RowData createAccumulators() throws Exception { - GenericRowData acc = new GenericRowData(2); - acc.setField(0, 0L); - acc.setField(1, 0L); - return acc; - } - - @Override - public RowData getValue() throws Exception { - return getAccumulators(); - } - - @Override - public void cleanup() throws Exception { - - } - - @Override - public void close() throws Exception { - - } - } - -} diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunctionTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunctionTest.java deleted file mode 100644 index c1d3980..0000000 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunctionTest.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.runtime.operators.aggregate; - -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; -import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator; -import org.apache.flink.table.runtime.operators.bundle.trigger.CountBundleTrigger; -import org.apache.flink.table.types.logical.RowType; - -import org.junit.Test; - -import java.util.ArrayList; -import java.util.List; - -import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord; -import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord; - -/** - * Tests for {@link MiniBatchGroupAggFunction}. - */ -public class MiniBatchGroupAggFunctionTest extends GroupAggFunctionTestBase { - - private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness( - MiniBatchGroupAggFunction aggFunction) throws Exception { - CountBundleTrigger<Tuple2<String, String>> trigger = new CountBundleTrigger<>(3); - KeyedMapBundleOperator operator = new KeyedMapBundleOperator(aggFunction, trigger); - return new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, keyType); - } - - private MiniBatchGroupAggFunction createFunction(boolean generateUpdateBefore) throws Exception { - return new MiniBatchGroupAggFunction( - function, - equaliser, - accTypes, - RowType.of(inputFieldTypes), - -1, - false, - minTime.toMilliseconds()); - } - - @Test - public void testMiniBatchGroupAggWithStateTtl() throws Exception { - - MiniBatchGroupAggFunction function = createFunction(false); - OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(function); - testHarness.open(); - testHarness.setup(); - - testHarness.processElement(insertRecord("key1", 1, 20L)); - testHarness.processElement(insertRecord("key2", 1, 3000L)); - testHarness.processElement(insertRecord("key1", 3, 999L)); - - testHarness.processElement(insertRecord("key1", 2, 500L)); - testHarness.processElement(insertRecord("key2", 2, 3999L)); - testHarness.processElement(insertRecord("key2", 3, 1000L)); - - //trigger expired state cleanup - testHarness.setStateTtlProcessingTime(20); - testHarness.processElement(insertRecord("key1", 4, 1020L)); - testHarness.processElement(insertRecord("key1", 5, 1290L)); - testHarness.processElement(insertRecord("key1", 6, 1290L)); - - testHarness.processElement(insertRecord("key2", 4, 4999L)); - testHarness.processElement(insertRecord("key2", 5, 6000L)); - testHarness.processElement(insertRecord("key2", 6, 2000L)); - - List<Object> expectedOutput = new ArrayList<>(); - expectedOutput.add(insertRecord("key1", 4L, 2L)); - expectedOutput.add(insertRecord("key2", 1L, 1L)); - expectedOutput.add(updateAfterRecord("key1", 6L, 3L)); - expectedOutput.add(updateAfterRecord("key2", 6L, 3L)); - //result doesn`t contain expired record with the same key - expectedOutput.add(insertRecord("key1", 15L, 3L)); - expectedOutput.add(insertRecord("key2", 15L, 3L)); - - assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, testHarness.getOutput()); - testHarness.close(); - } -}
