amaliujia commented on a change in pull request #11807:
URL: https://github.com/apache/beam/pull/11807#discussion_r431486841
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java
##########
@@ -99,14 +102,32 @@ public TableFunctionScan copy(
RexInputRef wmCol = (RexInputRef) call.getOperands().get(1);
PCollection<Row> upstream = input.get(0);
Schema outputSchema = CalciteUtils.toSchema(getRowType());
- return upstream
- .apply(
- ParDo.of(
- new FixedWindowDoFn(
-
FixedWindows.of(durationParameter(call.getOperands().get(2))),
- wmCol.getIndex(),
- outputSchema)))
- .setRowSchema(outputSchema);
+ FixedWindows windowFn =
FixedWindows.of(durationParameter(call.getOperands().get(2)));
+ PCollection<Row> streamWithWindowMetadata =
+ upstream
+ .apply(ParDo.of(new FixedWindowDoFn(windowFn, wmCol.getIndex(),
outputSchema)))
+ .setRowSchema(outputSchema);
+
+ PCollection<Row> windowedStream =
+ assignTimestampsAndWindow(
+ streamWithWindowMetadata, wmCol.getIndex(), (WindowFn) windowFn);
+
+ return windowedStream;
+ }
+
+ /** Extract timestamps from the windowFieldIndex, then window into
windowFns. */
+ private PCollection<Row> assignTimestampsAndWindow(
+ PCollection<Row> upstream, int windowFieldIndex, WindowFn<Row,
IntervalWindow> windowFn) {
+ PCollection<Row> windowedStream;
+ windowedStream =
+ upstream
Review comment:
Not a big deal. Just want to use the name `windowedStream` to improve
readability. E.g. readers know it's returning a windowed PCollection.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]