kennknowles commented on a change in pull request #13759:
URL: https://github.com/apache/beam/pull/13759#discussion_r565586807
##########
File path:
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
##########
@@ -153,6 +171,9 @@ public Calc copy(RelTraitSet traitSet, RelNode input,
RexProgram program) {
private transient PreparedExpression exp;
private transient List<Integer> referencedColumns;
private transient PreparedExpression.Stream stream;
+ private transient Map<BoundedWindow, Queue<Context>> pending;
+ private transient Row previousRow;
+ private transient Future<Value> previousFuture;
Review comment:
`@Nullable`
##########
File path:
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
##########
@@ -90,6 +97,17 @@ public Calc copy(RelTraitSet traitSet, RelNode input,
RexProgram program) {
return new Transform();
}
+ @AutoValue
+ abstract static class Context {
Review comment:
[Context](https://www.google.com/search?q=define+context) for what? This
looks like a timestamped future to me.
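
To illustrate the naming point, here is a minimal sketch of what a "timestamped future" type could look like. The name `TimestampedFuture` is hypothetical, the class is hand-written rather than generated with `@AutoValue`, and `java.time.Instant` stands in for the timestamp type used in the PR so that the example needs only the standard library.

```java
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical replacement name for the PR's `Context` class: a pending result
// paired with the timestamp of the element that produced it.
final class TimestampedFuture<T> {
  private final Future<T> future;
  private final Instant timestamp;

  private TimestampedFuture(Future<T> future, Instant timestamp) {
    this.future = future;
    this.timestamp = timestamp;
  }

  static <T> TimestampedFuture<T> create(Future<T> future, Instant timestamp) {
    return new TimestampedFuture<>(future, timestamp);
  }

  Future<T> future() {
    return future;
  }

  Instant timestamp() {
    return timestamp;
  }
}

final class TimestampedFutureDemo {
  public static void main(String[] args) throws ExecutionException, InterruptedException {
    // An already-completed future keeps the demo deterministic.
    TimestampedFuture<String> pending =
        TimestampedFuture.create(CompletableFuture.completedFuture("value"), Instant.EPOCH);
    System.out.println(pending.timestamp() + " -> " + pending.future().get());
  }
}
```

With a name like this, `Queue<TimestampedFuture<Value>>` says what the queue holds without the reader having to open the class.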
##########
File path:
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
##########
@@ -153,6 +171,9 @@ public Calc copy(RelTraitSet traitSet, RelNode input,
RexProgram program) {
private transient PreparedExpression exp;
private transient List<Integer> referencedColumns;
private transient PreparedExpression.Stream stream;
+ private transient Map<BoundedWindow, Queue<Context>> pending;
+ private transient Row previousRow;
Review comment:
`@Nullable`
##########
File path:
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
##########
@@ -191,27 +212,96 @@ public void setup() {
stream = exp.stream();
}
+ @StartBundle
+ public void startBundle() {
+ pending = new HashMap<>();
+ previousRow = null;
+ previousFuture = null;
+ }
+
+ @Override
+ public Duration getAllowedTimestampSkew() {
+ return Duration.millis(Long.MAX_VALUE);
+ }
+
@ProcessElement
- public void processElement(ProcessContext c) throws InterruptedException {
- Map<String, Value> columns = new HashMap<>();
- Row row = c.element();
- for (int i : referencedColumns) {
- columns.put(
- columnName(i),
- ZetaSqlBeamTranslationUtils.toZetaSqlValue(
- row.getBaseValue(i, Object.class),
inputSchema.getField(i).getType()));
+ public void processElement(
+ @Element Row row, @Timestamp Instant t, BoundedWindow w,
OutputReceiver<Row> r)
+ throws InterruptedException {
+ final Future<Value> vf;
+
+ if (row.equals(previousRow)) {
+ vf = previousFuture;
+ } else {
+ Map<String, Value> columns = new HashMap<>();
+ for (int i : referencedColumns) {
+ columns.put(
+ columnName(i),
+ ZetaSqlBeamTranslationUtils.toZetaSqlValue(
+ row.getBaseValue(i, Object.class),
inputSchema.getField(i).getType()));
+ }
+
+ vf = stream.execute(columns, nullParams);
+ }
+ previousRow = row;
+
+ Queue<Context> pendingWindow = pending.get(w);
+ if (pendingWindow == null) {
+ pendingWindow = new ArrayDeque<>();
+ pending.put(w, pendingWindow);
}
+ pendingWindow.add(Context.create(vf, t));
+
+ while ((!pendingWindow.isEmpty() &&
pendingWindow.element().future().isDone())
+ || pendingWindow.size() > MAX_PENDING_WINDOW) {
+ outputRow(pendingWindow.remove(), r);
+ }
+ }
+
+ @FinishBundle
+ public void finishBundle(FinishBundleContext c) throws
InterruptedException {
+ stream.flush();
+ for (Map.Entry<BoundedWindow, Queue<Context>> pendingWindow :
pending.entrySet()) {
+ OutputReceiver<Row> r = new OutputReceiverForFinishBundle(c,
pendingWindow.getKey());
+ for (Context vf : pendingWindow.getValue()) {
Review comment:
No action required, but `CompletableFuture.allOf` is the idiomatic,
future-proof way to await a list of futures, with the expectation that
the underlying implementation will maximize concurrency and early termination.
To use it you might have to make some adjustments, such as building a
`Future<Context>` by pairing each future with its timestamp, plus some
adaptation between the old `Future` API and the newer
`CompletionStage`/`CompletableFuture` APIs. I wouldn't do it now. The result
may come out worse, simply because none of Java's three future-like APIs
really gets it right.
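
For reference, a minimal sketch of the `CompletableFuture.allOf` pattern mentioned above. It assumes the futures are already `CompletableFuture` instances, which is not the case for the plain `Future<Value>` objects in this PR; the `AllOfSketch` class and the `awaitAll` helper are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class AllOfSketch {
  // Returns a future that completes once every input has completed, carrying
  // the results in input order.
  static <T> CompletableFuture<List<T>> awaitAll(List<CompletableFuture<T>> futures) {
    CompletableFuture<Void> all =
        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
    return all.thenApply(
        ignored -> {
          List<T> results = new ArrayList<>(futures.size());
          for (CompletableFuture<T> future : futures) {
            // join() does not block here: allOf has completed, so every input is done.
            results.add(future.join());
          }
          return results;
        });
  }

  public static void main(String[] args) {
    List<CompletableFuture<Integer>> futures =
        Arrays.asList(
            CompletableFuture.supplyAsync(() -> 1),
            CompletableFuture.supplyAsync(() -> 2));
    System.out.println(awaitAll(futures).join());
  }
}
```

The adaptation cost shows up in the signature: `allOf` accepts only `CompletableFuture`, so a plain `Future` would first have to be wrapped or bridged.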
##########
File path:
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
##########
@@ -191,27 +212,95 @@ public void setup() {
stream = exp.stream();
}
+ @StartBundle
+ public void startBundle() {
+ pending = new HashMap<>();
+ previousRow = null;
+ previousFuture = null;
+ }
+
+ @Override
+ public Duration getAllowedTimestampSkew() {
+ return Duration.millis(Long.MAX_VALUE);
+ }
+
@ProcessElement
- public void processElement(ProcessContext c) throws InterruptedException {
- Map<String, Value> columns = new HashMap<>();
- Row row = c.element();
- for (int i : referencedColumns) {
- columns.put(
- columnName(i),
- ZetaSqlBeamTranslationUtils.toZetaSqlValue(
- row.getBaseValue(i, Object.class),
inputSchema.getField(i).getType()));
+ public void processElement(
+ @Element Row row, @Timestamp Instant t, BoundedWindow w,
OutputReceiver<Row> r)
+ throws InterruptedException {
+ final Future<Value> vf;
+
+ if (row.equals(previousRow)) {
Review comment:
Technically, the per-window calls for one element are not guaranteed to come
one after the other, though that is always the case in practice _if_ this is
the first ParDo after windowing. Typically, multi-window elements get exploded
quite soon after they are first shuffled, and the compressed representation
exists primarily to minimize that first shuffle. Not complaining - I think
this is a good idea here. Does `Row.equals` short-circuit on reference
equality?
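
To make the question concrete, here is a sketch of the kind of short circuit meant. `SimpleRow` is a hypothetical stand-in, not Beam's `Row`, and nothing here says whether `Row` actually behaves this way.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a row type; this is not Beam's `Row` implementation.
final class SimpleRow {
  private final List<Object> values;

  SimpleRow(Object... values) {
    // Copy the array so later changes by the caller cannot affect this row.
    this.values = Arrays.asList(values.clone());
  }

  @Override
  public boolean equals(Object other) {
    if (this == other) {
      return true; // Same instance: skip the field-by-field comparison.
    }
    if (!(other instanceof SimpleRow)) {
      return false;
    }
    return values.equals(((SimpleRow) other).values);
  }

  @Override
  public int hashCode() {
    return values.hashCode();
  }
}

final class SimpleRowDemo {
  public static void main(String[] args) {
    SimpleRow row = new SimpleRow(1, "x");
    System.out.println(row.equals(row));
    System.out.println(row.equals(new SimpleRow(1, "x")));
  }
}
```

If the same instance is passed once per window, the `this == other` check makes the `previousRow` comparison constant-time instead of proportional to the number of fields.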
##########
File path:
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
##########
@@ -191,27 +212,96 @@ public void setup() {
stream = exp.stream();
}
+ @StartBundle
+ public void startBundle() {
+ pending = new HashMap<>();
+ previousRow = null;
+ previousFuture = null;
+ }
+
+ @Override
+ public Duration getAllowedTimestampSkew() {
+ return Duration.millis(Long.MAX_VALUE);
+ }
+
@ProcessElement
- public void processElement(ProcessContext c) throws InterruptedException {
- Map<String, Value> columns = new HashMap<>();
- Row row = c.element();
- for (int i : referencedColumns) {
- columns.put(
- columnName(i),
- ZetaSqlBeamTranslationUtils.toZetaSqlValue(
- row.getBaseValue(i, Object.class),
inputSchema.getField(i).getType()));
+ public void processElement(
+ @Element Row row, @Timestamp Instant t, BoundedWindow w,
OutputReceiver<Row> r)
+ throws InterruptedException {
+ final Future<Value> vf;
Review comment:
Real identifiers, please: `vf` says little about what the future holds.