This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0640f97  [FLINK-13004][table-runtime-blink] Correct the logic of 
needToCleanupState in KeyedProcessFunctionWithCleanupState
0640f97 is described below

commit 0640f97755f49b3331e8d81ca15dca0566a1890d
Author: Yun Tang <[email protected]>
AuthorDate: Thu Jun 27 12:56:11 2019 +0800

    [FLINK-13004][table-runtime-blink] Correct the logic of needToCleanupState 
in KeyedProcessFunctionWithCleanupState
    
    This closes #8909
---
 .../runtime/harness/OverWindowHarnessTest.scala    | 28 +++++++++++-----------
 .../KeyedProcessFunctionWithCleanupState.java      |  2 +-
 .../AbstractRowTimeUnboundedPrecedingOver.java     |  4 ++--
 .../over/ProcTimeRowsBoundedPrecedingFunction.java |  2 +-
 .../over/ProcTimeUnboundedPrecedingFunction.java   |  2 +-
 .../over/RowTimeRangeBoundedPrecedingFunction.java |  2 +-
 .../over/RowTimeRowsBoundedPrecedingFunction.java  |  2 +-
 7 files changed, 21 insertions(+), 21 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
index e515138..ff3f6cd 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
@@ -136,9 +136,9 @@ class OverWindowHarnessTest(mode: StateBackendMode) extends 
HarnessTestBase(mode
     expectedOutput.add(new StreamRecord(
       baserow(2L: JLong, "aaa", 9L: JLong, null, 8L: JLong, 9L: JLong)))
     expectedOutput.add(new StreamRecord(
-      baserow(2L: JLong, "aaa", 10L: JLong, null, 9L: JLong, 10L: JLong)))
+      baserow(2L: JLong, "aaa", 10L: JLong, null, 10L: JLong, 10L: JLong)))
     expectedOutput.add(new StreamRecord(
-      baserow(2L: JLong, "bbb", 40L: JLong, null, 30L: JLong, 40L: JLong)))
+      baserow(2L: JLong, "bbb", 40L: JLong, null, 40L: JLong, 40L: JLong)))
 
     assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, 
result)
 
