pabloem commented on a change in pull request #14530:
URL: https://github.com/apache/beam/pull/14530#discussion_r627775740



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
##########
@@ -159,11 +159,8 @@
  *   <li><b>How many shards are generated per pane:</b> This is controlled by <i>sharding</i>, using
  *       {@link Write#withNumShards} or {@link Write#withSharding}. The default is runner-specific,
  *       so the number of shards will vary based on runner behavior, though at least 1 shard will
- *       always be produced for every non-empty pane. Note that setting a fixed number of shards can
- *       hurt performance: it adds an additional {@link GroupByKey} to the pipeline. However, it is
- *       required to set it when writing an unbounded {@link PCollection} due to <a
- *       href="https://issues.apache.org/jira/browse/BEAM-1438">BEAM-1438</a> and similar behavior
- *       in other runners.
+ *       always be produced for every non-empty pane. Runner-determined sharding is available for
+ *       both bounded and unbounded data.

Review comment:
       Bounded and unbounded? I think we normally use 'bounded' to refer to batch pipelines. Do we support this for both?

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
##########
@@ -63,14 +62,16 @@ public IdentityWindowFn(Coder<? extends BoundedWindow> coder) {
   }
 
   @Override
-  public Collection<BoundedWindow> assignWindows(WindowFn<T, BoundedWindow>.AssignContext c)
-      throws Exception {
+  public Collection<BoundedWindow> assignWindows(WindowFn<T, BoundedWindow>.AssignContext c) {
     // The window is provided by the prior WindowFn, which also provides the coder for them
     return Collections.singleton(c.window());
   }
 
   @Override
   public boolean isCompatible(WindowFn<?, ?> other) {
+    // Only compatible with itself.
+    if (this.equals(other)) return true;

Review comment:
       Hmm, I think I'd prefer to skip the comparison in Flatten when the inputs are the same PCollection. I have a harder time reasoning about WindowFn compatibility, but if it's the same PCollection then we can avoid that question. Thoughts?

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -744,7 +735,84 @@ public void processElement(@Element UserT element, ProcessContext context)
                               KV.of(hashDestination(destination, destinationCoder), element));
                         }
                       }))
-              .setCoder(KvCoder.of(VarIntCoder.of(), input.getCoder()))
+              .setCoder(KvCoder.of(VarIntCoder.of(), input.getCoder()));
+
+      if (input.getWindowingStrategy().needsMerge()) {
+        // To make sure that the elements are partitioned by original input windows and to work with
+        // merging windows such as sessions, we firstly group the elements by destination to reify
+        // the window, if the window needs to be merged, and then perform sharded writes for each
+        // destination and window pair.
+        Coder<BoundedWindow> windowCoder =
+            (Coder<BoundedWindow>) input.getWindowingStrategy().getWindowFn().windowCoder();
+        return reifyWindowAndWrite(keyedInput, windowCoder, input.getCoder());
+      } else {
+        return write(keyedInput, input.getCoder());
+      }
+    }
+
+    private PCollection<List<FileResult<DestinationT>>> reifyWindowAndWrite(
+        PCollection<KV<Integer, UserT>> keyedInput,
+        Coder<BoundedWindow> windowCoder,
+        Coder<UserT> inputCoder) {
+      PCollection<KV<ValueInSingleWindow<Integer>, UserT>> windowedInput =
+          keyedInput
+              .apply("GroupByDestination", GroupByKey.create())

Review comment:
       Why do you need to group here? Can't the window be reified without grouping? Won't GroupIntoBatches perform the shuffle?

   I worry that we may often have a single destination, or only a couple of them, so we'd be reducing parallelism to 1.
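
   To make the parallelism concern concrete, here is a minimal plain-Java sketch. It does not use Beam and is not the code in this PR: `GroupingParallelismSketch`, `destinationOf`, and the round-robin shard assignment are hypothetical names and choices made for illustration only. It assumes each distinct grouping key is one unit of parallel work after the shuffle, and compares grouping by destination alone with grouping by a (destination, shard) pair.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class GroupingParallelismSketch {

  // Hypothetical stand-in for a destination hash: maps an element to one of numDestinations keys.
  public static int destinationOf(String element, int numDestinations) {
    return Math.floorMod(element.hashCode(), numDestinations);
  }

  // Groups by destination only. The number of keys is bounded by the number of destinations.
  public static Map<Integer, List<String>> groupByDestination(
      List<String> elements, int numDestinations) {
    Map<Integer, List<String>> groups = new HashMap<>();
    for (String element : elements) {
      int destination = destinationOf(element, numDestinations);
      groups.computeIfAbsent(destination, k -> new ArrayList<>()).add(element);
    }
    return groups;
  }

  // Groups by (destination, shard). The shard is assigned round-robin by element index,
  // which is an assumption made for this sketch, not how any runner assigns shards.
  public static Map<List<Integer>, List<String>> groupByDestinationAndShard(
      List<String> elements, int numDestinations, int numShards) {
    Map<List<Integer>, List<String>> groups = new HashMap<>();
    for (int i = 0; i < elements.size(); i++) {
      String element = elements.get(i);
      List<Integer> key =
          Arrays.asList(destinationOf(element, numDestinations), i % numShards);
      groups.computeIfAbsent(key, k -> new ArrayList<>()).add(element);
    }
    return groups;
  }

  public static void main(String[] args) {
    List<String> elements = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h");
    // Single destination: every element lands under the same key.
    System.out.println(groupByDestination(elements, 1).size()); // prints 1
    // Same data with 4 shards per destination: four independent keys.
    System.out.println(groupByDestinationAndShard(elements, 1, 4).size()); // prints 4
  }
}
```

   With one destination, the destination-only grouping collapses all eight elements under a single key, while the sharded key keeps four groups that could be processed independently.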





