[ https://issues.apache.org/jira/browse/BEAM-12040?focusedWorklogId=576365&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-576365 ]
ASF GitHub Bot logged work on BEAM-12040:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 03/Apr/21 00:32
Start Date: 03/Apr/21 00:32
Worklog Time Spent: 10m
Work Description: nehsyc commented on a change in pull request #14421:
URL: https://github.com/apache/beam/pull/14421#discussion_r606533999
##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -770,73 +740,71 @@ public void processElement(@Element UserT element, ProcessContext context)
                       }
                     }))
             .setCoder(KvCoder.of(VarIntCoder.of(), input.getCoder()))
+            .apply("GroupByDestination", GroupByKey.create())
             .apply(
-                "ShardAndBatch",
-                GroupIntoBatches.<Integer, UserT>ofSize(FILE_TRIGGERING_RECORD_COUNT)
-                    .withMaxBufferingDuration(FILE_TRIGGERING_RECORD_BUFFERING_DURATION)
-                    .withShardedKey())
-            .setCoder(
-                KvCoder.of(
-                    org.apache.beam.sdk.util.ShardedKey.Coder.of(VarIntCoder.of()),
-                    IterableCoder.of(input.getCoder())));
-
-    // Write grouped elements to temp files.
-    PCollection<FileResult<DestinationT>> tempFiles =
-        shardedInput
-            .apply(
-                "AddDummyShard",
-                MapElements.via(
-                    new SimpleFunction<
-                        KV<org.apache.beam.sdk.util.ShardedKey<Integer>, Iterable<UserT>>,
-                        KV<ShardedKey<Integer>, Iterable<UserT>>>() {
-                      @Override
-                      public KV<ShardedKey<Integer>, Iterable<UserT>> apply(
-                          KV<org.apache.beam.sdk.util.ShardedKey<Integer>, Iterable<UserT>>
-                              input) {
-                        // Add dummy shard since it is required by WriteShardsIntoTempFilesFn. It
-                        // will be dropped after we generate the temp files.
-                        return KV.of(
-                            ShardedKey.of(input.getKey().getKey(), DUMMY_SHARDNUM),
-                            input.getValue());
+                "ReifyWindowInKey",
+                ParDo.of(
+                    new DoFn<
+                        KV<Integer, Iterable<UserT>>, KV<ValueInSingleWindow<Integer>, UserT>>() {
+                      @ProcessElement
+                      public void processElement(
+                          @Element KV<Integer, Iterable<UserT>> element,
+                          @Timestamp Instant timestamp,
+                          BoundedWindow window,
+                          PaneInfo pane,
+                          OutputReceiver<KV<ValueInSingleWindow<Integer>, UserT>> r) {
+                        for (UserT value : element.getValue()) {
+                          r.output(
+                              KV.of(
+                                  ValueInSingleWindow.of(
+                                      element.getKey(), timestamp, window, pane),
+                                  value));
+                        }
                       }
                     }))
             .setCoder(
                 KvCoder.of(
-                    ShardedKeyCoder.of(VarIntCoder.of()), IterableCoder.of(input.getCoder())))
+                    ValueInSingleWindow.Coder.of(VarIntCoder.of(), windowCoder),
+                    input.getCoder()));
+
+    // Write elements to temp files.
+    PCollection<KV<ValueInSingleWindow<DestinationT>, FileResult<DestinationT>>> tempFiles =
+        groupedWindowedInput
             .apply(
-                "WriteShardsIntoTempFiles",
-                ParDo.of(new WriteShardsIntoTempFilesFn()).withSideInputs(getSideInputs()))
-            .setCoder(fileResultCoder)
+                "RewindowIntoGlobal",
+                Window.<KV<ValueInSingleWindow<Integer>, UserT>>into(new GlobalWindows())
+                    .triggering(DefaultTrigger.of())
+                    .discardingFiredPanes())
             .apply(
-                "DropShardNum",
-                ParDo.of(
-                    new DoFn<FileResult<DestinationT>, FileResult<DestinationT>>() {
-                      @ProcessElement
-                      public void process(ProcessContext c) {
-                        c.output(c.element().withShard(UNKNOWN_SHARDNUM));
-                      }
-                    }));
+                "ShardAndBatch",
+                GroupIntoBatches.<ValueInSingleWindow<Integer>, UserT>ofSize(
+                        FILE_TRIGGERING_RECORD_COUNT)
+                    .withMaxBufferingDuration(FILE_TRIGGERING_RECORD_BUFFERING_DURATION)
+                    .withShardedKey())
+            .apply(
+                "WriteShardsIntoTempFiles",
+                ParDo.of(new WriteWindowedShardsIntoTempFilesFn())
+                    .withSideInputs(getSideInputs()))
+            .setCoder(
+                KvCoder.of(
+                    ValueInSingleWindow.Coder.of(destinationCoder, windowCoder),
+                    fileResultCoder));
-    // Group temp file results by destinations again to gather all the results in the same window.
+    // Group temp file results by destination again to gather all the files in the same window.
     // This is needed since we don't have shard idx associated with each temp file so have to rely
     // on the indexing within a bundle.
     return tempFiles
         .apply(
-            "KeyedByDestination",
-            WithKeys.of(
-                new SimpleFunction<FileResult<DestinationT>, DestinationT>() {
-                  @Override
-                  public DestinationT apply(FileResult<DestinationT> input) {
-                    return input.getDestination();
-                  }
-                }))
-        .setCoder(KvCoder.of(destinationCoder, fileResultCoder))
-        .apply(GroupByKey.create())
+            Window.<KV<ValueInSingleWindow<DestinationT>, FileResult<DestinationT>>>into(
Review comment:
@robertwb Any thoughts on this?
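
For readers following along, the core of the diff is the new "ReifyWindowInKey" step: after the GroupByKey has merged session windows, each element's window, timestamp and pane are copied into the key, so the collection can later be re-windowed into GlobalWindows for the stateful GroupIntoBatches and the temp files can still be written per original window. Below is a stripped-down sketch of that pattern outside of WriteFiles; the class and method names are hypothetical and only the Beam APIs used are real.

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.Instant;

public class ReifyWindowInKeySketch {

  /** Turns (shard key, grouped values) into (shard key + window info, value) pairs. */
  static <V> PCollection<KV<ValueInSingleWindow<Integer>, V>> reifyWindowInKey(
      PCollection<KV<Integer, Iterable<V>>> grouped,
      Coder<V> valueCoder,
      Coder<BoundedWindow> windowCoder) {
    return grouped
        .apply(
            "ReifyWindowInKey",
            ParDo.of(
                new DoFn<KV<Integer, Iterable<V>>, KV<ValueInSingleWindow<Integer>, V>>() {
                  @ProcessElement
                  public void processElement(
                      @Element KV<Integer, Iterable<V>> element,
                      @Timestamp Instant timestamp,
                      BoundedWindow window,
                      PaneInfo pane,
                      OutputReceiver<KV<ValueInSingleWindow<Integer>, V>> out) {
                    // The merged (e.g. session) window now travels inside the key, so the
                    // downstream stateful batching can run in the global window without
                    // losing track of which window each record belongs to.
                    ValueInSingleWindow<Integer> windowedKey =
                        ValueInSingleWindow.of(element.getKey(), timestamp, window, pane);
                    for (V value : element.getValue()) {
                      out.output(KV.of(windowedKey, value));
                    }
                  }
                }))
        .setCoder(
            KvCoder.of(
                ValueInSingleWindow.Coder.of(VarIntCoder.of(), windowCoder), valueCoder));
  }
}

As in the diff, the output coder is set explicitly because it cannot be inferred for the anonymous DoFn; it is built from the value coder and the input's window coder.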
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 576365)
Time Spent: 0.5h (was: 20m)
> WriteFiles withRunnerDeterminedShardingUnbounded doesn't work with session
> windows
> ----------------------------------------------------------------------------------
>
> Key: BEAM-12040
> URL: https://issues.apache.org/jira/browse/BEAM-12040
> Project: Beam
> Issue Type: Improvement
> Components: io-java-files
> Reporter: Siyuan Chen
> Priority: P2
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> Currently the implementation of `withRunnerDeterminedShardingUnbounded` uses
> a stateful DoFn to group and batch the input elements, which doesn't support
> session windows. One possible fix is to add another GBK before the stateful
> DoFn so that session windows are merged first, and to reify the window into
> the key before invoking the stateful DoFn.
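
For context, a minimal, hypothetical pipeline of the kind this issue describes: an unbounded, session-windowed input written with runner-determined (auto) sharding. The sharding/batching behind that feature is a stateful DoFn, and stateful DoFns cannot execute on merging windows such as sessions, which is what the GBK-plus-window-reification change above works around. All names, paths and rates here are illustrative, and the use of TextIO.Write#withAutoSharding() assumes a Beam release in which that method is available.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

public class SessionWindowedAutoShardedWrite {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    p.apply("Events", GenerateSequence.from(0).withRate(10, Duration.standardSeconds(1)))
        .apply(
            "FormatAsLine",
            MapElements.into(TypeDescriptors.strings()).via((Long n) -> "event-" + n))
        // Session windows are merging windows; this is the case the stateful
        // sharding/batching step in WriteFiles could not handle.
        .apply(
            "SessionWindows",
            Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(10))))
        .apply(
            "Write",
            TextIO.write()
                .to("/tmp/session-writes/out") // hypothetical output prefix
                .withWindowedWrites()
                .withAutoSharding()); // runner-determined sharding; assumed available
    p.run();
  }
}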
--
This message was sent by Atlassian Jira
(v8.3.4#803005)