This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new be531c681c8 KAFKA-15695: Update the local log start offset of a log 
after rebuilding the auxiliary state (#14649)
be531c681c8 is described below

commit be531c681c8b12258abb7199567ce49f84b0d9b4
Author: Nikhil Ramakrishnan <[email protected]>
AuthorDate: Tue Dec 12 16:13:42 2023 +0000

    KAFKA-15695: Update the local log start offset of a log after rebuilding 
the auxiliary state (#14649)
    
    Reviewers: Satish Duggana <[email protected]>, Luke Chen 
<[email protected]>, Divij Vaidya <[email protected]>, Kamal 
Chandraprakash <[email protected]>, Alexandre Dupriez 
<[email protected]>
---
 core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java | 5 ++++-
 core/src/main/scala/kafka/log/UnifiedLog.scala                      | 3 ++-
 core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala             | 6 ++++++
 3 files changed, 12 insertions(+), 2 deletions(-)

diff --git 
a/core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java 
b/core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java
index d203bfe7c5f..63b608015df 100644
--- a/core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java
+++ b/core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java
@@ -229,8 +229,11 @@ public class ReplicaFetcherTierStateMachine implements 
TierStateMachine {
                 Partition partition = 
replicaMgr.getPartitionOrException(topicPartition);
                 partition.truncateFullyAndStartAt(nextOffset, false, 
Option.apply(leaderLogStartOffset));
 
-                // Build leader epoch cache.
+                // Increment start offsets
                 unifiedLog.maybeIncrementLogStartOffset(leaderLogStartOffset, 
LeaderOffsetIncremented);
+                unifiedLog.maybeIncrementLocalLogStartOffset(nextOffset, 
LeaderOffsetIncremented);
+
+                // Build leader epoch cache.
                 List<EpochEntry> epochs = readLeaderEpochCheckpoint(rlm, 
remoteLogSegmentMetadata);
                 if (unifiedLog.leaderEpochCache().isDefined()) {
                     unifiedLog.leaderEpochCache().get().assign(epochs);
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala 
b/core/src/main/scala/kafka/log/UnifiedLog.scala
index 28d92529ff6..4f79cf10d82 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -972,7 +972,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     }
   }
 
-  private def maybeIncrementLocalLogStartOffset(newLocalLogStartOffset: Long, 
reason: LogStartOffsetIncrementReason): Unit = {
+  def maybeIncrementLocalLogStartOffset(newLocalLogStartOffset: Long, reason: 
LogStartOffsetIncrementReason): Unit = {
     lock synchronized {
       if (newLocalLogStartOffset > localLogStartOffset()) {
         _localLogStartOffset = newLocalLogStartOffset
@@ -1815,6 +1815,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
         leaderEpochCache.foreach(_.clearAndFlush())
         producerStateManager.truncateFullyAndStartAt(newOffset)
         logStartOffset = logStartOffsetOpt.getOrElse(newOffset)
+        if (remoteLogEnabled()) _localLogStartOffset = newOffset
         rebuildProducerState(newOffset, producerStateManager)
         updateHighWatermark(localLog.logEndOffsetMetadata)
       }
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala 
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index abda9df127b..0b9533380b1 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -3646,6 +3646,12 @@ class UnifiedLogTest {
     log.maybeIncrementLogStartOffset(newLogStartOffset, 
LogStartOffsetIncrementReason.SegmentDeletion)
     assertEquals(newLogStartOffset, log.logStartOffset)
     assertEquals(log.logStartOffset, log.localLogStartOffset())
+
+    // Truncate the local log and verify that the offsets are updated to 
expected values
+    val newLocalLogStartOffset = 60L;
+    log.truncateFullyAndStartAt(newLocalLogStartOffset, 
Option.apply(newLogStartOffset))
+    assertEquals(newLogStartOffset, log.logStartOffset)
+    assertEquals(newLocalLogStartOffset, log.localLogStartOffset())
   }
 
   private class MockLogOffsetsListener extends LogOffsetsListener {

Reply via email to