This is an automated email from the ASF dual-hosted git repository.
clolov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ff1b9b64abf KAFKA-19915: [1/3] Move LogAppendResult to server module (#21410)
ff1b9b64abf is described below
commit ff1b9b64abf9f8bc4129e94088c3406d0cb90231
Author: Christo Lolov <[email protected]>
AuthorDate: Wed Feb 18 14:31:38 2026 +0000
KAFKA-19915: [1/3] Move LogAppendResult to server module (#21410)
Move LogAppendResult from the Scala core module to the server module, rewriting it as a Java record.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../group/CoordinatorPartitionWriter.scala | 2 +-
.../main/scala/kafka/server/ReplicaManager.scala | 69 ++++++++-----------
.../group/CoordinatorPartitionWriterTest.scala | 40 +++++------
.../unit/kafka/server/ReplicaManagerTest.scala | 9 +--
.../org/apache/kafka/server/LogAppendResult.java | 79 ++++++++++++++++++++++
5 files changed, 133 insertions(+), 66 deletions(-)
diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
index ec1569212e3..56a57c1af03 100644
--- a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
@@ -163,7 +163,7 @@ class CoordinatorPartitionWriter(
}
// Required offset.
- partitionResult.info.lastOffset + 1
+ partitionResult.logAppendSummary.lastOffset + 1
}
override def deleteRecords(tp: TopicPartition, deleteBeforeOffset: Long):
CompletableFuture[Void] = {
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 67dc3dc9463..d2471b2ecd2 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -49,6 +49,7 @@ import org.apache.kafka.image.{LocalReplicaChanges,
MetadataImage, TopicsDelta}
import org.apache.kafka.logger.StateChangeLogger
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.metadata.MetadataCache
+import org.apache.kafka.server.LogAppendResult.LogAppendSummary
import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal,
StopPartition, TransactionVersion}
import org.apache.kafka.server.log.remote.TopicPartitionLog
import org.apache.kafka.server.config.ReplicationConfigs
@@ -63,7 +64,7 @@ import
org.apache.kafka.server.transaction.AddPartitionsToTxnManager
import
org.apache.kafka.server.transaction.AddPartitionsToTxnManager.TransactionSupportedOperation
import org.apache.kafka.server.util.timer.{SystemTimer, TimerTask}
import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
-import org.apache.kafka.server.{ActionQueue, DelayedActionQueue,
HostedPartition, common}
+import org.apache.kafka.server.{ActionQueue, DelayedActionQueue,
HostedPartition, LogAppendResult, common}
import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints,
OffsetCheckpointFile, OffsetCheckpoints}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo,
FetchPartitionStatus, LeaderHwChange, LogAppendInfo, LogConfig,
LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, LogReadResult,
OffsetResultHolder, RecordValidationException, RecordValidationStats,
RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
@@ -80,25 +81,6 @@ import scala.collection.{Map, Seq, Set, immutable, mutable}
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOptional
-/*
- * Result metadata of a log append operation on the log
- */
-case class LogAppendResult(info: LogAppendInfo,
- exception: Option[Throwable],
- hasCustomErrorMessage: Boolean) {
- def error: Errors = exception match {
- case None => Errors.NONE
- case Some(e) => Errors.forException(e)
- }
-
- def errorMessage: String = {
- exception match {
- case Some(e) if hasCustomErrorMessage => e.getMessage
- case _ => null
- }
- }
-}
-
case class LogDeleteRecordsResult(requestedOffset: Long, lowWatermark: Long,
exception: Option[Throwable] = None) {
def error: Errors = exception match {
case None => Errors.NONE
@@ -681,7 +663,7 @@ class ReplicaManager(val config: KafkaConfig,
val produceStatus = buildProducePartitionStatus(localProduceResults)
recordValidationStatsCallback(localProduceResults.map { case (k, v) =>
- k -> v.info.recordValidationStats
+ k -> v.logAppendSummary().recordValidationStats()
})
maybeAddDelayedProduce(
@@ -763,10 +745,10 @@ class ReplicaManager(val config: KafkaConfig,
}
case _ => None
}
- new TopicIdPartition(topicIds.getOrElse(topicPartition.topic(),
Uuid.ZERO_UUID), topicPartition) -> LogAppendResult(
- LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
- Some(customException.getOrElse(error.exception)),
- hasCustomErrorMessage = customException.isDefined
+ new TopicIdPartition(topicIds.getOrElse(topicPartition.topic(),
Uuid.ZERO_UUID), topicPartition) -> new LogAppendResult(
+
LogAppendSummary.fromAppendInfo(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO),
+ Optional.ofNullable(customException.getOrElse(error.exception)),
+ customException.isDefined
)
}
// In non-transaction paths, errorResults is typically empty, so we can
@@ -854,13 +836,13 @@ class ReplicaManager(val config: KafkaConfig,
): Map[TopicIdPartition, ProducePartitionStatus] = {
results.map { case (topicIdPartition, result) =>
topicIdPartition -> ProducePartitionStatus(
- result.info.lastOffset + 1, // required offset
+ result.logAppendSummary.lastOffset + 1, // required offset
new PartitionResponse(
result.error,
- result.info.firstOffset,
- result.info.logAppendTime,
- result.info.logStartOffset,
- result.info.recordErrors,
+ result.logAppendSummary.firstOffset,
+ result.logAppendSummary.logAppendTime,
+ result.logAppendSummary.logStartOffset,
+ result.logAppendSummary.recordErrors,
result.errorMessage
)
)
@@ -874,7 +856,7 @@ class ReplicaManager(val config: KafkaConfig,
actionQueue.add {
() => appendResults.foreach { case (topicIdPartition, result) =>
val requestKey = new
TopicPartitionOperationKey(topicIdPartition.topicPartition)
- result.info.leaderHwChange match {
+ result.logAppendSummary.leaderHwChange match {
case LeaderHwChange.INCREASED =>
// some delayed operations may be unblocked after HW changed
delayedProducePurgatory.checkAndComplete(requestKey)
@@ -1356,7 +1338,7 @@ class ReplicaManager(val config: KafkaConfig,
localProduceResults:
Map[TopicIdPartition, LogAppendResult]): Boolean = {
requiredAcks == -1 &&
entriesPerPartition.nonEmpty &&
- localProduceResults.values.count(_.exception.isDefined) <
entriesPerPartition.size
+ localProduceResults.values.count(_.exception().isPresent) <
entriesPerPartition.size
}
private def isValidRequiredAcks(requiredAcks: Short): Boolean = {
@@ -1398,10 +1380,10 @@ class ReplicaManager(val config: KafkaConfig,
// reject appending to internal topics if it is not allowed
if (Topic.isInternal(topicIdPartition.topic) && !internalTopicsAllowed) {
- (topicIdPartition, LogAppendResult(
- LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
- Some(new InvalidTopicException(s"Cannot append to internal topic
${topicIdPartition.topic}")),
- hasCustomErrorMessage = false))
+ (topicIdPartition, new LogAppendResult(
+
LogAppendSummary.fromAppendInfo(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO),
+ Optional.of(new InvalidTopicException(s"Cannot append to internal
topic ${topicIdPartition.topic}")),
+ false))
} else {
try {
val partition = getPartitionOrException(topicIdPartition)
@@ -1419,7 +1401,7 @@ class ReplicaManager(val config: KafkaConfig,
trace(s"${records.sizeInBytes} written to log $topicIdPartition
beginning at offset " +
s"${info.firstOffset} and ending at offset ${info.lastOffset}")
- (topicIdPartition, LogAppendResult(info, exception = None,
hasCustomErrorMessage = false))
+ (topicIdPartition, new
LogAppendResult(LogAppendSummary.fromAppendInfo(info), Optional.empty(), false))
} catch {
// NOTE: Failed produce requests metric is not incremented for known
exceptions
@@ -1431,16 +1413,19 @@ class ReplicaManager(val config: KafkaConfig,
_: CorruptRecordException |
_: KafkaStorageException |
_: UnknownTopicIdException) =>
- (topicIdPartition,
LogAppendResult(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, Some(e),
hasCustomErrorMessage = false))
+ (topicIdPartition, new
LogAppendResult(LogAppendSummary.fromAppendInfo(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO),
+ Optional.of(e), false))
case rve: RecordValidationException =>
val logStartOffset = processFailedRecord(topicIdPartition,
rve.invalidException)
val recordErrors = rve.recordErrors
- (topicIdPartition,
LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo(logStartOffset,
recordErrors),
- Some(rve.invalidException), hasCustomErrorMessage = true))
+ (topicIdPartition, new LogAppendResult(
+
LogAppendSummary.fromAppendInfo(LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo(logStartOffset,
recordErrors)),
+ Optional.of(rve.invalidException), true))
case t: Throwable =>
val logStartOffset = processFailedRecord(topicIdPartition, t)
- (topicIdPartition,
LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithLogStartOffset(logStartOffset),
- Some(t), hasCustomErrorMessage = false))
+ (topicIdPartition, new LogAppendResult(
+
LogAppendSummary.fromAppendInfo(LogAppendInfo.unknownLogAppendInfoWithLogStartOffset(logStartOffset)),
+ Optional.of(t), false))
}
}
}
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
index 1083d6185e3..6872eafe370 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
@@ -16,7 +16,7 @@
*/
package kafka.coordinator.group
-import kafka.server.{LogAppendResult, ReplicaManager}
+import kafka.server.ReplicaManager
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.errors.NotLeaderOrFollowerException
@@ -24,6 +24,8 @@ import
org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPa
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.internal.{CompressionType,
ControlRecordType, EndTransactionMarker, MemoryRecords, RecordBatch,
SimpleRecord}
import org.apache.kafka.coordinator.common.runtime.PartitionWriter
+import org.apache.kafka.server.LogAppendResult
+import org.apache.kafka.server.LogAppendResult.LogAppendSummary
import org.apache.kafka.server.common.TransactionVersion
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogAppendInfo,
LogConfig, RecordValidationStats, VerificationGuard}
import org.apache.kafka.test.TestUtils.assertFutureThrows
@@ -107,8 +109,8 @@ class CoordinatorPartitionWriterTest {
ArgumentMatchers.any(),
ArgumentMatchers.eq(Map(tp -> VerificationGuard.SENTINEL)),
ArgumentMatchers.eq(TransactionVersion.TV_UNKNOWN)
- )).thenReturn(Map(new TopicIdPartition(topicId, tp) -> LogAppendResult(
- new LogAppendInfo(
+ )).thenReturn(Map(new TopicIdPartition(topicId, tp) -> new LogAppendResult(
+ LogAppendSummary.fromAppendInfo(new LogAppendInfo(
5L,
10L,
Optional.empty,
@@ -119,9 +121,9 @@ class CoordinatorPartitionWriterTest {
CompressionType.NONE,
100,
10L
- ),
- Option.empty,
- hasCustomErrorMessage = false
+ )),
+ Optional.empty(),
+ false
)))
// Test non-transactional records (regular coordinator records) - should
use TV_UNKNOWN
@@ -169,8 +171,8 @@ class CoordinatorPartitionWriterTest {
ArgumentMatchers.any(),
ArgumentMatchers.eq(Map(tp -> VerificationGuard.SENTINEL)),
ArgumentMatchers.eq(TransactionVersion.TV_2.featureLevel())
- )).thenReturn(Map(new TopicIdPartition(topicId, tp) -> LogAppendResult(
- new LogAppendInfo(
+ )).thenReturn(Map(new TopicIdPartition(topicId, tp) -> new LogAppendResult(
+ LogAppendSummary.fromAppendInfo(new LogAppendInfo(
5L,
10L,
Optional.empty,
@@ -181,9 +183,9 @@ class CoordinatorPartitionWriterTest {
CompressionType.NONE,
100,
10L
- ),
- Option.empty,
- hasCustomErrorMessage = false
+ )),
+ Optional.empty(),
+ false
)))
// Test transactional records (transaction marker) - should use explicit
transaction version
@@ -329,10 +331,10 @@ class CoordinatorPartitionWriterTest {
ArgumentMatchers.any(),
ArgumentMatchers.eq(Map(tp -> VerificationGuard.SENTINEL)),
ArgumentMatchers.eq(TransactionVersion.TV_UNKNOWN)
- )).thenReturn(Map(new TopicIdPartition(topicId, tp) -> LogAppendResult(
- LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
- Some(Errors.NOT_LEADER_OR_FOLLOWER.exception),
- hasCustomErrorMessage = false
+ )).thenReturn(Map(new TopicIdPartition(topicId, tp) -> new LogAppendResult(
+ LogAppendSummary.fromAppendInfo(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO),
+ Optional.of(Errors.NOT_LEADER_OR_FOLLOWER.exception),
+ false
)))
val batch = MemoryRecords.withRecords(
@@ -374,10 +376,10 @@ class CoordinatorPartitionWriterTest {
ArgumentMatchers.any(),
ArgumentMatchers.eq(Map(tp -> VerificationGuard.SENTINEL)),
ArgumentMatchers.eq(TransactionVersion.TV_UNKNOWN)
- )).thenReturn(Map(new TopicIdPartition(topicId, tp) -> LogAppendResult(
- LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
- Some(Errors.NOT_LEADER_OR_FOLLOWER.exception(customMessage)),
- hasCustomErrorMessage = true
+ )).thenReturn(Map(new TopicIdPartition(topicId, tp) -> new LogAppendResult(
+ LogAppendSummary.fromAppendInfo(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO),
+ Optional.of(Errors.NOT_LEADER_OR_FOLLOWER.exception(customMessage)),
+ true
)))
val batch = MemoryRecords.withRecords(
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index d504432a7a5..580af684cd6 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -59,6 +59,7 @@ import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.metadata.{LeaderRecoveryState, MetadataCache,
PartitionRegistration}
import org.apache.kafka.metadata.properties.{MetaProperties,
MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
import org.apache.kafka.raft.KRaftConfigs
+import org.apache.kafka.server.LogAppendResult.LogAppendSummary
import org.apache.kafka.server.common.{DirectoryEventHandler, KRaftVersion,
MetadataVersion, OffsetAndEpoch, RequestLocal, StopPartition,
TransactionVersion}
import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
import org.apache.kafka.server.log.remote.TopicPartitionLog
@@ -6036,13 +6037,13 @@ class ReplicaManagerTest {
val fooResult = result(foo)
assertEquals(Errors.NONE, fooResult.error)
- assertEquals(0, fooResult.info.logStartOffset)
- assertEquals(0, fooResult.info.firstOffset)
- assertEquals(0, fooResult.info.lastOffset)
+ assertEquals(0, fooResult.logAppendSummary.logStartOffset)
+ assertEquals(0, fooResult.logAppendSummary.firstOffset)
+ assertEquals(0, fooResult.logAppendSummary.lastOffset)
val barResult = result(bar)
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, barResult.error)
- assertEquals(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, barResult.info)
+
assertEquals(LogAppendSummary.fromAppendInfo(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO),
barResult.logAppendSummary)
} finally {
replicaManager.shutdown(checkpointHW = false)
}
diff --git a/server/src/main/java/org/apache/kafka/server/LogAppendResult.java b/server/src/main/java/org/apache/kafka/server/LogAppendResult.java
new file mode 100644
index 00000000000..78bcc13788d
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/LogAppendResult.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.storage.internals.log.LeaderHwChange;
+import org.apache.kafka.storage.internals.log.LogAppendInfo;
+import org.apache.kafka.storage.internals.log.RecordValidationStats;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Result metadata of a log append operation on the log
+ */
+public record LogAppendResult(
+ LogAppendSummary logAppendSummary,
+ Optional<Throwable> exception,
+ boolean hasCustomErrorMessage
+) {
+
+ public record LogAppendSummary(
+ long firstOffset,
+ long lastOffset,
+ long logAppendTime,
+ long logStartOffset,
+ RecordValidationStats recordValidationStats,
+ List<ProduceResponse.RecordError> recordErrors,
+ LeaderHwChange leaderHwChange
+ ) {
+ public LogAppendSummary {
+ recordErrors = List.copyOf(recordErrors);
+ if (recordValidationStats == null) {
+ recordValidationStats = RecordValidationStats.EMPTY;
+ }
+ }
+
+ public static LogAppendSummary fromAppendInfo(LogAppendInfo info) {
+ return new LogAppendSummary(
+ info.firstOffset(),
+ info.lastOffset(),
+ info.logAppendTime(),
+ info.logStartOffset(),
+ info.recordValidationStats(),
+ info.recordErrors(),
+ info.leaderHwChange()
+ );
+ }
+ }
+
+ public Errors error() {
+ return exception
+ .map(Errors::forException)
+ .orElse(Errors.NONE);
+ }
+
+ public String errorMessage() {
+ return exception
+ .filter(e -> hasCustomErrorMessage)
+ .map(Throwable::getMessage)
+ .orElse(null);
+ }
+}