My stream is producing records of type Tuple2<String,String>

*.toString()* output *(usr12345,{"_key":"usr12345","_temperature":46.6})*

where the key is *usr12345* and value is
*{"_key":"usr12345","_temperature":46.6}*

The *.print()* on the stream outputs the value correctly:
*(usr12345,{"_key":"usr12345","_temperature":46.6})*

But when I write the stream to Kafka the key becomes " *usr12345" *and
the value "*(**{"_key":"usr12345","_temperature":46.6}"*

Notice the space at the beginning of the key and the left parenthesis
at the beginning of the value.

Very strange. Why this might happen?


Here is the serialization code:

TypeInformation<String> resultType = TypeInformation.of(String.class);

KeyedSerializationSchema<Tuple2<String, String>> schema =
      new TypeInformationKeyValueSerializationSchema<>(resultType,
resultType, env.getConfig());

FlinkKafkaProducer010.FlinkKafkaProducer010Configuration
flinkKafkaProducerConfig =
FlinkKafkaProducer010.writeToKafkaWithTimestamps(
      stream,
      "topic",
      schema,
      kafkaProducerProperties);

Reply via email to