apilloud commented on a change in pull request #13759:
URL: https://github.com/apache/beam/pull/13759#discussion_r565654862
##########
File path:
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
##########
@@ -191,27 +212,96 @@ public void setup() {
stream = exp.stream();
}
+ @StartBundle
+ public void startBundle() {
+ pending = new HashMap<>();
+ previousRow = null;
+ previousFuture = null;
+ }
+
+ @Override
+ public Duration getAllowedTimestampSkew() {
+ return Duration.millis(Long.MAX_VALUE);
+ }
+
@ProcessElement
- public void processElement(ProcessContext c) throws InterruptedException {
- Map<String, Value> columns = new HashMap<>();
- Row row = c.element();
- for (int i : referencedColumns) {
- columns.put(
- columnName(i),
- ZetaSqlBeamTranslationUtils.toZetaSqlValue(
- row.getBaseValue(i, Object.class),
inputSchema.getField(i).getType()));
+ public void processElement(
+ @Element Row row, @Timestamp Instant t, BoundedWindow w,
OutputReceiver<Row> r)
+ throws InterruptedException {
+ final Future<Value> vf;
+
+ if (row.equals(previousRow)) {
+ vf = previousFuture;
+ } else {
+ Map<String, Value> columns = new HashMap<>();
+ for (int i : referencedColumns) {
+ columns.put(
+ columnName(i),
+ ZetaSqlBeamTranslationUtils.toZetaSqlValue(
+ row.getBaseValue(i, Object.class),
inputSchema.getField(i).getType()));
+ }
+
+ vf = stream.execute(columns, nullParams);
+ }
+ previousRow = row;
+
+ Queue<Context> pendingWindow = pending.get(w);
+ if (pendingWindow == null) {
+ pendingWindow = new ArrayDeque<>();
+ pending.put(w, pendingWindow);
}
+ pendingWindow.add(Context.create(vf, t));
+
+ while ((!pendingWindow.isEmpty() &&
pendingWindow.element().future().isDone())
+ || pendingWindow.size() > MAX_PENDING_WINDOW) {
+ outputRow(pendingWindow.remove(), r);
+ }
+ }
+
+ @FinishBundle
+ public void finishBundle(FinishBundleContext c) throws
InterruptedException {
+ stream.flush();
+ for (Map.Entry<BoundedWindow, Queue<Context>> pendingWindow :
pending.entrySet()) {
+ OutputReceiver<Row> r = new OutputReceiverForFinishBundle(c,
pendingWindow.getKey());
+ for (Context vf : pendingWindow.getValue()) {
Review comment:
This would be a significant change, particularly because there is no
safe `OutputReceiver` to use. If this were a layer down, I think that would
make more sense.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]