This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 18eca0229dc KAFKA-18882 Remove BaseKey, TxnKey, and UnknownKey (#19054)
18eca0229dc is described below
commit 18eca0229dc9f33c3253c705170a25b7f78eac40
Author: Xuan-Zhang Gong <[email protected]>
AuthorDate: Wed Mar 5 21:16:18 2025 +0800
KAFKA-18882 Remove BaseKey, TxnKey, and UnknownKey (#19054)
Reviewers: Ken Huang <[email protected]>, TengYao Chi
<[email protected]>, PoAn Yang <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../coordinator/transaction/TransactionLog.scala | 33 ++++--------------
.../transaction/TransactionStateManager.scala | 13 +++----
.../transaction/TransactionLogTest.scala | 40 ++++++++++++----------
.../transaction/TransactionStateManagerTest.scala | 11 +++---
4 files changed, 41 insertions(+), 56 deletions(-)
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
index 57a67ab8dd1..5972418d0c1 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
@@ -90,19 +90,15 @@ object TransactionLog {
/**
* Decodes the transaction log messages' key
*
- * @return the key
+ * @return left with the version if the key is not a transaction log key,
right with the transactional id otherwise
*/
- def readTxnRecordKey(buffer: ByteBuffer): BaseKey = {
+ def readTxnRecordKey(buffer: ByteBuffer): Either[Short, String] = {
val version = buffer.getShort
- if (version == CoordinatorRecordType.TRANSACTION_LOG.id) {
- val value = new TransactionLogKey(new ByteBufferAccessor(buffer),
0.toShort)
- TxnKey(
- version = version,
- transactionalId = value.transactionalId
- )
- } else {
- UnknownKey(version)
- }
+ Either.cond(
+ version == CoordinatorRecordType.TRANSACTION_LOG.id,
+ new TransactionLogKey(new ByteBufferAccessor(buffer),
0.toShort).transactionalId,
+ version
+ )
}
/**
@@ -143,18 +139,3 @@ object TransactionLog {
}
}
}
-
-sealed trait BaseKey{
- def version: Short
- def transactionalId: String
-}
-
-case class TxnKey(version: Short, transactionalId: String) extends BaseKey {
- override def toString: String = transactionalId
-}
-
-case class UnknownKey(version: Short) extends BaseKey {
- override def transactionalId: String = null
- override def toString: String = transactionalId
-}
-
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index a6e7dd30bf0..dbf04d98708 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -465,25 +465,22 @@ class TransactionStateManager(brokerId: Int,
fileRecords.readInto(buffer, 0)
MemoryRecords.readableRecords(buffer)
}
-
memRecords.batches.forEach { batch =>
for (record <- batch.asScala) {
require(record.hasKey, "Transaction state log's key should not
be null")
TransactionLog.readTxnRecordKey(record.key) match {
- case txnKey: TxnKey =>
+ case Left(version) =>
+ warn(s"Unknown message key with version $version" +
+ s" while loading transaction state from $topicPartition.
Ignoring it. " +
+ "It could be a left over from an aborted upgrade.")
+ case Right(transactionalId) =>
// load transaction metadata along with transaction state
- val transactionalId = txnKey.transactionalId
TransactionLog.readTxnRecordValue(transactionalId,
record.value) match {
case None =>
loadedTransactions.remove(transactionalId)
case Some(txnMetadata) =>
loadedTransactions.put(transactionalId, txnMetadata)
}
-
- case unknownKey: UnknownKey =>
- warn(s"Unknown message key with version
${unknownKey.version}" +
- s" while loading transaction state from $topicPartition.
Ignoring it. " +
- "It could be a left over from an aborted upgrade.")
}
}
currOffset = batch.nextOffset
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala
index c976ffc0210..8a852d70cbe 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala
@@ -25,7 +25,7 @@ import
org.apache.kafka.common.protocol.types.{CompactArrayOf, Field, Schema, St
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch,
SimpleRecord}
import org.apache.kafka.coordinator.transaction.generated.{TransactionLogKey,
TransactionLogValue}
import org.apache.kafka.server.common.TransactionVersion.{TV_0, TV_2}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows,
assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows,
assertTrue, fail}
import org.junit.jupiter.api.Test
import java.nio.ByteBuffer
@@ -89,21 +89,23 @@ class TransactionLogTest {
var count = 0
for (record <- records.records.asScala) {
- val txnKey = TransactionLog.readTxnRecordKey(record.key)
- val transactionalId = txnKey.transactionalId
- val txnMetadata = TransactionLog.readTxnRecordValue(transactionalId,
record.value).get
-
- assertEquals(pidMappings(transactionalId), txnMetadata.producerId)
- assertEquals(producerEpoch, txnMetadata.producerEpoch)
- assertEquals(transactionTimeoutMs, txnMetadata.txnTimeoutMs)
- assertEquals(transactionStates(txnMetadata.producerId),
txnMetadata.state)
-
- if (txnMetadata.state.equals(Empty))
- assertEquals(Set.empty[TopicPartition], txnMetadata.topicPartitions)
- else
- assertEquals(topicPartitions, txnMetadata.topicPartitions)
-
- count = count + 1
+ TransactionLog.readTxnRecordKey(record.key) match {
+ case Left(version) => fail(s"Unexpected record version: $version")
+ case Right(transactionalId) =>
+ val txnMetadata = TransactionLog.readTxnRecordValue(transactionalId,
record.value).get
+
+ assertEquals(pidMappings(transactionalId), txnMetadata.producerId)
+ assertEquals(producerEpoch, txnMetadata.producerEpoch)
+ assertEquals(transactionTimeoutMs, txnMetadata.txnTimeoutMs)
+ assertEquals(transactionStates(txnMetadata.producerId),
txnMetadata.state)
+
+ if (txnMetadata.state.equals(Empty))
+ assertEquals(Set.empty[TopicPartition],
txnMetadata.topicPartitions)
+ else
+ assertEquals(topicPartitions, txnMetadata.topicPartitions)
+
+ count = count + 1
+ }
}
assertEquals(pidMappings.size, count)
@@ -235,7 +237,9 @@ class TransactionLogTest {
def testReadTxnRecordKeyCanReadUnknownMessage(): Unit = {
val record = new TransactionLogKey()
val unknownRecord = MessageUtil.toVersionPrefixedBytes(Short.MaxValue,
record)
- val key = TransactionLog.readTxnRecordKey(ByteBuffer.wrap(unknownRecord))
- assertEquals(UnknownKey(Short.MaxValue), key)
+ TransactionLog.readTxnRecordKey(ByteBuffer.wrap(unknownRecord)) match {
+ case Left(version) => assertEquals(Short.MaxValue, version)
+ case Right(_) => fail("Expected to read unknown message")
+ }
}
}
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 522461e5485..5d8f5dc5648 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -889,10 +889,13 @@ class TransactionStateManagerTest {
appendedRecords.values.foreach { batches =>
batches.foreach { records =>
records.records.forEach { record =>
- val transactionalId =
TransactionLog.readTxnRecordKey(record.key).transactionalId
- assertNull(record.value)
- expiredTransactionalIds += transactionalId
- assertEquals(Right(None),
transactionManager.getTransactionState(transactionalId))
+ TransactionLog.readTxnRecordKey(record.key) match {
+ case Right(transactionalId) =>
+ assertNull(record.value)
+ expiredTransactionalIds += transactionalId
+ assertEquals(Right(None),
transactionManager.getTransactionState(transactionalId))
+ case Left(value) => fail(s"Failed to read transactional id from
tombstone: $value")
+ }
}
}
}