robinyqiu commented on a change in pull request #11807:
URL: https://github.com/apache/beam/pull/11807#discussion_r431481621
##########
File path:
sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
##########
@@ -4780,6 +4780,31 @@ public void testTumbleAsTVF() {
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
+ @Test
+ public void testTVFTumbleAggregation() {
+ String sql =
+ "SELECT COUNT(*) as field_count, "
+ + "window_start "
+ + "FROM TUMBLE((select * from KeyValue), descriptor(ts), 'INTERVAL
1 SECOND') "
+ + "GROUP BY window_start";
+ ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+ BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+
+ PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline,
beamRelNode);
+
+ final Schema schema =
+
Schema.builder().addInt64Field("count_start").addDateTimeField("window_start").build();
Review comment:
Nit: `count_start` should be `field_count`, to match the `COUNT(*) as field_count` alias in the query.
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java
##########
@@ -99,14 +102,32 @@ public TableFunctionScan copy(
RexInputRef wmCol = (RexInputRef) call.getOperands().get(1);
PCollection<Row> upstream = input.get(0);
Schema outputSchema = CalciteUtils.toSchema(getRowType());
- return upstream
- .apply(
- ParDo.of(
- new FixedWindowDoFn(
-
FixedWindows.of(durationParameter(call.getOperands().get(2))),
- wmCol.getIndex(),
- outputSchema)))
- .setRowSchema(outputSchema);
+ FixedWindows windowFn =
FixedWindows.of(durationParameter(call.getOperands().get(2)));
+ PCollection<Row> streamWithWindowMetadata =
+ upstream
+ .apply(ParDo.of(new FixedWindowDoFn(windowFn, wmCol.getIndex(),
outputSchema)))
+ .setRowSchema(outputSchema);
+
+ PCollection<Row> windowedStream =
+ assignTimestampsAndWindow(
+ streamWithWindowMetadata, wmCol.getIndex(), (WindowFn) windowFn);
+
+ return windowedStream;
+ }
+
+ /** Extract timestamps from the windowFieldIndex, then window into
windowFns. */
+ private PCollection<Row> assignTimestampsAndWindow(
+ PCollection<Row> upstream, int windowFieldIndex, WindowFn<Row,
IntervalWindow> windowFn) {
+ PCollection<Row> windowedStream;
+ windowedStream =
+ upstream
Review comment:
Why not just `return upstream.apply(...)` directly, instead of declaring `windowedStream` and then assigning it?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org