This is an automated email from the ASF dual-hosted git repository. jkff pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
commit c6154382263a68d7eca893c7da3617d177e4c1df Author: Eugene Kirpichov <kirpic...@google.com> AuthorDate: Wed Nov 15 20:19:09 2017 -0800 Unifies windowed and unwindowed finalize. --- .../java/org/apache/beam/sdk/io/WriteFiles.java | 232 ++++++++------------- .../java/org/apache/beam/sdk/transforms/Reify.java | 73 ++++++- .../apache/beam/sdk/values/TypeDescriptors.java | 4 + 3 files changed, 163 insertions(+), 146 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java index 9cfabfe..87459e9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java @@ -42,11 +42,11 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.NonDeterministicException; +import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.ShardedKeyCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.FileBasedSink.FileResult; import org.apache.beam.sdk.io.FileBasedSink.FileResultCoder; import org.apache.beam.sdk.io.FileBasedSink.WriteOperation; @@ -55,14 +55,13 @@ import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; -import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Reify; import org.apache.beam.sdk.transforms.Reshuffle; -import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.WithKeys; @@ -653,6 +652,9 @@ public class WriteFiles<UserT, DestinationT, OutputT> .discardingFiredPanes()); } + final FileBasedSink.DynamicDestinations<?, DestinationT, OutputT> destinations = + writeOperation.getSink().getDynamicDestinations(); + // Perform the per-bundle writes as a ParDo on the input PCollection (with the // WriteOperation as a side input) and collect the results of the writes in a // PCollection. There is a dependency between this ParDo and the first (the @@ -663,19 +665,6 @@ public class WriteFiles<UserT, DestinationT, OutputT> List<PCollectionView<Integer>> shardingSideInputs = numShardsView == null ? ImmutableList.<PCollectionView<Integer>>of() : ImmutableList.of(numShardsView); - SerializableFunction<DoFn.ProcessContext, Integer> getFixedNumShards = - new SerializableFunction<DoFn.ProcessContext, Integer>() { - @Override - public Integer apply(DoFn<?, ?>.ProcessContext c) { - if (numShardsView != null) { - return c.sideInput(numShardsView); - } else if (numShardsProvider != null) { - return numShardsProvider.get(); - } else { - return null; - } - } - }; @SuppressWarnings("unchecked") Coder<BoundedWindow> shardedWindowCoder = @@ -683,13 +672,12 @@ public class WriteFiles<UserT, DestinationT, OutputT> final Coder<DestinationT> destinationCoder; try { destinationCoder = - sink.getDynamicDestinations() - .getDestinationCoderWithDefault(input.getPipeline().getCoderRegistry()); + destinations.getDestinationCoderWithDefault(input.getPipeline().getCoderRegistry()); destinationCoder.verifyDeterministic(); } catch (CannotProvideCoderException | NonDeterministicException e) { throw new RuntimeException(e); } - FileResultCoder<DestinationT> fileResultCoder = + final FileResultCoder<DestinationT> fileResultCoder = FileResultCoder.of(shardedWindowCoder, destinationCoder); PCollection<FileResult<DestinationT>> results; @@ -749,155 +737,109 @@ public class WriteFiles<UserT, DestinationT, OutputT> } results.setCoder(fileResultCoder); - PCollection<KV<DestinationT, String>> outputFilenames; + PCollection<Iterable<FileResult<DestinationT>>> fileResultBundles; if (windowedWrites) { - // We need to materialize the FileResult's before the renaming stage: this can be done either - // via a side input or via a GBK. However, when processing streaming windowed writes, results - // will arrive multiple times. This means we can't share the below implementation that turns - // the results into a side input, as new data arriving into a side input does not trigger the - // listening DoFn. We also can't use a GBK because we need only the materialization, but not - // the (potentially lossy, if the user's trigger is lossy) continuation triggering that GBK - // would give. So, we use a reshuffle (over a single key to maximize bundling). - outputFilenames = - results - .apply(WithKeys.<Void, FileResult<DestinationT>>of((Void) null)) - .setCoder(KvCoder.of(VoidCoder.of(), results.getCoder())) - .apply("Reshuffle", Reshuffle.<Void, FileResult<DestinationT>>of()) - .apply(Values.<FileResult<DestinationT>>create()) + // Reshuffle the results to make them stable against retries. + // Use a single void key to maximize size of bundles for finalization. + PCollection<FileResult<DestinationT>> stableResults = results + .apply("Add void key", WithKeys.<Void, FileResult<DestinationT>>of((Void) null)) + .apply("Reshuffle", Reshuffle.<Void, FileResult<DestinationT>>of()) + .apply("Drop key", Values.<FileResult<DestinationT>>create()); + fileResultBundles = + stableResults .apply( - "FinalizeWindowed", - ParDo.of(new FinalizeWindowedFn<>(getFixedNumShards, writeOperation)) - .withSideInputs(shardingSideInputs)) - .setCoder(KvCoder.of(destinationCoder, StringUtf8Coder.of())); + "Gather bundles", + ParDo.of(new GatherBundlesPerWindowFn<FileResult<DestinationT>>())) + .setCoder(IterableCoder.of(fileResultCoder)); } else { - PCollectionView<Iterable<FileResult<DestinationT>>> resultsView = - results.apply(View.<FileResult<DestinationT>>asIterable()); - - // Finalize the write in another do-once ParDo on the singleton collection containing the - // Writer. The results from the per-bundle writes are given as an Iterable side input. - // The WriteOperation's state is the same as after its initialization in the first - // do-once ParDo. There is a dependency between this ParDo and the parallel write (the writer - // results collection as a side input), so it will happen after the parallel write. - // For the non-windowed case, we guarantee that if no data is written but the user has - // set numShards, then all shards will be written out as empty files. For this reason we - // use a side input here. - outputFilenames = - p.apply(Create.of((Void) null)) - .apply( - "FinalizeUnwindowed", - ParDo.of( - new FinalizeUnwindowedFn<>( - getFixedNumShards, resultsView, writeOperation)) - .withSideInputs( - FluentIterable.concat(sideInputs, shardingSideInputs) - .append(resultsView) - .toList())) - .setCoder(KvCoder.of(destinationCoder, StringUtf8Coder.of())); + // Pass results via a side input rather than reshuffle, because we need to get an empty + // iterable to finalize if there are no results. + fileResultBundles = + p.apply( + Reify.viewInGlobalWindow( + results.apply(View.<FileResult<DestinationT>>asIterable()), + IterableCoder.of(fileResultCoder))); } + class FinalizeFn extends DoFn<Iterable<FileResult<DestinationT>>, KV<DestinationT, String>> { + @ProcessElement + public void process(ProcessContext c) throws Exception { + writeOperation.getSink().getDynamicDestinations().setSideInputAccessorFromProcessContext(c); + @Nullable Integer fixedNumShards; + if (numShardsView != null) { + fixedNumShards = c.sideInput(numShardsView); + } else if (numShardsProvider != null) { + fixedNumShards = numShardsProvider.get(); + } else { + checkState(!windowedWrites, "Windowed write should have set fixed sharding"); + fixedNumShards = null; + } + List<FileResult<DestinationT>> fileResults = Lists.newArrayList(c.element()); + LOG.info("Finalizing {} file results", fileResults.size()); + DestinationT defaultDest = destinations.getDefaultDestination(); + List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames = + fileResults.isEmpty() + ? writeOperation.finalizeDestination( + defaultDest, GlobalWindow.INSTANCE, fixedNumShards, fileResults) + : finalizeAllDestinations(fileResults, fixedNumShards); + for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) { + FileResult<DestinationT> res = entry.getKey(); + c.output(KV.of(res.getDestination(), entry.getValue().toString())); + } + writeOperation.moveToOutputFiles(resultsToFinalFilenames); + } + } + + List<PCollectionView<?>> sideInputs = + FluentIterable.concat(this.sideInputs, shardingSideInputs).toList(); + PCollection<KV<DestinationT, String>> outputFilenames = + fileResultBundles + .apply("Finalize", ParDo.of(new FinalizeFn()).withSideInputs(sideInputs)) + .setCoder(KvCoder.of(destinationCoder, StringUtf8Coder.of())); + TupleTag<KV<DestinationT, String>> perDestinationOutputFilenamesTag = new TupleTag<>("perDestinationOutputFilenames"); return WriteFilesResult.in( input.getPipeline(), perDestinationOutputFilenamesTag, outputFilenames); } - private static class FinalizeWindowedFn<DestinationT> - extends DoFn<FileResult<DestinationT>, KV<DestinationT, String>> { - private final SerializableFunction<DoFn.ProcessContext, Integer> getFixedNumShards; - private final WriteOperation<DestinationT, ?> writeOperation; - - @Nullable private transient List<FileResult<DestinationT>> fileResults; - @Nullable private Integer fixedNumShards; - - public FinalizeWindowedFn( - SerializableFunction<DoFn.ProcessContext, Integer> getFixedNumShards, - WriteOperation<DestinationT, ?> writeOperation) { - this.getFixedNumShards = getFixedNumShards; - this.writeOperation = writeOperation; + private List<KV<FileResult<DestinationT>, ResourceId>> finalizeAllDestinations( + List<FileResult<DestinationT>> fileResults, @Nullable Integer fixedNumShards) + throws Exception { + Multimap<KV<DestinationT, BoundedWindow>, FileResult<DestinationT>> res = + ArrayListMultimap.create(); + for (FileResult<DestinationT> result : fileResults) { + res.put(KV.of(result.getDestination(), result.getWindow()), result); + } + List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames = Lists.newArrayList(); + for (Map.Entry<KV<DestinationT, BoundedWindow>, Collection<FileResult<DestinationT>>> + destEntry : res.asMap().entrySet()) { + KV<DestinationT, BoundedWindow> destWindow = destEntry.getKey(); + resultsToFinalFilenames.addAll( + writeOperation.finalizeDestination( + destWindow.getKey(), destWindow.getValue(), fixedNumShards, destEntry.getValue())); } + return resultsToFinalFilenames; + } + + private static class GatherBundlesPerWindowFn<T> extends DoFn<T, Iterable<T>> { + @Nullable private transient Multimap<BoundedWindow, T> bundles = null; @StartBundle public void startBundle() { - fileResults = Lists.newArrayList(); - fixedNumShards = null; + bundles = ArrayListMultimap.create(); } @ProcessElement - public void processElement(ProcessContext c) { - fileResults.add(c.element()); - if (fixedNumShards == null) { - fixedNumShards = getFixedNumShards.apply(c); - checkState(fixedNumShards != null, "Windowed write should have set fixed sharding"); - } + public void process(ProcessContext c, BoundedWindow w) { + bundles.put(w, c.element()); } @FinishBundle public void finishBundle(FinishBundleContext c) throws Exception { - List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames = - finalizeAllDestinations(writeOperation, fileResults, fixedNumShards); - for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) { - FileResult<DestinationT> res = entry.getKey(); - c.output( - KV.of(res.getDestination(), entry.getValue().toString()), - res.getWindow().maxTimestamp(), - res.getWindow()); + for (BoundedWindow w : bundles.keySet()) { + c.output(Lists.newArrayList(bundles.get(w)), w.maxTimestamp(), w); } - writeOperation.moveToOutputFiles(resultsToFinalFilenames); } } - - private static class FinalizeUnwindowedFn<DestinationT> - extends DoFn<Void, KV<DestinationT, String>> { - private final SerializableFunction<DoFn.ProcessContext, Integer> getFixedNumShards; - private final PCollectionView<Iterable<FileResult<DestinationT>>> resultsView; - private final WriteOperation<DestinationT, ?> writeOperation; - - public FinalizeUnwindowedFn( - SerializableFunction<DoFn.ProcessContext, Integer> getFixedNumShards, - PCollectionView<Iterable<FileResult<DestinationT>>> resultsView, - WriteOperation<DestinationT, ?> writeOperation) { - this.getFixedNumShards = getFixedNumShards; - this.resultsView = resultsView; - this.writeOperation = writeOperation; - } - - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - writeOperation.getSink().getDynamicDestinations().setSideInputAccessorFromProcessContext(c); - List<FileResult<DestinationT>> fileResults = Lists.newArrayList(c.sideInput(resultsView)); - List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames = - fileResults.isEmpty() - ? writeOperation.finalizeDestination( - writeOperation.getSink().getDynamicDestinations().getDefaultDestination(), - GlobalWindow.INSTANCE, - getFixedNumShards.apply(c), - ImmutableList.<FileResult<DestinationT>>of()) - : finalizeAllDestinations(writeOperation, fileResults, getFixedNumShards.apply(c)); - for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) { - c.output(KV.of(entry.getKey().getDestination(), entry.getValue().toString())); - } - writeOperation.moveToOutputFiles(resultsToFinalFilenames); - } - } - - private static <DestinationT> - List<KV<FileResult<DestinationT>, ResourceId>> finalizeAllDestinations( - WriteOperation<DestinationT, ?> writeOperation, - List<FileResult<DestinationT>> fileResults, - Integer fixedNumShards) - throws Exception { - List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames = Lists.newArrayList(); - Multimap<KV<DestinationT, BoundedWindow>, FileResult<DestinationT>> resultsByDestMultimap = - groupByDestinationAndWindow(fileResults); - for (Map.Entry<KV<DestinationT, BoundedWindow>, Collection<FileResult<DestinationT>>> - destEntry : resultsByDestMultimap.asMap().entrySet()) { - resultsToFinalFilenames.addAll( - writeOperation.finalizeDestination( - destEntry.getKey().getKey(), - destEntry.getKey().getValue(), - fixedNumShards, - destEntry.getValue())); - } - return resultsToFinalFilenames; - } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java index caa89e6..7f5c881 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java @@ -18,17 +18,69 @@ package org.apache.beam.sdk.transforms; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder; import org.apache.beam.sdk.values.ValueInSingleWindow; import org.joda.time.Duration; -/** {@link PTransform PTransforms} for reifying the timestamp, window and pane of values. */ +/** + * {@link PTransform PTransforms} for converting between explicit and implicit form of various Beam + * values. + */ public class Reify { + private static class ReifyView<K, V> + extends PTransform<PCollection<K>, PCollection<KV<K, V>>> { + private final PCollectionView<V> view; + private final Coder<V> coder; + + private ReifyView(PCollectionView<V> view, Coder<V> coder) { + this.view = view; + this.coder = coder; + } + + @Override + public PCollection<KV<K, V>> expand(PCollection<K> input) { + return input + .apply( + ParDo.of( + new DoFn<K, KV<K, V>>() { + @ProcessElement + public void process(ProcessContext c) { + c.output(KV.of(c.element(), c.sideInput(view))); + } + }) + .withSideInputs(view)) + .setCoder(KvCoder.of(input.getCoder(), coder)); + } + } + + private static class ReifyViewInGlobalWindow<V> + extends PTransform<PBegin, PCollection<V>> { + private final PCollectionView<V> view; + private final Coder<V> coder; + + private ReifyViewInGlobalWindow(PCollectionView<V> view, Coder<V> coder) { + this.view = view; + this.coder = coder; + } + + @Override + public PCollection<V> expand(PBegin input) { + return input + .apply(Create.of((Void) null).withCoder(VoidCoder.of())) + .apply(Reify.<Void, V>viewAsValues(view, coder)) + .apply(Values.<V>create()); + } + } + /** Private implementation of {@link #windows()}. */ private static class Window<T> extends PTransform<PCollection<T>, PCollection<ValueInSingleWindow<T>>> { @@ -184,9 +236,28 @@ public class Reify { return new WindowInValue<>(); } + /** Extracts the timestamps from each value in a {@link KV}. */ public static <K, V> PTransform<PCollection<KV<K, TimestampedValue<V>>>, PCollection<KV<K, V>>> extractTimestampsFromValues() { return new ExtractTimestampsFromValues<>(); } + + /** + * Pairs each element in a collection with the value of a side input associated with the element's + * window. + */ + public static <K, V> PTransform<PCollection<K>, PCollection<KV<K, V>>> viewAsValues( + PCollectionView<V> view, Coder<V> coder) { + return new ReifyView<>(view, coder); + } + + /** + * Returns a {@link PCollection} consisting of a single element, containing the value of the given + * view in the global window. + */ + public static <K, V> PTransform<PBegin, PCollection<V>> viewInGlobalWindow( + PCollectionView<V> view, Coder<V> coder) { + return new ReifyViewInGlobalWindow<>(view, coder); + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java index e59f84b..8ef2a4d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java @@ -291,6 +291,10 @@ public class TypeDescriptors { return typeDescriptor; } + public static TypeDescriptor<Void> voids() { + return new TypeDescriptor<Void>() {}; + } + /** * A helper interface for use with {@link #extractFromTypeParameters(Object, Class, * TypeVariableExtractor)}. -- To stop receiving notification emails like this one, please contact "commits@beam.apache.org" <commits@beam.apache.org>.