nehsyc commented on a change in pull request #14530:
URL: https://github.com/apache/beam/pull/14530#discussion_r629666623



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
##########
@@ -63,14 +62,16 @@ public IdentityWindowFn(Coder<? extends BoundedWindow> coder) {
   }
 
   @Override
-  public Collection<BoundedWindow> assignWindows(WindowFn<T, BoundedWindow>.AssignContext c)
-      throws Exception {
+  public Collection<BoundedWindow> assignWindows(WindowFn<T, BoundedWindow>.AssignContext c) {
     // The window is provided by the prior WindowFn, which also provides the coder for them
     return Collections.singleton(c.window());
   }
 
   @Override
   public boolean isCompatible(WindowFn<?, ?> other) {
+    // Only compatible with itself.
+    if (this.equals(other)) return true;

Review comment:
       Sounds good. Making the changes in Flatten.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -744,7 +735,84 @@ public void processElement(@Element UserT element, ProcessContext context)
                              KV.of(hashDestination(destination, destinationCoder), element));
                         }
                       }))
-              .setCoder(KvCoder.of(VarIntCoder.of(), input.getCoder()))
+              .setCoder(KvCoder.of(VarIntCoder.of(), input.getCoder()));
+
+      if (input.getWindowingStrategy().needsMerge()) {
+        // To make sure that the elements are partitioned by original input windows and to work with
+        // merging windows such as sessions, we firstly group the elements by destination to reify
+        // the window, if the window needs to be merged, and then perform sharded writes for each
+        // destination and window pair.
+        Coder<BoundedWindow> windowCoder =
+            (Coder<BoundedWindow>) input.getWindowingStrategy().getWindowFn().windowCoder();
+        return reifyWindowAndWrite(keyedInput, windowCoder, input.getCoder());
+      } else {
+        return write(keyedInput, input.getCoder());
+      }
+    }
+
+    private PCollection<List<FileResult<DestinationT>>> reifyWindowAndWrite(
+        PCollection<KV<Integer, UserT>> keyedInput,
+        Coder<BoundedWindow> windowCoder,
+        Coder<UserT> inputCoder) {
+      PCollection<KV<ValueInSingleWindow<Integer>, UserT>> windowedInput =
+          keyedInput
+              .apply("GroupByDestination", GroupByKey.create())

Review comment:
       Right, that was my concern. We do shuffle for `GroupIntoBatches`, but
stateful DoFns are not compatible with session windows without an explicit GBK.
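
       To illustrate why session windows need a grouping step: each element
starts with its own proto-window, and the final window is only known once
overlapping proto-windows have been merged. The sketch below is a simplified,
plain-Java model of that merge, not Beam's implementation; `SessionMergeSketch`,
`Interval`, and `mergeSessions` are hypothetical names used only for
illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model of session-window merging (an assumption-laden sketch,
// not Beam code): every timestamp gets a proto-window [ts, ts + gap), and
// intersecting proto-windows collapse into a single session.
final class SessionMergeSketch {

  /** Half-open interval [start, end) standing in for a session window. */
  static final class Interval {
    final long start;
    final long end;

    Interval(long start, long end) {
      this.start = start;
      this.end = end;
    }

    @Override
    public String toString() {
      return "[" + start + ", " + end + ")";
    }
  }

  /** Merges the proto-windows of all timestamps that share one key. */
  static List<Interval> mergeSessions(long[] timestamps, long gap) {
    long[] sorted = timestamps.clone();
    Arrays.sort(sorted);

    List<Interval> merged = new ArrayList<>();
    for (long ts : sorted) {
      Interval proto = new Interval(ts, ts + gap);
      int lastIndex = merged.size() - 1;
      if (lastIndex >= 0 && merged.get(lastIndex).end > proto.start) {
        // The proto-window intersects the current session, so extend it.
        Interval last = merged.remove(lastIndex);
        merged.add(new Interval(last.start, Math.max(last.end, proto.end)));
      } else {
        // No intersection: this element opens a new session.
        merged.add(proto);
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    // All timestamps for a key must be seen together before the sessions
    // are final, which is the role the explicit GBK plays in the pipeline.
    System.out.println(mergeSessions(new long[] {30, 0, 5}, 10));
  }
}
```

       In this model the window of the element at timestamp 0 depends on the
element at timestamp 5, so a per-element step that runs before the elements are
grouped cannot know the final window.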

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
##########
@@ -159,11 +159,8 @@
  *   <li><b>How many shards are generated per pane:</b> This is controlled by <i>sharding</i>, using
  *       {@link Write#withNumShards} or {@link Write#withSharding}. The default is runner-specific,
  *       so the number of shards will vary based on runner behavior, though at least 1 shard will
- *       always be produced for every non-empty pane. Note that setting a fixed number of shards can
- *       hurt performance: it adds an additional {@link GroupByKey} to the pipeline. However, it is
- *       required to set it when writing an unbounded {@link PCollection} due to <a
- *       href="https://issues.apache.org/jira/browse/BEAM-1438">BEAM-1438</a> and similar behavior
- *       in other runners.
+ *       always be produced for every non-empty pane. Runner-determined sharding is available for
+ *       both bounded and unbounded data.

Review comment:
       Runner-determined sharding for bounded data already existed before this
PR. The same option (`withRunnerDeterminedSharding`) crashes for unbounded
input.





