Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4919#discussion_r147676219
  
    --- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 ---
    @@ -993,14 +1000,162 @@ public String toString() {
         * Context associated to this instance of the {@link 
FlinkKafkaProducer011}. User for keeping track of the
         * transactionalIds.
         */
    -   public static class KafkaTransactionContext {
    -           public final Set<String> transactionalIds;
    +   static class KafkaTransactionContext {
    +           final Set<String> transactionalIds;
     
    -           public KafkaTransactionContext(Set<String> transactionalIds) {
    +           KafkaTransactionContext(Set<String> transactionalIds) {
                        this.transactionalIds = transactionalIds;
                }
        }
     
    +   static class TransactionStateSerializer extends 
TypeSerializerSingleton<KafkaTransactionState> {
    +           @Override
    +           public boolean isImmutableType() {
    +                   return true;
    +           }
    +
    +           @Override
    +           public KafkaTransactionState createInstance() {
    +                   return null;
    +           }
    +
    +           @Override
    +           public KafkaTransactionState copy(KafkaTransactionState from) {
    +                   return from;
    +           }
    +
    +           @Override
    +           public KafkaTransactionState copy(
    +                   KafkaTransactionState from,
    +                   KafkaTransactionState reuse) {
    +                   return from;
    +           }
    +
    +           @Override
    +           public int getLength() {
    +                   return -1;
    +           }
    +
    +           @Override
    +           public void serialize(
    +                           KafkaTransactionState record,
    +                           DataOutputView target) throws IOException {
    +                   if (record.transactionalId == null) {
    +                           target.writeBoolean(false);
    +                   } else {
    +                           target.writeBoolean(true);
    +                           target.writeUTF(record.transactionalId);
    +                   }
    +                   target.writeLong(record.producerId);
    +                   target.writeShort(record.epoch);
    +           }
    +
    +           @Override
    +           public KafkaTransactionState deserialize(DataInputView source) 
throws IOException {
    +                   String transactionalId = null;
    +                   if (source.readBoolean()) {
    +                           transactionalId = source.readUTF();
    +                   }
    +                   long producerId = source.readLong();
    +                   short epoch = source.readShort();
    +                   return new KafkaTransactionState(transactionalId, 
producerId, epoch, null);
    +           }
    +
    +           @Override
    +           public KafkaTransactionState deserialize(
    +                           KafkaTransactionState reuse,
    +                           DataInputView source) throws IOException {
    +                   return deserialize(source);
    +           }
    +
    +           @Override
    +           public void copy(
    +                           DataInputView source, DataOutputView target) 
throws IOException {
    +                   boolean hasTransactionalId = source.readBoolean();
    +                   target.writeBoolean(hasTransactionalId);
    +                   target.writeUTF(source.readUTF());
    +                   target.writeLong(source.readLong());
    +                   target.writeShort(source.readShort());
    +           }
    +
    +           @Override
    +           public boolean canEqual(Object obj) {
    +                   return obj instanceof TransactionStateSerializer;
    +           }
    +   }
    +
    +   static class ContextStateSerializer extends 
TypeSerializerSingleton<KafkaTransactionContext> {
    +           @Override
    +           public boolean isImmutableType() {
    +                   return true;
    +           }
    +
    +           @Override
    +           public KafkaTransactionContext createInstance() {
    +                   return null;
    +           }
    +
    +           @Override
    +           public KafkaTransactionContext copy(KafkaTransactionContext 
from) {
    +                   return from;
    +           }
    +
    +           @Override
    +           public KafkaTransactionContext copy(
    +                           KafkaTransactionContext from,
    +                           KafkaTransactionContext reuse) {
    +                   return from;
    +           }
    +
    +           @Override
    +           public int getLength() {
    +                   return -1;
    +           }
    +
    +           @Override
    +           public void serialize(
    +                           KafkaTransactionContext record,
    +                           DataOutputView target) throws IOException {
    +                   int numIds = record.transactionalIds.size();
    +                   target.writeInt(numIds);
    +                   for (String id : record.transactionalIds) {
    +                           target.writeUTF(id);
    +                   }
    +           }
    +
    +           @Override
    +           public KafkaTransactionContext deserialize(DataInputView 
source) throws IOException {
    +                   int numIds = source.readInt();
    +                   Set<String> ids = new HashSet<>(numIds);
    +                   for (int i = 0; i < numIds; i++) {
    +                           ids.add(source.readUTF());
    +                   }
    +                   return new KafkaTransactionContext(ids);
    +           }
    +
    +           @Override
    +           public KafkaTransactionContext deserialize(
    +                           KafkaTransactionContext reuse,
    +                           DataInputView source) throws IOException {
    +                   return deserialize(source);
    +           }
    +
    +           @Override
    +           public void copy(
    +                           DataInputView source,
    +                           DataOutputView target) throws IOException {
    +                   int numIds = source.readInt();
    --- End diff --
    
    You do not write the size of the list to the copy. Just add 
`target.writeInt(numIds);`.


---

Reply via email to