satishd commented on code in PR #11390:
URL: https://github.com/apache/kafka/pull/11390#discussion_r1049205016


##########
core/src/main/scala/kafka/server/ReplicaFetcherThread.scala:
##########
@@ -192,4 +204,140 @@ class ReplicaFetcherThread(name: String,
     partition.truncateFullyAndStartAt(offset, isFuture = false)
   }
 
+  def buildProducerSnapshotFile(snapshotFile: File, remoteLogSegmentMetadata: 
RemoteLogSegmentMetadata, rlm: RemoteLogManager): Unit = {
+    val tmpSnapshotFile = new File(snapshotFile.getAbsolutePath + ".tmp")
+    // Copy it to snapshot file in atomic manner.
+    Files.copy(rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, 
RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT),
+      tmpSnapshotFile.toPath, StandardCopyOption.REPLACE_EXISTING)
+    Utils.atomicMoveWithFallback(tmpSnapshotFile.toPath, snapshotFile.toPath, 
false)
+  }
+
+  /**
+   * It tries to build the required state for this partition from leader and 
remote storage so that it can start
+   * fetching records from the leader.
+   */
+  override protected def buildRemoteLogAuxState(partition: TopicPartition,
+                                                currentLeaderEpoch: Int,
+                                                leaderLocalLogStartOffset: 
Long,
+                                                
epochForLeaderLocalLogStartOffset: Int,
+                                                leaderLogStartOffset: Long): 
Long = {
+
+    def fetchEarlierEpochEndOffset(epoch: Int): EpochEndOffset = {
+      val previousEpoch = epoch - 1
+      // Find the end-offset for the epoch earlier to the given epoch from the 
leader
+      val partitionsWithEpochs = Map(partition -> new 
EpochData().setPartition(partition.partition())
+        .setCurrentLeaderEpoch(currentLeaderEpoch)
+        .setLeaderEpoch(previousEpoch))
+      val maybeEpochEndOffset = 
leader.fetchEpochEndOffsets(partitionsWithEpochs).get(partition)
+      if (maybeEpochEndOffset.isEmpty) {
+        throw new KafkaException("No response received for partition: " + 
partition);
+      }
+
+      val epochEndOffset = maybeEpochEndOffset.get
+      if (epochEndOffset.errorCode() != Errors.NONE.code()) {
+        throw Errors.forCode(epochEndOffset.errorCode()).exception()
+      }
+
+      epochEndOffset
+    }
+
+    val log = replicaMgr.localLogOrException(partition)
+    val nextOffset = {
+      if (log.remoteStorageSystemEnable && 
log.config.remoteLogConfig.remoteStorageEnable) {

Review Comment:
   As discussed offline, the current follower fetch retries when it receives an 
error while building aurxiliary state. It will eventually gets the auxiliary 
data from remote storage for the available leader-log-start-offset. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to