This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new e13ae99 [FLINK-23509][connectors/kafka] Create new ProducerIdAndEpoch when resuming Kafka transaction
e13ae99 is described below
commit e13ae99953d2702f45db9a42d8714b6da93cd0ff
Author: Fabian Paul <[email protected]>
AuthorDate: Tue Aug 3 15:26:11 2021 +0200
[FLINK-23509][connectors/kafka] Create new ProducerIdAndEpoch when resuming Kafka transaction
Before this change we did not create a new ProducerIdAndEpoch but rather
mutated the already created one. This can overwrite the internally
used static ProducerIdAndEpoch#None, which is used to describe a
non-initialized state. Once the static object is mutated, it breaks the
recovery of transactions.
---
.../internals/FlinkKafkaInternalProducer.java | 25 +++++++++++++++++++---
1 file changed, 22 insertions(+), 3 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java
index 2f6c512..c0fd2ec 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java
@@ -31,6 +31,7 @@ import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.internals.TransactionManager;
import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
@@ -44,6 +45,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@@ -226,9 +228,10 @@ public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> {
"org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
invoke(topicPartitionBookkeeper, "reset");
- Object producerIdAndEpoch = getField(transactionManager, "producerIdAndEpoch");
- setField(producerIdAndEpoch, "producerId", producerId);
- setField(producerIdAndEpoch, "epoch", epoch);
+ setField(
+ transactionManager,
+ "producerIdAndEpoch",
+ createProducerIdAndEpoch(producerId, epoch));
invoke(
transactionManager,
@@ -283,6 +286,22 @@ public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> {
}
}
+ private Object createProducerIdAndEpoch(long producerId, short epoch) {
+ try {
+ Field field = TransactionManager.class.getDeclaredField("producerIdAndEpoch");
+ Class<?> clazz = field.getType();
+ Constructor<?> constructor = clazz.getDeclaredConstructor(Long.TYPE, Short.TYPE);
+ constructor.setAccessible(true);
+ return constructor.newInstance(producerId, epoch);
+ } catch (InvocationTargetException
+ | InstantiationException
+ | IllegalAccessException
+ | NoSuchFieldException
+ | NoSuchMethodException e) {
+ throw new RuntimeException("Incompatible KafkaProducer version", e);
+ }
+ }
+
/**
* Besides committing {@link org.apache.kafka.clients.producer.KafkaProducer#commitTransaction}
* is also adding new partitions to the transaction. flushNewPartitions method is moving this