This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 2f73a9534a709dd1fe775ab31e0598d2d89f123c Author: Eugene Kirpichov <[email protected]> AuthorDate: Fri Nov 17 12:25:45 2017 -0800 Refactors WriteFiles into sub-transforms --- .../java/org/apache/beam/sdk/io/WriteFiles.java | 630 +++++++++++---------- 1 file changed, 322 insertions(+), 308 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java index 87459e9..0a538b1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java @@ -24,7 +24,6 @@ import static com.google.common.base.Preconditions.checkState; import com.google.common.base.Objects; import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -37,7 +36,6 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import javax.annotation.Nullable; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; @@ -47,6 +45,7 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.ShardedKeyCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations; import org.apache.beam.sdk.io.FileBasedSink.FileResult; import org.apache.beam.sdk.io.FileBasedSink.FileResultCoder; import org.apache.beam.sdk.io.FileBasedSink.WriteOperation; @@ -176,53 +175,12 @@ public class WriteFiles<UserT, DestinationT, OutputT> return PCollectionViews.toAdditionalInputs(sideInputs); } - @Override - public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) { - if (input.isBounded() == IsBounded.UNBOUNDED) { - checkArgument(windowedWrites, - "Must use windowed writes when applying %s to an unbounded PCollection", - WriteFiles.class.getSimpleName()); - } - if (windowedWrites) { - // The reason for this is https://issues.apache.org/jira/browse/BEAM-1438 - // and similar behavior in other runners. - checkArgument( - computeNumShards != null || numShardsProvider != null, - "When using windowed writes, must specify number of output shards explicitly", - WriteFiles.class.getSimpleName()); - } - this.writeOperation = sink.createWriteOperation(); - this.writeOperation.setWindowedWrites(windowedWrites); - return createWrite(input); - } - - @Override - public void validate(PipelineOptions options) { - sink.validate(options); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - builder - .add(DisplayData.item("sink", sink.getClass()).withLabel("WriteFiles Sink")) - .include("sink", sink); - if (getSharding() != null) { - builder.include("sharding", getSharding()); - } else { - builder.addIfNotNull(DisplayData.item("numShards", getNumShards()) - .withLabel("Fixed Number of Shards")); - } - } - /** Returns the {@link FileBasedSink} associated with this PTransform. */ public FileBasedSink<UserT, DestinationT, OutputT> getSink() { return sink; } - /** - * Returns whether or not to perform windowed writes. - */ + /** Returns whether or not to perform windowed writes. 
*/ public boolean isWindowedWrites() { return windowedWrites; } @@ -339,50 +297,189 @@ public class WriteFiles<UserT, DestinationT, OutputT> sink, computeNumShards, numShardsProvider, true, maxNumWritersPerBundle, sideInputs); } - private static class WriterKey<DestinationT> { - private final BoundedWindow window; - private final PaneInfo paneInfo; - private final DestinationT destination; + @Override + public void validate(PipelineOptions options) { + sink.validate(options); + } - WriterKey(BoundedWindow window, PaneInfo paneInfo, DestinationT destination) { - this.window = window; - this.paneInfo = paneInfo; - this.destination = destination; + @Override + public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) { + if (input.isBounded() == IsBounded.UNBOUNDED) { + checkArgument( + windowedWrites, + "Must use windowed writes when applying %s to an unbounded PCollection", + WriteFiles.class.getSimpleName()); + } + if (windowedWrites) { + // The reason for this is https://issues.apache.org/jira/browse/BEAM-1438 + // and similar behavior in other runners. + checkArgument( + computeNumShards != null || numShardsProvider != null, + "When using windowed writes, must specify number of output shards explicitly", + WriteFiles.class.getSimpleName()); } + this.writeOperation = sink.createWriteOperation(); + this.writeOperation.setWindowedWrites(windowedWrites); - @Override - public boolean equals(Object o) { - if (!(o instanceof WriterKey)) { - return false; - } - WriterKey other = (WriterKey) o; - return Objects.equal(window, other.window) - && Objects.equal(paneInfo, other.paneInfo) - && Objects.equal(destination, other.destination); + if (!windowedWrites) { + // Re-window the data into the global window and remove any existing triggers. + input = + input.apply( + "RewindowIntoGlobal", + Window.<UserT>into(new GlobalWindows()) + .triggering(DefaultTrigger.of()) + .discardingFiredPanes()); + } + + Coder<DestinationT> destinationCoder; + try { + destinationCoder = + getDynamicDestinations() + .getDestinationCoderWithDefault(input.getPipeline().getCoderRegistry()); + destinationCoder.verifyDeterministic(); + } catch (CannotProvideCoderException | NonDeterministicException e) { + throw new RuntimeException(e); + } + @SuppressWarnings("unchecked") + Coder<BoundedWindow> windowCoder = + (Coder<BoundedWindow>) input.getWindowingStrategy().getWindowFn().windowCoder(); + FileResultCoder<DestinationT> fileResultCoder = + FileResultCoder.of(windowCoder, destinationCoder); + + PCollectionView<Integer> numShardsView = + (computeNumShards == null) ? null : input.apply(computeNumShards); + + PCollection<FileResult<DestinationT>> tempFileResults = + (computeNumShards == null && numShardsProvider == null) + ? 
input.apply( + "WriteUnshardedBundlesToTempFiles", + new WriteUnshardedBundlesToTempFiles(destinationCoder, fileResultCoder)) + : input.apply( + "WriteShardedBundlesToTempFiles", + new WriteShardedBundlesToTempFiles( + destinationCoder, fileResultCoder, numShardsView)); + + return tempFileResults + .apply("GatherTempFileResults", new GatherResults<>(fileResultCoder)) + .apply( + "FinalizeTempFileBundles", + new FinalizeTempFileBundles(numShardsView, destinationCoder)); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder + .add(DisplayData.item("sink", sink.getClass()).withLabel("WriteFiles Sink")) + .include("sink", sink); + if (getSharding() != null) { + builder.include("sharding", getSharding()); + } else { + builder.addIfNotNull( + DisplayData.item("numShards", getNumShards()).withLabel("Fixed Number of Shards")); + } + } + + private DynamicDestinations<UserT, DestinationT, OutputT> getDynamicDestinations() { + return (DynamicDestinations<UserT, DestinationT, OutputT>) + writeOperation.getSink().getDynamicDestinations(); + } + + private class GatherResults<ResultT> + extends PTransform<PCollection<ResultT>, PCollection<Iterable<ResultT>>> { + private final Coder<ResultT> resultCoder; + + private GatherResults(Coder<ResultT> resultCoder) { + this.resultCoder = resultCoder; } @Override - public int hashCode() { - return Objects.hashCode(window, paneInfo, destination); + public PCollection<Iterable<ResultT>> expand(PCollection<ResultT> input) { + if (windowedWrites) { + // Reshuffle the results to make them stable against retries. + // Use a single void key to maximize size of bundles for finalization. + return input + .apply("Add void key", WithKeys.<Void, ResultT>of((Void) null)) + .apply("Reshuffle", Reshuffle.<Void, ResultT>of()) + .apply("Drop key", Values.<ResultT>create()) + .apply("Gather bundles", ParDo.of(new GatherBundlesPerWindowFn<ResultT>())) + .setCoder(IterableCoder.of(resultCoder)); + } else { + // Pass results via a side input rather than reshuffle, because we need to get an empty + // iterable to finalize if there are no results. + return input + .getPipeline() + .apply( + Reify.viewInGlobalWindow( + input.apply(View.<ResultT>asIterable()), IterableCoder.of(resultCoder))); + } } } - // Hash the destination in a manner that we can then use as a key in a GBK. Since Java's - // hashCode isn't guaranteed to be stable across machines, we instead serialize the destination - // and use murmur3_32 to hash it. We enforce that destinationCoder must be deterministic, so - // this can be used as a key. 
- private static <DestinationT> int hashDestination( - DestinationT destination, Coder<DestinationT> destinationCoder) throws IOException { - return Hashing.murmur3_32() - .hashBytes(CoderUtils.encodeToByteArray(destinationCoder, destination)) - .asInt(); + private class WriteUnshardedBundlesToTempFiles + extends PTransform<PCollection<UserT>, PCollection<FileResult<DestinationT>>> { + private final Coder<DestinationT> destinationCoder; + private final Coder<FileResult<DestinationT>> fileResultCoder; + + private WriteUnshardedBundlesToTempFiles( + Coder<DestinationT> destinationCoder, Coder<FileResult<DestinationT>> fileResultCoder) { + this.destinationCoder = destinationCoder; + this.fileResultCoder = fileResultCoder; + } + + @Override + public PCollection<FileResult<DestinationT>> expand(PCollection<UserT> input) { + TupleTag<FileResult<DestinationT>> writtenRecordsTag = new TupleTag<>("writtenRecords"); + TupleTag<KV<ShardedKey<Integer>, UserT>> spilledRecordsTag = new TupleTag<>("spilledRecords"); + PCollectionTuple writeTuple = + input.apply( + "WriteUnshardedBundles", + ParDo.of( + new WriteUnshardedTempFilesWithSpillingFn( + spilledRecordsTag, destinationCoder)) + .withSideInputs(sideInputs) + .withOutputTags(writtenRecordsTag, TupleTagList.of(spilledRecordsTag))); + PCollection<FileResult<DestinationT>> writtenBundleFiles = + writeTuple.get(writtenRecordsTag).setCoder(fileResultCoder); + // Any "spilled" elements are written using WriteShardedBundles. Assign shard numbers in + // finalize to stay consistent with what WriteWindowedBundles does. + PCollection<FileResult<DestinationT>> writtenSpilledFiles = + writeTuple + .get(spilledRecordsTag) + .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder())) + // Here we group by a synthetic shard number in the range [0, spill factor), + // just for the sake of getting some parallelism within each destination when + // writing the spilled records, whereas the non-spilled records don't have a shard + // number assigned at all. Drop the shard number on the spilled records so that + // shard numbers are assigned together to both the spilled and non-spilled files in + // finalize. + .apply("GroupSpilled", GroupByKey.<ShardedKey<Integer>, UserT>create()) + .apply( + "WriteSpilled", + ParDo.of(new WriteShardsIntoTempFilesFn()).withSideInputs(sideInputs)) + .setCoder(fileResultCoder) + .apply( + "DropShardNum", + ParDo.of( + new DoFn<FileResult<DestinationT>, FileResult<DestinationT>>() { + @ProcessElement + public void process(ProcessContext c) { + c.output(c.element().withShard(UNKNOWN_SHARDNUM)); + } + })); + return PCollectionList.of(writtenBundleFiles) + .and(writtenSpilledFiles) + .apply(Flatten.<FileResult<DestinationT>>pCollections()) + .setCoder(fileResultCoder); + } } /** * Writes all the elements in a bundle using a {@link Writer} produced by the {@link * WriteOperation} associated with the {@link FileBasedSink}. 
*/ - private class WriteBundles extends DoFn<UserT, FileResult<DestinationT>> { + private class WriteUnshardedTempFilesWithSpillingFn + extends DoFn<UserT, FileResult<DestinationT>> { private final TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag; private final Coder<DestinationT> destinationCoder; @@ -391,7 +488,7 @@ public class WriteFiles<UserT, DestinationT, OutputT> private int spilledShardNum = UNKNOWN_SHARDNUM; - WriteBundles( + WriteUnshardedTempFilesWithSpillingFn( TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag, Coder<DestinationT> destinationCoder) { this.unwrittenRecordsTag = unwrittenRecordsTag; @@ -406,14 +503,14 @@ public class WriteFiles<UserT, DestinationT, OutputT> @ProcessElement public void processElement(ProcessContext c, BoundedWindow window) throws Exception { - sink.getDynamicDestinations().setSideInputAccessorFromProcessContext(c); + getDynamicDestinations().setSideInputAccessorFromProcessContext(c); PaneInfo paneInfo = c.pane(); // If we are doing windowed writes, we need to ensure that we have separate files for // data in different windows/panes. Similar for dynamic writes, make sure that different // destinations go to different writers. // In the case of unwindowed writes, the window and the pane will always be the same, and // the map will only have a single element. - DestinationT destination = sink.getDynamicDestinations().getDestination(c.element()); + DestinationT destination = getDynamicDestinations().getDestination(c.element()); WriterKey<DestinationT> key = new WriterKey<>(window, c.pane(), destination); Writer<DestinationT, OutputT> writer = writers.get(key); if (writer == null) { @@ -444,7 +541,7 @@ public class WriteFiles<UserT, DestinationT, OutputT> return; } } - writeOrClose(writer, getSink().getDynamicDestinations().formatRecord(c.element())); + writeOrClose(writer, getDynamicDestinations().formatRecord(c.element())); } @FinishBundle @@ -468,64 +565,6 @@ public class WriteFiles<UserT, DestinationT, OutputT> window); } } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - builder.delegate(WriteFiles.this); - } - } - - /* - * Like {@link WriteBundles}, but where the elements for each shard have been collected into a - * single iterable. - */ - private class WriteShardedBundles - extends DoFn<KV<ShardedKey<Integer>, Iterable<UserT>>, FileResult<DestinationT>> { - @ProcessElement - public void processElement(ProcessContext c, BoundedWindow window) throws Exception { - sink.getDynamicDestinations().setSideInputAccessorFromProcessContext(c); - // Since we key by a 32-bit hash of the destination, there might be multiple destinations - // in this iterable. The number of destinations is generally very small (1000s or less), so - // there will rarely be hash collisions. - Map<DestinationT, Writer<DestinationT, OutputT>> writers = Maps.newHashMap(); - for (UserT input : c.element().getValue()) { - DestinationT destination = sink.getDynamicDestinations().getDestination(input); - Writer<DestinationT, OutputT> writer = writers.get(destination); - if (writer == null) { - String uuid = UUID.randomUUID().toString(); - LOG.info( - "Opening writer {} for window {} pane {} destination {}", - uuid, - window, - c.pane(), - destination); - writer = writeOperation.createWriter(); - writer.open(uuid, destination); - writers.put(destination, writer); - } - writeOrClose(writer, getSink().getDynamicDestinations().formatRecord(input)); - } - - // Close all writers. 
- for (Map.Entry<DestinationT, Writer<DestinationT, OutputT>> entry : writers.entrySet()) { - Writer<DestinationT, OutputT> writer = entry.getValue(); - try { - // Close the writer; if this throws let the error propagate. - writer.close(); - } catch (Exception e) { - // If anything goes wrong, make sure to delete the temporary file. - writer.cleanup(); - throw e; - } - int shard = c.element().getKey().getShardNumber(); - c.output(new FileResult<>(writer.getOutputFile(), shard, window, c.pane(), entry.getKey())); - } - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - builder.delegate(WriteFiles.this); - } } private static <DestinationT, OutputT> void writeOrClose( @@ -549,21 +588,90 @@ public class WriteFiles<UserT, DestinationT, OutputT> } } - private class ApplyShardingKey extends DoFn<UserT, KV<ShardedKey<Integer>, UserT>> { + private static class WriterKey<DestinationT> { + private final BoundedWindow window; + private final PaneInfo paneInfo; + private final DestinationT destination; + + WriterKey(BoundedWindow window, PaneInfo paneInfo, DestinationT destination) { + this.window = window; + this.paneInfo = paneInfo; + this.destination = destination; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof WriterKey)) { + return false; + } + WriterKey other = (WriterKey) o; + return Objects.equal(window, other.window) + && Objects.equal(paneInfo, other.paneInfo) + && Objects.equal(destination, other.destination); + } + + @Override + public int hashCode() { + return Objects.hashCode(window, paneInfo, destination); + } + } + + // Hash the destination in a manner that we can then use as a key in a GBK. Since Java's + // hashCode isn't guaranteed to be stable across machines, we instead serialize the destination + // and use murmur3_32 to hash it. We enforce that destinationCoder must be deterministic, so + // this can be used as a key. + private static <DestinationT> int hashDestination( + DestinationT destination, Coder<DestinationT> destinationCoder) throws IOException { + return Hashing.murmur3_32() + .hashBytes(CoderUtils.encodeToByteArray(destinationCoder, destination)) + .asInt(); + } + + private class WriteShardedBundlesToTempFiles + extends PTransform<PCollection<UserT>, PCollection<FileResult<DestinationT>>> { + private final Coder<DestinationT> destinationCoder; + private final Coder<FileResult<DestinationT>> fileResultCoder; + private final PCollectionView<Integer> numShardsView; + + private WriteShardedBundlesToTempFiles( + Coder<DestinationT> destinationCoder, + Coder<FileResult<DestinationT>> fileResultCoder, + PCollectionView<Integer> numShardsView) { + this.destinationCoder = destinationCoder; + this.fileResultCoder = fileResultCoder; + this.numShardsView = numShardsView; + } + + @Override + public PCollection<FileResult<DestinationT>> expand(PCollection<UserT> input) { + return input + .apply( + "ApplyShardingKey", + ParDo.of(new ApplyShardingKeyFn(numShardsView, destinationCoder)) + .withSideInputs( + numShardsView == null + ? 
ImmutableList.<PCollectionView<Integer>>of() + : ImmutableList.of(numShardsView))) + .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder())) + .apply("GroupIntoShards", GroupByKey.<ShardedKey<Integer>, UserT>create()) + .apply( + "WriteShardsIntoTempFiles", + ParDo.of(new WriteShardsIntoTempFilesFn()).withSideInputs(sideInputs)) + .setCoder(fileResultCoder); + } + } + + private class ApplyShardingKeyFn extends DoFn<UserT, KV<ShardedKey<Integer>, UserT>> { private final @Nullable PCollectionView<Integer> numShardsView; - private final ValueProvider<Integer> numShardsProvider; private final Coder<DestinationT> destinationCoder; private int shardNumber; - ApplyShardingKey( - PCollectionView<Integer> numShardsView, - ValueProvider<Integer> numShardsProvider, - Coder<DestinationT> destinationCoder) { - this.destinationCoder = destinationCoder; + ApplyShardingKeyFn( + @Nullable PCollectionView<Integer> numShardsView, Coder<DestinationT> destinationCoder) { this.numShardsView = numShardsView; - this.numShardsProvider = numShardsProvider; - shardNumber = UNKNOWN_SHARDNUM; + this.destinationCoder = destinationCoder; + this.shardNumber = UNKNOWN_SHARDNUM; } @ProcessElement @@ -595,7 +703,7 @@ public class WriteFiles<UserT, DestinationT, OutputT> // the destinations. This does mean that multiple destinations might end up on the same shard, // however the number of collisions should be small, so there's no need to worry about memory // issues. - DestinationT destination = sink.getDynamicDestinations().getDestination(context.element()); + DestinationT destination = getDynamicDestinations().getDestination(context.element()); context.output( KV.of( ShardedKey.of(hashDestination(destination, destinationCoder), shardNumber), @@ -603,168 +711,86 @@ public class WriteFiles<UserT, DestinationT, OutputT> } } - private static <DestinationT> - Multimap<KV<DestinationT, BoundedWindow>, FileResult<DestinationT>> - groupByDestinationAndWindow(Iterable<FileResult<DestinationT>> results) { - Multimap<KV<DestinationT, BoundedWindow>, FileResult<DestinationT>> res = - ArrayListMultimap.create(); - for (FileResult<DestinationT> result : results) { - res.put(KV.of(result.getDestination(), result.getWindow()), result); + private class WriteShardsIntoTempFilesFn + extends DoFn<KV<ShardedKey<Integer>, Iterable<UserT>>, FileResult<DestinationT>> { + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow window) throws Exception { + getDynamicDestinations().setSideInputAccessorFromProcessContext(c); + // Since we key by a 32-bit hash of the destination, there might be multiple destinations + // in this iterable. The number of destinations is generally very small (1000s or less), so + // there will rarely be hash collisions. + Map<DestinationT, Writer<DestinationT, OutputT>> writers = Maps.newHashMap(); + for (UserT input : c.element().getValue()) { + DestinationT destination = getDynamicDestinations().getDestination(input); + Writer<DestinationT, OutputT> writer = writers.get(destination); + if (writer == null) { + String uuid = UUID.randomUUID().toString(); + LOG.info( + "Opening writer {} for window {} pane {} destination {}", + uuid, + window, + c.pane(), + destination); + writer = writeOperation.createWriter(); + writer.open(uuid, destination); + writers.put(destination, writer); + } + writeOrClose(writer, getDynamicDestinations().formatRecord(input)); + } + + // Close all writers. 
+ for (Map.Entry<DestinationT, Writer<DestinationT, OutputT>> entry : writers.entrySet()) { + Writer<DestinationT, OutputT> writer = entry.getValue(); + try { + // Close the writer; if this throws let the error propagate. + writer.close(); + } catch (Exception e) { + // If anything goes wrong, make sure to delete the temporary file. + writer.cleanup(); + throw e; + } + int shard = c.element().getKey().getShardNumber(); + c.output(new FileResult<>(writer.getOutputFile(), shard, window, c.pane(), entry.getKey())); + } } - return res; } - /** - * A write is performed as sequence of three {@link ParDo}'s. - * - * <p>This singleton collection containing the WriteOperation is then used as a side input to a - * ParDo over the PCollection of elements to write. In this bundle-writing phase, {@link - * WriteOperation#createWriter} is called to obtain a {@link Writer}. {@link Writer#open} and - * {@link Writer#close} are called in {@link DoFn.StartBundle} and {@link DoFn.FinishBundle}, - * respectively, and {@link Writer#write} method is called for every element in the bundle. The - * output of this ParDo is a PCollection of <i>writer result</i> objects (see {@link - * FileBasedSink} for a description of writer results)-one for each bundle. - * - * <p>The final do-once ParDo uses a singleton collection asinput and the collection of writer - * results as a side-input. In this ParDo, {@link WriteOperation#finalizeDestination} is called to finalize - * the write. - * - * <p>If the write of any element in the PCollection fails, {@link Writer#close} will be called - * before the exception that caused the write to fail is propagated and the write result will be - * discarded. - * - * <p>Since the {@link WriteOperation} is serialized after the initialization ParDo and - * deserialized in the bundle-writing and finalization phases, any state change to the - * WriteOperation object that occurs during initialization is visible in the latter phases. - * However, the WriteOperation is not serialized after the bundle-writing phase. This is why - * implementations should guarantee that {@link WriteOperation#createWriter} does not mutate - * WriteOperation). - */ - private WriteFilesResult<DestinationT> createWrite(PCollection<UserT> input) { - Pipeline p = input.getPipeline(); + private class FinalizeTempFileBundles + extends PTransform< + PCollection<Iterable<FileResult<DestinationT>>>, WriteFilesResult<DestinationT>> { + @Nullable private final PCollectionView<Integer> numShardsView; + private final Coder<DestinationT> destinationCoder; - if (!windowedWrites) { - // Re-window the data into the global window and remove any existing triggers. - input = - input.apply( - Window.<UserT>into(new GlobalWindows()) - .triggering(DefaultTrigger.of()) - .discardingFiredPanes()); + private FinalizeTempFileBundles( + @Nullable PCollectionView<Integer> numShardsView, Coder<DestinationT> destinationCoder) { + this.numShardsView = numShardsView; + this.destinationCoder = destinationCoder; } - final FileBasedSink.DynamicDestinations<?, DestinationT, OutputT> destinations = - writeOperation.getSink().getDynamicDestinations(); - - // Perform the per-bundle writes as a ParDo on the input PCollection (with the - // WriteOperation as a side input) and collect the results of the writes in a - // PCollection. There is a dependency between this ParDo and the first (the - // WriteOperation PCollection as a side input), so this will happen after the - // initial ParDo. 
- final PCollectionView<Integer> numShardsView = - (computeNumShards == null) ? null : input.apply(computeNumShards); - List<PCollectionView<Integer>> shardingSideInputs = numShardsView == null - ? ImmutableList.<PCollectionView<Integer>>of() - : ImmutableList.of(numShardsView); + @Override + public WriteFilesResult<DestinationT> expand( + PCollection<Iterable<FileResult<DestinationT>>> input) { - @SuppressWarnings("unchecked") - Coder<BoundedWindow> shardedWindowCoder = - (Coder<BoundedWindow>) input.getWindowingStrategy().getWindowFn().windowCoder(); - final Coder<DestinationT> destinationCoder; - try { - destinationCoder = - destinations.getDestinationCoderWithDefault(input.getPipeline().getCoderRegistry()); - destinationCoder.verifyDeterministic(); - } catch (CannotProvideCoderException | NonDeterministicException e) { - throw new RuntimeException(e); - } - final FileResultCoder<DestinationT> fileResultCoder = - FileResultCoder.of(shardedWindowCoder, destinationCoder); - - PCollection<FileResult<DestinationT>> results; - if (computeNumShards == null && numShardsProvider == null) { - TupleTag<FileResult<DestinationT>> writtenRecordsTag = - new TupleTag<>("writtenRecordsTag"); - TupleTag<KV<ShardedKey<Integer>, UserT>> spilledRecordsTag = - new TupleTag<>("spilledRecordsTag"); - String writeName = windowedWrites ? "WriteWindowedBundles" : "WriteBundles"; - PCollectionTuple writeTuple = - input.apply( - writeName, - ParDo.of(new WriteBundles(spilledRecordsTag, destinationCoder)) - .withSideInputs(sideInputs) - .withOutputTags(writtenRecordsTag, TupleTagList.of(spilledRecordsTag))); - PCollection<FileResult<DestinationT>> writtenBundleFiles = - writeTuple.get(writtenRecordsTag).setCoder(fileResultCoder); - // Any "spilled" elements are written using WriteShardedBundles. Assign shard numbers in - // finalize to stay consistent with what WriteWindowedBundles does. - PCollection<FileResult<DestinationT>> writtenSpilledFiles = - writeTuple - .get(spilledRecordsTag) - .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder())) - // Here we group by a synthetic shard number in the range [0, spill factor), - // just for the sake of getting some parallelism within each destination when - // writing the spilled records, whereas the non-spilled records don't have a shard - // number assigned at all. Drop the shard number on the spilled records so that - // shard numbers are assigned together to both the spilled and non-spilled files in - // finalize. 
- .apply("GroupSpilled", GroupByKey.<ShardedKey<Integer>, UserT>create()) - .apply( - "WriteSpilled", ParDo.of(new WriteShardedBundles()).withSideInputs(sideInputs)) - .setCoder(fileResultCoder) - .apply("DropShardNum", ParDo.of( - new DoFn<FileResult<DestinationT>, FileResult<DestinationT>>() { - @ProcessElement - public void process(ProcessContext c) { - c.output(c.element().withShard(UNKNOWN_SHARDNUM)); - } - })); - results = - PCollectionList.of(writtenBundleFiles) - .and(writtenSpilledFiles) - .apply(Flatten.<FileResult<DestinationT>>pCollections()); - } else { - results = + List<PCollectionView<?>> finalizeSideInputs = Lists.newArrayList(sideInputs); + if (numShardsView != null) { + finalizeSideInputs.add(numShardsView); + } + PCollection<KV<DestinationT, String>> outputFilenames = input - .apply( - "ApplyShardLabel", - ParDo.of(new ApplyShardingKey(numShardsView, numShardsProvider, destinationCoder)) - .withSideInputs(shardingSideInputs)) - .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder())) - .apply("GroupIntoShards", GroupByKey.<ShardedKey<Integer>, UserT>create()) - .apply( - "WriteShardedBundles", - ParDo.of(new WriteShardedBundles()).withSideInputs(this.sideInputs)); - } - results.setCoder(fileResultCoder); + .apply("Finalize", ParDo.of(new FinalizeFn()).withSideInputs(finalizeSideInputs)) + .setCoder(KvCoder.of(destinationCoder, StringUtf8Coder.of())); - PCollection<Iterable<FileResult<DestinationT>>> fileResultBundles; - if (windowedWrites) { - // Reshuffle the results to make them stable against retries. - // Use a single void key to maximize size of bundles for finalization. - PCollection<FileResult<DestinationT>> stableResults = results - .apply("Add void key", WithKeys.<Void, FileResult<DestinationT>>of((Void) null)) - .apply("Reshuffle", Reshuffle.<Void, FileResult<DestinationT>>of()) - .apply("Drop key", Values.<FileResult<DestinationT>>create()); - fileResultBundles = - stableResults - .apply( - "Gather bundles", - ParDo.of(new GatherBundlesPerWindowFn<FileResult<DestinationT>>())) - .setCoder(IterableCoder.of(fileResultCoder)); - } else { - // Pass results via a side input rather than reshuffle, because we need to get an empty - // iterable to finalize if there are no results. 
- fileResultBundles = - p.apply( - Reify.viewInGlobalWindow( - results.apply(View.<FileResult<DestinationT>>asIterable()), - IterableCoder.of(fileResultCoder))); + TupleTag<KV<DestinationT, String>> perDestinationOutputFilenamesTag = + new TupleTag<>("perDestinationOutputFilenames"); + return WriteFilesResult.in( + input.getPipeline(), perDestinationOutputFilenamesTag, outputFilenames); } - class FinalizeFn extends DoFn<Iterable<FileResult<DestinationT>>, KV<DestinationT, String>> { + private class FinalizeFn + extends DoFn<Iterable<FileResult<DestinationT>>, KV<DestinationT, String>> { @ProcessElement public void process(ProcessContext c) throws Exception { - writeOperation.getSink().getDynamicDestinations().setSideInputAccessorFromProcessContext(c); + getDynamicDestinations().setSideInputAccessorFromProcessContext(c); @Nullable Integer fixedNumShards; if (numShardsView != null) { fixedNumShards = c.sideInput(numShardsView); @@ -776,11 +802,11 @@ public class WriteFiles<UserT, DestinationT, OutputT> } List<FileResult<DestinationT>> fileResults = Lists.newArrayList(c.element()); LOG.info("Finalizing {} file results", fileResults.size()); - DestinationT defaultDest = destinations.getDefaultDestination(); + DestinationT defaultDest = getDynamicDestinations().getDefaultDestination(); List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames = fileResults.isEmpty() ? writeOperation.finalizeDestination( - defaultDest, GlobalWindow.INSTANCE, fixedNumShards, fileResults) + defaultDest, GlobalWindow.INSTANCE, fixedNumShards, fileResults) : finalizeAllDestinations(fileResults, fixedNumShards); for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) { FileResult<DestinationT> res = entry.getKey(); @@ -789,18 +815,6 @@ public class WriteFiles<UserT, DestinationT, OutputT> writeOperation.moveToOutputFiles(resultsToFinalFilenames); } } - - List<PCollectionView<?>> sideInputs = - FluentIterable.concat(this.sideInputs, shardingSideInputs).toList(); - PCollection<KV<DestinationT, String>> outputFilenames = - fileResultBundles - .apply("Finalize", ParDo.of(new FinalizeFn()).withSideInputs(sideInputs)) - .setCoder(KvCoder.of(destinationCoder, StringUtf8Coder.of())); - - TupleTag<KV<DestinationT, String>> perDestinationOutputFilenamesTag = - new TupleTag<>("perDestinationOutputFilenames"); - return WriteFilesResult.in( - input.getPipeline(), perDestinationOutputFilenamesTag, outputFilenames); } private List<KV<FileResult<DestinationT>, ResourceId>> finalizeAllDestinations( -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
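For context on the destination-hashing comment in this commit ("Since Java's hashCode isn't guaranteed to be stable across machines, we instead serialize the destination and use murmur3_32 to hash it"), here is a minimal, standalone sketch of that technique, assuming a String destination encoded with StringUtf8Coder. The class name and the example destination value are hypothetical; the hashing calls mirror the commit's hashDestination helper, not a new API.

    import com.google.common.hash.Hashing;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.util.CoderUtils;

    public class DestinationHashDemo {
      public static void main(String[] args) throws Exception {
        // A GroupByKey key must be stable across machines, so WriteFiles hashes the
        // coder-encoded destination with murmur3_32 instead of relying on Java's
        // hashCode(), which may differ between JVMs.
        String destination = "logs/2017-11-17";  // hypothetical destination value
        byte[] encoded = CoderUtils.encodeToByteArray(StringUtf8Coder.of(), destination);
        int gbkKey = Hashing.murmur3_32().hashBytes(encoded).asInt();
        System.out.println("Shard-key hash for destination: " + gbkKey);
      }
    }

This only works as a grouping key because the destination coder is required to be deterministic, which the refactored expand() enforces by calling destinationCoder.verifyDeterministic() before building the sharded write path.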
