This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2c925e9f334 KAFKA-15526: Simplify the LogAppendInfo class (#14470)
2c925e9f334 is described below
commit 2c925e9f334b0aa6c52df343c2394f00f86ac2ff
Author: David Mao <[email protected]>
AuthorDate: Tue Oct 3 17:32:44 2023 -0700
KAFKA-15526: Simplify the LogAppendInfo class (#14470)
The LogAppendInfo class is a bit bloated in terms of class fields. That's
because it is used as an umbrella class for both leader log appends and
follower log appends and needs to carry fields for both. This makes the
constructor for the class a bit kludgy to use. It also ends up being a bit
confusing as to when fields are important and when they aren't. I noticed there were
a few fields that didn't seem necessary.
Below is a description of changes:
firstOffset is a LogOffsetMetadata but there are no readers of the field
that use anything but the messageOffset field - simplified to a long.
LogAppendInfo.errorMessage is only set in one context - when calling
LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo. When we use this
constructor, we pass up the original exception in LogAppendResult anyway, so
the field is redundant with the LogAppendResult.exception field. This allows us
to simplify the handling introduced in KAFKA-15459 (Convert coordinator retriable
errors to a known producer response error, #14378): since there are no custom
error messages, we just return whatever is in t [...]
We only use targetCompressionType when constructing the LogValidator - just
inline the call instead of including it in the LogAppendInfo.
offsetsMonotonic is only used when not assigning offsets to throw an
exception - just throw the exception instead of setting a field to throw later.
shallowCount is only there to determine whether there are any messages in
the append. Instead, we can just check validBytes which is incremented with a
non-zero value every time we increment shallowCount.
Reviewers: Justine Olshan <[email protected]>
---
core/src/main/scala/kafka/log/UnifiedLog.scala | 55 ++++++-------
.../main/scala/kafka/raft/KafkaMetadataLog.scala | 6 +-
.../main/scala/kafka/server/ReplicaManager.scala | 92 +++++++++++-----------
.../src/test/scala/other/kafka/StressTestLog.scala | 2 +-
.../log/AbstractLogCleanerIntegrationTest.scala | 2 +-
.../LogCleanerParameterizedIntegrationTest.scala | 6 +-
.../test/scala/unit/kafka/log/LogCleanerTest.scala | 10 +--
.../test/scala/unit/kafka/log/LogManagerTest.scala | 2 +-
.../test/scala/unit/kafka/log/UnifiedLogTest.scala | 34 ++++----
.../unit/kafka/server/MockFetcherThread.scala | 9 +--
.../kafka/server/ReplicaFetcherThreadTest.scala | 9 +--
.../kafka/storage/internals/log/LogAppendInfo.java | 90 +++++----------------
12 files changed, 127 insertions(+), 190 deletions(-)
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala
b/core/src/main/scala/kafka/log/UnifiedLog.scala
index 6c680135ae1..601d444d0e6 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -768,10 +768,10 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// This will ensure that any log data can be recovered with the correct
topic ID in the case of failure.
maybeFlushMetadataFile()
- val appendInfo = analyzeAndValidateRecords(records, origin,
ignoreRecordSize, leaderEpoch)
+ val appendInfo = analyzeAndValidateRecords(records, origin,
ignoreRecordSize, !validateAndAssignOffsets, leaderEpoch)
// return if we have no valid messages or if this is a duplicate of the
last appended entry
- if (appendInfo.shallowCount == 0) appendInfo
+ if (appendInfo.validBytes <= 0) appendInfo
else {
// trim any invalid bytes or partial messages before appending it to the
on-disk log
@@ -784,13 +784,14 @@ class UnifiedLog(@volatile var logStartOffset: Long,
if (validateAndAssignOffsets) {
// assign offsets to the message set
val offset = PrimitiveRef.ofLong(localLog.logEndOffset)
- appendInfo.setFirstOffset(Optional.of(new
LogOffsetMetadata(offset.value)))
+ appendInfo.setFirstOffset(offset.value)
val validateAndOffsetAssignResult = try {
+ val targetCompression =
BrokerCompressionType.forName(config.compressionType).targetCompressionType(appendInfo.sourceCompression)
val validator = new LogValidator(validRecords,
topicPartition,
time,
appendInfo.sourceCompression,
- appendInfo.targetCompression,
+ targetCompression,
config.compact,
config.recordVersion.value,
config.messageTimestampType,
@@ -835,18 +836,14 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}
} else {
// we are taking the offsets we are given
- if (!appendInfo.offsetsMonotonic)
- throw new OffsetsOutOfOrderException(s"Out of order offsets
found in append to $topicPartition: " +
- records.records.asScala.map(_.offset))
-
if (appendInfo.firstOrLastOffsetOfFirstBatch <
localLog.logEndOffset) {
// we may still be able to recover if the log is empty
// one example: fetching from log start offset on the leader
which is not batch aligned,
// which may happen as a result of AdminClient#deleteRecords()
- val firstOffset = appendInfo.firstOffset.map[Long](x =>
x.messageOffset)
- .orElse(records.batches.iterator().next().baseOffset())
+ val hasFirstOffset = appendInfo.firstOffset !=
UnifiedLog.UnknownOffset
+ val firstOffset = if (hasFirstOffset) appendInfo.firstOffset
else records.batches.iterator().next().baseOffset()
- val firstOrLast = if (appendInfo.firstOffset.isPresent) "First
offset" else "Last offset of the first batch"
+ val firstOrLast = if (hasFirstOffset) "First offset" else "Last
offset of the first batch"
throw new UnexpectedAppendOffsetException(
s"Unexpected offset in append to $topicPartition. $firstOrLast
" +
s"${appendInfo.firstOrLastOffsetOfFirstBatch} is less than
the next offset ${localLog.logEndOffset}. " +
@@ -892,16 +889,11 @@ class UnifiedLog(@volatile var logStartOffset: Long,
maybeDuplicate match {
case Some(duplicate) =>
- appendInfo.setFirstOffset(Optional.of(new
LogOffsetMetadata(duplicate.firstOffset)))
+ appendInfo.setFirstOffset(duplicate.firstOffset)
appendInfo.setLastOffset(duplicate.lastOffset)
appendInfo.setLogAppendTime(duplicate.timestamp)
appendInfo.setLogStartOffset(logStartOffset)
case None =>
- // Before appending update the first offset metadata to include
segment information
- appendInfo.setFirstOffset(appendInfo.firstOffset.map {
offsetMetadata =>
- new LogOffsetMetadata(offsetMetadata.messageOffset,
segment.baseOffset, segment.size)
- })
-
// Append the records, and increment the local log end offset
immediately after the append because a
// write to the transaction index below may fail and we want to
ensure that the offsets
// of future appends still grow monotonically. The resulting
transaction index inconsistency
@@ -1097,7 +1089,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
* <ol>
* <li> each message matches its CRC
* <li> each message size is valid (if ignoreRecordSize is false)
- * <li> that the sequence numbers of the incoming record batches are
consistent with the existing state and with each other.
+ * <li> that the sequence numbers of the incoming record batches are
consistent with the existing state and with each other
+ * <li> that the offsets are monotonically increasing (if
requireOffsetsMonotonic is true)
* </ol>
*
* Also compute the following quantities:
@@ -1113,10 +1106,10 @@ class UnifiedLog(@volatile var logStartOffset: Long,
private def analyzeAndValidateRecords(records: MemoryRecords,
origin: AppendOrigin,
ignoreRecordSize: Boolean,
+ requireOffsetsMonotonic: Boolean,
leaderEpoch: Int): LogAppendInfo = {
- var shallowMessageCount = 0
var validBytesCount = 0
- var firstOffset: Optional[LogOffsetMetadata] = Optional.empty()
+ var firstOffset = UnifiedLog.UnknownOffset
var lastOffset = -1L
var lastLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH
var sourceCompression = CompressionType.NONE
@@ -1143,7 +1136,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// Also indicate whether we have the accurate first offset or not
if (!readFirstMessage) {
if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
- firstOffset = Optional.of(new LogOffsetMetadata(batch.baseOffset))
+ firstOffset = batch.baseOffset
lastOffsetOfFirstBatch = batch.lastOffset
readFirstMessage = true
}
@@ -1176,7 +1169,6 @@ class UnifiedLog(@volatile var logStartOffset: Long,
offsetOfMaxTimestamp = lastOffset
}
- shallowMessageCount += 1
validBytesCount += batchSize
val batchCompression = CompressionType.forId(batch.compressionType.id)
@@ -1184,17 +1176,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
sourceCompression = batchCompression
}
- // Apply broker-side compression if any
- val targetCompression =
BrokerCompressionType.forName(config.compressionType).targetCompressionType(sourceCompression)
+ if (requireOffsetsMonotonic && !monotonic)
+ throw new OffsetsOutOfOrderException(s"Out of order offsets found in
append to $topicPartition: " +
+ records.records.asScala.map(_.offset))
+
val lastLeaderEpochOpt: OptionalInt = if (lastLeaderEpoch !=
RecordBatch.NO_PARTITION_LEADER_EPOCH)
OptionalInt.of(lastLeaderEpoch)
else
OptionalInt.empty()
new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpochOpt,
maxTimestamp, offsetOfMaxTimestamp,
- RecordBatch.NO_TIMESTAMP, logStartOffset, RecordConversionStats.EMPTY,
sourceCompression, targetCompression,
- shallowMessageCount, validBytesCount, monotonic, lastOffsetOfFirstBatch,
Collections.emptyList[RecordError], null,
- LeaderHwChange.NONE)
+ RecordBatch.NO_TIMESTAMP, logStartOffset, RecordConversionStats.EMPTY,
sourceCompression,
+ validBytesCount, lastOffsetOfFirstBatch,
Collections.emptyList[RecordError], LeaderHwChange.NONE)
}
/**
@@ -1588,10 +1581,10 @@ class UnifiedLog(@volatile var logStartOffset: Long,
Note that this is only required for pre-V2 message formats because
these do not store the first message offset
in the header.
*/
- val rollOffset = appendInfo
- .firstOffset
- .map[Long](_.messageOffset)
- .orElse(maxOffsetInMessages - Integer.MAX_VALUE)
+ val rollOffset = if (appendInfo.firstOffset == UnifiedLog.UnknownOffset)
+ maxOffsetInMessages - Integer.MAX_VALUE
+ else
+ appendInfo.firstOffset
roll(Some(rollOffset))
} else {
diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
index 9703fbc444c..e41b6500665 100644
--- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
+++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -96,10 +96,10 @@ final class KafkaMetadataLog private (
}
private def handleAndConvertLogAppendInfo(appendInfo:
internals.log.LogAppendInfo): LogAppendInfo = {
- if (appendInfo.firstOffset.isPresent())
- new LogAppendInfo(appendInfo.firstOffset.get().messageOffset,
appendInfo.lastOffset)
+ if (appendInfo.firstOffset != UnifiedLog.UnknownOffset)
+ new LogAppendInfo(appendInfo.firstOffset, appendInfo.lastOffset)
else
- throw new KafkaException(s"Append failed unexpectedly:
${appendInfo.errorMessage}")
+ throw new KafkaException(s"Append failed unexpectedly")
}
override def lastFetchedEpoch: Int = {
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 07e7418776a..eb140eb31fd 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -74,11 +74,20 @@ import scala.jdk.CollectionConverters._
/*
* Result metadata of a log append operation on the log
*/
-case class LogAppendResult(info: LogAppendInfo, exception: Option[Throwable] =
None) {
+case class LogAppendResult(info: LogAppendInfo,
+ exception: Option[Throwable],
+ hasCustomErrorMessage: Boolean) {
def error: Errors = exception match {
case None => Errors.NONE
case Some(e) => Errors.forException(e)
}
+
+ def errorMessage: String = {
+ exception match {
+ case Some(e) if hasCustomErrorMessage => e.getMessage
+ case _ => null
+ }
+ }
}
case class LogDeleteRecordsResult(requestedOffset: Long, lowWatermark: Long,
exception: Option[Throwable] = None) {
@@ -739,67 +748,54 @@ class ReplicaManager(val config: KafkaConfig,
}
def appendEntries(allEntries: Map[TopicPartition,
MemoryRecords])(unverifiedEntries: Map[TopicPartition, Errors]): Unit = {
- val verifiedEntries =
+ val verifiedEntries =
if (unverifiedEntries.isEmpty)
- allEntries
+ allEntries
else
allEntries.filter { case (tp, _) =>
!unverifiedEntries.contains(tp)
}
-
+
val localProduceResults = appendToLocalLog(internalTopicsAllowed =
internalTopicsAllowed,
origin, verifiedEntries, requiredAcks, requestLocal,
verificationGuards.toMap)
debug("Produce to local log in %d ms".format(time.milliseconds -
sTime))
- def produceStatusResult(appendResult: Map[TopicPartition,
LogAppendResult],
- useCustomMessage: Boolean):
Map[TopicPartition, ProducePartitionStatus] = {
- appendResult.map { case (topicPartition, result) =>
- topicPartition -> ProducePartitionStatus(
- result.info.lastOffset + 1, // required offset
- new PartitionResponse(
- result.error,
- result.info.firstOffset.map[Long](_.messageOffset).orElse(-1L),
- result.info.lastOffset,
- result.info.logAppendTime,
- result.info.logStartOffset,
- result.info.recordErrors,
- if (useCustomMessage) result.exception.get.getMessage else
result.info.errorMessage
- )
- ) // response status
- }
- }
-
- val unverifiedResults = unverifiedEntries.map {
+ val errorResults = (unverifiedEntries ++ errorsPerPartition).map {
case (topicPartition, error) =>
- val finalException =
+ // translate transaction coordinator errors to known producer
response errors
+ val customException =
error match {
- case Errors.INVALID_TXN_STATE => error.exception("Partition
was not added to the transaction")
+ case Errors.INVALID_TXN_STATE =>
Some(error.exception("Partition was not added to the transaction"))
case Errors.CONCURRENT_TRANSACTIONS |
Errors.COORDINATOR_LOAD_IN_PROGRESS |
Errors.COORDINATOR_NOT_AVAILABLE |
- Errors.NOT_COORDINATOR => new NotEnoughReplicasException(
- s"Unable to verify the partition has been added to
the transaction. Underlying error: ${error.toString}")
- case _ => error.exception()
+ Errors.NOT_COORDINATOR => Some(new
NotEnoughReplicasException(
+ s"Unable to verify the partition has been added to
the transaction. Underlying error: ${error.toString}"))
+ case _ => None
}
topicPartition -> LogAppendResult(
LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
- Some(finalException)
+ Some(customException.getOrElse(error.exception)),
+ hasCustomErrorMessage = customException.isDefined
)
}
- val errorResults = errorsPerPartition.map {
- case (topicPartition, error) =>
- topicPartition -> LogAppendResult(
- LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
- Some(error.exception())
+ val allResults = localProduceResults ++ errorResults
+ val produceStatus = allResults.map { case (topicPartition, result) =>
+ topicPartition -> ProducePartitionStatus(
+ result.info.lastOffset + 1, // required offset
+ new PartitionResponse(
+ result.error,
+ result.info.firstOffset,
+ result.info.lastOffset,
+ result.info.logAppendTime,
+ result.info.logStartOffset,
+ result.info.recordErrors,
+ result.errorMessage
)
+ ) // response status
}
- val produceStatus = Set((localProduceResults, false),
(unverifiedResults, true), (errorResults, false)).flatMap {
- case (results, useCustomError) => produceStatusResult(results,
useCustomError)
- }.toMap
- val allResults = localProduceResults ++ unverifiedResults ++
errorResults
-
actionQueue.add {
() => allResults.foreach { case (topicPartition, result) =>
val requestKey = TopicPartitionOperationKey(topicPartition)
@@ -859,7 +855,7 @@ class ReplicaManager(val config: KafkaConfig,
val responseStatus = entriesPerPartition.map { case (topicPartition, _)
=>
topicPartition -> new PartitionResponse(
Errors.INVALID_REQUIRED_ACKS,
-
LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.firstOffset.map[Long](_.messageOffset).orElse(-1L),
+ LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.firstOffset,
RecordBatch.NO_TIMESTAMP,
LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.logStartOffset
)
@@ -1182,7 +1178,8 @@ class ReplicaManager(val config: KafkaConfig,
if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {
(topicPartition, LogAppendResult(
LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
- Some(new InvalidTopicException(s"Cannot append to internal topic
${topicPartition.topic}"))))
+ Some(new InvalidTopicException(s"Cannot append to internal topic
${topicPartition.topic}")),
+ hasCustomErrorMessage = false))
} else {
try {
val partition = getPartitionOrException(topicPartition)
@@ -1197,9 +1194,9 @@ class ReplicaManager(val config: KafkaConfig,
if (traceEnabled)
trace(s"${records.sizeInBytes} written to log $topicPartition
beginning at offset " +
- s"${info.firstOffset.orElse(new LogOffsetMetadata(-1))} and
ending at offset ${info.lastOffset}")
+ s"${info.firstOffset} and ending at offset ${info.lastOffset}")
- (topicPartition, LogAppendResult(info))
+ (topicPartition, LogAppendResult(info, exception = None,
hasCustomErrorMessage = false))
} catch {
// NOTE: Failed produce requests metric is not incremented for known
exceptions
// it is supposed to indicate un-expected failures of a broker in
handling a produce request
@@ -1209,15 +1206,16 @@ class ReplicaManager(val config: KafkaConfig,
_: RecordBatchTooLargeException |
_: CorruptRecordException |
_: KafkaStorageException) =>
- (topicPartition,
LogAppendResult(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, Some(e)))
+ (topicPartition,
LogAppendResult(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, Some(e),
hasCustomErrorMessage = false))
case rve: RecordValidationException =>
val logStartOffset = processFailedRecord(topicPartition,
rve.invalidException)
val recordErrors = rve.recordErrors
- (topicPartition,
LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo(
- logStartOffset, recordErrors, rve.invalidException.getMessage),
Some(rve.invalidException)))
+ (topicPartition,
LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo(logStartOffset,
recordErrors),
+ Some(rve.invalidException), hasCustomErrorMessage = true))
case t: Throwable =>
val logStartOffset = processFailedRecord(topicPartition, t)
- (topicPartition,
LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithLogStartOffset(logStartOffset),
Some(t)))
+ (topicPartition,
LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithLogStartOffset(logStartOffset),
+ Some(t), hasCustomErrorMessage = false))
}
}
}
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala
b/core/src/test/scala/other/kafka/StressTestLog.scala
index 9dda3fde499..c67c543651f 100755
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -124,7 +124,7 @@ object StressTestLog {
class WriterThread(val log: UnifiedLog) extends WorkerThread with
LogProgress {
override def work(): Unit = {
val logAppendInfo =
log.appendAsLeader(TestUtils.singletonRecords(currentOffset.toString.getBytes),
0)
- require((!logAppendInfo.firstOffset.isPresent ||
logAppendInfo.firstOffset.get().messageOffset == currentOffset)
+ require((logAppendInfo.firstOffset == -1 || logAppendInfo.firstOffset ==
currentOffset)
&& logAppendInfo.lastOffset == currentOffset)
currentOffset += 1
if (currentOffset % 1000 == 0)
diff --git
a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
index b52f4ec1e69..347cb5e6ecb 100644
--- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
@@ -150,7 +150,7 @@ abstract class AbstractLogCleanerIntegrationTest {
// move LSO forward to increase compaction bound
log.updateHighWatermark(log.logEndOffset)
incCounter()
- (key, value, appendInfo.firstOffset.get.messageOffset)
+ (key, value, appendInfo.firstOffset)
}
}
diff --git
a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
index 132a77ff97b..6946d8199a5 100755
---
a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
+++
b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
@@ -70,7 +70,7 @@ class LogCleanerParameterizedIntegrationTest extends
AbstractLogCleanerIntegrati
val appendInfo = log.appendAsLeader(largeMessageSet, leaderEpoch = 0)
// move LSO forward to increase compaction bound
log.updateHighWatermark(log.logEndOffset)
- val largeMessageOffset = appendInfo.firstOffset.get.messageOffset
+ val largeMessageOffset = appendInfo.firstOffset
val dups = writeDups(startKey = largeMessageKey + 1, numKeys = 100,
numDups = 3, log = log, codec = codec)
val appends2 = appends ++ Seq((largeMessageKey, largeMessageValue,
largeMessageOffset)) ++ dups
@@ -172,7 +172,7 @@ class LogCleanerParameterizedIntegrationTest extends
AbstractLogCleanerIntegrati
val appendInfo = log.appendAsLeader(largeMessageSet, leaderEpoch = 0)
// move LSO forward to increase compaction bound
log.updateHighWatermark(log.logEndOffset)
- val largeMessageOffset =
appendInfo.firstOffset.map[Long](_.messageOffset).get
+ val largeMessageOffset = appendInfo.firstOffset
// also add some messages with version 1 and version 2 to check that we
handle mixed format versions correctly
props.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG,
IBP_0_11_0_IV0.version)
@@ -320,7 +320,7 @@ class LogCleanerParameterizedIntegrationTest extends
AbstractLogCleanerIntegrati
val appendInfo = log.appendAsLeader(MemoryRecords.withRecords(magicValue,
codec, records: _*), leaderEpoch = 0)
// move LSO forward to increase compaction bound
log.updateHighWatermark(log.logEndOffset)
- val offsets = appendInfo.firstOffset.get.messageOffset to
appendInfo.lastOffset
+ val offsets = appendInfo.firstOffset to appendInfo.lastOffset
kvs.zip(offsets).map { case (kv, offset) => (kv._1, kv._2, offset) }
}
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 05aaa8ab062..7a221ad3310 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -295,17 +295,17 @@ class LogCleanerTest {
// check duplicate append from producer 1
var logAppendInfo = appendIdempotentAsLeader(log, pid1,
producerEpoch)(Seq(1, 2, 3))
- assertEquals(0L, logAppendInfo.firstOffset.get.messageOffset)
+ assertEquals(0L, logAppendInfo.firstOffset)
assertEquals(2L, logAppendInfo.lastOffset)
// check duplicate append from producer 3
logAppendInfo = appendIdempotentAsLeader(log, pid3, producerEpoch)(Seq(1,
4))
- assertEquals(6L, logAppendInfo.firstOffset.get.messageOffset)
+ assertEquals(6L, logAppendInfo.firstOffset)
assertEquals(7L, logAppendInfo.lastOffset)
// check duplicate append from producer 2
logAppendInfo = appendIdempotentAsLeader(log, pid2, producerEpoch)(Seq(3,
1, 4))
- assertEquals(3L, logAppendInfo.firstOffset.get.messageOffset)
+ assertEquals(3L, logAppendInfo.firstOffset)
assertEquals(5L, logAppendInfo.lastOffset)
// do one more append and a round of cleaning to force another deletion
from producer 1's batch
@@ -321,7 +321,7 @@ class LogCleanerTest {
// duplicate append from producer1 should still be fine
logAppendInfo = appendIdempotentAsLeader(log, pid1, producerEpoch)(Seq(1,
2, 3))
- assertEquals(0L, logAppendInfo.firstOffset.get.messageOffset)
+ assertEquals(0L, logAppendInfo.firstOffset)
assertEquals(2L, logAppendInfo.lastOffset)
}
@@ -2048,7 +2048,7 @@ class LogCleanerTest {
private def writeToLog(log: UnifiedLog, seq: Iterable[(Int, Int)]):
Iterable[Long] = {
for ((key, value) <- seq)
- yield log.appendAsLeader(record(key, value), leaderEpoch =
0).firstOffset.get.messageOffset
+ yield log.appendAsLeader(record(key, value), leaderEpoch = 0).firstOffset
}
private def key(id: Long) = ByteBuffer.wrap(id.toString.getBytes)
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index ee70a518910..b4ea0f95919 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -320,7 +320,7 @@ class LogManagerTest {
for (_ <- 0 until numMessages) {
val set = TestUtils.singletonRecords("test".getBytes())
val info = log.appendAsLeader(set, leaderEpoch = 0)
- offset = info.firstOffset.get.messageOffset
+ offset = info.firstOffset
}
log.updateHighWatermark(log.logEndOffset)
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index f467345f69f..16d8c1e7adb 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -160,17 +160,17 @@ class UnifiedLogTest {
val records = TestUtils.records(simpleRecords)
val firstAppendInfo = log.appendAsLeader(records, leaderEpoch = 0)
- assertEquals(new LogOffsetMetadata(0, 0, 0),
firstAppendInfo.firstOffset.get)
+ assertEquals(0, firstAppendInfo.firstOffset)
val secondAppendInfo = log.appendAsLeader(
TestUtils.records(simpleRecords),
leaderEpoch = 0
)
- assertEquals(new LogOffsetMetadata(simpleRecords.size, 0,
records.sizeInBytes), secondAppendInfo.firstOffset.get)
+ assertEquals(simpleRecords.size, secondAppendInfo.firstOffset)
log.roll()
val afterRollAppendInfo =
log.appendAsLeader(TestUtils.records(simpleRecords), leaderEpoch = 0)
- assertEquals(new LogOffsetMetadata(simpleRecords.size * 2,
simpleRecords.size * 2, 0), afterRollAppendInfo.firstOffset.get)
+ assertEquals(simpleRecords.size * 2, afterRollAppendInfo.firstOffset)
}
@Test
@@ -1260,7 +1260,7 @@ class UnifiedLogTest {
), producerId = pid, producerEpoch = epoch, sequence = seq)
val multiEntryAppendInfo = log.appendAsLeader(createRecords, leaderEpoch =
0)
assertEquals(
- multiEntryAppendInfo.lastOffset -
multiEntryAppendInfo.firstOffset.get.messageOffset + 1,
+ multiEntryAppendInfo.lastOffset - multiEntryAppendInfo.firstOffset + 1,
3,
"should have appended 3 entries"
)
@@ -1268,8 +1268,8 @@ class UnifiedLogTest {
// Append a Duplicate of the tail, when the entry at the tail has multiple
records.
val dupMultiEntryAppendInfo = log.appendAsLeader(createRecords,
leaderEpoch = 0)
assertEquals(
- multiEntryAppendInfo.firstOffset.get.messageOffset,
- dupMultiEntryAppendInfo.firstOffset.get.messageOffset,
+ multiEntryAppendInfo.firstOffset,
+ dupMultiEntryAppendInfo.firstOffset,
"Somehow appended a duplicate entry with multiple log records to the
tail"
)
assertEquals(multiEntryAppendInfo.lastOffset,
dupMultiEntryAppendInfo.lastOffset,
@@ -1305,8 +1305,8 @@ class UnifiedLogTest {
val origAppendInfo = log.appendAsLeader(createRecordsWithDuplicate,
leaderEpoch = 0)
val newAppendInfo = log.appendAsLeader(createRecordsWithDuplicate,
leaderEpoch = 0)
assertEquals(
- origAppendInfo.firstOffset.get.messageOffset,
- newAppendInfo.firstOffset.get.messageOffset,
+ origAppendInfo.firstOffset,
+ newAppendInfo.firstOffset,
"Inserted a duplicate records into the log"
)
assertEquals(origAppendInfo.lastOffset, newAppendInfo.lastOffset,
@@ -1786,7 +1786,7 @@ class UnifiedLogTest {
log.appendAsLeader(
TestUtils.singletonRecords(value = "hello".getBytes, timestamp =
mockTime.milliseconds),
leaderEpoch = 0
- ).firstOffset.get.messageOffset,
+ ).firstOffset,
"Should still be able to append and should get the logEndOffset
assigned to the new append")
// cleanup the log
@@ -2940,7 +2940,7 @@ class UnifiedLogTest {
new SimpleRecord("baz".getBytes))
val firstAppendInfo = log.appendAsLeader(records, leaderEpoch = 0)
- assertEquals(firstAppendInfo.firstOffset.map[Long](_.messageOffset),
log.firstUnstableOffset.asJava)
+ assertEquals(Some(firstAppendInfo.firstOffset), log.firstUnstableOffset)
// add more transactional records
seq += 3
@@ -2948,13 +2948,13 @@ class UnifiedLogTest {
new SimpleRecord("blah".getBytes)), leaderEpoch = 0)
// LSO should not have changed
- assertEquals(firstAppendInfo.firstOffset.map[Long](_.messageOffset),
log.firstUnstableOffset.asJava)
+ assertEquals(Some(firstAppendInfo.firstOffset), log.firstUnstableOffset)
// now transaction is committed
val commitAppendInfo = LogTestUtils.appendEndTxnMarkerAsLeader(log, pid,
epoch, ControlRecordType.COMMIT, mockTime.milliseconds())
// first unstable offset is not updated until the high watermark is
advanced
- assertEquals(firstAppendInfo.firstOffset.map[Long](_.messageOffset),
log.firstUnstableOffset.asJava)
+ assertEquals(Some(firstAppendInfo.firstOffset), log.firstUnstableOffset)
log.updateHighWatermark(commitAppendInfo.lastOffset + 1)
// now there should be no first unstable offset
@@ -3347,7 +3347,7 @@ class UnifiedLogTest {
new SimpleRecord("a".getBytes),
new SimpleRecord("b".getBytes),
new SimpleRecord("c".getBytes)), leaderEpoch = 0)
- assertEquals(firstAppendInfo.firstOffset.map[Long](_.messageOffset),
log.firstUnstableOffset.asJava)
+ assertEquals(Some(firstAppendInfo.firstOffset), log.firstUnstableOffset)
// mix in some non-transactional data
log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE,
@@ -3362,14 +3362,14 @@ class UnifiedLogTest {
new SimpleRecord("f".getBytes)), leaderEpoch = 0)
// LSO should not have changed
- assertEquals(firstAppendInfo.firstOffset.map[Long](_.messageOffset),
log.firstUnstableOffset.asJava)
+ assertEquals(Some(firstAppendInfo.firstOffset), log.firstUnstableOffset)
// now first producer's transaction is aborted
val abortAppendInfo = LogTestUtils.appendEndTxnMarkerAsLeader(log, pid1,
epoch, ControlRecordType.ABORT, mockTime.milliseconds())
log.updateHighWatermark(abortAppendInfo.lastOffset + 1)
// LSO should now point to one less than the first offset of the second
transaction
- assertEquals(secondAppendInfo.firstOffset.map[Long](_.messageOffset),
log.firstUnstableOffset.asJava)
+ assertEquals(Some(secondAppendInfo.firstOffset), log.firstUnstableOffset)
// commit the second transaction
val commitAppendInfo = LogTestUtils.appendEndTxnMarkerAsLeader(log, pid2,
epoch, ControlRecordType.COMMIT, mockTime.milliseconds())
@@ -3394,7 +3394,7 @@ class UnifiedLogTest {
val log = createLog(logDir, logConfig)
val firstAppendInfo = log.appendAsLeader(records, leaderEpoch = 0)
- assertEquals(firstAppendInfo.firstOffset.map[Long](_.messageOffset),
log.firstUnstableOffset.asJava)
+ assertEquals(Some(firstAppendInfo.firstOffset), log.firstUnstableOffset)
// this write should spill to the second segment
seq = 3
@@ -3402,7 +3402,7 @@ class UnifiedLogTest {
new SimpleRecord("d".getBytes),
new SimpleRecord("e".getBytes),
new SimpleRecord("f".getBytes)), leaderEpoch = 0)
- assertEquals(firstAppendInfo.firstOffset.map[Long](_.messageOffset),
log.firstUnstableOffset.asJava)
+ assertEquals(Some(firstAppendInfo.firstOffset), log.firstUnstableOffset)
assertEquals(3L, log.logEndOffsetMetadata.segmentBaseOffset)
// now abort the transaction
diff --git a/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala
b/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala
index 4b4da0d12f3..94908caef5d 100644
--- a/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala
+++ b/core/src/test/scala/unit/kafka/server/MockFetcherThread.scala
@@ -23,10 +23,10 @@ import org.apache.kafka.common.requests.FetchResponse
import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.common.OffsetAndEpoch
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.storage.internals.log.{LogAppendInfo,
LogOffsetMetadata}
+import org.apache.kafka.storage.internals.log.LogAppendInfo
import org.junit.jupiter.api.Assertions._
-import java.util.{Optional, OptionalInt}
+import java.util.OptionalInt
import scala.collection.{Map, Set, mutable}
import scala.jdk.CollectionConverters._
@@ -102,7 +102,7 @@ class MockFetcherThread(val mockLeader: MockLeaderEndPoint,
state.logStartOffset = partitionData.logStartOffset
state.highWatermark = partitionData.highWatermark
- Some(new LogAppendInfo(Optional.of(new LogOffsetMetadata(fetchOffset)),
+ Some(new LogAppendInfo(fetchOffset,
lastOffset,
lastEpoch,
maxTimestamp,
@@ -111,10 +111,7 @@ class MockFetcherThread(val mockLeader: MockLeaderEndPoint,
state.logStartOffset,
RecordConversionStats.EMPTY,
CompressionType.NONE,
- CompressionType.NONE,
- batches.size,
FetchResponse.recordsSize(partitionData),
- true,
batches.headOption.map(_.lastOffset).getOrElse(-1)))
}
diff --git
a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index 6a0feaa6456..7258999cd29 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -36,7 +36,7 @@ import org.apache.kafka.common.requests.{FetchRequest,
FetchResponse, UpdateMeta
import org.apache.kafka.common.utils.{LogContext, SystemTime}
import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch}
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
-import org.apache.kafka.storage.internals.log.{LogAppendInfo,
LogOffsetMetadata}
+import org.apache.kafka.storage.internals.log.{LogAppendInfo}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
import org.junit.jupiter.params.ParameterizedTest
@@ -765,7 +765,7 @@ class ReplicaFetcherThreadTest {
when(partition.localLogOrException).thenReturn(log)
when(partition.appendRecordsToFollowerOrFutureReplica(any(),
any())).thenReturn(Some(new LogAppendInfo(
- Optional.empty[LogOffsetMetadata],
+ -1,
0,
OptionalInt.empty,
RecordBatch.NO_TIMESTAMP,
@@ -774,10 +774,7 @@ class ReplicaFetcherThreadTest {
-1L,
RecordConversionStats.EMPTY,
CompressionType.NONE,
- CompressionType.NONE,
- -1,
- 0, // No records.
- false,
+ -1, // No records.
-1L
)))
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java
index 6f645456e7f..0ecaaabe223 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java
@@ -23,7 +23,6 @@ import
org.apache.kafka.common.requests.ProduceResponse.RecordError;
import java.util.Collections;
import java.util.List;
-import java.util.Optional;
import java.util.OptionalInt;
/**
@@ -31,12 +30,11 @@ import java.util.OptionalInt;
*/
public class LogAppendInfo {
- public static final LogAppendInfo UNKNOWN_LOG_APPEND_INFO = new
LogAppendInfo(Optional.empty(), -1, OptionalInt.empty(),
+ public static final LogAppendInfo UNKNOWN_LOG_APPEND_INFO = new
LogAppendInfo(-1, -1, OptionalInt.empty(),
RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L,
- RecordConversionStats.EMPTY, CompressionType.NONE,
CompressionType.NONE, -1, -1,
- false, -1L);
+ RecordConversionStats.EMPTY, CompressionType.NONE, -1, -1L);
- private Optional<LogOffsetMetadata> firstOffset;
+ private long firstOffset;
private long lastOffset;
private long maxTimestamp;
private long offsetOfMaxTimestamp;
@@ -46,21 +44,16 @@ public class LogAppendInfo {
private final OptionalInt lastLeaderEpoch;
private final CompressionType sourceCompression;
- private final CompressionType targetCompression;
- private final int shallowCount;
private final int validBytes;
- private final boolean offsetsMonotonic;
private final long lastOffsetOfFirstBatch;
private final List<RecordError> recordErrors;
- private final String errorMessage;
private final LeaderHwChange leaderHwChange;
/**
* Creates an instance with the given params.
*
* @param firstOffset The first offset in the message set
unless the message format is less than V2 and we are appending
- * to the follower. If the message is a
duplicate message the segment base offset and relative position
- * in segment will be unknown.
+ * to the follower.
* @param lastOffset The last offset in the message set
* @param lastLeaderEpoch The partition leader epoch corresponding
to the last offset, if available.
* @param maxTimestamp The maximum timestamp of the message set.
@@ -69,13 +62,10 @@ public class LogAppendInfo {
* @param logStartOffset The start offset of the log at the time
of this append.
* @param recordConversionStats Statistics collected during record
processing, `null` if `assignOffsets` is `false`
* @param sourceCompression The source codec used in the message set
(sent by the producer)
- * @param targetCompression The target codec of the message set(after
applying the broker compression configuration if any)
- * @param shallowCount The number of shallow messages
* @param validBytes The number of valid bytes
- * @param offsetsMonotonic Are the offsets in this message set
monotonically increasing
* @param lastOffsetOfFirstBatch The last offset of the first batch
*/
- public LogAppendInfo(Optional<LogOffsetMetadata> firstOffset,
+ public LogAppendInfo(long firstOffset,
long lastOffset,
OptionalInt lastLeaderEpoch,
long maxTimestamp,
@@ -84,22 +74,18 @@ public class LogAppendInfo {
long logStartOffset,
RecordConversionStats recordConversionStats,
CompressionType sourceCompression,
- CompressionType targetCompression,
- int shallowCount,
int validBytes,
- boolean offsetsMonotonic,
long lastOffsetOfFirstBatch) {
this(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp,
offsetOfMaxTimestamp, logAppendTime, logStartOffset,
- recordConversionStats, sourceCompression, targetCompression,
shallowCount, validBytes, offsetsMonotonic,
- lastOffsetOfFirstBatch, Collections.<RecordError>emptyList(),
null, LeaderHwChange.NONE);
+ recordConversionStats, sourceCompression, validBytes,
lastOffsetOfFirstBatch, Collections.<RecordError>emptyList(),
+ LeaderHwChange.NONE);
}
/**
* Creates an instance with the given params.
*
* @param firstOffset The first offset in the message set
unless the message format is less than V2 and we are appending
- * to the follower. If the message is a
duplicate message the segment base offset and relative position
- * in segment will be unknown.
+ * to the follower.
* @param lastOffset The last offset in the message set
* @param lastLeaderEpoch The partition leader epoch corresponding
to the last offset, if available.
* @param maxTimestamp The maximum timestamp of the message set.
@@ -108,17 +94,13 @@ public class LogAppendInfo {
* @param logStartOffset The start offset of the log at the time
of this append.
* @param recordConversionStats Statistics collected during record
processing, `null` if `assignOffsets` is `false`
* @param sourceCompression The source codec used in the message set
(sent by the producer)
- * @param targetCompression The target codec of the message set(after
applying the broker compression configuration if any)
- * @param shallowCount The number of shallow messages
* @param validBytes The number of valid bytes
- * @param offsetsMonotonic Are the offsets in this message set
monotonically increasing
* @param lastOffsetOfFirstBatch The last offset of the first batch
- * @param errorMessage error message
* @param recordErrors List of record errors that caused the
respective batch to be dropped
* @param leaderHwChange Incremental if the high watermark needs
to be increased after appending record
* Same if high watermark is not changed.
None is the default value and it means append failed
*/
- public LogAppendInfo(Optional<LogOffsetMetadata> firstOffset,
+ public LogAppendInfo(long firstOffset,
long lastOffset,
OptionalInt lastLeaderEpoch,
long maxTimestamp,
@@ -127,13 +109,9 @@ public class LogAppendInfo {
long logStartOffset,
RecordConversionStats recordConversionStats,
CompressionType sourceCompression,
- CompressionType targetCompression,
- int shallowCount,
int validBytes,
- boolean offsetsMonotonic,
long lastOffsetOfFirstBatch,
List<RecordError> recordErrors,
- String errorMessage,
LeaderHwChange leaderHwChange) {
this.firstOffset = firstOffset;
this.lastOffset = lastOffset;
@@ -144,21 +122,17 @@ public class LogAppendInfo {
this.logStartOffset = logStartOffset;
this.recordConversionStats = recordConversionStats;
this.sourceCompression = sourceCompression;
- this.targetCompression = targetCompression;
- this.shallowCount = shallowCount;
this.validBytes = validBytes;
- this.offsetsMonotonic = offsetsMonotonic;
this.lastOffsetOfFirstBatch = lastOffsetOfFirstBatch;
this.recordErrors = recordErrors;
- this.errorMessage = errorMessage;
this.leaderHwChange = leaderHwChange;
}
- public Optional<LogOffsetMetadata> firstOffset() {
+ public long firstOffset() {
return firstOffset;
}
- public void setFirstOffset(Optional<LogOffsetMetadata> firstOffset) {
+ public void setFirstOffset(long firstOffset) {
this.firstOffset = firstOffset;
}
@@ -218,30 +192,14 @@ public class LogAppendInfo {
return sourceCompression;
}
- public CompressionType targetCompression() {
- return targetCompression;
- }
-
- public int shallowCount() {
- return shallowCount;
- }
-
public int validBytes() {
return validBytes;
}
- public boolean offsetsMonotonic() {
- return offsetsMonotonic;
- }
-
public List<RecordError> recordErrors() {
return recordErrors;
}
- public String errorMessage() {
- return errorMessage;
- }
-
public LeaderHwChange leaderHwChange() {
return leaderHwChange;
}
@@ -253,7 +211,7 @@ public class LogAppendInfo {
* offset to avoid decompressing the data.
*/
public long firstOrLastOffsetOfFirstBatch() {
- return firstOffset.map(x ->
x.messageOffset).orElse(lastOffsetOfFirstBatch);
+ return firstOffset >= 0 ? firstOffset : lastOffsetOfFirstBatch;
}
/**
@@ -262,8 +220,8 @@ public class LogAppendInfo {
* @return Maximum possible number of messages described by LogAppendInfo
*/
public long numMessages() {
- if (firstOffset.isPresent() && firstOffset.get().messageOffset >= 0 &&
lastOffset >= 0) {
- return lastOffset - firstOffset.get().messageOffset + 1;
+ if (firstOffset >= 0 && lastOffset >= 0) {
+ return lastOffset - firstOffset + 1;
}
return 0;
}
@@ -276,24 +234,22 @@ public class LogAppendInfo {
*/
public LogAppendInfo copy(LeaderHwChange newLeaderHwChange) {
return new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpoch,
maxTimestamp, offsetOfMaxTimestamp, logAppendTime, logStartOffset,
recordConversionStats,
- sourceCompression, targetCompression, shallowCount,
validBytes, offsetsMonotonic, lastOffsetOfFirstBatch, recordErrors,
errorMessage, newLeaderHwChange);
+ sourceCompression, validBytes, lastOffsetOfFirstBatch,
recordErrors, newLeaderHwChange);
}
public static LogAppendInfo unknownLogAppendInfoWithLogStartOffset(long
logStartOffset) {
- return new LogAppendInfo(Optional.empty(), -1, OptionalInt.empty(),
RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
- RecordConversionStats.EMPTY, CompressionType.NONE,
CompressionType.NONE, -1, -1,
- false, -1L);
+ return new LogAppendInfo(-1, -1, OptionalInt.empty(),
RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
+ RecordConversionStats.EMPTY, CompressionType.NONE, -1, -1L);
}
/**
* In ProduceResponse V8+, we add two new fields record_errors and
error_message (see KIP-467).
* For any record failures with InvalidTimestamp or
InvalidRecordException, we construct a LogAppendInfo object like the one
- * in unknownLogAppendInfoWithLogStartOffset, but with additional fields
recordErrors and errorMessage
+ * in unknownLogAppendInfoWithLogStartOffset, but with additional fields
recordErrors
*/
- public static LogAppendInfo unknownLogAppendInfoWithAdditionalInfo(long
logStartOffset, List<RecordError> recordErrors, String errorMessage) {
- return new LogAppendInfo(Optional.empty(), -1, OptionalInt.empty(),
RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
- RecordConversionStats.EMPTY, CompressionType.NONE,
CompressionType.NONE, -1, -1,
- false, -1L, recordErrors, errorMessage, LeaderHwChange.NONE);
+ public static LogAppendInfo unknownLogAppendInfoWithAdditionalInfo(long
logStartOffset, List<RecordError> recordErrors) {
+ return new LogAppendInfo(-1, -1, OptionalInt.empty(),
RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
+ RecordConversionStats.EMPTY, CompressionType.NONE, -1, -1L,
recordErrors, LeaderHwChange.NONE);
}
@Override
@@ -308,13 +264,9 @@ public class LogAppendInfo {
", logStartOffset=" + logStartOffset +
", recordConversionStats=" + recordConversionStats +
", sourceCompression=" + sourceCompression +
- ", targetCompression=" + targetCompression +
- ", shallowCount=" + shallowCount +
", validBytes=" + validBytes +
- ", offsetsMonotonic=" + offsetsMonotonic +
", lastOffsetOfFirstBatch=" + lastOffsetOfFirstBatch +
", recordErrors=" + recordErrors +
- ", errorMessage='" + errorMessage + '\'' +
", leaderHwChange=" + leaderHwChange +
')';
}