LadyForest commented on code in PR #24030:
URL: https://github.com/apache/flink/pull/24030#discussion_r1446810962
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala:
##########
@@ -279,19 +279,19 @@ object AggregateUtil extends Enumeration {
typeFactory: FlinkTypeFactory,
inputRowType: RowType,
aggCalls: Seq[AggregateCall],
+ needRetraction: Boolean,
windowSpec: WindowSpec,
isStateBackendDataViews: Boolean): AggregateInfoList = {
- // Hopping window requires additional COUNT(*) to determine whether to
register next timer
- // through whether the current fired window is empty, see
SliceSharedWindowAggProcessor.
- val needInputCount = windowSpec.isInstanceOf[HoppingWindowSpec]
+ // Hopping window always requires additional COUNT(*) to determine
whether to register next
Review Comment:
Nit: remove the extra whitespace between "determine" and "whether".
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala:
##########
@@ -219,8 +219,13 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
val providedTrait = new ModifyKindSetTrait(builder.build())
createNewNode(window, children, providedTrait, requiredTrait,
requester)
- case _: StreamPhysicalWindowAggregate | _: StreamPhysicalWindowRank |
- _: StreamPhysicalWindowDeduplicate =>
+ case window: StreamPhysicalWindowAggregate =>
+ // WindowAggregate and WindowTableAggregate support all changes in
input
+ val children = visitChildren(window, ModifyKindSetTrait.ALL_CHANGES)
+ val providedTrait = new ModifyKindSetTrait(ModifyKindSet.INSERT_ONLY)
Review Comment:
Nit: Add a TODO noting that the provided trait set can be extended once we
support emit strategies in the TVF syntax.
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala:
##########
@@ -127,6 +162,26 @@ class WindowAggregateTest(aggPhaseEnforcer:
AggregatePhaseStrategy) extends Tabl
util.verifyRelPlan(sql)
}
+ @TestTemplate
+ def testTumble_OnProctimeWithCDCSource(): Unit = {
+ assumeThat(isTwoPhase).isTrue
Review Comment:
Is there any particular reason to enable only the two-phase test?
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractWindowAggProcessor.java:
##########
@@ -231,4 +239,33 @@ protected void collect(RowData aggResult) {
reuseOutput.replace(ctx.getKeyedStateBackend().getCurrentKey(),
aggResult);
ctx.output(reuseOutput);
}
+
+ /** A supplier that returns whether the window is empty. */
+ protected final class WindowIsEmptySupplier implements Supplier<Boolean>,
Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final int indexOfCountStar;
+
+ private WindowIsEmptySupplier(int indexOfCountStar, SliceAssigner
assigner) {
+ if (assigner instanceof SliceAssigners.HoppingSliceAssigner) {
+ checkArgument(
+ indexOfCountStar >= 0,
+ "Hopping window requires a COUNT(*) in the aggregate
functions.");
+ }
+ this.indexOfCountStar = indexOfCountStar;
+ }
+
+ @Override
+ public Boolean get() {
+ if (indexOfCountStar < 0) {
Review Comment:
I see there is a precondition check on `indexOfCountStar`. Under which
conditions might it be negative?
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala:
##########
@@ -756,6 +756,147 @@ object TestData {
row("2020-10-10 00:00:34", 1, 3d, 3f, new JBigDecimal("3.33"),
"Comment#3", "b")
)
+ val windowChangelogDataWithTimestamp: Seq[Row] = List(
Review Comment:
+----+---------------------+----+------+------+-------+------------+---+
| Op | Timestamp           | 1  | 2    | 3    | 4     | 5          | 6 |
+----+---------------------+----+------+------+-------+------------+---+
| +I | 2020-10-10 00:00:01 | 1  | 1.0  | 1.0  | 1.11  | Hi         | a |
| +I | 2020-10-10 00:00:02 | 2  | 2.0  | 2.0  | 2.22  | Comment#1  | a |
| -D | 2020-10-10 00:00:03 | 1  | 1.0  | 1.0  | 1.11  | Hi         | a |
| +I | 2020-10-10 00:00:03 | 2  | 2.0  | 2.0  | 2.22  | Comment#1  | a |
| +I | 2020-10-10 00:00:04 | 5  | 5.0  | 5.0  | 5.55  | null       | a |
| -U | 2020-10-10 00:00:04 | 2  | 2.0  | 2.0  | 2.22  | Comment#1  | a |
| +U | 2020-10-10 00:00:04 | 22 | 22.0 | 22.2 | 22.22 | Comment#22 | a |
| +I | 2020-10-10 00:00:07 | 3  | 3.0  | 3.0  | null  | Hello      | b |
| +I | 2020-10-10 00:00:06 | 6  | 6.0  | 6.0  | 6.66  | Hi         | b |
| +I | 2020-10-10 00:00:08 | 3  | null | 3.0  | 3.33  | Comment#2  | a |
| +I | 2020-10-10 00:00:04 | 5  | 5.0  | null | 5.55  | Hi         | a |
| +I | 2020-10-10 00:00:16 | 4  | 4.0  | 4.0  | 4.44  | Hi         | b |
| -D | 2020-10-10 00:00:04 | 5  | 5.0  | 5.0  | 5.55  | null       | a |
| +I | 2020-10-10 00:00:38 | 8  | 8.0  | 8.0  | 8.88  | Comment#4  | b |
| -D | 2020-10-10 00:00:39 | 8  | 8.0  | 8.0  | 8.88  | Comment#4  | b |
+----+---------------------+----+------+------+-------+------------+---+
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]