This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 18eca0229dc KAFKA-18882 Remove BaseKey, TxnKey, and UnknownKey (#19054)
18eca0229dc is described below

commit 18eca0229dc9f33c3253c705170a25b7f78eac40
Author: Xuan-Zhang Gong <[email protected]>
AuthorDate: Wed Mar 5 21:16:18 2025 +0800

    KAFKA-18882 Remove BaseKey, TxnKey, and UnknownKey (#19054)
    
    Reviewers: Ken Huang <[email protected]>, TengYao Chi 
<[email protected]>, PoAn Yang <[email protected]>, Chia-Ping Tsai 
<[email protected]>
---
 .../coordinator/transaction/TransactionLog.scala   | 33 ++++--------------
 .../transaction/TransactionStateManager.scala      | 13 +++----
 .../transaction/TransactionLogTest.scala           | 40 ++++++++++++----------
 .../transaction/TransactionStateManagerTest.scala  | 11 +++---
 4 files changed, 41 insertions(+), 56 deletions(-)

diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
index 57a67ab8dd1..5972418d0c1 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
@@ -90,19 +90,15 @@ object TransactionLog {
   /**
     * Decodes the transaction log messages' key
     *
-    * @return the key
+    * @return left with the version if the key is not a transaction log key, 
right with the transactional id otherwise
     */
-  def readTxnRecordKey(buffer: ByteBuffer): BaseKey = {
+  def readTxnRecordKey(buffer: ByteBuffer): Either[Short, String] = {
     val version = buffer.getShort
-    if (version == CoordinatorRecordType.TRANSACTION_LOG.id) {
-      val value = new TransactionLogKey(new ByteBufferAccessor(buffer), 
0.toShort)
-      TxnKey(
-        version = version,
-        transactionalId = value.transactionalId
-      )
-    } else {
-      UnknownKey(version)
-    }
+    Either.cond(
+      version == CoordinatorRecordType.TRANSACTION_LOG.id,
+      new TransactionLogKey(new ByteBufferAccessor(buffer), 
0.toShort).transactionalId,
+      version
+    )
   }
 
   /**
@@ -143,18 +139,3 @@ object TransactionLog {
     }
   }
 }
-
-sealed trait BaseKey{
-  def version: Short
-  def transactionalId: String
-}
-
-case class TxnKey(version: Short, transactionalId: String) extends BaseKey {
-  override def toString: String = transactionalId
-}
-
-case class UnknownKey(version: Short) extends BaseKey {
-  override def transactionalId: String = null
-  override def toString: String = transactionalId
-}
-
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index a6e7dd30bf0..dbf04d98708 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -465,25 +465,22 @@ class TransactionStateManager(brokerId: Int,
                 fileRecords.readInto(buffer, 0)
                 MemoryRecords.readableRecords(buffer)
             }
-
             memRecords.batches.forEach { batch =>
               for (record <- batch.asScala) {
                 require(record.hasKey, "Transaction state log's key should not 
be null")
                 TransactionLog.readTxnRecordKey(record.key) match {
-                  case txnKey: TxnKey =>
+                  case Left(version) =>
+                    warn(s"Unknown message key with version $version" +
+                      s" while loading transaction state from $topicPartition. 
Ignoring it. " +
+                      "It could be a left over from an aborted upgrade.")
+                  case Right(transactionalId) =>
                     // load transaction metadata along with transaction state
-                    val transactionalId = txnKey.transactionalId
                     TransactionLog.readTxnRecordValue(transactionalId, 
record.value) match {
                       case None =>
                         loadedTransactions.remove(transactionalId)
                       case Some(txnMetadata) =>
                         loadedTransactions.put(transactionalId, txnMetadata)
                     }
-
-                  case unknownKey: UnknownKey =>
-                    warn(s"Unknown message key with version 
${unknownKey.version}" +
-                      s" while loading transaction state from $topicPartition. 
Ignoring it. " +
-                      "It could be a left over from an aborted upgrade.")
                 }
               }
               currOffset = batch.nextOffset
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala
index c976ffc0210..8a852d70cbe 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala
@@ -25,7 +25,7 @@ import 
org.apache.kafka.common.protocol.types.{CompactArrayOf, Field, Schema, St
 import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, 
SimpleRecord}
 import org.apache.kafka.coordinator.transaction.generated.{TransactionLogKey, 
TransactionLogValue}
 import org.apache.kafka.server.common.TransactionVersion.{TV_0, TV_2}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, 
assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, 
assertTrue, fail}
 import org.junit.jupiter.api.Test
 
 import java.nio.ByteBuffer
@@ -89,21 +89,23 @@ class TransactionLogTest {
 
     var count = 0
     for (record <- records.records.asScala) {
-      val txnKey = TransactionLog.readTxnRecordKey(record.key)
-      val transactionalId = txnKey.transactionalId
-      val txnMetadata = TransactionLog.readTxnRecordValue(transactionalId, 
record.value).get
-
-      assertEquals(pidMappings(transactionalId), txnMetadata.producerId)
-      assertEquals(producerEpoch, txnMetadata.producerEpoch)
-      assertEquals(transactionTimeoutMs, txnMetadata.txnTimeoutMs)
-      assertEquals(transactionStates(txnMetadata.producerId), 
txnMetadata.state)
-
-      if (txnMetadata.state.equals(Empty))
-        assertEquals(Set.empty[TopicPartition], txnMetadata.topicPartitions)
-      else
-        assertEquals(topicPartitions, txnMetadata.topicPartitions)
-
-      count = count + 1
+      TransactionLog.readTxnRecordKey(record.key) match {
+        case Left(version) => fail(s"Unexpected record version: $version")
+        case Right(transactionalId) =>
+          val txnMetadata = TransactionLog.readTxnRecordValue(transactionalId, 
record.value).get
+
+          assertEquals(pidMappings(transactionalId), txnMetadata.producerId)
+          assertEquals(producerEpoch, txnMetadata.producerEpoch)
+          assertEquals(transactionTimeoutMs, txnMetadata.txnTimeoutMs)
+          assertEquals(transactionStates(txnMetadata.producerId), 
txnMetadata.state)
+
+          if (txnMetadata.state.equals(Empty))
+            assertEquals(Set.empty[TopicPartition], 
txnMetadata.topicPartitions)
+          else
+            assertEquals(topicPartitions, txnMetadata.topicPartitions)
+
+          count = count + 1
+      }
     }
 
     assertEquals(pidMappings.size, count)
@@ -235,7 +237,9 @@ class TransactionLogTest {
   def testReadTxnRecordKeyCanReadUnknownMessage(): Unit = {
     val record = new TransactionLogKey()
     val unknownRecord = MessageUtil.toVersionPrefixedBytes(Short.MaxValue, 
record)
-    val key = TransactionLog.readTxnRecordKey(ByteBuffer.wrap(unknownRecord))
-    assertEquals(UnknownKey(Short.MaxValue), key)
+    TransactionLog.readTxnRecordKey(ByteBuffer.wrap(unknownRecord)) match {
+      case Left(version) => assertEquals(Short.MaxValue, version)
+      case Right(_) => fail("Expected to read unknown message")
+    }
   }
 }
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 522461e5485..5d8f5dc5648 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -889,10 +889,13 @@ class TransactionStateManagerTest {
     appendedRecords.values.foreach { batches =>
       batches.foreach { records =>
         records.records.forEach { record =>
-          val transactionalId = 
TransactionLog.readTxnRecordKey(record.key).transactionalId
-          assertNull(record.value)
-          expiredTransactionalIds += transactionalId
-          assertEquals(Right(None), 
transactionManager.getTransactionState(transactionalId))
+          TransactionLog.readTxnRecordKey(record.key) match {
+            case Right(transactionalId) =>
+              assertNull(record.value)
+              expiredTransactionalIds += transactionalId
+              assertEquals(Right(None), 
transactionManager.getTransactionState(transactionalId))
+            case Left(value) => fail(s"Failed to read transactional id from 
tombstone: $value")
+          }
         }
       }
     }

Reply via email to