This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d90e499  [FLINK-14302] [kafka] FlinkKafkaInternalProducer should not 
send `ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty 
when EoS is enabled
d90e499 is described below

commit d90e49905ed6f2f9a519ac274345f08c867a999d
Author: Tony Wei <[email protected]>
AuthorDate: Thu Oct 31 17:58:43 2019 +0800

    [FLINK-14302] [kafka] FlinkKafkaInternalProducer should not send 
`ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty when 
EoS is enabled
---
 .../kafka/internal/FlinkKafkaProducer.java         | 14 ++++--
 .../connectors/kafka/FlinkKafkaProducerITCase.java | 56 ++++++++++++++++++++++
 .../kafka/internal/FlinkKafkaInternalProducer.java | 14 ++++--
 .../kafka/FlinkKafkaInternalProducerITCase.java    | 56 ++++++++++++++++++++++
 4 files changed, 134 insertions(+), 6 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
index 9f00606..1e73139 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
@@ -306,9 +306,17 @@ public class FlinkKafkaProducer<K, V> implements 
Producer<K, V> {
        private TransactionalRequestResult enqueueNewPartitions() {
                Object transactionManager = getValue(kafkaProducer, 
"transactionManager");
                synchronized (transactionManager) {
-                       Object txnRequestHandler = invoke(transactionManager, 
"addPartitionsToTransactionHandler");
-                       invoke(transactionManager, "enqueueRequest", new 
Class[]{txnRequestHandler.getClass().getSuperclass()}, new 
Object[]{txnRequestHandler});
-                       TransactionalRequestResult result = 
(TransactionalRequestResult) getValue(txnRequestHandler, 
txnRequestHandler.getClass().getSuperclass(), "result");
+                       Object newPartitionsInTransaction = 
getValue(transactionManager, "newPartitionsInTransaction");
+                       Object newPartitionsInTransactionIsEmpty = 
invoke(newPartitionsInTransaction, "isEmpty");
+                       TransactionalRequestResult result;
+                       if (newPartitionsInTransactionIsEmpty instanceof 
Boolean && !((Boolean) newPartitionsInTransactionIsEmpty)) {
+                               Object txnRequestHandler = 
invoke(transactionManager, "addPartitionsToTransactionHandler");
+                               invoke(transactionManager, "enqueueRequest", 
new Class[]{txnRequestHandler.getClass().getSuperclass()}, new 
Object[]{txnRequestHandler});
+                               result = (TransactionalRequestResult) 
getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), 
"result");
+                       } else {
+                               result = new TransactionalRequestResult();
+                               result.done();
+                       }
                        return result;
                }
        }
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
index 6e13cc6a..b6de3c7 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
@@ -22,12 +22,14 @@ import 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
 
+import kafka.server.KafkaServer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -45,6 +47,22 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
        protected String transactionalId;
        protected Properties extraProperties;
 
+       @BeforeClass
+       public static void prepare() throws Exception {
+               
LOG.info("-------------------------------------------------------------------------");
+               LOG.info("    Starting KafkaTestBase ");
+               
LOG.info("-------------------------------------------------------------------------");
+
+               Properties serverProperties = new Properties();
+               serverProperties.put("transaction.state.log.num.partitions", 
Integer.toString(1));
+               serverProperties.put("auto.leader.rebalance.enable", 
Boolean.toString(false));
+               startClusters(KafkaTestEnvironment.createConfig()
+                       .setKafkaServersNumber(NUMBER_OF_KAFKA_SERVERS)
+                       .setSecureMode(false)
+                       .setHideKafkaBehindProxy(true)
+                       .setKafkaServerProperties(serverProperties));
+       }
+
        @Before
        public void before() {
                transactionalId = UUID.randomUUID().toString();
@@ -152,6 +170,20 @@ public class FlinkKafkaProducerITCase extends 
KafkaTestBase {
                kafkaProducer.flush();
        }
 
+       @Test(timeout = 30000L)
+       public void 
testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator() throws 
Exception {
+               String topic = "flink-kafka-producer-txn-coordinator-changed";
+               createTestTopic(topic, 1, 2);
+               try (Producer<String, String> kafkaProducer = new 
FlinkKafkaProducer<>(extraProperties)) {
+                       kafkaProducer.initTransactions();
+                       kafkaProducer.beginTransaction();
+                       
restartBroker(kafkaServer.getLeaderToShutDown("__transaction_state"));
+                       kafkaProducer.flush();
+                       kafkaProducer.commitTransaction();
+               }
+               deleteTestTopic(topic);
+       }
+
        private FlinkKafkaProducer<String, String> getClosedProducer(String 
topicName) {
                FlinkKafkaProducer<String, String> kafkaProducer = new 
FlinkKafkaProducer<>(extraProperties);
                kafkaProducer.initTransactions();
@@ -171,4 +203,28 @@ public class FlinkKafkaProducerITCase extends 
KafkaTestBase {
                        assertEquals(expectedValue, record.value());
                }
        }
+
+       private void restartBroker(int brokerId) {
+               KafkaServer toRestart = null;
+               for (KafkaServer server : kafkaServer.getBrokers()) {
+                       if (kafkaServer.getBrokerId(server) == brokerId) {
+                               toRestart = server;
+                       }
+               }
+
+               if (toRestart == null) {
+                       StringBuilder listOfBrokers = new StringBuilder();
+                       for (KafkaServer server : kafkaServer.getBrokers()) {
+                               
listOfBrokers.append(kafkaServer.getBrokerId(server));
+                               listOfBrokers.append(" ; ");
+                       }
+
+                       throw new IllegalArgumentException("Cannot find broker 
to restart: " + brokerId
+                               + " ; available brokers: " + 
listOfBrokers.toString());
+               } else {
+                       toRestart.shutdown();
+                       toRestart.awaitShutdown();
+                       toRestart.startup();
+               }
+       }
 }
diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java
index 78bbb53..8c2246e 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java
@@ -269,9 +269,17 @@ public class FlinkKafkaInternalProducer<K, V> implements 
Producer<K, V> {
        private TransactionalRequestResult enqueueNewPartitions() {
                Object transactionManager = getValue(kafkaProducer, 
"transactionManager");
                synchronized (transactionManager) {
-                       Object txnRequestHandler = invoke(transactionManager, 
"addPartitionsToTransactionHandler");
-                       invoke(transactionManager, "enqueueRequest", new 
Class[]{txnRequestHandler.getClass().getSuperclass()}, new 
Object[]{txnRequestHandler});
-                       TransactionalRequestResult result = 
(TransactionalRequestResult) getValue(txnRequestHandler, 
txnRequestHandler.getClass().getSuperclass(), "result");
+                       Object newPartitionsInTransaction = 
getValue(transactionManager, "newPartitionsInTransaction");
+                       Object newPartitionsInTransactionIsEmpty = 
invoke(newPartitionsInTransaction, "isEmpty");
+                       TransactionalRequestResult result;
+                       if (newPartitionsInTransactionIsEmpty instanceof 
Boolean && !((Boolean) newPartitionsInTransactionIsEmpty)) {
+                               Object txnRequestHandler = 
invoke(transactionManager, "addPartitionsToTransactionHandler");
+                               invoke(transactionManager, "enqueueRequest", 
new Class[]{txnRequestHandler.getClass().getSuperclass()}, new 
Object[]{txnRequestHandler});
+                               result = (TransactionalRequestResult) 
getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), 
"result");
+                       } else {
+                               result = new TransactionalRequestResult();
+                               result.done();
+                       }
                        return result;
                }
        }
diff --git 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
index 2d749ba..5062b70 100644
--- 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
+++ 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
@@ -22,12 +22,14 @@ import 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalPr
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
 
+import kafka.server.KafkaServer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -45,6 +47,22 @@ public class FlinkKafkaInternalProducerITCase extends 
KafkaTestBase {
        protected String transactionalId;
        protected Properties extraProperties;
 
+       @BeforeClass
+       public static void prepare() throws Exception {
+               
LOG.info("-------------------------------------------------------------------------");
+               LOG.info("    Starting KafkaTestBase ");
+               
LOG.info("-------------------------------------------------------------------------");
+
+               Properties serverProperties = new Properties();
+               serverProperties.put("transaction.state.log.num.partitions", 
Integer.toString(1));
+               serverProperties.put("auto.leader.rebalance.enable", 
Boolean.toString(false));
+               startClusters(KafkaTestEnvironment.createConfig()
+                       .setKafkaServersNumber(NUMBER_OF_KAFKA_SERVERS)
+                       .setSecureMode(false)
+                       .setHideKafkaBehindProxy(true)
+                       .setKafkaServerProperties(serverProperties));
+       }
+
        @Before
        public void before() {
                transactionalId = UUID.randomUUID().toString();
@@ -152,6 +170,20 @@ public class FlinkKafkaInternalProducerITCase extends 
KafkaTestBase {
                kafkaProducer.flush();
        }
 
+       @Test(timeout = 30000L)
+       public void 
testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator() throws 
Exception {
+               String topic = "flink-kafka-producer-txn-coordinator-changed";
+               createTestTopic(topic, 1, 2);
+               try (Producer<String, String> kafkaProducer = new 
FlinkKafkaInternalProducer<>(extraProperties)) {
+                       kafkaProducer.initTransactions();
+                       kafkaProducer.beginTransaction();
+                       
restartBroker(kafkaServer.getLeaderToShutDown("__transaction_state"));
+                       kafkaProducer.flush();
+                       kafkaProducer.commitTransaction();
+               }
+               deleteTestTopic(topic);
+       }
+
        private FlinkKafkaInternalProducer<String, String> 
getClosedProducer(String topicName) {
                FlinkKafkaInternalProducer<String, String> kafkaProducer = new 
FlinkKafkaInternalProducer<>(extraProperties);
                kafkaProducer.initTransactions();
@@ -171,4 +203,28 @@ public class FlinkKafkaInternalProducerITCase extends 
KafkaTestBase {
                        assertEquals(expectedValue, record.value());
                }
        }
+
+       private void restartBroker(int brokerId) {
+               KafkaServer toRestart = null;
+               for (KafkaServer server : kafkaServer.getBrokers()) {
+                       if (kafkaServer.getBrokerId(server) == brokerId) {
+                               toRestart = server;
+                       }
+               }
+
+               if (toRestart == null) {
+                       StringBuilder listOfBrokers = new StringBuilder();
+                       for (KafkaServer server : kafkaServer.getBrokers()) {
+                               
listOfBrokers.append(kafkaServer.getBrokerId(server));
+                               listOfBrokers.append(" ; ");
+                       }
+
+                       throw new IllegalArgumentException("Cannot find broker 
to restart: " + brokerId
+                               + " ; available brokers: " + 
listOfBrokers.toString());
+               } else {
+                       toRestart.shutdown();
+                       toRestart.awaitShutdown();
+                       toRestart.startup();
+               }
+       }
 }

Reply via email to