[ https://issues.apache.org/jira/browse/BEAM-11934?focusedWorklogId=565463&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-565463 ]

ASF GitHub Bot logged work on BEAM-11934:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Mar/21 19:15
            Start Date: 12/Mar/21 19:15
    Worklog Time Spent: 10m 
      Work Description: nehsyc commented on a change in pull request #14164:
URL: https://github.com/apache/beam/pull/14164#discussion_r593392042



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -400,13 +443,25 @@ private GatherResults(Coder<ResultT> resultCoder) {
     @Override
     public PCollection<List<ResultT>> expand(PCollection<ResultT> input) {
       if (getWindowedWrites()) {
-        // Reshuffle the results to make them stable against retries.
+        // Group the results to make them stable against retries.
         // Use a single void key to maximize size of bundles for finalization.
         return input
-            .apply("Add void key", WithKeys.of((Void) null))
-            .apply("Reshuffle", Reshuffle.of())
-            .apply("Drop key", Values.create())
-            .apply("Gather bundles", ParDo.of(new GatherBundlesPerWindowFn<>()))
+            .apply("AddVoidKey", WithKeys.of((Void) null))

Review comment:
       Two reasons:
   - `Reshuffle` expands the iterable after its GBK, and "Gather bundles" then effectively reverts that expansion by re-gathering the elements of a bundle. Why not just use a GBK directly?
   - `Reshuffle` didn't appear to work properly with auto-sharding, where we might emit multiple outputs per window: `Reshuffle` didn't group those outputs in the same window, which caused the added test to fail. My rough guess is that this might be because the timestamp is added to the key that `Reshuffle` groups on (https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L87), but I could be entirely wrong.
   
   One argument I can think of for restoring the original `Reshuffle` is that runners might optimize `Reshuffle`, and using an explicit GBK would lose any such optimization. To keep the original behavior, I could refactor the code to use the explicit GBK only in the new path. Any thoughts?
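The first reason above can be illustrated with a small plain-Java model. This is not Beam code: windows and bundles are simulated with string and integer tags, and the class and method names (`GatherResultsModel`, `gatherPerBundle`, `groupPerWindow`) are hypothetical. It only shows the shape of the difference. Re-gathering after a `Reshuffle` produces one list per (window, bundle), while a GBK on a single void key produces one list per window. In a real runner, bundle boundaries are chosen by the runner.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical plain-Java model (not Beam) of two ways to collect write results. */
public class GatherResultsModel {

    /** A write result tagged with the (simulated) window and bundle it arrived in. */
    public static final class Result {
        final String window;
        final int bundle;
        final String file;

        public Result(String window, int bundle, String file) {
            this.window = window;
            this.bundle = bundle;
            this.file = file;
        }
    }

    /**
     * Models Reshuffle followed by "Gather bundles": elements are re-gathered
     * per (window, bundle), so a single window can yield several lists.
     */
    public static List<List<String>> gatherPerBundle(List<Result> results) {
        Map<String, List<String>> byWindowAndBundle = new LinkedHashMap<>();
        for (Result r : results) {
            byWindowAndBundle
                .computeIfAbsent(r.window + "/" + r.bundle, k -> new ArrayList<>())
                .add(r.file);
        }
        return new ArrayList<>(byWindowAndBundle.values());
    }

    /**
     * Models WithKeys.of((Void) null) followed by a GroupByKey: every result in a
     * window shares the same void key, so each window yields exactly one list.
     */
    public static List<List<String>> groupPerWindow(List<Result> results) {
        Map<String, List<String>> byWindow = new LinkedHashMap<>();
        for (Result r : results) {
            byWindow.computeIfAbsent(r.window, k -> new ArrayList<>()).add(r.file);
        }
        return new ArrayList<>(byWindow.values());
    }

    public static void main(String[] args) {
        // Two results in window w1 that landed in different bundles, one in w2.
        List<Result> results = List.of(
            new Result("w1", 0, "shard-0"),
            new Result("w1", 1, "shard-1"),
            new Result("w2", 0, "shard-2"));
        System.out.println(gatherPerBundle(results)); // [[shard-0], [shard-1], [shard-2]]
        System.out.println(groupPerWindow(results));  // [[shard-0, shard-1], [shard-2]]
    }
}
```

In this model, window `w1` is split into two lists when gathering per bundle, but it is collected into one list when grouping on the void key. The single-key grouping is what "Use a single void key" in the diff relies on.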






Issue Time Tracking
-------------------

    Worklog Id:     (was: 565463)
    Time Spent: 1.5h  (was: 1h 20m)

> GCS streaming file sink uses runner determined sharding
> -------------------------------------------------------
>
>                 Key: BEAM-11934
>                 URL: https://issues.apache.org/jira/browse/BEAM-11934
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-files, io-java-gcp
>            Reporter: Siyuan Chen
>            Assignee: Siyuan Chen
>            Priority: P1
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> Integrate the file sink with shardable `GroupIntoBatches` (BEAM-10475) to allow 
> runner-determined dynamic sharding for streaming use cases.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
