[BEAM-11] set coder for pipeline input
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c5c7df60 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c5c7df60 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c5c7df60 Branch: refs/heads/master Commit: c5c7df603289b1d207308da16546dd56e9b9b6d9 Parents: 41c4ca6 Author: Sela <[email protected]> Authored: Sat Mar 12 17:26:34 2016 +0200 Committer: Sela <[email protected]> Committed: Tue Mar 15 20:38:26 2016 +0200 ---------------------------------------------------------------------- .../spark/io/hadoop/HadoopFileFormatPipelineTest.java | 5 ++++- .../beam/runners/spark/streaming/KafkaStreamingTest.java | 11 +++++++---- 2 files changed, 11 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c5c7df60/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java index 7a9be8b..abe1119 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java @@ -16,11 +16,13 @@ package org.apache.beam.runners.spark.io.hadoop; import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.coders.KvCoder; import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; import com.google.cloud.dataflow.sdk.values.KV; import com.google.cloud.dataflow.sdk.values.PCollection; import org.apache.beam.runners.spark.EvaluationResult; import org.apache.beam.runners.spark.SparkPipelineRunner; +import org.apache.beam.runners.spark.coders.WritableCoder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; @@ -66,7 +68,8 @@ public class HadoopFileFormatPipelineTest { (Class<? extends FileInputFormat<IntWritable, Text>>) (Class<?>) SequenceFileInputFormat.class; HadoopIO.Read.Bound<IntWritable,Text> read = HadoopIO.Read.from(inputFile.getAbsolutePath(), inputFormatClass, IntWritable.class, Text.class); - PCollection<KV<IntWritable, Text>> input = p.apply(read); + PCollection<KV<IntWritable, Text>> input = p.apply(read) + .setCoder(KvCoder.of(WritableCoder.of(IntWritable.class), WritableCoder.of(Text.class))); @SuppressWarnings("unchecked") Class<? extends FileOutputFormat<IntWritable, Text>> outputFormatClass = (Class<? extends FileOutputFormat<IntWritable, Text>>) (Class<?>) TemplatedSequenceFileOutputFormat.class; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c5c7df60/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/KafkaStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/KafkaStreamingTest.java index e9e685b..ff1e11c 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/KafkaStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/KafkaStreamingTest.java @@ -15,6 +15,8 @@ package org.apache.beam.runners.spark.streaming; import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.coders.KvCoder; +import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; import com.google.cloud.dataflow.sdk.testing.DataflowAssert; import com.google.cloud.dataflow.sdk.transforms.DoFn; import com.google.cloud.dataflow.sdk.transforms.ParDo; @@ -79,7 +81,7 @@ public class KafkaStreamingTest { producerProps.put("bootstrap.servers", EMBEDDED_KAFKA_CLUSTER.getBrokerList()); Serializer<String> stringSerializer = new StringSerializer(); try (@SuppressWarnings("unchecked") KafkaProducer<String, String> kafkaProducer = - new KafkaProducer(producerProps, stringSerializer, stringSerializer)) { + new KafkaProducer(producerProps, stringSerializer, stringSerializer)) { for (Map.Entry<String, String> en : KAFKA_MESSAGES.entrySet()) { kafkaProducer.send(new ProducerRecord<>(TOPIC, en.getKey(), en.getValue())); } @@ -96,13 +98,14 @@ public class KafkaStreamingTest { Pipeline p = Pipeline.create(options); Map<String, String> kafkaParams = ImmutableMap.of( - "metadata.broker.list", EMBEDDED_KAFKA_CLUSTER.getBrokerList(), - "auto.offset.reset", "smallest" + "metadata.broker.list", EMBEDDED_KAFKA_CLUSTER.getBrokerList(), + "auto.offset.reset", "smallest" ); PCollection<KV<String, String>> kafkaInput = p.apply(KafkaIO.Read.from(StringDecoder.class, StringDecoder.class, String.class, String.class, Collections.singleton(TOPIC), - kafkaParams)); + kafkaParams)) + .setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); PCollection<KV<String, String>> windowedWords = kafkaInput .apply(Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(1))));
