Repository: kafka
Updated Branches:
  refs/heads/0.11.0 3c3edc9db -> 53589a609


KAFKA-5351: Reset pending state when returning an error in 
appendTransactionToLog

Without this patch, future client retries would get the 
`CONCURRENT_TRANSACTIONS` error code indefinitely, since the pending state 
wouldn't be cleared when the append to the log failed.

Author: Apurva Mehta <[email protected]>

Reviewers: Jason Gustafson <[email protected]>, Guozhang Wang 
<[email protected]>

Closes #3184 from apurvam/KAFKA-5351-clear-pending-state-on-retriable-error

(cherry picked from commit 049abe7efa17c9660fce7b57b4c235e24c72315c)
Signed-off-by: Guozhang Wang <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/53589a60
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/53589a60
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/53589a60

Branch: refs/heads/0.11.0
Commit: 53589a6092daeddb2f7de0a6038ce633ecd0233b
Parents: 3c3edc9
Author: Apurva Mehta <[email protected]>
Authored: Wed May 31 22:48:43 2017 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Wed May 31 22:48:50 2017 -0700

----------------------------------------------------------------------
 .../transaction/TransactionCoordinator.scala    | 11 +++++++++
 .../transaction/TransactionMetadata.scala       |  7 ++++++
 .../transaction/TransactionStateManager.scala   | 25 +++++++++++++++++---
 .../TransactionStateManagerTest.scala           | 10 +++++++-
 4 files changed, 49 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/53589a60/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index f182420..44e32b1 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -198,6 +198,9 @@ class TransactionCoordinator(brokerId: Int,
         case Ongoing =>
           // indicate to abort the current ongoing txn first
           Right(coordinatorEpoch, txnMetadata.prepareFenceProducerEpoch())
+        case Dead =>
+          throw new IllegalStateException(s"Found transactionalId 
$transactionalId with state ${txnMetadata.state}. " +
+            s"This is illegal as we should never have transitioned to this 
state.")
       }
     }
   }
@@ -326,6 +329,10 @@ class TransactionCoordinator(brokerId: Int,
                   logInvalidStateTransitionAndReturnError(transactionalId, 
txnMetadata.state, txnMarkerResult)
               case Empty =>
                 logInvalidStateTransitionAndReturnError(transactionalId, 
txnMetadata.state, txnMarkerResult)
+              case Dead =>
+                throw new IllegalStateException(s"Found transactionalId 
$transactionalId with state ${txnMetadata.state}. " +
+                  s"This is illegal as we should never have transitioned to 
this state.")
+
             }
           }
       }
