nehsyc commented on a change in pull request #14530:
URL: https://github.com/apache/beam/pull/14530#discussion_r629666623
##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
##########
@@ -63,14 +62,16 @@ public IdentityWindowFn(Coder<? extends BoundedWindow> coder) {
}
@Override
- public Collection<BoundedWindow> assignWindows(WindowFn<T, BoundedWindow>.AssignContext c)
-     throws Exception {
+ public Collection<BoundedWindow> assignWindows(WindowFn<T, BoundedWindow>.AssignContext c) {
// The window is provided by the prior WindowFn, which also provides the coder for them
return Collections.singleton(c.window());
}
@Override
public boolean isCompatible(WindowFn<?, ?> other) {
+ // Only compatible with itself.
+ if (this.equals(other)) return true;
Review comment:
Sounds good. Making the changes in Flatten.
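The `this.equals(other)` check in the diff above can be illustrated outside of Beam. A minimal standalone sketch (the class, its `windowCoderName` field, and `main` are hypothetical stand-ins, not the real `IdentityWindowFn` or `WindowFn` API):

```java
import java.util.Objects;

public class IdentityWindowFnSketch {
  // Hypothetical stand-in for the window coder the real WindowFn carries.
  private final String windowCoderName;

  public IdentityWindowFnSketch(String windowCoderName) {
    this.windowCoderName = windowCoderName;
  }

  // Mirrors the diff: only compatible with an equal instance.
  public boolean isCompatible(Object other) {
    return this.equals(other);
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof IdentityWindowFnSketch)) return false;
    return Objects.equals(windowCoderName, ((IdentityWindowFnSketch) o).windowCoderName);
  }

  @Override
  public int hashCode() {
    return Objects.hash(windowCoderName);
  }

  public static void main(String[] args) {
    IdentityWindowFnSketch a = new IdentityWindowFnSketch("GlobalWindow.Coder");
    IdentityWindowFnSketch b = new IdentityWindowFnSketch("GlobalWindow.Coder");
    IdentityWindowFnSketch c = new IdentityWindowFnSketch("IntervalWindow.Coder");
    System.out.println(a.isCompatible(b)); // true: equal instances
    System.out.println(a.isCompatible(c)); // false: different window coder
  }
}
```

Delegating compatibility to `equals` keeps the check consistent with hashing and collection membership, which is why the override of `hashCode` accompanies it.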
##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -744,7 +735,84 @@ public void processElement(@Element UserT element, ProcessContext context)
KV.of(hashDestination(destination, destinationCoder), element));
}
}))
- .setCoder(KvCoder.of(VarIntCoder.of(), input.getCoder()))
+ .setCoder(KvCoder.of(VarIntCoder.of(), input.getCoder()));
+
+ if (input.getWindowingStrategy().needsMerge()) {
+ // To make sure that the elements are partitioned by original input windows and to work with
+ // merging windows such as sessions, we firstly group the elements by destination to reify
+ // the window, if the window needs to be merged, and then perform sharded writes for each
+ // destination and window pair.
+ Coder<BoundedWindow> windowCoder =
+     (Coder<BoundedWindow>) input.getWindowingStrategy().getWindowFn().windowCoder();
+ return reifyWindowAndWrite(keyedInput, windowCoder, input.getCoder());
+ } else {
+ return write(keyedInput, input.getCoder());
+ }
+ }
+
+ private PCollection<List<FileResult<DestinationT>>> reifyWindowAndWrite(
+ PCollection<KV<Integer, UserT>> keyedInput,
+ Coder<BoundedWindow> windowCoder,
+ Coder<UserT> inputCoder) {
+ PCollection<KV<ValueInSingleWindow<Integer>, UserT>> windowedInput =
+ keyedInput
+ .apply("GroupByDestination", GroupByKey.create())
Review comment:
Right, that was my concern. We do shuffle for `GroupIntoBatches`, but
stateful DoFns are not compatible with session windows without an explicit GBK.
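The group-by-(destination, window) step that `reifyWindowAndWrite` relies on can be illustrated with a small standalone sketch (plain Java collections, not the Beam `GroupByKey` transform; the record layout and `group` helper are hypothetical):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupByDestinationAndWindow {
  // Group (destination, window, element) records by the (destination, window)
  // pair, so each group can be written as its own shard -- analogous to
  // grouping after the window has been reified for merging windows.
  static Map<String, List<String>> group(String[][] records) {
    Map<String, List<String>> groups = new LinkedHashMap<>();
    for (String[] rec : records) {
      String key = rec[0] + "/" + rec[1]; // destination + reified window
      groups.computeIfAbsent(key, k -> new ArrayList<>()).add(rec[2]);
    }
    return groups;
  }

  public static void main(String[] args) {
    // Hypothetical input after session windows have already been merged.
    String[][] input = {
      {"dest1", "w1", "a"}, {"dest1", "w1", "b"},
      {"dest1", "w2", "c"}, {"dest2", "w1", "d"},
    };
    System.out.println(group(input)); // {dest1/w1=[a, b], dest1/w2=[c], dest2/w1=[d]}
  }
}
```

The point of the sketch: once windows are merged and reified into the key, grouping is an ordinary key lookup, which is why an explicit GBK makes the downstream per-key processing safe for session windows.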
##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
##########
@@ -159,11 +159,8 @@
* <li><b>How many shards are generated per pane:</b> This is controlled by <i>sharding</i>, using
*     {@link Write#withNumShards} or {@link Write#withSharding}. The default is runner-specific,
*     so the number of shards will vary based on runner behavior, though at least 1 shard will
- *     always be produced for every non-empty pane. Note that setting a fixed number of shards can
- *     hurt performance: it adds an additional {@link GroupByKey} to the pipeline. However, it is
- *     required to set it when writing an unbounded {@link PCollection} due to <a
- *     href="https://issues.apache.org/jira/browse/BEAM-1438">BEAM-1438</a> and similar behavior
- *     in other runners.
+ *     always be produced for every non-empty pane. Runner-determined sharding is available for
+ *     both bounded and unbounded data.
Review comment:
Runner-determined sharding for bounded data already existed before this
PR. The same option (`withRunnerDeterminedSharding`) crashes for unbounded
input.
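The shard assignment visible in the WriteFiles diff, `KV.of(hashDestination(destination, destinationCoder), element)`, boils down to hashing a destination into a bounded shard index. A standalone sketch under that assumption (`hashDestination` here is a hypothetical stand-in, not the Beam helper, which hashes the destination's encoded bytes):

```java
import java.util.Objects;

public class ShardAssignment {
  // Hypothetical: map a destination to one of numShards shard indices.
  // floorMod keeps the result non-negative even for negative hash codes.
  static int hashDestination(String destination, int numShards) {
    return Math.floorMod(Objects.hashCode(destination), numShards);
  }

  public static void main(String[] args) {
    int numShards = 5;
    int s1 = hashDestination("gs://bucket/output-a", numShards);
    int s2 = hashDestination("gs://bucket/output-a", numShards);
    // Deterministic: the same destination always lands on the same shard,
    // so all of its elements are grouped together downstream.
    System.out.println(s1 == s2); // true
    System.out.println(s1 >= 0 && s1 < numShards); // true
  }
}
```

Determinism is the property that matters: it lets a later group-by-key collect every element for a destination onto one worker without any coordination.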
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]