Repository: incubator-beam Updated Branches: refs/heads/master a3fc40aa3 -> 9f97ea0a7
Use Create.of withCoder instead of setCoder on the created PCollection. One more left. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/69da98a9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/69da98a9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/69da98a9 Branch: refs/heads/master Commit: 69da98a93956add8e86d094cb866bf86c5626089 Parents: 78c8c52 Author: Ilya Ganelin <[email protected]> Authored: Tue May 24 13:37:04 2016 -0700 Committer: Sela <[email protected]> Committed: Wed May 25 23:59:15 2016 +0300 ---------------------------------------------------------------------- .../test/java/org/apache/beam/runners/spark/DeDupTest.java | 2 +- .../java/org/apache/beam/runners/spark/EmptyInputTest.java | 2 +- .../org/apache/beam/runners/spark/SimpleWordCountTest.java | 8 ++++---- .../java/org/apache/beam/runners/spark/io/NumShardsTest.java | 2 +- .../beam/runners/spark/translation/CombineGloballyTest.java | 2 +- .../beam/runners/spark/translation/CombinePerKeyTest.java | 2 +- .../runners/spark/translation/WindowedWordCountTest.java | 8 ++++---- 7 files changed, 13 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69da98a9/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java index 0b48bed..285a2d6 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java @@ -51,7 +51,7 @@ public class DeDupTest { SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); 
options.setRunner(SparkPipelineRunner.class); Pipeline p = Pipeline.create(options); - PCollection<String> input = p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of()); + PCollection<String> input = p.apply(Create.of(LINES).withCoder(StringUtf8Coder.of())); PCollection<String> output = input.apply(RemoveDuplicates.<String>create()); PAssert.that(output).containsInAnyOrder(EXPECTED_SET); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69da98a9/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java index 7b25e34..f227e94 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java @@ -46,7 +46,7 @@ public class EmptyInputTest { options.setRunner(SparkPipelineRunner.class); Pipeline p = Pipeline.create(options); List<String> empty = Collections.emptyList(); - PCollection<String> inputWords = p.apply(Create.of(empty)).setCoder(StringUtf8Coder.of()); + PCollection<String> inputWords = p.apply(Create.of(empty).withCoder(StringUtf8Coder.of())); PCollection<String> output = inputWords.apply(Combine.globally(new ConcatWords())); EvaluationResult res = SparkPipelineRunner.create().run(p); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69da98a9/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java index eee120e..61ad24f 100644 --- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java @@ -66,8 +66,8 @@ public class SimpleWordCountTest { SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); options.setRunner(SparkPipelineRunner.class); Pipeline p = Pipeline.create(options); - PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder - .of()); + PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder + .of())); PCollection<String> output = inputWords.apply(new CountWords()); PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET); @@ -84,8 +84,8 @@ public class SimpleWordCountTest { SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); options.setRunner(SparkPipelineRunner.class); Pipeline p = Pipeline.create(options); - PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder - .of()); + PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder + .of())); PCollection<String> output = inputWords.apply(new CountWords()); File outputFile = testFolder.newFile(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69da98a9/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java index 8ce35c4..23d4592 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java @@ -75,7 +75,7 @@ public class NumShardsTest { SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); 
options.setRunner(SparkPipelineRunner.class); Pipeline p = Pipeline.create(options); - PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of()); + PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of())); PCollection<String> output = inputWords.apply(new WordCount.CountWords()) .apply(MapElements.via(new WordCount.FormatAsTextFn())); output.apply(TextIO.Write.to(outputDir.getAbsolutePath()).withNumShards(3).withSuffix(".txt")); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69da98a9/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java index ac64540..9a3edd3 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java @@ -52,7 +52,7 @@ public class CombineGloballyTest { SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); options.setRunner(SparkPipelineRunner.class); Pipeline p = Pipeline.create(options); - PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of()); + PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of())); PCollection<String> output = inputWords.apply(Combine.globally(new WordMerger())); EvaluationResult res = SparkPipelineRunner.create().run(p); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69da98a9/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java ---------------------------------------------------------------------- diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java index 4e6c888..face526 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java @@ -52,7 +52,7 @@ public class CombinePerKeyTest { @Test public void testRun() { Pipeline p = Pipeline.create(PipelineOptionsFactory.create()); - PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of()); + PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of())); PCollection<KV<String, Long>> cnts = inputWords.apply(new SumPerKey<String>()); EvaluationResult res = SparkPipelineRunner.create().run(p); Map<String, Long> actualCnts = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69da98a9/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java index 8062658..c6911e1 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java @@ -75,8 +75,8 @@ public class WindowedWordCountTest { @Test public void testFixed2() throws Exception { Pipeline p = Pipeline.create(PipelineOptionsFactory.create()); - PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS)) - .setCoder(StringUtf8Coder.of()); + PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS) + 
.withCoder(StringUtf8Coder.of())); PCollection<String> windowedWords = inputWords .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(5)))); @@ -95,8 +95,8 @@ public class WindowedWordCountTest { @Test public void testSliding() throws Exception { Pipeline p = Pipeline.create(PipelineOptionsFactory.create()); - PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS)) - .setCoder(StringUtf8Coder.of()); + PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS) + .withCoder(StringUtf8Coder.of())); PCollection<String> windowedWords = inputWords .apply(Window.<String>into(SlidingWindows.of(Duration.standardMinutes(2)) .every(Duration.standardMinutes(1))));
