Repository: beam
Updated Branches:
  refs/heads/master 420a71860 -> 0e6b3794c


Converts KafkaIO to AutoValue


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e0c704f1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e0c704f1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e0c704f1

Branch: refs/heads/master
Commit: e0c704f1c3ccfe760dc9bbb156fe4a56843f2b3a
Parents: 420a718
Author: Eugene Kirpichov <[email protected]>
Authored: Thu Sep 29 15:30:10 2016 -0700
Committer: Dan Halperin <[email protected]>
Committed: Wed Feb 1 14:26:46 2017 -0800

----------------------------------------------------------------------
 .../streaming/KafkaStreamingTest.java           |   4 +-
 .../ResumeFromCheckpointStreamingTest.java      |   2 +-
 sdks/java/io/kafka/pom.xml                      |   6 +
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 619 ++++++++-----------
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   |  17 +-
 5 files changed, 287 insertions(+), 361 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e0c704f1/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
index 0853e9f..404cb5d 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
@@ -116,7 +116,7 @@ public class KafkaStreamingTest {
         "auto.offset.reset", "earliest"
     );
 
-    KafkaIO.Read<String, String> read = KafkaIO.read()
+    KafkaIO.Read<String, String> read = KafkaIO.<String, String>read()
         .withBootstrapServers(EMBEDDED_KAFKA_CLUSTER.getBrokerList())
         .withTopics(Arrays.asList(topic1, topic2))
         .withKeyCoder(StringUtf8Coder.of())
@@ -168,7 +168,7 @@ public class KafkaStreamingTest {
         "auto.offset.reset", "latest"
     );
 
-    KafkaIO.Read<String, String> read = KafkaIO.read()
+    KafkaIO.Read<String, String> read = KafkaIO.<String, String>read()
         .withBootstrapServers(EMBEDDED_KAFKA_CLUSTER.getBrokerList())
         .withTopics(Collections.singletonList(topic))
         .withKeyCoder(StringUtf8Coder.of())

http://git-wip-us.apache.org/repos/asf/beam/blob/e0c704f1/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index 8280672..721d617 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -160,7 +160,7 @@ public class ResumeFromCheckpointStreamingTest {
         "auto.offset.reset", "earliest"
     );
 
-    KafkaIO.Read<String, String> read = KafkaIO.read()
+    KafkaIO.Read<String, String> read = KafkaIO.<String, String>read()
         .withBootstrapServers(EMBEDDED_KAFKA_CLUSTER.getBrokerList())
         .withTopics(Collections.singletonList(TOPIC))
         .withKeyCoder(StringUtf8Coder.of())

http://git-wip-us.apache.org/repos/asf/beam/blob/e0c704f1/sdks/java/io/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/pom.xml b/sdks/java/io/kafka/pom.xml
index 2dd775e..02150b2 100644
--- a/sdks/java/io/kafka/pom.xml
+++ b/sdks/java/io/kafka/pom.xml
@@ -101,6 +101,12 @@
       <artifactId>jsr305</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
     <!-- test dependencies-->
     <dependency>
       <groupId>org.apache.beam</groupId>

http://git-wip-us.apache.org/repos/asf/beam/blob/e0c704f1/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 36ab1fd..80a0eb7 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -21,10 +21,10 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
+import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
 import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -32,6 +32,8 @@ import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.io.Closeables;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -49,13 +51,12 @@ import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nullable;
