scwhittle commented on code in PR #22645:
URL: https://github.com/apache/beam/pull/22645#discussion_r941711400


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java:
##########
@@ -954,7 +949,17 @@ public void processElement(ProcessContext c, BoundedWindow window) throws Except
         writeOrClose(writer, getDynamicDestinations().formatRecord(input));
       }
 
-      // Close all writers.
+      // Clean-up any prior writers that were being closed as part of this bundle before

Review Comment:
   If we have many windows, we'll have many calls to processElement, each blocking in turn. As a result, closes only overlap with the processing of a single window rather than several.
   
   Could we instead block only when the number of writers that are closing but not yet closed exceeds some threshold (which could be controlled by an option)?
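A minimal sketch of the bounded-wait idea, assuming writer closes are submitted to an `ExecutorService`. `BoundedCloser`, `closeAsync`, `awaitAll`, and `maxPendingCloses` are hypothetical names for illustration, not Beam APIs or the code in this PR. The caller only blocks when more than `maxPendingCloses` closes are in flight, so closes can overlap with the processing of several windows.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

/** Hypothetical helper: tracks asynchronous closes and bounds how many may be outstanding. */
final class BoundedCloser {
  // Assumption: in a real transform this limit would come from a pipeline option.
  private final int maxPendingCloses;
  private final ExecutorService executor;
  // Outstanding closes, oldest first.
  private final Deque<Future<?>> pending = new ArrayDeque<>();

  BoundedCloser(int maxPendingCloses, ExecutorService executor) {
    if (maxPendingCloses < 1) {
      throw new IllegalArgumentException("maxPendingCloses must be >= 1");
    }
    this.maxPendingCloses = maxPendingCloses;
    this.executor = executor;
  }

  /** Schedules an asynchronous close; blocks only while too many closes are outstanding. */
  void closeAsync(AutoCloseable writer) throws Exception {
    pending.addLast(executor.submit(() -> {
      writer.close();
      return null;
    }));
    while (pending.size() > maxPendingCloses) {
      // Wait for the oldest close; a close failure surfaces as an ExecutionException.
      pending.removeFirst().get();
    }
  }

  /** Number of closes that have been scheduled but not yet waited on. */
  int pendingCount() {
    return pending.size();
  }

  /** Waits for every outstanding close, e.g. at the end of the bundle. */
  void awaitAll() throws Exception {
    while (!pending.isEmpty()) {
      pending.removeFirst().get();
    }
  }
}
```

Waiting on the oldest close first is a simple FIFO choice: it may wait on a slow close while later ones have already finished, but it keeps the number of in-flight closes bounded across windows, and the bundle-level wait (`awaitAll`) still guarantees every writer is closed before output is committed.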



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java:
##########
@@ -996,10 +1001,18 @@ public void finishBundle(FinishBundleContext c) throws Exception {
           c.output(result.getValue(), result.getKey(), result.getValue().getWindow());
         }
       } finally {
-        deferredOutput = null;
-        closeFutures = null;
+        deferredOutput.clear();
+        closeFutures.clear();
       }
     }
+
+    // Ensure that transient fields are initialized.

Review Comment:
   I haven't seen this before. Is this preferable to just initializing these in the class constructor?
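Likely relevant to this question: a `DoFn` is Java-serialized before it runs, and Java deserialization runs neither the class's own constructor nor its field initializers, so `transient` fields come back `null`. The sketch below uses a hypothetical `WriterFn` (not the actual class in `WriteFiles.java`) to show a constructor-initialized transient field being lost in a serialization round trip, while a lazily initialized one is re-created on demand. In Beam, the `@Setup` and `@StartBundle` lifecycle methods are the usual alternatives for re-creating such state.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a DoFn with transient per-bundle state. */
class WriterFn implements Serializable {
  private static final long serialVersionUID = 1L;

  // Set in the constructor, but deserialization skips this constructor,
  // so after a round trip this field is null.
  transient List<String> ctorInitialized;

  // Created on first use, so it works whether or not the object was deserialized.
  transient List<String> lazilyInitialized;

  WriterFn() {
    ctorInitialized = new ArrayList<>();
  }

  /** Returns the lazily initialized list, creating it if this instance has none yet. */
  List<String> lazy() {
    if (lazilyInitialized == null) {
      lazilyInitialized = new ArrayList<>();
    }
    return lazilyInitialized;
  }

  /** Serializes and deserializes fn, roughly as happens when a DoFn is shipped to workers. */
  static WriterFn roundTrip(WriterFn fn) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(fn);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (WriterFn) in.readObject();
    }
  }
}
```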



-- 
This is an automated message from the Apache Git Service.