scwhittle commented on code in PR #22645:
URL: https://github.com/apache/beam/pull/22645#discussion_r941711400
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java:
##########
@@ -954,7 +949,17 @@ public void processElement(ProcessContext c, BoundedWindow window) throws Except
writeOrClose(writer, getDynamicDestinations().formatRecord(input));
}
- // Close all writers.
+ // Clean-up any prior writers that were being closed as part of this bundle before
Review Comment:
If there are many windows, we'll have many calls to processElement, each of which blocks sequentially. Closes then overlap only with the processing of a single window rather than several.
Could we instead block only when the number of writers that are closing but not yet closed exceeds some threshold (which could be controlled by an option)?
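A minimal sketch of the suggestion above, assuming writers can be closed on a background executor. `BoundedCloser` and `maxPendingCloses` are hypothetical names, not existing Beam APIs. Each close is scheduled asynchronously, and the caller blocks only while the number of in-flight closes is at the cap, instead of waiting once per window.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper (not a Beam API): caps the number of writers that are
// closing but not yet closed.
class BoundedCloser implements AutoCloseable {
  private final int maxPendingCloses; // hypothetical option controlling the cap
  private final ExecutorService executor;
  private final Deque<Future<?>> pending = new ArrayDeque<>();

  BoundedCloser(int maxPendingCloses) {
    if (maxPendingCloses < 1) {
      throw new IllegalArgumentException("maxPendingCloses must be >= 1");
    }
    this.maxPendingCloses = maxPendingCloses;
    this.executor = Executors.newFixedThreadPool(maxPendingCloses);
  }

  // Starts closing the writer in the background; blocks only while the number
  // of in-flight closes is at the cap.
  void closeAsync(AutoCloseable writer) throws InterruptedException, ExecutionException {
    // Forget closes that already finished, surfacing any failure as an ExecutionException.
    for (Iterator<Future<?>> it = pending.iterator(); it.hasNext(); ) {
      Future<?> f = it.next();
      if (f.isDone()) {
        f.get();
        it.remove();
      }
    }
    // Wait on the oldest outstanding close until there is room for one more.
    while (pending.size() >= maxPendingCloses) {
      pending.pollFirst().get();
    }
    pending.addLast(executor.submit(() -> {
      writer.close();
      return null;
    }));
  }

  int pendingCount() {
    return pending.size();
  }

  // Waits for every outstanding close, e.g. from finishBundle.
  void awaitAll() throws InterruptedException, ExecutionException {
    while (!pending.isEmpty()) {
      pending.pollFirst().get();
    }
  }

  @Override
  public void close() {
    executor.shutdown();
  }
}
```

In this sketch, `closeAsync` would take the place of the per-window wait in processElement, and `awaitAll` would run in finishBundle before results are output, so closes from many windows can overlap up to the configured limit.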
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java:
##########
@@ -996,10 +1001,18 @@ public void finishBundle(FinishBundleContext c) throws Exception {
c.output(result.getValue(), result.getKey(),
result.getValue().getWindow());
}
} finally {
- deferredOutput = null;
- closeFutures = null;
+ deferredOutput.clear();
+ closeFutures.clear();
}
}
+
+ // Ensure that transient fields are initialized.
Review Comment:
I haven't seen this before; is this preferable to just initializing these in the class constructor?
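A small sketch of the Java serialization behavior relevant to this question, assuming the DoFn reaches workers through standard Java serialization. Deserialization does not run the serializable class's constructor or field initializers, and it leaves `transient` fields at their default value (`null`), so constructor-only initialization would not survive the round trip. `CtorInitFn`, `ReadObjectInitFn`, and `SerdeDemo` are hypothetical names used only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Transient field set only in the constructor: null after deserialization.
class CtorInitFn implements Serializable {
  transient List<Object> closeFutures;

  CtorInitFn() {
    closeFutures = new ArrayList<>();
  }
}

// Transient field re-created in readObject after default deserialization.
class ReadObjectInitFn implements Serializable {
  transient List<Object> closeFutures = new ArrayList<>();

  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    in.defaultReadObject();
    closeFutures = new ArrayList<>();
  }
}

class SerdeDemo {
  // Serializes and deserializes an object, roughly as happens when a DoFn is shipped.
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (T) in.readObject();
    }
  }
}
```

Within Beam, initializing such fields in a `@Setup` or `@StartBundle` method is another way to avoid depending on the constructor for transient state.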