[https://issues.apache.org/jira/browse/FLINK-7902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16224863#comment-16224863]

ASF GitHub Bot commented on FLINK-7902:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4919#discussion_r147692117
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
    @@ -362,4 +374,213 @@ public void setContext(Optional<CONTEXT> context) {
                        this.context = context;
                }
        }
    +
    +   /**
    +    * Custom {@link TypeSerializer} for the sink state.
    +    */
    +   static final class StateSerializer<TXN, CONTEXT> extends 
TypeSerializer<State<TXN, CONTEXT>> {
    +
    +           private final TypeSerializer<TXN> transactionSerializer;
    +           private final TypeSerializer<CONTEXT> contextSerializer;
    +
    +           public StateSerializer(
    +                           TypeSerializer<TXN> transactionSerializer,
    +                           TypeSerializer<CONTEXT> contextSerializer) {
    +                   this.transactionSerializer = 
checkNotNull(transactionSerializer);
    +                   this.contextSerializer = 
checkNotNull(contextSerializer);
    +           }
    +
    +           @Override
    +           public boolean isImmutableType() {
    +                   return transactionSerializer.isImmutableType() && 
contextSerializer.isImmutableType();
    +           }
    +
    +           @Override
    +           public TypeSerializer<State<TXN, CONTEXT>> duplicate() {
    +                   return new StateSerializer<>(
    +                                   transactionSerializer.duplicate(), 
contextSerializer.duplicate());
    +           }
    +
    +           @Override
    +           public State<TXN, CONTEXT> createInstance() {
    +                   return null;
    +           }
    +
    +           @Override
    +           public State<TXN, CONTEXT> copy(State<TXN, CONTEXT> from) {
    +                   TXN copiedPendingTransaction = 
transactionSerializer.copy(from.getPendingTransaction());
    +                   List<TXN> copiedPendingCommitTransactions = new 
ArrayList<>();
    +                   for (TXN txn : from.getPendingCommitTransactions()) {
    +                           
copiedPendingCommitTransactions.add(transactionSerializer.copy(txn));
    +                   }
    +                   Optional<CONTEXT> copiedContext = 
from.getContext().map(contextSerializer::copy);
    +                   return new State<>(copiedPendingTransaction, 
copiedPendingCommitTransactions, copiedContext);
    +           }
    +
    +           @Override
    +           public State<TXN, CONTEXT> copy(
    +                           State<TXN, CONTEXT> from,
    +                           State<TXN, CONTEXT> reuse) {
    +                   return copy(from);
    +           }
    +
    +           @Override
    +           public int getLength() {
    +                   return -1;
    +           }
    +
    +           @Override
    +           public void serialize(
    +                           State<TXN, CONTEXT> record,
    +                           DataOutputView target) throws IOException {
    +                   
transactionSerializer.serialize(record.getPendingTransaction(), target);
    +                   List<TXN> pendingCommitTransactions = 
record.getPendingCommitTransactions();
    +                   target.writeInt(pendingCommitTransactions.size());
    +                   for (TXN pendingTxn : pendingCommitTransactions) {
    +                           transactionSerializer.serialize(pendingTxn, 
target);
    +                   }
    +                   Optional<CONTEXT> context = record.getContext();
    +                   if (context.isPresent()) {
    +                           target.writeBoolean(true);
    +                           contextSerializer.serialize(context.get(), 
target);
    +                   } else {
    +                           target.writeBoolean(false);
    +                   }
    +           }
    +
    +           @Override
    +           public State<TXN, CONTEXT> deserialize(DataInputView source) 
throws IOException {
    +                   TXN pendingTxn = 
transactionSerializer.deserialize(source);
    +                   int numPendingCommitTxns = source.readInt();
    +                   List<TXN> pendingCommitTxns = new 
ArrayList<>(numPendingCommitTxns);
    +                   for (int i = 0; i < numPendingCommitTxns; i++) {
    +                           
pendingCommitTxns.add(transactionSerializer.deserialize(source));
    +                   }
    +                   Optional<CONTEXT> context = Optional.empty();
    +                   boolean hasContext = source.readBoolean();
    +                   if (hasContext) {
    +                           context = 
Optional.of(contextSerializer.deserialize(source));
    +                   }
    +                   return new State<>(pendingTxn, pendingCommitTxns, 
context);
    +           }
    +
    +           @Override
    +           public State<TXN, CONTEXT> deserialize(
    +                           State<TXN, CONTEXT> reuse,
    +                           DataInputView source) throws IOException {
    +                   return deserialize(source);
    +           }
    +
    +           @Override
    +           public void copy(
    +                           DataInputView source, DataOutputView target) 
throws IOException {
    +                   TXN pendingTxn = 
transactionSerializer.deserialize(source);
    +                   transactionSerializer.serialize(pendingTxn, target);
    +                   int numPendingCommitTxns = source.readInt();
    +                   target.writeInt(numPendingCommitTxns);
    +                   for (int i = 0; i < numPendingCommitTxns; i++) {
    +                           TXN pendingCommitTxn = 
transactionSerializer.deserialize(source);
    +                           
transactionSerializer.serialize(pendingCommitTxn, target);
    +                   }
    +                   boolean hasContext = source.readBoolean();
    --- End diff --
    
    fixing


> TwoPhaseCommitSinkFunctions should use custom TypeSerializer
> ------------------------------------------------------------
>
>                 Key: FLINK-7902
>                 URL: https://issues.apache.org/jira/browse/FLINK-7902
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.4.0
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> Currently, the {{FlinkKafkaProducer011}} uses
> {{TypeInformation.of(new TypeHint<State<KafkaTransactionState, KafkaTransactionContext>>() {})}}
> to create a {{TypeInformation}}, which in turn is used to create a
> {{StateDescriptor}} for the state that the Kafka sink stores.
> Behind the scenes, this is analysed roughly as a
> {{PojoType(GenericType<KafkaTransactionState>, GenericType<KafkaTransactionContext>)}},
> which means we don't have explicit control over the serialisation format, and we
> also use Kryo (which is the default for {{GenericTypeInfo}}). This can be problematic
> if we want to evolve the state schema in the future or if we want to change Kryo versions.
> We should change {{TwoPhaseCommitSinkFunction}} to have only this constructor:
> {code}
> public TwoPhaseCommitSinkFunction(TypeSerializer<State<TXN, CONTEXT>> stateSerializer) {
> {code}
> and we should then change the {{FlinkKafkaProducer011}} to hand in a
> custom-made {{TypeSerializer}} for the state.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to