Repository: beam
Updated Branches:
  refs/heads/master 0b8932fd3 -> 5f72b83c0


KafkaIO : Add withTopic() api that takes single topic.

Remove need for setting key coder for Writer while writing
values only. If we didn't specify the key coder, validation
succeeded but a check failed while instantiating the Kafka producer.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/37b0d45c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/37b0d45c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/37b0d45c

Branch: refs/heads/master
Commit: 37b0d45c76b5fb03cdf5749dee52483fa3811d5b
Parents: 0b8932f
Author: Raghu Angadi <[email protected]>
Authored: Wed Mar 29 08:17:25 2017 -0700
Committer: Eugene Kirpichov <[email protected]>
Committed: Fri Mar 31 16:51:20 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 11 ++++++++-
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 24 ++++++++++++++++++++
 2 files changed, 34 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/37b0d45c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index bb7d971..80b40be 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -124,7 +124,7 @@ import org.slf4j.LoggerFactory;
  *  pipeline
  *    .apply(KafkaIO.<Long, String>read()
  *       .withBootstrapServers("broker_1:9092,broker_2:9092")
- *       .withTopics(ImmutableList.of("topic_a", "topic_b"))
+ *       .withTopic("my_topic")  // use withTopics(List<String>) to read from multiple topics.
  *       // set a Coder for Key and Value
  *       .withKeyCoder(BigEndianLongCoder.of())
  *       .withValueCoder(StringUtf8Coder.of())
@@ -308,6 +308,15 @@ public class KafkaIO {
     }
 
     /**
+     * Returns a new {@link Read} that reads from the topic.
+     * See {@link UnboundedKafkaSource#generateInitialSplits(int, PipelineOptions)} for description
+     * of how the partitions are distributed among the splits.
+     */
+    public Read<K, V> withTopic(String topic) {
+      return withTopics(ImmutableList.of(topic));
+    }
+
+    /**
      * Returns a new {@link Read} that reads from the topics. All the partitions from each
      * of the topics are read.
      * See {@link UnboundedKafkaSource#generateInitialSplits(int, PipelineOptions)} for description

http://git-wip-us.apache.org/repos/asf/beam/blob/37b0d45c/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index d1696d0..7e77512 100644
--- 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -295,6 +295,30 @@ public class KafkaIOTest {
   }
 
   @Test
+  public void testUnboundedSourceWithSingleTopic() {
+    // same as testUnboundedSource, but with single topic
+
+    int numElements = 1000;
+    String topic = "my_topic";
+
+    KafkaIO.Read<Integer, Long> reader = KafkaIO.<Integer, Long>read()
+        .withBootstrapServers("none")
+        .withTopic("my_topic")
+        .withConsumerFactoryFn(new ConsumerFactoryFn(
+            ImmutableList.of(topic), 10, numElements, 
OffsetResetStrategy.EARLIEST))
+        .withKeyCoder(BigEndianIntegerCoder.of())
+        .withValueCoder(BigEndianLongCoder.of())
+        .withMaxNumRecords(numElements);
+
+    PCollection<Long> input = p
+        .apply(reader.withoutMetadata())
+        .apply(Values.<Long>create());
+
+    addCountingAsserts(input, numElements);
+    p.run();
+  }
+
+  @Test
   @Category(NeedsRunner.class)
   public void testUnboundedSourceWithExplicitPartitions() {
     int numElements = 1000;

Reply via email to