[ https://issues.apache.org/jira/browse/FLINK-7902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16224836#comment-16224836 ]
ASF GitHub Bot commented on FLINK-7902:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4919#discussion_r147672995

    --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---
    @@ -958,29 +960,34 @@ public int compare(PartitionInfo o1, PartitionInfo o2) {
         /**
          * State for handling transactions.
          */
    -    public static class KafkaTransactionState {
    +    static class KafkaTransactionState {
             private final transient FlinkKafkaProducer<byte[], byte[]> producer;
             @Nullable
    -        public final String transactionalId;
    +        final String transactionalId;
    -        public final long producerId;
    +        final long producerId;
    -        public final short epoch;
    +        final short epoch;
    -        public KafkaTransactionState(String transactionalId, FlinkKafkaProducer<byte[], byte[]> producer) {
    --- End diff --

    The `public` is not needed as the whole class is not public.


> TwoPhaseCommitSinkFunctions should use custom TypeSerializer
> ------------------------------------------------------------
>
>                 Key: FLINK-7902
>                 URL: https://issues.apache.org/jira/browse/FLINK-7902
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.4.0
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> Currently, the {{FlinkKafkaProducer011}} uses {{TypeInformation.of(new
> TypeHint<State<KafkaTransactionState, KafkaTransactionContext>>() {})}} to
> create a {{TypeInformation}} which in turn is used to create a
> {{StateDescriptor}} for the state that the Kafka sink stores.
>
> Behind the scenes, this would be roughly analysed as a
> {{PojoType(GenericType<KafkaTransactionState>,
> GenericType<KafkaTransactionContext>)}} which means we don't have explicit
> control over the serialisation format and we also use Kryo (which is the
> default for {{GenericTypeInfo}}). This can be problematic if we want to
> evolve the state schema in the future or if we want to change Kryo versions.
>
> We should change {{TwoPhaseCommitSinkFunction}} to only have this constructor:
> {code}
> public TwoPhaseCommitSinkFunction(TypeSerializer<State<TXN, CONTEXT>> stateSerializer) {
> {code}
> and we should then change the {{FlinkKafkaProducer011}} to hand in a
> custom-made {{TypeSerializer}} for the state.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
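
For illustration only: the issue description asks for explicit control over the serialisation format instead of relying on Kryo. The sketch below shows what a hand-written, versioned binary format for the three fields visible in the diff (transactionalId, producerId, epoch) could look like using plain java.io. It deliberately does not implement Flink's TypeSerializer interface; the class names TxnStateSketch and TxnStateSketchSerializer, the field layout, and the version number are assumptions made for this example and are not taken from the pull request.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for the sink's transaction state; not Flink code. */
final class TxnStateSketch {
    final String transactionalId; // may be null, mirroring @Nullable in the diff
    final long producerId;
    final short epoch;

    TxnStateSketch(String transactionalId, long producerId, short epoch) {
        this.transactionalId = transactionalId;
        this.producerId = producerId;
        this.epoch = epoch;
    }
}

/**
 * Assumed layout: [version:int][hasId:boolean][id:UTF, only if hasId][producerId:long][epoch:short].
 * The leading version field is what lets the format change later without guessing.
 */
final class TxnStateSketchSerializer {
    static final int VERSION = 1;

    static byte[] serialize(TxnStateSketch state) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(VERSION);
            out.writeBoolean(state.transactionalId != null);
            if (state.transactionalId != null) {
                out.writeUTF(state.transactionalId);
            }
            out.writeLong(state.producerId);
            out.writeShort(state.epoch);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static TxnStateSketch deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int version = in.readInt();
            if (version != VERSION) {
                throw new IllegalStateException("Unsupported state version: " + version);
            }
            // Fields are read back in exactly the order they were written.
            String id = in.readBoolean() ? in.readUTF() : null;
            long producerId = in.readLong();
            short epoch = in.readShort();
            return new TxnStateSketch(id, producerId, epoch);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because every byte is written by hand, the on-disk layout does not depend on the Kryo version, and a future reader can branch on the version field to handle older snapshots. A real fix would still need to wrap logic of this kind in a Flink TypeSerializer and cover the transaction context as well.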