nehsyc commented on a change in pull request #14164:
URL: https://github.com/apache/beam/pull/14164#discussion_r599834872



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -400,13 +443,25 @@ private GatherResults(Coder<ResultT> resultCoder) {
     @Override
     public PCollection<List<ResultT>> expand(PCollection<ResultT> input) {
       if (getWindowedWrites()) {
-        // Reshuffle the results to make them stable against retries.
+        // Group the results to make them stable against retries.
         // Use a single void key to maximize size of bundles for finalization.
         return input
-            .apply("Add void key", WithKeys.of((Void) null))
-            .apply("Reshuffle", Reshuffle.of())
-            .apply("Drop key", Values.create())
-            .apply("Gather bundles", ParDo.of(new GatherBundlesPerWindowFn<>()))
+            .apply("AddVoidKey", WithKeys.of((Void) null))

Review comment:
       After some investigation and discussion with @robertwb, we found why 
`Reshuffle` does not work for the new path: it applies a `ReshuffleTrigger` 
that fires on every element, so each downstream bundle contains only one 
element. So even though the following DoFn is called 
`GatherBundlesPerWindowFn`, it doesn't actually gather the elements per window. 
That works fine in the fixed-sharding case because we encode the shard index 
explicitly. In the auto-sharding case, however, we don't know the shard index 
and have to rely on counting the shards within a bundle, which means we must 
guarantee that all elements in the same window are gathered into the same 
bundle before performing the final renaming.
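
       As a rough illustration of the counting problem, here is a plain-Java 
simulation with no Beam dependency. The class `BundleShardingSketch`, its 
method names, and the `shard-i-of-n` naming are hypothetical and are not taken 
from `WriteFiles`; the sketch only assumes that the shard index and shard count 
are derived from what is visible inside a single bundle.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java simulation (not Beam code) of naming shards by counting results within a bundle. */
public class BundleShardingSketch {

  /** Models a trigger that fires on every element: each result lands in its own bundle. */
  static List<List<String>> onePerBundle(List<String> windowResults) {
    List<List<String>> bundles = new ArrayList<>();
    for (String result : windowResults) {
      bundles.add(List.of(result));
    }
    return bundles;
  }

  /** Models grouping under a single key: all results of the window land in one bundle. */
  static List<List<String>> oneBundlePerWindow(List<String> windowResults) {
    List<List<String>> bundles = new ArrayList<>();
    bundles.add(new ArrayList<>(windowResults));
    return bundles;
  }

  /** Assigns "shard-i-of-n" names using only the results visible inside each bundle. */
  static List<String> nameShards(List<List<String>> bundles) {
    List<String> names = new ArrayList<>();
    for (List<String> bundle : bundles) {
      int shardCount = bundle.size();
      for (int i = 0; i < shardCount; i++) {
        names.add("shard-" + i + "-of-" + shardCount);
      }
    }
    return names;
  }

  public static void main(String[] args) {
    List<String> results = List.of("tmp-a", "tmp-b", "tmp-c");
    // One element per bundle: every file is named as if it were the only shard.
    System.out.println(nameShards(onePerBundle(results)));
    // One bundle per window: indices and the total count are consistent.
    System.out.println(nameShards(oneBundlePerWindow(results)));
  }
}
```

With one element per bundle, all three results get the same name 
(`shard-0-of-1`), so the final renaming would collide. With a single bundle per 
window they get distinct names with a consistent total.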

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -465,6 +520,19 @@ private WriteUnshardedBundlesToTempFiles(
               // shard numbers are assigned together to both the spilled and non-spilled files in
               // finalize.
               .apply("GroupUnwritten", GroupByKey.create())
+              .apply(

Review comment:
       This change was made mainly because I changed the signature of 
`WriteShardsIntoTempFilesFn` to accept ((key, shard), V) to (shard, V). I 
reverted it in the latest commit to make the changes clearer.



