Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/4919#discussion_r147676539 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java --- @@ -993,14 +1000,162 @@ public String toString() { * Context associated to this instance of the {@link FlinkKafkaProducer011}. User for keeping track of the * transactionalIds. */ - public static class KafkaTransactionContext { - public final Set<String> transactionalIds; + static class KafkaTransactionContext { + final Set<String> transactionalIds; - public KafkaTransactionContext(Set<String> transactionalIds) { + KafkaTransactionContext(Set<String> transactionalIds) { this.transactionalIds = transactionalIds; } } + static class TransactionStateSerializer extends TypeSerializerSingleton<KafkaTransactionState> { + @Override + public boolean isImmutableType() { + return true; + } + + @Override + public KafkaTransactionState createInstance() { + return null; + } + + @Override + public KafkaTransactionState copy(KafkaTransactionState from) { + return from; + } + + @Override + public KafkaTransactionState copy( + KafkaTransactionState from, + KafkaTransactionState reuse) { + return from; + } + + @Override + public int getLength() { + return -1; + } + + @Override + public void serialize( + KafkaTransactionState record, + DataOutputView target) throws IOException { + if (record.transactionalId == null) { + target.writeBoolean(false); + } else { + target.writeBoolean(true); + target.writeUTF(record.transactionalId); + } + target.writeLong(record.producerId); + target.writeShort(record.epoch); + } + + @Override + public KafkaTransactionState deserialize(DataInputView source) throws IOException { + String transactionalId = null; + if (source.readBoolean()) { + transactionalId = source.readUTF(); + } + long producerId = source.readLong(); + short epoch = source.readShort(); + return new KafkaTransactionState(transactionalId, producerId, epoch, null); + } + + @Override + public KafkaTransactionState deserialize( + KafkaTransactionState reuse, + DataInputView source) throws IOException { + return deserialize(source); + } + + @Override + public void copy( + DataInputView source, DataOutputView target) throws IOException { + boolean hasTransactionalId = source.readBoolean(); + target.writeBoolean(hasTransactionalId); + target.writeUTF(source.readUTF()); + target.writeLong(source.readLong()); + target.writeShort(source.readShort()); + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof TransactionStateSerializer; + } + } + + static class ContextStateSerializer extends TypeSerializerSingleton<KafkaTransactionContext> { --- End diff -- Define a `serialVersionUID`.
---