[
https://issues.apache.org/jira/browse/FLINK-6649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16073854#comment-16073854
]
ASF GitHub Bot commented on FLINK-6649:
---------------------------------------
Github user sunjincheng121 commented on a diff in the pull request:
https://github.com/apache/flink/pull/4157#discussion_r125501792
--- Diff:
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
---
@@ -80,18 +80,79 @@ class NonWindowHarnessTest extends HarnessTestBase {
val expectedOutput = new ConcurrentLinkedQueue[Object]()
- expectedOutput.add(new StreamRecord(CRow(Row.of(1L: JLong, 1: JInt),
true), 1))
- expectedOutput.add(new StreamRecord(CRow(Row.of(2L: JLong, 1: JInt),
true), 1))
- expectedOutput.add(new StreamRecord(CRow(Row.of(3L: JLong, 3: JInt),
true), 1))
- expectedOutput.add(new StreamRecord(CRow(Row.of(4L: JLong, 6: JInt),
true), 1))
- expectedOutput.add(new StreamRecord(CRow(Row.of(5L: JLong, 10: JInt),
true), 1))
- expectedOutput.add(new StreamRecord(CRow(Row.of(6L: JLong, 3: JInt),
true), 1))
- expectedOutput.add(new StreamRecord(CRow(Row.of(7L: JLong, 5: JInt),
true), 1))
- expectedOutput.add(new StreamRecord(CRow(Row.of(8L: JLong, 11: JInt),
true), 1))
- expectedOutput.add(new StreamRecord(CRow(Row.of(9L: JLong, 18: JInt),
true), 1))
- expectedOutput.add(new StreamRecord(CRow(Row.of(10L: JLong, 3: JInt),
true), 1))
-
- verify(expectedOutput, result, new RowResultSortComparator(6))
+ expectedOutput.add(new StreamRecord(CRow(Row.of(1L: JLong, "aaa", 1:
JInt), true), 1))
+ expectedOutput.add(new StreamRecord(CRow(Row.of(2L: JLong, "bbb", 1:
JInt), true), 1))
+ expectedOutput.add(new StreamRecord(CRow(Row.of(3L: JLong, "aaa", 3:
JInt), true), 1))
+ expectedOutput.add(new StreamRecord(CRow(Row.of(4L: JLong, "aaa", 6:
JInt), true), 1))
+ expectedOutput.add(new StreamRecord(CRow(Row.of(5L: JLong, "aaa", 10:
JInt), true), 1))
+ expectedOutput.add(new StreamRecord(CRow(Row.of(6L: JLong, "bbb", 3:
JInt), true), 1))
+ expectedOutput.add(new StreamRecord(CRow(Row.of(7L: JLong, "aaa", 5:
JInt), true), 1))
+ expectedOutput.add(new StreamRecord(CRow(Row.of(8L: JLong, "aaa", 11:
JInt), true), 1))
+ expectedOutput.add(new StreamRecord(CRow(Row.of(9L: JLong, "aaa", 18:
JInt), true), 1))
+ expectedOutput.add(new StreamRecord(CRow(Row.of(10L: JLong, "bbb", 3:
JInt), true), 1))
+
+ verifySorted(expectedOutput, result, new RowResultSortComparator)
+
+ testHarness.close()
+ }
+
+ @Test
+ def testProcTimeNonWindowWithUpdateInterval(): Unit = {
+
+ val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+ new GroupAggProcessFunctionWithUpdateInterval(
+ genSumAggFunction,
+ sumAggregationStateType,
+ sumAggregationRowType,
+ false,
+ queryConfig
+ .withIdleStateRetentionTime(Time.seconds(4), Time.seconds(5))
+ .withUnboundedAggregateUpdateInterval(Time.seconds(1))))
+
+ val testHarness =
+ createHarnessTester(
+ processFunction,
+ new TupleRowKeySelector[String](2),
+ BasicTypeInfo.STRING_TYPE_INFO)
+
+ testHarness.open()
+
+ testHarness.setProcessingTime(1)
+
+ testHarness.processElement(new StreamRecord(CRow(Row.of(1L: JLong, 1:
JInt, "aaa"), true), 1))
+ testHarness.processElement(new StreamRecord(CRow(Row.of(2L: JLong, 1:
JInt, "bbb"), true), 1))
+ testHarness.setProcessingTime(1000)
+ testHarness.processElement(new StreamRecord(CRow(Row.of(3L: JLong, 2:
JInt, "aaa"), true), 1))
+ testHarness.processElement(new StreamRecord(CRow(Row.of(4L: JLong, 3:
JInt, "aaa"), true), 1))
+
+ testHarness.setProcessingTime(1002)
+ testHarness.processElement(new StreamRecord(CRow(Row.of(5L: JLong, 4:
JInt, "aaa"), true), 1))
+ testHarness.processElement(new StreamRecord(CRow(Row.of(6L: JLong, 2:
JInt, "bbb"), true), 1))
+
+ testHarness.setProcessingTime(4003)
+ testHarness.processElement(new StreamRecord(CRow(Row.of(7L: JLong, 5:
JInt, "aaa"), true), 1))
+ testHarness.processElement(new StreamRecord(CRow(Row.of(8L: JLong, 6:
JInt, "aaa"), true), 1))
+
+ // clear all states
+ testHarness.setProcessingTime(10003)
+ testHarness.processElement(new StreamRecord(CRow(Row.of(9L: JLong, 7:
JInt, "aaa"), true), 1))
+ testHarness.processElement(new StreamRecord(CRow(Row.of(10L: JLong, 3:
JInt, "bbb"), true), 1))
+
+ testHarness.setProcessingTime(12003)
+
+ val result = testHarness.getOutput
+
+ val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+ expectedOutput.add(new StreamRecord(CRow(Row.of(1L: JLong, "aaa", 6:
JInt), true), 1001))
--- End diff --
Yes, Agree with you. :)
> Improve Non-window group aggregate with configurable `earlyFire`.
> -----------------------------------------------------------------
>
> Key: FLINK-6649
> URL: https://issues.apache.org/jira/browse/FLINK-6649
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Affects Versions: 1.4.0
> Reporter: sunjincheng
> Assignee: sunjincheng
>
> Currently, Non-windowed group aggregate is earlyFiring at count(1), that is
> every row will emit a aggregate result. But some times user want config count
> number (`early firing with count[N]`) , to reduce the downstream pressure.
> This JIRA. will enable the config of e`earlyFiring` for Non-windowed group
> aggregate.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)