Akshay Nagpal created FLINK-10039:
-------------------------------------
Summary: FlinkKafkaProducer - Serializer Error
Key: FLINK-10039
URL: https://issues.apache.org/jira/browse/FLINK-10039
Project: Flink
Issue Type: Bug
Components: Streaming Connectors
Affects Versions: 1.4.2
Reporter: Akshay Nagpal
I am working on a use case where I input the data using Kafka's console
producer, read the same data in my program using FlinkKafkaConsumer and write
it back to another Kafka topic using FlinkKafkaProducer.
I am using version 1.4.2 of the following dependencies:
flink-java
flink-streaming-java_2.11
flink-connector-kafka-0.10_2.11
The code is as follows:
KafkaConsoleProducer:
{code:bash}
./bin/kafka-console-producer --broker-list xxx:9092 --topic test1 \
  --property "parse.key=true" --property "key.separator=:" \
  --key-serializer org.apache.kafka.common.serialization.StringSerializer \
  --value-serializer org.apache.kafka.common.serialization.StringSerializer
{code}
KafkaFlinkConsumer:
{code:java}
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "xxx:9092");
properties.setProperty("zookeeper.connect", "xxx:2181");
properties.setProperty("group.id", "test");
properties.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
FlinkKafkaConsumer010<String> myConsumer =
    new FlinkKafkaConsumer010<String>("test1", new SimpleStringSchema(), properties);
DataStream<String> stream = env.addSource(myConsumer);
{code}
KafkaFlinkProducer:
{code:java}
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "xxx:9092");
properties.setProperty("zookeeper.connect", "xxx:2181");
properties.setProperty("group.id", "test");
properties.setProperty("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
FlinkKafkaProducer010<String> myProducer =
    new FlinkKafkaProducer010<String>("my-topic", new SimpleStringSchema(), properties);
stream.addSink(myProducer);
{code}
When I set both the key and value serializers to StringSerializer in the
FlinkKafkaProducer properties, the logs show the following error:
{code:java}
org.apache.kafka.common.errors.SerializationException: Can't convert value of
class [B to class org.apache.kafka.common.serialization.StringSerializer
specified in value.serializer
{code}
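For reference, `[B` in the message above is the JVM's internal name for `byte[]`, so the exception says that a `byte[]` value reached a serializer that only accepts `String`. The sketch below reproduces that kind of mismatch in plain Java. `SerializerMismatchDemo` and `serializeString` are hypothetical stand-ins, not Kafka's `StringSerializer`, and the message text is only modeled on the log line.

```java
import java.nio.charset.StandardCharsets;

public class SerializerMismatchDemo {

    /** Hypothetical stand-in for a serializer that only accepts String values. */
    static byte[] serializeString(Object value) {
        if (!(value instanceof String)) {
            // Class.getName() returns "[B" for byte[], which is what shows up in the log.
            throw new IllegalArgumentException(
                "Can't convert value of class " + value.getClass().getName() + " to String");
        }
        return ((String) value).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // A record that has already been turned into bytes before reaching the serializer.
        byte[] alreadySerialized = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(alreadySerialized.getClass().getName());
        try {
            serializeString(alreadySerialized);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If this reading is right, the producer is receiving values that are already `byte[]`, which would explain why a String serializer rejects them.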
Despite this error, the data is still written to the output topic.
When I use ByteArraySerializer for the producer instead, the error does not
appear in the logs, and the data is still produced.
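A minimal sketch of the producer configuration that runs without the error, using only `java.util.Properties`. The class name `ProducerConfigSketch` and the method `producerProperties` are hypothetical, and `xxx:9092` is the same placeholder broker address used above. The returned object would be passed as the third constructor argument to `FlinkKafkaProducer010`, in place of the properties that name StringSerializer.

```java
import java.util.Properties;

public class ProducerConfigSketch {

    static final String BYTE_ARRAY_SERIALIZER =
        "org.apache.kafka.common.serialization.ByteArraySerializer";

    /** Builds producer properties that use ByteArraySerializer for both key and value. */
    static Properties producerProperties(String bootstrapServers) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);
        properties.setProperty("key.serializer", BYTE_ARRAY_SERIALIZER);
        properties.setProperty("value.serializer", BYTE_ARRAY_SERIALIZER);
        return properties;
    }

    public static void main(String[] args) {
        // Print the resulting configuration; iteration order is not guaranteed.
        producerProperties("xxx:9092")
            .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

My assumption, which I have not verified against the connector source, is that the SimpleStringSchema already converts each record to `byte[]`, so the Kafka client only needs to pass the bytes through.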
In addition, DataStream's print() method does not print the data to the console.