This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8a81878fc0b KAFKA-18209 Clean up transaction state topic config logic (#22456)
8a81878fc0b is described below

commit 8a81878fc0b32025277086833de155227520c6c6
Author: majialong <[email protected]>
AuthorDate: Sat Jun 6 23:54:17 2026 +0800

    KAFKA-18209 Clean up transaction state topic config logic (#22456)
    
    This PR cleans up the `__transaction_state` topic config logic by
    removing the scattered configuration-related constants
    (`ENFORCED_COMPRESSION`, `ENFORCED_REQUIRED_ACKS`) from `TransactionLog`
    and keeping it focused on serialization/deserialization.
    
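    It also moves transaction state topic config construction from
    `TransactionStateManager` to `TransactionCoordinator` and renames
    `transactionTopicConfigs` to `transactionStateTopicConfigs` for clarity,
    aligning with the current group/share coordinator pattern where
    coordinators expose internal topic configs to the broker. The enforced
    required-acks value now lives in
    `TransactionCoordinator.EnforcedRequiredAcks`, call sites use
    `Compression.NONE` directly, and the `TransactionLog.readTxnRecordKey`
    helper is removed.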
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../transaction/TransactionCoordinator.scala       | 18 ++++++++++++++-
 .../transaction/TransactionStateManager.scala      | 25 +++++----------------
 .../src/main/scala/kafka/server/BrokerServer.scala |  2 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  2 +-
 .../transaction/TransactionStateManagerTest.scala  |  2 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  4 ++--
 .../coordinator/transaction/TransactionLog.java    | 26 ----------------------
 7 files changed, 27 insertions(+), 52 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 43455a83e41..70db45b1eba 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -19,6 +19,7 @@ package kafka.coordinator.transaction
 import kafka.server.{KafkaConfig, ReplicaManager}
 import kafka.utils.Logging
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.internals.Topic
 import 
org.apache.kafka.common.message.AddPartitionsToTxnResponseData.AddPartitionsToTxnResult
 import org.apache.kafka.common.message.{DescribeTransactionsResponseData, 
ListTransactionsResponseData}
@@ -31,6 +32,7 @@ import org.apache.kafka.common.utils.internals.LogContext
 import org.apache.kafka.coordinator.transaction.{ProducerIdManager, 
TransactionConfig, TransactionLogConfig, TransactionMetadata, TransactionState, 
TransactionStateManagerConfig, TransactionalIdAndProducerIdEpoch, 
TxnTransitMetadata}
 import org.apache.kafka.metadata.MetadataCache
 import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
+import org.apache.kafka.server.record.BrokerCompressionType
 import org.apache.kafka.server.util.Scheduler
 
 import java.util
@@ -39,6 +41,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import scala.jdk.OptionConverters._
 
 object TransactionCoordinator {
+  val EnforcedRequiredAcks: Short = -1.toShort
 
   def apply(config: KafkaConfig,
             replicaManager: ReplicaManager,
@@ -1004,7 +1007,20 @@ class TransactionCoordinator(txnConfig: 
TransactionConfig,
     }
   }
 
-  def transactionTopicConfigs: Properties = txnManager.transactionTopicConfigs
+  /**
+   * Return the configuration properties of the transaction state topic.
+   *
+   * @return Properties of the transaction state topic.
+   */
+  def transactionStateTopicConfigs: Properties = {
+    val props = new Properties
+    props.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false")
+    props.put(TopicConfig.COMPRESSION_TYPE_CONFIG, 
BrokerCompressionType.UNCOMPRESSED.name)
+    props.put(TopicConfig.CLEANUP_POLICY_CONFIG, 
TopicConfig.CLEANUP_POLICY_COMPACT)
+    props.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, 
txnConfig.transactionLogMinInsyncReplicas.toString)
+    props.put(TopicConfig.SEGMENT_BYTES_CONFIG, 
txnConfig.transactionLogSegmentBytes.toString)
+    props
+  }
 
   def partitionFor(transactionalId: String): Int = 
txnManager.partitionFor(transactionalId)
 
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 425789a8263..99894ce759a 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -17,13 +17,12 @@
 package kafka.coordinator.transaction
 
 import java.nio.ByteBuffer
-import java.util.Properties
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.ReentrantReadWriteLock
 import kafka.server.ReplicaManager
 import kafka.utils.Logging
-import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.common.compress.Compression
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.ListTransactionsResponseData
 import org.apache.kafka.common.metrics.Metrics
@@ -38,7 +37,6 @@ import org.apache.kafka.common.{KafkaException, 
TopicIdPartition, TopicPartition
 import 
org.apache.kafka.coordinator.transaction.{CoordinatorEpochAndTxnMetadata, 
TransactionConfig, TransactionLog, TransactionMetadata, TransactionState, 
TransactionStateManagerConfig, TransactionPartitionAndLeaderEpoch, 
TransactionalIdAndProducerIdEpoch, TransactionalIdCoordinatorEpochAndMetadata, 
TransactionalIdCoordinatorEpochAndTransitMetadata, TxnMetadataCacheEntry, 
TxnTransitMetadata}
 import org.apache.kafka.metadata.MetadataCache
 import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
-import org.apache.kafka.server.record.BrokerCompressionType
 import org.apache.kafka.server.storage.log.FetchIsolation
 import org.apache.kafka.server.util.Scheduler
 import org.apache.kafka.server.util.LockUtils.{inReadLock, inWriteLock}
@@ -183,7 +181,7 @@ class TransactionStateManager(brokerId: Int,
                 if (recordsBuilder == null) {
                   recordsBuilder = MemoryRecords.builder(
                     ByteBuffer.allocate(math.min(16384, maxBatchSize)),
-                    TransactionLog.ENFORCED_COMPRESSION,
+                    Compression.NONE,
                     TimestampType.CREATE_TIME,
                     0L,
                     maxBatchSize
@@ -290,7 +288,7 @@ class TransactionStateManager(brokerId: Int,
     inReadLock[Exception](stateLock, () => {
       replicaManager.appendRecords(
         timeout = config.requestTimeoutMs,
-        requiredAcks = TransactionLog.ENFORCED_REQUIRED_ACKS,
+        requiredAcks = TransactionCoordinator.EnforcedRequiredAcks,
         internalTopicsAllowed = true,
         origin = AppendOrigin.COORDINATOR,
         entriesPerPartition = 
Map(replicaManager.topicIdPartition(transactionPartition) -> tombstoneRecords),
@@ -438,19 +436,6 @@ class TransactionStateManager(brokerId: Int,
     enableTwoPC || (txnTimeoutMs <= config.transactionMaxTimeoutMs && 
txnTimeoutMs > 0)
   }
 
-  def transactionTopicConfigs: Properties = {
-    val props = new Properties
-
-    // enforce disabled unclean leader election, no compression types, and 
compact cleanup policy
-    props.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false")
-    props.put(TopicConfig.COMPRESSION_TYPE_CONFIG, 
BrokerCompressionType.UNCOMPRESSED.name)
-    props.put(TopicConfig.CLEANUP_POLICY_CONFIG, 
TopicConfig.CLEANUP_POLICY_COMPACT)
-    props.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, 
config.transactionLogMinInsyncReplicas.toString)
-    props.put(TopicConfig.SEGMENT_BYTES_CONFIG, 
config.transactionLogSegmentBytes.toString)
-
-    props
-  }
-
   def partitionFor(transactionalId: String): Int = 
Utils.abs(transactionalId.hashCode) % transactionTopicPartitionCount
 
   private def loadTransactionMetadata(topicPartition: TopicPartition, 
coordinatorEpoch: Int): ConcurrentMap[String, TransactionMetadata] =  {
@@ -672,7 +657,7 @@ class TransactionStateManager(brokerId: Int,
     val valueBytes = TransactionLog.valueToBytes(newMetadata, 
transactionVersionLevel())
     val timestamp = time.milliseconds()
 
-    val records = 
MemoryRecords.withRecords(TransactionLog.ENFORCED_COMPRESSION, new 
SimpleRecord(timestamp, keyBytes, valueBytes))
+    val records = MemoryRecords.withRecords(Compression.NONE, new 
SimpleRecord(timestamp, keyBytes, valueBytes))
     val transactionStateTopicPartition = new 
TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, 
partitionFor(transactionalId))
     val transactionStateTopicIdPartition = 
replicaManager.topicIdPartition(transactionStateTopicPartition)
     val recordsPerPartition = Map(transactionStateTopicIdPartition -> records)
@@ -815,7 +800,7 @@ class TransactionStateManager(brokerId: Int,
           if (append) {
             replicaManager.appendRecords(
               timeout = newMetadata.txnTimeoutMs.toLong,
-              requiredAcks = TransactionLog.ENFORCED_REQUIRED_ACKS,
+              requiredAcks = TransactionCoordinator.EnforcedRequiredAcks,
               internalTopicsAllowed = true,
               origin = AppendOrigin.COORDINATOR,
               entriesPerPartition = recordsPerPartition,
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index c339a1d595a..8467b285f33 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -419,7 +419,7 @@ class BrokerServer(
       autoTopicCreationManager = new DefaultAutoTopicCreationManager(
         config,
         () => groupCoordinator.groupMetadataTopicConfigs,
-        () => transactionCoordinator.transactionTopicConfigs,
+        () => transactionCoordinator.transactionStateTopicConfigs,
         () => shareCoordinator.shareGroupStateTopicConfigs,
         new KRaftTopicCreator(clientToControllerChannelManager),
         time,
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index a530454cb91..9d82174ab85 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1836,7 +1836,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         if (controlRecords.nonEmpty) {
           replicaManager.appendRecords(
             timeout = config.requestTimeoutMs.toLong,
-            requiredAcks = -1,
+            requiredAcks = TransactionCoordinator.EnforcedRequiredAcks,
             internalTopicsAllowed = true,
             origin = AppendOrigin.COORDINATOR,
             entriesPerPartition = controlRecords,
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 4343987f1a0..11714a66ab6 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -1182,7 +1182,7 @@ class TransactionStateManagerTest {
       val partitionId = transactionManager.partitionFor(transactionalId1)
       val topicPartition = new TopicIdPartition(transactionTopicId, 
partitionId, TRANSACTION_STATE_TOPIC_NAME)
       val expectedTombstone = new SimpleRecord(time.milliseconds(), 
TransactionLog.keyToBytes(transactionalId1), null)
-      val expectedRecords = 
MemoryRecords.withRecords(TransactionLog.ENFORCED_COMPRESSION, 
expectedTombstone)
+      val expectedRecords = MemoryRecords.withRecords(Compression.NONE, 
expectedTombstone)
       assertEquals(Set(topicPartition), appendedRecords.keySet)
       assertEquals(Seq(expectedRecords), appendedRecords(topicPartition).toSeq)
     } else {
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 4239b49a962..4c36833ab73 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -766,7 +766,7 @@ class KafkaApisTest extends Logging {
         case CoordinatorType.TRANSACTION =>
           
topicConfigOverride.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
 numBrokersNeeded.toString)
           
topicConfigOverride.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
 numBrokersNeeded.toString)
-          when(txnCoordinator.transactionTopicConfigs).thenReturn(new 
Properties)
+          when(txnCoordinator.transactionStateTopicConfigs).thenReturn(new 
Properties)
           authorizeResource(authorizer, AclOperation.DESCRIBE, 
ResourceType.TRANSACTIONAL_ID,
             groupId, AuthorizationResult.ALLOWED)
           Topic.TRANSACTION_STATE_TOPIC_NAME
@@ -938,7 +938,7 @@ class KafkaApisTest extends Logging {
         case Topic.TRANSACTION_STATE_TOPIC_NAME =>
           
topicConfigOverride.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
 numBrokersNeeded.toString)
           
topicConfigOverride.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
 numBrokersNeeded.toString)
-          when(txnCoordinator.transactionTopicConfigs).thenReturn(new 
Properties)
+          when(txnCoordinator.transactionStateTopicConfigs).thenReturn(new 
Properties)
           true
         case _ =>
           topicConfigOverride.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, 
numBrokersNeeded.toString)
diff --git a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java
index b63141ca5dc..fbf3a21304c 100644
--- a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java
+++ b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.coordinator.transaction;
 
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.MessageUtil;
 import org.apache.kafka.common.record.internal.RecordBatch;
@@ -42,14 +41,6 @@ import java.util.stream.Collectors;
  */
 public class TransactionLog {
 
-    // enforce always using
-    //  1. cleanup policy = compact
-    //  2. compression = none
-    //  3. unclean leader election = disabled
-    //  4. required acks = -1 when writing
-    public static final Compression ENFORCED_COMPRESSION = Compression.NONE;
-    public static final short ENFORCED_REQUIRED_ACKS = (short) -1;
-
     /**
      * Generates the bytes for transaction log message key
      *
@@ -103,23 +94,6 @@ public class TransactionLog {
         return MessageUtil.toVersionPrefixedBytes(logValueVersion, value);
     }
 
-    /**
-     * Decodes the transaction log messages' key
-     *
-     * @return the transactional id
-     * @throws IllegalStateException if the version is not a valid transaction 
log key version
-     */
-    public static String readTxnRecordKey(ByteBuffer buffer) {
-        short version = buffer.getShort();
-        if (version == CoordinatorRecordType.TRANSACTION_LOG.id()) {
-            return new TransactionLogKey(new ByteBufferAccessor(buffer), 
(short) 0).transactionalId();
-        } else {
-            throw new IllegalStateException("Unknown version " + version + " 
from the transaction log message key");
-        }
-    }
-
-
-
     public sealed interface ReadResult permits TxnRecord, TxnTombstone, 
UnknownKeyVersion, UnknownValueVersion { }
 
     public record TxnRecord(String transactionId, TransactionMetadata 
metadata) implements ReadResult { }

Reply via email to