[
https://issues.apache.org/jira/browse/BEAM-9363?focusedWorklogId=438077&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-438077
]
ASF GitHub Bot logged work on BEAM-9363:
----------------------------------------
Author: ASF GitHub Bot
Created on: 27/May/20 22:49
Start Date: 27/May/20 22:49
Worklog Time Spent: 10m
Work Description: robinyqiu commented on a change in pull request #11807:
URL: https://github.com/apache/beam/pull/11807#discussion_r431481621
##########
File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
##########
@@ -4780,6 +4780,31 @@ public void testTumbleAsTVF() {
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
+ @Test
+ public void testTVFTumbleAggregation() {
+ String sql =
+ "SELECT COUNT(*) as field_count, "
+ + "window_start "
+ + "FROM TUMBLE((select * from KeyValue), descriptor(ts), 'INTERVAL 1 SECOND') "
+ + "GROUP BY window_start";
+ ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+ BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+
+ PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+ final Schema schema =
+ Schema.builder().addInt64Field("count_start").addDateTimeField("window_start").build();
Review comment:
Nit: `count_start` should be `field_count`.
##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java
##########
@@ -99,14 +102,32 @@ public TableFunctionScan copy(
RexInputRef wmCol = (RexInputRef) call.getOperands().get(1);
PCollection<Row> upstream = input.get(0);
Schema outputSchema = CalciteUtils.toSchema(getRowType());
- return upstream
- .apply(
- ParDo.of(
- new FixedWindowDoFn(
- FixedWindows.of(durationParameter(call.getOperands().get(2))),
- wmCol.getIndex(),
- outputSchema)))
- .setRowSchema(outputSchema);
+ FixedWindows windowFn = FixedWindows.of(durationParameter(call.getOperands().get(2)));
+ PCollection<Row> streamWithWindowMetadata =
+ upstream
+ .apply(ParDo.of(new FixedWindowDoFn(windowFn, wmCol.getIndex(), outputSchema)))
+ .setRowSchema(outputSchema);
+
+ PCollection<Row> windowedStream =
+ assignTimestampsAndWindow(
+ streamWithWindowMetadata, wmCol.getIndex(), (WindowFn) windowFn);
+
+ return windowedStream;
+ }
+
+ /** Extract timestamps from the windowFieldIndex, then window into windowFns. */
+ private PCollection<Row> assignTimestampsAndWindow(
+ PCollection<Row> upstream, int windowFieldIndex, WindowFn<Row, IntervalWindow> windowFn) {
+ PCollection<Row> windowedStream;
+ windowedStream =
+ upstream
Review comment:
Why not just `return upstream.apply(...)`?
----------------------------------------------------------------
Issue Time Tracking
-------------------
Worklog Id: (was: 438077)
Time Spent: 7h 20m (was: 7h 10m)
> BeamSQL windowing as TVF
> ------------------------
>
> Key: BEAM-9363
> URL: https://issues.apache.org/jira/browse/BEAM-9363
> Project: Beam
> Issue Type: New Feature
> Components: dsl-sql, dsl-sql-zetasql
> Reporter: Rui Wang
> Assignee: Rui Wang
> Priority: P2
> Time Spent: 7h 20m
> Remaining Estimate: 0h
>
> This Jira tracks the implementation for
> https://s.apache.org/streaming-beam-sql
> A TVF (table-valued function) is a SQL feature that produces a table as the
> function's output.
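To make the windowing semantics concrete: the test query in the diff above applies TUMBLE with a one-second interval and then groups by `window_start`. The following is a minimal stand-alone sketch of that computation in plain Java, assuming millisecond timestamps and windows aligned to epoch 0. `TumbleSketch`, `windowStart`, and `countPerWindow` are hypothetical names used only for illustration; this is not Beam's `FixedWindows` implementation, and the sample timestamps are made up.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical illustration only; not part of Beam. */
class TumbleSketch {

  /** Start of the fixed (tumbling) window that contains tsMillis, assuming windows aligned to epoch 0. */
  static long windowStart(long tsMillis, long sizeMillis) {
    // floorMod keeps the alignment correct for negative timestamps too.
    return tsMillis - Math.floorMod(tsMillis, sizeMillis);
  }

  /** Rough equivalent of COUNT(*) ... GROUP BY window_start over the given event timestamps. */
  static Map<Long, Long> countPerWindow(List<Long> timestampsMillis, long sizeMillis) {
    Map<Long, Long> counts = new TreeMap<>();
    for (long ts : timestampsMillis) {
      counts.merge(windowStart(ts, sizeMillis), 1L, Long::sum);
    }
    return counts;
  }

  public static void main(String[] args) {
    // Made-up timestamps, 1-second windows.
    System.out.println(countPerWindow(List.of(100L, 900L, 1000L, 2500L), 1000L));
    // prints {0=2, 1000=1, 2000=1}
  }
}
```

Each output entry corresponds to one result row of the query: the key plays the role of `window_start` and the value the role of `field_count`.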