This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4d81de035e0 KAFKA-18883 Move TransactionLog to transaction-coordinator 
module (#20456)
4d81de035e0 is described below

commit 4d81de035e07499eac48fe0bbc12af73efc885a6
Author: TaiJuWu <[email protected]>
AuthorDate: Mon Jan 5 15:47:42 2026 +0800

    KAFKA-18883 Move TransactionLog to transaction-coordinator module (#20456)
    
    Ported kafka.coordinator.transaction.TransactionLog (Scala) to
    org.apache.kafka.coordinator.transaction.TransactionLog (Java).
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../coordinator/transaction/TransactionLog.scala   | 140 ------------------
 .../transaction/TransactionStateManager.scala      |  38 ++---
 .../TransactionCoordinatorConcurrencyTest.scala    |   2 +-
 .../transaction/TransactionStateManagerTest.scala  |  15 +-
 .../coordinator/transaction/TransactionLog.java    | 160 +++++++++++++++++++++
 .../transaction/TransactionLogTest.java            |  35 ++---
 6 files changed, 201 insertions(+), 189 deletions(-)

diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
deleted file mode 100644
index f024e88aa8e..00000000000
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.coordinator.transaction
-
-import java.nio.ByteBuffer
-import org.apache.kafka.common.compress.Compression
-import org.apache.kafka.common.protocol.{ByteBufferAccessor, MessageUtil}
-import org.apache.kafka.common.record.RecordBatch
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.coordinator.transaction.{TransactionMetadata, 
TransactionState, TxnTransitMetadata}
-import 
org.apache.kafka.coordinator.transaction.generated.{CoordinatorRecordType, 
TransactionLogKey, TransactionLogValue}
-import org.apache.kafka.server.common.TransactionVersion
-
-import java.util
-
-import scala.jdk.CollectionConverters._
-
-/**
- * Messages stored for the transaction topic represent the producer id and 
transactional status of the corresponding
- * transactional id, which have versions for both the key and value fields. 
Key and value
- * versions are used to evolve the message formats:
- *
- * key version 0:               [transactionalId]
- *    -> value version 0:       [producer_id, producer_epoch, 
expire_timestamp, status, [topic, [partition] ], timestamp]
- */
-object TransactionLog {
-
-  // enforce always using
-  //  1. cleanup policy = compact
-  //  2. compression = none
-  //  3. unclean leader election = disabled
-  //  4. required acks = -1 when writing
-  val EnforcedCompression: Compression = Compression.NONE
-  val EnforcedRequiredAcks: Short = (-1).toShort
-
-  /**
-    * Generates the bytes for transaction log message key
-    *
-    * @return key bytes
-    */
-  def keyToBytes(transactionalId: String): Array[Byte] = {
-    MessageUtil.toCoordinatorTypePrefixedBytes(new 
TransactionLogKey().setTransactionalId(transactionalId))
-  }
-
-  /**
-    * Generates the payload bytes for transaction log message value
-    *
-    * @return value payload bytes
-    */
-  def valueToBytes(txnMetadata: TxnTransitMetadata,
-                                        transactionVersionLevel: 
TransactionVersion): Array[Byte] = {
-    if (txnMetadata.txnState == TransactionState.EMPTY && 
!txnMetadata.topicPartitions.isEmpty)
-        throw new IllegalStateException(s"Transaction is not expected to have 
any partitions since its state is ${txnMetadata.txnState}: $txnMetadata")
-
-      val transactionPartitions = if (txnMetadata.txnState == 
TransactionState.EMPTY) null
-      else txnMetadata.topicPartitions.asScala
-        .groupBy(_.topic)
-        .map { case (topic, partitions) =>
-          new TransactionLogValue.PartitionsSchema()
-            .setTopic(topic)
-            .setPartitionIds(partitions.map(tp => 
Integer.valueOf(tp.partition)).toList.asJava)
-        }.toList.asJava
-
-    // Serialize with version 0 (highest non-flexible version) until 
transaction.version 1 is enabled
-    // which enables flexible fields in records.
-    
MessageUtil.toVersionPrefixedBytes(transactionVersionLevel.transactionLogValueVersion(),
-      new TransactionLogValue()
-        .setProducerId(txnMetadata.producerId)
-        .setProducerEpoch(txnMetadata.producerEpoch)
-        .setTransactionTimeoutMs(txnMetadata.txnTimeoutMs)
-        .setTransactionStatus(txnMetadata.txnState.id)
-        
.setTransactionLastUpdateTimestampMs(txnMetadata.txnLastUpdateTimestamp)
-        .setTransactionStartTimestampMs(txnMetadata.txnStartTimestamp)
-        .setTransactionPartitions(transactionPartitions)
-        
.setClientTransactionVersion(txnMetadata.clientTransactionVersion.featureLevel()))
-  }
-
-  /**
-    * Decodes the transaction log messages' key
-    *
-    * @return left with the version if the key is not a transaction log key, 
right with the transactional id otherwise
-    */
-  def readTxnRecordKey(buffer: ByteBuffer): Either[Short, String] = {
-    val version = buffer.getShort
-    Either.cond(
-      version == CoordinatorRecordType.TRANSACTION_LOG.id,
-      new TransactionLogKey(new ByteBufferAccessor(buffer), 
0.toShort).transactionalId,
-      version
-    )
-  }
-
-  /**
-    * Decodes the transaction log messages' payload and retrieves the 
transaction metadata from it
-    *
-    * @return a transaction metadata object from the message
-    */
-  def readTxnRecordValue(transactionalId: String, buffer: ByteBuffer): 
Option[TransactionMetadata] = {
-    // tombstone
-    if (buffer == null) None
-    else {
-      val version = buffer.getShort
-      if (version >= TransactionLogValue.LOWEST_SUPPORTED_VERSION && version 
<= TransactionLogValue.HIGHEST_SUPPORTED_VERSION) {
-        val value = new TransactionLogValue(new ByteBufferAccessor(buffer), 
version)
-        val state = TransactionState.fromId(value.transactionStatus)
-        val tps: util.Set[TopicPartition] = new util.HashSet[TopicPartition]()
-        if (!state.equals(TransactionState.EMPTY))
-          value.transactionPartitions.forEach(partitionsSchema => {
-            partitionsSchema.partitionIds.forEach(partitionId => tps.add(new 
TopicPartition(partitionsSchema.topic, partitionId.intValue())))
-          })
-        Some(new TransactionMetadata(
-          transactionalId,
-          value.producerId,
-          value.previousProducerId,
-          value.nextProducerId,
-          value.producerEpoch,
-          RecordBatch.NO_PRODUCER_EPOCH,
-          value.transactionTimeoutMs,
-          state,
-          tps,
-          value.transactionStartTimestampMs,
-          value.transactionLastUpdateTimestampMs,
-          TransactionVersion.fromFeatureLevel(value.clientTransactionVersion)))
-      } else throw new IllegalStateException(s"Unknown version $version from 
the transaction log message value")
-    }
-  }
-}
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 82b960c5ba7..800ac310a13 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -35,7 +35,7 @@ import 
org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.TransactionResult
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{KafkaException, TopicIdPartition, 
TopicPartition}
-import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, 
TransactionMetadata, TransactionState, TransactionStateManagerConfig, 
TxnTransitMetadata}
+import org.apache.kafka.coordinator.transaction.{TransactionLog, 
TransactionLogConfig, TransactionMetadata, TransactionState, 
TransactionStateManagerConfig, TxnTransitMetadata}
 import org.apache.kafka.metadata.MetadataCache
 import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
 import org.apache.kafka.server.config.ServerConfigs
@@ -183,7 +183,7 @@ class TransactionStateManager(brokerId: Int,
                 if (recordsBuilder == null) {
                   recordsBuilder = MemoryRecords.builder(
                     ByteBuffer.allocate(math.min(16384, maxBatchSize)),
-                    TransactionLog.EnforcedCompression,
+                    TransactionLog.ENFORCED_COMPRESSION,
                     TimestampType.CREATE_TIME,
                     0L,
                     maxBatchSize
@@ -290,7 +290,7 @@ class TransactionStateManager(brokerId: Int,
     inReadLock(stateLock) {
       replicaManager.appendRecords(
         timeout = config.requestTimeoutMs,
-        requiredAcks = TransactionLog.EnforcedRequiredAcks,
+        requiredAcks = TransactionLog.ENFORCED_REQUIRED_ACKS,
         internalTopicsAllowed = true,
         origin = AppendOrigin.COORDINATOR,
         entriesPerPartition = 
Map(replicaManager.topicIdPartition(transactionPartition) -> tombstoneRecords),
@@ -495,19 +495,21 @@ class TransactionStateManager(brokerId: Int,
             memRecords.batches.forEach { batch =>
               for (record <- batch.asScala) {
                 require(record.hasKey, "Transaction state log's key should not 
be null")
-                TransactionLog.readTxnRecordKey(record.key) match {
-                  case Left(version) =>
-                    warn(s"Unknown message key with version $version" +
-                      s" while loading transaction state from $topicPartition. 
Ignoring it. " +
-                      "It could be a left over from an aborted upgrade.")
-                  case Right(transactionalId) =>
-                    // load transaction metadata along with transaction state
-                    TransactionLog.readTxnRecordValue(transactionalId, 
record.value) match {
-                      case None =>
-                        loadedTransactions.remove(transactionalId)
-                      case Some(txnMetadata) =>
-                        loadedTransactions.put(transactionalId, txnMetadata)
-                    }
+                val transactionalId = try 
Some(TransactionLog.readTxnRecordKey(record.key))
+                catch {
+                  case e: IllegalStateException =>
+                    warn(s"Unknown message key version while loading 
transaction state from $topicPartition. " +
+                      s"Ignoring it. It could be a left over from an aborted 
upgrade", e)
+                    None
+                }
+                transactionalId.foreach { txnId =>
+                  // load transaction metadata along with transaction state
+                  val txnMetadata = TransactionLog.readTxnRecordValue(txnId, 
record.value)
+                  if (txnMetadata == null) {
+                    loadedTransactions.remove(txnId)
+                  } else {
+                    loadedTransactions.put(txnId, txnMetadata)
+                  }
                 }
               }
               currOffset = batch.nextOffset
@@ -663,7 +665,7 @@ class TransactionStateManager(brokerId: Int,
     val valueBytes = TransactionLog.valueToBytes(newMetadata, 
transactionVersionLevel())
     val timestamp = time.milliseconds()
 
-    val records = 
MemoryRecords.withRecords(TransactionLog.EnforcedCompression, new 
SimpleRecord(timestamp, keyBytes, valueBytes))
+    val records = 
MemoryRecords.withRecords(TransactionLog.ENFORCED_COMPRESSION, new 
SimpleRecord(timestamp, keyBytes, valueBytes))
     val transactionStateTopicPartition = new 
TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, 
partitionFor(transactionalId))
     val transactionStateTopicIdPartition = 
replicaManager.topicIdPartition(transactionStateTopicPartition)
     val recordsPerPartition = Map(transactionStateTopicIdPartition -> records)
@@ -806,7 +808,7 @@ class TransactionStateManager(brokerId: Int,
           if (append) {
             replicaManager.appendRecords(
               timeout = newMetadata.txnTimeoutMs.toLong,
-              requiredAcks = TransactionLog.EnforcedRequiredAcks,
+              requiredAcks = TransactionLog.ENFORCED_REQUIRED_ACKS,
               internalTopicsAllowed = true,
               origin = AppendOrigin.COORDINATOR,
               entriesPerPartition = recordsPerPartition,
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
index 79957b01fb7..4bd71172058 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
@@ -37,7 +37,7 @@ import org.apache.kafka.common.record.{FileRecords, 
MemoryRecords, RecordBatch,
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.utils.{LogContext, MockTime, ProducerIdAndEpoch}
 import org.apache.kafka.common.{Node, TopicPartition, Uuid}
-import org.apache.kafka.coordinator.transaction.{ProducerIdManager, 
TransactionMetadata, TransactionState}
+import org.apache.kafka.coordinator.transaction.{ProducerIdManager, 
TransactionLog, TransactionMetadata, TransactionState}
 import org.apache.kafka.metadata.MetadataCache
 import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion, 
RequestLocal, TransactionVersion}
 import org.apache.kafka.server.storage.log.FetchIsolation
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index f9953bae24f..90a5429fdaa 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -32,7 +32,7 @@ import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.TransactionResult
 import org.apache.kafka.common.utils.MockTime
-import org.apache.kafka.coordinator.transaction.{TransactionMetadata, 
TransactionState, TxnTransitMetadata}
+import org.apache.kafka.coordinator.transaction.{TransactionLog, 
TransactionMetadata, TransactionState, TxnTransitMetadata}
 import org.apache.kafka.metadata.MetadataCache
 import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion, 
RequestLocal, TransactionVersion}
 import org.apache.kafka.server.common.TransactionVersion.{TV_0, TV_2}
@@ -917,13 +917,10 @@ class TransactionStateManagerTest {
     appendedRecords.values.foreach { batches =>
       batches.foreach { records =>
         records.records.forEach { record =>
-          TransactionLog.readTxnRecordKey(record.key) match {
-            case Right(transactionalId) =>
-              assertNull(record.value)
-              expiredTransactionalIds += transactionalId
-              assertEquals(Right(None), 
transactionManager.getTransactionState(transactionalId))
-            case Left(value) => fail(s"Failed to read transactional id from 
tombstone: $value")
-          }
+          val transactionalId = 
TransactionLog.readTxnRecordKey(record.key).asInstanceOf[String]
+          assertNull(record.value)
+          expiredTransactionalIds += transactionalId
+          assertEquals(Right(None), 
transactionManager.getTransactionState(transactionalId))
         }
       }
     }
@@ -1184,7 +1181,7 @@ class TransactionStateManagerTest {
       val partitionId = transactionManager.partitionFor(transactionalId1)
       val topicPartition = new TopicIdPartition(transactionTopicId, 
partitionId, TRANSACTION_STATE_TOPIC_NAME)
       val expectedTombstone = new SimpleRecord(time.milliseconds(), 
TransactionLog.keyToBytes(transactionalId1), null)
-      val expectedRecords = 
MemoryRecords.withRecords(TransactionLog.EnforcedCompression, expectedTombstone)
+      val expectedRecords = 
MemoryRecords.withRecords(TransactionLog.ENFORCED_COMPRESSION, 
expectedTombstone)
       assertEquals(Set(topicPartition), appendedRecords.keySet)
       assertEquals(Seq(expectedRecords), appendedRecords(topicPartition).toSeq)
     } else {
diff --git 
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java
 
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java
new file mode 100644
index 00000000000..9c86953b5ef
--- /dev/null
+++ 
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.transaction;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.MessageUtil;
+import org.apache.kafka.common.record.RecordBatch;
+import 
org.apache.kafka.coordinator.transaction.generated.CoordinatorRecordType;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
+import org.apache.kafka.server.common.TransactionVersion;
+
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Messages stored for the transaction topic represent the producer id and 
transactional status of the corresponding
+ * transactional id, which have versions for both the key and value fields. 
Key and value
+ * versions are used to evolve the message formats:
+ *
+ * key version 0:               [transactionalId]
+ *    -> value version 0:       [producer_id, producer_epoch, 
expire_timestamp, status, [topic, [partition] ], timestamp]
+ */
+public class TransactionLog {
+
+    // enforce always using
+    //  1. cleanup policy = compact
+    //  2. compression = none
+    //  3. unclean leader election = disabled
+    //  4. required acks = -1 when writing
+    public static final Compression ENFORCED_COMPRESSION = Compression.NONE;
+    public static final short ENFORCED_REQUIRED_ACKS = (short) -1;
+
+    /**
+     * Generates the bytes for transaction log message key
+     *
+     * @return key bytes
+     */
+    public static byte[] keyToBytes(String transactionalId) {
+        return MessageUtil.toCoordinatorTypePrefixedBytes(
+                new TransactionLogKey().setTransactionalId(transactionalId)
+        );
+    }
+
+    /**
+     * Generates the payload bytes for transaction log message value
+     *
+     * @return value payload bytes
+     */
+    public static byte[] valueToBytes(TxnTransitMetadata txnMetadata,
+                                      TransactionVersion 
transactionVersionLevel) {
+        if (txnMetadata.txnState() == TransactionState.EMPTY && 
!txnMetadata.topicPartitions().isEmpty()) {
+            throw new IllegalStateException("Transaction is not expected to 
have any partitions since its state is "
+                    + txnMetadata.txnState() + ": " + txnMetadata);
+        }
+
+        List<TransactionLogValue.PartitionsSchema> transactionPartitions = 
null;
+
+        if (txnMetadata.txnState() != TransactionState.EMPTY) {
+            transactionPartitions = txnMetadata.topicPartitions().stream()
+                    .collect(Collectors.groupingBy(TopicPartition::topic))
+                    .entrySet().stream()
+                    .map(entry ->
+                        new 
TransactionLogValue.PartitionsSchema().setTopic(entry.getKey())
+                            
.setPartitionIds(entry.getValue().stream().map(TopicPartition::partition).toList())).toList();
+        }
+
+        return MessageUtil.toVersionPrefixedBytes(
+                transactionVersionLevel.transactionLogValueVersion(),
+                new TransactionLogValue()
+                        .setProducerId(txnMetadata.producerId())
+                        .setProducerEpoch(txnMetadata.producerEpoch())
+                        .setTransactionTimeoutMs(txnMetadata.txnTimeoutMs())
+                        .setTransactionStatus(txnMetadata.txnState().id())
+                        
.setTransactionLastUpdateTimestampMs(txnMetadata.txnLastUpdateTimestamp())
+                        
.setTransactionStartTimestampMs(txnMetadata.txnStartTimestamp())
+                        .setTransactionPartitions(transactionPartitions)
+                        
.setClientTransactionVersion(txnMetadata.clientTransactionVersion().featureLevel())
+        );
+    }
+
+    /**
+     * Decodes the transaction log messages' key
+     *
+     * @return the transactional id
+     * @throws IllegalStateException if the version is not a valid transaction 
log key version
+     */
+    public static String readTxnRecordKey(ByteBuffer buffer) {
+        short version = buffer.getShort();
+        if (version == CoordinatorRecordType.TRANSACTION_LOG.id()) {
+            return new TransactionLogKey(new ByteBufferAccessor(buffer), 
(short) 0).transactionalId();
+        } else {
+            throw new IllegalStateException("Unknown version " + version + " 
from the transaction log message key");
+        }
+    }
+
+    /**
+     * Decodes the transaction log messages' payload and retrieves the 
transaction metadata from it
+     *
+     * @return a transaction metadata object from the message, or null if 
tombstone
+     */
+    public static TransactionMetadata readTxnRecordValue(String 
transactionalId, ByteBuffer buffer) {
+        if (buffer == null) {
+            return null; // tombstone
+        } else {
+            short version = buffer.getShort();
+            if (version >= TransactionLogValue.LOWEST_SUPPORTED_VERSION
+                    && version <= 
TransactionLogValue.HIGHEST_SUPPORTED_VERSION) {
+
+                TransactionLogValue value = new TransactionLogValue(new 
ByteBufferAccessor(buffer), version);
+                TransactionState state = 
TransactionState.fromId(value.transactionStatus());
+
+                Set<TopicPartition> tps = new HashSet<>();
+                if (state != TransactionState.EMPTY) {
+                    for (TransactionLogValue.PartitionsSchema partitionsSchema 
: value.transactionPartitions()) {
+                        for (int partitionId : 
partitionsSchema.partitionIds()) {
+                            tps.add(new 
TopicPartition(partitionsSchema.topic(), partitionId));
+                        }
+                    }
+                }
+
+                return new TransactionMetadata(
+                        transactionalId,
+                        value.producerId(),
+                        value.previousProducerId(),
+                        value.nextProducerId(),
+                        value.producerEpoch(),
+                        RecordBatch.NO_PRODUCER_EPOCH,
+                        value.transactionTimeoutMs(),
+                        state,
+                        tps,
+                        value.transactionStartTimestampMs(),
+                        value.transactionLastUpdateTimestampMs(),
+                        
TransactionVersion.fromFeatureLevel(value.clientTransactionVersion())
+                );
+            } else {
+                throw new IllegalStateException("Unknown version " + version + 
" from the transaction log message value");
+            }
+        }
+    }
+}
diff --git 
a/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionLogTest.java
 
b/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionLogTest.java
index 3295901a8ba..50d0f0327f0 100644
--- 
a/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionLogTest.java
+++ 
b/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionLogTest.java
@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.coordinator.transaction;
 
-import kafka.coordinator.transaction.TransactionLog;
-
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
@@ -52,8 +50,9 @@ import static 
org.apache.kafka.server.common.TransactionVersion.LATEST_PRODUCTIO
 import static org.apache.kafka.server.common.TransactionVersion.TV_0;
 import static org.apache.kafka.server.common.TransactionVersion.TV_2;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 class TransactionLogTest {
@@ -67,15 +66,12 @@ class TransactionLogTest {
     );
 
     private sealed interface TxnKeyResult {
-        record UnknownVersion(short version) implements TxnKeyResult { }
         record TransactionalId(String id) implements TxnKeyResult { }
     }
 
     private static TxnKeyResult readTxnRecordKey(ByteBuffer buf) {
-        var e = TransactionLog.readTxnRecordKey(buf);
-        return e.isLeft()
-            ? new TxnKeyResult.UnknownVersion((Short) e.left().get())
-            : new TxnKeyResult.TransactionalId(e.right().get());
+        var result = TransactionLog.readTxnRecordKey(buf);
+        return new TxnKeyResult.TransactionalId(result);
     }
 
     private static TransactionMetadata TransactionMetadata(TransactionState 
state) {
@@ -128,7 +124,7 @@ class TransactionLogTest {
             TransactionLog.valueToBytes(txnMetadata.prepareNoTransit(), TV_2)
         )).records().iterator().next();
         var txnIdResult = assertInstanceOf(TxnKeyResult.TransactionalId.class, 
readTxnRecordKey(record.key()));
-        var deserialized = TransactionLog.readTxnRecordValue(txnIdResult.id(), 
record.value()).get();
+        var deserialized = TransactionLog.readTxnRecordValue(txnIdResult.id(), 
record.value());
 
         assertEquals(txnMetadata.producerId(), deserialized.producerId());
         assertEquals(txnMetadata.producerEpoch(), 
deserialized.producerEpoch());
@@ -172,7 +168,7 @@ class TransactionLogTest {
             .setTransactionPartitions(List.of(txnPartitions));
 
         var serialized = MessageUtil.toVersionPrefixedByteBuffer((short) 1, 
txnLogValue);
-        var deserialized = TransactionLog.readTxnRecordValue("transactionId", 
serialized).get();
+        var deserialized = TransactionLog.readTxnRecordValue("transactionId", 
serialized);
 
         assertEquals(100, deserialized.producerId());
         assertEquals(50, deserialized.producerEpoch());
@@ -253,11 +249,9 @@ class TransactionLogTest {
 
         // Read the buffer with readTxnRecordValue.
         buffer.rewind();
-        var txnMetadata = TransactionLog.readTxnRecordValue("transaction-id", 
buffer);
-        
-        assertFalse(txnMetadata.isEmpty(), "Expected transaction metadata but 
got none");
+        var metadata = TransactionLog.readTxnRecordValue("transaction-id", 
buffer);
 
-        var metadata = txnMetadata.get();
+        assertNotNull(metadata, "Expected transaction metadata but got none");
         assertEquals(1000L, metadata.producerId());
         assertEquals(100, metadata.producerEpoch());
         assertEquals(1000, metadata.txnTimeoutMs());
@@ -268,16 +262,15 @@ class TransactionLogTest {
     }
 
     @Test
-    void testReadTxnRecordKeyCanReadUnknownMessage() {
+    void testReadTxnRecordKeyThrowsOnUnknownVersion() {
         var unknownRecord = 
MessageUtil.toVersionPrefixedBytes(Short.MAX_VALUE, new TransactionLogKey());
-        var result = readTxnRecordKey(wrap(unknownRecord));
-        
-        var uv = assertInstanceOf(TxnKeyResult.UnknownVersion.class, result);
-        assertEquals(Short.MAX_VALUE, uv.version());
+        var exception = assertThrows(IllegalStateException.class,
+            () -> TransactionLog.readTxnRecordKey(wrap(unknownRecord)));
+        assertTrue(exception.getMessage().contains("Unknown version " + 
Short.MAX_VALUE));
     }
    
     @Test
-    void shouldReturnEmptyWhenForTombstoneRecord() {
-        assertTrue(TransactionLog.readTxnRecordValue("transaction-id", 
null).isEmpty());
+    void shouldReturnNullForTombstoneRecord() {
+        assertNull(TransactionLog.readTxnRecordValue("transaction-id", null));
     }
 }
\ No newline at end of file

Reply via email to