This is an automated email from the ASF dual-hosted git repository.
jqin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d90e499 [FLINK-14302] [kafka] FlinkKafkaInternalProducer should not
send `ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty
when enable EoS
d90e499 is described below
commit d90e49905ed6f2f9a519ac274345f08c867a999d
Author: Tony Wei <[email protected]>
AuthorDate: Thu Oct 31 17:58:43 2019 +0800
[FLINK-14302] [kafka] FlinkKafkaInternalProducer should not send
`ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty when
enable EoS
---
.../kafka/internal/FlinkKafkaProducer.java | 14 ++++--
.../connectors/kafka/FlinkKafkaProducerITCase.java | 56 ++++++++++++++++++++++
.../kafka/internal/FlinkKafkaInternalProducer.java | 14 ++++--
.../kafka/FlinkKafkaInternalProducerITCase.java | 56 ++++++++++++++++++++++
4 files changed, 134 insertions(+), 6 deletions(-)
diff --git
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
index 9f00606..1e73139 100644
---
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
+++
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
@@ -306,9 +306,17 @@ public class FlinkKafkaProducer<K, V> implements
Producer<K, V> {
private TransactionalRequestResult enqueueNewPartitions() {
Object transactionManager = getValue(kafkaProducer,
"transactionManager");
synchronized (transactionManager) {
- Object txnRequestHandler = invoke(transactionManager,
"addPartitionsToTransactionHandler");
- invoke(transactionManager, "enqueueRequest", new
Class[]{txnRequestHandler.getClass().getSuperclass()}, new
Object[]{txnRequestHandler});
- TransactionalRequestResult result =
(TransactionalRequestResult) getValue(txnRequestHandler,
txnRequestHandler.getClass().getSuperclass(), "result");
+ Object newPartitionsInTransaction =
getValue(transactionManager, "newPartitionsInTransaction");
+ Object newPartitionsInTransactionIsEmpty =
invoke(newPartitionsInTransaction, "isEmpty");
+ TransactionalRequestResult result;
+ if (newPartitionsInTransactionIsEmpty instanceof
Boolean && !((Boolean) newPartitionsInTransactionIsEmpty)) {
+ Object txnRequestHandler =
invoke(transactionManager, "addPartitionsToTransactionHandler");
+ invoke(transactionManager, "enqueueRequest",
new Class[]{txnRequestHandler.getClass().getSuperclass()}, new
Object[]{txnRequestHandler});
+ result = (TransactionalRequestResult)
getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(),
"result");
+ } else {
+ result = new TransactionalRequestResult();
+ result.done();
+ }
return result;
}
}
diff --git
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
index 6e13cc6a..b6de3c7 100644
---
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
+++
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
@@ -22,12 +22,14 @@ import
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+import kafka.server.KafkaServer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
@@ -45,6 +47,22 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
protected String transactionalId;
protected Properties extraProperties;
+ @BeforeClass
+ public static void prepare() throws Exception {
+
LOG.info("-------------------------------------------------------------------------");
+ LOG.info(" Starting KafkaTestBase ");
+
LOG.info("-------------------------------------------------------------------------");
+
+ Properties serverProperties = new Properties();
+ serverProperties.put("transaction.state.log.num.partitions",
Integer.toString(1));
+ serverProperties.put("auto.leader.rebalance.enable",
Boolean.toString(false));
+ startClusters(KafkaTestEnvironment.createConfig()
+ .setKafkaServersNumber(NUMBER_OF_KAFKA_SERVERS)
+ .setSecureMode(false)
+ .setHideKafkaBehindProxy(true)
+ .setKafkaServerProperties(serverProperties));
+ }
+
@Before
public void before() {
transactionalId = UUID.randomUUID().toString();
@@ -152,6 +170,20 @@ public class FlinkKafkaProducerITCase extends
KafkaTestBase {
kafkaProducer.flush();
}
+ @Test(timeout = 30000L)
+ public void
testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator() throws
Exception {
+ String topic = "flink-kafka-producer-txn-coordinator-changed";
+ createTestTopic(topic, 1, 2);
+ try (Producer<String, String> kafkaProducer = new
FlinkKafkaProducer<>(extraProperties)) {
+ kafkaProducer.initTransactions();
+ kafkaProducer.beginTransaction();
+
restartBroker(kafkaServer.getLeaderToShutDown("__transaction_state"));
+ kafkaProducer.flush();
+ kafkaProducer.commitTransaction();
+ }
+ deleteTestTopic(topic);
+ }
+
private FlinkKafkaProducer<String, String> getClosedProducer(String
topicName) {
FlinkKafkaProducer<String, String> kafkaProducer = new
FlinkKafkaProducer<>(extraProperties);
kafkaProducer.initTransactions();
@@ -171,4 +203,28 @@ public class FlinkKafkaProducerITCase extends
KafkaTestBase {
assertEquals(expectedValue, record.value());
}
}
+
+ private void restartBroker(int brokerId) {
+ KafkaServer toRestart = null;
+ for (KafkaServer server : kafkaServer.getBrokers()) {
+ if (kafkaServer.getBrokerId(server) == brokerId) {
+ toRestart = server;
+ }
+ }
+
+ if (toRestart == null) {
+ StringBuilder listOfBrokers = new StringBuilder();
+ for (KafkaServer server : kafkaServer.getBrokers()) {
+
listOfBrokers.append(kafkaServer.getBrokerId(server));
+ listOfBrokers.append(" ; ");
+ }
+
+ throw new IllegalArgumentException("Cannot find broker
to restart: " + brokerId
+ + " ; available brokers: " +
listOfBrokers.toString());
+ } else {
+ toRestart.shutdown();
+ toRestart.awaitShutdown();
+ toRestart.startup();
+ }
+ }
}
diff --git
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java
index 78bbb53..8c2246e 100644
---
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java
+++
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java
@@ -269,9 +269,17 @@ public class FlinkKafkaInternalProducer<K, V> implements
Producer<K, V> {
private TransactionalRequestResult enqueueNewPartitions() {
Object transactionManager = getValue(kafkaProducer,
"transactionManager");
synchronized (transactionManager) {
- Object txnRequestHandler = invoke(transactionManager,
"addPartitionsToTransactionHandler");
- invoke(transactionManager, "enqueueRequest", new
Class[]{txnRequestHandler.getClass().getSuperclass()}, new
Object[]{txnRequestHandler});
- TransactionalRequestResult result =
(TransactionalRequestResult) getValue(txnRequestHandler,
txnRequestHandler.getClass().getSuperclass(), "result");
+ Object newPartitionsInTransaction =
getValue(transactionManager, "newPartitionsInTransaction");
+ Object newPartitionsInTransactionIsEmpty =
invoke(newPartitionsInTransaction, "isEmpty");
+ TransactionalRequestResult result;
+ if (newPartitionsInTransactionIsEmpty instanceof
Boolean && !((Boolean) newPartitionsInTransactionIsEmpty)) {
+ Object txnRequestHandler =
invoke(transactionManager, "addPartitionsToTransactionHandler");
+ invoke(transactionManager, "enqueueRequest",
new Class[]{txnRequestHandler.getClass().getSuperclass()}, new
Object[]{txnRequestHandler});
+ result = (TransactionalRequestResult)
getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(),
"result");
+ } else {
+ result = new TransactionalRequestResult();
+ result.done();
+ }
return result;
}
}
diff --git
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
index 2d749ba..5062b70 100644
---
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
+++
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
@@ -22,12 +22,14 @@ import
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalPr
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+import kafka.server.KafkaServer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
@@ -45,6 +47,22 @@ public class FlinkKafkaInternalProducerITCase extends
KafkaTestBase {
protected String transactionalId;
protected Properties extraProperties;
+ @BeforeClass
+ public static void prepare() throws Exception {
+
LOG.info("-------------------------------------------------------------------------");
+ LOG.info(" Starting KafkaTestBase ");
+
LOG.info("-------------------------------------------------------------------------");
+
+ Properties serverProperties = new Properties();
+ serverProperties.put("transaction.state.log.num.partitions",
Integer.toString(1));
+ serverProperties.put("auto.leader.rebalance.enable",
Boolean.toString(false));
+ startClusters(KafkaTestEnvironment.createConfig()
+ .setKafkaServersNumber(NUMBER_OF_KAFKA_SERVERS)
+ .setSecureMode(false)
+ .setHideKafkaBehindProxy(true)
+ .setKafkaServerProperties(serverProperties));
+ }
+
@Before
public void before() {
transactionalId = UUID.randomUUID().toString();
@@ -152,6 +170,20 @@ public class FlinkKafkaInternalProducerITCase extends
KafkaTestBase {
kafkaProducer.flush();
}
+ @Test(timeout = 30000L)
+ public void
testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator() throws
Exception {
+ String topic = "flink-kafka-producer-txn-coordinator-changed";
+ createTestTopic(topic, 1, 2);
+ try (Producer<String, String> kafkaProducer = new
FlinkKafkaInternalProducer<>(extraProperties)) {
+ kafkaProducer.initTransactions();
+ kafkaProducer.beginTransaction();
+
restartBroker(kafkaServer.getLeaderToShutDown("__transaction_state"));
+ kafkaProducer.flush();
+ kafkaProducer.commitTransaction();
+ }
+ deleteTestTopic(topic);
+ }
+
private FlinkKafkaInternalProducer<String, String>
getClosedProducer(String topicName) {
FlinkKafkaInternalProducer<String, String> kafkaProducer = new
FlinkKafkaInternalProducer<>(extraProperties);
kafkaProducer.initTransactions();
@@ -171,4 +203,28 @@ public class FlinkKafkaInternalProducerITCase extends
KafkaTestBase {
assertEquals(expectedValue, record.value());
}
}
+
+ private void restartBroker(int brokerId) {
+ KafkaServer toRestart = null;
+ for (KafkaServer server : kafkaServer.getBrokers()) {
+ if (kafkaServer.getBrokerId(server) == brokerId) {
+ toRestart = server;
+ }
+ }
+
+ if (toRestart == null) {
+ StringBuilder listOfBrokers = new StringBuilder();
+ for (KafkaServer server : kafkaServer.getBrokers()) {
+
listOfBrokers.append(kafkaServer.getBrokerId(server));
+ listOfBrokers.append(" ; ");
+ }
+
+ throw new IllegalArgumentException("Cannot find broker
to restart: " + brokerId
+ + " ; available brokers: " +
listOfBrokers.toString());
+ } else {
+ toRestart.shutdown();
+ toRestart.awaitShutdown();
+ toRestart.startup();
+ }
+ }
}