[ 
https://issues.apache.org/jira/browse/BEAM-12040?focusedWorklogId=593045&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-593045
 ]

ASF GitHub Bot logged work on BEAM-12040:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 06/May/21 22:31
            Start Date: 06/May/21 22:31
    Worklog Time Spent: 10m 
      Work Description: pabloem commented on a change in pull request #14530:
URL: https://github.com/apache/beam/pull/14530#discussion_r627775740



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
##########
@@ -159,11 +159,8 @@
  *   <li><b>How many shards are generated per pane:</b> This is controlled by 
<i>sharding</i>, using
  *       {@link Write#withNumShards} or {@link Write#withSharding}. The 
default is runner-specific,
  *       so the number of shards will vary based on runner behavior, though at 
least 1 shard will
- *       always be produced for every non-empty pane. Note that setting a 
fixed number of shards can
- *       hurt performance: it adds an additional {@link GroupByKey} to the 
pipeline. However, it is
- *       required to set it when writing an unbounded {@link PCollection} due 
to <a
- *       href="https://issues.apache.org/jira/browse/BEAM-1438";>BEAM-1438</a> 
and similar behavior
- *       in other runners.
+ *       always be produced for every non-empty pane. Runner-determined 
sharding is available for
+ *       both bounded and unbounded data.

Review comment:
       bounded and unbounded? I think normally we refer to 'bounded' for batch 
pipelines. Do we support this for both?

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
##########
@@ -63,14 +62,16 @@ public IdentityWindowFn(Coder<? extends BoundedWindow> 
coder) {
   }
 
   @Override
-  public Collection<BoundedWindow> assignWindows(WindowFn<T, 
BoundedWindow>.AssignContext c)
-      throws Exception {
+  public Collection<BoundedWindow> assignWindows(WindowFn<T, 
BoundedWindow>.AssignContext c) {
     // The window is provided by the prior WindowFn, which also provides the 
coder for them
     return Collections.singleton(c.window());
   }
 
   @Override
   public boolean isCompatible(WindowFn<?, ?> other) {
+    // Only compatible with itself.
+    if (this.equals(other)) return true;

Review comment:
       hmmm I think I'd prefer to skip the comparison for Flatten for same 
PCollections. I find that I have a more difficult time reasoning about windowfn 
compatibility, but if it's the same PCollection then we can avoid that 
question. Thoughts?

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -744,7 +735,84 @@ public void processElement(@Element UserT element, 
ProcessContext context)
                               KV.of(hashDestination(destination, 
destinationCoder), element));
                         }
                       }))
-              .setCoder(KvCoder.of(VarIntCoder.of(), input.getCoder()))
+              .setCoder(KvCoder.of(VarIntCoder.of(), input.getCoder()));
+
+      if (input.getWindowingStrategy().needsMerge()) {
+        // To make sure that the elements are partitioned by original input 
windows and to work with
+        // merging windows such as sessions, we firstly group the elements by 
destination to reify
+        // the window, if the window needs to be merged, and then perform 
sharded writes for each
+        // destination and window pair.
+        Coder<BoundedWindow> windowCoder =
+            (Coder<BoundedWindow>) 
input.getWindowingStrategy().getWindowFn().windowCoder();
+        return reifyWindowAndWrite(keyedInput, windowCoder, input.getCoder());
+      } else {
+        return write(keyedInput, input.getCoder());
+      }
+    }
+
+    private PCollection<List<FileResult<DestinationT>>> reifyWindowAndWrite(
+        PCollection<KV<Integer, UserT>> keyedInput,
+        Coder<BoundedWindow> windowCoder,
+        Coder<UserT> inputCoder) {
+      PCollection<KV<ValueInSingleWindow<Integer>, UserT>> windowedInput =
+          keyedInput
+              .apply("GroupByDestination", GroupByKey.create())

Review comment:
       Why do you need to group here? Can't the window be reified without 
Grouping? Won't groupintobatches perform the shuffle?
   
   I worry that often we may have a single destination, or only a couple of 
them, so we'd be reducing parallelism to 1.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 593045)
    Time Spent: 5h 20m  (was: 5h 10m)

> WriteFiles withRunnerDeterminedSharding for unbounded data doesn't work with 
> merging windows
> --------------------------------------------------------------------------------------------
>
>                 Key: BEAM-12040
>                 URL: https://issues.apache.org/jira/browse/BEAM-12040
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-files
>            Reporter: Siyuan Chen
>            Assignee: Siyuan Chen
>            Priority: P2
>          Time Spent: 5h 20m
>  Remaining Estimate: 0h
>
> Currently the implementation of `withRunnerDeterminedShardingUnbounded` uses 
> a stateful DoFn to achieve the grouping and batching of the input elements, 
> which doesn't support session windows. One possible way is to add another GBK 
> prior to the stateful DoFn to first get session windows merged and reify the 
> window before invoking the sateful DoFn. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to