[ https://issues.apache.org/jira/browse/KAFKA-6446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16415868#comment-16415868 ]
ASF GitHub Bot commented on KAFKA-6446: --------------------------------------- hachikuji closed pull request #4563: KAFKA-6446: KafkaProducer should use timed version of `await` to avoid endless waiting URL: https://github.com/apache/kafka/pull/4563 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 5fc9a1b9b38..a5af5b60093 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -256,6 +256,7 @@ private final ProducerInterceptors<K, V> interceptors; private final ApiVersions apiVersions; private final TransactionManager transactionManager; + private TransactionalRequestResult initTransactionsResult; /** * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings @@ -555,18 +556,36 @@ private static int parseAcks(String acksString) { * 2. Gets the internal producer id and epoch, used in all future transactional * messages issued by the producer. * + * Note that this method will raise {@link TimeoutException} if the transactional state cannot + * be initialized before expiration of {@code max.block.ms}. Additionally, it will raise {@link InterruptException} + * if interrupted. It is safe to retry in either case, but once the transactional state has been successfully + * initialized, this method should no longer be used. 
+ * * @throws IllegalStateException if no transactional.id has been configured * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker * does not support transactions (i.e. if its version is lower than 0.11.0.0) * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured * transactional.id is not authorized. See the exception for more details * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error + * @throws TimeoutException if the time taken for initialize the transaction has surpassed <code>max.block.ms</code>. + * @throws InterruptException if the thread is interrupted while blocked */ public void initTransactions() { throwIfNoTransactionManager(); - TransactionalRequestResult result = transactionManager.initializeTransactions(); - sender.wakeup(); - result.await(); + if (initTransactionsResult == null) { + initTransactionsResult = transactionManager.initializeTransactions(); + sender.wakeup(); + } + + try { + if (initTransactionsResult.await(maxBlockTimeMs, TimeUnit.MILLISECONDS)) { + initTransactionsResult = null; + } else { + throw new TimeoutException("Timeout expired while initializing transactional state in " + maxBlockTimeMs + "ms."); + } + } catch (InterruptedException e) { + throw new InterruptException("Initialize transactions interrupted.", e); + } } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 7eea4992b33..426b273b885 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -329,7 +329,7 @@ private boolean maybeSendTransactionalRequest(long now) { return false; AbstractRequest.Builder<?> requestBuilder = nextRequestHandler.requestBuilder(); - while 
(true) { + while (running) { Node targetNode = null; try { if (nextRequestHandler.needsCoordinator()) { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java index ff93da872dc..9c02e94c045 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java @@ -59,7 +59,10 @@ public void await() { } public boolean await(long timeout, TimeUnit unit) throws InterruptedException { - return latch.await(timeout, unit); + boolean success = latch.await(timeout, unit); + if (!isSuccessful()) + throw error(); + return success; } public RuntimeException error() { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index 9f70fd7e708..8bfc5e7d28a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -548,4 +548,65 @@ public void testPartitionsForWithNullTopic() { // expected } } + + @Test(expected = TimeoutException.class) + public void testInitTransactionTimeout() { + Properties props = new Properties(); + props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction"); + props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); + + Time time = new MockTime(); + Cluster cluster = TestUtils.singletonCluster("topic", 1); + Node node = cluster.nodes().get(0); + + Metadata metadata = new Metadata(0, Long.MAX_VALUE, true); + metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); + + MockClient client = new MockClient(time, metadata); + 
client.setNode(node); + + Producer<String, String> producer = new KafkaProducer<>( + new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())), + new StringSerializer(), new StringSerializer(), metadata, client); + try { + producer.initTransactions(); + fail("initTransactions() should have raised TimeoutException"); + } finally { + producer.close(0, TimeUnit.MILLISECONDS); + } + } + + @Test(expected = KafkaException.class) + public void testOnlyCanExecuteCloseAfterInitTransactionsTimeout() { + Properties props = new Properties(); + props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction"); + props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); + + Time time = new MockTime(); + Cluster cluster = TestUtils.singletonCluster("topic", 1); + Node node = cluster.nodes().get(0); + + Metadata metadata = new Metadata(0, Long.MAX_VALUE, true); + metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); + + MockClient client = new MockClient(time, metadata); + client.setNode(node); + + Producer<String, String> producer = new KafkaProducer<>( + new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())), + new StringSerializer(), new StringSerializer(), metadata, client); + try { + producer.initTransactions(); + } catch (TimeoutException e) { + // expected + } + // other transactional operations should not be allowed if we catch the error after initTransactions failed + try { + producer.beginTransaction(); + } finally { + producer.close(0, TimeUnit.MILLISECONDS); + } + } } diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala index 911808a49cd..8435e5a3a6c 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala +++ 
b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala @@ -19,7 +19,7 @@ package kafka.api import java.lang.{Long => JLong} import java.util.Properties -import java.util.concurrent.{ExecutionException, TimeUnit} +import java.util.concurrent.TimeUnit import kafka.integration.KafkaServerTestHarness import kafka.server.KafkaConfig @@ -27,7 +27,7 @@ import kafka.utils.TestUtils import kafka.utils.TestUtils.consumeRecords import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.errors.ProducerFencedException import org.apache.kafka.common.security.auth.SecurityProtocol import org.junit.{After, Before, Test} @@ -532,6 +532,19 @@ class TransactionsTest extends KafkaServerTestHarness { } } + @Test(expected = classOf[KafkaException]) + def testConsecutivelyRunInitTransactions(): Unit = { + val producer = createTransactionalProducer(transactionalId = "normalProducer") + + try { + producer.initTransactions() + producer.initTransactions() + fail("Should have raised a KafkaException") + } finally { + producer.close() + } + } + private def sendTransactionalMessagesWithValueRange(producer: KafkaProducer[Array[Byte], Array[Byte]], topic: String, start: Int, end: Int, willBeCommitted: Boolean): Unit = { for (i <- start until end) { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > KafkaProducer with transactionId endless waits when bootstrap server is down > ---------------------------------------------------------------------------- > > Key: KAFKA-6446 > URL: https://issues.apache.org/jira/browse/KAFKA-6446 > Project: Kafka > Issue Type: Bug > Components: clients, producer > Affects Versions: 0.11.0.0, 1.0.0 > Reporter: Eduardo Sciullo > Assignee: huxihx > Priority: Critical > Fix For: 1.2.0 > > Attachments: Test.java > > > When the bootstrap server is down, a KafkaProducer with a transactionId waits > endlessly on initTransactions. > The timeouts don't apply to that operation: it doesn't honor > {{TRANSACTION_TIMEOUT_CONFIG}}. > I have attached an example of my code to reproduce the scenario. > > I opened this issue as suggested by [Gary > Russell|https://stackoverflow.com/users/1240763/gary-russell] > [https://stackoverflow.com/questions/48226546/defaultkafkaproducerfactory-with-transactionidprefix-endless-waits-when-bootstra] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)