This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new 30f4fbf KAFKA-9307; Make transaction metadata loading resilient to
previous errors (#7840)
30f4fbf is described below
commit 30f4fbf51a76cd232767802bd6368ea82f31f437
Author: Dhruvil Shah <[email protected]>
AuthorDate: Mon Dec 23 15:20:40 2019 -0800
KAFKA-9307; Make transaction metadata loading resilient to previous errors
(#7840)
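Allow transaction metadata to be reloaded even if it already exists from
a previous epoch; previously, loading failed with an IllegalStateException
if the cache already held an entry for the partition. This helps in cases
where a previous become-follower transition failed to unload the
corresponding metadata.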
Reviewers: Jun Rao <[email protected]>, Jason Gustafson <[email protected]>
---
.../transaction/TransactionCoordinator.scala | 10 +++-
.../transaction/TransactionStateManager.scala | 56 +++++++++++-----------
.../transaction/TransactionStateManagerTest.scala | 28 ++++++++++-
3 files changed, 63 insertions(+), 31 deletions(-)
diff --git
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 3f6ec34..d646757 100644
---
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -275,12 +275,18 @@ class TransactionCoordinator(brokerId: Int,
}
def handleTxnImmigration(txnTopicPartitionId: Int, coordinatorEpoch: Int):
Unit = {
+ // The operations performed during immigration must be resilient to any
previous errors we saw or partial state we
+ // left off during the unloading phase. Ensure we remove all associated
state for this partition before we continue
+ // loading it.
+
txnMarkerChannelManager.removeMarkersForTxnTopicPartition(txnTopicPartitionId)
+
+ // Now load the partition.
txnManager.loadTransactionsForTxnTopicPartition(txnTopicPartitionId,
coordinatorEpoch, txnMarkerChannelManager.addTxnMarkersToSend)
}
def handleTxnEmigration(txnTopicPartitionId: Int, coordinatorEpoch: Int):
Unit = {
- txnManager.removeTransactionsForTxnTopicPartition(txnTopicPartitionId,
coordinatorEpoch)
-
txnMarkerChannelManager.removeMarkersForTxnTopicPartition(txnTopicPartitionId)
+ txnManager.removeTransactionsForTxnTopicPartition(txnTopicPartitionId,
coordinatorEpoch)
+
txnMarkerChannelManager.removeMarkersForTxnTopicPartition(txnTopicPartitionId)
}
private def logInvalidStateTransitionAndReturnError(transactionalId: String,
diff --git
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 01a24d6..9a9ed73 100644
---
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -86,13 +86,13 @@ class TransactionStateManager(brokerId: Int,
private val stateLock = new ReentrantReadWriteLock()
/** partitions of transaction topic that are being loaded, state lock should
be called BEFORE accessing this set */
- private val loadingPartitions:
mutable.Set[TransactionPartitionAndLeaderEpoch] = mutable.Set()
+ private[transaction] val loadingPartitions:
mutable.Set[TransactionPartitionAndLeaderEpoch] = mutable.Set()
/** partitions of transaction topic that are being removed, state lock
should be called BEFORE accessing this set */
- private val leavingPartitions:
mutable.Set[TransactionPartitionAndLeaderEpoch] = mutable.Set()
+ private[transaction] val leavingPartitions:
mutable.Set[TransactionPartitionAndLeaderEpoch] = mutable.Set()
/** transaction metadata cache indexed by assigned transaction topic
partition ids */
- private val transactionMetadataCache: mutable.Map[Int,
TxnMetadataCacheEntry] = mutable.Map()
+ private[transaction] val transactionMetadataCache: mutable.Map[Int,
TxnMetadataCacheEntry] = mutable.Map()
/** number of partitions for the transaction log topic */
private val transactionTopicPartitionCount =
getTransactionTopicPartitionCount
@@ -367,31 +367,25 @@ class TransactionStateManager(brokerId: Int,
/**
* Add a transaction topic partition into the cache
- *
- * Make it package-private to be used only for unit tests.
*/
- private[transaction] def addLoadedTransactionsToCache(txnTopicPartition:
Int, coordinatorEpoch: Int, metadataPerTransactionalId: Pool[String,
TransactionMetadata]): Unit = {
- val txnMetadataCacheEntry = TxnMetadataCacheEntry(coordinatorEpoch,
metadataPerTransactionalId)
- val currentTxnMetadataCacheEntry =
transactionMetadataCache.put(txnTopicPartition, txnMetadataCacheEntry)
-
- if (currentTxnMetadataCacheEntry.isDefined) {
- val coordinatorEpoch = currentTxnMetadataCacheEntry.get.coordinatorEpoch
- val metadataPerTxnId =
currentTxnMetadataCacheEntry.get.metadataPerTransactionalId
- val errorMsg = s"The metadata cache for txn partition $txnTopicPartition
has already exist with epoch $coordinatorEpoch " +
- s"and ${metadataPerTxnId.size} entries while trying to add to it; " +
- s"this should not happen"
- fatal(errorMsg)
- throw new IllegalStateException(errorMsg)
+ private[transaction] def addLoadedTransactionsToCache(txnTopicPartition: Int,
+ coordinatorEpoch: Int,
+ loadedTransactions:
Pool[String, TransactionMetadata]): Unit = {
+ val txnMetadataCacheEntry = TxnMetadataCacheEntry(coordinatorEpoch,
loadedTransactions)
+ val previousTxnMetadataCacheEntryOpt =
transactionMetadataCache.put(txnTopicPartition, txnMetadataCacheEntry)
+
+ previousTxnMetadataCacheEntryOpt.foreach { previousTxnMetadataCacheEntry =>
+ warn(s"Unloaded transaction metadata $previousTxnMetadataCacheEntry from
$txnTopicPartition as part of " +
+ s"loading metadata at epoch $coordinatorEpoch")
}
}
/**
- * When this broker becomes a leader for a transaction log partition, load
this partition and
- * populate the transaction metadata cache with the transactional ids.
+ * When this broker becomes a leader for a transaction log partition, load
this partition and populate the transaction
+ * metadata cache with the transactional ids. This operation must be
resilient to any partial state left off from
+ * the previous loading / unloading operation.
*/
def loadTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch:
Int, sendTxnMarkers: SendTxnMarkersCallback): Unit = {
- validateTransactionTopicPartitionCountIsStable()
-
val topicPartition = new
TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
val partitionAndLeaderEpoch =
TransactionPartitionAndLeaderEpoch(partitionId, coordinatorEpoch)
@@ -401,7 +395,9 @@ class TransactionStateManager(brokerId: Int,
}
def loadTransactions(): Unit = {
- info(s"Loading transaction metadata from $topicPartition")
+ info(s"Loading transaction metadata from $topicPartition at epoch
$coordinatorEpoch")
+ validateTransactionTopicPartitionCountIsStable()
+
val loadedTransactions = loadTransactionMetadata(topicPartition,
coordinatorEpoch)
inWriteLock(stateLock) {
@@ -436,6 +432,8 @@ class TransactionStateManager(brokerId: Int,
}
}
}
+
+ info(s"Completed loading transaction metadata from $topicPartition for
coordinator epoch $coordinatorEpoch")
}
scheduler.schedule(s"load-txns-for-partition-$topicPartition", () =>
loadTransactions)
@@ -446,8 +444,6 @@ class TransactionStateManager(brokerId: Int,
* that belong to that partition.
*/
def removeTransactionsForTxnTopicPartition(partitionId: Int,
coordinatorEpoch: Int): Unit = {
- validateTransactionTopicPartitionCountIsStable()
-
val topicPartition = new
TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
val partitionAndLeaderEpoch =
TransactionPartitionAndLeaderEpoch(partitionId, coordinatorEpoch)
@@ -461,11 +457,10 @@ class TransactionStateManager(brokerId: Int,
if (leavingPartitions.contains(partitionAndLeaderEpoch)) {
transactionMetadataCache.remove(partitionId) match {
case Some(txnMetadataCacheEntry) =>
- info(s"Removed
${txnMetadataCacheEntry.metadataPerTransactionalId.size} cached transaction
metadata for $topicPartition on follower transition")
+ info(s"Unloaded transaction metadata $txnMetadataCacheEntry for
$topicPartition on become-follower transition")
case None =>
- info(s"Trying to remove cached transaction metadata for
$topicPartition on follower transition but there is no entries remaining; " +
- s"it is likely that another process for removing the cached
entries has just executed earlier before")
+ info(s"No cached transaction metadata found for $topicPartition
during become-follower transition")
}
leavingPartitions.remove(partitionAndLeaderEpoch)
@@ -658,7 +653,12 @@ class TransactionStateManager(brokerId: Int,
}
-private[transaction] case class TxnMetadataCacheEntry(coordinatorEpoch: Int,
metadataPerTransactionalId: Pool[String, TransactionMetadata])
+private[transaction] case class TxnMetadataCacheEntry(coordinatorEpoch: Int,
+
metadataPerTransactionalId: Pool[String, TransactionMetadata]) {
+ override def toString: String = {
+ s"TxnMetadataCacheEntry(coordinatorEpoch=$coordinatorEpoch,
numTransactionalEntries=${metadataPerTransactionalId.size})"
+ }
+}
private[transaction] case class
CoordinatorEpochAndTxnMetadata(coordinatorEpoch: Int, transactionMetadata:
TransactionMetadata)
diff --git
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index d152888..038d4c5 100644
---
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -467,7 +467,33 @@ class TransactionStateManagerTest {
verifyMetadataDoesExistAndIsUsable(transactionalId2)
}
- private def verifyMetadataDoesExistAndIsUsable(transactionalId: String) = {
+ @Test
+ def testSuccessfulReimmigration(): Unit = {
+ txnMetadata1.state = PrepareCommit
+ txnMetadata1.addPartitions(Set[TopicPartition](new
TopicPartition("topic1", 0),
+ new TopicPartition("topic1", 1)))
+
+ txnRecords += new SimpleRecord(txnMessageKeyBytes1,
TransactionLog.valueToBytes(txnMetadata1.prepareNoTransit()))
+ val startOffset = 0L
+ val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
txnRecords.toArray: _*)
+
+ prepareTxnLog(topicPartition, 0, records)
+
+ // immigrate partition at epoch 0
+ transactionManager.loadTransactionsForTxnTopicPartition(partitionId,
coordinatorEpoch = 0, (_, _, _, _, _) => ())
+ assertEquals(0, transactionManager.loadingPartitions.size)
+ assertEquals(0, transactionManager.leavingPartitions.size)
+
+ // Re-immigrate partition at epoch 1. This should be successful even
though we didn't get to emigrate the partition.
+ prepareTxnLog(topicPartition, 0, records)
+ transactionManager.loadTransactionsForTxnTopicPartition(partitionId,
coordinatorEpoch = 1, (_, _, _, _, _) => ())
+ assertEquals(0, transactionManager.loadingPartitions.size)
+ assertEquals(0, transactionManager.leavingPartitions.size)
+
assertTrue(transactionManager.transactionMetadataCache.get(partitionId).isDefined)
+ assertEquals(1,
transactionManager.transactionMetadataCache.get(partitionId).get.coordinatorEpoch)
+ }
+
+ private def verifyMetadataDoesExistAndIsUsable(transactionalId: String):
Unit = {
transactionManager.getTransactionState(transactionalId) match {
case Left(errors) => fail("shouldn't have been any errors")
case Right(None) => fail("metadata should have been removed")