fhueske commented on code in PR #25526:
URL: https://github.com/apache/flink/pull/25526#discussion_r1801586512
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/SqlCallSyntax.java:
##########
@@ -316,4 +316,23 @@ private String doUnParse(
CallSyntaxUtils.asSerializableOperand(operands.get(2)));
}
};
+
+ SqlCallSyntax OVER =
+ ((sqlName, operands) -> {
+ String projection = operands.get(0).asSerializableString();
Review Comment:
```suggestion
String partitioning = operands.get(0).asSerializableString();
```
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationTestPrograms.java:
##########
@@ -740,4 +740,89 @@ private static Instant dayOfSeconds(int second) {
},
"MySink")
.build();
+
+ static final TableTestProgram OVER_WINDOW_RANGE =
Review Comment:
Should we also test the `UNBOUNDED PRECEDING` case here?
It is handled differently than the bounded preceding window case.
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationTestPrograms.java:
##########
@@ -740,4 +740,89 @@ private static Instant dayOfSeconds(int second) {
},
"MySink")
.build();
+
+ static final TableTestProgram OVER_WINDOW_RANGE =
+ TableTestProgram.of("over-window-range", "test over window with time range")
+ .setupTableSource(
+ SourceTestStep.newBuilder("data")
+ .addSchema(
+ "k string",
+ "v bigint",
+ "ts TIMESTAMP_LTZ(3)",
+ "WATERMARK for `ts` AS `ts`")
+ .producedBeforeRestore(
+ Row.of("Apple", 5L, dayOfSeconds(0)),
+ Row.of("Apple", 4L, dayOfSeconds(1)))
+ .producedAfterRestore(Row.of("Apple", 3L, dayOfSeconds(2)))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema("k string", "v bigint", "ts TIMESTAMP_LTZ(3)")
+ .consumedBeforeRestore(
+ Row.of("Apple", 5L, dayOfSeconds(0)),
+ Row.of("Apple", 4L, dayOfSeconds(1)))
+ .consumedAfterRestore(Row.of("Apple", 3L, dayOfSeconds(2)))
+ .build())
+ .runSql(
+ "SELECT `k`, (LAST_VALUE(`v`) OVER(PARTITION BY `k` ORDER BY `ts` RANGE BETWEEN INTERVAL '0 00:00:02.0' DAY TO SECOND(3) PRECEDING AND CURRENT ROW)) AS `_c1`, `ts` FROM (\n"
+ + " SELECT `k`, `v`, `ts` FROM `default_catalog`.`default_database`.`data`\n"
+ + ")")
+ .runTableApi(
+ tableEnvAccessor ->
+ tableEnvAccessor
+ .from("data")
+ .window(
+ Over.partitionBy($("k"))
+ .orderBy($("ts"))
+ .preceding(lit(2).second())
+ .as("w"))
+ .select(
+ $("k"),
+ $("v").lastValue().over($("w")),
+ $("ts")),
+ "sink")
+ .build();
+
+ static final TableTestProgram OVER_WINDOW_ROWS =
Review Comment:
Should we also test the `UNBOUNDED PRECEDING` case here?
It is handled differently than the bounded preceding window case.
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationTestPrograms.java:
##########
@@ -740,4 +740,89 @@ private static Instant dayOfSeconds(int second) {
},
"MySink")
.build();
+
+ static final TableTestProgram OVER_WINDOW_RANGE =
Review Comment:
We should also test `OVER` without `PARTITION BY` (and possibly without
`ORDER BY`).
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/SqlCallSyntax.java:
##########
@@ -316,4 +316,23 @@ private String doUnParse(
CallSyntaxUtils.asSerializableOperand(operands.get(2)));
}
};
+
+ SqlCallSyntax OVER =
+ ((sqlName, operands) -> {
+ String projection = operands.get(0).asSerializableString();
+ String order = operands.get(1).asSerializableString();
+ String rangeBounds =
+ CallSyntaxUtils.overRangeToSerializableString(
+ operands.get(2), operands.get(3));
+ if (operands.size() == 4) {
+ return String.format("%s OVER(ORDER BY %s %s)", projection, order, rangeBounds);
+ } else {
+ return String.format(
+ "%s OVER(PARTITION BY %s ORDER BY %s %s)",
+ projection,
+ CallSyntaxUtils.asSerializableOperand(operands.get(4)),
+ order,
+ rangeBounds);
+ }
Review Comment:
I think this logic might not be sufficient to capture the full scope of
`OVER`.
Neither the `PARTITION BY` nor the `ORDER BY` clause is required for `OVER`
windows.
Without `PARTITION BY`, we cannot parallelize the operation. This case is
supported here.
However, in batch mode, `ORDER BY` is optional as well. Without `ORDER BY`, we
cannot specify a range, so the window is always the full partition. Some
aggregation functions need `ORDER BY`, such as `LAST_VALUE()`, `ROW_NUMBER()`,
etc., but others like `SUM()` work fine without `ORDER BY` (on the full
partition).
I'm not sure which variants of `OVER` we support in batch mode, but IMO it
would be good to support all variants that are valid according to the SQL
standard (if possible).
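To make the point about optional clauses concrete, here is a rough sketch in plain Java of how the serialization could treat each part of the window as optional. It is not the actual `SqlCallSyntax` or `CallSyntaxUtils` API: `OverClauseSketch`, `unparse`, and its parameters are hypothetical names, and the rule that a frame requires `ORDER BY` simply follows the assumption stated above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical helper (not part of Flink): builds an OVER clause from optional parts. */
final class OverClauseSketch {

    private OverClauseSketch() {}

    /**
     * @param call the already serialized aggregate call, e.g. "SUM(`v`)"
     * @param partitionBy serialized PARTITION BY expressions, if any
     * @param orderBy serialized ORDER BY expression, if any
     * @param frame serialized frame, e.g. "RANGE BETWEEN ... AND CURRENT ROW", if any
     */
    static String unparse(
            String call,
            Optional<String> partitionBy,
            Optional<String> orderBy,
            Optional<String> frame) {
        // Assumption taken from the comment above: without ORDER BY there is no
        // frame, and the window is the full partition.
        if (frame.isPresent() && !orderBy.isPresent()) {
            throw new IllegalArgumentException("A window frame requires ORDER BY");
        }
        List<String> clauses = new ArrayList<>();
        partitionBy.ifPresent(p -> clauses.add("PARTITION BY " + p));
        orderBy.ifPresent(o -> clauses.add("ORDER BY " + o));
        frame.ifPresent(clauses::add);
        // Only the clauses that are present are emitted, separated by single spaces.
        return call + " OVER(" + String.join(" ", clauses) + ")";
    }
}
```

With this shape, each combination (no clauses, `PARTITION BY` only, `ORDER BY` only, or both with a frame) goes through the same code path instead of a branch per operand count.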