This is an automated email from the ASF dual-hosted git repository.

clolov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ff1b9b64abf KAFKA-19915: [1/3] Move LogAppendResult to server module 
(#21410)
ff1b9b64abf is described below

commit ff1b9b64abf9f8bc4129e94088c3406d0cb90231
Author: Christo Lolov <[email protected]>
AuthorDate: Wed Feb 18 14:31:38 2026 +0000

    KAFKA-19915: [1/3] Move LogAppendResult to server module (#21410)
    
    Move LogAppendResult as a record to Java
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../group/CoordinatorPartitionWriter.scala         |  2 +-
 .../main/scala/kafka/server/ReplicaManager.scala   | 69 ++++++++-----------
 .../group/CoordinatorPartitionWriterTest.scala     | 40 +++++------
 .../unit/kafka/server/ReplicaManagerTest.scala     |  9 +--
 .../org/apache/kafka/server/LogAppendResult.java   | 79 ++++++++++++++++++++++
 5 files changed, 133 insertions(+), 66 deletions(-)

diff --git 
a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala 
b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
index ec1569212e3..56a57c1af03 100644
--- 
a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
+++ 
b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
@@ -163,7 +163,7 @@ class CoordinatorPartitionWriter(
     }
 
     // Required offset.
-    partitionResult.info.lastOffset + 1
+    partitionResult.logAppendSummary.lastOffset + 1
   }
 
   override def deleteRecords(tp: TopicPartition, deleteBeforeOffset: Long): 
CompletableFuture[Void] = {
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 67dc3dc9463..d2471b2ecd2 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -49,6 +49,7 @@ import org.apache.kafka.image.{LocalReplicaChanges, 
MetadataImage, TopicsDelta}
 import org.apache.kafka.logger.StateChangeLogger
 import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
 import org.apache.kafka.metadata.MetadataCache
+import org.apache.kafka.server.LogAppendResult.LogAppendSummary
 import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, 
StopPartition, TransactionVersion}
 import org.apache.kafka.server.log.remote.TopicPartitionLog
 import org.apache.kafka.server.config.ReplicationConfigs
@@ -63,7 +64,7 @@ import 
org.apache.kafka.server.transaction.AddPartitionsToTxnManager
 import 
org.apache.kafka.server.transaction.AddPartitionsToTxnManager.TransactionSupportedOperation
 import org.apache.kafka.server.util.timer.{SystemTimer, TimerTask}
 import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
-import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, 
HostedPartition, common}
+import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, 
HostedPartition, LogAppendResult, common}
 import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, 
OffsetCheckpointFile, OffsetCheckpoints}
 import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, 
FetchPartitionStatus, LeaderHwChange, LogAppendInfo, LogConfig, 
LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, LogReadResult, 
OffsetResultHolder, RecordValidationException, RecordValidationStats, 
RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
@@ -80,25 +81,6 @@ import scala.collection.{Map, Seq, Set, immutable, mutable}
 import scala.jdk.CollectionConverters._
 import scala.jdk.OptionConverters.RichOptional
 
