Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4919#discussion_r147688912
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
    @@ -362,4 +374,213 @@ public void setContext(Optional<CONTEXT> context) {
                        this.context = context;
                }
        }
    +
    +   /**
    +    * Custom {@link TypeSerializer} for the sink state.
    +    */
    +   static final class StateSerializer<TXN, CONTEXT> extends 
TypeSerializer<State<TXN, CONTEXT>> {
    +
    +           private final TypeSerializer<TXN> transactionSerializer;
    +           private final TypeSerializer<CONTEXT> contextSerializer;
    +
    +           public StateSerializer(
    +                           TypeSerializer<TXN> transactionSerializer,
    +                           TypeSerializer<CONTEXT> contextSerializer) {
    +                   this.transactionSerializer = 
checkNotNull(transactionSerializer);
    +                   this.contextSerializer = 
checkNotNull(contextSerializer);
    +           }
    +
    +           @Override
    +           public boolean isImmutableType() {
    +                   return transactionSerializer.isImmutableType() && 
contextSerializer.isImmutableType();
    +           }
    +
    +           @Override
    +           public TypeSerializer<State<TXN, CONTEXT>> duplicate() {
    +                   return new StateSerializer<>(
    +                                   transactionSerializer.duplicate(), 
contextSerializer.duplicate());
    +           }
    +
    +           @Override
    +           public State<TXN, CONTEXT> createInstance() {
    +                   return null;
    +           }
    +
    +           @Override
    +           public State<TXN, CONTEXT> copy(State<TXN, CONTEXT> from) {
    +                   TXN copiedPendingTransaction = 
transactionSerializer.copy(from.getPendingTransaction());
    +                   List<TXN> copiedPendingCommitTransactions = new 
ArrayList<>();
    +                   for (TXN txn : from.getPendingCommitTransactions()) {
    +                           
copiedPendingCommitTransactions.add(transactionSerializer.copy(txn));
    +                   }
    +                   Optional<CONTEXT> copiedContext = 
from.getContext().map(contextSerializer::copy);
    +                   return new State<>(copiedPendingTransaction, 
copiedPendingCommitTransactions, copiedContext);
    +           }
    +
    +           @Override
    +           public State<TXN, CONTEXT> copy(
    +                           State<TXN, CONTEXT> from,
    +                           State<TXN, CONTEXT> reuse) {
    +                   return copy(from);
    +           }
    +
    +           @Override
    +           public int getLength() {
    +                   return -1;
    +           }
    +
    +           @Override
    +           public void serialize(
    +                           State<TXN, CONTEXT> record,
    +                           DataOutputView target) throws IOException {
    +                   
transactionSerializer.serialize(record.getPendingTransaction(), target);
    +                   List<TXN> pendingCommitTransactions = 
record.getPendingCommitTransactions();
    +                   target.writeInt(pendingCommitTransactions.size());
    +                   for (TXN pendingTxn : pendingCommitTransactions) {
    +                           transactionSerializer.serialize(pendingTxn, 
target);
    +                   }
    +                   Optional<CONTEXT> context = record.getContext();
    +                   if (context.isPresent()) {
    +                           target.writeBoolean(true);
    +                           contextSerializer.serialize(context.get(), 
target);
    +                   } else {
    +                           target.writeBoolean(false);
    +                   }
    +           }
    +
    +           @Override
    +           public State<TXN, CONTEXT> deserialize(DataInputView source) 
throws IOException {
    +                   TXN pendingTxn = 
transactionSerializer.deserialize(source);
    +                   int numPendingCommitTxns = source.readInt();
    +                   List<TXN> pendingCommitTxns = new 
ArrayList<>(numPendingCommitTxns);
    +                   for (int i = 0; i < numPendingCommitTxns; i++) {
    +                           
pendingCommitTxns.add(transactionSerializer.deserialize(source));
    +                   }
    +                   Optional<CONTEXT> context = Optional.empty();
    +                   boolean hasContext = source.readBoolean();
    +                   if (hasContext) {
    +                           context = 
Optional.of(contextSerializer.deserialize(source));
    +                   }
    +                   return new State<>(pendingTxn, pendingCommitTxns, 
context);
    +           }
    +
    +           @Override
    +           public State<TXN, CONTEXT> deserialize(
    +                           State<TXN, CONTEXT> reuse,
    +                           DataInputView source) throws IOException {
    +                   return deserialize(source);
    +           }
    +
    +           @Override
    +           public void copy(
    +                           DataInputView source, DataOutputView target) 
throws IOException {
    +                   TXN pendingTxn = 
transactionSerializer.deserialize(source);
    +                   transactionSerializer.serialize(pendingTxn, target);
    +                   int numPendingCommitTxns = source.readInt();
    +                   target.writeInt(numPendingCommitTxns);
    +                   for (int i = 0; i < numPendingCommitTxns; i++) {
    +                           TXN pendingCommitTxn = 
transactionSerializer.deserialize(source);
    +                           
transactionSerializer.serialize(pendingCommitTxn, target);
    +                   }
    +                   boolean hasContext = source.readBoolean();
    --- End diff --
    
    Here you should also write the `hasContext` to the `target`.


---

Reply via email to