This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 5bbc421a13e MINOR: update TransactionLog#readTxnRecordValue to initialize TransactionMetadata with non-empty topic partitions (#20370) 5bbc421a13e is described below commit 5bbc421a13ee159bbff1cfabd012988061fc0c47 Author: PoAn Yang <pay...@apache.org> AuthorDate: Tue Aug 26 10:36:45 2025 +0800 MINOR: update TransactionLog#readTxnRecordValue to initialize TransactionMetadata with non-empty topic partitions (#20370) This is a follow-up PR for https://github.com/apache/kafka/pull/19699. * Update TransactionLog#readTxnRecordValue to initialize TransactionMetadata with non-empty topic partitions * Update `TxnTransitMetadata` comment, because it's not immutable. Reviewers: TengYao Chi <kiting...@gmail.com>, Justine Olshan <jols...@confluent.io>, Kuan-Po Tseng <brandb...@gmail.com>, Chia-Ping Tsai <chia7...@gmail.com> --- .../coordinator/transaction/TransactionLog.scala | 23 ++++++++++------------ .../transaction/TransactionMetadata.java | 5 +++++ .../transaction/TxnTransitMetadata.java | 2 +- 3 files changed, 16 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala index 75baa98da15..a3e9eacb66f 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala @@ -115,7 +115,13 @@ object TransactionLog { val version = buffer.getShort if (version >= TransactionLogValue.LOWEST_SUPPORTED_VERSION && version <= TransactionLogValue.HIGHEST_SUPPORTED_VERSION) { val value = new TransactionLogValue(new ByteBufferAccessor(buffer), version) - val transactionMetadata = new TransactionMetadata( + val state = TransactionState.fromId(value.transactionStatus) + val tps: util.Set[TopicPartition] = new util.HashSet[TopicPartition]() + if (!state.equals(TransactionState.EMPTY)) + 
value.transactionPartitions.forEach(partitionsSchema => { + partitionsSchema.partitionIds.forEach(partitionId => tps.add(new TopicPartition(partitionsSchema.topic, partitionId.intValue()))) + }) + Some(new TransactionMetadata( transactionalId, value.producerId, value.previousProducerId, @@ -123,20 +129,11 @@ object TransactionLog { value.producerEpoch, RecordBatch.NO_PRODUCER_EPOCH, value.transactionTimeoutMs, - TransactionState.fromId(value.transactionStatus), - util.Set.of(), + state, + tps, value.transactionStartTimestampMs, value.transactionLastUpdateTimestampMs, - TransactionVersion.fromFeatureLevel(value.clientTransactionVersion)) - - if (!transactionMetadata.state.equals(TransactionState.EMPTY)) - value.transactionPartitions.forEach(partitionsSchema => { - transactionMetadata.addPartitions(partitionsSchema.partitionIds - .stream - .map(partitionId => new TopicPartition(partitionsSchema.topic, partitionId.intValue())) - .toList) - }) - Some(transactionMetadata) + TransactionVersion.fromFeatureLevel(value.clientTransactionVersion))) } else throw new IllegalStateException(s"Unknown version $version from the transaction log message value") } } diff --git a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionMetadata.java b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionMetadata.java index 96a92dd01c9..8efeedc3ec4 100644 --- a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionMetadata.java +++ b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionMetadata.java @@ -117,6 +117,7 @@ public class TransactionMetadata { } } + // VisibleForTesting public void addPartitions(Collection<TopicPartition> partitions) { topicPartitions.addAll(partitions); } @@ -500,6 +501,7 @@ public class TransactionMetadata { return transactionalId; } + // VisibleForTesting public void setProducerId(long producerId) { this.producerId = 
producerId; } @@ -507,6 +509,7 @@ public class TransactionMetadata { return producerId; } + // VisibleForTesting public void setPrevProducerId(long prevProducerId) { this.prevProducerId = prevProducerId; } @@ -534,6 +537,7 @@ public class TransactionMetadata { return txnTimeoutMs; } + // VisibleForTesting public void state(TransactionState state) { this.state = state; } @@ -550,6 +554,7 @@ public class TransactionMetadata { return txnStartTimestamp; } + // VisibleForTesting public void txnLastUpdateTimestamp(long txnLastUpdateTimestamp) { this.txnLastUpdateTimestamp = txnLastUpdateTimestamp; } diff --git a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TxnTransitMetadata.java b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TxnTransitMetadata.java index 452c168687e..b2e78d45da3 100644 --- a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TxnTransitMetadata.java +++ b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TxnTransitMetadata.java @@ -22,7 +22,7 @@ import org.apache.kafka.server.common.TransactionVersion; import java.util.HashSet; /** - * Immutable object representing the target transition of the transaction metadata + * Represent the target transition of the transaction metadata. The topicPartitions field is mutable. */ public record TxnTransitMetadata( long producerId,