chamikaramj commented on a change in pull request #14164:
URL: https://github.com/apache/beam/pull/14164#discussion_r597878833
##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -400,13 +443,25 @@ private GatherResults(Coder<ResultT> resultCoder) {
@Override
public PCollection<List<ResultT>> expand(PCollection<ResultT> input) {
if (getWindowedWrites()) {
- // Reshuffle the results to make them stable against retries.
+ // Group the results to make them stable against retries.
// Use a single void key to maximize size of bundles for finalization.
return input
- .apply("Add void key", WithKeys.of((Void) null))
- .apply("Reshuffle", Reshuffle.of())
- .apply("Drop key", Values.create())
- .apply("Gather bundles", ParDo.of(new GatherBundlesPerWindowFn<>()))
+ .apply("AddVoidKey", WithKeys.of((Void) null))
Review comment:
Reshuffle re-windows data in addition to performing the GBK, so I don't think
replacing the Reshuffle here with a plain GBK preserves the behavior.
It's probably worth figuring out why Reshuffle did not work for the new path.
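To make the difference concrete: in the Java SDK, `Reshuffle` rewindows its input into an identity window with its own trigger (`ReshuffleTrigger`, which fires as elements arrive) and discarding panes, groups, and then restores the original windowing strategy and each element's original timestamp. A plain `GroupByKey` instead uses the input's windowing strategy, so with the default trigger it emits one pane per key and window once the watermark passes the window end. That pane is stamped by the default `END_OF_WINDOW` timestamp combiner at the window's max timestamp (end minus 1 ms). The sketch below is a toy, stdlib-only model of that observable difference under those assumptions. It is not Beam code, and `ReshuffleVsGbkModel`, `Timestamped`, `reshuffle`, and `groupByKeyDefault` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Toy, stdlib-only model (NOT Beam code) of what a downstream step observes after
 * Reshuffle versus a plain GroupByKey, for one key in one fixed window.
 * All names here are hypothetical.
 */
public class ReshuffleVsGbkModel {

  /** A value paired with its event timestamp in millis. */
  static final class Timestamped<T> {
    final long timestampMillis;
    final T value;

    Timestamped(long timestampMillis, T value) {
      this.timestampMillis = timestampMillis;
      this.value = value;
    }
  }

  /**
   * Models Reshuffle: elements are grouped internally, but the groups are expanded again and
   * original timestamps (and the original windowing strategy) are restored, so the output
   * looks like the input.
   */
  static <T> List<Timestamped<T>> reshuffle(List<Timestamped<T>> input) {
    return new ArrayList<>(input);
  }

  /**
   * Models a plain GroupByKey under the default trigger and the default END_OF_WINDOW
   * timestamp combiner: one grouped pane stamped at the window's max timestamp (end - 1 ms).
   */
  static <T> Timestamped<List<T>> groupByKeyDefault(
      List<Timestamped<T>> input, long windowEndMillis) {
    List<T> values = new ArrayList<>();
    for (Timestamped<T> element : input) {
      values.add(element.value);
    }
    return new Timestamped<>(windowEndMillis - 1, Collections.unmodifiableList(values));
  }

  public static void main(String[] args) {
    // Two results that fall in the fixed window [0, 60000).
    List<Timestamped<String>> input = new ArrayList<>();
    input.add(new Timestamped<>(1_000L, "result-a"));
    input.add(new Timestamped<>(2_000L, "result-b"));

    List<Timestamped<String>> reshuffled = reshuffle(input);
    Timestamped<List<String>> grouped = groupByKeyDefault(input, 60_000L);

    System.out.println("Reshuffle: " + reshuffled.size() + " elements, first at "
        + reshuffled.get(0).timestampMillis);
    System.out.println("GroupByKey: one pane " + grouped.value + " at "
        + grouped.timestampMillis);
  }
}
```

In this model, Reshuffle is transparent to downstream steps, while the GBK changes the element shape and timestamps. The model also leaves out the trigger and continuation-trigger effects described above. That is why swapping one for the other is unlikely to be behavior-preserving.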
##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -465,6 +520,19 @@ private WriteUnshardedBundlesToTempFiles(
// shard numbers are assigned together to both the spilled and non-spilled files in
// finalize.
.apply("GroupUnwritten", GroupByKey.create())
+ .apply(
Review comment:
Still confused :).
Could you clarify why this was needed, since this is not in the
auto-sharding path? Is this a bug fix for existing code?