Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/4919#discussion_r147676539
--- Diff:
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
---
@@ -993,14 +1000,162 @@ public String toString() {
* Context associated with this instance of the {@link
FlinkKafkaProducer011}. Used for keeping track of the
* transactionalIds.
*/
- public static class KafkaTransactionContext {
- public final Set<String> transactionalIds;
+ static class KafkaTransactionContext {
+ final Set<String> transactionalIds;
- public KafkaTransactionContext(Set<String> transactionalIds) {
+ KafkaTransactionContext(Set<String> transactionalIds) {
this.transactionalIds = transactionalIds;
}
}
+ static class TransactionStateSerializer extends
TypeSerializerSingleton<KafkaTransactionState> {
+ @Override
+ public boolean isImmutableType() {
+ return true;
+ }
+
+ @Override
+ public KafkaTransactionState createInstance() {
+ return null;
+ }
+
+ @Override
+ public KafkaTransactionState copy(KafkaTransactionState from) {
+ return from;
+ }
+
+ @Override
+ public KafkaTransactionState copy(
+ KafkaTransactionState from,
+ KafkaTransactionState reuse) {
+ return from;
+ }
+
+ @Override
+ public int getLength() {
+ return -1;
+ }
+
+ @Override
+ public void serialize(
+ KafkaTransactionState record,
+ DataOutputView target) throws IOException {
+ if (record.transactionalId == null) {
+ target.writeBoolean(false);
+ } else {
+ target.writeBoolean(true);
+ target.writeUTF(record.transactionalId);
+ }
+ target.writeLong(record.producerId);
+ target.writeShort(record.epoch);
+ }
+
+ @Override
+ public KafkaTransactionState deserialize(DataInputView source)
throws IOException {
+ String transactionalId = null;
+ if (source.readBoolean()) {
+ transactionalId = source.readUTF();
+ }
+ long producerId = source.readLong();
+ short epoch = source.readShort();
+ return new KafkaTransactionState(transactionalId,
producerId, epoch, null);
+ }
+
+ @Override
+ public KafkaTransactionState deserialize(
+ KafkaTransactionState reuse,
+ DataInputView source) throws IOException {
+ return deserialize(source);
+ }
+
+ @Override
+ public void copy(
+ DataInputView source, DataOutputView target)
throws IOException {
+ boolean hasTransactionalId = source.readBoolean();
+ target.writeBoolean(hasTransactionalId);
+ target.writeUTF(source.readUTF());
+ target.writeLong(source.readLong());
+ target.writeShort(source.readShort());
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof TransactionStateSerializer;
+ }
+ }
+
+ static class ContextStateSerializer extends
TypeSerializerSingleton<KafkaTransactionContext> {
--- End diff --
Define a `serialVersionUID`.
---