This is an automated email from the ASF dual-hosted git repository. lcwik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 0ddaf041bd9 [BEAM-12776, fixes #21095] Limit parallel closes from the prior element to the next element. (#22645) 0ddaf041bd9 is described below commit 0ddaf041bd9a5bf0bad382557d0c165e55e96373 Author: Luke Cwik <lc...@google.com> AuthorDate: Thu Aug 18 13:16:12 2022 -0700 [BEAM-12776, fixes #21095] Limit parallel closes from the prior element to the next element. (#22645) * [BEAM-12776, #21095] Limit parallel closes from the prior element to the next element. This will reduce OOMs in the case where we are adding new writes faster than we are able to clean up older ones. * Swap to parallelize the closes as well. * Address PR comments. * Swap back to setting fields during deserialization --- .../java/org/apache/beam/sdk/io/WriteFiles.java | 30 ++++++++++++++++------ 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java index 8d660a99ade..f38a10d68ec 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java @@ -919,11 +919,14 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT> private class WriteShardsIntoTempFilesFn extends DoFn<KV<ShardedKey<Integer>, Iterable<UserT>>, FileResult<DestinationT>> { - private transient @Nullable List<CompletionStage<Void>> closeFutures = null; - private transient @Nullable List<KV<Instant, FileResult<DestinationT>>> deferredOutput = null; - - @StartBundle - public void startBundle() { + private transient List<CompletionStage<Void>> closeFutures = new ArrayList<>(); + private transient List<KV<Instant, FileResult<DestinationT>>> deferredOutput = + new ArrayList<>(); + + // Ensure that transient fields are initialized. 
+ private void readObject(java.io.ObjectInputStream in) + throws IOException, ClassNotFoundException { + in.defaultReadObject(); closeFutures = new ArrayList<>(); deferredOutput = new ArrayList<>(); } @@ -954,7 +957,14 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT> writeOrClose(writer, getDynamicDestinations().formatRecord(input)); } - // Close all writers. + // Ensure that we clean-up any prior writers that were being closed as part of this bundle + // before we return from this processElement call. This allows us to perform the writes/closes + // in parallel with the prior elements close calls and bounds the amount of data buffered to + // limit the number of OOMs. + CompletionStage<List<Void>> pastCloseFutures = MoreFutures.allAsList(closeFutures); + closeFutures.clear(); + + // Close all writers in the background for (Map.Entry<DestinationT, Writer<DestinationT, OutputT>> entry : writers.entrySet()) { int shard = c.element().getKey().getShardNumber(); checkArgument( @@ -968,6 +978,10 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT> new FileResult<>(writer.getOutputFile(), shard, window, c.pane(), entry.getKey()))); closeWriterInBackground(writer); } + + // Block on completing the past closes before returning. We do so after starting the current + // closes in the background so that they can happen in parallel. + MoreFutures.get(pastCloseFutures); } private void closeWriterInBackground(Writer<DestinationT, OutputT> writer) { @@ -996,8 +1010,8 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT> c.output(result.getValue(), result.getKey(), result.getValue().getWindow()); } } finally { - deferredOutput = null; - closeFutures = null; + deferredOutput.clear(); + closeFutures.clear(); } } }