scwhittle commented on code in PR #22645:
URL: https://github.com/apache/beam/pull/22645#discussion_r942294325


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java:
##########
@@ -954,7 +949,17 @@ public void processElement(ProcessContext c, BoundedWindow window) throws Except
         writeOrClose(writer, getDynamicDestinations().formatRecord(input));
       }
 
-      // Close all writers.
+      // Clean-up any prior writers that were being closed as part of this bundle before

Review Comment:
   I think close() triggers a write of any buffered I/O. Can we parallelize closing the current batch's writers with the previous batch's? We already have the memory around for the new writers, so we might as well start flushing them before blocking on the previous batch.
   
   so something like:

   previousCloseFutures = closeFutures;
   closeFutures = new ArrayList<>();
   
   // Start closing the current writers in the background
   ...
   
   // Block on the previous windows' closes to limit parallelism
   MoreFutures.get(MoreFutures.allAsList(previousCloseFutures));
   
   This is based on an observation from a Kafka->GCS pipeline we were optimizing. I believe it had 5-minute windowing, and on watermark jumps during backlog processing we would be writing 50 windows at a time; doing so serially was slow. Unfortunately I don't recall how large the generated files were.
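   The suggested pattern could be sketched as below. This is a minimal, self-contained illustration using plain `CompletableFuture` rather than Beam's `MoreFutures` utility; the `PipelinedClose` class, the `closeAsync` stand-in for `FileBasedSink.Writer#close`, the `drain()` helper, and the `log` list are all hypothetical names for illustration, not Beam API.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;

   public class PipelinedClose {

     // Futures for writers whose close() was started during the previous batch.
     private List<CompletableFuture<Void>> closeFutures = new ArrayList<>();

     // Hypothetical stand-in for a writer close: closing flushes buffered
     // bytes, so in a real sink it can block on I/O for a while.
     static CompletableFuture<Void> closeAsync(String writerName, List<String> log) {
       return CompletableFuture.runAsync(() -> {
         synchronized (log) {
           log.add("closed " + writerName);
         }
       });
     }

     // Start closing this batch's writers in the background, then block only
     // on the previous batch's closes, so at most two batches are in flight.
     public void closeBatch(List<String> writers, List<String> log) throws Exception {
       List<CompletableFuture<Void>> previousCloseFutures = closeFutures;
       closeFutures = new ArrayList<>();

       // Kick off the current closes without waiting on them yet.
       for (String writer : writers) {
         closeFutures.add(closeAsync(writer, log));
       }

       // Block on the previous batch's closes to limit parallelism.
       CompletableFuture.allOf(previousCloseFutures.toArray(new CompletableFuture[0])).get();
     }

     // Drain any still-outstanding closes, e.g. when the bundle finishes.
     public void drain() throws Exception {
       CompletableFuture.allOf(closeFutures.toArray(new CompletableFuture[0])).get();
       closeFutures = new ArrayList<>();
     }

     public static void main(String[] args) throws Exception {
       List<String> log = new ArrayList<>();
       PipelinedClose p = new PipelinedClose();
       p.closeBatch(List.of("w1", "w2"), log); // previous list is empty, no wait
       p.closeBatch(List.of("w3"), log);       // starts w3, then waits for w1 and w2
       p.drain();                              // waits for w3
       System.out.println(log.size());         // 3
     }
   }
   ```

   The `drain()` step matters: because `closeBatch` only waits on the *previous* batch, the last batch's futures are still outstanding when the loop ends and must be awaited before the bundle is committed.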



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
