Repository: beam Updated Branches: refs/heads/master 0b8932fd3 -> 5f72b83c0
KafkaIO: Add a withTopic() API that takes a single topic. Remove the need to set a key coder for the Writer when writing values only. If the key coder was not specified, validation succeeded, but a check later failed while instantiating the Kafka producer. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/37b0d45c Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/37b0d45c Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/37b0d45c Branch: refs/heads/master Commit: 37b0d45c76b5fb03cdf5749dee52483fa3811d5b Parents: 0b8932f Author: Raghu Angadi <[email protected]> Authored: Wed Mar 29 08:17:25 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Fri Mar 31 16:51:20 2017 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 11 ++++++++- .../apache/beam/sdk/io/kafka/KafkaIOTest.java | 24 ++++++++++++++++++++ 2 files changed, 34 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/37b0d45c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index bb7d971..80b40be 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -124,7 +124,7 @@ import org.slf4j.LoggerFactory; * pipeline * .apply(KafkaIO.<Long, String>read() * .withBootstrapServers("broker_1:9092,broker_2:9092") - * .withTopics(ImmutableList.of("topic_a", "topic_b")) + * .withTopic("my_topic") // use withTopics(List<String>) to read from multiple topics. 
* // set a Coder for Key and Value * .withKeyCoder(BigEndianLongCoder.of()) * .withValueCoder(StringUtf8Coder.of()) @@ -308,6 +308,15 @@ public class KafkaIO { } /** + * Returns a new {@link Read} that reads from the topic. + * See {@link UnboundedKafkaSource#generateInitialSplits(int, PipelineOptions)} for description + * of how the partitions are distributed among the splits. + */ + public Read<K, V> withTopic(String topic) { + return withTopics(ImmutableList.of(topic)); + } + + /** * Returns a new {@link Read} that reads from the topics. All the partitions from each * of the topics are read. * See {@link UnboundedKafkaSource#generateInitialSplits(int, PipelineOptions)} for description http://git-wip-us.apache.org/repos/asf/beam/blob/37b0d45c/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index d1696d0..7e77512 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -295,6 +295,30 @@ public class KafkaIOTest { } @Test + public void testUnboundedSourceWithSingleTopic() { + // same as testUnboundedSource, but with single topic + + int numElements = 1000; + String topic = "my_topic"; + + KafkaIO.Read<Integer, Long> reader = KafkaIO.<Integer, Long>read() + .withBootstrapServers("none") + .withTopic("my_topic") + .withConsumerFactoryFn(new ConsumerFactoryFn( + ImmutableList.of(topic), 10, numElements, OffsetResetStrategy.EARLIEST)) + .withKeyCoder(BigEndianIntegerCoder.of()) + .withValueCoder(BigEndianLongCoder.of()) + .withMaxNumRecords(numElements); + + PCollection<Long> input = p + .apply(reader.withoutMetadata()) + .apply(Values.<Long>create()); + + 
addCountingAsserts(input, numElements); + p.run(); + } + + @Test @Category(NeedsRunner.class) public void testUnboundedSourceWithExplicitPartitions() { int numElements = 1000;
