[ https://issues.apache.org/jira/browse/FLINK-7902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16224842#comment-16224842 ]
ASF GitHub Bot commented on FLINK-7902:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4919#discussion_r147673504

    --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---
    @@ -993,14 +1000,162 @@ public String toString() {
          * Context associated to this instance of the {@link FlinkKafkaProducer011}. User for keeping track of the
          * transactionalIds.
          */
    -    public static class KafkaTransactionContext {
    -        public final Set<String> transactionalIds;
    +    static class KafkaTransactionContext {
    +        final Set<String> transactionalIds;

    -        public KafkaTransactionContext(Set<String> transactionalIds) {
    +        KafkaTransactionContext(Set<String> transactionalIds) {
                 this.transactionalIds = transactionalIds;
             }
         }

    +    static class TransactionStateSerializer extends TypeSerializerSingleton<KafkaTransactionState> {
    --- End diff --

    Define a `serialVersionUID`.


> TwoPhaseCommitSinkFunctions should use custom TypeSerializer
> ------------------------------------------------------------
>
>                 Key: FLINK-7902
>                 URL: https://issues.apache.org/jira/browse/FLINK-7902
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.4.0
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> Currently, the {{FlinkKafkaProducer011}} uses {{TypeInformation.of(new
> TypeHint<State<KafkaTransactionState, KafkaTransactionContext>>() {})}} to
> create a {{TypeInformation}} which in turn is used to create a
> {{StateDescriptor}} for the state that the Kafka sink stores.
> Behind the scenes, this would be roughly analysed as a
> {{PojoType(GenericType<KafkaTransactionState>,
> GenericType<KafkaTransactionContext>)}} which means we don't have explicit
> control over the serialisation format and we also use Kryo (which is the
> default for {{GenericTypeInfo}}). This can be problematic if we want to
> evolve the state schema in the future or if we want to change Kryo versions.
> We should change {{TwoPhaseCommitSinkFunction}} to only have this constructor:
> {code}
> public TwoPhaseCommitSinkFunction(TypeSerializer<State<TXN, CONTEXT>>
> stateSerializer) {
> {code}
> and we should then change the {{FlinkKafkaProducer011}} to hand in a
> custom-made {{TypeSerializer}} for the state.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
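
As a rough illustration of the "explicit control over the serialisation format" that the issue asks for, the sketch below writes a transaction state field by field with plain java.io instead of relying on Kryo. It does not use Flink's TypeSerializer API, and it is not the code from the pull request: TxnState, TxnStateSerializer, the three fields and the helper methods are hypothetical stand-ins chosen for this example. The serialVersionUID is pinned in the spirit of the review comment above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Self-contained sketch; none of these names are Flink classes. */
final class TxnStateSerializerSketch {

    /** Hypothetical stand-in for the state that the Kafka sink stores. */
    static final class TxnState {
        final String transactionalId; // may be null
        final long producerId;
        final short epoch;

        TxnState(String transactionalId, long producerId, short epoch) {
            this.transactionalId = transactionalId;
            this.producerId = producerId;
            this.epoch = epoch;
        }
    }

    /** Writes every field explicitly, so the byte layout is fixed by this class alone. */
    static final class TxnStateSerializer implements Serializable {
        // Pinned so that recompiling the class does not change its serialized identity.
        private static final long serialVersionUID = 1L;

        void serialize(TxnState state, DataOutput out) throws IOException {
            // A presence flag lets a null transactionalId survive the round trip.
            out.writeBoolean(state.transactionalId != null);
            if (state.transactionalId != null) {
                out.writeUTF(state.transactionalId);
            }
            out.writeLong(state.producerId);
            out.writeShort(state.epoch);
        }

        TxnState deserialize(DataInput in) throws IOException {
            // Fields are read back in exactly the order they were written.
            String transactionalId = in.readBoolean() ? in.readUTF() : null;
            long producerId = in.readLong();
            short epoch = in.readShort();
            return new TxnState(transactionalId, producerId, epoch);
        }
    }

    /** Serializes a state into a byte array. */
    static byte[] toBytes(TxnState state) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);
            new TxnStateSerializer().serialize(state, out);
            out.flush();
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Serializes and immediately deserializes a state. */
    static TxnState roundTrip(TxnState state) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(toBytes(state)));
            return new TxnStateSerializer().deserialize(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        TxnState restored = roundTrip(new TxnState("txn-1", 42L, (short) 3));
        System.out.println(restored.transactionalId + " " + restored.producerId + " " + restored.epoch);
    }
}
```

Because the layout is spelled out in serialize and deserialize, a later schema change would be a deliberate edit to those two methods rather than a side effect of a Kryo upgrade. In Flink itself the equivalent logic would presumably live in a TypeSerializer subclass handed to the TwoPhaseCommitSinkFunction constructor quoted above.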