-
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.Read.Unbounded;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
@@ -116,17 +117,16 @@ import org.slf4j.LoggerFactory;
  * <pre>{@code
  *
  *  pipeline
- *    .apply(KafkaIO.read()
+ *    .apply(KafkaIO.<Long, String>read()
  *       .withBootstrapServers("broker_1:9092,broker_2:9092")
  *       .withTopics(ImmutableList.of("topic_a", "topic_b"))
- *       // above two are required configuration. returns 
PCollection<KafkaRecord<byte[], byte[]>
+ *       // set a Coder for Key and Value
+ *       .withKeyCoder(BigEndianLongCoder.of())
+ *       .withValueCoder(StringUtf8Coder.of())
+ *       // above four are required configuration. returns 
PCollection<KafkaRecord<Long, String>>
  *
  *       // rest of the settings are optional :
  *
- *       // set a Coder for Key and Value (note the change to return type)
- *       .withKeyCoder(BigEndianLongCoder.of()) // 
PCollection<KafkaRecord<Long, byte[]>
- *       .withValueCoder(StringUtf8Coder.of())  // 
PCollection<KafkaRecord<Long, String>
- *
  *       // you can further customize KafkaConsumer used to read the records 
by adding more
  *       // settings for ConsumerConfig. e.g :
  *       .updateConsumerProperties(ImmutableMap.of("receive.buffer.bytes", 
1024 * 1024))
@@ -166,14 +166,14 @@ import org.slf4j.LoggerFactory;
  * <pre>{@code
  *
  *  PCollection<KV<Long, String>> kvColl = ...;
- *  kvColl.apply(KafkaIO.write()
+ *  kvColl.apply(KafkaIO.<Long, String>write()
  *       .withBootstrapServers("broker_1:9092,broker_2:9092")
  *       .withTopic("results")
  *
  *       // set Coder for Key and Value
  *       .withKeyCoder(BigEndianLongCoder.of())
  *       .withValueCoder(StringUtf8Coder.of())
-
+ *
  *       // you can further customize KafkaProducer used to write the records 
by adding more
  *       // settings for ProducerConfig. e.g, to enable compression :
  *       .updateProducerProperties(ImmutableMap.of("compression.type", "gzip"))
@@ -185,11 +185,11 @@ import org.slf4j.LoggerFactory;
  *
  * <pre>{@code
  *  PCollection<String> strings = ...;
- *  strings.apply(KafkaIO.write()
+ *  strings.apply(KafkaIO.<Void, String>write()
  *      .withBootstrapServers("broker_1:9092,broker_2:9092")
  *      .withTopic("results")
  *      .withValueCoder(StringUtf8Coder.of()) // just need coder for value
- *      .values() // writes values to Kafka with default key
+ *      .values()
  *    );
  * }</pre>
  *
@@ -200,6 +200,23 @@ import org.slf4j.LoggerFactory;
  * <tt>"group.id"</tt>, <tt>"enable.auto.commit"</tt>, etc.
  */
 public class KafkaIO {
+  /**
+   * Creates an uninitialized {@link Read} {@link PTransform}. Before use, 
basic Kafka
+   * configuration should set with {@link Read#withBootstrapServers(String)} 
and
+   * {@link Read#withTopics(List)}. Other optional settings include key and 
value coders,
+   * custom timestamp and watermark functions.
+   */
+  public static Read<byte[], byte[]> readBytes() {
+    return new AutoValue_KafkaIO_Read.Builder<byte[], byte[]>()
+        .setTopics(new ArrayList<String>())
+        .setTopicPartitions(new ArrayList<TopicPartition>())
+        .setKeyCoder(ByteArrayCoder.of())
+        .setValueCoder(ByteArrayCoder.of())
+        .setConsumerFactoryFn(Read.KAFKA_9_CONSUMER_FACTORY_FN)
+        .setConsumerConfig(Read.DEFAULT_CONSUMER_PROPERTIES)
+        .setMaxNumRecords(Long.MAX_VALUE)
+        .build();
+  }
 
   /**
    * Creates an uninitialized {@link Read} {@link PTransform}. Before use, 
basic Kafka
@@ -207,16 +224,14 @@ public class KafkaIO {
    * {@link Read#withTopics(List)}. Other optional settings include key and 
value coders,
    * custom timestamp and watermark functions.
    */
-  public static Read<byte[], byte[]> read() {
-    return new Read<byte[], byte[]>(
-        new ArrayList<String>(),
-        new ArrayList<TopicPartition>(),
-        ByteArrayCoder.of(),
-        ByteArrayCoder.of(),
-        Read.KAFKA_9_CONSUMER_FACTORY_FN,
-        Read.DEFAULT_CONSUMER_PROPERTIES,
-        Long.MAX_VALUE,
-        null);
+  public static <K, V> Read<K, V> read() {
+    return new AutoValue_KafkaIO_Read.Builder<K, V>()
+        .setTopics(new ArrayList<String>())
+        .setTopicPartitions(new ArrayList<TopicPartition>())
+        .setConsumerFactoryFn(Read.KAFKA_9_CONSUMER_FACTORY_FN)
+        .setConsumerConfig(Read.DEFAULT_CONSUMER_PROPERTIES)
+        .setMaxNumRecords(Long.MAX_VALUE)
+        .build();
   }
 
   /**
@@ -224,21 +239,53 @@ public class KafkaIO {
    * should be set with {@link Write#withBootstrapServers(String)} and {@link 
Write#withTopic}
    * along with {@link Coder}s for (optional) key and values.
    */
-  public static Write<byte[], byte[]> write() {
-    return new Write<byte[], byte[]>(
-        null,
-        ByteArrayCoder.of(),
-        ByteArrayCoder.of(),
-        TypedWrite.DEFAULT_PRODUCER_PROPERTIES);
+  public static <K, V> Write<K, V> write() {
+    return new AutoValue_KafkaIO_Write.Builder<K, V>()
+        .setProducerConfig(Write.DEFAULT_PRODUCER_PROPERTIES)
+        .setValueOnly(false)
+        .build();
   }
 
   ///////////////////////// Read Support \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
 
   /**
-   * A {@link PTransform} to read from Kafka topics. See {@link KafkaIO} for 
more
-   * information on usage and configuration.
+   * A {@link PTransform} to read from Kafka topics. See {@link KafkaIO} for 
more information on
+   * usage and configuration.
    */
-  public static class Read<K, V> extends TypedRead<K, V> {
+  @AutoValue
+  public abstract static class Read<K, V>
+      extends PTransform<PBegin, PCollection<KafkaRecord<K, V>>> {
+    abstract Map<String, Object> getConsumerConfig();
+    abstract List<String> getTopics();
+    abstract List<TopicPartition> getTopicPartitions();
+    @Nullable abstract Coder<K> getKeyCoder();
+    @Nullable abstract Coder<V> getValueCoder();
+    abstract SerializableFunction<Map<String, Object>, Consumer<byte[], 
byte[]>>
+        getConsumerFactoryFn();
+    @Nullable abstract SerializableFunction<KafkaRecord<K, V>, Instant> 
getTimestampFn();
+    @Nullable abstract SerializableFunction<KafkaRecord<K, V>, Instant> 
getWatermarkFn();
+
+    abstract long getMaxNumRecords();
+    @Nullable abstract Duration getMaxReadTime();
+
+    abstract Builder<K, V> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<K, V> {
+      abstract Builder<K, V> setConsumerConfig(Map<String, Object> config);
+      abstract Builder<K, V> setTopics(List<String> topics);
+      abstract Builder<K, V> setTopicPartitions(List<TopicPartition> 
topicPartitions);
+      abstract Builder<K, V> setKeyCoder(Coder<K> keyCoder);
+      abstract Builder<K, V> setValueCoder(Coder<V> valueCoder);
+      abstract Builder<K, V> setConsumerFactoryFn(
+          SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> 
consumerFactoryFn);
+      abstract Builder<K, V> 
setTimestampFn(SerializableFunction<KafkaRecord<K, V>, Instant> fn);
+      abstract Builder<K, V> 
setWatermarkFn(SerializableFunction<KafkaRecord<K, V>, Instant> fn);
+      abstract Builder<K, V> setMaxNumRecords(long maxNumRecords);
+      abstract Builder<K, V> setMaxReadTime(Duration maxReadTime);
+
+      abstract Read<K, V> build();
+    }
 
     /**
      * Returns a new {@link Read} with Kafka consumer pointing to {@code 
bootstrapServers}.
@@ -256,10 +303,9 @@ public class KafkaIO {
      * of how the partitions are distributed among the splits.
      */
     public Read<K, V> withTopics(List<String> topics) {
-      checkState(topicPartitions.isEmpty(), "Only topics or topicPartitions 
can be set, not both");
-
-      return new Read<K, V>(ImmutableList.copyOf(topics), topicPartitions, 
keyCoder, valueCoder,
-          consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime);
+      checkState(
+          getTopicPartitions().isEmpty(), "Only topics or topicPartitions can 
be set, not both");
+      return toBuilder().setTopics(ImmutableList.copyOf(topics)).build();
     }
 
     /**
@@ -269,26 +315,22 @@ public class KafkaIO {
      * of how the partitions are distributed among the splits.
      */
     public Read<K, V> withTopicPartitions(List<TopicPartition> 
topicPartitions) {
-      checkState(topics.isEmpty(), "Only topics or topicPartitions can be set, 
not both");
-
-      return new Read<K, V>(topics, ImmutableList.copyOf(topicPartitions), 
keyCoder, valueCoder,
-          consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime);
+      checkState(getTopics().isEmpty(), "Only topics or topicPartitions can be 
set, not both");
+      return 
toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
     }
 
     /**
      * Returns a new {@link Read} with {@link Coder} for key bytes.
      */
-    public <KeyT> Read<KeyT, V> withKeyCoder(Coder<KeyT> keyCoder) {
-      return new Read<KeyT, V>(topics, topicPartitions, keyCoder, valueCoder,
-          consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime);
+    public Read<K, V> withKeyCoder(Coder<K> keyCoder) {
+      return toBuilder().setKeyCoder(keyCoder).build();
     }
 
     /**
      * Returns a new {@link Read} with {@link Coder} for value bytes.
      */
-    public <ValueT> Read<K, ValueT> withValueCoder(Coder<ValueT> valueCoder) {
-      return new Read<K, ValueT>(topics, topicPartitions, keyCoder, valueCoder,
-          consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime);
+    public Read<K, V> withValueCoder(Coder<V> valueCoder) {
+      return toBuilder().setValueCoder(valueCoder).build();
     }
 
     /**
@@ -298,20 +340,16 @@ public class KafkaIO {
      */
     public Read<K, V> withConsumerFactoryFn(
         SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> 
consumerFactoryFn) {
-      return new Read<K, V>(topics, topicPartitions, keyCoder, valueCoder,
-          consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime);
+      return toBuilder().setConsumerFactoryFn(consumerFactoryFn).build();
     }
 
     /**
      * Update consumer configuration with new properties.
      */
     public Read<K, V> updateConsumerProperties(Map<String, Object> 
configUpdates) {
-
-      Map<String, Object> config = updateKafkaProperties(consumerConfig,
+      Map<String, Object> config = updateKafkaProperties(getConsumerConfig(),
           IGNORED_CONSUMER_PROPERTIES, configUpdates);
-
-      return new Read<K, V>(topics, topicPartitions, keyCoder, valueCoder,
-          consumerFactoryFn, config, maxNumRecords, maxReadTime);
+      return toBuilder().setConsumerConfig(config).build();
     }
 
     /**
@@ -319,8 +357,7 @@ public class KafkaIO {
      * Mainly used for tests and demo applications.
      */
     public Read<K, V> withMaxNumRecords(long maxNumRecords) {
-      return new Read<K, V>(topics, topicPartitions, keyCoder, valueCoder,
-          consumerFactoryFn, consumerConfig, maxNumRecords, null);
+      return 
toBuilder().setMaxNumRecords(maxNumRecords).setMaxReadTime(null).build();
     }
 
     /**
@@ -330,100 +367,32 @@ public class KafkaIO {
      * applications.
      */
     public Read<K, V> withMaxReadTime(Duration maxReadTime) {
-      return new Read<K, V>(topics, topicPartitions, keyCoder, valueCoder,
-          consumerFactoryFn, consumerConfig, Long.MAX_VALUE, maxReadTime);
-    }
-
-    
///////////////////////////////////////////////////////////////////////////////////////
-
-    private Read(
-        List<String> topics,
-        List<TopicPartition> topicPartitions,
-        Coder<K> keyCoder,
-        Coder<V> valueCoder,
-        SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> 
consumerFactoryFn,
-        Map<String, Object> consumerConfig,
-        long maxNumRecords,
-        @Nullable Duration maxReadTime) {
-
-      super(topics, topicPartitions, keyCoder, valueCoder, null, null,
-          consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime);
+      return 
toBuilder().setMaxNumRecords(Long.MAX_VALUE).setMaxReadTime(maxReadTime).build();
     }
 
     /**
-     * A set of properties that are not required or don't make sense for our 
consumer.
-     */
-    private static final Map<String, String> IGNORED_CONSUMER_PROPERTIES = 
ImmutableMap.of(
-        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Set keyCoder instead",
-        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "Set valueCoder 
instead"
-        // "group.id", "enable.auto.commit", "auto.commit.interval.ms" :
-        //     lets allow these, applications can have better resume point for 
restarts.
-        );
-
-    // set config defaults
-    private static final Map<String, Object> DEFAULT_CONSUMER_PROPERTIES =
-        ImmutableMap.<String, Object>of(
-            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName(),
-            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName(),
-
-            // Use large receive buffer. Once KAFKA-3135 is fixed, this _may_ 
not be required.
-            // with default value of 32K, it takes multiple seconds between 
successful polls.
-            // All the consumer work is done inside poll(), with smaller send 
buffer size, it
-            // takes many polls before a 1MB chunk from the server is fully 
read. In my testing
-            // about half of the time select() inside kafka consumer waited 
for 20-30ms, though
-            // the server had lots of data in tcp send buffers on its side. 
Compared to default,
-            // this setting increased throughput by many fold (3-4x).
-            ConsumerConfig.RECEIVE_BUFFER_CONFIG, 512 * 1024,
-
-            // default to latest offset when we are not resuming.
-            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest",
-            // disable auto commit of offsets. we don't require group_id. 
could be enabled by user.
-            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-
-    // default Kafka 0.9 Consumer supplier.
-    private static final SerializableFunction<Map<String, Object>, 
Consumer<byte[], byte[]>>
-      KAFKA_9_CONSUMER_FACTORY_FN =
-        new SerializableFunction<Map<String, Object>, Consumer<byte[], 
byte[]>>() {
-          public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
-            return new KafkaConsumer<>(config);
-          }
-        };
-  }
-
-  /**
-   * A {@link PTransform} to read from Kafka topics. See {@link KafkaIO} for 
more
-   * information on usage and configuration.
-   */
-  public static class TypedRead<K, V>
-                      extends PTransform<PBegin, PCollection<KafkaRecord<K, 
V>>> {
-
-    /**
      * A function to assign a timestamp to a record. Default is processing 
timestamp.
      */
-    public TypedRead<K, V> withTimestampFn2(
+    public Read<K, V> withTimestampFn2(
         SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn) {
       checkNotNull(timestampFn);
-      return new TypedRead<K, V>(topics, topicPartitions, keyCoder, valueCoder,
-          timestampFn, watermarkFn, consumerFactoryFn, consumerConfig,
-          maxNumRecords, maxReadTime);
+      return toBuilder().setTimestampFn(timestampFn).build();
     }
 
     /**
      * A function to calculate watermark after a record. Default is last 
record timestamp
      * @see #withTimestampFn(SerializableFunction)
      */
-    public TypedRead<K, V> withWatermarkFn2(
+    public Read<K, V> withWatermarkFn2(
         SerializableFunction<KafkaRecord<K, V>, Instant> watermarkFn) {
       checkNotNull(watermarkFn);
-      return new TypedRead<K, V>(topics, topicPartitions, keyCoder, valueCoder,
-          timestampFn, watermarkFn, consumerFactoryFn, consumerConfig,
-          maxNumRecords, maxReadTime);
+      return toBuilder().setWatermarkFn(watermarkFn).build();
     }
 
     /**
      * A function to assign a timestamp to a record. Default is processing 
timestamp.
      */
-    public TypedRead<K, V> withTimestampFn(SerializableFunction<KV<K, V>, 
Instant> timestampFn) {
+    public Read<K, V> withTimestampFn(SerializableFunction<KV<K, V>, Instant> 
timestampFn) {
       checkNotNull(timestampFn);
       return withTimestampFn2(unwrapKafkaAndThen(timestampFn));
     }
@@ -432,7 +401,7 @@ public class KafkaIO {
      * A function to calculate watermark after a record. Default is last 
record timestamp
      * @see #withTimestampFn(SerializableFunction)
      */
-    public TypedRead<K, V> withWatermarkFn(SerializableFunction<KV<K, V>, 
Instant> watermarkFn) {
+    public Read<K, V> withWatermarkFn(SerializableFunction<KV<K, V>, Instant> 
watermarkFn) {
       checkNotNull(watermarkFn);
       return withWatermarkFn2(unwrapKafkaAndThen(watermarkFn));
     }
@@ -445,6 +414,15 @@ public class KafkaIO {
     }
 
     @Override
+    public void validate(PBegin input) {
+      
checkNotNull(getConsumerConfig().get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
+          "Kafka bootstrap servers should be set");
+      checkArgument(getTopics().size() > 0 || getTopicPartitions().size() > 0,
+          "Kafka topics or topic_partitions are required");
+      checkNotNull(getKeyCoder(), "Key coder must be set");
+      checkNotNull(getValueCoder(), "Value coder must be set");
+    }
+
     public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
      // Handles unbounded source to bounded conversion if maxNumRecords or 
maxReadTime is set.
       Unbounded<KafkaRecord<K, V>> unbounded =
@@ -452,100 +430,92 @@ public class KafkaIO {
 
       PTransform<PBegin, PCollection<KafkaRecord<K, V>>> transform = unbounded;
 
-      if (maxNumRecords < Long.MAX_VALUE) {
-        transform = unbounded.withMaxNumRecords(maxNumRecords);
-      } else if (maxReadTime != null) {
-        transform = unbounded.withMaxReadTime(maxReadTime);
+      if (getMaxNumRecords() < Long.MAX_VALUE) {
+        transform = unbounded.withMaxNumRecords(getMaxNumRecords());
+      } else if (getMaxReadTime() != null) {
+        transform = unbounded.withMaxReadTime(getMaxReadTime());
       }
 
       return input.getPipeline().apply(transform);
     }
 
-    
////////////////////////////////////////////////////////////////////////////////////////
-
-    protected final List<String> topics;
-    protected final List<TopicPartition> topicPartitions; // mutually 
exclusive with topics
-    protected final Coder<K> keyCoder;
-    protected final Coder<V> valueCoder;
-    @Nullable protected final SerializableFunction<KafkaRecord<K, V>, Instant> 
timestampFn;
-    @Nullable protected final SerializableFunction<KafkaRecord<K, V>, Instant> 
watermarkFn;
-    protected final
-      SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> 
consumerFactoryFn;
-    protected final Map<String, Object> consumerConfig;
-    protected final long maxNumRecords; // bounded read, mainly for testing
-    protected final Duration maxReadTime; // bounded read, mainly for testing
-
-    private TypedRead(List<String> topics,
-        List<TopicPartition> topicPartitions,
-        Coder<K> keyCoder,
-        Coder<V> valueCoder,
-        @Nullable SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn,
-        @Nullable SerializableFunction<KafkaRecord<K, V>, Instant> watermarkFn,
-        SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> 
consumerFactoryFn,
-        Map<String, Object> consumerConfig,
-        long maxNumRecords,
-        @Nullable Duration maxReadTime) {
-      super("KafkaIO.Read");
-
-      this.topics = topics;
-      this.topicPartitions = topicPartitions;
-      this.keyCoder = keyCoder;
-      this.valueCoder = valueCoder;
-      this.timestampFn = timestampFn;
-      this.watermarkFn = watermarkFn;
-      this.consumerFactoryFn = consumerFactoryFn;
-      this.consumerConfig = consumerConfig;
-      this.maxNumRecords = maxNumRecords;
-      this.maxReadTime = maxReadTime;
-    }
-
     /**
      * Creates an {@link UnboundedSource UnboundedSource&lt;KafkaRecord&lt;K, 
V&gt;, ?&gt;} with the
-     * configuration in {@link TypedRead}. Primary use case is unit tests, 
should not be used in an
+     * configuration in {@link Read}. Primary use case is unit tests, should 
not be used in an
      * application.
      */
     @VisibleForTesting
     UnboundedSource<KafkaRecord<K, V>, KafkaCheckpointMark> makeSource() {
-      return new UnboundedKafkaSource<K, V>(
-          -1,
-          topics,
-          topicPartitions,
-          keyCoder,
-          valueCoder,
-          timestampFn,
-          Optional.fromNullable(watermarkFn),
-          consumerFactoryFn,
-          consumerConfig);
+      return new UnboundedKafkaSource<K, V>(this, -1);
     }
 
    // utility method to convert KafkaRecord<K, V> to user KV<K, V> before 
applying user functions
     private static <KeyT, ValueT, OutT> SerializableFunction<KafkaRecord<KeyT, 
ValueT>, OutT>
-      unwrapKafkaAndThen(final SerializableFunction<KV<KeyT, ValueT>, OutT> 
fn) {
-        return new SerializableFunction<KafkaRecord<KeyT, ValueT>, OutT>() {
-          public OutT apply(KafkaRecord<KeyT, ValueT> record) {
-            return fn.apply(record.getKV());
+    unwrapKafkaAndThen(final SerializableFunction<KV<KeyT, ValueT>, OutT> fn) {
+      return new SerializableFunction<KafkaRecord<KeyT, ValueT>, OutT>() {
+        public OutT apply(KafkaRecord<KeyT, ValueT> record) {
+          return fn.apply(record.getKV());
+        }
+      };
+    }
+    
///////////////////////////////////////////////////////////////////////////////////////
+
+    /**
+     * A set of properties that are not required or don't make sense for our 
consumer.
+     */
+    private static final Map<String, String> IGNORED_CONSUMER_PROPERTIES = 
ImmutableMap.of(
+        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Set keyCoder instead",
+        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "Set valueCoder 
instead"
+        // "group.id", "enable.auto.commit", "auto.commit.interval.ms" :
+        //     lets allow these, applications can have better resume point for 
restarts.
+        );
+
+    // set config defaults
+    private static final Map<String, Object> DEFAULT_CONSUMER_PROPERTIES =
+        ImmutableMap.<String, Object>of(
+            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName(),
+            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName(),
+
+            // Use large receive buffer. Once KAFKA-3135 is fixed, this _may_ 
not be required.
+            // with default value of 32K, it takes multiple seconds between 
successful polls.
+            // All the consumer work is done inside poll(), with smaller send 
buffer size, it
+            // takes many polls before a 1MB chunk from the server is fully 
read. In my testing
+            // about half of the time select() inside kafka consumer waited 
for 20-30ms, though
+            // the server had lots of data in tcp send buffers on its side. 
Compared to default,
+            // this setting increased throughput by many fold (3-4x).
+            ConsumerConfig.RECEIVE_BUFFER_CONFIG, 512 * 1024,
+
+            // default to latest offset when we are not resuming.
+            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest",
+            // disable auto commit of offsets. we don't require group_id. 
could be enabled by user.
+            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+    // default Kafka 0.9 Consumer supplier.
+    private static final SerializableFunction<Map<String, Object>, 
Consumer<byte[], byte[]>>
+      KAFKA_9_CONSUMER_FACTORY_FN =
+        new SerializableFunction<Map<String, Object>, Consumer<byte[], 
byte[]>>() {
+          public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
+            return new KafkaConsumer<>(config);
           }
         };
-      }
   }
 
   /**
-   * A {@link PTransform} to read from Kafka topics. Similar to {@link 
KafkaIO.TypedRead}, but
+   * A {@link PTransform} to read from Kafka topics. Similar to {@link 
KafkaIO.Read}, but
   * removes Kafka metadata and returns a {@link PCollection} of {@link KV}.
    * See {@link KafkaIO} for more information on usage and configuration of 
reader.
    */
   public static class TypedWithoutMetadata<K, V> extends PTransform<PBegin, 
PCollection<KV<K, V>>> {
+    private final Read<K, V> read;
 
-    private final TypedRead<K, V> typedRead;
-
-    TypedWithoutMetadata(TypedRead<K, V> read) {
+    TypedWithoutMetadata(Read<K, V> read) {
       super("KafkaIO.Read");
-      this.typedRead = read;
+      this.read = read;
     }
 
     @Override
     public PCollection<KV<K, V>> expand(PBegin begin) {
-      return typedRead
+      return read
           .expand(begin)
           .apply("Remove Kafka Metadata",
               ParDo.of(new DoFn<KafkaRecord<K, V>, KV<K, V>>() {
@@ -581,52 +551,17 @@ public class KafkaIO {
     return config;
   }
 
-  private static class NowTimestampFn<T> implements SerializableFunction<T, 
Instant> {
-    @Override
-    public Instant apply(T input) {
-      return Instant.now();
-    }
-  }
-
   /** Static class, prevent instantiation. */
   private KafkaIO() {}
 
   private static class UnboundedKafkaSource<K, V>
       extends UnboundedSource<KafkaRecord<K, V>, KafkaCheckpointMark> {
-
+    private Read<K, V> spec;
     private final int id; // split id, mainly for debugging
-    private final List<String> topics;
-    private final List<TopicPartition> assignedPartitions;
-    private final Coder<K> keyCoder;
-    private final Coder<V> valueCoder;
-    private final SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn;
-    // would it be a good idea to pass currentTimestamp to watermarkFn?
-    private final Optional<SerializableFunction<KafkaRecord<K, V>, Instant>> 
watermarkFn;
-    private
-      SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> 
consumerFactoryFn;
-    private final Map<String, Object> consumerConfig;
-
-    public UnboundedKafkaSource(
-        int id,
-        List<String> topics,
-        List<TopicPartition> assignedPartitions,
-        Coder<K> keyCoder,
-        Coder<V> valueCoder,
-        @Nullable SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn,
-        Optional<SerializableFunction<KafkaRecord<K, V>, Instant>> watermarkFn,
-        SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> 
consumerFactoryFn,
-        Map<String, Object> consumerConfig) {
 
+    public UnboundedKafkaSource(Read<K, V> spec, int id) {
+      this.spec = spec;
       this.id = id;
-      this.assignedPartitions = assignedPartitions;
-      this.topics = topics;
-      this.keyCoder = keyCoder;
-      this.valueCoder = valueCoder;
-      this.timestampFn =
-          (timestampFn == null ? new NowTimestampFn<KafkaRecord<K, V>>() : 
timestampFn);
-      this.watermarkFn = watermarkFn;
-      this.consumerFactoryFn = consumerFactoryFn;
-      this.consumerConfig = consumerConfig;
     }
 
     /**
@@ -642,15 +577,16 @@ public class KafkaIO {
     public List<UnboundedKafkaSource<K, V>> generateInitialSplits(
         int desiredNumSplits, PipelineOptions options) throws Exception {
 
-      List<TopicPartition> partitions = new ArrayList<>(assignedPartitions);
+      List<TopicPartition> partitions = new 
ArrayList<>(spec.getTopicPartitions());
 
       // (a) fetch partitions for each topic
       // (b) sort by <topic, partition>
       // (c) round-robin assign the partitions to splits
 
       if (partitions.isEmpty()) {
-        try (Consumer<?, ?> consumer = 
consumerFactoryFn.apply(consumerConfig)) {
-          for (String topic : topics) {
+        try (Consumer<?, ?> consumer =
+            spec.getConsumerFactoryFn().apply(spec.getConsumerConfig())) {
+          for (String topic : spec.getTopics()) {
             for (PartitionInfo p : consumer.partitionsFor(topic)) {
               partitions.add(new TopicPartition(p.topic(), p.partition()));
             }
@@ -690,16 +626,13 @@ public class KafkaIO {
         LOG.info("Partitions assigned to split {} (total {}): {}",
             i, assignedToSplit.size(), Joiner.on(",").join(assignedToSplit));
 
-        result.add(new UnboundedKafkaSource<K, V>(
-            i,
-            this.topics,
-            assignedToSplit,
-            this.keyCoder,
-            this.valueCoder,
-            this.timestampFn,
-            this.watermarkFn,
-            this.consumerFactoryFn,
-            this.consumerConfig));
+        result.add(
+            new UnboundedKafkaSource<>(
+                spec.toBuilder()
+                    .setTopics(Collections.<String>emptyList())
+                    .setTopicPartitions(assignedToSplit)
+                    .build(),
+                i));
       }
 
       return result;
@@ -708,7 +641,7 @@ public class KafkaIO {
     @Override
     public UnboundedKafkaReader<K, V> createReader(PipelineOptions options,
                                                    KafkaCheckpointMark 
checkpointMark) {
-      if (assignedPartitions.isEmpty()) {
+      if (spec.getTopicPartitions().isEmpty()) {
         LOG.warn("Looks like generateSplits() is not called. Generate single 
split.");
         try {
           return new UnboundedKafkaReader<K, V>(
@@ -734,15 +667,12 @@ public class KafkaIO {
 
     @Override
     public void validate() {
-      checkNotNull(consumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
-          "Kafka bootstrap servers should be set");
-      checkArgument(topics.size() > 0 || assignedPartitions.size() > 0,
-          "Kafka topics or topic_partitions are required");
+      spec.validate(null);
     }
 
     @Override
     public Coder<KafkaRecord<K, V>> getDefaultOutputCoder() {
-      return KafkaRecordCoder.of(keyCoder, valueCoder);
+      return KafkaRecordCoder.of(spec.getKeyCoder(), spec.getValueCoder());
     }
   }
 
@@ -839,7 +769,8 @@ public class KafkaIO {
       this.source = source;
       this.name = "Reader-" + source.id;
 
-      partitionStates = 
ImmutableList.copyOf(Lists.transform(source.assignedPartitions,
+      List<TopicPartition> partitions = source.spec.getTopicPartitions();
+      partitionStates = ImmutableList.copyOf(Lists.transform(partitions,
           new Function<TopicPartition, PartitionState>() {
             public PartitionState apply(TopicPartition tp) {
               return new PartitionState(tp, UNINITIALIZED_OFFSET);
@@ -850,13 +781,13 @@ public class KafkaIO {
         // a) verify that assigned and check-pointed partitions match exactly
         // b) set consumed offsets
 
-        checkState(checkpointMark.getPartitions().size() == 
source.assignedPartitions.size(),
+        checkState(checkpointMark.getPartitions().size() == partitions.size(),
             "checkPointMark and assignedPartitions should match");
         // we could consider allowing a mismatch, though it is not expected in 
current Dataflow
 
-        for (int i = 0; i < source.assignedPartitions.size(); i++) {
+        for (int i = 0; i < partitions.size(); i++) {
           PartitionMark ckptMark = checkpointMark.getPartitions().get(i);
-          TopicPartition assigned = source.assignedPartitions.get(i);
+          TopicPartition assigned = partitions.get(i);
           TopicPartition partition = new TopicPartition(ckptMark.getTopic(),
                                                         
ckptMark.getPartition());
           checkState(partition.equals(assigned),
@@ -920,8 +851,9 @@ public class KafkaIO {
 
     @Override
     public boolean start() throws IOException {
-      consumer = source.consumerFactoryFn.apply(source.consumerConfig);
-      consumer.assign(source.assignedPartitions);
+      Read<K, V> spec = source.spec;
+      consumer = spec.getConsumerFactoryFn().apply(spec.getConsumerConfig());
+      consumer.assign(spec.getTopicPartitions());
 
       for (PartitionState p : partitionStates) {
         if (p.nextOffset != UNINITIALIZED_OFFSET) {
@@ -948,16 +880,16 @@ public class KafkaIO {
 
       // offsetConsumer setup :
 
-      Object groupId = 
source.consumerConfig.get(ConsumerConfig.GROUP_ID_CONFIG);
+      Object groupId = 
spec.getConsumerConfig().get(ConsumerConfig.GROUP_ID_CONFIG);
       // override group_id and disable auto_commit so that it does not 
interfere with main consumer
       String offsetGroupId = String.format("%s_offset_consumer_%d_%s", name,
           (new Random()).nextInt(Integer.MAX_VALUE), (groupId == null ? "none" 
: groupId));
-      Map<String, Object> offsetConsumerConfig = new 
HashMap<>(source.consumerConfig);
+      Map<String, Object> offsetConsumerConfig = new 
HashMap<>(spec.getConsumerConfig());
       offsetConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, offsetGroupId);
       offsetConsumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
false);
 
-      offsetConsumer = source.consumerFactoryFn.apply(offsetConsumerConfig);
-      offsetConsumer.assign(source.assignedPartitions);
+      offsetConsumer = spec.getConsumerFactoryFn().apply(offsetConsumerConfig);
+      offsetConsumer.assign(spec.getTopicPartitions());
 
       offsetFetcherThread.scheduleAtFixedRate(
           new Runnable() {
@@ -1018,10 +950,11 @@ public class KafkaIO {
               rawRecord.topic(),
               rawRecord.partition(),
               rawRecord.offset(),
-              decode(rawRecord.key(), source.keyCoder),
-              decode(rawRecord.value(), source.valueCoder));
+              decode(rawRecord.key(), source.spec.getKeyCoder()),
+              decode(rawRecord.value(), source.spec.getValueCoder()));
 
-          curTimestamp = source.timestampFn.apply(record);
+          curTimestamp = (source.spec.getTimestampFn() == null)
+              ? Instant.now() : source.spec.getTimestampFn().apply(record);
           curRecord = record;
 
           int recordSize = (rawRecord.key() == null ? 0 : 
rawRecord.key().length)
@@ -1081,8 +1014,8 @@ public class KafkaIO {
         return initialWatermark;
       }
 
-      return source.watermarkFn.isPresent()
-          ? source.watermarkFn.get().apply(curRecord) : curTimestamp;
+      return source.spec.getWatermarkFn() != null
+          ? source.spec.getWatermarkFn().apply(curRecord) : curTimestamp;
     }
 
     @Override
@@ -1170,7 +1103,29 @@ public class KafkaIO {
    * A {@link PTransform} to write to a Kafka topic. See {@link KafkaIO} for 
more
    * information on usage and configuration.
    */
-  public static class Write<K, V> extends TypedWrite<K, V> {
+  @AutoValue
+  public abstract static class Write<K, V> extends 
PTransform<PCollection<KV<K, V>>, PDone> {
+    @Nullable abstract String getTopic();
+    @Nullable abstract Coder<K> getKeyCoder();
+    @Nullable abstract Coder<V> getValueCoder();
+    abstract boolean getValueOnly();
+    abstract Map<String, Object> getProducerConfig();
+    @Nullable
+    abstract SerializableFunction<Map<String, Object>, Producer<K, V>> 
getProducerFactoryFn();
+
+    abstract Builder<K, V> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<K, V> {
+      abstract Builder<K, V> setTopic(String topic);
+      abstract Builder<K, V> setKeyCoder(Coder<K> keyCoder);
+      abstract Builder<K, V> setValueCoder(Coder<V> valueCoder);
+      abstract Builder<K, V> setValueOnly(boolean valueOnly);
+      abstract Builder<K, V> setProducerConfig(Map<String, Object> 
producerConfig);
+      abstract Builder<K, V> setProducerFactoryFn(
+          SerializableFunction<Map<String, Object>, Producer<K, V>> fn);
+      abstract Write<K, V> build();
+    }
 
     /**
      * Returns a new {@link Write} transform with Kafka producer pointing to
@@ -1186,7 +1141,7 @@ public class KafkaIO {
      * Returns a new {@link Write} transform that writes to given topic.
      */
     public Write<K, V> withTopic(String topic) {
-      return new Write<K, V>(topic, keyCoder, valueCoder, producerConfig);
+      return toBuilder().setTopic(topic).build();
     }
 
     /**
@@ -1194,95 +1149,55 @@ public class KafkaIO {
      * A key is optional while writing to Kafka. Note when a key is set, its 
hash is used to
      * determine partition in Kafka (see {@link ProducerRecord} for more 
details).
      */
-    public <KeyT> Write<KeyT, V> withKeyCoder(Coder<KeyT> keyCoder) {
-      return new Write<KeyT, V>(topic, keyCoder, valueCoder, producerConfig);
+    public Write<K, V> withKeyCoder(Coder<K> keyCoder) {
+      return toBuilder().setKeyCoder(keyCoder).build();
     }
 
     /**
      * Returns a new {@link Write} with {@link Coder} for serializing value to 
bytes.
      */
-    public <ValueT> Write<K, ValueT> withValueCoder(Coder<ValueT> valueCoder) {
-      return new Write<K, ValueT>(topic, keyCoder, valueCoder, producerConfig);
+    public Write<K, V> withValueCoder(Coder<V> valueCoder) {
+      return toBuilder().setValueCoder(valueCoder).build();
     }
 
     public Write<K, V> updateProducerProperties(Map<String, Object> 
configUpdates) {
-      Map<String, Object> config = updateKafkaProperties(producerConfig,
-          TypedWrite.IGNORED_PRODUCER_PROPERTIES, configUpdates);
-      return new Write<K, V>(topic, keyCoder, valueCoder, config);
+      Map<String, Object> config = updateKafkaProperties(getProducerConfig(),
+          IGNORED_PRODUCER_PROPERTIES, configUpdates);
+      return toBuilder().setProducerConfig(config).build();
     }
 
-    private Write(
-        String topic,
-        Coder<K> keyCoder,
-        Coder<V> valueCoder,
-        Map<String, Object> producerConfig) {
-      super(topic, keyCoder, valueCoder, producerConfig,
-          Optional.<SerializableFunction<Map<String, Object>, Producer<K, 
V>>>absent());
-    }
-  }
-
-  /**
-   * A {@link PTransform} to write to a Kafka topic. See {@link KafkaIO} for 
more
-   * information on usage and configuration.
-   */
-  public static class TypedWrite<K, V> extends PTransform<PCollection<KV<K, 
V>>, PDone> {
-
     /**
      * Returns a new {@link Write} with a custom function to create Kafka 
producer. Primarily used
      * for tests. Default is {@link KafkaProducer}
      */
-    public TypedWrite<K, V> withProducerFactoryFn(
+    public Write<K, V> withProducerFactoryFn(
         SerializableFunction<Map<String, Object>, Producer<K, V>> 
producerFactoryFn) {
-      return new TypedWrite<K, V>(topic, keyCoder, valueCoder, producerConfig,
-          Optional.of(producerFactoryFn));
+      return toBuilder().setProducerFactoryFn(producerFactoryFn).build();
     }
 
     /**
      * Returns a new transform that writes just the values to Kafka. This is 
useful for writing
     * collections of values rather than {@link KV}s.
      */
-    @SuppressWarnings("unchecked")
     public PTransform<PCollection<V>, PDone> values() {
-      return new KafkaValueWrite<V>((TypedWrite<Void, V>) this);
-      // Any way to avoid casting here to TypedWrite<Void, V>? We can't create
-      // new TypedWrite without casting producerFactoryFn.
+      return new KafkaValueWrite<>(toBuilder().setValueOnly(true).build());
     }
 
     @Override
     public PDone expand(PCollection<KV<K, V>> input) {
-      input.apply(ParDo.of(new KafkaWriter<K, V>(
-          topic, keyCoder, valueCoder, producerConfig, producerFactoryFnOpt)));
+      input.apply(ParDo.of(new KafkaWriter<>(this)));
       return PDone.in(input.getPipeline());
     }
 
     @Override
     public void validate(PCollection<KV<K, V>> input) {
-      checkNotNull(producerConfig.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
+      
checkNotNull(getProducerConfig().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
           "Kafka bootstrap servers should be set");
-      checkNotNull(topic, "Kafka topic should be set");
-    }
-
-    
//////////////////////////////////////////////////////////////////////////////////////////
-
-    protected final String topic;
-    protected final Coder<K> keyCoder;
-    protected final Coder<V> valueCoder;
-    protected final Optional<SerializableFunction<Map<String, Object>, 
Producer<K, V>>>
-        producerFactoryFnOpt;
-    protected final Map<String, Object> producerConfig;
-
-    protected TypedWrite(
-        String topic,
-        Coder<K> keyCoder,
-        Coder<V> valueCoder,
-        Map<String, Object> producerConfig,
-        Optional<SerializableFunction<Map<String, Object>, Producer<K, V>>> 
producerFactoryFnOpt) {
-
-      this.topic = topic;
-      this.keyCoder = keyCoder;
-      this.valueCoder = valueCoder;
-      this.producerConfig = producerConfig;
-      this.producerFactoryFnOpt = producerFactoryFnOpt;
+      checkNotNull(getTopic(), "Kafka topic should be set");
+      if (!getValueOnly()) {
+        checkNotNull(getKeyCoder(), "Key coder should be set");
+      }
+      checkNotNull(getValueCoder(), "Value coder should be set");
     }
 
     // set config defaults
@@ -1307,11 +1222,10 @@ public class KafkaIO {
    * Same as {@code Write<K, V>} without a Key. Null is used for key as it is 
the convention in
    * Kafka when there is no key specified. Majority of Kafka writers don't 
specify a key.
    */
-  private static class KafkaValueWrite<V> extends PTransform<PCollection<V>, 
PDone> {
+  private static class KafkaValueWrite<K, V> extends 
PTransform<PCollection<V>, PDone> {
+    private final Write<K, V> kvWriteTransform;
 
-    private final TypedWrite<Void, V> kvWriteTransform;
-
-    private KafkaValueWrite(TypedWrite<Void, V> kvWriteTransform) {
+    private KafkaValueWrite(Write<K, V> kvWriteTransform) {
       this.kvWriteTransform = kvWriteTransform;
     }
 
@@ -1319,23 +1233,36 @@ public class KafkaIO {
     public PDone expand(PCollection<V> input) {
       return input
         .apply("Kafka values with default key",
-          MapElements.via(new SimpleFunction<V, KV<Void, V>>() {
+          MapElements.via(new SimpleFunction<V, KV<K, V>>() {
             @Override
-            public KV<Void, V> apply(V element) {
-              return KV.<Void, V>of(null, element);
+            public KV<K, V> apply(V element) {
+              return KV.of(null, element);
             }
           }))
-        .setCoder(KvCoder.of(VoidCoder.of(), kvWriteTransform.valueCoder))
+        .setCoder(KvCoder.of(new NullOnlyCoder<K>(), 
kvWriteTransform.getValueCoder()))
         .apply(kvWriteTransform);
     }
   }
 
+  private static class NullOnlyCoder<T> extends AtomicCoder<T> {
+    @Override
+    public void encode(T value, OutputStream outStream, Context context) {
+      checkArgument(value == null, "Can only encode nulls");
+      // Encode as the empty string.
+    }
+
+    @Override
+    public T decode(InputStream inStream, Context context) {
+      return null;
+    }
+  }
+
   private static class KafkaWriter<K, V> extends DoFn<KV<K, V>, Void> {
 
     @Setup
     public void setup() {
-      if (producerFactoryFnOpt.isPresent()) {
-        producer = producerFactoryFnOpt.get().apply(producerConfig);
+      if (spec.getProducerFactoryFn() != null) {
+        producer = spec.getProducerFactoryFn().apply(producerConfig);
       } else {
         producer = new KafkaProducer<K, V>(producerConfig);
       }
@@ -1347,7 +1274,7 @@ public class KafkaIO {
 
       KV<K, V> kv = ctx.element();
       producer.send(
-          new ProducerRecord<K, V>(topic, kv.getKey(), kv.getValue()),
+          new ProducerRecord<K, V>(spec.getTopic(), kv.getKey(), 
kv.getValue()),
           new SendCallback());
     }
 
@@ -1364,10 +1291,8 @@ public class KafkaIO {
 
     
///////////////////////////////////////////////////////////////////////////////////
 
-    private final String topic;
+    private final Write<K, V> spec;
     private final Map<String, Object> producerConfig;
-    private final Optional<SerializableFunction<Map<String, Object>, 
Producer<K, V>>>
-                  producerFactoryFnOpt;
 
     private transient Producer<K, V> producer = null;
     //private transient Callback sendCallback = new SendCallback();
@@ -1375,14 +1300,8 @@ public class KafkaIO {
     private transient Exception sendException = null;
     private transient long numSendFailures = 0;
 
-    KafkaWriter(String topic,
-        Coder<K> keyCoder,
-        Coder<V> valueCoder,
-        Map<String, Object> producerConfig,
-        Optional<SerializableFunction<Map<String, Object>, Producer<K, V>>> 
producerFactoryFnOpt) {
-
-      this.topic = topic;
-      this.producerFactoryFnOpt = producerFactoryFnOpt;
+    KafkaWriter(Write<K, V> spec) {
+      this.spec = spec;
 
       // Set custom kafka serializers. We can not serialize user objects then 
pass the bytes to
       // producer. The key and value objects are used in kafka Partitioner 
interface.
@@ -1390,9 +1309,9 @@ public class KafkaIO {
+      // key bytes to pick a partition. But we are making sure user's custom 
partitioner would work
       // as expected.
 
-      this.producerConfig = new HashMap<>(producerConfig);
-      this.producerConfig.put(configForKeySerializer(), keyCoder);
-      this.producerConfig.put(configForValueSerializer(), valueCoder);
+      this.producerConfig = new HashMap<>(spec.getProducerConfig());
+      this.producerConfig.put(configForKeySerializer(), spec.getKeyCoder());
+      this.producerConfig.put(configForValueSerializer(), 
spec.getValueCoder());
     }
 
     private synchronized void checkForFailures() throws IOException {

http://git-wip-us.apache.org/repos/asf/beam/blob/e0c704f1/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index e18d628..5424b61 100644
--- 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -41,6 +41,7 @@ import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
@@ -210,13 +211,13 @@ public class KafkaIOTest {
    * Creates a consumer with two topics, with 5 partitions each.
    * numElements are (round-robin) assigned all the 10 partitions.
    */
-  private static KafkaIO.TypedRead<Integer, Long> mkKafkaReadTransform(
+  private static KafkaIO.Read<Integer, Long> mkKafkaReadTransform(
       int numElements,
       @Nullable SerializableFunction<KV<Integer, Long>, Instant> timestampFn) {
 
     List<String> topics = ImmutableList.of("topic_a", "topic_b");
 
-    KafkaIO.Read<Integer, Long> reader = KafkaIO.read()
+    KafkaIO.Read<Integer, Long> reader = KafkaIO.<Integer, Long>read()
         .withBootstrapServers("none")
         .withTopics(topics)
         .withConsumerFactoryFn(new ConsumerFactoryFn(
@@ -289,11 +290,12 @@ public class KafkaIOTest {
 
     List<String> topics = ImmutableList.of("test");
 
-    KafkaIO.TypedRead<byte[], Long> reader = KafkaIO.read()
+    KafkaIO.Read<byte[], Long> reader = KafkaIO.<byte[], Long>read()
         .withBootstrapServers("none")
         .withTopicPartitions(ImmutableList.of(new TopicPartition("test", 5)))
         .withConsumerFactoryFn(new ConsumerFactoryFn(
             topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 10 
partitions
+        .withKeyCoder(ByteArrayCoder.of())
         .withValueCoder(BigEndianLongCoder.of())
         .withMaxNumRecords(numElements / 10);
 
@@ -474,7 +476,7 @@ public class KafkaIOTest {
     int numElements = 100; // all the 20 partitions will have elements
     List<String> topics = ImmutableList.of("topic_a", "topic_b");
 
-    source = KafkaIO.read()
+    source = KafkaIO.<Integer, Long>read()
         .withBootstrapServers("none")
         .withTopics(topics)
         .withConsumerFactoryFn(new ConsumerFactoryFn(
@@ -520,7 +522,7 @@ public class KafkaIOTest {
       p
         .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
             .withoutMetadata())
-        .apply(KafkaIO.write()
+        .apply(KafkaIO.<Integer, Long>write()
             .withBootstrapServers("none")
             .withTopic(topic)
             .withKeyCoder(BigEndianIntegerCoder.of())
@@ -553,10 +555,9 @@ public class KafkaIOTest {
         .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
             .withoutMetadata())
         .apply(Values.<Long>create()) // there are no keys
-        .apply(KafkaIO.write()
+        .apply(KafkaIO.<Integer, Long>write()
             .withBootstrapServers("none")
             .withTopic(topic)
-            .withKeyCoder(BigEndianIntegerCoder.of())
             .withValueCoder(BigEndianLongCoder.of())
             .withProducerFactoryFn(new ProducerFactoryFn())
             .values());
@@ -595,7 +596,7 @@ public class KafkaIOTest {
       p
         .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
             .withoutMetadata())
-        .apply(KafkaIO.write()
+        .apply(KafkaIO.<Integer, Long>write()
             .withBootstrapServers("none")
             .withTopic(topic)
             .withKeyCoder(BigEndianIntegerCoder.of())

Reply via email to