Repository: kafka Updated Branches: refs/heads/0.11.0 3c3edc9db -> 53589a609
KAFKA-5351: Reset pending state when returning an error in appendTransactionToLog Without this patch, future client retries would get the `CONCURRENT_TRANSACTIONS` error code indefinitely, since the pending state wouldn't be cleared when the append to the log failed. Author: Apurva Mehta <[email protected]> Reviewers: Jason Gustafson <[email protected]>, Guozhang Wang <[email protected]> Closes #3184 from apurvam/KAFKA-5351-clear-pending-state-on-retriable-error (cherry picked from commit 049abe7efa17c9660fce7b57b4c235e24c72315c) Signed-off-by: Guozhang Wang <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/53589a60 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/53589a60 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/53589a60 Branch: refs/heads/0.11.0 Commit: 53589a6092daeddb2f7de0a6038ce633ecd0233b Parents: 3c3edc9 Author: Apurva Mehta <[email protected]> Authored: Wed May 31 22:48:43 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Wed May 31 22:48:50 2017 -0700 ---------------------------------------------------------------------- .../transaction/TransactionCoordinator.scala | 11 +++++++++ .../transaction/TransactionMetadata.scala | 7 ++++++ .../transaction/TransactionStateManager.scala | 25 +++++++++++++++++--- .../TransactionStateManagerTest.scala | 10 +++++++- 4 files changed, 49 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/53589a60/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala index f182420..44e32b1 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala @@ -198,6 +198,9 @@ class TransactionCoordinator(brokerId: Int, case Ongoing => // indicate to abort the current ongoing txn first Right(coordinatorEpoch, txnMetadata.prepareFenceProducerEpoch()) + case Dead => + throw new IllegalStateException(s"Found transactionalId $transactionalId with state ${txnMetadata.state}. " + + s"This is illegal as we should never have transitioned to this state.") } } } @@ -326,6 +329,10 @@ class TransactionCoordinator(brokerId: Int, logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult) case Empty => logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult) + case Dead => + throw new IllegalStateException(s"Found transactionalId $transactionalId with state ${txnMetadata.state}. " + + s"This is illegal as we should never have transitioned to this state.") + } } } @@ -364,6 +371,10 @@ class TransactionCoordinator(brokerId: Int, logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult) else Right(txnMetadata, txnMetadata.prepareComplete(time.milliseconds())) + case Dead => + throw new IllegalStateException(s"Found transactionalId $transactionalId with state ${txnMetadata.state}. " + + s"This is illegal as we should never have transitioned to this state.") + } } } else { http://git-wip-us.apache.org/repos/asf/kafka/blob/53589a60/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala index 5956f1d..dbf0ec5 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala @@ -307,6 +307,13 @@ private[transaction] class TransactionMetadata(val transactionalId: String, txnStartTimestamp = transitMetadata.txnStartTimestamp topicPartitions.clear() } + case Dead => + // The transactionalId was being expired. The completion of the operation should result in removal of the + // the metadata from the cache, so we should never realistically transition to the dead state. + throw new IllegalStateException(s"TransactionalId : $transactionalId is trying to complete a transition to " + + s"$toState. This means that the transactionalId was being expired, and the only acceptable completion of " + + s"this operation is to remove the transaction metadata from the cache, not to persist the $toState in the log.") + } debug(s"TransactionalId $transactionalId complete transition from $state to $transitMetadata") http://git-wip-us.apache.org/repos/asf/kafka/blob/53589a60/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala index 19b9b91..05edefb 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala @@ -520,11 +520,9 @@ class TransactionStateManager(brokerId: Int, // in this case directly return NOT_COORDINATOR to client and let it to re-discover the transaction coordinator info(s"Updating $transactionalId's transaction state to $newMetadata with coordinator epoch $coordinatorEpoch for $transactionalId failed after the transaction message " + s"has been appended to the log. The cached coordinator epoch has changed to ${epochAndMetadata.coordinatorEpoch}") - responseError = Errors.NOT_COORDINATOR } else { metadata.completeTransitionTo(newMetadata) - debug(s"Updating $transactionalId's transaction state to $newMetadata with coordinator epoch $coordinatorEpoch for $transactionalId succeeded") } } @@ -534,9 +532,30 @@ class TransactionStateManager(brokerId: Int, // return NOT_COORDINATOR to let the client re-discover the transaction coordinator info(s"Updating $transactionalId's transaction state (txn topic partition ${partitionFor(transactionalId)}) to $newMetadata with coordinator epoch $coordinatorEpoch for $transactionalId " + s"failed after the transaction message has been appended to the log since the corresponding metadata does not exist in the cache anymore") - responseError = Errors.NOT_COORDINATOR } + } else { + // Reset the pending state when returning an error, since there is no active transaction for the transactional id at this point. + getAndMaybeAddTransactionState(transactionalId) match { + case Right(Some(epochAndTxnMetadata)) => + val metadata = epochAndTxnMetadata.transactionMetadata + metadata synchronized { + if (epochAndTxnMetadata.coordinatorEpoch == coordinatorEpoch) { + debug(s"TransactionalId ${metadata.transactionalId}, resetting pending state since we are returning error $responseError") + metadata.pendingState = None + } else { + info(s"TransactionalId ${metadata.transactionalId} coordinator epoch changed from " + + s"${epochAndTxnMetadata.coordinatorEpoch} to $coordinatorEpoch after append to log returned $responseError") + } + } + case Right(None) => + // Do nothing here, since we want to return the original append error to the user. + info(s"Found no metadata TransactionalId $transactionalId after append to log returned error $responseError") + case Left(error) => + // Do nothing here, since we want to return the original append error to the user. + info(s"Retrieving metadata for transactionalId $transactionalId returned $error after append to the log returned error $responseError") + } + } responseCallback(responseError) http://git-wip-us.apache.org/repos/asf/kafka/blob/53589a60/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala index 479f99b..2094528 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala @@ -225,6 +225,7 @@ class TransactionStateManagerTest { transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch, newMetadata, assertCallback) assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getAndMaybeAddTransactionState(transactionalId1)) + assertTrue(txnMetadata1.pendingState.isEmpty) // append to log again with expected failures txnMetadata1.pendingState = None @@ -236,18 +237,22 @@ class TransactionStateManagerTest { prepareForTxnMessageAppend(Errors.UNKNOWN_TOPIC_OR_PARTITION) transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback) assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getAndMaybeAddTransactionState(transactionalId1)) + assertTrue(txnMetadata1.pendingState.isEmpty) prepareForTxnMessageAppend(Errors.NOT_ENOUGH_REPLICAS) transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback) assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getAndMaybeAddTransactionState(transactionalId1)) + assertTrue(txnMetadata1.pendingState.isEmpty) prepareForTxnMessageAppend(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND) transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback) assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getAndMaybeAddTransactionState(transactionalId1)) + assertTrue(txnMetadata1.pendingState.isEmpty) prepareForTxnMessageAppend(Errors.REQUEST_TIMED_OUT) transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback) assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getAndMaybeAddTransactionState(transactionalId1)) + assertTrue(txnMetadata1.pendingState.isEmpty) // test NOT_COORDINATOR cases expectedError = Errors.NOT_COORDINATOR @@ -255,17 +260,20 @@ class TransactionStateManagerTest { prepareForTxnMessageAppend(Errors.NOT_LEADER_FOR_PARTITION) transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback) assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getAndMaybeAddTransactionState(transactionalId1)) + assertTrue(txnMetadata1.pendingState.isEmpty) - // test NOT_COORDINATOR cases + // test Unknown cases expectedError = Errors.UNKNOWN prepareForTxnMessageAppend(Errors.MESSAGE_TOO_LARGE) transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback) assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getAndMaybeAddTransactionState(transactionalId1)) + assertTrue(txnMetadata1.pendingState.isEmpty) prepareForTxnMessageAppend(Errors.RECORD_LIST_TOO_LARGE) transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback) assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getAndMaybeAddTransactionState(transactionalId1)) + assertTrue(txnMetadata1.pendingState.isEmpty) } @Test
