scwhittle commented on code in PR #22645:
URL: https://github.com/apache/beam/pull/22645#discussion_r942294325
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java:
##########
@@ -954,7 +949,17 @@ public void processElement(ProcessContext c, BoundedWindow window) throws Except
writeOrClose(writer, getDynamicDestinations().formatRecord(input));
}
- // Close all writers.
+ // Clean-up any prior writers that were being closed as part of this bundle before
Review Comment:
I think close triggers a write of any I/O still in the buffer. Can we
parallelize the close of the current batch with the close of the previous
batch? We already have memory around for the new writers, so we might as well
flush them before blocking on the previous batch.
so something like:
previousCloseFutures = closeFutures;
closeFutures = new ArrayList<>();
// Start closing current writers in the background
...
// Block on the previous windows closing to limit parallelism
MoreFutures.get(MoreFutures.allAsList(previousCloseFutures));
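A minimal, self-contained sketch of the overlap being suggested, using plain
`CompletableFuture` in place of Beam's `MoreFutures` helper. The `Writer`
interface and `closeWriters` method here are illustrative stand-ins, not the
actual `WriteFiles` code: the point is only that the current bundle's closes
are started before blocking on the previous bundle's futures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OverlappedClose {
  /** Hypothetical writer; close() is assumed to flush buffered I/O. */
  interface Writer {
    void close() throws Exception;
  }

  // Daemon threads so a forgotten shutdown does not keep the JVM alive.
  private final ExecutorService closeExecutor =
      Executors.newCachedThreadPool(
          r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
          });

  private List<CompletableFuture<Void>> closeFutures = new ArrayList<>();

  /**
   * Start closing the current batch of writers asynchronously, then block on
   * the futures left over from the previous batch to bound outstanding work.
   */
  public void closeWriters(List<Writer> currentWriters) throws Exception {
    List<CompletableFuture<Void>> previousCloseFutures = closeFutures;
    closeFutures = new ArrayList<>();
    // Start closing current writers in the background.
    for (Writer w : currentWriters) {
      closeFutures.add(
          CompletableFuture.runAsync(
              () -> {
                try {
                  w.close();
                } catch (Exception e) {
                  throw new RuntimeException(e);
                }
              },
              closeExecutor));
    }
    // Block on the previous batch's closes to limit parallelism.
    CompletableFuture.allOf(previousCloseFutures.toArray(new CompletableFuture[0])).get();
  }
}
```

With this shape, a bundle pays for at most one batch of outstanding closes at a
time, while each batch's flushes proceed in parallel rather than serially.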
This is based on observations from a Kafka->GCS pipeline we were optimizing. I
believe it had 5-minute windowing, and on watermark jumps during backlog
processing we would be writing 50 windows at a time; doing so serially was
slow. Unfortunately I don't recall how large the generated files were.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]