[ https://issues.apache.org/jira/browse/KAFKA-6768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16431441#comment-16431441 ]
ASF GitHub Bot commented on KAFKA-6768: --------------------------------------- hachikuji closed pull request #4842: KAFKA-6768; Transactional producer may hang in close with pending requests URL: https://github.com/apache/kafka/pull/4842 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 426b273b885..0514c995635 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -329,7 +329,7 @@ private boolean maybeSendTransactionalRequest(long now) { return false; AbstractRequest.Builder<?> requestBuilder = nextRequestHandler.requestBuilder(); - while (running) { + while (!forceClose) { Node targetNode = null; try { if (nextRequestHandler.needsCoordinator()) { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index 6fcf4805967..558ec721096 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -132,6 +132,26 @@ public void setup() { client.setNode(brokerNode); } + @Test + public void testSenderShutdownWithPendingAddPartitions() throws Exception { + long pid = 13131L; + short epoch = 1; + doInitTransactions(pid, epoch); + transactionManager.beginTransaction(); + + transactionManager.maybeAddPartitionToTransaction(tp0); + FutureRecordMetadata sendFuture = 
accumulator.append(tp0, time.milliseconds(), "key".getBytes(), + "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future; + + prepareAddPartitionsToTxn(tp0, Errors.NONE); + prepareProduceResponse(Errors.NONE, pid, epoch); + + sender.initiateClose(); + sender.run(); + + assertTrue(sendFuture.isDone()); + } + @Test public void testEndTxnNotSentIfIncompleteBatches() { long pid = 13131L; ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Producer may hang in close with pending transaction > --------------------------------------------------- > > Key: KAFKA-6768 > URL: https://issues.apache.org/jira/browse/KAFKA-6768 > Project: Kafka > Issue Type: Bug > Components: producer > Reporter: Jason Gustafson > Assignee: Jason Gustafson > Priority: Major > Fix For: 1.1.1 > > > There is an edge case for transactional producers that can cause close() to > hang indefinitely (unless close() is called with a timeout). For example, suppose > the producer is trying to send an AddPartitionsToTxn request to the broker. Upon > shutdown, the Sender's running flag is set to false and graceful shutdown > begins. Graceful shutdown cannot complete, however, until the AddPartitionsToTxn > request has been sent. But sending that request is blocked because the running > flag is now false. So no progress can be made, and shutdown never completes. -- This message was sent by Atlassian JIRA (v7.6.3#76005)