chamikaramj commented on a change in pull request #14164:
URL: https://github.com/apache/beam/pull/14164#discussion_r597878833



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -400,13 +443,25 @@ private GatherResults(Coder<ResultT> resultCoder) {
     @Override
     public PCollection<List<ResultT>> expand(PCollection<ResultT> input) {
       if (getWindowedWrites()) {
-        // Reshuffle the results to make them stable against retries.
+        // Group the results to make them stable against retries.
         // Use a single void key to maximize size of bundles for finalization.
         return input
-            .apply("Add void key", WithKeys.of((Void) null))
-            .apply("Reshuffle", Reshuffle.of())
-            .apply("Drop key", Values.create())
-            .apply("Gather bundles", ParDo.of(new GatherBundlesPerWindowFn<>()))
+            .apply("AddVoidKey", WithKeys.of((Void) null))

Review comment:
       Reshuffle re-windows data in addition to the GBK, so I don't think replacing the Reshuffle here with a GBK preserves the behavior.

   It's probably worth figuring out why Reshuffle did not work for the new path.
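   To make the difference concrete, here is a toy, in-memory model in plain Java. It is not Beam code, and the class, record and method names are hypothetical. It assumes fixed windows, a single void key as in `GatherResults`, and GroupByKey's default `END_OF_WINDOW` timestamp combiner. Roughly, Reshuffle groups under its own temporary windowing strategy and then restores the original windowing strategy and each element's original timestamp. As a result, the Reshuffle-like path below leaves every element's value, timestamp and window untouched. The GBK-like path instead emits one grouped element per window, stamped with that window's max timestamp.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical toy model (plain Java, not Beam) of what downstream code observes
 * after a Reshuffle-style step versus a bare GroupByKey on a single void key.
 */
public class ReshuffleVsGbkModel {

  /** A fixed window [start, end); like Beam's IntervalWindow, maxTimestamp is end - 1. */
  public record Window(long start, long end) {
    public long maxTimestamp() {
      return end - 1;
    }
  }

  /** A value together with its event timestamp and assigned window. */
  public record Element<T>(T value, long timestamp, Window window) {}

  /**
   * Reshuffle-like: regroup, then restore each element's original timestamp and
   * window. Apart from bundling/ordering, output matches input element for element.
   */
  public static <T> List<Element<T>> reshuffle(List<Element<T>> input) {
    List<Element<T>> out = new ArrayList<>();
    for (Element<T> e : input) {
      // Value, timestamp and window all pass through unchanged.
      out.add(new Element<>(e.value(), e.timestamp(), e.window()));
    }
    return out;
  }

  /**
   * GroupByKey-like with one void key and the default END_OF_WINDOW timestamp
   * combiner: one grouped output per window, stamped with the window's max timestamp.
   */
  public static <T> List<Element<List<T>>> groupByKey(List<Element<T>> input) {
    Map<Window, List<T>> byWindow = new LinkedHashMap<>();
    for (Element<T> e : input) {
      byWindow.computeIfAbsent(e.window(), w -> new ArrayList<>()).add(e.value());
    }
    List<Element<List<T>>> out = new ArrayList<>();
    for (Map.Entry<Window, List<T>> entry : byWindow.entrySet()) {
      Window w = entry.getKey();
      out.add(new Element<>(entry.getValue(), w.maxTimestamp(), w));
    }
    return out;
  }

  public static void main(String[] args) {
    Window w1 = new Window(0, 10);
    Window w2 = new Window(10, 20);
    List<Element<String>> input =
        List.of(new Element<>("a", 1, w1), new Element<>("b", 5, w1), new Element<>("c", 12, w2));
    // Three elements, keeping their original timestamps 1, 5 and 12.
    System.out.println("Reshuffle-like: " + reshuffle(input));
    // Two grouped elements (one per window), with timestamps 9 and 19.
    System.out.println("GBK-like:       " + groupByKey(input));
  }
}
```

   The model ignores triggers, panes and allowed lateness, all of which Reshuffle also overrides during its internal grouping step. A real comparison should therefore also be checked against the actual windowing strategy used for windowed writes.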

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -465,6 +520,19 @@ private WriteUnshardedBundlesToTempFiles(
               // shard numbers are assigned together to both the spilled and non-spilled files in
               // finalize.
               .apply("GroupUnwritten", GroupByKey.create())
+              .apply(

Review comment:
       Still confused :).

   Could you clarify why this was needed, since this is not in the auto-sharding path? Is this a bug fix for existing code?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
