[https://issues.apache.org/jira/browse/BEAM-11934?focusedWorklogId=569185&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-569185]

ASF GitHub Bot logged work on BEAM-11934:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Mar/21 23:29
            Start Date: 19/Mar/21 23:29
    Worklog Time Spent: 10m 
    Work Description: chamikaramj commented on a change in pull request #14164:
URL: https://github.com/apache/beam/pull/14164#discussion_r597878833



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -400,13 +443,25 @@ private GatherResults(Coder<ResultT> resultCoder) {
     @Override
     public PCollection<List<ResultT>> expand(PCollection<ResultT> input) {
       if (getWindowedWrites()) {
-        // Reshuffle the results to make them stable against retries.
+        // Group the results to make them stable against retries.
         // Use a single void key to maximize size of bundles for finalization.
         return input
-            .apply("Add void key", WithKeys.of((Void) null))
-            .apply("Reshuffle", Reshuffle.of())
-            .apply("Drop key", Values.create())
-            .apply("Gather bundles", ParDo.of(new GatherBundlesPerWindowFn<>()))
+            .apply("AddVoidKey", WithKeys.of((Void) null))

Review comment:
       Reshuffle re-windows data in addition to the GBK, so I don't think replacing the Reshuffle here with a GBK preserves the behavior.
   
   It's probably worth figuring out why Reshuffle did not work for the new path.
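To make the concern concrete, here is a simplified plain-Java model. It is not Beam code, and all class and method names are hypothetical. As I understand it, `Reshuffle.of()` reifies each element's timestamp, groups, and then restores the element timestamps and the input windowing strategy. A bare `GroupByKey` on a single void key instead emits one grouped value per window, stamped by the default `END_OF_WINDOW` timestamp combiner. The model ignores triggers and panes entirely.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of Reshuffle vs. GroupByKey output shape; not the Beam API. */
public class ReshuffleVsGbkModel {

  /** A fixed window [start, end); maxTimestamp mirrors Beam's "end - 1 ms" convention. */
  record Window(long start, long end) {
    long maxTimestamp() {
      return end - 1;
    }
  }

  /** A value with its event timestamp and assigned window. */
  record Timestamped<T>(T value, long timestamp, Window window) {}

  /** Reshuffle model: same elements, timestamps, and windows; only distribution changes. */
  static <T> List<Timestamped<T>> reshuffle(List<Timestamped<T>> input) {
    return new ArrayList<>(input);
  }

  /**
   * GroupByKey-on-a-void-key model: one output per window containing every value in it,
   * stamped with the window's max timestamp (END_OF_WINDOW combiner).
   */
  static <T> List<Timestamped<List<T>>> groupByVoidKey(List<Timestamped<T>> input) {
    Map<Window, List<T>> byWindow = new LinkedHashMap<>();
    for (Timestamped<T> e : input) {
      byWindow.computeIfAbsent(e.window(), k -> new ArrayList<>()).add(e.value());
    }
    List<Timestamped<List<T>>> out = new ArrayList<>();
    for (Map.Entry<Window, List<T>> entry : byWindow.entrySet()) {
      Window w = entry.getKey();
      out.add(new Timestamped<>(entry.getValue(), w.maxTimestamp(), w));
    }
    return out;
  }

  public static void main(String[] args) {
    Window w = new Window(0, 60_000);
    List<Timestamped<String>> results =
        List.of(new Timestamped<>("a", 1_000, w), new Timestamped<>("b", 2_000, w));
    System.out.println(reshuffle(results)); // 2 elements, timestamps 1000 and 2000
    System.out.println(groupByVoidKey(results)); // 1 element, [a, b] at timestamp 59999
  }
}
```

A GBK-based replacement would need to account for these differences (and for trigger behavior in streaming, which this model leaves out) before it can be considered equivalent.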

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -465,6 +520,19 @@ private WriteUnshardedBundlesToTempFiles(
               // shard numbers are assigned together to both the spilled and non-spilled files in
               // finalize.
               .apply("GroupUnwritten", GroupByKey.create())
+              .apply(

Review comment:
       Still confused :).
   
   Could you clarify why this was needed (since this is not in the auto-sharding path)? Is this a bug fix for existing code?
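The hunk's comment says shard numbers are assigned together to both the spilled and non-spilled files in finalize. Below is a minimal plain-Java model of that idea, assuming finalize concatenates both lists and numbers them sequentially. The names are hypothetical, and this is not the `WriteFiles` or `FileBasedSink` code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model: spilled and non-spilled temp files numbered as one shard set. */
public class JointShardAssignment {

  /** A temp file with its assigned shard index and the total shard count. */
  record NumberedFile(String path, int shard, int numShards) {}

  static List<NumberedFile> assignShards(List<String> writtenFiles, List<String> spilledFiles) {
    // Combine both sources first so the shard count covers every file.
    List<String> all = new ArrayList<>(writtenFiles);
    all.addAll(spilledFiles);
    int numShards = all.size();
    List<NumberedFile> out = new ArrayList<>();
    for (int i = 0; i < numShards; i++) {
      out.add(new NumberedFile(all.get(i), i, numShards));
    }
    return out;
  }

  public static void main(String[] args) {
    // Two files written directly by bundles, plus one written from spilled (unwritten) records.
    System.out.println(assignShards(List.of("tmp-a", "tmp-b"), List.of("tmp-spilled")));
  }
}
```

Numbering the two groups separately would presumably produce overlapping shard indices and disagreeing shard counts. This may be what grouping the unwritten records is meant to prevent, which is the question the comment above asks.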




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.



Issue Time Tracking
-------------------

    Worklog Id:     (was: 569185)
    Time Spent: 3h 20m  (was: 3h 10m)

> GCS streaming file sink uses runner determined sharding
> -------------------------------------------------------
>
>                 Key: BEAM-11934
>                 URL: https://issues.apache.org/jira/browse/BEAM-11934
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-files, io-java-gcp
>            Reporter: Siyuan Chen
>            Assignee: Siyuan Chen
>            Priority: P2
>          Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> Integrate file sink with shardable `GroupIntoBatches` (BEAM-10475) to allow 
> runner determined dynamic sharding for streaming use cases.
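A shardable `GroupIntoBatches` lets one logical key be split across several sub-keys, so batching a hot key is not confined to a single worker. As I understand it, in Beam this is exposed as `GroupIntoBatches.ofSize(...).withShardedKey()`, which emits batches keyed by `ShardedKey<K>`. The sketch below is only a plain-Java model. It assumes a runner-chosen shard count passed in as `numShards` and a round-robin assignment of values to shards. Both are hypothetical choices, since the real transform leaves the sharding to the runner.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of batching one key across several runner-chosen shards. */
public class ShardedBatchingModel {

  /** Stand-in for a sharded key: the user key plus a shard id. */
  record ShardKey(String key, int shard) {}

  static Map<ShardKey, List<List<Integer>>> batch(
      String key, List<Integer> values, int numShards, int batchSize) {
    // Spread one key's values round-robin over numShards sub-keys.
    Map<ShardKey, List<Integer>> perShard = new LinkedHashMap<>();
    for (int i = 0; i < values.size(); i++) {
      perShard
          .computeIfAbsent(new ShardKey(key, i % numShards), k -> new ArrayList<>())
          .add(values.get(i));
    }
    // Cut each shard's buffer into batches of at most batchSize elements.
    Map<ShardKey, List<List<Integer>>> batches = new LinkedHashMap<>();
    for (Map.Entry<ShardKey, List<Integer>> e : perShard.entrySet()) {
      List<Integer> buf = e.getValue();
      List<List<Integer>> out = new ArrayList<>();
      for (int start = 0; start < buf.size(); start += batchSize) {
        out.add(new ArrayList<>(buf.subList(start, Math.min(start + batchSize, buf.size()))));
      }
      batches.put(e.getKey(), out);
    }
    return batches;
  }

  public static void main(String[] args) {
    // Six values for one hot key, two shards, batches of at most two:
    // shard 0 -> [[0, 2], [4]], shard 1 -> [[1, 3], [5]]
    System.out.println(batch("hot", List.of(0, 1, 2, 3, 4, 5), 2, 2));
  }
}
```

Each shard's batches could then be written by a different worker. That parallelism is what lets the file sink drop a fixed, user-specified shard count in streaming.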



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
