divijvaidya commented on code in PR #11390:
URL: https://github.com/apache/kafka/pull/11390#discussion_r1042140948


##########
core/src/main/scala/kafka/server/ReplicaFetcherThread.scala:
##########
@@ -192,4 +204,140 @@ class ReplicaFetcherThread(name: String,
     partition.truncateFullyAndStartAt(offset, isFuture = false)
   }
 
+  def buildProducerSnapshotFile(snapshotFile: File, remoteLogSegmentMetadata: 
RemoteLogSegmentMetadata, rlm: RemoteLogManager): Unit = {
+    val tmpSnapshotFile = new File(snapshotFile.getAbsolutePath + ".tmp")
+    // Copy it to snapshot file in atomic manner.
+    Files.copy(rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, 
RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT),
+      tmpSnapshotFile.toPath, StandardCopyOption.REPLACE_EXISTING)
+    Utils.atomicMoveWithFallback(tmpSnapshotFile.toPath, snapshotFile.toPath, 
false)
+  }
+
+  /**
+   * It tries to build the required state for this partition from leader and 
remote storage so that it can start
+   * fetching records from the leader.
+   */
+  override protected def buildRemoteLogAuxState(partition: TopicPartition,
+                                                currentLeaderEpoch: Int,
+                                                leaderLocalLogStartOffset: 
Long,
+                                                
epochForLeaderLocalLogStartOffset: Int,
+                                                leaderLogStartOffset: Long): 
Long = {
+
+    def fetchEarlierEpochEndOffset(epoch: Int): EpochEndOffset = {
+      val previousEpoch = epoch - 1
+      // Find the end-offset for the epoch earlier to the given epoch from the 
leader
+      val partitionsWithEpochs = Map(partition -> new 
EpochData().setPartition(partition.partition())
+        .setCurrentLeaderEpoch(currentLeaderEpoch)
+        .setLeaderEpoch(previousEpoch))
+      val maybeEpochEndOffset = 
leader.fetchEpochEndOffsets(partitionsWithEpochs).get(partition)
+      if (maybeEpochEndOffset.isEmpty) {
+        throw new KafkaException("No response received for partition: " + 
partition);
+      }
+
+      val epochEndOffset = maybeEpochEndOffset.get
+      if (epochEndOffset.errorCode() != Errors.NONE.code()) {
+        throw Errors.forCode(epochEndOffset.errorCode()).exception()
+      }
+
+      epochEndOffset
+    }
+
+    val log = replicaMgr.localLogOrException(partition)
+    val nextOffset = {
+      if (log.remoteStorageSystemEnable && 
log.config.remoteLogConfig.remoteStorageEnable) {

Review Comment:
   Consider the following scenario:
   1. Leader is archiving to tiered storage and has a follower.
   2. Follower has caught up to offset X (exclusive).
   3. While follower is offline, leader moves X to tiered storage and expires 
data locally till Y, such that, leaderLocalLogStartOffset > X and Y = 
leaderLocalLogStartOffset. Meanwhile, X has been expired from tiered storage as 
well. Hence, X < globalLogStartOffset as well. Now, there could be a scenario 
where globalLogStartOffset > leaderLocalLogStartOffset because segments has 
been expired from remote but not from local.
   3. Follower comes online and tries to fetch X from leader, leader throws 
moved to tiered storage exception.
   4. Follower moves to buildAux state and tries to fetch the metadata. The 
metadata may not exist since the segment has been deleted in remote storage and 
we will get an error.
   
   This could be addressed at replica manager where it could detect if the 
remote segments have been deleted and accordingly throw an out of bound instead 
of move to tiered storage exception, but we should also add a defensive 
handling check here. In the above scenario, we should directly move to 
truncation instead of build aux state.
   
   The defensive check could be `&& leaderLocalLogStartOffset > 
leaderLogStartOffset` over here.
   
   Also, please add a test for this scenario.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to