This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 41406bc  KAFKA-9749; Transaction coordinator should treat KAFKA_STORAGE_ERROR as retriable (#8336)
41406bc is described below

commit 41406bce555c5d31cba909131b488c487435cc63
Author: Bob Barrett <bob.barr...@confluent.io>
AuthorDate: Tue Mar 24 14:27:53 2020 -0700

    KAFKA-9749; Transaction coordinator should treat KAFKA_STORAGE_ERROR as retriable (#8336)
    
    When handling a WriteTxnMarkersResponse, the
    TransactionMarkerRequestCompletionHandler throws an IllegalStateException
    when the remote broker responds with a KAFKA_STORAGE_ERROR, and it does not
    retry the request. This leaves the transaction state stuck in PendingAbort
    or PendingCommit, with no way to change that state other than restarting
    the broker, because both EndTxnRequest and InitProducerIdRequest return
    CONCURRENT_TRANSACTIONS if the state is PendingAbort or PendingCommit. This
    patch changes the handler to treat KAFKA_STORAGE_ERROR as a retriable
    error, so the transaction marker request is retried instead of failing.
    
    Reviewers: Boyang Chen <boy...@confluent.io>, Jason Gustafson <ja...@confluent.io>
---
 .../transaction/TransactionMarkerRequestCompletionHandler.scala      | 3 ++-
 .../transaction/TransactionMarkerRequestCompletionHandlerTest.scala  | 5 +++++
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
index fefe767..f655770 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
@@ -146,7 +146,8 @@ class TransactionMarkerRequestCompletionHandler(brokerId: 
Int,
                          Errors.NOT_LEADER_FOR_PARTITION |
                          Errors.NOT_ENOUGH_REPLICAS |
                          Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND |
-                         Errors.REQUEST_TIMED_OUT => // these are retriable 
errors
+                         Errors.REQUEST_TIMED_OUT |
+                         Errors.KAFKA_STORAGE_ERROR => // these are retriable 
errors
 
                       info(s"Sending $transactionalId's transaction marker for 
partition $topicPartition has failed with error ${error.exceptionName}, 
retrying " +
                         s"with current coordinator epoch 
${epochAndMetadata.coordinatorEpoch}")
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
index 84f3dff..1db652a 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
@@ -190,6 +190,11 @@ class TransactionMarkerRequestCompletionHandlerTest {
   }
 
   @Test
+  def shouldRetryPartitionWhenKafkaStorageError(): Unit = {
+    verifyRetriesPartitionOnError(Errors.KAFKA_STORAGE_ERROR)
+  }
+
+  @Test
   def shouldRemoveTopicPartitionFromWaitingSetOnUnsupportedForMessageFormat(): 
Unit = {
     mockCache()
     
verifyCompleteDelayedOperationOnError(Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT)

Reply via email to