Aljoscha Krettek created FLINK-7902: ---------------------------------------
Summary: TwoPhaseCommitSinkFunctions should use custom TypeSerializer Key: FLINK-7902 URL: https://issues.apache.org/jira/browse/FLINK-7902 Project: Flink Issue Type: Bug Components: Kafka Connector Affects Versions: 1.4.0 Reporter: Aljoscha Krettek Assignee: Piotr Nowojski Priority: Blocker Fix For: 1.4.0 Currently, the {{FlinkKafkaProducer011}} uses {{TypeInformation.of(new TypeHint<State<KafkaTransactionState, KafkaTransactionContext>>() {})}} to create a {{TypeInformation}} which in turn is used to create a {{StateDescriptor}} for the state that the Kafka sink stores. Behind the scenes, this would be roughly analysed as a {{PojoType(GenericType<KafkaTransactionState>, GenericType<KafkaTransactionContext>)}} which means we don't have explicit control over the serialisation format and we also use Kryo (which is the default for {{GenericTypeInfo}}). This can be problematic if we want to evolve the state schema in the future or if we want to change Kryo versions. We should change {{TwoPhaseCommitSinkFunction}} to only have this constructor: {code} public TwoPhaseCommitSinkFunction(TypeSerializer<State<TXN, CONTEXT>> stateSerializer) { {code} and we should then change the {{FlinkKafkaProducer011}} to hand in a custom-made {{TypeSerializer}} for the state. -- This message was sent by Atlassian JIRA (v6.4.14#64029)