@@ -364,6 +371,10 @@ class TransactionCoordinator(brokerId: Int,
                             
logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, 
txnMarkerResult)
                           else
                             Right(txnMetadata, 
txnMetadata.prepareComplete(time.milliseconds()))
+                        case Dead =>
+                          throw new IllegalStateException(s"Found 
transactionalId $transactionalId with state ${txnMetadata.state}. " +
+                            s"This is illegal as we should never have 
transitioned to this state.")
+
                       }
                     }
                   } else {

http://git-wip-us.apache.org/repos/asf/kafka/blob/53589a60/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
index 5956f1d..dbf0ec5 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
@@ -307,6 +307,13 @@ private[transaction] class TransactionMetadata(val 
transactionalId: String,
             txnStartTimestamp = transitMetadata.txnStartTimestamp
             topicPartitions.clear()
           }
+        case Dead =>
+          // The transactionalId was being expired. The completion of the 
operation should result in removal of the
+          // metadata from the cache, so we should never realistically 
transition to the dead state.
+          throw new IllegalStateException(s"TransactionalId : $transactionalId 
is trying to complete a transition to " +
+            s"$toState. This means that the transactionalId was being expired, 
and the only acceptable completion of " +
+            s"this operation is to remove the transaction metadata from the 
cache, not to persist the $toState in the log.")
+
       }
 
       debug(s"TransactionalId $transactionalId complete transition from $state 
to $transitMetadata")

http://git-wip-us.apache.org/repos/asf/kafka/blob/53589a60/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 19b9b91..05edefb 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -520,11 +520,9 @@ class TransactionStateManager(brokerId: Int,
                 // in this case directly return NOT_COORDINATOR to client and 
let it to re-discover the transaction coordinator
                 info(s"Updating $transactionalId's transaction state to 
$newMetadata with coordinator epoch $coordinatorEpoch for $transactionalId 
failed after the transaction message " +
                   s"has been appended to the log. The cached coordinator epoch 
has changed to ${epochAndMetadata.coordinatorEpoch}")
-
                 responseError = Errors.NOT_COORDINATOR
               } else {
                 metadata.completeTransitionTo(newMetadata)
-
                 debug(s"Updating $transactionalId's transaction state to 
$newMetadata with coordinator epoch $coordinatorEpoch for $transactionalId 
succeeded")
               }
             }
@@ -534,9 +532,30 @@ class TransactionStateManager(brokerId: Int,
             // return NOT_COORDINATOR to let the client re-discover the 
transaction coordinator
             info(s"Updating $transactionalId's transaction state (txn topic 
partition ${partitionFor(transactionalId)}) to $newMetadata with coordinator 
epoch $coordinatorEpoch for $transactionalId " +
               s"failed after the transaction message has been appended to the 
log since the corresponding metadata does not exist in the cache anymore")
-
             responseError = Errors.NOT_COORDINATOR
         }
+      } else {
+        // Reset the pending state when returning an error, since there is no 
active transaction for the transactional id at this point.
+        getAndMaybeAddTransactionState(transactionalId) match {
+          case Right(Some(epochAndTxnMetadata)) =>
+            val metadata = epochAndTxnMetadata.transactionMetadata
+            metadata synchronized {
+              if (epochAndTxnMetadata.coordinatorEpoch == coordinatorEpoch) {
+                debug(s"TransactionalId ${metadata.transactionalId}, resetting 
pending state since we are returning error $responseError")
+                metadata.pendingState = None
+              } else {
+                info(s"TransactionalId ${metadata.transactionalId} coordinator 
epoch changed from " +
+                  s"${epochAndTxnMetadata.coordinatorEpoch} to 
$coordinatorEpoch after append to log returned $responseError")
+              }
+            }
+          case Right(None) =>
+            // Do nothing here, since we want to return the original append 
error to the user.
+            info(s"Found no metadata TransactionalId $transactionalId after 
append to log returned error $responseError")
+          case Left(error) =>
+            // Do nothing here, since we want to return the original append 
error to the user.
+            info(s"Retrieving metadata for transactionalId $transactionalId 
returned $error after append to the log returned error $responseError")
+        }
+
       }
 
       responseCallback(responseError)

http://git-wip-us.apache.org/repos/asf/kafka/blob/53589a60/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 479f99b..2094528 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -225,6 +225,7 @@ class TransactionStateManagerTest {
     transactionManager.appendTransactionToLog(transactionalId1, 
coordinatorEpoch, newMetadata, assertCallback)
 
     assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, 
txnMetadata1))), 
transactionManager.getAndMaybeAddTransactionState(transactionalId1))
+    assertTrue(txnMetadata1.pendingState.isEmpty)
 
     // append to log again with expected failures
     txnMetadata1.pendingState = None
@@ -236,18 +237,22 @@ class TransactionStateManagerTest {
     prepareForTxnMessageAppend(Errors.UNKNOWN_TOPIC_OR_PARTITION)
     transactionManager.appendTransactionToLog(transactionalId1, 
coordinatorEpoch = 10, failedMetadata, assertCallback)
     assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, 
txnMetadata1))), 
transactionManager.getAndMaybeAddTransactionState(transactionalId1))
+    assertTrue(txnMetadata1.pendingState.isEmpty)
 
     prepareForTxnMessageAppend(Errors.NOT_ENOUGH_REPLICAS)
     transactionManager.appendTransactionToLog(transactionalId1, 
coordinatorEpoch = 10, failedMetadata, assertCallback)
     assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, 
txnMetadata1))), 
transactionManager.getAndMaybeAddTransactionState(transactionalId1))
+    assertTrue(txnMetadata1.pendingState.isEmpty)
 
     prepareForTxnMessageAppend(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND)
     transactionManager.appendTransactionToLog(transactionalId1, 
coordinatorEpoch = 10, failedMetadata, assertCallback)
     assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, 
txnMetadata1))), 
transactionManager.getAndMaybeAddTransactionState(transactionalId1))
+    assertTrue(txnMetadata1.pendingState.isEmpty)
 
     prepareForTxnMessageAppend(Errors.REQUEST_TIMED_OUT)
     transactionManager.appendTransactionToLog(transactionalId1, 
coordinatorEpoch = 10, failedMetadata, assertCallback)
     assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, 
txnMetadata1))), 
transactionManager.getAndMaybeAddTransactionState(transactionalId1))
+    assertTrue(txnMetadata1.pendingState.isEmpty)
 
     // test NOT_COORDINATOR cases
     expectedError = Errors.NOT_COORDINATOR
@@ -255,17 +260,20 @@ class TransactionStateManagerTest {
     prepareForTxnMessageAppend(Errors.NOT_LEADER_FOR_PARTITION)
     transactionManager.appendTransactionToLog(transactionalId1, 
coordinatorEpoch = 10, failedMetadata, assertCallback)
     assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, 
txnMetadata1))), 
transactionManager.getAndMaybeAddTransactionState(transactionalId1))
+    assertTrue(txnMetadata1.pendingState.isEmpty)
 
-    // test NOT_COORDINATOR cases
+    // test Unknown cases
     expectedError = Errors.UNKNOWN
 
     prepareForTxnMessageAppend(Errors.MESSAGE_TOO_LARGE)
     transactionManager.appendTransactionToLog(transactionalId1, 
coordinatorEpoch = 10, failedMetadata, assertCallback)
     assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, 
txnMetadata1))), 
transactionManager.getAndMaybeAddTransactionState(transactionalId1))
+    assertTrue(txnMetadata1.pendingState.isEmpty)
 
     prepareForTxnMessageAppend(Errors.RECORD_LIST_TOO_LARGE)
     transactionManager.appendTransactionToLog(transactionalId1, 
coordinatorEpoch = 10, failedMetadata, assertCallback)
     assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, 
txnMetadata1))), 
transactionManager.getAndMaybeAddTransactionState(transactionalId1))
+    assertTrue(txnMetadata1.pendingState.isEmpty)
   }
 
   @Test

Reply via email to