This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit df8de4ac45884bd3134abaf87baeb65c31060b3a
Author: Jark Wu <[email protected]>
AuthorDate: Thu Nov 19 16:50:42 2020 +0800

    [hotfix][table-blink] Improve state cleanup harness tests for group 
aggregation
---
 .../stream/StreamExecGlobalGroupAggregate.scala    |   2 +-
 .../physical/stream/StreamExecGroupAggregate.scala |   4 +-
 .../stream/StreamExecGroupTableAggregate.scala     |   2 +-
 .../StreamExecIncrementalGroupAggregate.scala      |   2 +-
 .../harness/GroupAggregateHarnessTest.scala        | 127 ++++++++++++++-
 .../harness/TableAggregateHarnessTest.scala        |   7 +-
 .../runtime/dataview/PerKeyStateDataViewStore.java |   3 +
 .../table/runtime/dataview/StateListView.java      |   9 +-
 .../operators/aggregate/GroupAggFunction.java      |   2 -
 .../aggregate/MiniBatchGroupAggFunction.java       |  17 ++
 .../deduplicate/DeduplicateFunctionBase.java       |  10 +-
 .../ProcTimeDeduplicateKeepFirstRowFunction.java   |   4 +-
 .../ProcTimeDeduplicateKeepLastRowFunction.java    |   4 +-
 ...meMiniBatchDeduplicateKeepFirstRowFunction.java |   6 +-
 ...imeMiniBatchDeduplicateKeepLastRowFunction.java |   4 +-
 .../operators/aggregate/GroupAggFunctionTest.java  |  98 -----------
 .../aggregate/GroupAggFunctionTestBase.java        | 180 ---------------------
 .../aggregate/MiniBatchGroupAggFunctionTest.java   |  98 -----------
 18 files changed, 173 insertions(+), 406 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGlobalGroupAggregate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGlobalGroupAggregate.scala
index ee007ef..6020e72 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGlobalGroupAggregate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGlobalGroupAggregate.scala
@@ -163,7 +163,7 @@ class StreamExecGlobalGroupAggregate(
         globalAccTypes,
         indexOfCountStar,
         generateUpdateBefore,
-        tableConfig.getMinIdleStateRetentionTime)
+        tableConfig.getIdleStateRetention.toMillis)
 
       new KeyedMapBundleOperator(
         aggFunction,
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupAggregate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupAggregate.scala
index 5d1d249..cece6e8 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupAggregate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupAggregate.scala
@@ -158,7 +158,7 @@ class StreamExecGroupAggregate(
         inputRowType,
         inputCountIndex,
         generateUpdateBefore,
-        tableConfig.getMinIdleStateRetentionTime)
+        tableConfig.getIdleStateRetention.toMillis)
 
       new KeyedMapBundleOperator(
         aggFunction,
@@ -170,7 +170,7 @@ class StreamExecGroupAggregate(
         accTypes,
         inputCountIndex,
         generateUpdateBefore,
-        tableConfig.getMinIdleStateRetentionTime)
+        tableConfig.getIdleStateRetention.toMillis)
 
       val operator = new KeyedProcessOperator[RowData, RowData, 
RowData](aggFunction)
       operator
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupTableAggregate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupTableAggregate.scala
index d808aa8..ef276fa 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupTableAggregate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupTableAggregate.scala
@@ -142,7 +142,7 @@ class StreamExecGroupTableAggregate(
       accTypes,
       inputCountIndex,
       generateUpdateBefore,
-      tableConfig.getMinIdleStateRetentionTime)
+      tableConfig.getIdleStateRetention.toMillis)
     val operator = new KeyedProcessOperator[RowData, RowData, 
RowData](aggFunction)
 
     val selector = KeySelectorUtil.getRowDataSelector(
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecIncrementalGroupAggregate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecIncrementalGroupAggregate.scala
index 2d00216..aa69619 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecIncrementalGroupAggregate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecIncrementalGroupAggregate.scala
@@ -167,7 +167,7 @@ class StreamExecIncrementalGroupAggregate(
       partialAggsHandler,
       finalAggsHandler,
       finalKeySelector,
-      config.getMinIdleStateRetentionTime)
+      config.getIdleStateRetention.toMillis)
 
     val operator = new KeyedMapBundleOperator(
       aggFunction,
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala
index 462d5a1..0b8ada6 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala
@@ -18,28 +18,35 @@
 
 package org.apache.flink.table.planner.runtime.harness
 
-import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api._
+import org.apache.flink.table.api.{EnvironmentSettings, _}
 import org.apache.flink.table.api.bridge.scala._
 import 
org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl
-import org.apache.flink.table.api.EnvironmentSettings
-import 
org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
+import 
org.apache.flink.table.api.config.ExecutionConfigOptions.{TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
 TABLE_EXEC_MINIBATCH_ENABLED, TABLE_EXEC_MINIBATCH_SIZE}
+import 
org.apache.flink.table.api.config.OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY
+import 
org.apache.flink.table.planner.runtime.utils.StreamingWithMiniBatchTestBase.{MiniBatchMode,
 MiniBatchOff, MiniBatchOn}
+import 
org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND,
 ROCKSDB_BACKEND, StateBackendMode}
