Specifies numShards in the windowed-write examples, as it is now required
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9c5f8be9 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9c5f8be9 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9c5f8be9 Branch: refs/heads/tez-runner Commit: 9c5f8be92f0d99ba356e3a6f2b822f9d9a1659cf Parents: c3a96bf Author: Eugene Kirpichov <kirpic...@google.com> Authored: Wed Nov 15 10:48:52 2017 -0800 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Wed Nov 15 16:02:09 2017 -0800 ---------------------------------------------------------------------- .../org/apache/beam/examples/WindowedWordCount.java | 7 ++++--- .../beam/examples/common/WriteOneFilePerWindow.java | 12 ++++-------- .../org/apache/beam/examples/WindowedWordCountIT.java | 8 -------- 3 files changed, 8 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/9c5f8be9/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java index 5c039cd..21cfed8 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java @@ -161,9 +161,10 @@ public class WindowedWordCount { Long getMaxTimestampMillis(); void setMaxTimestampMillis(Long value); - @Description("Fixed number of shards to produce per window, or null for runner-chosen sharding") - Integer getNumShards(); - void setNumShards(Integer numShards); + @Description("Fixed number of shards to produce per window") + @Default.Integer(3) + int getNumShards(); + void setNumShards(int numShards); } public static void main(String[] args) throws IOException { 
http://git-wip-us.apache.org/repos/asf/beam/blob/9c5f8be9/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java index abd14b7..a5c84f6 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java +++ b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java @@ -19,7 +19,6 @@ package org.apache.beam.examples.common; import static com.google.common.base.MoreObjects.firstNonNull; -import javax.annotation.Nullable; import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints; @@ -46,10 +45,9 @@ import org.joda.time.format.ISODateTimeFormat; public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone> { private static final DateTimeFormatter FORMATTER = ISODateTimeFormat.hourMinute(); private String filenamePrefix; - @Nullable - private Integer numShards; + private int numShards; - public WriteOneFilePerWindow(String filenamePrefix, Integer numShards) { + public WriteOneFilePerWindow(String filenamePrefix, int numShards) { this.filenamePrefix = filenamePrefix; this.numShards = numShards; } @@ -61,10 +59,8 @@ public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone TextIO.write() .to(new PerWindowFiles(resource)) .withTempDirectory(resource.getCurrentDirectory()) - .withWindowedWrites(); - if (numShards != null) { - write = write.withNumShards(numShards); - } + .withWindowedWrites() + .withNumShards(numShards); return input.apply(write); } 
http://git-wip-us.apache.org/repos/asf/beam/blob/9c5f8be9/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java index bec7952..279de53 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java +++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java @@ -87,14 +87,6 @@ public class WindowedWordCountIT { } @Test - public void testWindowedWordCountInBatchDynamicSharding() throws Exception { - WindowedWordCountITOptions options = batchOptions(); - // This is the default value, but make it explicit - options.setNumShards(null); - testWindowedWordCountPipeline(options); - } - - @Test public void testWindowedWordCountInBatchStaticSharding() throws Exception { WindowedWordCountITOptions options = batchOptions(); options.setNumShards(3);