@@ -237,19 +237,19 @@ class OverWindowHarnessTest(mode: StateBackendMode) 
extends HarnessTestBase(mode
     expectedOutput.add(new StreamRecord(
       baserow(0L: JLong, "aaa", 2L: JLong, null, 1L: JLong, 2L: JLong)))
     expectedOutput.add(new StreamRecord(
-      baserow(0L: JLong, "aaa", 3L: JLong, null, 1L: JLong, 4L: JLong)))
+      baserow(0L: JLong, "aaa", 3L: JLong, null, 3L: JLong, 4L: JLong)))
     expectedOutput.add(new StreamRecord(
-      baserow(0L: JLong, "bbb", 20L: JLong, null, 10L: JLong, 20L: JLong)))
+      baserow(0L: JLong, "bbb", 20L: JLong, null, 20L: JLong, 20L: JLong)))
     expectedOutput.add(new StreamRecord(
-      baserow(0L: JLong, "aaa", 4L: JLong, null, 1L: JLong, 4L: JLong)))
+      baserow(0L: JLong, "aaa", 4L: JLong, null, 4L: JLong, 4L: JLong)))
     expectedOutput.add(new StreamRecord(
-      baserow(0L: JLong, "aaa", 5L: JLong, null, 3L: JLong, 6L: JLong)))
+      baserow(0L: JLong, "aaa", 5L: JLong, null, 5L: JLong, 6L: JLong)))
     expectedOutput.add(new StreamRecord(
-      baserow(0L: JLong, "aaa", 6L: JLong, null, 3L: JLong, 6L: JLong)))
+      baserow(0L: JLong, "aaa", 6L: JLong, null, 5L: JLong, 6L: JLong)))
     expectedOutput.add(new StreamRecord(
-      baserow(0L: JLong, "bbb", 30L: JLong, null, 20L: JLong, 30L: JLong)))
+      baserow(0L: JLong, "bbb", 30L: JLong, null, 30L: JLong, 30L: JLong)))
     expectedOutput.add(new StreamRecord(
-      baserow(0L: JLong, "aaa", 7L: JLong, null, 5L: JLong, 7L: JLong)))
+      baserow(0L: JLong, "aaa", 7L: JLong, null, 7L: JLong, 7L: JLong)))
     expectedOutput.add(new StreamRecord(
       baserow(0L: JLong, "aaa", 8L: JLong, null, 7L: JLong, 10L: JLong)))
     expectedOutput.add(new StreamRecord(
@@ -366,11 +366,11 @@ class OverWindowHarnessTest(mode: StateBackendMode) 
extends HarnessTestBase(mode
     expectedOutput.add(new StreamRecord(
       baserow(0L: JLong, "aaa", 8L: JLong, null, 1L: JLong, 8L: JLong)))
     expectedOutput.add(new StreamRecord(
-      baserow(0L: JLong, "aaa", 9L: JLong, null, 1L: JLong, 9L: JLong)))
+      baserow(0L: JLong, "aaa", 9L: JLong, null, 9L: JLong, 9L: JLong)))
     expectedOutput.add(new StreamRecord(
-      baserow(0L: JLong, "aaa", 10L: JLong, null, 1L: JLong, 10L: JLong)))
+      baserow(0L: JLong, "aaa", 10L: JLong, null, 9L: JLong, 10L: JLong)))
     expectedOutput.add(new StreamRecord(
-      baserow(0L: JLong, "bbb", 40L: JLong, null, 10L: JLong, 40L: JLong)))
+      baserow(0L: JLong, "bbb", 40L: JLong, null, 40L: JLong, 40L: JLong)))
 
     assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, 
result)
     testHarness.close()
@@ -522,7 +522,7 @@ class OverWindowHarnessTest(mode: StateBackendMode) extends 
HarnessTestBase(mode
     expectedOutput.add(new StreamRecord(
       baserow(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong)))
     expectedOutput.add(new StreamRecord(
-      baserow(20011L: JLong, "ccc", 3L: JLong, 1L: JLong, 3L: JLong)))
+      baserow(20011L: JLong, "ccc", 3L: JLong, 3L: JLong, 3L: JLong)))
 
     assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, 
result)
     testHarness.close()
@@ -667,7 +667,7 @@ class OverWindowHarnessTest(mode: StateBackendMode) extends 
HarnessTestBase(mode
     expectedOutput.add(new StreamRecord(
       baserow(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong)))
     expectedOutput.add(new StreamRecord(
-      baserow(20011L: JLong, "ccc", 3L: JLong, 1L: JLong, 3L: JLong)))
+      baserow(20011L: JLong, "ccc", 3L: JLong, 3L: JLong, 3L: JLong)))
 
     assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, 
result)
     testHarness.close()
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/KeyedProcessFunctionWithCleanupState.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/KeyedProcessFunctionWithCleanupState.java
index 5c8217c..815778e 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/KeyedProcessFunctionWithCleanupState.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/KeyedProcessFunctionWithCleanupState.java
@@ -85,7 +85,7 @@ public abstract class KeyedProcessFunctionWithCleanupState<K, 
IN, OUT>
                if (stateCleaningEnabled) {
                        Long cleanupTime = cleanupTimeState.value();
                        // check that the triggered timer is the last 
registered processing time timer.
-                       return null != cleanupTime && timestamp == cleanupTime;
+                       return timestamp.equals(cleanupTime);
                } else {
                        return false;
                }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/AbstractRowTimeUnboundedPrecedingOver.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/AbstractRowTimeUnboundedPrecedingOver.java
