This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0640f97 [FLINK-13004][table-runtime-blink] Correct the logic of needToCleanupState in KeyedProcessFunctionWithCleanupState
0640f97 is described below
commit 0640f97755f49b3331e8d81ca15dca0566a1890d
Author: Yun Tang <[email protected]>
AuthorDate: Thu Jun 27 12:56:11 2019 +0800
[FLINK-13004][table-runtime-blink] Correct the logic of needToCleanupState in KeyedProcessFunctionWithCleanupState
This closes #8909
---
.../runtime/harness/OverWindowHarnessTest.scala | 28 +++++++++++-----------
.../KeyedProcessFunctionWithCleanupState.java | 2 +-
.../AbstractRowTimeUnboundedPrecedingOver.java | 4 ++--
.../over/ProcTimeRowsBoundedPrecedingFunction.java | 2 +-
.../over/ProcTimeUnboundedPrecedingFunction.java | 2 +-
.../over/RowTimeRangeBoundedPrecedingFunction.java | 2 +-
.../over/RowTimeRowsBoundedPrecedingFunction.java | 2 +-
7 files changed, 21 insertions(+), 21 deletions(-)
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
index e515138..ff3f6cd 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
@@ -136,9 +136,9 @@ class OverWindowHarnessTest(mode: StateBackendMode) extends
HarnessTestBase(mode
expectedOutput.add(new StreamRecord(
baserow(2L: JLong, "aaa", 9L: JLong, null, 8L: JLong, 9L: JLong)))
expectedOutput.add(new StreamRecord(
- baserow(2L: JLong, "aaa", 10L: JLong, null, 9L: JLong, 10L: JLong)))
+ baserow(2L: JLong, "aaa", 10L: JLong, null, 10L: JLong, 10L: JLong)))
expectedOutput.add(new StreamRecord(
- baserow(2L: JLong, "bbb", 40L: JLong, null, 30L: JLong, 40L: JLong)))
+ baserow(2L: JLong, "bbb", 40L: JLong, null, 40L: JLong, 40L: JLong)))
assertor.assertOutputEqualsSorted("result mismatch", expectedOutput,
result)
@@ -237,19 +237,19 @@ class OverWindowHarnessTest(mode: StateBackendMode)
extends HarnessTestBase(mode
expectedOutput.add(new StreamRecord(
baserow(0L: JLong, "aaa", 2L: JLong, null, 1L: JLong, 2L: JLong)))
expectedOutput.add(new StreamRecord(
- baserow(0L: JLong, "aaa", 3L: JLong, null, 1L: JLong, 4L: JLong)))
+ baserow(0L: JLong, "aaa", 3L: JLong, null, 3L: JLong, 4L: JLong)))
expectedOutput.add(new StreamRecord(
- baserow(0L: JLong, "bbb", 20L: JLong, null, 10L: JLong, 20L: JLong)))
+ baserow(0L: JLong, "bbb", 20L: JLong, null, 20L: JLong, 20L: JLong)))
expectedOutput.add(new StreamRecord(
- baserow(0L: JLong, "aaa", 4L: JLong, null, 1L: JLong, 4L: JLong)))
+ baserow(0L: JLong, "aaa", 4L: JLong, null, 4L: JLong, 4L: JLong)))
expectedOutput.add(new StreamRecord(
- baserow(0L: JLong, "aaa", 5L: JLong, null, 3L: JLong, 6L: JLong)))
+ baserow(0L: JLong, "aaa", 5L: JLong, null, 5L: JLong, 6L: JLong)))
expectedOutput.add(new StreamRecord(
- baserow(0L: JLong, "aaa", 6L: JLong, null, 3L: JLong, 6L: JLong)))
+ baserow(0L: JLong, "aaa", 6L: JLong, null, 5L: JLong, 6L: JLong)))
expectedOutput.add(new StreamRecord(
- baserow(0L: JLong, "bbb", 30L: JLong, null, 20L: JLong, 30L: JLong)))
+ baserow(0L: JLong, "bbb", 30L: JLong, null, 30L: JLong, 30L: JLong)))
expectedOutput.add(new StreamRecord(
- baserow(0L: JLong, "aaa", 7L: JLong, null, 5L: JLong, 7L: JLong)))
+ baserow(0L: JLong, "aaa", 7L: JLong, null, 7L: JLong, 7L: JLong)))
expectedOutput.add(new StreamRecord(
baserow(0L: JLong, "aaa", 8L: JLong, null, 7L: JLong, 10L: JLong)))
expectedOutput.add(new StreamRecord(
@@ -366,11 +366,11 @@ class OverWindowHarnessTest(mode: StateBackendMode)
extends HarnessTestBase(mode
expectedOutput.add(new StreamRecord(
baserow(0L: JLong, "aaa", 8L: JLong, null, 1L: JLong, 8L: JLong)))
expectedOutput.add(new StreamRecord(
- baserow(0L: JLong, "aaa", 9L: JLong, null, 1L: JLong, 9L: JLong)))
+ baserow(0L: JLong, "aaa", 9L: JLong, null, 9L: JLong, 9L: JLong)))
expectedOutput.add(new StreamRecord(
- baserow(0L: JLong, "aaa", 10L: JLong, null, 1L: JLong, 10L: JLong)))
+ baserow(0L: JLong, "aaa", 10L: JLong, null, 9L: JLong, 10L: JLong)))
expectedOutput.add(new StreamRecord(
- baserow(0L: JLong, "bbb", 40L: JLong, null, 10L: JLong, 40L: JLong)))
+ baserow(0L: JLong, "bbb", 40L: JLong, null, 40L: JLong, 40L: JLong)))
assertor.assertOutputEqualsSorted("result mismatch", expectedOutput,
result)
testHarness.close()
@@ -522,7 +522,7 @@ class OverWindowHarnessTest(mode: StateBackendMode) extends
HarnessTestBase(mode
expectedOutput.add(new StreamRecord(
baserow(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong)))
expectedOutput.add(new StreamRecord(
- baserow(20011L: JLong, "ccc", 3L: JLong, 1L: JLong, 3L: JLong)))
+ baserow(20011L: JLong, "ccc", 3L: JLong, 3L: JLong, 3L: JLong)))
assertor.assertOutputEqualsSorted("result mismatch", expectedOutput,
result)
testHarness.close()
@@ -667,7 +667,7 @@ class OverWindowHarnessTest(mode: StateBackendMode) extends
HarnessTestBase(mode
expectedOutput.add(new StreamRecord(
baserow(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong)))
expectedOutput.add(new StreamRecord(
- baserow(20011L: JLong, "ccc", 3L: JLong, 1L: JLong, 3L: JLong)))
+ baserow(20011L: JLong, "ccc", 3L: JLong, 3L: JLong, 3L: JLong)))
assertor.assertOutputEqualsSorted("result mismatch", expectedOutput,
result)
testHarness.close()
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/KeyedProcessFunctionWithCleanupState.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/KeyedProcessFunctionWithCleanupState.java
index 5c8217c..815778e 100644
---
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/KeyedProcessFunctionWithCleanupState.java
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/KeyedProcessFunctionWithCleanupState.java
@@ -85,7 +85,7 @@ public abstract class KeyedProcessFunctionWithCleanupState<K,
IN, OUT>
if (stateCleaningEnabled) {
Long cleanupTime = cleanupTimeState.value();
// check that the triggered timer is the last registered processing time timer.
- return null != cleanupTime && timestamp == cleanupTime;
+ return timestamp.equals(cleanupTime);
} else {
return false;
}
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/AbstractRowTimeUnboundedPrecedingOver.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/AbstractRowTimeUnboundedPrecedingOver.java
index f0dc6e5..922874a 100644
---
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/AbstractRowTimeUnboundedPrecedingOver.java
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/AbstractRowTimeUnboundedPrecedingOver.java
@@ -154,10 +154,10 @@ public abstract class
AbstractRowTimeUnboundedPrecedingOver<K> extends KeyedProc
KeyedProcessFunction<K, BaseRow,
BaseRow>.OnTimerContext ctx,
Collector<BaseRow> out) throws Exception {
if (isProcessingTimeTimer(ctx)) {
- if (needToCleanupState(timestamp)) {
+ if (stateCleaningEnabled) {
// we check whether there are still records which have not been processed yet
- boolean noRecordsToProcess =
!inputState.contains(timestamp);
+ boolean noRecordsToProcess =
!inputState.keys().iterator().hasNext();
if (noRecordsToProcess) {
// we clean the state
cleanupState(inputState, accState);
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/ProcTimeRowsBoundedPrecedingFunction.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/ProcTimeRowsBoundedPrecedingFunction.java
index 72d6336..fea641d 100644
---
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/ProcTimeRowsBoundedPrecedingFunction.java
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/ProcTimeRowsBoundedPrecedingFunction.java
@@ -220,7 +220,7 @@ public class ProcTimeRowsBoundedPrecedingFunction<K>
extends KeyedProcessFunctio
long timestamp,
KeyedProcessFunction<K, BaseRow,
BaseRow>.OnTimerContext ctx,
Collector<BaseRow> out) throws Exception {
- if (needToCleanupState(timestamp)) {
+ if (stateCleaningEnabled) {
cleanupState(inputState, accState, counterState,
smallestTsState);
function.cleanup();
}
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/ProcTimeUnboundedPrecedingFunction.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/ProcTimeUnboundedPrecedingFunction.java
index 6d66372..67eb982 100644
---
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/ProcTimeUnboundedPrecedingFunction.java
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/ProcTimeUnboundedPrecedingFunction.java
@@ -111,7 +111,7 @@ public class ProcTimeUnboundedPrecedingFunction<K> extends
KeyedProcessFunctionW
long timestamp,
KeyedProcessFunction<K, BaseRow,
BaseRow>.OnTimerContext ctx,
Collector<BaseRow> out) throws Exception {
- if (needToCleanupState(timestamp)) {
+ if (stateCleaningEnabled) {
cleanupState(accState);
function.cleanup();
}
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRangeBoundedPrecedingFunction.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRangeBoundedPrecedingFunction.java
index 1689224..b30416e 100644
---
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRangeBoundedPrecedingFunction.java
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRangeBoundedPrecedingFunction.java
@@ -170,7 +170,7 @@ public class RowTimeRangeBoundedPrecedingFunction<K>
extends KeyedProcessFunctio
registerProcessingCleanupTimer(ctx,
ctx.timerService().currentProcessingTime());
if (isProcessingTimeTimer(ctx)) {
- if (needToCleanupState(timestamp)) {
+ if (stateCleaningEnabled) {
Iterator<Long> keysIt =
inputState.keys().iterator();
Long lastProcessedTime =
lastTriggeringTsState.value();
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRowsBoundedPrecedingFunction.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRowsBoundedPrecedingFunction.java
index c141f7f..9572973 100644
---
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRowsBoundedPrecedingFunction.java
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/over/RowTimeRowsBoundedPrecedingFunction.java
@@ -175,7 +175,7 @@ public class RowTimeRowsBoundedPrecedingFunction<K> extends
KeyedProcessFunction
KeyedProcessFunction<K, BaseRow,
BaseRow>.OnTimerContext ctx,
Collector<BaseRow> out) throws Exception {
if (isProcessingTimeTimer(ctx)) {
- if (needToCleanupState(timestamp)) {
+ if (stateCleaningEnabled) {
Iterator<Long> keysIt =
inputState.keys().iterator();
Long lastProcessedTime =
lastTriggeringTsState.value();