This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 28de78bcbad MINOR: Refactor GroupCoordinator write path (#19290)
28de78bcbad is described below

commit 28de78bcbad605a3e906d085d2e59b441ae35212
Author: David Jacot <[email protected]>
AuthorDate: Thu Mar 27 16:58:47 2025 +0100

    MINOR: Refactor GroupCoordinator write path (#19290)
    
    This patch addresses an awkward pattern on the GroupCoordinator write
    path. The `CoordinatorPartitionWriter` uses the
    `ReplicaManager#appendRecords` method with `acks=1` and expects it to
    complete immediately and synchronously. This works because that is
    effectively what the method does with `acks=1`, but the method is
    fundamentally asynchronous, so the contract is fragile. This patch
    introduces a new synchronous method, `ReplicaManager.appendRecordsToLeader`,
    and refactors `ReplicaManager#appendRecords` to use it so that we can
    benefit from all the existing tests.
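
    For illustration only (this sketch is not part of the patch), a caller
    such as the coordinator can now do something like the following, assuming
    a `replicaManager` instance, `tp: TopicPartition`, `records: MemoryRecords`,
    and the relevant imports are in scope; the `LogAppendResult` accessors
    mirror those used in the diff below:

        // Append to the local leader only; the call returns once the records
        // are in the leader's log, without waiting on replication.
        val results = replicaManager.appendRecordsToLeader(
          requiredAcks = 1,
          internalTopicsAllowed = true,
          origin = AppendOrigin.COORDINATOR,
          entriesPerPartition = Map(tp -> records)
        )

        // The result comes back directly instead of via a response callback.
        val partitionResult = results.getOrElse(tp,
          throw new IllegalStateException(s"Append result should contain $tp."))

        // Surface any append error, otherwise compute the next offset.
        partitionResult.exception.foreach(e => throw e)
        val nextOffset = partitionResult.info.lastOffset + 1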
    
    Reviewers: Fred Zheng <[email protected]>, Jeff Kim <[email protected]>
---
 .../group/CoordinatorPartitionWriter.scala         | 10 +---
 .../main/scala/kafka/server/ReplicaManager.scala   | 60 ++++++++++++++++---
 .../group/CoordinatorPartitionWriterTest.scala     | 67 +++++++++------------
 .../unit/kafka/server/ReplicaManagerTest.scala     | 68 +++++++++++++++++++++-
 4 files changed, 150 insertions(+), 55 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
index 08b3c9aa498..64f23c32f52 100644
--- a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
@@ -21,7 +21,6 @@ import kafka.server.{AddPartitionsToTxnManager, ReplicaManager}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
-import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.coordinator.common.runtime.PartitionWriter
 import org.apache.kafka.server.ActionQueue
 import org.apache.kafka.server.common.RequestLocal
@@ -139,17 +138,14 @@ class CoordinatorPartitionWriter(
     verificationGuard: VerificationGuard,
     records: MemoryRecords
   ): Long = {
-    var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
-    replicaManager.appendRecords(
-      timeout = 0L,
+    // We write synchronously to the leader replica without waiting on replication.
+    val appendResults = replicaManager.appendRecordsToLeader(
       requiredAcks = 1,
       internalTopicsAllowed = true,
       origin = AppendOrigin.COORDINATOR,
       entriesPerPartition = Map(tp -> records),
-      responseCallback = results => appendResults = results,
       requestLocal = RequestLocal.noCaching,
       verificationGuards = Map(tp -> verificationGuard),
-      delayedProduceLock = None,
       // We can directly complete the purgatories here because we don't hold
       // any conflicting locks.
       actionQueue = directActionQueue
@@ -163,7 +159,7 @@ class CoordinatorPartitionWriter(
     }
 
     // Required offset.
-    partitionResult.lastOffset + 1
+    partitionResult.info.lastOffset + 1
   }
 
  override def deleteRecords(tp: TopicPartition, deleteBeforeOffset: Long): CompletableFuture[Void] = {
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index b5ee5629e76..958e4ce9dba 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -623,6 +623,50 @@ class ReplicaManager(val config: KafkaConfig,
 
   def addToActionQueue(action: Runnable): Unit = defaultActionQueue.add(action)
 
+  /**
+   * Append messages to leader replicas of the partition, without waiting on replication.
+   *
+   * Noted that all pending delayed check operations are stored in a queue. All callers to ReplicaManager.appendRecordsToLeader()
+   * are expected to call ActionQueue.tryCompleteActions for all affected partitions, without holding any conflicting
+   * locks.
+   *
+   * @param requiredAcks                  the required acks -- it is only used to ensure that the append meets the
+   *                                      required acks.
+   * @param internalTopicsAllowed         boolean indicating whether internal topics can be appended to
+   * @param origin                        source of the append request (ie, client, replication, coordinator)
+   * @param entriesPerPartition           the records per partition to be appended
+   * @param requestLocal                  container for the stateful instances scoped to this request -- this must correspond to the
+   *                                      thread calling this method
+   * @param actionQueue                   the action queue to use. ReplicaManager#defaultActionQueue is used by default.
+   * @param verificationGuards            the mapping from topic partition to verification guards if transaction verification is used
+   */
+  def appendRecordsToLeader(
+    requiredAcks: Short,
+    internalTopicsAllowed: Boolean,
+    origin: AppendOrigin,
+    entriesPerPartition: Map[TopicPartition, MemoryRecords],
+    requestLocal: RequestLocal = RequestLocal.noCaching,
+    actionQueue: ActionQueue = this.defaultActionQueue,
+    verificationGuards: Map[TopicPartition, VerificationGuard] = Map.empty
+  ): Map[TopicPartition, LogAppendResult] = {
+    val startTimeMs = time.milliseconds
+    val localProduceResultsWithTopicId = appendToLocalLog(
+      internalTopicsAllowed = internalTopicsAllowed,
+      origin,
+      entriesPerPartition,
+      requiredAcks,
+      requestLocal,
+      verificationGuards.toMap
+    )
+    debug("Produce to local log in %d ms".format(time.milliseconds - 
startTimeMs))
+
+    addCompletePurgatoryAction(actionQueue, localProduceResultsWithTopicId)
+
+    localProduceResultsWithTopicId.map {
+      case (k, v) => (k.topicPartition, v)
+    }
+  }
+
   /**
   * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
   * the callback function will be triggered either when timeout or the required acks are satisfied;
@@ -661,16 +705,18 @@ class ReplicaManager(val config: KafkaConfig,
       return
     }
 
-    val sTime = time.milliseconds
-    val localProduceResultsWithTopicId = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
-      origin, entriesPerPartition, requiredAcks, requestLocal, verificationGuards.toMap)
-    debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
-    val localProduceResults : Map[TopicPartition, LogAppendResult] = localProduceResultsWithTopicId.map {
-      case(k, v) => (k.topicPartition, v)}
+    val localProduceResults = appendRecordsToLeader(
+      requiredAcks,
+      internalTopicsAllowed,
+      origin,
+      entriesPerPartition,
+      requestLocal,
+      actionQueue,
+      verificationGuards
+    )
 
     val produceStatus = buildProducePartitionStatus(localProduceResults)
 
-    addCompletePurgatoryAction(actionQueue, localProduceResultsWithTopicId)
     recordValidationStatsCallback(localProduceResults.map { case (k, v) =>
       k -> v.info.recordValidationStats
     })
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
index eaa14832a8d..a56dab4fba7 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
@@ -16,16 +16,15 @@
  */
 package kafka.coordinator.group
 
-import kafka.server.ReplicaManager
+import kafka.server.{LogAppendResult, ReplicaManager}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.compress.Compression
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException
 import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, SimpleRecord}
-import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, RecordValidationStats, SimpleRecord}
 import org.apache.kafka.coordinator.common.runtime.PartitionWriter
-import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, LogAppendInfo, LogConfig, VerificationGuard}
 import org.apache.kafka.test.TestUtils.assertFutureThrows
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertThrows, assertTrue}
 import org.junit.jupiter.api.Test
@@ -35,7 +34,7 @@ import org.mockito.{ArgumentCaptor, ArgumentMatchers}
 import org.mockito.Mockito.{mock, verify, when}
 
 import java.nio.charset.Charset
-import java.util.Collections
+import java.util.Optional
 import scala.collection.Map
 import scala.jdk.CollectionConverters._
 
@@ -94,34 +93,31 @@ class CoordinatorPartitionWriterTest {
 
     val recordsCapture: ArgumentCaptor[Map[TopicPartition, MemoryRecords]] =
       ArgumentCaptor.forClass(classOf[Map[TopicPartition, MemoryRecords]])
-    val callbackCapture: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] =
-      ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
 
-    when(replicaManager.appendRecords(
-      ArgumentMatchers.eq(0L),
+    when(replicaManager.appendRecordsToLeader(
       ArgumentMatchers.eq(1.toShort),
       ArgumentMatchers.eq(true),
       ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
       recordsCapture.capture(),
-      callbackCapture.capture(),
-      ArgumentMatchers.any(),
-      ArgumentMatchers.any(),
       ArgumentMatchers.any(),
       ArgumentMatchers.any(),
       ArgumentMatchers.eq(Map(tp -> VerificationGuard.SENTINEL)),
-    )).thenAnswer( _ => {
-      callbackCapture.getValue.apply(Map(
-        tp -> new PartitionResponse(
-          Errors.NONE,
-          5,
-          10,
-          RecordBatch.NO_TIMESTAMP,
-          -1,
-          Collections.emptyList(),
-          ""
-        )
-      ))
-    })
+    )).thenReturn(Map(tp -> LogAppendResult(
+      new LogAppendInfo(
+        5L,
+        10L,
+        Optional.empty,
+        RecordBatch.NO_TIMESTAMP,
+        0L,
+        0L,
+        RecordValidationStats.EMPTY,
+        CompressionType.NONE,
+        100,
+        10L
+      ),
+      Option.empty,
+      false
+    )))
 
     val batch = MemoryRecords.withRecords(
       Compression.NONE,
@@ -140,8 +136,7 @@ class CoordinatorPartitionWriterTest {
 
     assertEquals(
       batch,
-      recordsCapture.getValue.getOrElse(tp,
-        throw new AssertionError(s"No records for $tp"))
+      recordsCapture.getValue.getOrElse(tp, throw new AssertionError(s"No records for $tp"))
     )
   }
 
@@ -203,26 +198,20 @@ class CoordinatorPartitionWriterTest {
 
     val recordsCapture: ArgumentCaptor[Map[TopicPartition, MemoryRecords]] =
       ArgumentCaptor.forClass(classOf[Map[TopicPartition, MemoryRecords]])
-    val callbackCapture: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] =
-      ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
 
-    when(replicaManager.appendRecords(
-      ArgumentMatchers.eq(0L),
+    when(replicaManager.appendRecordsToLeader(
       ArgumentMatchers.eq(1.toShort),
       ArgumentMatchers.eq(true),
       ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
       recordsCapture.capture(),
-      callbackCapture.capture(),
-      ArgumentMatchers.any(),
-      ArgumentMatchers.any(),
       ArgumentMatchers.any(),
       ArgumentMatchers.any(),
       ArgumentMatchers.eq(Map(tp -> VerificationGuard.SENTINEL)),
-    )).thenAnswer(_ => {
-      callbackCapture.getValue.apply(Map(
-        tp -> new PartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER)
-      ))
-    })
+    )).thenReturn(Map(tp -> LogAppendResult(
+      LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
+      Some(Errors.NOT_LEADER_OR_FOLLOWER.exception),
+      false
+    )))
 
     val batch = MemoryRecords.withRecords(
       Compression.NONE,
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 5f1fb7b5f24..fda7eceab7f 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -58,7 +58,7 @@ import org.apache.kafka.image._
 import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
 import org.apache.kafka.metadata.{LeaderAndIsr, MetadataCache}
 import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
-import org.apache.kafka.server.common.{DirectoryEventHandler, KRaftVersion, MetadataVersion, OffsetAndEpoch, StopPartition}
+import org.apache.kafka.server.common.{DirectoryEventHandler, KRaftVersion, MetadataVersion, OffsetAndEpoch, RequestLocal, StopPartition}
 import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerLogConfigs}
 import org.apache.kafka.server.log.remote.storage._
 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
@@ -72,7 +72,7 @@ import org.apache.kafka.server.util.timer.MockTimer
 import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler}
 import org.apache.kafka.storage.internals.checkpoint.LazyOffsetCheckpoints
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LocalLog, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegments, ProducerStateManager, ProducerStateManagerConfig, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegments, ProducerStateManager, ProducerStateManagerConfig, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeEach, Test}
@@ -6134,6 +6134,70 @@ class ReplicaManagerTest {
 
   }
 
+  @Test
+  def testAppendRecordsToLeader(): Unit = {
+    val localId = 0
+    val foo = new TopicIdPartition(Uuid.randomUuid, 0, "foo")
+    val bar = new TopicIdPartition(Uuid.randomUuid, 0, "bar")
+
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(
+      timer = new MockTimer(time),
+      brokerId = localId
+    )
+
+    try {
+      val topicDelta = new TopicsDelta(TopicsImage.EMPTY)
+      topicDelta.replay(new TopicRecord()
+        .setName(foo.topic)
+        .setTopicId(foo.topicId)
+      )
+      topicDelta.replay(new PartitionRecord()
+        .setTopicId(foo.topicId)
+        .setPartitionId(foo.partition)
+        .setLeader(localId)
+        .setLeaderEpoch(0)
+        .setPartitionEpoch(0)
+        .setReplicas(List[Integer](localId).asJava)
+        .setIsr(List[Integer](localId).asJava)
+      )
+
+      val metadataImage = imageFromTopics(topicDelta.apply())
+      replicaManager.applyDelta(topicDelta, metadataImage)
+
+      // Create test records.
+      val records = TestUtils.singletonRecords(
+        value = "test".getBytes,
+        timestamp = time.milliseconds
+      )
+
+      // Append records to both foo and bar.
+      val result = replicaManager.appendRecordsToLeader(
+        requiredAcks = 1,
+        internalTopicsAllowed = true,
+        origin = AppendOrigin.CLIENT,
+        entriesPerPartition = Map(
+          foo.topicPartition -> records,
+          bar.topicPartition -> records
+        ),
+        requestLocal = RequestLocal.noCaching
+      )
+
+      assertNotNull(result)
+      assertEquals(2, result.size)
+
+      val fooResult = result(foo.topicPartition)
+      assertEquals(Errors.NONE, fooResult.error)
+      assertEquals(0, fooResult.info.logStartOffset)
+      assertEquals(0, fooResult.info.firstOffset)
+      assertEquals(0, fooResult.info.lastOffset)
+
+      val barResult = result(bar.topicPartition)
+      assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, barResult.error)
+      assertEquals(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, barResult.info)
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
 
   @Test
   def testMonitorableReplicaSelector(): Unit = {