index f0dc6e5..922874a 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/AbstractRowTimeUnboundedPrecedingOver.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/AbstractRowTimeUnboundedPrecedingOver.java
@@ -154,10 +154,10 @@ public abstract class 
AbstractRowTimeUnboundedPrecedingOver<K> extends KeyedProc
                        KeyedProcessFunction<K, BaseRow, 
BaseRow>.OnTimerContext ctx,
                        Collector<BaseRow> out) throws Exception {
                if (isProcessingTimeTimer(ctx)) {
-                       if (needToCleanupState(timestamp)) {
+                       if (stateCleaningEnabled) {
 
                                // we check whether there are still records 
which have not been processed yet
-                               boolean noRecordsToProcess = 
!inputState.contains(timestamp);
+                               boolean noRecordsToProcess = 
!inputState.keys().iterator().hasNext();
                                if (noRecordsToProcess) {
                                        // we clean the state
                                        cleanupState(inputState, accState);
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/ProcTimeRowsBoundedPrecedingFunction.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/ProcTimeRowsBoundedPrecedingFunction.java
index 72d6336..fea641d 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/ProcTimeRowsBoundedPrecedingFunction.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/ProcTimeRowsBoundedPrecedingFunction.java
@@ -220,7 +220,7 @@ public class ProcTimeRowsBoundedPrecedingFunction<K> 
extends KeyedProcessFunctio
                        long timestamp,
                        KeyedProcessFunction<K, BaseRow, 
BaseRow>.OnTimerContext ctx,
                        Collector<BaseRow> out) throws Exception {
-               if (needToCleanupState(timestamp)) {
+               if (stateCleaningEnabled) {
                        cleanupState(inputState, accState, counterState, 
smallestTsState);
                        function.cleanup();
                }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/ProcTimeUnboundedPrecedingFunction.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/ProcTimeUnboundedPrecedingFunction.java
index 6d66372..67eb982 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/ProcTimeUnboundedPrecedingFunction.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/ProcTimeUnboundedPrecedingFunction.java
@@ -111,7 +111,7 @@ public class ProcTimeUnboundedPrecedingFunction<K> extends 
KeyedProcessFunctionW
                        long timestamp,
                        KeyedProcessFunction<K, BaseRow, 
BaseRow>.OnTimerContext ctx,
                        Collector<BaseRow> out) throws Exception {
-               if (needToCleanupState(timestamp)) {
+               if (stateCleaningEnabled) {
                        cleanupState(accState);
                        function.cleanup();
                }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRangeBoundedPrecedingFunction.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRangeBoundedPrecedingFunction.java
index 1689224..b30416e 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRangeBoundedPrecedingFunction.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRangeBoundedPrecedingFunction.java
@@ -170,7 +170,7 @@ public class RowTimeRangeBoundedPrecedingFunction<K> 
extends KeyedProcessFunctio
                registerProcessingCleanupTimer(ctx, 
ctx.timerService().currentProcessingTime());
 
                if (isProcessingTimeTimer(ctx)) {
-                       if (needToCleanupState(timestamp)) {
+                       if (stateCleaningEnabled) {
 
                                Iterator<Long> keysIt = 
inputState.keys().iterator();
                                Long lastProcessedTime = 
lastTriggeringTsState.value();
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRowsBoundedPrecedingFunction.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRowsBoundedPrecedingFunction.java
index c141f7f..9572973 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRowsBoundedPrecedingFunction.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRowsBoundedPrecedingFunction.java
@@ -175,7 +175,7 @@ public class RowTimeRowsBoundedPrecedingFunction<K> extends 
KeyedProcessFunction
                        KeyedProcessFunction<K, BaseRow, 
BaseRow>.OnTimerContext ctx,
                        Collector<BaseRow> out) throws Exception {
                if (isProcessingTimeTimer(ctx)) {
-                       if (needToCleanupState(timestamp)) {
+                       if (stateCleaningEnabled) {
 
                                Iterator<Long> keysIt = 
inputState.keys().iterator();
                                Long lastProcessedTime = 
lastTriggeringTsState.value();

Reply via email to