[
https://issues.apache.org/jira/browse/BEAM-12040?focusedWorklogId=586111&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-586111
]
ASF GitHub Bot logged work on BEAM-12040:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 20/Apr/21 19:16
Start Date: 20/Apr/21 19:16
Worklog Time Spent: 10m
Work Description: nehsyc commented on a change in pull request #14530:
URL: https://github.com/apache/beam/pull/14530#discussion_r616967664
##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -744,7 +735,84 @@ public void processElement(@Element UserT element,
ProcessContext context)
KV.of(hashDestination(destination,
destinationCoder), element));
}
}))
- .setCoder(KvCoder.of(VarIntCoder.of(), input.getCoder()))
+ .setCoder(KvCoder.of(VarIntCoder.of(), input.getCoder()));
+
+ if (input.getWindowingStrategy().needsMerge()) {
+ // To make sure that the elements are partitioned by original input
windows and to work with
+ // merging windows such as sessions, we firstly group the elements by
destination to reify
+ // the window, if the window needs to be merged, and then perform
sharded writes for each
+ // destination and window pair.
+ Coder<BoundedWindow> windowCoder =
+ (Coder<BoundedWindow>)
input.getWindowingStrategy().getWindowFn().windowCoder();
+ return reifyWindowAndWrite(keyedInput, windowCoder, input.getCoder());
+ } else {
+ return write(keyedInput, input.getCoder());
+ }
+ }
+
+ private PCollection<List<FileResult<DestinationT>>> reifyWindowAndWrite(
+ PCollection<KV<Integer, UserT>> keyedInput,
+ Coder<BoundedWindow> windowCoder,
+ Coder<UserT> inputCoder) {
+ PCollection<KV<ValueInSingleWindow<Integer>, UserT>> windowedInput =
+ keyedInput
+ .apply("GroupByDestination", GroupByKey.create())
Review comment:
This GBK could be a bottleneck although we "reshard" before the write
which is potentially more expensive. I don't have a better idea other than
reifying the window in the keys.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 586111)
Time Spent: 4h 50m (was: 4h 40m)
> WriteFiles withRunnerDeterminedSharding for unbounded data doesn't work with
> merging windows
> --------------------------------------------------------------------------------------------
>
> Key: BEAM-12040
> URL: https://issues.apache.org/jira/browse/BEAM-12040
> Project: Beam
> Issue Type: Improvement
> Components: io-java-files
> Reporter: Siyuan Chen
> Assignee: Siyuan Chen
> Priority: P2
> Time Spent: 4h 50m
> Remaining Estimate: 0h
>
> Currently the implementation of `withRunnerDeterminedShardingUnbounded` uses
> a stateful DoFn to achieve the grouping and batching of the input elements,
> which doesn't support session windows. One possible way is to add another GBK
> prior to the stateful DoFn to first get session windows merged and reify the
> window before invoking the sateful DoFn.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)