[ https://issues.apache.org/jira/browse/FLINK-7902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16224836#comment-16224836 ]

ASF GitHub Bot commented on FLINK-7902:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4919#discussion_r147672995
  
    --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---
    @@ -958,29 +960,34 @@ public int compare(PartitionInfo o1, PartitionInfo o2) {
        /**
         * State for handling transactions.
         */
    -   public static class KafkaTransactionState {
    +   static class KafkaTransactionState {
     
                private final transient FlinkKafkaProducer<byte[], byte[]> producer;
     
                @Nullable
    -           public final String transactionalId;
    +           final String transactionalId;
     
    -           public final long producerId;
    +           final long producerId;
     
    -           public final short epoch;
    +           final short epoch;
     
    -           public KafkaTransactionState(String transactionalId, FlinkKafkaProducer<byte[], byte[]> producer) {
    --- End diff --
    
    The `public` is not needed as the whole class is not public.


> TwoPhaseCommitSinkFunctions should use custom TypeSerializer
> ------------------------------------------------------------
>
>                 Key: FLINK-7902
>                 URL: https://issues.apache.org/jira/browse/FLINK-7902
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.4.0
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> Currently, the {{FlinkKafkaProducer011}} uses {{TypeInformation.of(new 
> TypeHint<State<KafkaTransactionState, KafkaTransactionContext>>() {})}} to 
> create a {{TypeInformation}} which in turn is used to create a 
> {{StateDescriptor}} for the state that the Kafka sink stores.
> Behind the scenes, this would be roughly analysed as a 
> {{PojoType(GenericType<KafkaTransactionState>, 
> GenericType<KafkaTransactionContext>)}} which means we don't have explicit 
> control over the serialisation format and we also use Kryo (which is the 
> default for {{GenericTypeInfo}}). This can be problematic if we want to 
> evolve the state schema in the future or if we want to change Kryo versions.
> We should change {{TwoPhaseCommitSinkFunction}} to only have this constructor:
> {code}
> public TwoPhaseCommitSinkFunction(TypeSerializer<State<TXN, CONTEXT>> stateSerializer) {
> {code}
> and we should then change the {{FlinkKafkaProducer011}} to hand in a 
> custom-made {{TypeSerializer}} for the state.
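
To illustrate what "explicit control over the serialisation format" could mean for the three fields visible in the diff above (`transactionalId`, `producerId`, `epoch`), here is a minimal plain-Java sketch. It deliberately does not use Flink's `TypeSerializer` API; `TransactionStateCodec`, `TxnState`, and `FORMAT_VERSION` are hypothetical names invented for this example, and the byte layout is an assumption, not the format the actual fix uses.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical stand-alone codec; not part of Flink or of the pull request.
final class TransactionStateCodec {

    // Assumed version tag written first, so the layout can evolve later.
    static final int FORMAT_VERSION = 1;

    // Hypothetical stand-in for the fields of KafkaTransactionState.
    static final class TxnState {
        final String transactionalId; // may be null, as in the diff (@Nullable)
        final long producerId;
        final short epoch;

        TxnState(String transactionalId, long producerId, short epoch) {
            this.transactionalId = transactionalId;
            this.producerId = producerId;
            this.epoch = epoch;
        }
    }

    // Writes: version, null flag, optional id, producerId, epoch.
    static byte[] serialize(TxnState state) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(FORMAT_VERSION);
            out.writeBoolean(state.transactionalId != null);
            if (state.transactionalId != null) {
                out.writeUTF(state.transactionalId);
            }
            out.writeLong(state.producerId);
            out.writeShort(state.epoch);
        }
        return bytes.toByteArray();
    }

    // Reads the fields back in the same order and rejects unknown versions.
    static TxnState deserialize(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int version = in.readInt();
            if (version != FORMAT_VERSION) {
                throw new IOException("Unsupported state format version: " + version);
            }
            String transactionalId = in.readBoolean() ? in.readUTF() : null;
            long producerId = in.readLong();
            short epoch = in.readShort();
            return new TxnState(transactionalId, producerId, epoch);
        }
    }
}
```

Because every field is written by hand in a fixed order behind a version tag, the stored bytes do not depend on Kryo or on the class layout, which is the property the issue description asks for. A real fix would implement the same idea inside a Flink `TypeSerializer` subclass.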



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
