This is an automated email from the ASF dual-hosted git repository.
satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new be531c681c8 KAFKA-15695: Update the local log start offset of a log
after rebuilding the auxiliary state (#14649)
be531c681c8 is described below
commit be531c681c8b12258abb7199567ce49f84b0d9b4
Author: Nikhil Ramakrishnan <[email protected]>
AuthorDate: Tue Dec 12 16:13:42 2023 +0000
KAFKA-15695: Update the local log start offset of a log after rebuilding
the auxiliary state (#14649)
Reviewers: Satish Duggana <[email protected]>, Luke Chen
<[email protected]>, Divij Vaidya <[email protected]>, Kamal
Chandraprakash<[email protected]>, Alexandre Dupriez
<[email protected]>
---
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java | 5 ++++-
core/src/main/scala/kafka/log/UnifiedLog.scala | 3 ++-
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala | 6 ++++++
3 files changed, 12 insertions(+), 2 deletions(-)
diff --git
a/core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java
b/core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java
index d203bfe7c5f..63b608015df 100644
--- a/core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java
+++ b/core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java
@@ -229,8 +229,11 @@ public class ReplicaFetcherTierStateMachine implements
TierStateMachine {
Partition partition =
replicaMgr.getPartitionOrException(topicPartition);
partition.truncateFullyAndStartAt(nextOffset, false,
Option.apply(leaderLogStartOffset));
- // Build leader epoch cache.
+ // Increment start offsets
unifiedLog.maybeIncrementLogStartOffset(leaderLogStartOffset,
LeaderOffsetIncremented);
+ unifiedLog.maybeIncrementLocalLogStartOffset(nextOffset,
LeaderOffsetIncremented);
+
+ // Build leader epoch cache.
List<EpochEntry> epochs = readLeaderEpochCheckpoint(rlm,
remoteLogSegmentMetadata);
if (unifiedLog.leaderEpochCache().isDefined()) {
unifiedLog.leaderEpochCache().get().assign(epochs);
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala
b/core/src/main/scala/kafka/log/UnifiedLog.scala
index 28d92529ff6..4f79cf10d82 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -972,7 +972,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}
}
- private def maybeIncrementLocalLogStartOffset(newLocalLogStartOffset: Long,
reason: LogStartOffsetIncrementReason): Unit = {
+ def maybeIncrementLocalLogStartOffset(newLocalLogStartOffset: Long, reason:
LogStartOffsetIncrementReason): Unit = {
lock synchronized {
if (newLocalLogStartOffset > localLogStartOffset()) {
_localLogStartOffset = newLocalLogStartOffset
@@ -1815,6 +1815,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
leaderEpochCache.foreach(_.clearAndFlush())
producerStateManager.truncateFullyAndStartAt(newOffset)
logStartOffset = logStartOffsetOpt.getOrElse(newOffset)
+ if (remoteLogEnabled()) _localLogStartOffset = newOffset
rebuildProducerState(newOffset, producerStateManager)
updateHighWatermark(localLog.logEndOffsetMetadata)
}
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index abda9df127b..0b9533380b1 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -3646,6 +3646,12 @@ class UnifiedLogTest {
log.maybeIncrementLogStartOffset(newLogStartOffset,
LogStartOffsetIncrementReason.SegmentDeletion)
assertEquals(newLogStartOffset, log.logStartOffset)
assertEquals(log.logStartOffset, log.localLogStartOffset())
+
+ // Truncate the local log and verify that the offsets are updated to
expected values
+ val newLocalLogStartOffset = 60L;
+ log.truncateFullyAndStartAt(newLocalLogStartOffset,
Option.apply(newLogStartOffset))
+ assertEquals(newLogStartOffset, log.logStartOffset)
+ assertEquals(newLocalLogStartOffset, log.localLogStartOffset())
}
private class MockLogOffsetsListener extends LogOffsetsListener {