Expose base output file name on FileBasedSink ----Release Notes---- Add the ability to get the base output filename from FileBasedSinks.
[] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=115265994 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c857afaf Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c857afaf Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c857afaf Branch: refs/heads/master Commit: c857afaf1911175a6d907f53167e39e9b639c665 Parents: 209364e Author: lcwik <[email protected]> Authored: Mon Feb 22 13:49:38 2016 -0800 Committer: Davor Bonaci <[email protected]> Committed: Thu Feb 25 23:58:25 2016 -0800 ---------------------------------------------------------------------- .../com/google/cloud/dataflow/sdk/io/FileBasedSink.java | 11 +++++++++++ 1 file changed, 11 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c857afaf/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java index f14f4bf..7c30167 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java @@ -121,6 +121,13 @@ public abstract class FileBasedSink<T> extends Sink<T> { } /** + * Returns the base output filename for this file based sink. + */ + public String getBaseOutputFilename() { + return baseOutputFilename; + } + + /** * Perform pipeline-construction-time validation. The default implementation is a no-op. * Subclasses should override to ensure the sink is valid and can be written to. It is recommended * to use {@link Preconditions} in the implementation of this method. 
@@ -806,6 +813,7 @@ public abstract class FileBasedSink<T> extends Sink<T> { } static class ReshardForWrite<T> extends PTransform<PCollection<T>, PCollection<T>> { + @Override public PCollection<T> apply(PCollection<T> input) { return input // TODO: This would need to be adapted to write per-window shards. @@ -815,10 +823,12 @@ public abstract class FileBasedSink<T> extends Sink<T> { .apply("RandomKey", ParDo.of( new DoFn<T, KV<Long, T>>() { transient long counter, step; + @Override public void startBundle(Context c) { counter = (long) (Math.random() * Long.MAX_VALUE); step = 1 + 2 * (long) (Math.random() * Long.MAX_VALUE); } + @Override public void processElement(ProcessContext c) { counter += step; c.output(KV.of(counter, c.element())); @@ -827,6 +837,7 @@ public abstract class FileBasedSink<T> extends Sink<T> { .apply(GroupByKey.<Long, T>create()) .apply("Ungroup", ParDo.of( new DoFn<KV<Long, Iterable<T>>, T>() { + @Override public void processElement(ProcessContext c) { for (T item : c.element().getValue()) { c.output(item);
