Hi,

TypeInformationKeyValueSerializationSchema uses Flink TypeSerializers to 
serialize those Strings. What you're seeing is the binary representation of 
those Strings which, admittedly, resembles the original Strings. If you want to 
have the data in Kafka with real Strings, I think you need to have a custom 
KeyedSerializationSchema for your type.

Best,
Aljoscha

> On 27. Aug 2017, at 15:59, Alexx <alex.guta...@gmail.com> wrote:
> 
> My stream is producing records of type *Tuple2<String,String>*
> *.toString()* output *(usr12345,{"_key":"usr12345","_temperature":46.6})*
> where the key is *usr12345* and value is
> *{"_key":"usr12345","_temperature":46.6}*
> The *.print()* on the stream outputs the value correctly:
> *(usr12345,{"_key":"usr12345","_temperature":46.6})*
> But when I write the stream to Kafka the key becomes *" usr12345"* and the
> value *"({"_key":"usr12345","_temperature":46.6}"*
> Notice the space at the beginning of the key and the left parenthesis at the
> beginning of the value.
> Very strange. Why this might happen?
> 
> Here is the serialization code:
> 
> *TypeInformation<String> resultType = TypeInformation.of(String.class);
> 
> KeyedSerializationSchema<Tuple2&lt;String, String>> schema =
>      new TypeInformationKeyValueSerializationSchema<>(resultType,
> resultType, env.getConfig());
> 
> FlinkKafkaProducer010.FlinkKafkaProducer010Configuration
> flinkKafkaProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
>      stream,   
>      "topic",    
>      schema,  
>      kafkaProducerProperties);*
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Serialization-Issue-tp19400p19401.html
> Sent from the Apache Flink Mailing List archive. mailing list archive at 
> Nabble.com.

Reply via email to