[FLINK-6859][table] Remove useless timer deletion to improve StateCleaningCountTrigger
This closes #4085. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/92cd736d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/92cd736d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/92cd736d Branch: refs/heads/master Commit: 92cd736daf65aae10f77bb6e8144623a5c82b5be Parents: 825721e Author: sunjincheng121 <[email protected]> Authored: Wed Jun 7 19:46:32 2017 +0800 Committer: zentol <[email protected]> Committed: Wed Jun 7 23:06:08 2017 +0200 ---------------------------------------------------------------------- .../runtime/triggers/StateCleaningCountTrigger.scala | 4 ---- .../StateCleaningCountTriggerHarnessTest.scala | 15 ++++++++++----- 2 files changed, 10 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/92cd736d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/StateCleaningCountTrigger.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/StateCleaningCountTrigger.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/StateCleaningCountTrigger.scala index f3f9246..3c18449 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/StateCleaningCountTrigger.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/StateCleaningCountTrigger.scala @@ -72,10 +72,6 @@ class StateCleaningCountTrigger(queryConfig: StreamQueryConfig, maxCount: Long) // register timer and remember clean-up time ctx.registerProcessingTimeTimer(cleanupTime) - if (null != curCleanupTime) { - ctx.deleteProcessingTimeTimer(curCleanupTime) - } - ctx.getPartitionedState(cleanupStateDesc).update(cleanupTime) } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/92cd736d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala index 96601fb..93b89ca 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala @@ -62,7 +62,7 @@ class StateCleaningCountTriggerHarnessTest { assertEquals(0, testHarness.numStateEntries) - // 3001 + 2000 >= 3001 register cleanup timer with 6001, and remove timer 3001 + // 3001 + 2000 >= 3001 register cleanup timer with 6001 assertEquals( TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord(1), GlobalWindow.get)) @@ -70,7 +70,7 @@ class StateCleaningCountTriggerHarnessTest { // try to trigger onProcessingTime method via 4002, but there is non timer is triggered assertEquals(0, testHarness.advanceProcessingTime(4002).size()) - // 4002 + 2000 >= 6001 register cleanup timer via 7002, and remove timer 6001 + // 4002 + 2000 >= 6001 register cleanup timer via 7002 assertEquals( TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord(1), GlobalWindow.get)) @@ -80,8 +80,8 @@ class StateCleaningCountTriggerHarnessTest { TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord(1), GlobalWindow.get)) - // have one timer 7002 - assertEquals(1, testHarness.numProcessingTimeTimers) + // have two timers 6001 and 7002 + assertEquals(2, testHarness.numProcessingTimeTimers) assertEquals(0, testHarness.numEventTimeTimers) 
assertEquals(2, testHarness.numStateEntries) assertEquals(2, testHarness.numStateEntries(GlobalWindow.get)) @@ -115,9 +115,14 @@ class StateCleaningCountTriggerHarnessTest { assertEquals(1, testHarness.numStateEntries(GlobalWindow.get)) // try to trigger onProcessingTime method via 7002, and all states are cleared + val timesIt = testHarness.advanceProcessingTime(7002).iterator() + assertEquals( + TriggerResult.CONTINUE, + timesIt.next().f1) + assertEquals( TriggerResult.FIRE_AND_PURGE, - testHarness.advanceProcessingTime(7002).iterator().next().f1) + timesIt.next().f1) assertEquals(0, testHarness.numStateEntries) }
