[https://issues.apache.org/jira/browse/BEAM-11934?focusedWorklogId=569185&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-569185]
ASF GitHub Bot logged work on BEAM-11934:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 19/Mar/21 23:29
Start Date: 19/Mar/21 23:29
Worklog Time Spent: 10m
Work Description: chamikaramj commented on a change in pull request #14164:
URL: https://github.com/apache/beam/pull/14164#discussion_r597878833
##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -400,13 +443,25 @@ private GatherResults(Coder<ResultT> resultCoder) {
@Override
public PCollection<List<ResultT>> expand(PCollection<ResultT> input) {
if (getWindowedWrites()) {
- // Reshuffle the results to make them stable against retries.
+ // Group the results to make them stable against retries.
// Use a single void key to maximize size of bundles for finalization.
return input
- .apply("Add void key", WithKeys.of((Void) null))
- .apply("Reshuffle", Reshuffle.of())
- .apply("Drop key", Values.create())
- .apply("Gather bundles", ParDo.of(new GatherBundlesPerWindowFn<>()))
+ .apply("AddVoidKey", WithKeys.of((Void) null))
Review comment:
Reshuffle re-windows data in addition to applying a GBK, so I don't think
replacing the Reshuffle here with a GBK preserves the behavior.
It's probably worth figuring out why Reshuffle did not work for the new path.
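To make the point about behavior concrete, here is a toy, in-memory Java model. It is not Beam code, and all class and method names are hypothetical. It contrasts a GroupByKey-like step, which emits one grouped value per key and window carrying a single timestamp (taken here as the window's last millisecond, an assumption modeled on Beam's default END_OF_WINDOW timestamp combiner), with a Reshuffle-like step that groups internally but re-emits each element individually with its original timestamp. The trigger and rewindowing that Reshuffle also applies are not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy, in-memory model (NOT Beam code) of two ways to route keyed, timestamped
// elements through a shuffle. Triggers and rewindowing are not modeled.
final class ReshuffleVsGbk {
    // Hypothetical input element: key, value, event-time timestamp in millis.
    static final class Elem {
        final String key;
        final String value;
        final long ts;

        Elem(String key, String value, long ts) {
            this.key = key;
            this.value = value;
            this.ts = ts;
        }
    }

    // Hypothetical grouped output: one per (window, key), with a single timestamp.
    static final class Grouped {
        final String key;
        final List<String> values = new ArrayList<>();
        final long ts;

        Grouped(String key, long ts) {
            this.key = key;
            this.ts = ts;
        }
    }

    static final long WINDOW_MS = 60_000; // assumed fixed one-minute windows

    static long windowStart(long ts) {
        return ts - Math.floorMod(ts, WINDOW_MS);
    }

    // GroupByKey-like: values are collected per (window, key) and each group
    // gets one timestamp, the window's last millisecond.
    static List<Grouped> groupByKey(List<Elem> input) {
        Map<String, Grouped> groups = new TreeMap<>(); // sorted for stable output
        for (Elem e : input) {
            long start = windowStart(e.ts);
            String groupId = start + "|" + e.key;
            groups.computeIfAbsent(groupId, id -> new Grouped(e.key, start + WINDOW_MS - 1))
                    .values.add(e.value);
        }
        return new ArrayList<>(groups.values());
    }

    // Reshuffle-like: elements are grouped by key internally (the shuffle), then
    // each one is re-emitted on its own with its original key, value and timestamp.
    static List<Elem> reshuffle(List<Elem> input) {
        Map<String, List<Elem>> byKey = new TreeMap<>();
        for (Elem e : input) {
            byKey.computeIfAbsent(e.key, k -> new ArrayList<>()).add(e);
        }
        List<Elem> out = new ArrayList<>();
        for (List<Elem> group : byKey.values()) {
            out.addAll(group);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Elem> input = List.of(
                new Elem("k", "a", 1_000), new Elem("k", "b", 2_000), new Elem("k", "c", 61_000));
        for (Grouped g : groupByKey(input)) {
            System.out.println("GBK " + g.key + " " + g.values + " @" + g.ts);
        }
        for (Elem e : reshuffle(input)) {
            System.out.println("Reshuffle " + e.key + " " + e.value + " @" + e.ts);
        }
    }
}
```

In this model the GBK path collapses `a` and `b` into one output stamped at the end of their window, while the Reshuffle path still yields three separate elements with their original timestamps, so code downstream of one would not see the same data from the other.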
##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -465,6 +520,19 @@ private WriteUnshardedBundlesToTempFiles(
// shard numbers are assigned together to both the spilled and non-spilled files in
// finalize.
.apply("GroupUnwritten", GroupByKey.create())
+ .apply(
Review comment:
Still confused :).
Could you clarify why this was needed (since this is not in the
auto-sharding path)? Is this a bug fix for existing code?
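The diff context for this comment says shard numbers are assigned together to both the spilled and non-spilled files in finalize. A toy model of that idea follows; it is not the WriteFiles implementation, and the class, the method and the plain string file names are hypothetical. If each set of files were numbered independently, both sets would start at shard 0 and collide; numbering the combined list gives unique, contiguous shard indices.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (NOT the WriteFiles implementation): temp files written directly by
// bundles and temp files produced after spilling are numbered as one list.
final class FinalizeShardNumbering {
    // Hypothetical result type: a temp file name plus its assigned shard index.
    static final class Numbered {
        final String tempFile;
        final int shard;
        final int numShards;

        Numbered(String tempFile, int shard, int numShards) {
            this.tempFile = tempFile;
            this.shard = shard;
            this.numShards = numShards;
        }

        @Override
        public String toString() {
            return tempFile + " -> shard " + shard + " of " + numShards;
        }
    }

    // Assigns shard indices 0..n-1 over the concatenation of both inputs, so the
    // spilled files continue the numbering instead of restarting at 0.
    static List<Numbered> assignShards(List<String> written, List<String> spilled) {
        List<String> all = new ArrayList<>(written);
        all.addAll(spilled);
        List<Numbered> out = new ArrayList<>();
        for (int i = 0; i < all.size(); i++) {
            out.add(new Numbered(all.get(i), i, all.size()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Numbered> result = assignShards(List.of("tmp-a", "tmp-b"), List.of("tmp-spilled-1"));
        result.forEach(System.out::println);
    }
}
```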
Issue Time Tracking
-------------------
Worklog Id: (was: 569185)
Time Spent: 3h 20m (was: 3h 10m)
> GCS streaming file sink uses runner determined sharding
> -------------------------------------------------------
>
> Key: BEAM-11934
> URL: https://issues.apache.org/jira/browse/BEAM-11934
> Project: Beam
> Issue Type: Improvement
> Components: io-java-files, io-java-gcp
> Reporter: Siyuan Chen
> Assignee: Siyuan Chen
> Priority: P2
> Time Spent: 3h 20m
> Remaining Estimate: 0h
>
> Integrate file sink with shardable `GroupIntoBatches` (BEAM-10475) to allow
> runner-determined dynamic sharding for streaming use cases.
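The idea in the issue can be illustrated with a toy model of sharded-key batching. This is plain Java, not Beam's `GroupIntoBatches` from BEAM-10475; the class name, the fixed shard count and the round-robin shard assignment are assumptions standing in for the decision the runner would make. Elements of one hot key are spread across several sharded keys, and each shard's buffer is cut into batches of at most `batchSize`, so that a distributed runner could process the shards of that key in parallel instead of funneling everything through a single key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy, in-memory model (NOT Beam's GroupIntoBatches): elements of one key are
// spread over several sub-keys ("sharded keys"), and each sub-key buffer is
// cut into batches of at most batchSize elements.
final class ShardedBatching {
    // Assumption: the caller picks a fixed shardCount and shards are assigned
    // round-robin; with runner-determined sharding the runner would decide this.
    static Map<String, List<List<String>>> batch(
            String key, List<String> values, int shardCount, int batchSize) {
        if (shardCount <= 0 || batchSize <= 0) {
            throw new IllegalArgumentException("shardCount and batchSize must be positive");
        }
        // Buffer each element under its sharded key, e.g. "dest#0", "dest#1".
        Map<String, List<String>> buffers = new LinkedHashMap<>();
        for (int i = 0; i < values.size(); i++) {
            String shardedKey = key + "#" + (i % shardCount);
            buffers.computeIfAbsent(shardedKey, k -> new ArrayList<>()).add(values.get(i));
        }
        // Cut every shard's buffer into consecutive batches.
        Map<String, List<List<String>>> batches = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : buffers.entrySet()) {
            List<String> buf = e.getValue();
            List<List<String>> out = new ArrayList<>();
            for (int start = 0; start < buf.size(); start += batchSize) {
                out.add(new ArrayList<>(buf.subList(start, Math.min(start + batchSize, buf.size()))));
            }
            batches.put(e.getKey(), out);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> values = List.of("v0", "v1", "v2", "v3", "v4");
        System.out.println(batch("dest", values, 2, 2));
    }
}
```

With five values, two shards and a `batchSize` of 2, `main` prints `{dest#0=[[v0, v2], [v4]], dest#1=[[v1, v3]]}`. In the issue's setting the batches would then feed the file writers; this sketch stops at forming them.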