+import 
org.apache.flink.table.planner.runtime.utils.UserDefinedFunctionTestUtils.CountNullNonNull
 import org.apache.flink.table.runtime.util.RowDataHarnessAssertor
 import org.apache.flink.table.runtime.util.StreamRecordUtils.binaryRecord
 import org.apache.flink.types.Row
 import org.apache.flink.types.RowKind._
+
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.{Before, Test}
 
 import java.lang.{Long => JLong}
+import java.time.Duration
 import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.{Collection => JCollection}
 
+import scala.collection.JavaConversions._
 import scala.collection.mutable
 
 @RunWith(classOf[Parameterized])
-class GroupAggregateHarnessTest(mode: StateBackendMode) extends 
HarnessTestBase(mode) {
+class GroupAggregateHarnessTest(mode: StateBackendMode, miniBatch: 
MiniBatchMode)
+    extends HarnessTestBase(mode) {
 
   @Before
   override def before(): Unit = {
@@ -47,6 +54,19 @@ class GroupAggregateHarnessTest(mode: StateBackendMode) 
extends HarnessTestBase(
     val setting = EnvironmentSettings.newInstance().inStreamingMode().build()
     val config = new TestTableConfig
     this.tEnv = StreamTableEnvironmentImpl.create(env, setting, config)
+    // set mini batch
+    val tableConfig = tEnv.getConfig
+    miniBatch match {
+      case MiniBatchOn =>
+        tableConfig.getConfiguration.setBoolean(TABLE_EXEC_MINIBATCH_ENABLED, 
true)
+        tableConfig.getConfiguration.set(TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
Duration.ofSeconds(1))
+        // trigger every record for easier harness test
+        tableConfig.getConfiguration.setLong(TABLE_EXEC_MINIBATCH_SIZE, 1L)
+        // disable local-global to only test the MiniBatchGroupAggFunction
+        
tableConfig.getConfiguration.setString(TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, 
"ONE_PHASE")
+      case MiniBatchOff =>
+        
tableConfig.getConfiguration.removeConfig(TABLE_EXEC_MINIBATCH_ALLOW_LATENCY)
+    }
   }
 
   @Test
@@ -65,7 +85,7 @@ class GroupAggregateHarnessTest(mode: StateBackendMode) 
extends HarnessTestBase(
       """.stripMargin
     val t1 = tEnv.sqlQuery(sql)
 
-    tEnv.getConfig.setIdleStateRetentionTime(Time.seconds(2), Time.seconds(3))
+    tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(2))
     val testHarness = createHarnessTester(t1.toRetractStream[Row], 
"GroupAggregate")
     val assertor = new RowDataHarnessAssertor(
       Array(
@@ -138,4 +158,99 @@ class GroupAggregateHarnessTest(mode: StateBackendMode) 
extends HarnessTestBase(
     testHarness.close()
   }
 
+  @Test
+  def testAggregationWithDistinct(): Unit = {
+    val data = new mutable.MutableList[(String, String, Long)]
+    val t = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
+    tEnv.createTemporaryView("T", t)
+    tEnv.createTemporarySystemFunction("CntNullNonNull", new CountNullNonNull)
+
+    val sql =
+      """
+        |SELECT a, COUNT(DISTINCT b), CntNullNonNull(DISTINCT b), COUNT(*), 
SUM(c)
+        |FROM T
+        |GROUP BY a
+      """.stripMargin
+    val t1 = tEnv.sqlQuery(sql)
+
+    tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(2))
+    val testHarness = createHarnessTester(t1.toRetractStream[Row], 
"GroupAggregate")
+    val assertor = new RowDataHarnessAssertor(
+      Array(
+        DataTypes.STRING().getLogicalType,
+        DataTypes.BIGINT().getLogicalType,
+        DataTypes.STRING().getLogicalType,
+        DataTypes.BIGINT().getLogicalType,
+        DataTypes.BIGINT().getLogicalType))
+
+    testHarness.open()
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    // set ttl processing time to 1
+    testHarness.setStateTtlProcessingTime(1)
+
+    // insertion
+    testHarness.processElement(binaryRecord(INSERT, "aaa", "a1", 1L: JLong))
+    expectedOutput.add(binaryRecord(INSERT, "aaa", 1L: JLong, "1|0", 1L: 
JLong, 1L: JLong))
+
+    // insertion
+    testHarness.processElement(binaryRecord(INSERT, "bbb", "b1", 2L: JLong))
+    expectedOutput.add(binaryRecord(INSERT, "bbb", 1L: JLong, "1|0", 1L: 
JLong, 2L: JLong))
+
+    // advance ttl processing time
+    testHarness.setStateTtlProcessingTime(1000)
+
+    // update for insertion
+    testHarness.processElement(binaryRecord(INSERT, "aaa", "a2", 2L: JLong))
+    expectedOutput.add(binaryRecord(UPDATE_BEFORE, "aaa", 1L: JLong, "1|0", 
1L: JLong, 1L: JLong))
+    expectedOutput.add(binaryRecord(UPDATE_AFTER, "aaa", 2L: JLong, "2|0", 2L: 
JLong, 3L: JLong))
+
+    // this should expire "bbb" state
+    testHarness.setStateTtlProcessingTime(2001)
+
+    // accumulate from initial state
+    testHarness.processElement(binaryRecord(INSERT, "bbb", "b3", 3L: JLong))
+    expectedOutput.add(binaryRecord(INSERT, "bbb", 1L: JLong, "1|0", 1L: 
JLong, 3L: JLong))
+    // "aaa" is not expired
+    testHarness.processElement(binaryRecord(INSERT, "aaa", "a2", 3L: JLong))
+    expectedOutput.add(binaryRecord(UPDATE_BEFORE, "aaa", 2L: JLong, "2|0", 
2L: JLong, 3L: JLong))
+    expectedOutput.add(binaryRecord(UPDATE_AFTER, "aaa", 2L: JLong, "2|0", 3L: 
JLong, 6L: JLong))
+    // test null key
+    testHarness.processElement(binaryRecord(INSERT, "aaa", null, 4L: JLong))
+    expectedOutput.add(binaryRecord(UPDATE_BEFORE, "aaa", 2L: JLong, "2|0", 
3L: JLong, 6L: JLong))
+    expectedOutput.add(binaryRecord(UPDATE_AFTER, "aaa", 2L: JLong, "2|1", 4L: 
JLong, 10L: JLong))
+
+    // this should expire "aaa" state
+    testHarness.setStateTtlProcessingTime(5001)
+
+    // accumulate from initial state
+    testHarness.processElement(binaryRecord(INSERT, "aaa", null, 4L: JLong))
+    expectedOutput.add(binaryRecord(INSERT, "aaa", 0L: JLong, "0|1", 1L: 
JLong, 4L: JLong))
+    testHarness.processElement(binaryRecord(INSERT, "aaa", "a2", 2L: JLong))
+    expectedOutput.add(binaryRecord(UPDATE_BEFORE, "aaa", 0L: JLong, "0|1", 
1L: JLong, 4L: JLong))
+    expectedOutput.add(binaryRecord(UPDATE_AFTER, "aaa", 1L: JLong, "1|1", 2L: 
JLong, 6L: JLong))
+    testHarness.processElement(binaryRecord(INSERT, "bbb", "b4", 4L: JLong))
+    expectedOutput.add(binaryRecord(INSERT, "bbb", 1L: JLong, "1|0", 1L: 
JLong, 4L: JLong))
+
+    val result = testHarness.getOutput
+
+    assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, 
result)
+
+    testHarness.close()
+  }
+
+}
+
+object GroupAggregateHarnessTest {
+
+  @Parameterized.Parameters(name = "StateBackend={0}, MiniBatch={1}")
+  def parameters(): JCollection[Array[java.lang.Object]] = {
+    Seq[Array[AnyRef]](
+      Array(HEAP_BACKEND, MiniBatchOff),
+      Array(HEAP_BACKEND, MiniBatchOn),
+      Array(ROCKSDB_BACKEND, MiniBatchOff),
+      Array(ROCKSDB_BACKEND, MiniBatchOn)
+    )
+  }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala
index b6add51..32b2688 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala
@@ -20,7 +20,6 @@ package org.apache.flink.table.planner.runtime.harness
 
 import java.lang.{Integer => JInt}
 import java.util.concurrent.ConcurrentLinkedQueue
-import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api._
 import org.apache.flink.table.api.bridge.scala._
@@ -35,6 +34,8 @@ import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.{Before, Test}
 
+import java.time.Duration
+
 import scala.collection.mutable
 
 @RunWith(classOf[Parameterized])
@@ -60,7 +61,7 @@ class TableAggregateHarnessTest(mode: StateBackendMode) 
extends HarnessTestBase(
       .flatAggregate(top3('b) as ('b1, 'b2))
       .select('a, 'b1, 'b2)
 
-    tEnv.getConfig.setIdleStateRetentionTime(Time.seconds(2), Time.seconds(2))
+    tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(2))
     val testHarness = createHarnessTester(
       resultTable.toRetractStream[Row], "GroupTableAggregate")
     val assertor = new RowDataHarnessAssertor(
@@ -125,7 +126,7 @@ class TableAggregateHarnessTest(mode: StateBackendMode) 
extends HarnessTestBase(
       .flatAggregate(top3('b) as ('b1, 'b2))
       .select('b1, 'b2)
 
-    tEnv.getConfig.setIdleStateRetentionTime(Time.seconds(2), Time.seconds(2))
+    tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(2))
     val testHarness = createHarnessTester(
       resultTable.toRetractStream[Row], "GroupTableAggregate")
     val assertor = new RowDataHarnessAssertor(
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/dataview/PerKeyStateDataViewStore.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/dataview/PerKeyStateDataViewStore.java
index 5b592b4..93b0462 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/dataview/PerKeyStateDataViewStore.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/dataview/PerKeyStateDataViewStore.java
@@ -70,6 +70,9 @@ public final class PerKeyStateDataViewStore implements 
StateDataViewStore {
                        final ValueStateDescriptor<EV> nullStateDescriptor = 
new ValueStateDescriptor<>(
                                stateName + NULL_STATE_POSTFIX,
                                valueSerializer);
+                       if (stateTtlConfig.isEnabled()) {
+                               
nullStateDescriptor.enableTimeToLive(stateTtlConfig);
+                       }
                        final ValueState<EV> nullState = 
ctx.getState(nullStateDescriptor);
                        return new 
StateMapView.KeyedStateMapViewWithKeysNullable<>(mapState, nullState);
                } else {
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/dataview/StateListView.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/dataview/StateListView.java
index 782fa06..1970546 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/dataview/StateListView.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/dataview/StateListView.java
@@ -77,7 +77,14 @@ public abstract class StateListView<N, EE> extends 
ListView<EE> implements State
 
        @Override
        public boolean remove(EE value) throws Exception {
-               Iterator<EE> iterator = getListState().get().iterator();
+               Iterable<EE> iterable = getListState().get();
+               if (iterable == null) {
+                       // ListState.get() may return null according to the 
Javadoc.
+                       return false;
+               }
+               // getListState().get() does not always return a List object,
+               // so copy the values into an ArrayList before removing.
+               Iterator<EE> iterator = iterable.iterator();
                List<EE> list = new ArrayList<>();
                while (iterator.hasNext()) {
                        EE it = iterator.next();
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java
index 247ae47..9f35472 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java
@@ -23,10 +23,8 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
-import org.apache.flink.table.data.JoinedRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.utils.JoinedRowData;
-import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.table.runtime.dataview.PerKeyStateDataViewStore;
 import org.apache.flink.table.runtime.generated.AggsHandleFunction;
 import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java
index 79bdd4a..5a6953c 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java
@@ -41,10 +41,12 @@ import org.apache.flink.util.Collector;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
 import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg;
+import static org.apache.flink.table.data.util.RowDataUtil.isRetractMsg;
 import static 
org.apache.flink.table.runtime.util.StateTtlConfigUtil.createTtlConfig;
 
 /**
@@ -185,6 +187,21 @@ public class MiniBatchGroupAggFunction extends 
MapBundleFunction<RowData, List<R
                        ctx.setCurrentKey(currentKey);
                        RowData acc = accState.value();
                        if (acc == null) {
+                               // Don't create a new accumulator for a 
retraction message. This
+                               // might happen if the retraction message is 
the first message for the
+                               // key or after a state clean up.
+                               Iterator<RowData> inputIter = 
inputRows.iterator();
+                               while (inputIter.hasNext()) {
+                                       RowData current = inputIter.next();
+                                       if (isRetractMsg(current)) {
+                                               inputIter.remove(); // remove 
all the beginning retraction messages
+                                       } else {
+                                               break;
+                                       }
+                               }
+                               if (inputRows.isEmpty()) {
+                                       return;
+                               }
                                acc = function.createAccumulators();
                                firstRow = true;
                        }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionBase.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionBase.java
index 1138c18..5d06c81 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionBase.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionBase.java
@@ -37,9 +37,11 @@ import static 
org.apache.flink.table.runtime.util.StateTtlConfigUtil.createTtlCo
  */
 abstract class DeduplicateFunctionBase<T, K, IN, OUT> extends 
KeyedProcessFunction<K, IN, OUT> {
 
+       private static final long serialVersionUID = 1L;
+
        // the TypeInformation of the values in the state.
        protected final TypeInformation<T> typeInfo;
-       protected final long minRetentionTime;
+       protected final long stateRetentionTime;
        protected final TypeSerializer<OUT> serializer;
        // state stores previous message under the key.
        protected ValueState<T> state;
@@ -47,9 +49,9 @@ abstract class DeduplicateFunctionBase<T, K, IN, OUT> extends 
KeyedProcessFuncti
        public DeduplicateFunctionBase(
                        TypeInformation<T> typeInfo,
                        TypeSerializer<OUT> serializer,
-                       long minRetentionTime) {
+                       long stateRetentionTime) {
                this.typeInfo = typeInfo;
-               this.minRetentionTime = minRetentionTime;
+               this.stateRetentionTime = stateRetentionTime;
                this.serializer = serializer;
        }
 
@@ -57,7 +59,7 @@ abstract class DeduplicateFunctionBase<T, K, IN, OUT> extends 
KeyedProcessFuncti
        public void open(Configuration configure) throws Exception {
                super.open(configure);
                ValueStateDescriptor<T> stateDesc = new 
ValueStateDescriptor<>("deduplicate-state", typeInfo);
-               StateTtlConfig ttlConfig = createTtlConfig(minRetentionTime);
+               StateTtlConfig ttlConfig = createTtlConfig(stateRetentionTime);
                if (ttlConfig.isEnabled()) {
                        stateDesc.enableTimeToLive(ttlConfig);
                }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepFirstRowFunction.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepFirstRowFunction.java
index 9c88e0c..9046969 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepFirstRowFunction.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepFirstRowFunction.java
@@ -33,8 +33,8 @@ public class ProcTimeDeduplicateKeepFirstRowFunction
        private static final long serialVersionUID = 5865777137707602549L;
 
        // state stores a boolean flag to indicate whether key appears before.
-       public ProcTimeDeduplicateKeepFirstRowFunction(long minRetentionTime) {
-               super(Types.BOOLEAN, null, minRetentionTime);
+       public ProcTimeDeduplicateKeepFirstRowFunction(long stateRetentionTime) 
{
+               super(Types.BOOLEAN, null, stateRetentionTime);
        }
 
        @Override
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepLastRowFunction.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepLastRowFunction.java
index a17a6e4..a378708 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepLastRowFunction.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepLastRowFunction.java
@@ -38,11 +38,11 @@ public class ProcTimeDeduplicateKeepLastRowFunction
 
        public ProcTimeDeduplicateKeepLastRowFunction(
                        InternalTypeInfo<RowData> typeInfo,
-                       long minRetentionTime,
+                       long stateRetentionTime,
                        boolean generateUpdateBefore,
                        boolean generateInsert,
                        boolean inputInsertOnly) {
-               super(typeInfo, null, minRetentionTime);
+               super(typeInfo, null, stateRetentionTime);
                this.generateUpdateBefore = generateUpdateBefore;
                this.generateInsert = generateInsert;
                this.inputIsInsertOnly = inputInsertOnly;
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepFirstRowFunction.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepFirstRowFunction.java
index 9763e51..eb2d73d 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepFirstRowFunction.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepFirstRowFunction.java
@@ -38,12 +38,12 @@ public class 
ProcTimeMiniBatchDeduplicateKeepFirstRowFunction
                extends MiniBatchDeduplicateFunctionBase<Boolean, RowData, 
RowData, RowData, RowData> {
 
        private static final long serialVersionUID = -7994602893547654994L;
-       private TypeSerializer<RowData> serializer;
+       private final TypeSerializer<RowData> serializer;
 
        public ProcTimeMiniBatchDeduplicateKeepFirstRowFunction(
                        TypeSerializer<RowData> serializer,
-                       long minRetentionTime) {
-               super(Types.BOOLEAN, minRetentionTime);
+                       long stateRetentionTime) {
+               super(Types.BOOLEAN, stateRetentionTime);
                this.serializer = serializer;
        }
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepLastRowFunction.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepLastRowFunction.java
index 1087b27..bdd17da 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepLastRowFunction.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepLastRowFunction.java
@@ -45,11 +45,11 @@ public class ProcTimeMiniBatchDeduplicateKeepLastRowFunction
        public ProcTimeMiniBatchDeduplicateKeepLastRowFunction(
                        InternalTypeInfo<RowData> typeInfo,
                        TypeSerializer<RowData> serializer,
-                       long minRetentionTime,
+                       long stateRetentionTime,
                        boolean generateUpdateBefore,
                        boolean generateInsert,
                        boolean inputInsertOnly) {
-               super(typeInfo, minRetentionTime);
+               super(typeInfo, stateRetentionTime);
                this.serializer = serializer;
                this.generateUpdateBefore = generateUpdateBefore;
                this.generateInsert = generateInsert;
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunctionTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunctionTest.java
deleted file mode 100644
index 5910752..0000000
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunctionTest.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.aggregate;
-
-import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
-import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.table.data.RowData;
-
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
-import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
-
-/**
- * Tests for {@link GroupAggFunction}.
- */
-public class GroupAggFunctionTest extends GroupAggFunctionTestBase {
-
-       private OneInputStreamOperatorTestHarness<RowData, RowData> 
createTestHarness(
-               GroupAggFunction aggFunction) throws Exception {
-               KeyedProcessOperator<RowData, RowData, RowData> operator = new 
KeyedProcessOperator<>(aggFunction);
-               return new KeyedOneInputStreamOperatorTestHarness<>(operator, 
keySelector, keyType);
-       }
-
-       private GroupAggFunction createFunction(boolean generateUpdateBefore) {
-               return new GroupAggFunction(
-                       function,
-                       equaliser,
-                       accTypes,
-                       -1,
-                       generateUpdateBefore,
-                       minTime.toMilliseconds());
-       }
-
-       @Test
-       public void testGroupAggWithStateTtl() throws Exception {
-               GroupAggFunction groupAggFunction = createFunction(false);
-               OneInputStreamOperatorTestHarness<RowData, RowData> testHarness 
= createTestHarness(groupAggFunction);
-               testHarness.open();
-               testHarness.setup();
-
-               testHarness.processElement(insertRecord("key1", 1, 20L));
-               testHarness.processElement(insertRecord("key1", 2, 0L));
-               testHarness.processElement(insertRecord("key1", 3, 999L));
-
-               testHarness.processElement(insertRecord("key2", 1, 3999L));
-               testHarness.processElement(insertRecord("key2", 2, 3000L));
-               testHarness.processElement(insertRecord("key2", 3, 1000L));
-
-               //trigger expired state cleanup
-               testHarness.setStateTtlProcessingTime(20);
-               testHarness.processElement(insertRecord("key1", 4, 1020L));
-               testHarness.processElement(insertRecord("key1", 5, 1290L));
-               testHarness.processElement(insertRecord("key1", 6, 1290L));
-
-               testHarness.processElement(insertRecord("key2", 4, 4999L));
-               testHarness.processElement(insertRecord("key2", 5, 6000L));
-               testHarness.processElement(insertRecord("key2", 6, 2000L));
-
-               List<Object> expectedOutput = new ArrayList<>();
-               expectedOutput.add(insertRecord("key1", 1L, 1L));
-               expectedOutput.add(updateAfterRecord("key1", 3L, 2L));
-               expectedOutput.add(updateAfterRecord("key1", 6L, 3L));
-               expectedOutput.add(insertRecord("key2", 1L, 1L));
-               expectedOutput.add(updateAfterRecord("key2", 3L, 2L));
-               expectedOutput.add(updateAfterRecord("key2", 6L, 3L));
-               //result doesn`t contain expired record with the same key
-               expectedOutput.add(insertRecord("key1", 4L, 1L));
-               expectedOutput.add(updateAfterRecord("key1", 9L, 2L));
-               expectedOutput.add(updateAfterRecord("key1", 15L, 3L));
-               expectedOutput.add(insertRecord("key2", 4L, 1L));
-               expectedOutput.add(updateAfterRecord("key2", 9L, 2L));
-               expectedOutput.add(updateAfterRecord("key2", 15L, 3L));
-
-               assertor.assertOutputEqualsSorted("output wrong.", 
expectedOutput, testHarness.getOutput());
-               testHarness.close();
-       }
-}
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunctionTestBase.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunctionTestBase.java
deleted file mode 100644
index 03aafe8..0000000
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunctionTestBase.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.aggregate;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.dataview.StateDataViewStore;
-import org.apache.flink.table.runtime.generated.AggsHandleFunction;
-import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
-import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
-import org.apache.flink.table.runtime.generated.RecordEqualiser;
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-import org.apache.flink.table.runtime.util.BinaryRowDataKeySelector;
-import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator;
-import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
-import org.apache.flink.table.runtime.util.RowDataRecordEqualiser;
-import org.apache.flink.table.types.logical.BigIntType;
-import org.apache.flink.table.types.logical.IntType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.VarCharType;
-
-/**
- * Base class of tests for all kinds of GroupAgg.
- */
-abstract class GroupAggFunctionTestBase {
-
-       Time minTime = Time.milliseconds(10);
-
-       LogicalType[] inputFieldTypes = new LogicalType[] {
-                       new VarCharType(VarCharType.MAX_LENGTH),
-                       new IntType(),
-                       new BigIntType() };
-
-       InternalTypeInfo<RowData> outputType = InternalTypeInfo.ofFields(
-                       new VarCharType(VarCharType.MAX_LENGTH),
-                       new BigIntType(),
-                       new BigIntType());
-
-       LogicalType[] accTypes = new LogicalType[] { new BigIntType(), new 
BigIntType() };
-       BinaryRowDataKeySelector keySelector = new BinaryRowDataKeySelector(new 
int[]{0}, inputFieldTypes);
-       TypeInformation<RowData> keyType = keySelector.getProducedType();
-       GeneratedRecordEqualiser equaliser = new GeneratedRecordEqualiser("", 
"", new Object[0]) {
-
-               private static final long serialVersionUID = 
1532460173848746788L;
-
-               @Override
-               public RecordEqualiser newInstance(ClassLoader classLoader) {
-                       return new RowDataRecordEqualiser();
-               }
-       };
-
-       GeneratedAggsHandleFunction function =
-                       new GeneratedAggsHandleFunction("Function", "", new 
Object[0]) {
-               @Override
-               public AggsHandleFunction newInstance(ClassLoader classLoader) {
-                       return new SumAndCountAgg();
-               }
-       };
-
-       RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(
-               outputType.toRowFieldTypes(),
-               new GenericRowRecordSortComparator(0, new 
VarCharType(VarCharType.MAX_LENGTH)));
-
-       static final class SumAndCountAgg implements AggsHandleFunction {
-               private long sum;
-               private boolean sumIsNull;
-               private long count;
-               private boolean countIsNull;
-
-               @Override
-               public void open(StateDataViewStore store) throws Exception {
-               }
-
-               @Override
-               public void setAccumulators(RowData acc) throws Exception {
-                       sumIsNull = acc.isNullAt(0);
-                       if (!sumIsNull) {
-                               sum = acc.getLong(0);
-                       }
-
-                       countIsNull = acc.isNullAt(1);
-                       if (!countIsNull) {
-                               count = acc.getLong(1);
-                       }
-               }
-
-               @Override
-               public void accumulate(RowData inputRow) throws Exception {
-                       boolean inputIsNull = inputRow.isNullAt(1);
-                       if (!inputIsNull) {
-                               sum += inputRow.getInt(1);
-                               count += 1;
-                       }
-               }
-
-               @Override
-               public void retract(RowData inputRow) throws Exception {
-                       boolean inputIsNull = inputRow.isNullAt(1);
-                       if (!inputIsNull) {
-                               sum -= inputRow.getInt(1);
-                               count -= 1;
-                       }
-               }
-
-               @Override
-               public void merge(RowData otherAcc) throws Exception {
-                       boolean sumIsNullOther = otherAcc.isNullAt(0);
-                       if (!sumIsNullOther) {
-                               sum += otherAcc.getLong(0);
-                       }
-
-                       boolean countIsNullOther = otherAcc.isNullAt(1);
-                       if (!countIsNullOther) {
-                               count += otherAcc.getLong(1);
-                       }
-               }
-
-               @Override
-               public void resetAccumulators() throws Exception {
-                       sum = 0L;
-                       count = 0L;
-               }
-
-               @Override
-               public RowData getAccumulators() throws Exception {
-                       GenericRowData acc = new GenericRowData(2);
-                       if (!sumIsNull) {
-                               acc.setField(0, sum);
-                       }
-
-                       if (!countIsNull) {
-                               acc.setField(1, count);
-                       }
-
-                       return acc;
-               }
-
-               @Override
-               public RowData createAccumulators() throws Exception {
-                       GenericRowData acc = new GenericRowData(2);
-                       acc.setField(0, 0L);
-                       acc.setField(1, 0L);
-                       return acc;
-               }
-
-               @Override
-               public RowData getValue() throws Exception {
-                       return getAccumulators();
-               }
-
-               @Override
-               public void cleanup() throws Exception {
-
-               }
-
-               @Override
-               public void close() throws Exception {
-
-               }
-       }
-
-}
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunctionTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunctionTest.java
deleted file mode 100644
index c1d3980..0000000
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunctionTest.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.aggregate;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
-import 
org.apache.flink.table.runtime.operators.bundle.trigger.CountBundleTrigger;
-import org.apache.flink.table.types.logical.RowType;
-
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
-import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
-
-/**
- * Tests for {@link MiniBatchGroupAggFunction}.
- */
-public class MiniBatchGroupAggFunctionTest extends GroupAggFunctionTestBase {
-
-       private OneInputStreamOperatorTestHarness<RowData, RowData> 
createTestHarness(
-               MiniBatchGroupAggFunction aggFunction) throws Exception {
-               CountBundleTrigger<Tuple2<String, String>> trigger = new 
CountBundleTrigger<>(3);
-               KeyedMapBundleOperator operator = new 
KeyedMapBundleOperator(aggFunction, trigger);
-               return new KeyedOneInputStreamOperatorTestHarness<>(operator, 
keySelector, keyType);
-       }
-
-       private MiniBatchGroupAggFunction createFunction(boolean 
generateUpdateBefore) throws Exception {
-               return new MiniBatchGroupAggFunction(
-                       function,
-                       equaliser,
-                       accTypes,
-                       RowType.of(inputFieldTypes),
-                       -1,
-                       false,
-                       minTime.toMilliseconds());
-       }
-
-       @Test
-       public void testMiniBatchGroupAggWithStateTtl() throws Exception {
-
-               MiniBatchGroupAggFunction function = createFunction(false);
-               OneInputStreamOperatorTestHarness<RowData, RowData> testHarness 
= createTestHarness(function);
-               testHarness.open();
-               testHarness.setup();
-
-               testHarness.processElement(insertRecord("key1", 1, 20L));
-               testHarness.processElement(insertRecord("key2", 1, 3000L));
-               testHarness.processElement(insertRecord("key1", 3, 999L));
-
-               testHarness.processElement(insertRecord("key1", 2, 500L));
-               testHarness.processElement(insertRecord("key2", 2, 3999L));
-               testHarness.processElement(insertRecord("key2", 3, 1000L));
-
-               //trigger expired state cleanup
-               testHarness.setStateTtlProcessingTime(20);
-               testHarness.processElement(insertRecord("key1", 4, 1020L));
-               testHarness.processElement(insertRecord("key1", 5, 1290L));
-               testHarness.processElement(insertRecord("key1", 6, 1290L));
-
-               testHarness.processElement(insertRecord("key2", 4, 4999L));
-               testHarness.processElement(insertRecord("key2", 5, 6000L));
-               testHarness.processElement(insertRecord("key2", 6, 2000L));
-
-               List<Object> expectedOutput = new ArrayList<>();
-               expectedOutput.add(insertRecord("key1", 4L, 2L));
-               expectedOutput.add(insertRecord("key2", 1L, 1L));
-               expectedOutput.add(updateAfterRecord("key1", 6L, 3L));
-               expectedOutput.add(updateAfterRecord("key2", 6L, 3L));
-               //result doesn`t contain expired record with the same key
-               expectedOutput.add(insertRecord("key1", 15L, 3L));
-               expectedOutput.add(insertRecord("key2", 15L, 3L));
-
-               assertor.assertOutputEqualsSorted("output wrong.", 
expectedOutput, testHarness.getOutput());
-               testHarness.close();
-       }
-}

Reply via email to