This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5bbc421a13e MINOR: update TransactionLog#readTxnRecordValue to 
initialize TransactionMetadata with non-empty topic partitions (#20370)
5bbc421a13e is described below

commit 5bbc421a13ee159bbff1cfabd012988061fc0c47
Author: PoAn Yang <pay...@apache.org>
AuthorDate: Tue Aug 26 10:36:45 2025 +0800

    MINOR: update TransactionLog#readTxnRecordValue to initialize 
TransactionMetadata with non-empty topic partitions (#20370)
    
    This is a follow-up PR for https://github.com/apache/kafka/pull/19699.
    
    * Update TransactionLog#readTxnRecordValue to initialize
    TransactionMetadata with non-empty topic partitions
    * Update the `TxnTransitMetadata` comment, because the record is not immutable.
    
    Reviewers: TengYao Chi <kiting...@gmail.com>, Justine Olshan
     <jols...@confluent.io>, Kuan-Po Tseng <brandb...@gmail.com>, Chia-Ping
     Tsai <chia7...@gmail.com>
---
 .../coordinator/transaction/TransactionLog.scala   | 23 ++++++++++------------
 .../transaction/TransactionMetadata.java           |  5 +++++
 .../transaction/TxnTransitMetadata.java            |  2 +-
 3 files changed, 16 insertions(+), 14 deletions(-)

diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
index 75baa98da15..a3e9eacb66f 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
@@ -115,7 +115,13 @@ object TransactionLog {
       val version = buffer.getShort
       if (version >= TransactionLogValue.LOWEST_SUPPORTED_VERSION && version 
<= TransactionLogValue.HIGHEST_SUPPORTED_VERSION) {
         val value = new TransactionLogValue(new ByteBufferAccessor(buffer), 
version)
-        val transactionMetadata = new TransactionMetadata(
+        val state = TransactionState.fromId(value.transactionStatus)
+        val tps: util.Set[TopicPartition] = new util.HashSet[TopicPartition]()
+        if (!state.equals(TransactionState.EMPTY))
+          value.transactionPartitions.forEach(partitionsSchema => {
+            partitionsSchema.partitionIds.forEach(partitionId => tps.add(new 
TopicPartition(partitionsSchema.topic, partitionId.intValue())))
+          })
+        Some(new TransactionMetadata(
           transactionalId,
           value.producerId,
           value.previousProducerId,
@@ -123,20 +129,11 @@ object TransactionLog {
           value.producerEpoch,
           RecordBatch.NO_PRODUCER_EPOCH,
           value.transactionTimeoutMs,
-          TransactionState.fromId(value.transactionStatus),
-          util.Set.of(),
+          state,
+          tps,
           value.transactionStartTimestampMs,
           value.transactionLastUpdateTimestampMs,
-          TransactionVersion.fromFeatureLevel(value.clientTransactionVersion))
-
-        if (!transactionMetadata.state.equals(TransactionState.EMPTY))
-          value.transactionPartitions.forEach(partitionsSchema => {
-            transactionMetadata.addPartitions(partitionsSchema.partitionIds
-              .stream
-              .map(partitionId => new TopicPartition(partitionsSchema.topic, 
partitionId.intValue()))
-              .toList)
-          })
-        Some(transactionMetadata)
+          TransactionVersion.fromFeatureLevel(value.clientTransactionVersion)))
       } else throw new IllegalStateException(s"Unknown version $version from 
the transaction log message value")
     }
   }
diff --git 
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionMetadata.java
 
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionMetadata.java
index 96a92dd01c9..8efeedc3ec4 100644
--- 
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionMetadata.java
+++ 
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionMetadata.java
@@ -117,6 +117,7 @@ public class TransactionMetadata {
         }
     }
 
+    // VisibleForTesting
     public void addPartitions(Collection<TopicPartition> partitions) {
         topicPartitions.addAll(partitions);
     }
@@ -500,6 +501,7 @@ public class TransactionMetadata {
         return transactionalId;
     }
 
+    // VisibleForTesting
     public void setProducerId(long producerId) {
         this.producerId = producerId;
     }
@@ -507,6 +509,7 @@ public class TransactionMetadata {
         return producerId;
     }
 
+    // VisibleForTesting
     public void setPrevProducerId(long prevProducerId) {
         this.prevProducerId = prevProducerId;
     }
@@ -534,6 +537,7 @@ public class TransactionMetadata {
         return txnTimeoutMs;
     }
 
+    // VisibleForTesting
     public void state(TransactionState state) {
         this.state = state;
     }
@@ -550,6 +554,7 @@ public class TransactionMetadata {
         return txnStartTimestamp;
     }
 
+    // VisibleForTesting
     public void txnLastUpdateTimestamp(long txnLastUpdateTimestamp) {
         this.txnLastUpdateTimestamp = txnLastUpdateTimestamp;
     }
diff --git 
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TxnTransitMetadata.java
 
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TxnTransitMetadata.java
index 452c168687e..b2e78d45da3 100644
--- 
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TxnTransitMetadata.java
+++ 
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TxnTransitMetadata.java
@@ -22,7 +22,7 @@ import org.apache.kafka.server.common.TransactionVersion;
 import java.util.HashSet;
 
 /**
- * Immutable object representing the target transition of the transaction 
metadata
+ * Represent the target transition of the transaction metadata. The 
topicPartitions field is mutable.
  */
 public record TxnTransitMetadata(
         long producerId,

Reply via email to