Repository: beam Updated Branches: refs/heads/master ffd87553f -> 66f249933
Fix NPE in Kafka value writer. KafkaIO.write()...values() does not require the user to set a key coder since the key is always null. Validation passes, but it results in an NPE at runtime when the writer tries to instantiate the producer. Set the key coder to 'NullOnlyCoder'. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d0462f59 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d0462f59 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d0462f59 Branch: refs/heads/master Commit: d0462f59548ebed0dd7ae744b138ff956b742cad Parents: ffd8755 Author: Raghu Angadi <[email protected]> Authored: Wed Mar 29 23:21:54 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Thu Mar 30 13:57:26 2017 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 22 +++++++++----------- .../apache/beam/sdk/io/kafka/KafkaIOTest.java | 16 ++++++++++++++ 2 files changed, 26 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d0462f59/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 7880cbc..bb7d971 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -254,7 +254,6 @@ public class KafkaIO { public static <K, V> Write<K, V> write() { return new AutoValue_KafkaIO_Write.Builder<K, V>() .setProducerConfig(Write.DEFAULT_PRODUCER_PROPERTIES) - .setValueOnly(false) .build(); } @@ -1159,7 +1158,6 @@ public class KafkaIO { @Nullable abstract String getTopic(); @Nullable 
abstract Coder<K> getKeyCoder(); @Nullable abstract Coder<V> getValueCoder(); - abstract boolean getValueOnly(); abstract Map<String, Object> getProducerConfig(); @Nullable abstract SerializableFunction<Map<String, Object>, Producer<K, V>> getProducerFactoryFn(); @@ -1171,7 +1169,6 @@ public class KafkaIO { abstract Builder<K, V> setTopic(String topic); abstract Builder<K, V> setKeyCoder(Coder<K> keyCoder); abstract Builder<K, V> setValueCoder(Coder<V> valueCoder); - abstract Builder<K, V> setValueOnly(boolean valueOnly); abstract Builder<K, V> setProducerConfig(Map<String, Object> producerConfig); abstract Builder<K, V> setProducerFactoryFn( SerializableFunction<Map<String, Object>, Producer<K, V>> fn); @@ -1231,7 +1228,7 @@ public class KafkaIO { * collections of values rather thank {@link KV}s. */ public PTransform<PCollection<V>, PDone> values() { - return new KafkaValueWrite<>(toBuilder().setValueOnly(true).build()); + return new KafkaValueWrite<>(withKeyCoder(new NullOnlyCoder<K>()).toBuilder().build()); } @Override @@ -1245,9 +1242,7 @@ public class KafkaIO { checkNotNull(getProducerConfig().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG), "Kafka bootstrap servers should be set"); checkNotNull(getTopic(), "Kafka topic should be set"); - if (!getValueOnly()) { - checkNotNull(getKeyCoder(), "Key coder should be set"); - } + checkNotNull(getKeyCoder(), "Key coder should be set"); checkNotNull(getValueCoder(), "Value coder should be set"); } @@ -1255,11 +1250,12 @@ public class KafkaIO { private static final Map<String, Object> DEFAULT_PRODUCER_PROPERTIES = ImmutableMap.<String, Object>of( ProducerConfig.RETRIES_CONFIG, 3, + // See comment about custom serializers in KafkaWriter constructor. ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, CoderBasedKafkaSerializer.class, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CoderBasedKafkaSerializer.class); /** - * A set of properties that are not required or don't make sense for our consumer. 
+ * A set of properties that are not required or don't make sense for our producer. */ private static final Map<String, String> IGNORED_PRODUCER_PROPERTIES = ImmutableMap.of( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "Set keyCoder instead", @@ -1373,11 +1369,13 @@ public class KafkaIO { KafkaWriter(Write<K, V> spec) { this.spec = spec; - // Set custom kafka serializers. We can not serialize user objects then pass the bytes to - // producer. The key and value objects are used in kafka Partitioner interface. + // Set custom kafka serializers. We do not want to serialize user objects then pass the bytes + // to producer since key and value objects are used in Kafka Partitioner interface. // This does not matter for default partitioner in Kafka as it uses just the serialized - // key bytes to pick a partition. But are making sure user's custom partitioner would work - // as expected. + // key bytes to pick a partition. But we don't want to limit use of custom partitions. + // We pass key and values objects the user writes directly Kafka and user supplied + // coders to serialize them are invoked inside CoderBasedKafkaSerializer. + // Use case : write all the events for a single session to same Kafka partition. 
this.producerConfig = new HashMap<>(spec.getProducerConfig()); this.producerConfig.put(configForKeySerializer(), spec.getKeyCoder()); http://git-wip-us.apache.org/repos/asf/beam/blob/d0462f59/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index 1897127..d1696d0 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -71,9 +71,12 @@ import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.Utils; import org.hamcrest.collection.IsIterableContainingInAnyOrder; import org.joda.time.Instant; import org.junit.Rule; @@ -728,8 +731,21 @@ public class KafkaIOTest { private static class ProducerFactoryFn implements SerializableFunction<Map<String, Object>, Producer<Integer, Long>> { + @SuppressWarnings("unchecked") @Override public Producer<Integer, Long> apply(Map<String, Object> config) { + + // Make sure the config is correctly set up for serializers. 
+ Utils.newInstance( + ((Class<?>) config.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) + .asSubclass(Serializer.class) + ).configure(config, true); + + Utils.newInstance( + ((Class<?>) config.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) + .asSubclass(Serializer.class) + ).configure(config, false); + return MOCK_PRODUCER; } }
