Jean-Baptiste Onofré created BEAM-2257:
------------------------------------------
Summary: KafkaIO write without key requires a producer fn
Key: BEAM-2257
URL: https://issues.apache.org/jira/browse/BEAM-2257
Project: Beam
Issue Type: Bug
Components: sdk-java-extensions
Reporter: Jean-Baptiste Onofré
Assignee: Jean-Baptiste Onofré
The {{KafkaIO}} javadoc says that it's possible to write {{String}} values directly to
the topic without a key:
{code}
PCollection<String> strings = ...;
strings.apply(KafkaIO.<Void, String>write()
.withBootstrapServers("broker_1:9092,broker_2:9092")
.withTopic("results")
.withValueSerializer(new StringSerializer()) // just need serializer for value
.values()
);
{code}
This is not fully correct:
1. {{withValueSerializer()}} requires a serializer class, not an instance.
So, it should be {{withValueSerializer(StringSerializer.class)}}.
2. As the key serializer is not provided, a Kafka producer fn is required;
otherwise, the user will get:
{code}
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:321)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:156)
	at org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.setup(KafkaIO.java:1494)
Caused by: java.lang.NullPointerException
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:300)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:156)
	at org.apache.beam.sdk.io.kafka.KafkaIO$KafkaWriter.setup(KafkaIO.java:1494)
{code}
A possible workaround is to create a {{VoidSerializer}} and pass its class via
{{withKeySerializer()}}, or to provide the producer fn.
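
The first workaround could look roughly like the sketch below. It assumes the Kafka client library is on the classpath and implements its {{Serializer}} interface; the class body is illustrative only and is not taken from the Beam code base.

```java
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;

/** Illustrative key serializer for records that carry no key (assumed name: VoidSerializer). */
public class VoidSerializer implements Serializer<Void> {

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    // No configuration is needed for a serializer that never emits bytes.
  }

  @Override
  public byte[] serialize(String topic, Void data) {
    // A Void key can only be null, so return null bytes and leave the record key unset.
    return null;
  }

  @Override
  public void close() {
    // Nothing to release.
  }
}
```

With such a class available, the write transform from the javadoc example would presumably be configured with {{withKeySerializer(VoidSerializer.class)}} next to {{withValueSerializer(StringSerializer.class)}}, so that the producer gets a key serializer without a custom producer fn.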