-/*
- * Result metadata of a log append operation on the log
- */
-case class LogAppendResult(info: LogAppendInfo,
-                           exception: Option[Throwable],
-                           hasCustomErrorMessage: Boolean) {
-  def error: Errors = exception match {
-    case None => Errors.NONE
-    case Some(e) => Errors.forException(e)
-  }
-
-  def errorMessage: String = {
-    exception match {
-      case Some(e) if hasCustomErrorMessage => e.getMessage
-      case _ => null
-    }
-  }
-}
-
 case class LogDeleteRecordsResult(requestedOffset: Long, lowWatermark: Long, 
exception: Option[Throwable] = None) {
   def error: Errors = exception match {
     case None => Errors.NONE
@@ -681,7 +663,7 @@ class ReplicaManager(val config: KafkaConfig,
     val produceStatus = buildProducePartitionStatus(localProduceResults)
 
     recordValidationStatsCallback(localProduceResults.map { case (k, v) =>
-      k -> v.info.recordValidationStats
+      k -> v.logAppendSummary().recordValidationStats()
     })
 
     maybeAddDelayedProduce(
@@ -763,10 +745,10 @@ class ReplicaManager(val config: KafkaConfig,
                 }
               case _ => None
             }
-          new TopicIdPartition(topicIds.getOrElse(topicPartition.topic(), 
Uuid.ZERO_UUID), topicPartition) -> LogAppendResult(
-            LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-            Some(customException.getOrElse(error.exception)),
-            hasCustomErrorMessage = customException.isDefined
+          new TopicIdPartition(topicIds.getOrElse(topicPartition.topic(), 
Uuid.ZERO_UUID), topicPartition) -> new LogAppendResult(
+            
LogAppendSummary.fromAppendInfo(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO),
+            Optional.ofNullable(customException.getOrElse(error.exception)),
+            customException.isDefined
           )
       }
       // In non-transaction paths, errorResults is typically empty, so we can
@@ -854,13 +836,13 @@ class ReplicaManager(val config: KafkaConfig,
   ): Map[TopicIdPartition, ProducePartitionStatus] = {
     results.map { case (topicIdPartition, result) =>
       topicIdPartition -> ProducePartitionStatus(
-        result.info.lastOffset + 1, // required offset
+        result.logAppendSummary.lastOffset + 1, // required offset
         new PartitionResponse(
           result.error,
-          result.info.firstOffset,
-          result.info.logAppendTime,
-          result.info.logStartOffset,
-          result.info.recordErrors,
+          result.logAppendSummary.firstOffset,
+          result.logAppendSummary.logAppendTime,
+          result.logAppendSummary.logStartOffset,
+          result.logAppendSummary.recordErrors,
           result.errorMessage
         )
       )
@@ -874,7 +856,7 @@ class ReplicaManager(val config: KafkaConfig,
     actionQueue.add {
       () => appendResults.foreach { case (topicIdPartition, result) =>
         val requestKey = new 
TopicPartitionOperationKey(topicIdPartition.topicPartition)
-        result.info.leaderHwChange match {
+        result.logAppendSummary.leaderHwChange match {
           case LeaderHwChange.INCREASED =>
             // some delayed operations may be unblocked after HW changed
             delayedProducePurgatory.checkAndComplete(requestKey)
@@ -1356,7 +1338,7 @@ class ReplicaManager(val config: KafkaConfig,
                                             localProduceResults: 
Map[TopicIdPartition, LogAppendResult]): Boolean = {
     requiredAcks == -1 &&
     entriesPerPartition.nonEmpty &&
-    localProduceResults.values.count(_.exception.isDefined) < 
entriesPerPartition.size
+    localProduceResults.values.count(_.exception().isPresent) < 
entriesPerPartition.size
   }
 
   private def isValidRequiredAcks(requiredAcks: Short): Boolean = {
@@ -1398,10 +1380,10 @@ class ReplicaManager(val config: KafkaConfig,
 
       // reject appending to internal topics if it is not allowed
       if (Topic.isInternal(topicIdPartition.topic) && !internalTopicsAllowed) {
-        (topicIdPartition, LogAppendResult(
-          LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-          Some(new InvalidTopicException(s"Cannot append to internal topic 
${topicIdPartition.topic}")),
-          hasCustomErrorMessage = false))
+        (topicIdPartition, new LogAppendResult(
+          
LogAppendSummary.fromAppendInfo(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO),
+          Optional.of(new InvalidTopicException(s"Cannot append to internal 
topic ${topicIdPartition.topic}")),
+          false))
       } else {
         try {
           val partition = getPartitionOrException(topicIdPartition)
@@ -1419,7 +1401,7 @@ class ReplicaManager(val config: KafkaConfig,
             trace(s"${records.sizeInBytes} written to log $topicIdPartition 
beginning at offset " +
               s"${info.firstOffset} and ending at offset ${info.lastOffset}")
 
-          (topicIdPartition, LogAppendResult(info, exception = None, 
hasCustomErrorMessage = false))
+          (topicIdPartition,  new 
LogAppendResult(LogAppendSummary.fromAppendInfo(info), Optional.empty(), false))
 
         } catch {
           // NOTE: Failed produce requests metric is not incremented for known 
exceptions
@@ -1431,16 +1413,19 @@ class ReplicaManager(val config: KafkaConfig,
                    _: CorruptRecordException |
                    _: KafkaStorageException |
                    _: UnknownTopicIdException) =>
-            (topicIdPartition, 
LogAppendResult(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, Some(e), 
hasCustomErrorMessage = false))
+            (topicIdPartition, new 
LogAppendResult(LogAppendSummary.fromAppendInfo(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO),
+              Optional.of(e), false))
           case rve: RecordValidationException =>
             val logStartOffset = processFailedRecord(topicIdPartition, 
rve.invalidException)
             val recordErrors = rve.recordErrors
-            (topicIdPartition, 
LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo(logStartOffset,
 recordErrors),
-              Some(rve.invalidException), hasCustomErrorMessage = true))
+            (topicIdPartition, new LogAppendResult(
+              
LogAppendSummary.fromAppendInfo(LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo(logStartOffset,
 recordErrors)),
+              Optional.of(rve.invalidException), true))
           case t: Throwable =>
             val logStartOffset = processFailedRecord(topicIdPartition, t)
-            (topicIdPartition, 
LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithLogStartOffset(logStartOffset),
-              Some(t), hasCustomErrorMessage = false))
+            (topicIdPartition, new LogAppendResult(
+              
LogAppendSummary.fromAppendInfo(LogAppendInfo.unknownLogAppendInfoWithLogStartOffset(logStartOffset)),
+              Optional.of(t), false))
         }
       }
     }
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
index 1083d6185e3..6872eafe370 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
@@ -16,7 +16,7 @@
  */
 package kafka.coordinator.group
 
-import kafka.server.{LogAppendResult, ReplicaManager}
+import kafka.server.ReplicaManager
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.common.compress.Compression
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException
@@ -24,6 +24,8 @@ import 
org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPa
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.internal.{CompressionType, 
ControlRecordType, EndTransactionMarker, MemoryRecords, RecordBatch, 
SimpleRecord}
 import org.apache.kafka.coordinator.common.runtime.PartitionWriter
+import org.apache.kafka.server.LogAppendResult
+import org.apache.kafka.server.LogAppendResult.LogAppendSummary
 import org.apache.kafka.server.common.TransactionVersion
 import org.apache.kafka.storage.internals.log.{AppendOrigin, LogAppendInfo, 
LogConfig, RecordValidationStats, VerificationGuard}
 import org.apache.kafka.test.TestUtils.assertFutureThrows
@@ -107,8 +109,8 @@ class CoordinatorPartitionWriterTest {
       ArgumentMatchers.any(),
       ArgumentMatchers.eq(Map(tp -> VerificationGuard.SENTINEL)),
       ArgumentMatchers.eq(TransactionVersion.TV_UNKNOWN)
-    )).thenReturn(Map(new TopicIdPartition(topicId, tp) -> LogAppendResult(
-      new LogAppendInfo(
+    )).thenReturn(Map(new TopicIdPartition(topicId, tp) -> new LogAppendResult(
+      LogAppendSummary.fromAppendInfo(new LogAppendInfo(
         5L,
         10L,
         Optional.empty,
@@ -119,9 +121,9 @@ class CoordinatorPartitionWriterTest {
         CompressionType.NONE,
         100,
         10L
-      ),
-      Option.empty,
-      hasCustomErrorMessage = false
+      )),
+      Optional.empty(),
+      false
     )))
 
     // Test non-transactional records (regular coordinator records) - should 
use TV_UNKNOWN
@@ -169,8 +171,8 @@ class CoordinatorPartitionWriterTest {
       ArgumentMatchers.any(),
       ArgumentMatchers.eq(Map(tp -> VerificationGuard.SENTINEL)),
       ArgumentMatchers.eq(TransactionVersion.TV_2.featureLevel())
-    )).thenReturn(Map(new TopicIdPartition(topicId, tp) -> LogAppendResult(
-      new LogAppendInfo(
+    )).thenReturn(Map(new TopicIdPartition(topicId, tp) -> new LogAppendResult(
+      LogAppendSummary.fromAppendInfo(new LogAppendInfo(
         5L,
         10L,
         Optional.empty,
@@ -181,9 +183,9 @@ class CoordinatorPartitionWriterTest {
         CompressionType.NONE,
         100,
         10L
-      ),
-      Option.empty,
-      hasCustomErrorMessage = false
+      )),
+      Optional.empty(),
+      false
     )))
 
     // Test transactional records (transaction marker) - should use explicit 
transaction version
@@ -329,10 +331,10 @@ class CoordinatorPartitionWriterTest {
       ArgumentMatchers.any(),
       ArgumentMatchers.eq(Map(tp -> VerificationGuard.SENTINEL)),
       ArgumentMatchers.eq(TransactionVersion.TV_UNKNOWN)
-    )).thenReturn(Map(new TopicIdPartition(topicId, tp) -> LogAppendResult(
-      LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-      Some(Errors.NOT_LEADER_OR_FOLLOWER.exception),
-      hasCustomErrorMessage = false
+    )).thenReturn(Map(new TopicIdPartition(topicId, tp) -> new LogAppendResult(
+      LogAppendSummary.fromAppendInfo(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO),
+      Optional.of(Errors.NOT_LEADER_OR_FOLLOWER.exception),
+      false
     )))
 
     val batch = MemoryRecords.withRecords(
@@ -374,10 +376,10 @@ class CoordinatorPartitionWriterTest {
       ArgumentMatchers.any(),
       ArgumentMatchers.eq(Map(tp -> VerificationGuard.SENTINEL)),
       ArgumentMatchers.eq(TransactionVersion.TV_UNKNOWN)
-    )).thenReturn(Map(new TopicIdPartition(topicId, tp) -> LogAppendResult(
-      LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-      Some(Errors.NOT_LEADER_OR_FOLLOWER.exception(customMessage)),
-      hasCustomErrorMessage = true
+    )).thenReturn(Map(new TopicIdPartition(topicId, tp) -> new LogAppendResult(
+      LogAppendSummary.fromAppendInfo(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO),
+      Optional.of(Errors.NOT_LEADER_OR_FOLLOWER.exception(customMessage)),
+      true
     )))
 
     val batch = MemoryRecords.withRecords(
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index d504432a7a5..580af684cd6 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -59,6 +59,7 @@ import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
 import org.apache.kafka.metadata.{LeaderRecoveryState, MetadataCache, 
PartitionRegistration}
 import org.apache.kafka.metadata.properties.{MetaProperties, 
MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
 import org.apache.kafka.raft.KRaftConfigs
+import org.apache.kafka.server.LogAppendResult.LogAppendSummary
 import org.apache.kafka.server.common.{DirectoryEventHandler, KRaftVersion, 
MetadataVersion, OffsetAndEpoch, RequestLocal, StopPartition, 
TransactionVersion}
 import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
 import org.apache.kafka.server.log.remote.TopicPartitionLog
@@ -6036,13 +6037,13 @@ class ReplicaManagerTest {
 
       val fooResult = result(foo)
       assertEquals(Errors.NONE, fooResult.error)
-      assertEquals(0, fooResult.info.logStartOffset)
-      assertEquals(0, fooResult.info.firstOffset)
-      assertEquals(0, fooResult.info.lastOffset)
+      assertEquals(0, fooResult.logAppendSummary.logStartOffset)
+      assertEquals(0, fooResult.logAppendSummary.firstOffset)
+      assertEquals(0, fooResult.logAppendSummary.lastOffset)
 
       val barResult = result(bar)
       assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, barResult.error)
-      assertEquals(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, barResult.info)
+      
assertEquals(LogAppendSummary.fromAppendInfo(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO),
 barResult.logAppendSummary)
     } finally {
       replicaManager.shutdown(checkpointHW = false)
     }
diff --git a/server/src/main/java/org/apache/kafka/server/LogAppendResult.java 
b/server/src/main/java/org/apache/kafka/server/LogAppendResult.java
new file mode 100644
index 00000000000..78bcc13788d
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/LogAppendResult.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.storage.internals.log.LeaderHwChange;
+import org.apache.kafka.storage.internals.log.LogAppendInfo;
+import org.apache.kafka.storage.internals.log.RecordValidationStats;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Result metadata of a log append operation on the log
+ */
+public record LogAppendResult(
+    LogAppendSummary logAppendSummary,
+    Optional<Throwable> exception,
+    boolean hasCustomErrorMessage
+) {
+
+    public record LogAppendSummary(
+        long firstOffset,
+        long lastOffset,
+        long logAppendTime,
+        long logStartOffset,
+        RecordValidationStats recordValidationStats,
+        List<ProduceResponse.RecordError> recordErrors,
+        LeaderHwChange leaderHwChange
+    ) {
+        public LogAppendSummary {
+            recordErrors = List.copyOf(recordErrors);
+            if (recordValidationStats == null) {
+                recordValidationStats = RecordValidationStats.EMPTY;
+            }
+        }
+
+        public static LogAppendSummary fromAppendInfo(LogAppendInfo info) {
+            return new LogAppendSummary(
+                info.firstOffset(),
+                info.lastOffset(),
+                info.logAppendTime(),
+                info.logStartOffset(),
+                info.recordValidationStats(),
+                info.recordErrors(),
+                info.leaderHwChange()
+            );
+        }
+    }
+
+    public Errors error() {
+        return exception
+                .map(Errors::forException)
+                .orElse(Errors.NONE);
+    }
+
+    public String errorMessage() {
+        return exception
+                .filter(e -> hasCustomErrorMessage)
+                .map(Throwable::getMessage)
+                .orElse(null);
+    }
+}

Reply via email to