nehsyc commented on a change in pull request #14164:
URL: https://github.com/apache/beam/pull/14164#discussion_r593392042
##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
##########
@@ -400,13 +443,25 @@ private GatherResults(Coder<ResultT> resultCoder) {
@Override
public PCollection<List<ResultT>> expand(PCollection<ResultT> input) {
if (getWindowedWrites()) {
- // Reshuffle the results to make them stable against retries.
+ // Group the results to make them stable against retries.
// Use a single void key to maximize size of bundles for finalization.
return input
- .apply("Add void key", WithKeys.of((Void) null))
- .apply("Reshuffle", Reshuffle.of())
- .apply("Drop key", Values.create())
- .apply("Gather bundles", ParDo.of(new GatherBundlesPerWindowFn<>()))
+ .apply("AddVoidKey", WithKeys.of((Void) null))
Review comment:
Two reasons:
- `Reshuffle` expands the iterable after its GBK, and "Gather bundles" then
effectively reverts that expansion by re-gathering the elements in each bundle.
Why not just use a GBK directly?
- `Reshuffle` didn't appear to work properly with auto-sharding, where we
might emit multiple outputs per window: `Reshuffle` didn't group the outputs
belonging to the same window together, which caused the added test to fail. My
rough guess was that this might be because we add the timestamp to the key that
`Reshuffle` groups on
(https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L87),
but I could be entirely wrong.
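To make the second point concrete, here is a toy, in-memory model of the hypothesis above. It is not Beam code and does not reproduce how `Reshuffle` is actually implemented; the class `GatherGroupingModel`, the `Result` type, and the sample window and shard names are all hypothetical. It only contrasts grouping results by window (what a GBK on a single void key gives per window) with grouping by window plus timestamp (what would happen if the timestamp were part of the grouping key).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model (not Beam code); all names are hypothetical. */
public class GatherGroupingModel {

    /** A hypothetical write result: its window, its element timestamp, and a shard file name. */
    static final class Result {
        final String window;
        final long timestamp;
        final String file;

        Result(String window, long timestamp, String file) {
            this.window = window;
            this.timestamp = timestamp;
            this.file = file;
        }
    }

    /** Models a GBK on a single void key: all results in a window land in one group. */
    static Map<String, List<String>> groupByWindow(List<Result> results) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (Result r : results) {
            groups.computeIfAbsent(r.window, k -> new ArrayList<>()).add(r.file);
        }
        return groups;
    }

    /**
     * Models the hypothesis: if the timestamp is part of the grouping key, results
     * from the same window with different timestamps are split across groups.
     */
    static Map<String, List<String>> groupByWindowAndTimestamp(List<Result> results) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (Result r : results) {
            String key = r.window + "@" + r.timestamp;
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(r.file);
        }
        return groups;
    }

    /** Sample input: auto-sharding emitted two outputs for window w1 and one for w2. */
    static List<Result> sampleResults() {
        List<Result> results = new ArrayList<>();
        results.add(new Result("w1", 10L, "shard-0"));
        results.add(new Result("w1", 20L, "shard-1"));
        results.add(new Result("w2", 30L, "shard-2"));
        return results;
    }

    public static void main(String[] args) {
        List<Result> results = sampleResults();
        // Prints {w1=[shard-0, shard-1], w2=[shard-2]}
        System.out.println(groupByWindow(results));
        // Prints {w1@10=[shard-0], w1@20=[shard-1], w2@30=[shard-2]}
        System.out.println(groupByWindowAndTimestamp(results));
    }
}
```

In the second grouping, window `w1` yields two separate groups, so a downstream step that expects one gathered list per window would see its outputs split, which would be consistent with the test failure described above if the hypothesis holds.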
One argument I can think of for restoring `Reshuffle` is that runners might
optimize `Reshuffle`, and using an explicit GBK would forgo any such
optimizations. To keep the original behavior, I could refactor the code to use
an explicit GBK only in the new path. Any thoughts?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.