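Hi,

TypeInformationKeyValueSerializationSchema uses Flink TypeSerializers to serialize those Strings. What you're seeing in Kafka is Flink's binary representation of each String, which happens to look almost like the original text. As far as I know, the String serializer writes a length prefix (length plus one) before the characters, which would explain the extra leading character: the key has 8 characters, and byte 9 is a tab; the value has 39 characters, and byte 40 is '('.

If you want the data in Kafka as plain Strings, I think you need to write a custom KeyedSerializationSchema for your type.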
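A minimal sketch of what such a custom schema could look like, writing key and value as plain UTF-8 bytes. To keep it compilable without Flink on the classpath, KeyedSchemaSketch and StringPair are hypothetical stand-ins for Flink's KeyedSerializationSchema<T> and Tuple2<String, String>; the three method names mirror the Flink interface, but treat the rest as an assumption rather than the definitive implementation.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for Flink's KeyedSerializationSchema<T>, so the sketch
// compiles on its own. In a real job, implement the Flink interface instead.
interface KeyedSchemaSketch<T> {
    byte[] serializeKey(T element);
    byte[] serializeValue(T element);
    String getTargetTopic(T element);
}

// Hypothetical stand-in for Tuple2<String, String> (f0 = key, f1 = value).
final class StringPair {
    final String f0;
    final String f1;

    StringPair(String f0, String f1) {
        this.f0 = f0;
        this.f1 = f1;
    }
}

// Writes key and value as raw UTF-8 bytes, with no length prefix in front.
final class PlainStringSchema implements KeyedSchemaSketch<StringPair> {

    @Override
    public byte[] serializeKey(StringPair element) {
        // A null key is passed through as null (Kafka accepts records without a key).
        return element.f0 == null ? null : element.f0.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serializeValue(StringPair element) {
        return element.f1 == null ? null : element.f1.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(StringPair element) {
        // Returning null means "no per-record topic"; the producer's default topic is used.
        return null;
    }
}
```

In the actual job, the same three method bodies would go into a class implementing KeyedSerializationSchema<Tuple2<String, String>>, and an instance of it would be passed to writeToKafkaWithTimestamps in place of the TypeInformationKeyValueSerializationSchema.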
Best,
Aljoscha

> On 27. Aug 2017, at 15:59, Alexx <alex.guta...@gmail.com> wrote:
>
> My stream is producing records of type *Tuple2<String,String>*
> *.toString()* output *(usr12345,{"_key":"usr12345","_temperature":46.6})*
> where the key is *usr12345* and value is
> *{"_key":"usr12345","_temperature":46.6}*
> The *.print()* on the stream outputs the value correctly:
> *(usr12345,{"_key":"usr12345","_temperature":46.6})*
> But when I write the stream to Kafka the key becomes *" usr12345"* and the
> value *"({"_key":"usr12345","_temperature":46.6}"*
> Notice the space at the beginning of the key and the left parenthesis at the
> beginning of the value.
> Very strange. Why this might happen?
>
> Here is the serialization code:
>
> TypeInformation<String> resultType = TypeInformation.of(String.class);
>
> KeyedSerializationSchema<Tuple2<String, String>> schema =
>     new TypeInformationKeyValueSerializationSchema<>(resultType,
>         resultType, env.getConfig());
>
> FlinkKafkaProducer010.FlinkKafkaProducer010Configuration
>     flinkKafkaProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
>         stream,
>         "topic",
>         schema,
>         kafkaProducerProperties);
>
> --
> View this message in context:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Serialization-Issue-tp19400p19401.html
> Sent from the Apache Flink Mailing List archive at Nabble.com.