[FLINK-6859][table] Remove useless timer deletion to improve 
StateCleaningCountTrigger

This closes #4085.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/92cd736d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/92cd736d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/92cd736d

Branch: refs/heads/master
Commit: 92cd736daf65aae10f77bb6e8144623a5c82b5be
Parents: 825721e
Author: sunjincheng121 <[email protected]>
Authored: Wed Jun 7 19:46:32 2017 +0800
Committer: zentol <[email protected]>
Committed: Wed Jun 7 23:06:08 2017 +0200

----------------------------------------------------------------------
 .../runtime/triggers/StateCleaningCountTrigger.scala |  4 ----
 .../StateCleaningCountTriggerHarnessTest.scala       | 15 ++++++++++-----
 2 files changed, 10 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/92cd736d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/StateCleaningCountTrigger.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/StateCleaningCountTrigger.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/StateCleaningCountTrigger.scala
index f3f9246..3c18449 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/StateCleaningCountTrigger.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/triggers/StateCleaningCountTrigger.scala
@@ -72,10 +72,6 @@ class StateCleaningCountTrigger(queryConfig: 
StreamQueryConfig, maxCount: Long)
         // register timer and remember clean-up time
         ctx.registerProcessingTimeTimer(cleanupTime)
 
-        if (null != curCleanupTime) {
-          ctx.deleteProcessingTimeTimer(curCleanupTime)
-        }
-
         ctx.getPartitionedState(cleanupStateDesc).update(cleanupTime)
       }
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/92cd736d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala
index 96601fb..93b89ca 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala
@@ -62,7 +62,7 @@ class StateCleaningCountTriggerHarnessTest {
 
     assertEquals(0, testHarness.numStateEntries)
 
-    // 3001 + 2000 >= 3001 register cleanup timer with 6001, and remove timer 
3001
+    // 3001 + 2000 >= 3001 register cleanup timer with 6001
     assertEquals(
       TriggerResult.CONTINUE,
       testHarness.processElement(new StreamRecord(1), GlobalWindow.get))
@@ -70,7 +70,7 @@ class StateCleaningCountTriggerHarnessTest {
     // try to trigger onProcessingTime method via 4002, but there is non timer 
is triggered
     assertEquals(0, testHarness.advanceProcessingTime(4002).size())
 
-    // 4002 + 2000 >= 6001 register cleanup timer via 7002, and remove timer 
6001
+    // 4002 + 2000 >= 6001 register cleanup timer via 7002
     assertEquals(
       TriggerResult.CONTINUE,
       testHarness.processElement(new StreamRecord(1), GlobalWindow.get))
@@ -80,8 +80,8 @@ class StateCleaningCountTriggerHarnessTest {
       TriggerResult.CONTINUE,
       testHarness.processElement(new StreamRecord(1), GlobalWindow.get))
 
-    // have one timer 7002
-    assertEquals(1, testHarness.numProcessingTimeTimers)
+    // have two timers 6001 and 7002
+    assertEquals(2, testHarness.numProcessingTimeTimers)
     assertEquals(0, testHarness.numEventTimeTimers)
     assertEquals(2, testHarness.numStateEntries)
     assertEquals(2, testHarness.numStateEntries(GlobalWindow.get))
@@ -115,9 +115,14 @@ class StateCleaningCountTriggerHarnessTest {
     assertEquals(1, testHarness.numStateEntries(GlobalWindow.get))
 
     // try to trigger onProcessingTime method via 7002, and all states are 
cleared
+    val timesIt = testHarness.advanceProcessingTime(7002).iterator()
+    assertEquals(
+      TriggerResult.CONTINUE,
+      timesIt.next().f1)
+
     assertEquals(
       TriggerResult.FIRE_AND_PURGE,
-      testHarness.advanceProcessingTime(7002).iterator().next().f1)
+      timesIt.next().f1)
 
     assertEquals(0, testHarness.numStateEntries)
   }

Reply via email to