This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4d81de035e0 KAFKA-18883 Move TransactionLog to transaction-coordinator module (#20456)
4d81de035e0 is described below
commit 4d81de035e07499eac48fe0bbc12af73efc885a6
Author: TaiJuWu <[email protected]>
AuthorDate: Mon Jan 5 15:47:42 2026 +0800
KAFKA-18883 Move TransactionLog to transaction-coordinator module (#20456)
Ported kafka.coordinator.transaction.TransactionLog (Scala) to
org.apache.kafka.coordinator.transaction.TransactionLog (Java).
Reviewers: Chia-Ping Tsai <[email protected]>
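A minimal sketch of how a caller might consume the ported Java API, assuming only the signatures shown in the diff below (the decode helper and class name are hypothetical): readTxnRecordKey now throws IllegalStateException for an unknown key version instead of returning an Either, and readTxnRecordValue returns null for a tombstone instead of an Option.

    import org.apache.kafka.coordinator.transaction.TransactionLog;
    import org.apache.kafka.coordinator.transaction.TransactionMetadata;

    import java.nio.ByteBuffer;

    public final class TransactionLogUsageSketch {
        // Hypothetical helper: decodes one transaction-state record with the new Java API.
        static TransactionMetadata decode(ByteBuffer key, ByteBuffer value) {
            // Throws IllegalStateException if the key version is not a transaction log key.
            String transactionalId = TransactionLog.readTxnRecordKey(key);
            // Returns null when the value is a tombstone.
            return TransactionLog.readTxnRecordValue(transactionalId, value);
        }
    }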
---
.../coordinator/transaction/TransactionLog.scala | 140 ------------------
.../transaction/TransactionStateManager.scala | 38 ++---
.../TransactionCoordinatorConcurrencyTest.scala | 2 +-
.../transaction/TransactionStateManagerTest.scala | 15 +-
.../coordinator/transaction/TransactionLog.java | 160 +++++++++++++++++++++
.../transaction/TransactionLogTest.java | 35 ++---
6 files changed, 201 insertions(+), 189 deletions(-)
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
deleted file mode 100644
index f024e88aa8e..00000000000
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.coordinator.transaction
-
-import java.nio.ByteBuffer
-import org.apache.kafka.common.compress.Compression
-import org.apache.kafka.common.protocol.{ByteBufferAccessor, MessageUtil}
-import org.apache.kafka.common.record.RecordBatch
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.coordinator.transaction.{TransactionMetadata, TransactionState, TxnTransitMetadata}
-import org.apache.kafka.coordinator.transaction.generated.{CoordinatorRecordType, TransactionLogKey, TransactionLogValue}
-import org.apache.kafka.server.common.TransactionVersion
-
-import java.util
-
-import scala.jdk.CollectionConverters._
-
-/**
- * Messages stored for the transaction topic represent the producer id and transactional status of the corresponding
- * transactional id, which have versions for both the key and value fields. Key and value
- * versions are used to evolve the message formats:
- *
- * key version 0: [transactionalId]
- * -> value version 0: [producer_id, producer_epoch, expire_timestamp, status, [topic, [partition] ], timestamp]
- */
-object TransactionLog {
-
- // enforce always using
- // 1. cleanup policy = compact
- // 2. compression = none
- // 3. unclean leader election = disabled
- // 4. required acks = -1 when writing
- val EnforcedCompression: Compression = Compression.NONE
- val EnforcedRequiredAcks: Short = (-1).toShort
-
- /**
- * Generates the bytes for transaction log message key
- *
- * @return key bytes
- */
- def keyToBytes(transactionalId: String): Array[Byte] = {
- MessageUtil.toCoordinatorTypePrefixedBytes(new TransactionLogKey().setTransactionalId(transactionalId))
- }
-
- /**
- * Generates the payload bytes for transaction log message value
- *
- * @return value payload bytes
- */
- def valueToBytes(txnMetadata: TxnTransitMetadata,
- transactionVersionLevel: TransactionVersion): Array[Byte] = {
- if (txnMetadata.txnState == TransactionState.EMPTY && !txnMetadata.topicPartitions.isEmpty)
- throw new IllegalStateException(s"Transaction is not expected to have any partitions since its state is ${txnMetadata.txnState}: $txnMetadata")
-
- val transactionPartitions = if (txnMetadata.txnState == TransactionState.EMPTY) null
- else txnMetadata.topicPartitions.asScala
- .groupBy(_.topic)
- .map { case (topic, partitions) =>
- new TransactionLogValue.PartitionsSchema()
- .setTopic(topic)
- .setPartitionIds(partitions.map(tp => Integer.valueOf(tp.partition)).toList.asJava)
- }.toList.asJava
-
- // Serialize with version 0 (highest non-flexible version) until transaction.version 1 is enabled
- // which enables flexible fields in records.
- MessageUtil.toVersionPrefixedBytes(transactionVersionLevel.transactionLogValueVersion(),
- new TransactionLogValue()
- .setProducerId(txnMetadata.producerId)
- .setProducerEpoch(txnMetadata.producerEpoch)
- .setTransactionTimeoutMs(txnMetadata.txnTimeoutMs)
- .setTransactionStatus(txnMetadata.txnState.id)
- .setTransactionLastUpdateTimestampMs(txnMetadata.txnLastUpdateTimestamp)
- .setTransactionStartTimestampMs(txnMetadata.txnStartTimestamp)
- .setTransactionPartitions(transactionPartitions)
- .setClientTransactionVersion(txnMetadata.clientTransactionVersion.featureLevel()))
- }
-
- /**
- * Decodes the transaction log messages' key
- *
- * @return left with the version if the key is not a transaction log key, right with the transactional id otherwise
- */
- def readTxnRecordKey(buffer: ByteBuffer): Either[Short, String] = {
- val version = buffer.getShort
- Either.cond(
- version == CoordinatorRecordType.TRANSACTION_LOG.id,
- new TransactionLogKey(new ByteBufferAccessor(buffer), 0.toShort).transactionalId,
- version
- )
- }
-
- /**
- * Decodes the transaction log messages' payload and retrieves the transaction metadata from it
- *
- * @return a transaction metadata object from the message
- */
- def readTxnRecordValue(transactionalId: String, buffer: ByteBuffer): Option[TransactionMetadata] = {
- // tombstone
- if (buffer == null) None
- else {
- val version = buffer.getShort
- if (version >= TransactionLogValue.LOWEST_SUPPORTED_VERSION && version <= TransactionLogValue.HIGHEST_SUPPORTED_VERSION) {
- val value = new TransactionLogValue(new ByteBufferAccessor(buffer), version)
- val state = TransactionState.fromId(value.transactionStatus)
- val tps: util.Set[TopicPartition] = new util.HashSet[TopicPartition]()
- if (!state.equals(TransactionState.EMPTY))
- value.transactionPartitions.forEach(partitionsSchema => {
- partitionsSchema.partitionIds.forEach(partitionId => tps.add(new TopicPartition(partitionsSchema.topic, partitionId.intValue())))
- })
- Some(new TransactionMetadata(
- transactionalId,
- value.producerId,
- value.previousProducerId,
- value.nextProducerId,
- value.producerEpoch,
- RecordBatch.NO_PRODUCER_EPOCH,
- value.transactionTimeoutMs,
- state,
- tps,
- value.transactionStartTimestampMs,
- value.transactionLastUpdateTimestampMs,
- TransactionVersion.fromFeatureLevel(value.clientTransactionVersion)))
- } else throw new IllegalStateException(s"Unknown version $version from the transaction log message value")
- }
- }
-}
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 82b960c5ba7..800ac310a13 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -35,7 +35,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests.TransactionResult
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicIdPartition, TopicPartition}
-import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionMetadata, TransactionState, TransactionStateManagerConfig, TxnTransitMetadata}
+import org.apache.kafka.coordinator.transaction.{TransactionLog, TransactionLogConfig, TransactionMetadata, TransactionState, TransactionStateManagerConfig, TxnTransitMetadata}
import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
import org.apache.kafka.server.config.ServerConfigs
@@ -183,7 +183,7 @@ class TransactionStateManager(brokerId: Int,
if (recordsBuilder == null) {
recordsBuilder = MemoryRecords.builder(
ByteBuffer.allocate(math.min(16384, maxBatchSize)),
- TransactionLog.EnforcedCompression,
+ TransactionLog.ENFORCED_COMPRESSION,
TimestampType.CREATE_TIME,
0L,
maxBatchSize
@@ -290,7 +290,7 @@ class TransactionStateManager(brokerId: Int,
inReadLock(stateLock) {
replicaManager.appendRecords(
timeout = config.requestTimeoutMs,
- requiredAcks = TransactionLog.EnforcedRequiredAcks,
+ requiredAcks = TransactionLog.ENFORCED_REQUIRED_ACKS,
internalTopicsAllowed = true,
origin = AppendOrigin.COORDINATOR,
entriesPerPartition = Map(replicaManager.topicIdPartition(transactionPartition) -> tombstoneRecords),
@@ -495,19 +495,21 @@ class TransactionStateManager(brokerId: Int,
memRecords.batches.forEach { batch =>
for (record <- batch.asScala) {
require(record.hasKey, "Transaction state log's key should not be null")
- TransactionLog.readTxnRecordKey(record.key) match {
- case Left(version) =>
- warn(s"Unknown message key with version $version" +
- s" while loading transaction state from $topicPartition.
Ignoring it. " +
- "It could be a left over from an aborted upgrade.")
- case Right(transactionalId) =>
- // load transaction metadata along with transaction state
- TransactionLog.readTxnRecordValue(transactionalId, record.value) match {
- case None =>
- loadedTransactions.remove(transactionalId)
- case Some(txnMetadata) =>
- loadedTransactions.put(transactionalId, txnMetadata)
- }
+ val transactionalId = try Some(TransactionLog.readTxnRecordKey(record.key))
+ catch {
+ case e: IllegalStateException =>
+ warn(s"Unknown message key version while loading
transaction state from $topicPartition. " +
+ s"Ignoring it. It could be a left over from an aborted
upgrade", e)
+ None
+ }
+ transactionalId.foreach { txnId =>
+ // load transaction metadata along with transaction state
+ val txnMetadata = TransactionLog.readTxnRecordValue(txnId, record.value)
+ if (txnMetadata == null) {
+ loadedTransactions.remove(txnId)
+ } else {
+ loadedTransactions.put(txnId, txnMetadata)
+ }
}
}
currOffset = batch.nextOffset
@@ -663,7 +665,7 @@ class TransactionStateManager(brokerId: Int,
val valueBytes = TransactionLog.valueToBytes(newMetadata, transactionVersionLevel())
val timestamp = time.milliseconds()
- val records = MemoryRecords.withRecords(TransactionLog.EnforcedCompression, new SimpleRecord(timestamp, keyBytes, valueBytes))
+ val records = MemoryRecords.withRecords(TransactionLog.ENFORCED_COMPRESSION, new SimpleRecord(timestamp, keyBytes, valueBytes))
val transactionStateTopicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionFor(transactionalId))
val transactionStateTopicIdPartition = replicaManager.topicIdPartition(transactionStateTopicPartition)
val recordsPerPartition = Map(transactionStateTopicIdPartition -> records)
@@ -806,7 +808,7 @@ class TransactionStateManager(brokerId: Int,
if (append) {
replicaManager.appendRecords(
timeout = newMetadata.txnTimeoutMs.toLong,
- requiredAcks = TransactionLog.EnforcedRequiredAcks,
+ requiredAcks = TransactionLog.ENFORCED_REQUIRED_ACKS,
internalTopicsAllowed = true,
origin = AppendOrigin.COORDINATOR,
entriesPerPartition = recordsPerPartition,
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
index 79957b01fb7..4bd71172058 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
@@ -37,7 +37,7 @@ import org.apache.kafka.common.record.{FileRecords, MemoryRecords, RecordBatch,
import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.{LogContext, MockTime, ProducerIdAndEpoch}
import org.apache.kafka.common.{Node, TopicPartition, Uuid}
-import org.apache.kafka.coordinator.transaction.{ProducerIdManager, TransactionMetadata, TransactionState}
+import org.apache.kafka.coordinator.transaction.{ProducerIdManager, TransactionLog, TransactionMetadata, TransactionState}
import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion, RequestLocal, TransactionVersion}
import org.apache.kafka.server.storage.log.FetchIsolation
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index f9953bae24f..90a5429fdaa 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -32,7 +32,7 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests.TransactionResult
import org.apache.kafka.common.utils.MockTime
-import org.apache.kafka.coordinator.transaction.{TransactionMetadata, TransactionState, TxnTransitMetadata}
+import org.apache.kafka.coordinator.transaction.{TransactionLog, TransactionMetadata, TransactionState, TxnTransitMetadata}
import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion, RequestLocal, TransactionVersion}
import org.apache.kafka.server.common.TransactionVersion.{TV_0, TV_2}
@@ -917,13 +917,10 @@ class TransactionStateManagerTest {
appendedRecords.values.foreach { batches =>
batches.foreach { records =>
records.records.forEach { record =>
- TransactionLog.readTxnRecordKey(record.key) match {
- case Right(transactionalId) =>
- assertNull(record.value)
- expiredTransactionalIds += transactionalId
- assertEquals(Right(None), transactionManager.getTransactionState(transactionalId))
- case Left(value) => fail(s"Failed to read transactional id from tombstone: $value")
- }
+ val transactionalId = TransactionLog.readTxnRecordKey(record.key).asInstanceOf[String]
+ assertNull(record.value)
+ expiredTransactionalIds += transactionalId
+ assertEquals(Right(None), transactionManager.getTransactionState(transactionalId))
}
}
}
@@ -1184,7 +1181,7 @@ class TransactionStateManagerTest {
val partitionId = transactionManager.partitionFor(transactionalId1)
val topicPartition = new TopicIdPartition(transactionTopicId, partitionId, TRANSACTION_STATE_TOPIC_NAME)
val expectedTombstone = new SimpleRecord(time.milliseconds(), TransactionLog.keyToBytes(transactionalId1), null)
- val expectedRecords = MemoryRecords.withRecords(TransactionLog.EnforcedCompression, expectedTombstone)
+ val expectedRecords = MemoryRecords.withRecords(TransactionLog.ENFORCED_COMPRESSION, expectedTombstone)
assertEquals(Set(topicPartition), appendedRecords.keySet)
assertEquals(Seq(expectedRecords), appendedRecords(topicPartition).toSeq)
} else {
diff --git a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java
new file mode 100644
index 00000000000..9c86953b5ef
--- /dev/null
+++ b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.transaction;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.MessageUtil;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.coordinator.transaction.generated.CoordinatorRecordType;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
+import org.apache.kafka.server.common.TransactionVersion;
+
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Messages stored for the transaction topic represent the producer id and transactional status of the corresponding
+ * transactional id, which have versions for both the key and value fields. Key and value
+ * versions are used to evolve the message formats:
+ *
+ * key version 0: [transactionalId]
+ * -> value version 0: [producer_id, producer_epoch, expire_timestamp, status, [topic, [partition] ], timestamp]
+ */
+public class TransactionLog {
+
+ // enforce always using
+ // 1. cleanup policy = compact
+ // 2. compression = none
+ // 3. unclean leader election = disabled
+ // 4. required acks = -1 when writing
+ public static final Compression ENFORCED_COMPRESSION = Compression.NONE;
+ public static final short ENFORCED_REQUIRED_ACKS = (short) -1;
+
+ /**
+ * Generates the bytes for transaction log message key
+ *
+ * @return key bytes
+ */
+ public static byte[] keyToBytes(String transactionalId) {
+ return MessageUtil.toCoordinatorTypePrefixedBytes(
+ new TransactionLogKey().setTransactionalId(transactionalId)
+ );
+ }
+
+ /**
+ * Generates the payload bytes for transaction log message value
+ *
+ * @return value payload bytes
+ */
+ public static byte[] valueToBytes(TxnTransitMetadata txnMetadata,
+ TransactionVersion transactionVersionLevel) {
+ if (txnMetadata.txnState() == TransactionState.EMPTY && !txnMetadata.topicPartitions().isEmpty()) {
+ throw new IllegalStateException("Transaction is not expected to have any partitions since its state is "
+ + txnMetadata.txnState() + ": " + txnMetadata);
+ }
+
+ List<TransactionLogValue.PartitionsSchema> transactionPartitions = null;
+
+ if (txnMetadata.txnState() != TransactionState.EMPTY) {
+ transactionPartitions = txnMetadata.topicPartitions().stream()
+ .collect(Collectors.groupingBy(TopicPartition::topic))
+ .entrySet().stream()
+ .map(entry ->
+ new TransactionLogValue.PartitionsSchema().setTopic(entry.getKey())
+ .setPartitionIds(entry.getValue().stream().map(TopicPartition::partition).toList())).toList();
+ }
+
+ return MessageUtil.toVersionPrefixedBytes(
+ transactionVersionLevel.transactionLogValueVersion(),
+ new TransactionLogValue()
+ .setProducerId(txnMetadata.producerId())
+ .setProducerEpoch(txnMetadata.producerEpoch())
+ .setTransactionTimeoutMs(txnMetadata.txnTimeoutMs())
+ .setTransactionStatus(txnMetadata.txnState().id())
+ .setTransactionLastUpdateTimestampMs(txnMetadata.txnLastUpdateTimestamp())
+ .setTransactionStartTimestampMs(txnMetadata.txnStartTimestamp())
+ .setTransactionPartitions(transactionPartitions)
+ .setClientTransactionVersion(txnMetadata.clientTransactionVersion().featureLevel())
+ );
+ }
+
+ /**
+ * Decodes the transaction log messages' key
+ *
+ * @return the transactional id
+ * @throws IllegalStateException if the version is not a valid transaction log key version
+ */
+ public static String readTxnRecordKey(ByteBuffer buffer) {
+ short version = buffer.getShort();
+ if (version == CoordinatorRecordType.TRANSACTION_LOG.id()) {
+ return new TransactionLogKey(new ByteBufferAccessor(buffer), (short) 0).transactionalId();
+ } else {
+ throw new IllegalStateException("Unknown version " + version + " from the transaction log message key");
+ }
+ }
+
+ /**
+ * Decodes the transaction log messages' payload and retrieves the transaction metadata from it
+ *
+ * @return a transaction metadata object from the message, or null if tombstone
+ */
+ public static TransactionMetadata readTxnRecordValue(String transactionalId, ByteBuffer buffer) {
+ if (buffer == null) {
+ return null; // tombstone
+ } else {
+ short version = buffer.getShort();
+ if (version >= TransactionLogValue.LOWEST_SUPPORTED_VERSION
+ && version <= TransactionLogValue.HIGHEST_SUPPORTED_VERSION) {
+
+ TransactionLogValue value = new TransactionLogValue(new ByteBufferAccessor(buffer), version);
+ TransactionState state = TransactionState.fromId(value.transactionStatus());
+
+ Set<TopicPartition> tps = new HashSet<>();
+ if (state != TransactionState.EMPTY) {
+ for (TransactionLogValue.PartitionsSchema partitionsSchema : value.transactionPartitions()) {
+ for (int partitionId : partitionsSchema.partitionIds()) {
+ tps.add(new TopicPartition(partitionsSchema.topic(), partitionId));
+ }
+ }
+ }
+
+ return new TransactionMetadata(
+ transactionalId,
+ value.producerId(),
+ value.previousProducerId(),
+ value.nextProducerId(),
+ value.producerEpoch(),
+ RecordBatch.NO_PRODUCER_EPOCH,
+ value.transactionTimeoutMs(),
+ state,
+ tps,
+ value.transactionStartTimestampMs(),
+ value.transactionLastUpdateTimestampMs(),
+ TransactionVersion.fromFeatureLevel(value.clientTransactionVersion())
+ );
+ } else {
+ throw new IllegalStateException("Unknown version " + version +
" from the transaction log message value");
+ }
+ }
+ }
+}
diff --git a/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionLogTest.java b/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionLogTest.java
index 3295901a8ba..50d0f0327f0 100644
--- a/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionLogTest.java
+++ b/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionLogTest.java
@@ -16,8 +16,6 @@
*/
package org.apache.kafka.coordinator.transaction;
-import kafka.coordinator.transaction.TransactionLog;
-
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
@@ -52,8 +50,9 @@ import static org.apache.kafka.server.common.TransactionVersion.LATEST_PRODUCTIO
import static org.apache.kafka.server.common.TransactionVersion.TV_0;
import static org.apache.kafka.server.common.TransactionVersion.TV_2;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
class TransactionLogTest {
@@ -67,15 +66,12 @@ class TransactionLogTest {
);
private sealed interface TxnKeyResult {
- record UnknownVersion(short version) implements TxnKeyResult { }
record TransactionalId(String id) implements TxnKeyResult { }
}
private static TxnKeyResult readTxnRecordKey(ByteBuffer buf) {
- var e = TransactionLog.readTxnRecordKey(buf);
- return e.isLeft()
- ? new TxnKeyResult.UnknownVersion((Short) e.left().get())
- : new TxnKeyResult.TransactionalId(e.right().get());
+ var result = TransactionLog.readTxnRecordKey(buf);
+ return new TxnKeyResult.TransactionalId(result);
}
private static TransactionMetadata TransactionMetadata(TransactionState state) {
@@ -128,7 +124,7 @@ class TransactionLogTest {
TransactionLog.valueToBytes(txnMetadata.prepareNoTransit(), TV_2)
)).records().iterator().next();
var txnIdResult = assertInstanceOf(TxnKeyResult.TransactionalId.class, readTxnRecordKey(record.key()));
- var deserialized = TransactionLog.readTxnRecordValue(txnIdResult.id(), record.value()).get();
+ var deserialized = TransactionLog.readTxnRecordValue(txnIdResult.id(), record.value());
assertEquals(txnMetadata.producerId(), deserialized.producerId());
assertEquals(txnMetadata.producerEpoch(), deserialized.producerEpoch());
@@ -172,7 +168,7 @@ class TransactionLogTest {
.setTransactionPartitions(List.of(txnPartitions));
var serialized = MessageUtil.toVersionPrefixedByteBuffer((short) 1, txnLogValue);
- var deserialized = TransactionLog.readTxnRecordValue("transactionId", serialized).get();
+ var deserialized = TransactionLog.readTxnRecordValue("transactionId", serialized);
assertEquals(100, deserialized.producerId());
assertEquals(50, deserialized.producerEpoch());
@@ -253,11 +249,9 @@ class TransactionLogTest {
// Read the buffer with readTxnRecordValue.
buffer.rewind();
- var txnMetadata = TransactionLog.readTxnRecordValue("transaction-id", buffer);
-
- assertFalse(txnMetadata.isEmpty(), "Expected transaction metadata but got none");
+ var metadata = TransactionLog.readTxnRecordValue("transaction-id", buffer);
- var metadata = txnMetadata.get();
+ assertNotNull(metadata, "Expected transaction metadata but got none");
assertEquals(1000L, metadata.producerId());
assertEquals(100, metadata.producerEpoch());
assertEquals(1000, metadata.txnTimeoutMs());
@@ -268,16 +262,15 @@ class TransactionLogTest {
}
@Test
- void testReadTxnRecordKeyCanReadUnknownMessage() {
+ void testReadTxnRecordKeyThrowsOnUnknownVersion() {
var unknownRecord = MessageUtil.toVersionPrefixedBytes(Short.MAX_VALUE, new TransactionLogKey());
- var result = readTxnRecordKey(wrap(unknownRecord));
-
- var uv = assertInstanceOf(TxnKeyResult.UnknownVersion.class, result);
- assertEquals(Short.MAX_VALUE, uv.version());
+ var exception = assertThrows(IllegalStateException.class,
+ () -> TransactionLog.readTxnRecordKey(wrap(unknownRecord)));
+ assertTrue(exception.getMessage().contains("Unknown version " + Short.MAX_VALUE));
}
@Test
- void shouldReturnEmptyWhenForTombstoneRecord() {
- assertTrue(TransactionLog.readTxnRecordValue("transaction-id", null).isEmpty());
+ void shouldReturnNullForTombstoneRecord() {
+ assertNull(TransactionLog.readTxnRecordValue("transaction-id", null));
}
}
\ No newline at end of file