[
https://issues.apache.org/jira/browse/FLINK-6583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16012391#comment-16012391
]
ASF GitHub Bot commented on FLINK-6583:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3919#discussion_r116740173
--- Diff:
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/CountTriggerWithCleanupStateHarnessTest.scala
---
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.harness
+
+import com.google.common.collect.Lists
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import
org.apache.flink.streaming.runtime.operators.windowing.TriggerTestHarness
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.triggers.CountTriggerWithCleanupState
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class CountTriggerWithCleanupStateHarnessTest {
+ protected var queryConfig =
+ new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(2),
Time.seconds(3))
+
+ @Test
+ def testFiringAndFireingWithPurging(): Unit = {
+ val testHarness = new TriggerTestHarness[Any, TimeWindow](
+ CountTriggerWithCleanupState.of[TimeWindow](queryConfig, 10), new
TimeWindow.Serializer)
+
+ // try to trigger onProcessingTime method via 1, but there is non
timer is triggered
+ assertEquals(0, testHarness.advanceProcessingTime(1).size())
+
+ // register cleanup timer with 3001
+ assertEquals(
+ TriggerResult.CONTINUE,
+ testHarness.processElement(new StreamRecord[Any](1), new
TimeWindow(0, 9)))
+
+ // try to trigger onProcessingTime method via 1000, but there is non
timer is triggered
+ assertEquals(0, testHarness.advanceProcessingTime(1000).size())
+
+ // 1000 + 2000 <= 3001 reuse timer 3001
+ assertEquals(
+ TriggerResult.CONTINUE,
+ testHarness.processElement(new StreamRecord[Any](1), new
TimeWindow(0, 9)))
+
+ // there are two state entries, one is timer(3001) another is
counter(2)
+ assertEquals(2, testHarness.numStateEntries)
+
+ // try to trigger onProcessingTime method via 3001, and timer(3001) is
triggered
+ assertEquals(
+ TriggerResult.FIRE_AND_PURGE,
+ testHarness.advanceProcessingTime(3001).iterator().next().f1)
+
+ assertEquals(0, testHarness.numStateEntries)
+
+ // 3001 + 2000 >= 3001 register cleanup timer with 6001, and remove
timer 3001
+ assertEquals(
+ TriggerResult.CONTINUE,
+ testHarness.processElement(new StreamRecord[Any](1), new
TimeWindow(0, 9)))
+
+ // try to trigger onProcessingTime method via 4002, but there is non
timer is triggered
+ assertEquals(0, testHarness.advanceProcessingTime(4002).size())
+
+ // 4002 + 2000 >= 6001 register cleanup timer via 7002, and remove
timer 6001
+ assertEquals(
+ TriggerResult.CONTINUE,
+ testHarness.processElement(new StreamRecord[Any](1), new
TimeWindow(0, 9)))
+
+ // 4002 + 2000 <= 7002 reuse timer 7002
+ assertEquals(
+ TriggerResult.CONTINUE,
+ testHarness.processElement(new StreamRecord[Any](1), new
TimeWindow(0, 9)))
+
+ // have one timer 7002
+ assertEquals(1, testHarness.numProcessingTimeTimers)
+ assertEquals(0, testHarness.numEventTimeTimers)
+ assertEquals(2, testHarness.numStateEntries)
+ assertEquals(2, testHarness.numStateEntries(new TimeWindow(0, 9)))
+ assertEquals(0, testHarness.numStateEntries(new TimeWindow(9, 18)))
--- End diff --
There is only a single global window. So, we do not need to test for
different windows, IMO.
> Enable QueryConfig in count base GroupWindow
> --------------------------------------------
>
> Key: FLINK-6583
> URL: https://issues.apache.org/jira/browse/FLINK-6583
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Affects Versions: 1.3.0, 1.4.0
> Reporter: sunjincheng
> Assignee: sunjincheng
>
> Enable QueryConfig in count base GroupWindow by Add a custom Trigger
> `CountTriggerWithCleanupState`. See more in FLINK-6491.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)