Repository: beam
Updated Branches:
  refs/heads/master ffd87553f -> 66f249933


Fix NPE in Kafka value writer.

KafkaIO.write()...values() does not require the user to set a key coder since the
key is always null.
Validation passes, but it results in an NPE at runtime when the writer
tries to instantiate the producer. Set key coder to 'NullOnlyCoder'.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d0462f59
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d0462f59
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d0462f59

Branch: refs/heads/master
Commit: d0462f59548ebed0dd7ae744b138ff956b742cad
Parents: ffd8755
Author: Raghu Angadi <[email protected]>
Authored: Wed Mar 29 23:21:54 2017 -0700
Committer: Eugene Kirpichov <[email protected]>
Committed: Thu Mar 30 13:57:26 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 22 +++++++++-----------
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 16 ++++++++++++++
 2 files changed, 26 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d0462f59/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 7880cbc..bb7d971 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -254,7 +254,6 @@ public class KafkaIO {
   public static <K, V> Write<K, V> write() {
     return new AutoValue_KafkaIO_Write.Builder<K, V>()
         .setProducerConfig(Write.DEFAULT_PRODUCER_PROPERTIES)
-        .setValueOnly(false)
         .build();
   }
 
@@ -1159,7 +1158,6 @@ public class KafkaIO {
     @Nullable abstract String getTopic();
     @Nullable abstract Coder<K> getKeyCoder();
     @Nullable abstract Coder<V> getValueCoder();
-    abstract boolean getValueOnly();
     abstract Map<String, Object> getProducerConfig();
     @Nullable
     abstract SerializableFunction<Map<String, Object>, Producer<K, V>> 
getProducerFactoryFn();
@@ -1171,7 +1169,6 @@ public class KafkaIO {
       abstract Builder<K, V> setTopic(String topic);
       abstract Builder<K, V> setKeyCoder(Coder<K> keyCoder);
       abstract Builder<K, V> setValueCoder(Coder<V> valueCoder);
-      abstract Builder<K, V> setValueOnly(boolean valueOnly);
       abstract Builder<K, V> setProducerConfig(Map<String, Object> 
producerConfig);
       abstract Builder<K, V> setProducerFactoryFn(
           SerializableFunction<Map<String, Object>, Producer<K, V>> fn);
@@ -1231,7 +1228,7 @@ public class KafkaIO {
      * collections of values rather thank {@link KV}s.
      */
     public PTransform<PCollection<V>, PDone> values() {
-      return new KafkaValueWrite<>(toBuilder().setValueOnly(true).build());
+      return new KafkaValueWrite<>(withKeyCoder(new 
NullOnlyCoder<K>()).toBuilder().build());
     }
 
     @Override
@@ -1245,9 +1242,7 @@ public class KafkaIO {
       
checkNotNull(getProducerConfig().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
           "Kafka bootstrap servers should be set");
       checkNotNull(getTopic(), "Kafka topic should be set");
-      if (!getValueOnly()) {
-        checkNotNull(getKeyCoder(), "Key coder should be set");
-      }
+      checkNotNull(getKeyCoder(), "Key coder should be set");
       checkNotNull(getValueCoder(), "Value coder should be set");
     }
 
@@ -1255,11 +1250,12 @@ public class KafkaIO {
     private static final Map<String, Object> DEFAULT_PRODUCER_PROPERTIES =
         ImmutableMap.<String, Object>of(
             ProducerConfig.RETRIES_CONFIG, 3,
+            // See comment about custom serializers in KafkaWriter constructor.
             ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
CoderBasedKafkaSerializer.class,
             ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
CoderBasedKafkaSerializer.class);
 
     /**
-     * A set of properties that are not required or don't make sense for our 
consumer.
+     * A set of properties that are not required or don't make sense for our 
producer.
      */
     private static final Map<String, String> IGNORED_PRODUCER_PROPERTIES = 
ImmutableMap.of(
         ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "Set keyCoder instead",
@@ -1373,11 +1369,13 @@ public class KafkaIO {
     KafkaWriter(Write<K, V> spec) {
       this.spec = spec;
 
-      // Set custom kafka serializers. We can not serialize user objects then 
pass the bytes to
-      // producer. The key and value objects are used in kafka Partitioner 
interface.
+      // Set custom kafka serializers. We do not want to serialize user 
objects then pass the bytes
+      // to producer since key and value objects are used in Kafka Partitioner 
interface.
       // This does not matter for default partitioner in Kafka as it uses just 
the serialized
-      // key bytes to pick a partition. But are making sure user's custom 
partitioner would work
-      // as expected.
+      // key bytes to pick a partition. But we don't want to limit use of 
custom partitions.
+      // We pass key and values objects the user writes directly Kafka and 
user supplied
+      // coders to serialize them are invoked inside CoderBasedKafkaSerializer.
+      // Use case : write all the events for a single session to same Kafka 
partition.
 
       this.producerConfig = new HashMap<>(spec.getProducerConfig());
       this.producerConfig.put(configForKeySerializer(), spec.getKeyCoder());

http://git-wip-us.apache.org/repos/asf/beam/blob/d0462f59/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 1897127..d1696d0 100644
--- 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -71,9 +71,12 @@ import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
 import org.hamcrest.collection.IsIterableContainingInAnyOrder;
 import org.joda.time.Instant;
 import org.junit.Rule;
@@ -728,8 +731,21 @@ public class KafkaIOTest {
   private static class ProducerFactoryFn
     implements SerializableFunction<Map<String, Object>, Producer<Integer, 
Long>> {
 
+    @SuppressWarnings("unchecked")
     @Override
     public Producer<Integer, Long> apply(Map<String, Object> config) {
+
+      // Make sure the config is correctly set up for serializers.
+      Utils.newInstance(
+          ((Class<?>) config.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG))
+              .asSubclass(Serializer.class)
+      ).configure(config, true);
+
+      Utils.newInstance(
+          ((Class<?>) config.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))
+              .asSubclass(Serializer.class)
+      ).configure(config, false);
+
       return MOCK_PRODUCER;
     }
   }

Reply via email to