This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ddaf041bd9 [BEAM-12776, fixes #21095] Limit parallel closes from the prior element to the next element. (#22645)
0ddaf041bd9 is described below

commit 0ddaf041bd9a5bf0bad382557d0c165e55e96373
Author: Luke Cwik <lc...@google.com>
AuthorDate: Thu Aug 18 13:16:12 2022 -0700

    [BEAM-12776, fixes #21095] Limit parallel closes from the prior element to the next element. (#22645)
    
    * [BEAM-12776, #21095] Limit parallel closes from the prior element to the next element.
    
    This will reduce OOMs in the case where we are adding new writes faster than we are able to clean up older ones.
    
    * Swap to parallelize the closes as well.
    
    * Address PR comments.
    
    * Swap back to setting fields during deserialization
---
 .../java/org/apache/beam/sdk/io/WriteFiles.java    | 30 ++++++++++++++++------
 1 file changed, 22 insertions(+), 8 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 8d660a99ade..f38a10d68ec 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -919,11 +919,14 @@ public abstract class WriteFiles<UserT, DestinationT, 
OutputT>
 
   private class WriteShardsIntoTempFilesFn
       extends DoFn<KV<ShardedKey<Integer>, Iterable<UserT>>, 
FileResult<DestinationT>> {
-    private transient @Nullable List<CompletionStage<Void>> closeFutures = 
null;
-    private transient @Nullable List<KV<Instant, FileResult<DestinationT>>> 
deferredOutput = null;
-
-    @StartBundle
-    public void startBundle() {
+    private transient List<CompletionStage<Void>> closeFutures = new 
ArrayList<>();
+    private transient List<KV<Instant, FileResult<DestinationT>>> 
deferredOutput =
+        new ArrayList<>();
+
+    // Ensure that transient fields are initialized.
+    private void readObject(java.io.ObjectInputStream in)
+        throws IOException, ClassNotFoundException {
+      in.defaultReadObject();
       closeFutures = new ArrayList<>();
       deferredOutput = new ArrayList<>();
     }
@@ -954,7 +957,14 @@ public abstract class WriteFiles<UserT, DestinationT, 
OutputT>
         writeOrClose(writer, getDynamicDestinations().formatRecord(input));
       }
 
-      // Close all writers.
+      // Ensure that we clean-up any prior writers that were being closed as 
part of this bundle
+      // before we return from this processElement call. This allows us to 
perform the writes/closes
+      // in parallel with the prior elements close calls and bounds the amount 
of data buffered to
+      // limit the number of OOMs.
+      CompletionStage<List<Void>> pastCloseFutures = 
MoreFutures.allAsList(closeFutures);
+      closeFutures.clear();
+
+      // Close all writers in the background
       for (Map.Entry<DestinationT, Writer<DestinationT, OutputT>> entry : 
writers.entrySet()) {
         int shard = c.element().getKey().getShardNumber();
         checkArgument(
@@ -968,6 +978,10 @@ public abstract class WriteFiles<UserT, DestinationT, 
OutputT>
                 new FileResult<>(writer.getOutputFile(), shard, window, 
c.pane(), entry.getKey())));
         closeWriterInBackground(writer);
       }
+
+      // Block on completing the past closes before returning. We do so after 
starting the current
+      // closes in the background so that they can happen in parallel.
+      MoreFutures.get(pastCloseFutures);
     }
 
     private void closeWriterInBackground(Writer<DestinationT, OutputT> writer) 
{
@@ -996,8 +1010,8 @@ public abstract class WriteFiles<UserT, DestinationT, 
OutputT>
           c.output(result.getValue(), result.getKey(), 
result.getValue().getWindow());
         }
       } finally {
-        deferredOutput = null;
-        closeFutures = null;
+        deferredOutput.clear();
+        closeFutures.clear();
       }
     }
   }

Reply via email to