This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 2.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push: new 41406bc KAFKA-9749; Transaction coordinator should treat KAFKA_STORAGE_ERROR as retriable (#8336) 41406bc is described below commit 41406bce555c5d31cba909131b488c487435cc63 Author: Bob Barrett <bob.barr...@confluent.io> AuthorDate: Tue Mar 24 14:27:53 2020 -0700 KAFKA-9749; Transaction coordinator should treat KAFKA_STORAGE_ERROR as retriable (#8336) When handling a WriteTxnMarkersResponse in which the remote broker reports a KAFKA_STORAGE_ERROR, the TransactionMarkerRequestCompletionHandler throws an IllegalStateException instead of retrying the request. This leaves the transaction state stuck in PendingAbort or PendingCommit, with no way to change that state other than restarting the broker, because both EndTxnRequest and InitProducerIdRequest return CONCURRENT_TRANSACTIONS if the state is PendingAbort or PendingCommit. This patch changes TransactionMarkerRequestCompletionHandler to treat KAFKA_STORAGE_ERROR as a retriable error [...] Reviewers: Boyang Chen <boy...@confluent.io>, Jason Gustafson <ja...@confluent.io> --- .../transaction/TransactionMarkerRequestCompletionHandler.scala | 3 ++- .../transaction/TransactionMarkerRequestCompletionHandlerTest.scala | 5 +++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala index fefe767..f655770 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala @@ -146,7 +146,8 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int, Errors.NOT_LEADER_FOR_PARTITION | Errors.NOT_ENOUGH_REPLICAS | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND | - Errors.REQUEST_TIMED_OUT => // these are retriable errors + Errors.REQUEST_TIMED_OUT | + Errors.KAFKA_STORAGE_ERROR => // these are retriable errors
info(s"Sending $transactionalId's transaction marker for partition $topicPartition has failed with error ${error.exceptionName}, retrying " + s"with current coordinator epoch ${epochAndMetadata.coordinatorEpoch}") diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala index 84f3dff..1db652a 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala @@ -190,6 +190,11 @@ class TransactionMarkerRequestCompletionHandlerTest { } @Test + def shouldRetryPartitionWhenKafkaStorageError(): Unit = { + verifyRetriesPartitionOnError(Errors.KAFKA_STORAGE_ERROR) + } + + @Test def shouldRemoveTopicPartitionFromWaitingSetOnUnsupportedForMessageFormat(): Unit = { mockCache() verifyCompleteDelayedOperationOnError(Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT)