[ 
https://issues.apache.org/jira/browse/KAFKA-6446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16415868#comment-16415868
 ] 

ASF GitHub Bot commented on KAFKA-6446:
---------------------------------------

hachikuji closed pull request #4563: KAFKA-6446: KafkaProducer should use timed 
version of `await` to avoid endless waiting
URL: https://github.com/apache/kafka/pull/4563
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 5fc9a1b9b38..a5af5b60093 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -256,6 +256,7 @@
     private final ProducerInterceptors<K, V> interceptors;
     private final ApiVersions apiVersions;
     private final TransactionManager transactionManager;
+    private TransactionalRequestResult initTransactionsResult;
 
     /**
      * A producer is instantiated by providing a set of key-value pairs as 
configuration. Valid configuration strings
@@ -555,18 +556,36 @@ private static int parseAcks(String acksString) {
      *   2. Gets the internal producer id and epoch, used in all future 
transactional
      *      messages issued by the producer.
      *
+     * Note that this method will raise {@link TimeoutException} if the 
transactional state cannot
+     * be initialized before expiration of {@code max.block.ms}. Additionally, 
it will raise {@link InterruptException}
+     * if interrupted. It is safe to retry in either case, but once the 
transactional state has been successfully
+     * initialized, this method should no longer be used.
+     *
      * @throws IllegalStateException if no transactional.id has been configured
      * @throws org.apache.kafka.common.errors.UnsupportedVersionException 
fatal error indicating the broker
      *         does not support transactions (i.e. if its version is lower 
than 0.11.0.0)
      * @throws org.apache.kafka.common.errors.AuthorizationException fatal 
error indicating that the configured
      *         transactional.id is not authorized. See the exception for more 
details
      * @throws KafkaException if the producer has encountered a previous fatal 
error or for any other unexpected error
+     * @throws TimeoutException if the time taken to initialize the 
transaction has surpassed <code>max.block.ms</code>.
+     * @throws InterruptException if the thread is interrupted while blocked
      */
     public void initTransactions() {
         throwIfNoTransactionManager();
-        TransactionalRequestResult result = 
transactionManager.initializeTransactions();
-        sender.wakeup();
-        result.await();
+        if (initTransactionsResult == null) {
+            initTransactionsResult = 
transactionManager.initializeTransactions();
+            sender.wakeup();
+        }
+
+        try {
+            if (initTransactionsResult.await(maxBlockTimeMs, 
TimeUnit.MILLISECONDS)) {
+                initTransactionsResult = null;
+            } else {
+                throw new TimeoutException("Timeout expired while initializing 
transactional state in " + maxBlockTimeMs + "ms.");
+            }
+        } catch (InterruptedException e) {
+            throw new InterruptException("Initialize transactions 
interrupted.", e);
+        }
     }
 
     /**
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 7eea4992b33..426b273b885 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -329,7 +329,7 @@ private boolean maybeSendTransactionalRequest(long now) {
             return false;
 
         AbstractRequest.Builder<?> requestBuilder = 
nextRequestHandler.requestBuilder();
-        while (true) {
+        while (running) {
             Node targetNode = null;
             try {
                 if (nextRequestHandler.needsCoordinator()) {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java
index ff93da872dc..9c02e94c045 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java
@@ -59,7 +59,10 @@ public void await() {
     }
 
     public boolean await(long timeout, TimeUnit unit) throws 
InterruptedException {
-        return latch.await(timeout, unit);
+        boolean success = latch.await(timeout, unit);
+        if (!isSuccessful())
+            throw error();
+        return success;
     }
 
     public RuntimeException error() {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 9f70fd7e708..8bfc5e7d28a 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -548,4 +548,65 @@ public void testPartitionsForWithNullTopic() {
             // expected
         }
     }
+
+    @Test(expected = TimeoutException.class)
+    public void testInitTransactionTimeout() {
+        Properties props = new Properties();
+        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction");
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+
+        Time time = new MockTime();
+        Cluster cluster = TestUtils.singletonCluster("topic", 1);
+        Node node = cluster.nodes().get(0);
+
+        Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
+        metadata.update(cluster, Collections.<String>emptySet(), 
time.milliseconds());
+
+        MockClient client = new MockClient(time, metadata);
+        client.setNode(node);
+
+        Producer<String, String> producer = new KafkaProducer<>(
+                new ProducerConfig(ProducerConfig.addSerializerToConfig(props, 
new StringSerializer(), new StringSerializer())),
+                new StringSerializer(), new StringSerializer(), metadata, 
client);
+        try {
+            producer.initTransactions();
+            fail("initTransactions() should have raised TimeoutException");
+        } finally {
+            producer.close(0, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    @Test(expected = KafkaException.class)
+    public void testOnlyCanExecuteCloseAfterInitTransactionsTimeout() {
+        Properties props = new Properties();
+        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction");
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+
+        Time time = new MockTime();
+        Cluster cluster = TestUtils.singletonCluster("topic", 1);
+        Node node = cluster.nodes().get(0);
+
+        Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
+        metadata.update(cluster, Collections.<String>emptySet(), 
time.milliseconds());
+
+        MockClient client = new MockClient(time, metadata);
+        client.setNode(node);
+
+        Producer<String, String> producer = new KafkaProducer<>(
+                new ProducerConfig(ProducerConfig.addSerializerToConfig(props, 
new StringSerializer(), new StringSerializer())),
+                new StringSerializer(), new StringSerializer(), metadata, 
client);
+        try {
+            producer.initTransactions();
+        } catch (TimeoutException e) {
+            // expected
+        }
+        // other transactional operations should not be allowed if we catch 
the error after initTransactions failed
+        try {
+            producer.beginTransaction();
+        } finally {
+            producer.close(0, TimeUnit.MILLISECONDS);
+        }
+    }
 }
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala 
b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 911808a49cd..8435e5a3a6c 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -19,7 +19,7 @@ package kafka.api
 
 import java.lang.{Long => JLong}
 import java.util.Properties
-import java.util.concurrent.{ExecutionException, TimeUnit}
+import java.util.concurrent.TimeUnit
 
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
@@ -27,7 +27,7 @@ import kafka.utils.TestUtils
 import kafka.utils.TestUtils.consumeRecords
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, 
OffsetAndMetadata}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.errors.ProducerFencedException
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.junit.{After, Before, Test}
@@ -532,6 +532,19 @@ class TransactionsTest extends KafkaServerTestHarness {
     }
   }
 
+  @Test(expected = classOf[KafkaException])
+  def testConsecutivelyRunInitTransactions(): Unit = {
+    val producer = createTransactionalProducer(transactionalId = 
"normalProducer")
+
+    try {
+      producer.initTransactions()
+      producer.initTransactions()
+      fail("Should have raised a KafkaException")
+    } finally {
+      producer.close()
+    }
+  }
+
   private def sendTransactionalMessagesWithValueRange(producer: 
KafkaProducer[Array[Byte], Array[Byte]], topic: String,
                                                       start: Int, end: Int, 
willBeCommitted: Boolean): Unit = {
     for (i <- start until end) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KafkaProducer with transactionId endless waits when bootstrap server is down
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-6446
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6446
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, producer 
>    Affects Versions: 0.11.0.0, 1.0.0
>            Reporter: Eduardo Sciullo
>            Assignee: huxihx
>            Priority: Critical
>             Fix For: 1.2.0
>
>         Attachments: Test.java
>
>
> When the bootstrap server is down, a KafkaProducer with a transactionId waits 
> endlessly on initTransactions. 
> The timeouts don't apply to that operation: it doesn't honor the 
> {{TRANSACTION_TIMEOUT_CONFIG}}.
> I attached an example of my code that reproduces the scenario.
>  
> I opened this issue as suggested by [Gary 
> Russell|https://stackoverflow.com/users/1240763/gary-russell]
> [https://stackoverflow.com/questions/48226546/defaultkafkaproducerfactory-with-transactionidprefix-endless-waits-when-bootstra]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to