http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/org/apache/beam/runners/spark/SideEffectsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SideEffectsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SideEffectsTest.java
new file mode 100644
index 0000000..44d8e0f
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SideEffectsTest.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.StringDelegateCoder;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.net.URI;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests that an exception thrown by a user {@link DoFn} makes {@link Pipeline#run()} fail with a
+ * RuntimeException whose cause is the user exception (only a non-null cause on Spark 1.3/1.4).
+ */
+public class SideEffectsTest implements Serializable {
+
+  static class UserException extends RuntimeException {
+  }
+
+  @Test
+  public void test() throws Exception {
+    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
+    options.setRunner(SparkPipelineRunner.class);
+    Pipeline pipeline = Pipeline.create(options);
+
+    pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
+
+    pipeline.apply(Create.of("a")).apply(ParDo.of(new DoFn<String, String>() {
+      @Override
+      public void processElement(ProcessContext c) throws Exception {
+        throw new UserException();
+      }
+    }));
+
+    try {
+      pipeline.run();
+      fail("Run should throw an exception");
+    } catch (RuntimeException e) {
+      assertNotNull(e.getCause());
+
+      // TODO: remove the version check (and the setup and teardown methods) when we no
+      // longer support Spark 1.3 or 1.4
+      String version = SparkContextFactory.getSparkContext(options.getSparkMaster(), options.getAppName()).version();
+      if (!version.startsWith("1.3.") && !version.startsWith("1.4.")) {
+        assertTrue(e.getCause() instanceof UserException);
+      }
+    }
+  }
+
+  @Before
+  public void setup() {
+    System.setProperty(SparkContextFactory.TEST_REUSE_SPARK_CONTEXT, "true");
+  }
+
+  @After
+  public void teardown() {
+    System.setProperty(SparkContextFactory.TEST_REUSE_SPARK_CONTEXT, "false");
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
new file mode 100644
index 0000000..f930855
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.transforms.*;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.collect.ImmutableSet;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+public class SimpleWordCountTest {
+  private static final String[] WORDS_ARRAY = {
+      "hi there", "hi", "hi sue bob",
+      "hi sue", "", "bob hi"};
+  private static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
+  private static final Set<String> EXPECTED_COUNT_SET =
+      ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2");
+
+  @Test
+  public void testRun() throws Exception {
+    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
+    options.setRunner(SparkPipelineRunner.class);
+    Pipeline p = Pipeline.create(options);
+    PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder
+        .of());
+    PCollection<String> output = inputWords.apply(new CountWords());
+
+    DataflowAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
+
+    EvaluationResult res = SparkPipelineRunner.create().run(p);
+    res.close();
+  }
+
+  /**
+   * A DoFn that tokenizes lines of text into individual words.
+   */
+  static class ExtractWordsFn extends DoFn<String, String> {
+    private static final Pattern WORD_BOUNDARY = Pattern.compile("[^a-zA-Z']+");
+    private final Aggregator<Long, Long> emptyLines =
+        createAggregator("emptyLines", new Sum.SumLongFn());
+
+    @Override
+    public void processElement(ProcessContext c) {
+      // Split the line into words.
+      String[] words = WORD_BOUNDARY.split(c.element());
+
+      // Keep track of the number of lines without any words encountered while tokenizing.
+      // This aggregator is visible in the monitoring UI when run using DataflowPipelineRunner.
+      if (words.length == 0) {
+        emptyLines.addValue(1L);
+      }
+
+      // Output each word encountered into the output PCollection.
+      for (String word : words) {
+        if (!word.isEmpty()) {
+          c.output(word);
+        }
+      }
+    }
+  }
+
+  /**
+   * A DoFn that converts a Word and Count into a printable string.
+   */
+  private static class FormatCountsFn extends DoFn<KV<String, Long>, String> {
+    @Override
+    public void processElement(ProcessContext c) {
+      c.output(c.element().getKey() + ": " + c.element().getValue());
+    }
+  }
+
+  public static class CountWords extends PTransform<PCollection<String>, PCollection<String>> {
+    @Override
+    public PCollection<String> apply(PCollection<String> lines) {
+
+      // Convert lines of text into individual words.
+      PCollection<String> words = lines.apply(
+          ParDo.of(new ExtractWordsFn()));
+
+      // Count the number of times each word occurs.
+      PCollection<KV<String, Long>> wordCounts =
+          words.apply(Count.<String>perElement());
+
+      // Format each word and count into a printable string.
+
+      return wordCounts.apply(ParDo.of(new FormatCountsFn()));
+    }
+
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/org/apache/beam/runners/spark/TestSparkPipelineOptionsFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/TestSparkPipelineOptionsFactory.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/TestSparkPipelineOptionsFactory.java
new file mode 100644
index 0000000..3fc3ecc
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/TestSparkPipelineOptionsFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestSparkPipelineOptionsFactory {
+  @Test
+  public void testDefaultCreateMethod() {
+    SparkPipelineOptions actualOptions = SparkPipelineOptionsFactory.create();
+    Assert.assertEquals("local[1]", actualOptions.getSparkMaster());
+  }
+
+  @Test
+  public void testSettingCustomOptions() {
+    SparkPipelineOptions actualOptions = SparkPipelineOptionsFactory.create();
+    actualOptions.setSparkMaster("spark://207.184.161.138:7077");
+    Assert.assertEquals("spark://207.184.161.138:7077", actualOptions.getSparkMaster());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
new file mode 100644
index 0000000..0a36c9e
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import com.google.cloud.dataflow.examples.complete.TfIdf;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.StringDelegateCoder;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.Keys;
+import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import org.junit.Test;
+
+import java.net.URI;
+import java.util.Arrays;
+
+/**
+ * A test based on {@code TfIdf} from the SDK.
+ */
+public class TfIdfTest {
+
+  @Test
+  public void testTfIdf() throws Exception {
+    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
+
+    pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
+
+    PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf = pipeline
+        .apply(Create.of(
+            KV.of(new URI("x"), "a b c d"),
+            KV.of(new URI("y"), "a b c"),
+            KV.of(new URI("z"), "a m n")))
+        .apply(new TfIdf.ComputeTfIdf());
+
+    PCollection<String> words = wordToUriAndTfIdf
+        .apply(Keys.<String>create())
+        .apply(RemoveDuplicates.<String>create());
+
+    DataflowAssert.that(words).containsInAnyOrder(Arrays.asList("a", "m", "n", "b", "c", "d"));
+
+    EvaluationResult res = SparkPipelineRunner.create().run(pipeline);
+    res.close();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/org/apache/beam/runners/spark/TransformTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/TransformTranslatorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/TransformTranslatorTest.java
new file mode 100644
index 0000000..f759fe9
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/TransformTranslatorTest.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
+import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A test for the transforms registered in TransformTranslator.
+ * Builds a regular Dataflow pipeline with each of the mapped
+ * transforms, and makes sure that they work when the pipeline is
+ * executed in Spark.
+ */
+public class TransformTranslatorTest {
+
+  @Rule
+  public TestName name = new TestName();
+
+  private DirectPipelineRunner directRunner;
+  private SparkPipelineRunner sparkRunner;
+  private String testDataDirName;
+
+  @Before public void init() throws IOException {
+    sparkRunner = SparkPipelineRunner.create();
+    directRunner = DirectPipelineRunner.createForTest();
+    testDataDirName = Joiner.on(File.separator).join("target", "test-data", name.getMethodName())
+        + File.separator;
+    FileUtils.deleteDirectory(new File(testDataDirName));
+    new File(testDataDirName).mkdirs();
+  }
+
+  /**
+   * Builds a simple pipeline with TextIO.Read and TextIO.Write, runs the pipeline
+   * in DirectPipelineRunner and on SparkPipelineRunner, with the mapped dataflow-to-spark
+   * transforms. Finally it makes sure that the results are the same for both runs.
+   */
+  @Test
+  public void testTextIOReadAndWriteTransforms() throws IOException {
+    String directOut = runPipeline("direct", directRunner);
+    String sparkOut = runPipeline("spark", sparkRunner);
+
+    List<String> directOutput =
+        Files.readAllLines(Paths.get(directOut + "-00000-of-00001"), Charsets.UTF_8);
+
+    List<String> sparkOutput =
+        Files.readAllLines(Paths.get(sparkOut + "-00000-of-00001"), Charsets.UTF_8);
+
+    // sort output to get a stable result (PCollections are not ordered)
+    Collections.sort(directOutput);
+    Collections.sort(sparkOutput);
+
+    Assert.assertArrayEquals(directOutput.toArray(), sparkOutput.toArray());
+  }
+
+  private String runPipeline(String name, PipelineRunner<?> runner) {
+    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
+    String outFile = testDataDirName + "test_text_out_" + name; // dir name already ends in a separator
+    PCollection<String> lines = p.apply(TextIO.Read.from("src/test/resources/test_text.txt"));
+    lines.apply(TextIO.Write.to(outFile));
+    runner.run(p);
+    return outFile;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/org/apache/beam/runners/spark/WindowedWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/WindowedWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/WindowedWordCountTest.java
new file mode 100644
index 0000000..eb88542
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/WindowedWordCountTest.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.collect.ImmutableList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.joda.time.Duration;
+import org.junit.Test;
+
+public class WindowedWordCountTest {
+  private static final String[] WORDS_ARRAY = {
+      "hi there", "hi", "hi sue bob",
+      "hi sue", "", "bob hi"};
+  private static final Long[] TIMESTAMPS_ARRAY = {
+      60000L, 60000L, 60000L,
+      120000L, 120000L, 120000L};
+  private static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
+  private static final List<Long> TIMESTAMPS = Arrays.asList(TIMESTAMPS_ARRAY);
+  private static final List<String> EXPECTED_COUNT_SET =
+      ImmutableList.of("hi: 3", "there: 1", "sue: 1", "bob: 1",
+          "hi: 2", "sue: 1", "bob: 1");
+
+  @Test
+  public void testRun() throws Exception {
+    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
+    options.setRunner(SparkPipelineRunner.class);
+    Pipeline p = Pipeline.create(options);
+    PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS))
+        .setCoder(StringUtf8Coder.of());
+    PCollection<String> windowedWords = inputWords
+        .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
+
+    PCollection<String> output = windowedWords.apply(new SimpleWordCountTest.CountWords());
+
+    DataflowAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
+
+    EvaluationResult res = SparkPipelineRunner.create().run(p);
+    res.close();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/WritableCoderTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/WritableCoderTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/WritableCoderTest.java
new file mode 100644
index 0000000..ad7256c
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/WritableCoderTest.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package org.apache.beam.runners.spark.coders;
+
+import com.google.cloud.dataflow.sdk.testing.CoderProperties;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.junit.Test;
+
+/**
+ * Tests for WritableCoder.
+ */
+public class WritableCoderTest {
+
+  @Test
+  public void testIntWritableEncoding() throws Exception {
+    IntWritable value = new IntWritable(42);
+    WritableCoder<IntWritable> coder = WritableCoder.of(IntWritable.class);
+
+    CoderProperties.coderDecodeEncodeEqual(coder, value);
+  }
+
+  @Test
+  public void testNullWritableEncoding() throws Exception {
+    WritableCoder<NullWritable> coder = WritableCoder.of(NullWritable.class);
+
+    CoderProperties.coderDecodeEncodeEqual(coder, NullWritable.get());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
new file mode 100644
index 0000000..73dd2d3
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package org.apache.beam.runners.spark.io;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.AvroIO;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.collect.Lists;
+import com.google.common.io.Resources;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.runners.spark.EvaluationResult;
+import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.junit.Assert.assertEquals;
+
+public class AvroPipelineTest {
+
+  private File inputFile;
+  private File outputDir;
+
+  @Rule
+  public final TemporaryFolder tmpDir = new TemporaryFolder();
+
+  @Before
+  public void setUp() throws IOException {
+    inputFile = tmpDir.newFile("test.avro");
+    outputDir = tmpDir.newFolder("out");
+    outputDir.delete();
+  }
+
+  @Test
+  public void testGeneric() throws Exception {
+    Schema schema = new Schema.Parser().parse(Resources.getResource("person.avsc").openStream());
+    GenericRecord savedRecord = new GenericData.Record(schema);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), schema);
+
+    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
+    PCollection<GenericRecord> input = p.apply(
+        AvroIO.Read.from(inputFile.getAbsolutePath()).withSchema(schema));
+    input.apply(AvroIO.Write.to(outputDir.getAbsolutePath()).withSchema(schema));
+    EvaluationResult res = SparkPipelineRunner.create().run(p);
+    res.close();
+
+    List<GenericRecord> records = readGenericFile();
+    assertEquals(Lists.newArrayList(savedRecord), records);
+  }
+
+  private void populateGenericFile(List<GenericRecord> genericRecords, Schema schema) throws IOException {
+    GenericDatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<>(schema);
+
+    try (FileOutputStream outputStream = new FileOutputStream(this.inputFile);
+        DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(genericDatumWriter)) {
+      dataFileWriter.create(schema, outputStream);
+      for (GenericRecord record : genericRecords) {
+        dataFileWriter.append(record);
+      }
+    }
+  }
+
+  private List<GenericRecord> readGenericFile() throws IOException {
+    List<GenericRecord> records = Lists.newArrayList();
+    GenericDatumReader<GenericRecord> genericDatumReader = new GenericDatumReader<>();
+    try (DataFileReader<GenericRecord> dataFileReader =
+        new DataFileReader<>(new File(outputDir + "-00000-of-00001"), genericDatumReader)) {
+      for (GenericRecord record : dataFileReader) {
+        records.add(record);
+      }
+    }
+    return records;
+  }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
new file mode 100644
index 0000000..39525b2
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package org.apache.beam.runners.spark.io;
+
+import com.google.cloud.dataflow.examples.WordCount;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.MapElements;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+import org.apache.beam.runners.spark.EvaluationResult;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.SparkPipelineOptionsFactory;
+import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class NumShardsTest {
+
+  private static final String[] WORDS_ARRAY = {
+      "hi there", "hi", "hi sue bob",
+      "hi sue", "", "bob hi"};
+  private static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
+
+  private File outputDir;
+
+  @Rule
+  public final TemporaryFolder tmpDir = new TemporaryFolder();
+
+  @Before
+  public void setUp() throws IOException {
+    outputDir = tmpDir.newFolder("out");
+    outputDir.delete();
+  }
+
+  @Test
+  public void testText() throws Exception {
+    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
+    options.setRunner(SparkPipelineRunner.class);
+    Pipeline p = Pipeline.create(options);
+    PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
+    PCollection<String> output = inputWords.apply(new WordCount.CountWords())
+        .apply(MapElements.via(new WordCount.FormatAsTextFn()));
+    output.apply(TextIO.Write.to(outputDir.getAbsolutePath()).withNumShards(3).withSuffix(".txt"));
+    EvaluationResult res = SparkPipelineRunner.create().run(p);
+    res.close();
+
+    int count = 0;
+    Set<String> expected = Sets.newHashSet("hi: 5", "there: 1", "sue: 2", "bob: 2");
+    for (File f : tmpDir.getRoot().listFiles(new FileFilter() {
+      @Override public boolean accept(File pathname) {
+        return pathname.getName().matches("out-.*\\.txt");
+      }
+    })) {
+      count++;
+      for (String line : Files.readLines(f, Charsets.UTF_8)) {
+        assertTrue(line + " not found", expected.remove(line));
+      }
+    }
+    assertEquals(3, count);
+    assertTrue(expected.isEmpty());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
new file mode 100644
index 0000000..7a9be8b
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package org.apache.beam.runners.spark.io.hadoop;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import org.apache.beam.runners.spark.EvaluationResult;
+import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+public class HadoopFileFormatPipelineTest {
+
+  private File inputFile;
+  private File outputFile;
+
+  @Rule
+  public final TemporaryFolder tmpDir = new TemporaryFolder();
+
+  @Before
+  public void setUp() throws IOException {
+    inputFile = tmpDir.newFile("test.seq");
+    outputFile = tmpDir.newFolder("out");
+    outputFile.delete();
+  }
+
+  @Test
+  public void testSequenceFile() throws Exception {
+    populateFile();
+
+    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
+    @SuppressWarnings("unchecked")
+    Class<? extends FileInputFormat<IntWritable, Text>> inputFormatClass =
+        (Class<? extends FileInputFormat<IntWritable, Text>>) (Class<?>) SequenceFileInputFormat.class;
+    HadoopIO.Read.Bound<IntWritable,Text> read =
+        HadoopIO.Read.from(inputFile.getAbsolutePath(), inputFormatClass, IntWritable.class, Text.class);
+    PCollection<KV<IntWritable, Text>> input = p.apply(read);
+    @SuppressWarnings("unchecked")
+    Class<? extends FileOutputFormat<IntWritable, Text>> outputFormatClass =
+        (Class<? extends FileOutputFormat<IntWritable, Text>>) (Class<?>) TemplatedSequenceFileOutputFormat.class;
+    @SuppressWarnings("unchecked")
+    HadoopIO.Write.Bound<IntWritable,Text> write = HadoopIO.Write.to(outputFile.getAbsolutePath(),
+        outputFormatClass, IntWritable.class, Text.class);
+    input.apply(write.withoutSharding());
+    EvaluationResult res = SparkPipelineRunner.create().run(p);
+    res.close();
+
+    IntWritable key = new IntWritable();
+    Text value = new Text();
+    try (Reader reader = new Reader(new Configuration(), Reader.file(new Path(outputFile.toURI())))) {
+      int i = 0;
+      while (reader.next(key, value)) {
+        assertEquals(i, key.get());
+        assertEquals("value-" + i, value.toString());
+        i++;
+      }
+      // Fail on an empty or truncated output; populateFile() writes 5 records.
+      assertEquals(5, i);
+    }
+  }
+
+  private void populateFile() throws IOException {
+    IntWritable key = new IntWritable();
+    Text value = new Text();
+    try (Writer writer = SequenceFile.createWriter(
+        new Configuration(),
+        Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class),
+        Writer.file(new Path(this.inputFile.toURI())))) {
+      for (int i = 0; i < 5; i++) {
+        key.set(i);
+        value.set("value-" + i);
+        writer.append(key, value);
+      }
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java
b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java new file mode 100644 index 0000000..b1d35d5 --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. + * + * Cloudera, Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"). You may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for + * the specific language governing permissions and limitations under the + * License. + */ + +package org.apache.beam.runners.spark.io.hadoop; + +import org.junit.Test; + +import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputDirectory; +import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFilePrefix; +import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFileTemplate; +import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardCount; +import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardNumber; +import static org.junit.Assert.assertEquals; + +public class ShardNameBuilderTest { + + @Test + public void testReplaceShardCount() { + assertEquals("", replaceShardCount("", 6)); + assertEquals("-S-of-6", replaceShardCount("-S-of-N", 6)); + assertEquals("-SS-of-06", replaceShardCount("-SS-of-NN", 6)); + assertEquals("-S-of-60", replaceShardCount("-S-of-N", 60)); + assertEquals("-SS-of-60", replaceShardCount("-SS-of-NN", 60)); + assertEquals("/part-SSSSS", replaceShardCount("/part-SSSSS", 6)); + } + + @Test + public void testReplaceShardNumber() { + assertEquals("", replaceShardNumber("", 5)); + 
assertEquals("-5-of-6", replaceShardNumber("-S-of-6", 5)); + assertEquals("-05-of-06", replaceShardNumber("-SS-of-06", 5)); + assertEquals("-59-of-60", replaceShardNumber("-S-of-60", 59)); + assertEquals("-59-of-60", replaceShardNumber("-SS-of-60", 59)); + assertEquals("/part-00005", replaceShardNumber("/part-SSSSS", 5)); + } + + @Test + public void testGetOutputDirectory() { + assertEquals("./", getOutputDirectory("foo", "-S-of-N")); + assertEquals("foo", getOutputDirectory("foo/bar", "-S-of-N")); + assertEquals("/foo", getOutputDirectory("/foo/bar", "-S-of-N")); + assertEquals("hdfs://foo/", getOutputDirectory("hdfs://foo/bar", "-S-of-N")); + assertEquals("foo/bar", getOutputDirectory("foo/bar", "/part-SSSSS")); + assertEquals("/foo/bar", getOutputDirectory("/foo/bar", "/part-SSSSS")); + assertEquals("hdfs://foo/bar", getOutputDirectory("hdfs://foo/bar", "/part-SSSSS")); + } + + @Test + public void testGetOutputFilePrefix() { + assertEquals("foo", getOutputFilePrefix("foo", "-S-of-N")); + assertEquals("bar", getOutputFilePrefix("foo/bar", "-S-of-N")); + assertEquals("bar", getOutputFilePrefix("/foo/bar", "-S-of-N")); + assertEquals("bar", getOutputFilePrefix("hdfs://foo/bar", "-S-of-N")); + assertEquals("", getOutputFilePrefix("foo/bar", "/part-SSSSS")); + assertEquals("", getOutputFilePrefix("/foo/bar", "/part-SSSSS")); + assertEquals("", getOutputFilePrefix("hdfs://foo/bar", "/part-SSSSS")); + } + + @Test + public void testGetOutputFileTemplate() { + assertEquals("-S-of-N", getOutputFileTemplate("foo", "-S-of-N")); + assertEquals("-S-of-N", getOutputFileTemplate("foo/bar", "-S-of-N")); + assertEquals("-S-of-N", getOutputFileTemplate("/foo/bar", "-S-of-N")); + assertEquals("-S-of-N", getOutputFileTemplate("hdfs://foo/bar", "-S-of-N")); + assertEquals("part-SSSSS", getOutputFileTemplate("foo/bar", "/part-SSSSS")); + assertEquals("part-SSSSS", getOutputFileTemplate("/foo/bar", "/part-SSSSS")); + assertEquals("part-SSSSS", getOutputFileTemplate("hdfs://foo/bar", 
"/part-SSSSS")); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/FlattenStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/FlattenStreamingTest.java new file mode 100644 index 0000000..828b26e --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/FlattenStreamingTest.java @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. + * + * Cloudera, Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"). You may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for + * the specific language governing permissions and limitations under the + * License. 
+ */ +package org.apache.beam.runners.spark.streaming; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; +import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.transforms.Flatten; +import com.google.cloud.dataflow.sdk.transforms.View; +import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; +import com.google.cloud.dataflow.sdk.transforms.windowing.Window; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.PCollectionList; + +import org.apache.beam.runners.spark.io.CreateStream; +import org.apache.beam.runners.spark.EvaluationResult; +import org.apache.beam.runners.spark.SparkPipelineRunner; +import org.apache.beam.runners.spark.streaming.utils.DataflowAssertStreaming; + +import org.joda.time.Duration; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** + * Test Flatten (union) implementation for streaming. 
+ */ +public class FlattenStreamingTest { + + private static final String[] WORDS_ARRAY_1 = { + "one", "two", "three", "four"}; + private static final List<Iterable<String>> WORDS_QUEUE_1 = + Collections.<Iterable<String>>singletonList(Arrays.asList(WORDS_ARRAY_1)); + private static final String[] WORDS_ARRAY_2 = { + "five", "six", "seven", "eight"}; + private static final List<Iterable<String>> WORDS_QUEUE_2 = + Collections.<Iterable<String>>singletonList(Arrays.asList(WORDS_ARRAY_2)); + private static final String[] EXPECTED_UNION = { + "one", "two", "three", "four", "five", "six", "seven", "eight"}; + private static final long TEST_TIMEOUT_MSEC = 1000L; + + @Test + public void testRun() throws Exception { + SparkStreamingPipelineOptions options = SparkStreamingPipelineOptionsFactory.create(); + options.setAppName(this.getClass().getSimpleName()); + options.setRunner(SparkPipelineRunner.class); + options.setTimeout(TEST_TIMEOUT_MSEC);// run for one interval + Pipeline p = Pipeline.create(options); + + PCollection<String> w1 = + p.apply(CreateStream.fromQueue(WORDS_QUEUE_1)).setCoder(StringUtf8Coder.of()); + PCollection<String> windowedW1 = + w1.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))); + PCollection<String> w2 = + p.apply(CreateStream.fromQueue(WORDS_QUEUE_2)).setCoder(StringUtf8Coder.of()); + PCollection<String> windowedW2 = + w2.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))); + PCollectionList<String> list = PCollectionList.of(windowedW1).and(windowedW2); + PCollection<String> union = list.apply(Flatten.<String>pCollections()); + + DataflowAssert.thatIterable(union.apply(View.<String>asIterable())) + .containsInAnyOrder(EXPECTED_UNION); + + EvaluationResult res = SparkPipelineRunner.create(options).run(p); + res.close(); + + DataflowAssertStreaming.assertNoFailures(res); + } + +} 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/KafkaStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/KafkaStreamingTest.java new file mode 100644 index 0000000..e9e685b --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/KafkaStreamingTest.java @@ -0,0 +1,133 @@ +/* + * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. + * + * Cloudera, Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"). You may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for + * the specific language governing permissions and limitations under the + * License. 
+ */ +package org.apache.beam.runners.spark.streaming; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.View; +import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; +import com.google.cloud.dataflow.sdk.transforms.windowing.Window; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.beam.runners.spark.io.KafkaIO; +import org.apache.beam.runners.spark.EvaluationResult; +import org.apache.beam.runners.spark.SparkPipelineRunner; +import org.apache.beam.runners.spark.streaming.utils.DataflowAssertStreaming; +import org.apache.beam.runners.spark.streaming.utils.EmbeddedKafkaCluster; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.joda.time.Duration; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import kafka.serializer.StringDecoder; + +/** + * Test Kafka as input. 
+ */ +public class KafkaStreamingTest { + private static final EmbeddedKafkaCluster.EmbeddedZookeeper EMBEDDED_ZOOKEEPER = + new EmbeddedKafkaCluster.EmbeddedZookeeper(17001); + private static final EmbeddedKafkaCluster EMBEDDED_KAFKA_CLUSTER = + new EmbeddedKafkaCluster(EMBEDDED_ZOOKEEPER.getConnection(), + new Properties(), Collections.singletonList(6667)); + private static final String TOPIC = "kafka_dataflow_test_topic"; + private static final Map<String, String> KAFKA_MESSAGES = ImmutableMap.of( + "k1", "v1", "k2", "v2", "k3", "v3", "k4", "v4" + ); + private static final Set<String> EXPECTED = ImmutableSet.of( + "k1,v1", "k2,v2", "k3,v3", "k4,v4" + ); + private static final long TEST_TIMEOUT_MSEC = 1000L; + + @BeforeClass + public static void init() throws IOException { + EMBEDDED_ZOOKEEPER.startup(); + EMBEDDED_KAFKA_CLUSTER.startup(); + + // write to Kafka + Properties producerProps = new Properties(); + producerProps.putAll(EMBEDDED_KAFKA_CLUSTER.getProps()); + producerProps.put("request.required.acks", 1); + producerProps.put("bootstrap.servers", EMBEDDED_KAFKA_CLUSTER.getBrokerList()); + Serializer<String> stringSerializer = new StringSerializer(); + try (@SuppressWarnings("unchecked") KafkaProducer<String, String> kafkaProducer = + new KafkaProducer(producerProps, stringSerializer, stringSerializer)) { + for (Map.Entry<String, String> en : KAFKA_MESSAGES.entrySet()) { + kafkaProducer.send(new ProducerRecord<>(TOPIC, en.getKey(), en.getValue())); + } + } + } + + @Test + public void testRun() throws Exception { + // test read from Kafka + SparkStreamingPipelineOptions options = SparkStreamingPipelineOptionsFactory.create(); + options.setAppName(this.getClass().getSimpleName()); + options.setRunner(SparkPipelineRunner.class); + options.setTimeout(TEST_TIMEOUT_MSEC);// run for one interval + Pipeline p = Pipeline.create(options); + + Map<String, String> kafkaParams = ImmutableMap.of( + "metadata.broker.list", EMBEDDED_KAFKA_CLUSTER.getBrokerList(), + 
"auto.offset.reset", "smallest" + ); + + PCollection<KV<String, String>> kafkaInput = p.apply(KafkaIO.Read.from(StringDecoder.class, + StringDecoder.class, String.class, String.class, Collections.singleton(TOPIC), + kafkaParams)); + PCollection<KV<String, String>> windowedWords = kafkaInput + .apply(Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(1)))); + + PCollection<String> formattedKV = windowedWords.apply(ParDo.of(new FormatKVFn())); + + DataflowAssert.thatIterable(formattedKV.apply(View.<String>asIterable())) + .containsInAnyOrder(EXPECTED); + + EvaluationResult res = SparkPipelineRunner.create(options).run(p); + res.close(); + + DataflowAssertStreaming.assertNoFailures(res); + } + + @AfterClass + public static void tearDown() { + EMBEDDED_KAFKA_CLUSTER.shutdown(); + EMBEDDED_ZOOKEEPER.shutdown(); + } + + private static class FormatKVFn extends DoFn<KV<String, String>, String> { + @Override + public void processElement(ProcessContext c) { + c.output(c.element().getKey() + "," + c.element().getValue()); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/SimpleStreamingWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/SimpleStreamingWordCountTest.java new file mode 100644 index 0000000..9a0609d --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/SimpleStreamingWordCountTest.java @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. + * + * Cloudera, Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"). You may not use this file except in + * compliance with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for + * the specific language governing permissions and limitations under the + * License. + */ +package org.apache.beam.runners.spark.streaming; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; +import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.transforms.View; +import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; +import com.google.cloud.dataflow.sdk.transforms.windowing.Window; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.common.collect.ImmutableSet; + +import org.apache.beam.runners.spark.io.CreateStream; +import org.apache.beam.runners.spark.EvaluationResult; +import org.apache.beam.runners.spark.SimpleWordCountTest; +import org.apache.beam.runners.spark.SparkPipelineRunner; +import org.apache.beam.runners.spark.streaming.utils.DataflowAssertStreaming; + +import org.joda.time.Duration; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +public class SimpleStreamingWordCountTest { + + private static final String[] WORDS_ARRAY = { + "hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"}; + private static final List<Iterable<String>> WORDS_QUEUE = + Collections.<Iterable<String>>singletonList(Arrays.asList(WORDS_ARRAY)); + private static final Set<String> EXPECTED_COUNT_SET = + ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2"); + private static final long TEST_TIMEOUT_MSEC = 1000L; + + @Test + public void testRun() throws Exception { + SparkStreamingPipelineOptions options = SparkStreamingPipelineOptionsFactory.create(); + options.setAppName(this.getClass().getSimpleName()); + 
options.setRunner(SparkPipelineRunner.class); + options.setTimeout(TEST_TIMEOUT_MSEC);// run for one interval + Pipeline p = Pipeline.create(options); + + PCollection<String> inputWords = + p.apply(CreateStream.fromQueue(WORDS_QUEUE)).setCoder(StringUtf8Coder.of()); + PCollection<String> windowedWords = inputWords + .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))); + + PCollection<String> output = windowedWords.apply(new SimpleWordCountTest.CountWords()); + + DataflowAssert.thatIterable(output.apply(View.<String>asIterable())) + .containsInAnyOrder(EXPECTED_COUNT_SET); + + EvaluationResult res = SparkPipelineRunner.create(options).run(p); + res.close(); + + DataflowAssertStreaming.assertNoFailures(res); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/utils/DataflowAssertStreaming.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/utils/DataflowAssertStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/utils/DataflowAssertStreaming.java new file mode 100644 index 0000000..19759d7 --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/utils/DataflowAssertStreaming.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. + * + * Cloudera, Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"). You may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for + * the specific language governing permissions and limitations under the + * License. 
+ */ +package org.apache.beam.runners.spark.streaming.utils; + +import org.apache.beam.runners.spark.EvaluationResult; + +import org.junit.Assert; + +/** + * Since DataflowAssert doesn't propagate assert exceptions, use Aggregators to assert streaming + * success/failure counters. + */ +public final class DataflowAssertStreaming { + /** + * Copied aggregator names from {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} + */ + static final String SUCCESS_COUNTER = "DataflowAssertSuccess"; + static final String FAILURE_COUNTER = "DataflowAssertFailure"; + + private DataflowAssertStreaming() { + } + + public static void assertNoFailures(EvaluationResult res) { + int failures = res.getAggregatorValue(FAILURE_COUNTER, Integer.class); + Assert.assertEquals("Found " + failures + " failures, see the log for details", 0, failures); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/utils/EmbeddedKafkaCluster.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/utils/EmbeddedKafkaCluster.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/utils/EmbeddedKafkaCluster.java new file mode 100644 index 0000000..333453a --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/utils/EmbeddedKafkaCluster.java @@ -0,0 +1,314 @@ +/* + * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved. + * + * Cloudera, Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"). You may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. 
See the License for + * the specific language governing permissions and limitations under the + * License. + */ +package org.apache.beam.runners.spark.streaming.utils; + +import org.apache.zookeeper.server.NIOServerCnxnFactory; +import org.apache.zookeeper.server.ServerCnxnFactory; +import org.apache.zookeeper.server.ZooKeeperServer; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.Random; + +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import kafka.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * https://gist.github.com/fjavieralba/7930018 + */ +public class EmbeddedKafkaCluster { + + private static final Logger LOG = LoggerFactory.getLogger(EmbeddedKafkaCluster.class); + + private final List<Integer> ports; + private final String zkConnection; + private final Properties baseProperties; + + private final String brokerList; + + private final List<KafkaServer> brokers; + private final List<File> logDirs; + + public EmbeddedKafkaCluster(String zkConnection) { + this(zkConnection, new Properties()); + } + + public EmbeddedKafkaCluster(String zkConnection, Properties baseProperties) { + this(zkConnection, baseProperties, Collections.singletonList(-1)); + } + + public EmbeddedKafkaCluster(String zkConnection, Properties baseProperties, List<Integer> ports) { + this.zkConnection = zkConnection; + this.ports = resolvePorts(ports); + this.baseProperties = baseProperties; + + this.brokers = new ArrayList<>(); + this.logDirs = new ArrayList<>(); + + this.brokerList = constructBrokerList(this.ports); + } + + private static List<Integer> resolvePorts(List<Integer> ports) { + List<Integer> resolvedPorts = new ArrayList<>(); + for (Integer port : ports) { + 
resolvedPorts.add(resolvePort(port)); + } + return resolvedPorts; + } + + private static int resolvePort(int port) { + if (port == -1) { + return TestUtils.getAvailablePort(); + } + return port; + } + + private static String constructBrokerList(List<Integer> ports) { + StringBuilder sb = new StringBuilder(); + for (Integer port : ports) { + if (sb.length() > 0) { + sb.append(","); + } + sb.append("localhost:").append(port); + } + return sb.toString(); + } + + public void startup() { + for (int i = 0; i < ports.size(); i++) { + Integer port = ports.get(i); + File logDir = TestUtils.constructTempDir("kafka-local"); + + Properties properties = new Properties(); + properties.putAll(baseProperties); + properties.setProperty("zookeeper.connect", zkConnection); + properties.setProperty("broker.id", String.valueOf(i + 1)); + properties.setProperty("host.name", "localhost"); + properties.setProperty("port", Integer.toString(port)); + properties.setProperty("log.dir", logDir.getAbsolutePath()); + properties.setProperty("log.flush.interval.messages", String.valueOf(1)); + + KafkaServer broker = startBroker(properties); + + brokers.add(broker); + logDirs.add(logDir); + } + } + + + private static KafkaServer startBroker(Properties props) { + KafkaServer server = new KafkaServer(new KafkaConfig(props), new SystemTime()); + server.startup(); + return server; + } + + public Properties getProps() { + Properties props = new Properties(); + props.putAll(baseProperties); + props.put("metadata.broker.list", brokerList); + props.put("zookeeper.connect", zkConnection); + return props; + } + + public String getBrokerList() { + return brokerList; + } + + public List<Integer> getPorts() { + return ports; + } + + public String getZkConnection() { + return zkConnection; + } + + public void shutdown() { + for (KafkaServer broker : brokers) { + try { + broker.shutdown(); + } catch (Exception e) { + LOG.warn("{}", e.getMessage(), e); + } + } + for (File logDir : logDirs) { + try { + 
TestUtils.deleteFile(logDir); + } catch (FileNotFoundException e) { + LOG.warn("{}", e.getMessage(), e); + } + } + } + + @Override + public String toString() { + return "EmbeddedKafkaCluster{" + "brokerList='" + brokerList + "'}"; + } + + public static class EmbeddedZookeeper { + private int port = -1; + private int tickTime = 500; + + private ServerCnxnFactory factory; + private File snapshotDir; + private File logDir; + + public EmbeddedZookeeper() { + this(-1); + } + + public EmbeddedZookeeper(int port) { + this(port, 500); + } + + public EmbeddedZookeeper(int port, int tickTime) { + this.port = resolvePort(port); + this.tickTime = tickTime; + } + + private static int resolvePort(int port) { + if (port == -1) { + return TestUtils.getAvailablePort(); + } + return port; + } + + public void startup() throws IOException { + if (this.port == -1) { + this.port = TestUtils.getAvailablePort(); + } + this.factory = NIOServerCnxnFactory.createFactory(new InetSocketAddress("localhost", port), + 1024); + this.snapshotDir = TestUtils.constructTempDir("embedded-zk/snapshot"); + this.logDir = TestUtils.constructTempDir("embedded-zk/log"); + + try { + factory.startup(new ZooKeeperServer(snapshotDir, logDir, tickTime)); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + + public void shutdown() { + factory.shutdown(); + try { + TestUtils.deleteFile(snapshotDir); + } catch (FileNotFoundException e) { + // ignore + } + try { + TestUtils.deleteFile(logDir); + } catch (FileNotFoundException e) { + // ignore + } + } + + public String getConnection() { + return "localhost:" + port; + } + + public void setPort(int port) { + this.port = port; + } + + public void setTickTime(int tickTime) { + this.tickTime = tickTime; + } + + public int getPort() { + return port; + } + + public int getTickTime() { + return tickTime; + } + + @Override + public String toString() { + return "EmbeddedZookeeper{" + "connection=" + getConnection() + "}"; + } + } + + static class 
SystemTime implements Time { + @Override + public long milliseconds() { + return System.currentTimeMillis(); + } + + @Override + public long nanoseconds() { + return System.nanoTime(); + } + + @Override + public void sleep(long ms) { + try { + Thread.sleep(ms); + } catch (InterruptedException e) { + // Ignore + } + } + } + + static final class TestUtils { + private static final Random RANDOM = new Random(); + + private TestUtils() { + } + + static File constructTempDir(String dirPrefix) { + File file = new File(System.getProperty("java.io.tmpdir"), dirPrefix + RANDOM.nextInt + (10000000)); + if (!file.mkdirs()) { + throw new RuntimeException("could not create temp directory: " + file.getAbsolutePath()); + } + file.deleteOnExit(); + return file; + } + + static int getAvailablePort() { + try { + try (ServerSocket socket = new ServerSocket(0)) { + return socket.getLocalPort(); + } + } catch (IOException e) { + throw new IllegalStateException("Cannot find available port: " + e.getMessage(), e); + } + } + + static boolean deleteFile(File path) throws FileNotFoundException { + if (!path.exists()) { + throw new FileNotFoundException(path.getAbsolutePath()); + } + boolean ret = true; + if (path.isDirectory()) { + for (File f : path.listFiles()) { + ret = ret && deleteFile(f); + } + } + return ret && path.delete(); + } + } +}
