This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new e13ae99  [FLINK-23509][connectors/kafka] Create new ProducerIdAndEpoch when resuming Kafka transaction
e13ae99 is described below

commit e13ae99953d2702f45db9a42d8714b6da93cd0ff
Author: Fabian Paul <[email protected]>
AuthorDate: Tue Aug 3 15:26:11 2021 +0200

    [FLINK-23509][connectors/kafka] Create new ProducerIdAndEpoch when resuming Kafka transaction
    
    Before this change, we did not create a new ProducerIdAndEpoch but
    instead mutated the already created one. This can overwrite the
    internally used static ProducerIdAndEpoch#None, which is used to
    describe an uninitialized state. Once the static object is mutated,
    it breaks the recovery of transactions.
---
 .../internals/FlinkKafkaInternalProducer.java      | 25 +++++++++++++++++++---
 1 file changed, 22 insertions(+), 3 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java
index 2f6c512..c0fd2ec 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java
@@ -31,6 +31,7 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.internals.TransactionManager;
 import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
@@ -44,6 +45,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -226,9 +228,10 @@ public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> {
                                 
"org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
                 invoke(topicPartitionBookkeeper, "reset");
 
-                Object producerIdAndEpoch = getField(transactionManager, 
"producerIdAndEpoch");
-                setField(producerIdAndEpoch, "producerId", producerId);
-                setField(producerIdAndEpoch, "epoch", epoch);
+                setField(
+                        transactionManager,
+                        "producerIdAndEpoch",
+                        createProducerIdAndEpoch(producerId, epoch));
 
                 invoke(
                         transactionManager,
@@ -283,6 +286,22 @@ public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> {
         }
     }
 
+    private Object createProducerIdAndEpoch(long producerId, short epoch) {
+        try {
+            Field field = 
TransactionManager.class.getDeclaredField("producerIdAndEpoch");
+            Class<?> clazz = field.getType();
+            Constructor<?> constructor = 
clazz.getDeclaredConstructor(Long.TYPE, Short.TYPE);
+            constructor.setAccessible(true);
+            return constructor.newInstance(producerId, epoch);
+        } catch (InvocationTargetException
+                | InstantiationException
+                | IllegalAccessException
+                | NoSuchFieldException
+                | NoSuchMethodException e) {
+            throw new RuntimeException("Incompatible KafkaProducer version", 
e);
+        }
+    }
+
     /**
      * Besides committing {@link 
org.apache.kafka.clients.producer.KafkaProducer#commitTransaction}
      * is also adding new partitions to the transaction. flushNewPartitions 
method is moving this

Reply via email to