This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.6 by this push:
     new 31227857ae0 KAFKA-14888: Added remote log segments retention mechanism 
based on time and size. (#13561)
31227857ae0 is described below

commit 31227857ae032c402ce491cc46c92cdff9b1792b
Author: Satish Duggana <sati...@apache.org>
AuthorDate: Fri Aug 25 05:27:59 2023 +0530

    KAFKA-14888: Added remote log segments retention mechanism based on time 
and size. (#13561)
    
    This change introduces a remote log segment retention cleanup mechanism.
    
    RemoteLogManager runs retention cleanup tasks on each leader replica. It
    evaluates the overall remote log size and the retention duration, and
    removes qualifying segments from remote storage, adjusting the
    log-start-offset in the UnifiedLog accordingly. It also cleans up segments
    whose epochs are earlier than the earliest leader epoch of the current
    leader.
    
    Co-authored-by: Satish Duggana <sati...@apache.org>
    Co-authored-by: Kamal Chandraprakash <kamal.chandraprak...@gmail.com>
    
    Reviewers: Jun Rao <jun...@gmail.com>, Divij Vaidya <di...@amazon.com>,
    Luke Chen <show...@gmail.com>, Kamal Chandraprakash
    <kamal.chandraprak...@gmail.com>, Christo Lolov <lol...@amazon.com>, Jorge
    Esteban Quilcate Otoya <quilcate.jo...@gmail.com>, Alexandre Dupriez
    <alexandre.dupr...@gmail.com>, Nikhil Ramakrishnan
    <ramakrishnan.nik...@gmail.com>
---
 checkstyle/suppressions.xml                        |   4 +-
 .../java/kafka/log/remote/RemoteLogManager.java    | 455 ++++++++++++++++++++-
 core/src/main/scala/kafka/log/LogManager.scala     |   3 +-
 core/src/main/scala/kafka/log/UnifiedLog.scala     | 153 +++++--
 .../src/main/scala/kafka/server/BrokerServer.scala |   8 +-
 core/src/main/scala/kafka/server/KafkaServer.scala |   8 +-
 .../kafka/log/remote/RemoteLogManagerTest.java     | 405 ++++++++++++++----
 .../test/scala/unit/kafka/log/UnifiedLogTest.scala |  18 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     |   1 +
 .../internals/epoch/LeaderEpochFileCache.java      |  15 +-
 .../kafka/storage/internals/log/LogConfig.java     |   6 +-
 11 files changed, 935 insertions(+), 141 deletions(-)
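
For illustration, the cleanup flow described in the commit message above can
be sketched roughly as follows: a segment is deleted on a retention-time
breach first, then on a retention-size breach, and the scan stops at the
first retained segment. This is a minimal, self-contained sketch; all types
and names are simplified stand-ins, not the actual Kafka classes.

    import java.util.Arrays;
    import java.util.List;

    public class RetentionCleanupSketch {
        // Hypothetical stand-in for RemoteLogSegmentMetadata.
        static final class Segment {
            final long endOffset;
            final long sizeInBytes;
            final long maxTimestampMs;
            Segment(long endOffset, long sizeInBytes, long maxTimestampMs) {
                this.endOffset = endOffset;
                this.sizeInBytes = sizeInBytes;
                this.maxTimestampMs = maxTimestampMs;
            }
        }

        // Returns the new log-start-offset after deleting breached segments, or -1
        // if nothing was deleted. Segments are visited in ascending offset order
        // and the scan stops at the first retained segment, mirroring the
        // epoch-ordered iteration in cleanupExpiredRemoteLogSegments.
        static long cleanup(List<Segment> segments, long retentionMs, long retentionSize,
                            long totalSizeBytes, long nowMs) {
            long remainingBreachedSize =
                    retentionSize > -1 ? Math.max(0, totalSizeBytes - retentionSize) : 0;
            long newLogStartOffset = -1;
            for (Segment s : segments) {
                boolean timeBreached = retentionMs > -1 && s.maxTimestampMs <= nowMs - retentionMs;
                boolean sizeBreached = !timeBreached && remainingBreachedSize >= s.sizeInBytes;
                if (!timeBreached && !sizeBreached)
                    break;
                remainingBreachedSize = Math.max(0, remainingBreachedSize - s.sizeInBytes);
                newLogStartOffset = s.endOffset + 1; // safe: offsets ascend within an epoch
            }
            return newLogStartOffset;
        }

        public static void main(String[] args) {
            List<Segment> segments = Arrays.asList(
                    new Segment(99, 100, 1_000L),
                    new Segment(199, 100, 2_000L));
            // With retention.ms = 500 at now = 2_000, only the first segment breaches
            // time-based retention, so the log start offset advances to 100.
            System.out.println(cleanup(segments, 500L, -1L, 200L, 2_000L)); // prints 100
        }
    }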

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 124b028eb41..2ded5338710 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -336,9 +336,9 @@
 
     <!-- storage -->
     <suppress checks="CyclomaticComplexity"
-              files="(LogValidator|RemoteLogManagerConfig).java"/>
+              files="(LogValidator|RemoteLogManagerConfig|RemoteLogManager).java"/>
     <suppress checks="NPathComplexity"
-              files="(LogValidator|RemoteIndexCache).java"/>
+              files="(LogValidator|RemoteLogManager|RemoteIndexCache).java"/>
     <suppress checks="ParameterNumber"
               files="(LogAppendInfo|RemoteLogManagerConfig).java"/>
 
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index a9372a80fa0..f84fd2f80f7 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -92,10 +92,12 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.NavigableMap;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.OptionalLong;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -110,6 +112,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -121,7 +124,8 @@ import static 
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.RE
  * - initializing `RemoteStorageManager` and `RemoteLogMetadataManager` 
instances
  * - receives any leader and follower replica events and partition stop events 
and act on them
  * - also provides APIs to fetch indexes, metadata about remote log segments
- * - copying log segments to remote storage
+ * - copying log segments to the remote storage
+ * - cleaning up segments that are expired based on retention size or 
retention time
  */
 public class RemoteLogManager implements Closeable {
 
@@ -132,6 +136,7 @@ public class RemoteLogManager implements Closeable {
     private final String logDir;
     private final Time time;
     private final Function<TopicPartition, Optional<UnifiedLog>> fetchLog;
+    private final BiConsumer<TopicPartition, Long> updateRemoteLogStartOffset;
     private final BrokerTopicStats brokerTopicStats;
 
     private final RemoteStorageManager remoteLogStorageManager;
@@ -165,6 +170,8 @@ public class RemoteLogManager implements Closeable {
      * @param time      Time instance.
      * @param clusterId The cluster id.
      * @param fetchLog  function to get UnifiedLog instance for a given topic.
+     * @param updateRemoteLogStartOffset function to update the 
log-start-offset for a given topic partition.
+     * @param brokerTopicStats BrokerTopicStats instance to update the 
respective metrics.
      */
     public RemoteLogManager(RemoteLogManagerConfig rlmConfig,
                             int brokerId,
@@ -172,6 +179,7 @@ public class RemoteLogManager implements Closeable {
                             String clusterId,
                             Time time,
                             Function<TopicPartition, Optional<UnifiedLog>> 
fetchLog,
+                            BiConsumer<TopicPartition, Long> 
updateRemoteLogStartOffset,
                             BrokerTopicStats brokerTopicStats) throws 
IOException {
         this.rlmConfig = rlmConfig;
         this.brokerId = brokerId;
@@ -179,6 +187,7 @@ public class RemoteLogManager implements Closeable {
         this.clusterId = clusterId;
         this.time = time;
         this.fetchLog = fetchLog;
+        this.updateRemoteLogStartOffset = updateRemoteLogStartOffset;
         this.brokerTopicStats = brokerTopicStats;
 
         remoteLogStorageManager = createRemoteStorageManager();
@@ -298,11 +307,6 @@ public class RemoteLogManager implements Closeable {
         }
     }
 
-    // for testing
-    public RLMScheduledThreadPool rlmScheduledThreadPool() {
-        return rlmScheduledThreadPool;
-    }
-
     /**
      * Callback to receive any leadership changes for the topic partitions 
assigned to this broker. If there are no
      * existing tasks for a given topic partition then it will assign new 
leader or follower task else it will convert the
@@ -486,15 +490,25 @@ public class RemoteLogManager implements Closeable {
             throw new KafkaException("Topic id does not exist for topic 
partition: " + tp);
         }
 
+        Optional<UnifiedLog> unifiedLogOptional = fetchLog.apply(tp);
+        if (!unifiedLogOptional.isPresent()) {
+            throw new KafkaException("UnifiedLog does not exist for topic 
partition: " + tp);
+        }
+
+        UnifiedLog unifiedLog = unifiedLogOptional.get();
+
         // Get the respective epoch in which the starting-offset exists.
         OptionalInt maybeEpoch = 
leaderEpochCache.epochForOffset(startingOffset);
+        TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, tp);
+        NavigableMap<Integer, Long> epochWithOffsets = 
buildFilteredLeaderEpochMap(leaderEpochCache.epochWithOffsets());
         while (maybeEpoch.isPresent()) {
             int epoch = maybeEpoch.getAsInt();
 
-            Iterator<RemoteLogSegmentMetadata> iterator = 
remoteLogMetadataManager.listRemoteLogSegments(new TopicIdPartition(topicId, 
tp), epoch);
+            Iterator<RemoteLogSegmentMetadata> iterator = 
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
             while (iterator.hasNext()) {
                 RemoteLogSegmentMetadata rlsMetadata = iterator.next();
-                if (rlsMetadata.maxTimestampMs() >= timestamp && 
rlsMetadata.endOffset() >= startingOffset) {
+                if (rlsMetadata.maxTimestampMs() >= timestamp && 
rlsMetadata.endOffset() >= startingOffset &&
+                        isRemoteSegmentWithinLeaderEpochs(rlsMetadata, 
unifiedLog.logEndOffset(), epochWithOffsets)) {
                     return lookupTimestamp(rlsMetadata, timestamp, 
startingOffset);
                 }
             }
@@ -725,6 +739,8 @@ public class RemoteLogManager implements Closeable {
                 
.remoteCopyBytesRate().mark(copySegmentStartedRlsm.segmentSizeInBytes());
             
brokerTopicStats.allTopicsStats().remoteCopyBytesRate().mark(copySegmentStartedRlsm.segmentSizeInBytes());
             copiedOffsetOption = OptionalLong.of(endOffset);
+            // Update the highest offset in remote storage for this 
partition's log so that the local log segments
+            // are not deleted before they are copied to remote storage.
             log.updateHighestOffsetInRemoteStorage(endOffset);
             logger.info("Copied {} to remote storage with segment-id: {}", 
logFileName, copySegmentFinishedRlsm.remoteLogSegmentId());
         }
@@ -744,14 +760,21 @@ public class RemoteLogManager implements Closeable {
                     return;
                 }
 
+                UnifiedLog log = unifiedLogOptional.get();
                 if (isLeader()) {
                     // Copy log segments to remote storage
-                    copyLogSegmentsToRemote(unifiedLogOptional.get());
+                    copyLogSegmentsToRemote(log);
+                    // Cleanup/delete expired remote log segments
+                    cleanupExpiredRemoteLogSegments();
+                } else {
+                    long offset = findHighestRemoteOffset(topicIdPartition, 
log);
+                    // Update the highest offset in remote storage for this 
partition's log so that the local log segments
+                    // are not deleted before they are copied to remote 
storage.
+                    log.updateHighestOffsetInRemoteStorage(offset);
                 }
             } catch (InterruptedException ex) {
                 if (!isCancelled()) {
-                    logger.warn("Current thread for topic-partition-id {} is 
interrupted, this task won't be rescheduled. " +
-                            "Reason: {}", topicIdPartition, ex.getMessage());
+                    logger.warn("Current thread for topic-partition-id {} is 
interrupted. Reason: {}", topicIdPartition, ex.getMessage());
                 }
             } catch (Exception ex) {
                 if (!isCancelled()) {
@@ -761,11 +784,385 @@ public class RemoteLogManager implements Closeable {
             }
         }
 
+        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, 
long remoteLogStartOffset) {
+            if (isLeader()) {
+                logger.debug("Updating {} with remoteLogStartOffset: {}", 
topicPartition, remoteLogStartOffset);
+                updateRemoteLogStartOffset.accept(topicPartition, 
remoteLogStartOffset);
+            }
+        }
+
+        class RemoteLogRetentionHandler {
+
+            private final Optional<RetentionSizeData> retentionSizeData;
+            private final Optional<RetentionTimeData> retentionTimeData;
+
+            private long remainingBreachedSize;
+
+            private OptionalLong logStartOffset = OptionalLong.empty();
+
+            public RemoteLogRetentionHandler(Optional<RetentionSizeData> 
retentionSizeData, Optional<RetentionTimeData> retentionTimeData) {
+                this.retentionSizeData = retentionSizeData;
+                this.retentionTimeData = retentionTimeData;
+                remainingBreachedSize = retentionSizeData.map(sizeData -> 
sizeData.remainingBreachedSize).orElse(0L);
+            }
+
+            private boolean 
deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws 
RemoteStorageException, ExecutionException, InterruptedException {
+                if (!retentionSizeData.isPresent()) {
+                    return false;
+                }
+
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+                    // Assumes that the segment size is >= 0
+                    if (remainingBreachedSize > 0) {
+                        long remainingBytes = remainingBreachedSize - 
x.segmentSizeInBytes();
+                        if (remainingBytes >= 0) {
+                            remainingBreachedSize = remainingBytes;
+                            return true;
+                        }
+                    }
+
+                    return false;
+                });
+                if (isSegmentDeleted) {
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to 
retention size {} breach. Log size after deletion will be {}.",
+                            metadata.remoteLogSegmentId(), 
retentionSizeData.get().retentionSize, remainingBreachedSize + 
retentionSizeData.get().retentionSize);
+                }
+                return isSegmentDeleted;
+            }
+
+            public boolean 
deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                if (!retentionTimeData.isPresent()) {
+                    return false;
+                }
+
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+                        x -> x.maxTimestampMs() <= 
retentionTimeData.get().cleanupUntilMs);
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+                    // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
+                    // are ascending within an epoch.
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to 
retention time {}ms breach based on the largest record timestamp in the 
segment",
+                            metadata.remoteLogSegmentId(), 
retentionTimeData.get().retentionMs);
+                }
+                return isSegmentDeleted;
+            }
+
+            private boolean 
deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long 
startOffset)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+                if (isSegmentDeleted && retentionSizeData.isPresent()) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+                    logger.info("Deleted remote log segment {} due to log 
start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+                }
+
+                return isSegmentDeleted;
+            }
+
+            // It removes the segments beyond the current leader's earliest epoch. Those segments are considered
+            // unreferenced because they are not part of the current leader epoch lineage.
+            private boolean 
deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, 
RemoteLogSegmentMetadata metadata) throws RemoteStorageException, 
ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x ->
+                        x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch));
+                if (isSegmentDeleted) {
+                    logger.info("Deleted remote log segment {} due to leader 
epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and 
segmentEpochs: {}",
+                            metadata.remoteLogSegmentId(), earliestEpochEntry, 
metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+                }
+
+                // No need to update the log-start-offset as these epochs/offsets are earlier than that value.
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata 
segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                if (predicate.test(segmentMetadata)) {
+                    logger.info("Deleting remote log segment {}", 
segmentMetadata.remoteLogSegmentId());
+                    // Publish delete segment started event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new 
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), 
time.milliseconds(),
+                                    segmentMetadata.customMetadata(), 
RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+                    // Delete the segment in remote storage.
+                    
remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+                    // Publish delete segment finished event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new 
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), 
time.milliseconds(),
+                                    segmentMetadata.customMetadata(), 
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
+                    logger.info("Deleted remote log segment {}", 
segmentMetadata.remoteLogSegmentId());
+                    return true;
+                }
+
+                return false;
+            }
+
+        }
+
+        private void cleanupExpiredRemoteLogSegments() throws 
RemoteStorageException, ExecutionException, InterruptedException {
+            if (isCancelled() || !isLeader()) {
+                logger.info("Returning from remote log segments cleanup as the 
task state is changed");
+                return;
+            }
+
+            // Cleanup remote log segments and update the log start offset if 
applicable.
+            final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = 
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition);
+            if (!segmentMetadataIter.hasNext()) {
+                logger.debug("No remote log segments available on remote 
storage for partition: {}", topicIdPartition);
+                return;
+            }
+
+            final Optional<UnifiedLog> logOptional = 
fetchLog.apply(topicIdPartition.topicPartition());
+            if (!logOptional.isPresent()) {
+                logger.debug("No UnifiedLog instance available for partition: 
{}", topicIdPartition);
+                return;
+            }
+
+            final UnifiedLog log = logOptional.get();
+            final Option<LeaderEpochFileCache> leaderEpochCacheOption = 
log.leaderEpochCache();
+            if (leaderEpochCacheOption.isEmpty()) {
+                logger.debug("No leader epoch cache available for partition: 
{}", topicIdPartition);
+                return;
+            }
+
+            final Set<Integer> epochsSet = new HashSet<>();
+            // It would be good to have an API from RLMM to get all the remote leader epochs of all the segments
+            // of a partition instead of going through all the segments and building it here.
+            while (segmentMetadataIter.hasNext()) {
+                RemoteLogSegmentMetadata segmentMetadata = 
segmentMetadataIter.next();
+                epochsSet.addAll(segmentMetadata.segmentLeaderEpochs().keySet());
+            }
+
+            // All the leader epochs that exist in remote storage, in sorted order
+            final List<Integer> remoteLeaderEpochs = new 
ArrayList<>(epochsSet);
+            Collections.sort(remoteLeaderEpochs);
+
+            LeaderEpochFileCache leaderEpochCache = 
leaderEpochCacheOption.get();
+            // Build the leader epoch map by filtering the epochs that do not 
have any records.
+            NavigableMap<Integer, Long> epochWithOffsets = 
buildFilteredLeaderEpochMap(leaderEpochCache.epochWithOffsets());
+            Optional<EpochEntry> earliestEpochEntryOptional = 
leaderEpochCache.earliestEntry();
+
+            long logStartOffset = log.logStartOffset();
+            long logEndOffset = log.logEndOffset();
+            Optional<RetentionSizeData> retentionSizeData = 
buildRetentionSizeData(log.config().retentionSize,
+                    log.onlyLocalLogSegmentsSize(), logEndOffset, 
epochWithOffsets);
+            Optional<RetentionTimeData> retentionTimeData = 
buildRetentionTimeData(log.config().retentionMs);
+
+            RemoteLogRetentionHandler remoteLogRetentionHandler = new 
RemoteLogRetentionHandler(retentionSizeData, retentionTimeData);
+            Iterator<Integer> epochIterator = 
epochWithOffsets.navigableKeySet().iterator();
+            boolean isSegmentDeleted = true;
+            while (isSegmentDeleted && epochIterator.hasNext()) {
+                Integer epoch = epochIterator.next();
+                Iterator<RemoteLogSegmentMetadata> segmentsIterator = 
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
+                while (isSegmentDeleted && segmentsIterator.hasNext()) {
+                    if (isCancelled() || !isLeader()) {
+                        logger.info("Returning from remote log segments 
cleanup for the remaining segments as the task state is changed.");
+                        return;
+                    }
+                    RemoteLogSegmentMetadata metadata = 
segmentsIterator.next();
+
+                    // Check whether the segment contains the required epoch range within the current leader epoch lineage.
+                    if (isRemoteSegmentWithinLeaderEpochs(metadata, 
logEndOffset, epochWithOffsets)) {
+                        isSegmentDeleted =
+                                remoteLogRetentionHandler.deleteRetentionTimeBreachedSegments(metadata) ||
+                                        remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata) ||
+                                        remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(metadata, logStartOffset);
+                    }
+                }
+            }
+
+            // Remove the remote log segments whose segment-leader-epochs are less than the earliest epoch known
+            // to the leader. This will remove the unreferenced segments in the remote storage. This is needed for
+            // unclean leader election scenarios as the remote storage can have epochs earlier than the current
+            // leader's earliest leader epoch.
+            if (earliestEpochEntryOptional.isPresent()) {
+                EpochEntry earliestEpochEntry = 
earliestEpochEntryOptional.get();
+                Iterator<Integer> epochsToClean = 
remoteLeaderEpochs.stream().filter(x -> x < 
earliestEpochEntry.epoch).iterator();
+                while (epochsToClean.hasNext()) {
+                    int epoch = epochsToClean.next();
+                    Iterator<RemoteLogSegmentMetadata> segmentsToBeCleaned = 
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
+                    while (segmentsToBeCleaned.hasNext()) {
+                        if (isCancelled() || !isLeader()) {
+                            return;
+                        }
+                        // No need to update the log-start-offset even though the segment is deleted, as these
+                        // epochs/offsets are earlier than that value.
+                        remoteLogRetentionHandler.deleteLogSegmentsDueToLeaderEpochCacheTruncation(earliestEpochEntry, segmentsToBeCleaned.next());
+                    }
+                }
+            }
+
+            // Update log start offset with the computed value after retention 
cleanup is done
+            remoteLogRetentionHandler.logStartOffset.ifPresent(offset -> 
handleLogStartOffsetUpdate(topicIdPartition.topicPartition(), offset));
+        }
+
+        private Optional<RetentionTimeData> buildRetentionTimeData(long 
retentionMs) {
+            return retentionMs > -1
+                    ? Optional.of(new RetentionTimeData(retentionMs, 
time.milliseconds() - retentionMs))
+                    : Optional.empty();
+        }
+
+        private Optional<RetentionSizeData> buildRetentionSizeData(long retentionSize,
+                                                                   long onlyLocalLogSegmentsSize,
+                                                                   long logEndOffset,
+                                                                   NavigableMap<Integer, Long> epochEntries) throws RemoteStorageException {
+            if (retentionSize > -1) {
+                long remoteLogSizeBytes = 0L;
+                Set<RemoteLogSegmentId> visitedSegmentIds = new HashSet<>();
+                for (Integer epoch : epochEntries.navigableKeySet()) {
+                    // remoteLogSize(topicIdPartition, epochEntry.epoch) may not be completely accurate as the remote
+                    // log size may be computed for all the segments but not for segments within the current
+                    // partition's leader epoch lineage. Better to revisit this API.
+                    // remoteLogSizeBytes += 
remoteLogMetadataManager.remoteLogSize(topicIdPartition, epochEntry.epoch);
+                    Iterator<RemoteLogSegmentMetadata> segmentsIterator = 
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
+                    while (segmentsIterator.hasNext()) {
+                        RemoteLogSegmentMetadata segmentMetadata = 
segmentsIterator.next();
+                        RemoteLogSegmentId segmentId = 
segmentMetadata.remoteLogSegmentId();
+                        if (!visitedSegmentIds.contains(segmentId) && isRemoteSegmentWithinLeaderEpochs(segmentMetadata, logEndOffset, epochEntries)) {
+                            remoteLogSizeBytes += 
segmentMetadata.segmentSizeInBytes();
+                            visitedSegmentIds.add(segmentId);
+                        }
+                    }
+                }
+
+                // This is the total size of the segments in the local log that have their base-offset > local-log-start-offset
+                // and of the segments in remote storage that have their end-offset < local-log-start-offset.
+                long totalSize = onlyLocalLogSegmentsSize + remoteLogSizeBytes;
+                if (totalSize > retentionSize) {
+                    long remainingBreachedSize = totalSize - retentionSize;
+                    RetentionSizeData retentionSizeData = new 
RetentionSizeData(retentionSize, remainingBreachedSize);
+                    return Optional.of(retentionSizeData);
+                }
+            }
+
+            return Optional.empty();
+        }
+
         public String toString() {
             return this.getClass().toString() + "[" + topicIdPartition + "]";
         }
     }
 
+    /**
+     * Returns true if the remote segment's epoch/offsets are within the 
leader epoch lineage of the partition.
+     * The constraints here are as follows:
+     * - The segment's first epoch's offset should be more than or equal to 
the respective leader epoch's offset in the partition leader epoch lineage.
+     * - The segment's end offset should be less than or equal to the 
respective leader epoch's offset in the partition leader epoch lineage.
+     * - The segment's epoch lineage (epoch and offset) should be the same as the leader epoch lineage (epoch and
+     * offset) except for the first and the last epochs in the segment.
+     *
+     * @param segmentMetadata The remote segment metadata to be validated.
+     * @param logEndOffset    The log end offset of the partition.
+     * @param leaderEpochs    The leader epoch lineage of the partition, with the epochs containing no data filtered out.
+     * @return true if the remote segment's epoch/offsets are within the 
leader epoch lineage of the partition.
+     */
+    // Visible for testing
+    public static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata segmentMetadata,
+                                                            long logEndOffset,
+                                                            NavigableMap<Integer, Long> leaderEpochs) {
+        long segmentEndOffset = segmentMetadata.endOffset();
+        // Filter out the epochs that do not have any messages/records associated with them.
+        NavigableMap<Integer, Long> segmentLeaderEpochs = 
buildFilteredLeaderEpochMap(segmentMetadata.segmentLeaderEpochs());
+        // Check for out of bound epochs between segment epochs and current 
leader epochs.
+        Integer segmentFirstEpoch = segmentLeaderEpochs.firstKey();
+        Integer segmentLastEpoch = segmentLeaderEpochs.lastKey();
+        if (segmentFirstEpoch < leaderEpochs.firstKey() || segmentLastEpoch > 
leaderEpochs.lastKey()) {
+            LOGGER.debug("[{}] Remote segment {} is not within the partition 
leader epoch lineage. Remote segment epochs: {} and partition leader epochs: 
{}",
+                    segmentMetadata.topicIdPartition(), 
segmentMetadata.remoteLogSegmentId(), segmentLeaderEpochs, leaderEpochs);
+            return false;
+        }
+
+        for (Map.Entry<Integer, Long> entry : segmentLeaderEpochs.entrySet()) {
+            int epoch = entry.getKey();
+            long offset = entry.getValue();
+
+            // If segment's epoch does not exist in the leader epoch lineage 
then it is not a valid segment.
+            if (!leaderEpochs.containsKey(epoch)) {
+                LOGGER.debug("[{}]  Remote segment {}'s epoch {} is not within 
the leader epoch lineage. Remote segment epochs: {} and partition leader 
epochs: {}",
+                        segmentMetadata.topicIdPartition(), 
segmentMetadata.remoteLogSegmentId(), epoch, segmentLeaderEpochs, leaderEpochs);
+                return false;
+            }
+
+            // Segment's first epoch's offset should be more than or equal to 
the respective leader epoch's offset.
+            if (epoch == segmentFirstEpoch && offset < 
leaderEpochs.get(epoch)) {
+                LOGGER.debug("[{}]  Remote segment {}'s first epoch {}'s 
offset is less than leader epoch's offset {}.",
+                        segmentMetadata.topicIdPartition(), 
segmentMetadata.remoteLogSegmentId(), epoch, leaderEpochs.get(epoch));
+                return false;
+            }
+
+            // Segment's end offset should be less than or equal to the 
respective leader epoch's offset.
+            if (epoch == segmentLastEpoch) {
+                Map.Entry<Integer, Long> nextEntry = 
leaderEpochs.higherEntry(epoch);
+                if (nextEntry != null && segmentEndOffset > 
nextEntry.getValue() - 1) {
+                    LOGGER.debug("[{}]  Remote segment {}'s end offset {} is 
more than leader epoch's offset {}.",
+                            segmentMetadata.topicIdPartition(), 
segmentMetadata.remoteLogSegmentId(), segmentEndOffset, nextEntry.getValue() - 
1);
+                    return false;
+                }
+            }
+
+            // The next segment epoch entry and the next leader epoch entry should be the same to ensure that the
+            // segment's epoch is within the leader epoch lineage.
+            if (epoch != segmentLastEpoch && !leaderEpochs.higherEntry(epoch).equals(segmentLeaderEpochs.higherEntry(epoch))) {
+                LOGGER.debug("[{}]  Remote segment {}'s epoch {} is not within 
the leader epoch lineage. Remote segment epochs: {} and partition leader 
epochs: {}",
+                        segmentMetadata.topicIdPartition(), 
segmentMetadata.remoteLogSegmentId(), epoch, segmentLeaderEpochs, leaderEpochs);
+                return false;
+            }
+
+        }
+
+        // The segment end offset should be within the log end offset.
+        return segmentEndOffset < logEndOffset;
+    }
+
+    /**
+     * Returns a map containing the epoch vs start-offset for the given leader epoch map, filtering out the epochs
+     * that do not contain any messages/records associated with them.
+     *
+     * For example:
+     *  <epoch - start offset>
+     *  0 - 0
+     *  1 - 10
+     *  2 - 20
+     *  3 - 30
+     *  4 - 40
+     *  5 - 60  // epoch 5 does not have records or messages associated with it
+     *  6 - 60
+     *  7 - 70
+     *
+     *  When the above leaderEpochMap is passed to this method, it returns the 
following map:
+     *  <epoch - start offset>
+     *  0 - 0
+     *  1 - 10
+     *  2 - 20
+     *  3 - 30
+     *  4 - 40
+     *  6 - 60
+     *  7 - 70
+     *
+     * @param leaderEpochs The leader epoch map to be refined.
+     */
+    // Visible for testing
+    public static NavigableMap<Integer, Long> 
buildFilteredLeaderEpochMap(NavigableMap<Integer, Long> leaderEpochs) {
+        List<Integer> duplicatedEpochs = new ArrayList<>();
+        Map.Entry<Integer, Long> previousEntry = null;
+        for (Map.Entry<Integer, Long> entry : leaderEpochs.entrySet()) {
+            if (previousEntry != null && 
previousEntry.getValue().equals(entry.getValue())) {
+                duplicatedEpochs.add(previousEntry.getKey());
+            }
+            previousEntry = entry;
+        }
+
+        if (duplicatedEpochs.isEmpty()) {
+            return leaderEpochs;
+        }
+
+        TreeMap<Integer, Long> filteredLeaderEpochs = new 
TreeMap<>(leaderEpochs);
+        for (Integer duplicatedEpoch : duplicatedEpochs) {
+            filteredLeaderEpochs.remove(duplicatedEpoch);
+        }
+        return filteredLeaderEpochs;
+    }
+
     public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) 
throws RemoteStorageException, IOException {
         int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
         TopicPartition tp = remoteStorageFetchInfo.topicPartition;
@@ -1096,6 +1493,42 @@ public class RemoteLogManager implements Closeable {
         }
     }
 
+    // Visible for testing
+    public static class RetentionSizeData {
+        private final long retentionSize;
+        private final long remainingBreachedSize;
+
+        public RetentionSizeData(long retentionSize, long 
remainingBreachedSize) {
+            if (retentionSize < 0)
+                throw new IllegalArgumentException("retentionSize should be 
non negative, but it is " + retentionSize);
+
+            if (remainingBreachedSize <= 0) {
+                throw new IllegalArgumentException("remainingBreachedSize 
should be more than zero, but it is " + remainingBreachedSize);
+            }
+
+            this.retentionSize = retentionSize;
+            this.remainingBreachedSize = remainingBreachedSize;
+        }
+    }
+
+    // Visible for testing
+    public static class RetentionTimeData {
+
+        private final long retentionMs;
+        private final long cleanupUntilMs;
+
+        public RetentionTimeData(long retentionMs, long cleanupUntilMs) {
+            if (retentionMs < 0)
+                throw new IllegalArgumentException("retentionMs should be non 
negative, but it is " + retentionMs);
+
+            if (cleanupUntilMs < 0)
+                throw new IllegalArgumentException("cleanupUntilMs should be 
non negative, but it is " + cleanupUntilMs);
+
+            this.retentionMs = retentionMs;
+            this.cleanupUntilMs = cleanupUntilMs;
+        }
+    }
+
     // Visible for testing
     static class EnrichedLogSegment {
         private final LogSegment logSegment;
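
The buildFilteredLeaderEpochMap logic added in this file is self-contained
enough to exercise with plain JDK types. The sketch below mirrors it; the
wrapping class and main method are illustrative only, not part of the commit.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.NavigableMap;
    import java.util.TreeMap;

    public class FilteredEpochMapSketch {
        static NavigableMap<Integer, Long> buildFilteredLeaderEpochMap(NavigableMap<Integer, Long> leaderEpochs) {
            // An epoch whose start offset equals the next epoch's start offset has no records.
            List<Integer> emptyEpochs = new ArrayList<>();
            Map.Entry<Integer, Long> previous = null;
            for (Map.Entry<Integer, Long> entry : leaderEpochs.entrySet()) {
                if (previous != null && previous.getValue().equals(entry.getValue()))
                    emptyEpochs.add(previous.getKey());
                previous = entry;
            }
            NavigableMap<Integer, Long> filtered = new TreeMap<>(leaderEpochs);
            emptyEpochs.forEach(filtered::remove);
            return filtered;
        }

        public static void main(String[] args) {
            NavigableMap<Integer, Long> epochs = new TreeMap<>();
            epochs.put(0, 0L);
            epochs.put(1, 10L);
            epochs.put(5, 60L);
            epochs.put(6, 60L);
            epochs.put(7, 70L);
            // Epoch 5 starts at the same offset as epoch 6, so it carries no records
            // and is dropped, matching the javadoc example above.
            System.out.println(buildFilteredLeaderEpochMap(epochs)); // {0=0, 1=10, 6=60, 7=70}
        }
    }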
diff --git a/core/src/main/scala/kafka/log/LogManager.scala 
b/core/src/main/scala/kafka/log/LogManager.scala
index bed9f0dfa03..78e48010e52 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -430,7 +430,8 @@ class LogManager(logDirs: Seq[File],
               val remainingLogs = decNumRemainingLogs(numRemainingLogs, 
dir.getAbsolutePath)
               val currentNumLoaded = logsToLoad.length - remainingLogs
               log match {
-                case Some(loadedLog) => info(s"Completed load of $loadedLog 
with ${loadedLog.numberOfSegments} segments in ${logLoadDurationMs}ms " +
+                case Some(loadedLog) => info(s"Completed load of $loadedLog 
with ${loadedLog.numberOfSegments} segments, " +
+                  s"local-log-start-offset ${loadedLog.localLogStartOffset()} 
and log-end-offset ${loadedLog.logEndOffset} in ${logLoadDurationMs}ms " +
                   s"($currentNumLoaded/${logsToLoad.length} completed in 
$logDirAbsolutePath)")
                 case None => info(s"Error while loading logs in $logDir in 
${logLoadDurationMs}ms ($currentNumLoaded/${logsToLoad.length} completed in 
$logDirAbsolutePath)")
               }
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala 
b/core/src/main/scala/kafka/log/UnifiedLog.scala
index 72e0784e37d..10c86183b3f 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -147,11 +147,27 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   def localLogStartOffset(): Long = _localLogStartOffset
 
+  // This is the offset (inclusive) up to which segments are copied to remote storage.
   @volatile private var highestOffsetInRemoteStorage: Long = -1L
 
   locally {
+    def updateLocalLogStartOffset(offset: Long): Unit = {
+      _localLogStartOffset = offset
+
+      if (highWatermark < offset) {
+        updateHighWatermark(offset)
+      }
+
+      if (this.recoveryPoint < offset) {
+        localLog.updateRecoveryPoint(offset)
+      }
+    }
+
     initializePartitionMetadata()
     updateLogStartOffset(logStartOffset)
+    updateLocalLogStartOffset(math.max(logStartOffset, 
localLog.segments.firstSegmentBaseOffset.getOrElse(0L)))
+    if (!remoteLogEnabled())
+      logStartOffset = localLogStartOffset()
     maybeIncrementFirstUnstableOffset()
     initializeTopicId()
 
@@ -162,6 +178,14 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     logOffsetsListener = listener
   }
 
+  def updateLogStartOffsetFromRemoteTier(remoteLogStartOffset: Long): Unit = {
+    if (!remoteLogEnabled()) {
+      error("Ignoring the call as the remote log storage is disabled")
+      return
+    }
+    maybeIncrementLogStartOffset(remoteLogStartOffset, 
LogStartOffsetIncrementReason.SegmentDeletion)
+  }
+
   def remoteLogEnabled(): Boolean = {
     // Remote log is enabled only for non-compact and non-internal topics
     remoteStorageSystemEnable &&
@@ -520,6 +544,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       localLog.updateRecoveryPoint(offset)
     }
   }
+
   def updateHighestOffsetInRemoteStorage(offset: Long): Unit = {
     if (!remoteLogEnabled())
       warn(s"Unable to update the highest offset in remote storage with offset 
$offset since remote storage is not enabled. The existing highest offset is 
$highestOffsetInRemoteStorage.")
@@ -957,6 +982,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     }
   }
 
+  private def maybeIncrementLocalLogStartOffset(newLocalLogStartOffset: Long, 
reason: LogStartOffsetIncrementReason): Unit = {
+    lock synchronized {
+      if (newLocalLogStartOffset > localLogStartOffset()) {
+        _localLogStartOffset = newLocalLogStartOffset
+        info(s"Incremented local log start offset to ${localLogStartOffset()} 
due to reason $reason")
+      }
+    }
+  }
+
   /**
    * Increment the log start offset if the provided offset is larger.
    *
@@ -967,7 +1001,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
    * @throws OffsetOutOfRangeException if the log start offset is greater than 
the high watermark
    * @return true if the log start offset was updated; otherwise false
    */
-  def maybeIncrementLogStartOffset(newLogStartOffset: Long, reason: 
LogStartOffsetIncrementReason): Boolean = {
+  def maybeIncrementLogStartOffset(newLogStartOffset: Long,
+                                   reason: LogStartOffsetIncrementReason): 
Boolean = {
     // We don't have to write the log start offset to 
log-start-offset-checkpoint immediately.
     // The deleteRecordsOffset may be lost only if all in-sync replicas of 
this broker are shutdown
     // in an unclean manner within 
log.flush.start.offset.checkpoint.interval.ms. The chance of this happening is 
low.
@@ -978,11 +1013,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           throw new OffsetOutOfRangeException(s"Cannot increment the log start 
offset to $newLogStartOffset of partition $topicPartition " +
             s"since it is larger than the high watermark $highWatermark")
 
+        if (remoteLogEnabled()) {
+          // This should be set when the log-start-offset is moved beyond the current local-log-start-offset.
+          _localLogStartOffset = math.max(newLogStartOffset, 
localLogStartOffset())
+        }
+
         localLog.checkIfMemoryMappedBufferClosed()
         if (newLogStartOffset > logStartOffset) {
           updatedLogStartOffset = true
           updateLogStartOffset(newLogStartOffset)
-          _localLogStartOffset = newLogStartOffset
           info(s"Incremented log start offset to $newLogStartOffset due to 
$reason")
           leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
           producerStateManager.onLogStartOffsetIncremented(newLogStartOffset)
@@ -1293,7 +1332,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           latestEpochAsOptional(leaderEpochCache)))
       } else {
         // We need to search the first segment whose largest timestamp is >= 
the target timestamp if there is one.
-        val remoteOffset = if (remoteLogEnabled()) {
+        if (remoteLogEnabled()) {
           if (remoteLogManager.isEmpty) {
             throw new KafkaException("RemoteLogManager is empty even though 
the remote log storage is enabled.")
           }
@@ -1301,25 +1340,28 @@ class UnifiedLog(@volatile var logStartOffset: Long,
             throw new KafkaException("Tiered storage is supported only with 
versions supporting leader epochs, that means RecordVersion must be >= 2.")
           }
 
-          remoteLogManager.get.findOffsetByTimestamp(topicPartition, 
targetTimestamp, logStartOffset, leaderEpochCache.get)
-        } else Optional.empty()
-
-        if (remoteOffset.isPresent) {
-          remoteOffset.asScala
+          val remoteOffset = 
remoteLogManager.get.findOffsetByTimestamp(topicPartition, targetTimestamp, 
logStartOffset, leaderEpochCache.get)
+          if (remoteOffset.isPresent) {
+            remoteOffset.asScala
+          } else {
+            // If it is not found in remote log storage, search in the local 
log storage from local log start offset.
+            searchOffsetInLocalLog(targetTimestamp, localLogStartOffset())
+          }
         } else {
-          // If it is not found in remote storage, search in the local storage 
starting with local log start offset.
-
-          // Cache to avoid race conditions. `toBuffer` is faster than most 
alternatives and provides
-          // constant time access while being safe to use with concurrent 
collections unlike `toArray`.
-          val segmentsCopy = logSegments.toBuffer
-
-          val targetSeg = segmentsCopy.find(_.largestTimestamp >= 
targetTimestamp)
-          targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp, 
_localLogStartOffset))
+          searchOffsetInLocalLog(targetTimestamp, logStartOffset)
         }
       }
     }
   }
 
+  private def searchOffsetInLocalLog(targetTimestamp: Long, startOffset: 
Long): Option[TimestampAndOffset] = {
+    // Cache to avoid race conditions. `toBuffer` is faster than most 
alternatives and provides
+    // constant time access while being safe to use with concurrent 
collections unlike `toArray`.
+    val segmentsCopy = logSegments.toBuffer
+    val targetSeg = segmentsCopy.find(_.largestTimestamp >= targetTimestamp)
+    targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp, startOffset))
+  }
+
   def legacyFetchOffsetsBefore(timestamp: Long, maxNumOffsets: Int): Seq[Long] 
= {
     // Cache to avoid race conditions. `toBuffer` is faster than most 
alternatives and provides
     // constant time access while being safe to use with concurrent 
collections unlike `toArray`.
@@ -1391,7 +1433,13 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => 
Boolean,
                                 reason: SegmentDeletionReason): Int = {
     def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): 
Boolean = {
-      highWatermark >= 
nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) &&
+      val upperBoundOffset = 
nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset)
+
+      // When remote log storage is enabled, do not delete segments that have not yet been copied to tiered storage.
+      (!remoteLogEnabled() || (upperBoundOffset > 0 && upperBoundOffset - 1 <= 
highestOffsetInRemoteStorage)) &&
+        // We don't delete segments with offsets at or beyond the high 
watermark to ensure that the log start
+        // offset can never exceed it.
+        highWatermark >= upperBoundOffset &&
         predicate(segment, nextSegmentOpt)
     }
     lock synchronized {
@@ -1403,6 +1451,11 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     }
   }
 
+  private def incrementStartOffset(startOffset: Long, reason: 
LogStartOffsetIncrementReason): Unit = {
+    if (remoteLogEnabled()) maybeIncrementLocalLogStartOffset(startOffset, 
reason)
+    else maybeIncrementLogStartOffset(startOffset, reason)
+  }
+
   private def deleteSegments(deletable: Iterable[LogSegment], reason: 
SegmentDeletionReason): Int = {
     maybeHandleIOException(s"Error while deleting segments for $topicPartition 
in dir ${dir.getParent}") {
       val numToDelete = deletable.size
@@ -1420,7 +1473,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
         // remove the segments for lookups
         localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, 
reason)
         deleteProducerSnapshots(deletable, asyncDelete = true)
-        maybeIncrementLogStartOffset(localLog.segments.firstSegmentBaseOffset.get, LogStartOffsetIncrementReason.SegmentDeletion)
+        incrementStartOffset(localLog.segments.firstSegmentBaseOffset.get, 
LogStartOffsetIncrementReason.SegmentDeletion)
       }
       numToDelete
     }
@@ -1443,19 +1496,21 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 
   private def deleteRetentionMsBreachedSegments(): Int = {
-    if (config.retentionMs < 0) return 0
+    val retentionMs = localRetentionMs(config, remoteLogEnabled())
+    if (retentionMs < 0) return 0
     val startMs = time.milliseconds
 
     def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): 
Boolean = {
-      startMs - segment.largestTimestamp > config.retentionMs
+      startMs - segment.largestTimestamp > retentionMs
     }
 
-    deleteOldSegments(shouldDelete, RetentionMsBreach(this))
+    deleteOldSegments(shouldDelete, RetentionMsBreach(this, 
remoteLogEnabled()))
   }
 
   private def deleteRetentionSizeBreachedSegments(): Int = {
-    if (config.retentionSize < 0 || size < config.retentionSize) return 0
-    var diff = size - config.retentionSize
+    val retentionSize: Long = localRetentionSize(config, remoteLogEnabled())
+    if (retentionSize < 0 || size < retentionSize) return 0
+    var diff = size - retentionSize
     def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): 
Boolean = {
       if (diff - segment.size >= 0) {
         diff -= segment.size
@@ -1465,15 +1520,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       }
     }
 
-    deleteOldSegments(shouldDelete, RetentionSizeBreach(this))
+    deleteOldSegments(shouldDelete, RetentionSizeBreach(this, 
remoteLogEnabled()))
   }
 
   private def deleteLogStartOffsetBreachedSegments(): Int = {
     def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): 
Boolean = {
-      nextSegmentOpt.exists(_.baseOffset <= logStartOffset)
+      nextSegmentOpt.exists(_.baseOffset <= (if (remoteLogEnabled()) 
localLogStartOffset() else logStartOffset))
     }
 
-    deleteOldSegments(shouldDelete, StartOffsetBreach(this))
+    deleteOldSegments(shouldDelete, StartOffsetBreach(this, 
remoteLogEnabled()))
   }
 
   def isFuture: Boolean = localLog.isFuture
@@ -1483,6 +1538,11 @@ class UnifiedLog(@volatile var logStartOffset: Long,
    */
   def size: Long = localLog.segments.sizeInBytes
 
+  /**
+   * The log size in bytes for all segments that are only in the local log but not yet in the remote log.
+   */
+  def onlyLocalLogSegmentsSize: Long = 
UnifiedLog.sizeInBytes(logSegments.filter(_.baseOffset >= 
highestOffsetInRemoteStorage))
+
   /**
    * The offset of the next message that will be appended to the log
    */
@@ -2174,6 +2234,14 @@ object UnifiedLog extends Logging {
     }
   }
 
+  private[log] def localRetentionMs(config: LogConfig, remoteLogEnabled: 
Boolean): Long = {
+    if (remoteLogEnabled) config.remoteLogConfig.localRetentionMs else 
config.retentionMs
+  }
+
+  private[log] def localRetentionSize(config: LogConfig, remoteLogEnabled: 
Boolean): Long = {
+    if (remoteLogEnabled) config.remoteLogConfig.localRetentionBytes else 
config.retentionSize
+  }
+
 }
 
 object LogMetricNames {
@@ -2187,35 +2255,48 @@ object LogMetricNames {
   }
 }
 
-case class RetentionMsBreach(log: UnifiedLog) extends SegmentDeletionReason {
+case class RetentionMsBreach(log: UnifiedLog, remoteLogEnabled: Boolean) 
extends SegmentDeletionReason {
   override def logReason(toDelete: List[LogSegment]): Unit = {
-    val retentionMs = log.config.retentionMs
+    val retentionMs = UnifiedLog.localRetentionMs(log.config, remoteLogEnabled)
     toDelete.foreach { segment =>
       segment.largestRecordTimestamp match {
         case Some(_) =>
-          log.info(s"Deleting segment $segment due to retention time 
${retentionMs}ms breach based on the largest " +
-            s"record timestamp in the segment")
+          if (remoteLogEnabled)
+            log.info(s"Deleting segment $segment due to local log retention 
time ${retentionMs}ms breach based on the largest " +
+              s"record timestamp in the segment")
+          else
+            log.info(s"Deleting segment $segment due to log retention time 
${retentionMs}ms breach based on the largest " +
+              s"record timestamp in the segment")
         case None =>
-          log.info(s"Deleting segment $segment due to retention time 
${retentionMs}ms breach based on the " +
-            s"last modified time of the segment")
+          if (remoteLogEnabled)
+            log.info(s"Deleting segment $segment due to local log retention 
time ${retentionMs}ms breach based on the " +
+              s"last modified time of the segment")
+          else
+            log.info(s"Deleting segment $segment due to log retention time 
${retentionMs}ms breach based on the " +
+              s"last modified time of the segment")
       }
     }
   }
 }
 
-case class RetentionSizeBreach(log: UnifiedLog) extends SegmentDeletionReason {
+case class RetentionSizeBreach(log: UnifiedLog, remoteLogEnabled: Boolean) 
extends SegmentDeletionReason {
   override def logReason(toDelete: List[LogSegment]): Unit = {
     var size = log.size
     toDelete.foreach { segment =>
       size -= segment.size
-      log.info(s"Deleting segment $segment due to retention size 
${log.config.retentionSize} breach. Log size " +
+      if (remoteLogEnabled) log.info(s"Deleting segment $segment due to local 
log retention size ${UnifiedLog.localRetentionSize(log.config, 
remoteLogEnabled)} breach. " +
+        s"Local log size after deletion will be $size.")
+      else log.info(s"Deleting segment $segment due to log retention size 
${log.config.retentionSize} breach. Log size " +
         s"after deletion will be $size.")
     }
   }
 }
 
-case class StartOffsetBreach(log: UnifiedLog) extends SegmentDeletionReason {
+case class StartOffsetBreach(log: UnifiedLog, remoteLogEnabled: Boolean) 
extends SegmentDeletionReason {
   override def logReason(toDelete: List[LogSegment]): Unit = {
-    log.info(s"Deleting segments due to log start offset ${log.logStartOffset} 
breach: ${toDelete.mkString(",")}")
+    if (remoteLogEnabled)
+      log.info(s"Deleting segments due to local log start offset 
${log.localLogStartOffset()} breach: ${toDelete.mkString(",")}")
+    else
+      log.info(s"Deleting segments due to log start offset 
${log.logStartOffset} breach: ${toDelete.mkString(",")}")
   }
 }
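
The UnifiedLog changes above make local segment deletion fall back to the
local.retention.* settings when tiered storage is enabled, and to the classic
retention.* settings otherwise. Below is a minimal sketch of that selection,
written in Java for consistency with the other sketches here (UnifiedLog
itself is Scala); the names are illustrative.

    public class LocalRetentionSketch {
        static long localRetentionMs(long retentionMs, long localRetentionMs, boolean remoteLogEnabled) {
            return remoteLogEnabled ? localRetentionMs : retentionMs;
        }

        static long localRetentionBytes(long retentionBytes, long localRetentionBytes, boolean remoteLogEnabled) {
            return remoteLogEnabled ? localRetentionBytes : retentionBytes;
        }

        public static void main(String[] args) {
            long retentionMs = 7 * 24 * 60 * 60 * 1000L; // retention.ms = 7 days
            long localMs = 60 * 60 * 1000L;              // local.retention.ms = 1 hour
            System.out.println(localRetentionMs(retentionMs, localMs, true));  // 3600000
            System.out.println(localRetentionMs(retentionMs, localMs, false)); // 604800000
        }
    }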
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 80155f65433..523959e037c 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -575,7 +575,13 @@ class BrokerServer(
       }
 
       Some(new RemoteLogManager(config.remoteLogManagerConfig, 
config.brokerId, config.logDirs.head, clusterId, time,
-        (tp: TopicPartition) => logManager.getLog(tp).asJava, 
brokerTopicStats));
+        (tp: TopicPartition) => logManager.getLog(tp).asJava,
+        (tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => {
+          logManager.getLog(tp).foreach { log =>
+            log.updateLogStartOffsetFromRemoteTier(remoteLogStartOffset)
+          }
+        },
+        brokerTopicStats))
     } else {
       None
     }
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 3b2ad089316..bcc12ef3978 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -614,7 +614,13 @@ class KafkaServer(
       }
 
       Some(new RemoteLogManager(config.remoteLogManagerConfig, 
config.brokerId, config.logDirs.head, clusterId, time,
-        (tp: TopicPartition) => logManager.getLog(tp).asJava, 
brokerTopicStats));
+        (tp: TopicPartition) => logManager.getLog(tp).asJava,
+        (tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => {
+          logManager.getLog(tp).foreach { log =>
+            log.updateLogStartOffsetFromRemoteTier(remoteLogStartOffset)
+          }
+      },
+        brokerTopicStats));
     } else {
       None
     }
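
Both server variants pass the same pair of callbacks into the RemoteLogManager
constructor: one to look up the UnifiedLog for a partition, and the new one to
push a remote-tier log-start-offset back into it. A small Java sketch of that
wiring follows, using a stand-in log type and plain strings in place of
TopicPartition.

    import java.util.Map;
    import java.util.Optional;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.BiConsumer;
    import java.util.function.Function;

    public class CallbackWiringSketch {
        // Stand-in for UnifiedLog, exposing the method added in this commit.
        static final class Log {
            volatile long logStartOffset;
            void updateLogStartOffsetFromRemoteTier(long offset) { logStartOffset = offset; }
        }

        public static void main(String[] args) {
            Map<String, Log> logs = new ConcurrentHashMap<>();
            logs.put("topic-0", new Log());

            // Equivalent of `(tp: TopicPartition) => logManager.getLog(tp).asJava`:
            Function<String, Optional<Log>> fetchLog = tp -> Optional.ofNullable(logs.get(tp));
            // Equivalent of the new updateRemoteLogStartOffset lambda:
            BiConsumer<String, Long> updateRemoteLogStartOffset = (tp, offset) ->
                    fetchLog.apply(tp).ifPresent(log -> log.updateLogStartOffsetFromRemoteTier(offset));

            updateRemoteLogStartOffset.accept("topic-0", 42L);
            System.out.println(logs.get("topic-0").logStartOffset); // 42
        }
    }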
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java 
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index f0b89aca855..588b31786c2 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -73,7 +73,6 @@ import scala.collection.JavaConverters;
 
 import java.io.ByteArrayInputStream;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.util.ArrayList;
@@ -93,6 +92,7 @@ import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.function.BiConsumer;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
@@ -127,39 +127,40 @@ import static 
org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
 public class RemoteLogManagerTest {
-    Time time = new MockTime();
-    int brokerId = 0;
-    String logDir = TestUtils.tempDirectory("kafka-").toString();
-    String clusterId = "dummyId";
-    String remoteLogStorageTestProp = "remote.log.storage.test";
-    String remoteLogStorageTestVal = "storage.test";
-    String remoteLogMetadataTestProp = "remote.log.metadata.test";
-    String remoteLogMetadataTestVal = "metadata.test";
-    String remoteLogMetadataCommonClientTestProp = REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "common.client.test";
-    String remoteLogMetadataCommonClientTestVal = "common.test";
-    String remoteLogMetadataProducerTestProp = REMOTE_LOG_METADATA_PRODUCER_PREFIX + "producer.test";
-    String remoteLogMetadataProducerTestVal = "producer.test";
-    String remoteLogMetadataConsumerTestProp = REMOTE_LOG_METADATA_CONSUMER_PREFIX + "consumer.test";
-    String remoteLogMetadataConsumerTestVal = "consumer.test";
-    String remoteLogMetadataTopicPartitionsNum = "1";
-
-    RemoteStorageManager remoteStorageManager = mock(RemoteStorageManager.class);
-    RemoteLogMetadataManager remoteLogMetadataManager = mock(RemoteLogMetadataManager.class);
-    RemoteLogManagerConfig remoteLogManagerConfig = null;
-
-    BrokerTopicStats brokerTopicStats = null;
-    RemoteLogManager remoteLogManager = null;
-
-    TopicIdPartition leaderTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("Leader", 0));
-    TopicIdPartition followerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("Follower", 0));
-    Map<String, Uuid> topicIds = new HashMap<>();
-    TopicPartition tp = new TopicPartition("TestTopic", 5);
-    EpochEntry epochEntry0 = new EpochEntry(0, 0);
-    EpochEntry epochEntry1 = new EpochEntry(1, 100);
-    EpochEntry epochEntry2 = new EpochEntry(2, 200);
-    List<EpochEntry> totalEpochEntries = Arrays.asList(epochEntry0, epochEntry1, epochEntry2);
-    LeaderEpochCheckpoint checkpoint = new LeaderEpochCheckpoint() {
+    private final Time time = new MockTime();
+    private final int brokerId = 0;
+    private final String logDir = TestUtils.tempDirectory("kafka-").toString();
+    private final String clusterId = "dummyId";
+    private final String remoteLogStorageTestProp = "remote.log.storage.test";
+    private final String remoteLogStorageTestVal = "storage.test";
+    private final String remoteLogMetadataTestProp = "remote.log.metadata.test";
+    private final String remoteLogMetadataTestVal = "metadata.test";
+    private final String remoteLogMetadataCommonClientTestProp = REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "common.client.test";
+    private final String remoteLogMetadataCommonClientTestVal = "common.test";
+    private final String remoteLogMetadataProducerTestProp = REMOTE_LOG_METADATA_PRODUCER_PREFIX + "producer.test";
+    private final String remoteLogMetadataProducerTestVal = "producer.test";
+    private final String remoteLogMetadataConsumerTestProp = REMOTE_LOG_METADATA_CONSUMER_PREFIX + "consumer.test";
+    private final String remoteLogMetadataConsumerTestVal = "consumer.test";
+    private final String remoteLogMetadataTopicPartitionsNum = "1";
+
+    private final RemoteStorageManager remoteStorageManager = mock(RemoteStorageManager.class);
+    private final RemoteLogMetadataManager remoteLogMetadataManager = mock(RemoteLogMetadataManager.class);
+    private RemoteLogManagerConfig remoteLogManagerConfig = null;
+
+    private BrokerTopicStats brokerTopicStats = null;
+    private RemoteLogManager remoteLogManager = null;
+
+    private final TopicIdPartition leaderTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("Leader", 0));
+    private final TopicIdPartition followerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("Follower", 0));
+    private final Map<String, Uuid> topicIds = new HashMap<>();
+    private final TopicPartition tp = new TopicPartition("TestTopic", 5);
+    private final EpochEntry epochEntry0 = new EpochEntry(0, 0);
+    private final EpochEntry epochEntry1 = new EpochEntry(1, 100);
+    private final EpochEntry epochEntry2 = new EpochEntry(2, 200);
+    private final List<EpochEntry> totalEpochEntries = Arrays.asList(epochEntry0, epochEntry1, epochEntry2);
+    private final LeaderEpochCheckpoint checkpoint = new LeaderEpochCheckpoint() {
         List<EpochEntry> epochs = Collections.emptyList();
+
         @Override
         public void write(Collection<EpochEntry> epochs) {
             this.epochs = new ArrayList<>(epochs);
@@ -171,7 +172,7 @@ public class RemoteLogManagerTest {
         }
     };
 
-    UnifiedLog mockLog = mock(UnifiedLog.class);
+    private final UnifiedLog mockLog = mock(UnifiedLog.class);
 
     @BeforeEach
     void setUp() throws Exception {
@@ -183,8 +184,10 @@ public class RemoteLogManagerTest {
         brokerTopicStats = new BrokerTopicStats(Optional.of(KafkaConfig.fromProps(props)));
 
         kafka.utils.TestUtils.clearYammerMetrics();
-
-        remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), brokerTopicStats) {
+        remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time,
+                tp -> Optional.of(mockLog),
+                (topicPartition, offset) -> { },
+                brokerTopicStats) {
             public RemoteStorageManager createRemoteStorageManager() {
                 return remoteStorageManager;
             }
@@ -273,7 +276,15 @@ public class RemoteLogManagerTest {
         Properties props = new Properties();
         // override common security.protocol by adding "RLMM prefix" and 
"remote log metadata common client prefix"
         props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + 
REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "security.protocol", "SSL");
-        try (RemoteLogManager remoteLogManager = new 
RemoteLogManager(createRLMConfig(props), brokerId, logDir, clusterId, time, tp 
-> Optional.of(mockLog), brokerTopicStats) {
+        try (RemoteLogManager remoteLogManager = new RemoteLogManager(
+                createRLMConfig(props),
+                brokerId,
+                logDir,
+                clusterId,
+                time,
+                tp -> Optional.of(mockLog),
+                (topicPartition, offset) -> { },
+                brokerTopicStats) {
             public RemoteStorageManager createRemoteStorageManager() {
                 return remoteStorageManager;
             }
@@ -790,7 +801,10 @@ public class RemoteLogManagerTest {
     void testGetClassLoaderAwareRemoteStorageManager() throws Exception {
         ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class);
         try (RemoteLogManager remoteLogManager =
-            new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, t -> Optional.empty(), brokerTopicStats) {
+            new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time,
+                    t -> Optional.empty(),
+                    (topicPartition, offset) -> { },
+                    brokerTopicStats) {
                 public RemoteStorageManager createRemoteStorageManager() {
                     return rsmManager;
                 }
@@ -887,77 +901,122 @@ public class RemoteLogManagerTest {
     @Test
     void testFindOffsetByTimestamp() throws IOException, RemoteStorageException {
         TopicPartition tp = leaderTopicIdPartition.topicPartition();
-        RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid());
+
+        long ts = time.milliseconds();
+        long startOffset = 120;
+        int targetLeaderEpoch = 10;
+
+        TreeMap<Integer, Long> validSegmentEpochs = new TreeMap<>();
+        validSegmentEpochs.put(targetLeaderEpoch, startOffset);
+
+        LeaderEpochFileCache leaderEpochFileCache = new LeaderEpochFileCache(tp, checkpoint);
+        leaderEpochFileCache.assign(4, 99L);
+        leaderEpochFileCache.assign(5, 99L);
+        leaderEpochFileCache.assign(targetLeaderEpoch, startOffset);
+        leaderEpochFileCache.assign(12, 500L);
+
+        doTestFindOffsetByTimestamp(ts, startOffset, targetLeaderEpoch, validSegmentEpochs);
+
+        // Fetching message for timestamp `ts` will return the message with startOffset+1, and `ts+1` as there are no
+        // messages starting with the startOffset and with `ts`.
+        Optional<FileRecords.TimestampAndOffset> maybeTimestampAndOffset1 = remoteLogManager.findOffsetByTimestamp(tp, ts, startOffset, leaderEpochFileCache);
+        assertEquals(Optional.of(new FileRecords.TimestampAndOffset(ts + 1, startOffset + 1, Optional.of(targetLeaderEpoch))), maybeTimestampAndOffset1);
+
+        // Fetching message for `ts+2` will return the message with startOffset+2 and its timestamp value is `ts+2`.
+        Optional<FileRecords.TimestampAndOffset> maybeTimestampAndOffset2 = remoteLogManager.findOffsetByTimestamp(tp, ts + 2, startOffset, leaderEpochFileCache);
+        assertEquals(Optional.of(new FileRecords.TimestampAndOffset(ts + 2, startOffset + 2, Optional.of(targetLeaderEpoch))), maybeTimestampAndOffset2);
+
+        // Fetching message for `ts+3` will return None as there are no records with timestamp >= ts+3.
+        Optional<FileRecords.TimestampAndOffset> maybeTimestampAndOffset3 = remoteLogManager.findOffsetByTimestamp(tp, ts + 3, startOffset, leaderEpochFileCache);
+        assertEquals(Optional.empty(), maybeTimestampAndOffset3);
+    }
+
+    @Test
+    void testFindOffsetByTimestampWithInvalidEpochSegments() throws IOException, RemoteStorageException {
+        TopicPartition tp = leaderTopicIdPartition.topicPartition();
+
         long ts = time.milliseconds();
         long startOffset = 120;
         int targetLeaderEpoch = 10;
 
+        TreeMap<Integer, Long> validSegmentEpochs = new TreeMap<>();
+        validSegmentEpochs.put(targetLeaderEpoch - 1, startOffset - 1); // invalid epochs not aligning with leader epoch cache
+        validSegmentEpochs.put(targetLeaderEpoch, startOffset);
+
+        LeaderEpochFileCache leaderEpochFileCache = new LeaderEpochFileCache(tp, checkpoint);
+        leaderEpochFileCache.assign(4, 99L);
+        leaderEpochFileCache.assign(5, 99L);
+        leaderEpochFileCache.assign(targetLeaderEpoch, startOffset);
+        leaderEpochFileCache.assign(12, 500L);
+
+        doTestFindOffsetByTimestamp(ts, startOffset, targetLeaderEpoch, validSegmentEpochs);
+
+        // Fetching offsets for this segment returns empty as the segment epochs are not within the leader epoch cache.
+        Optional<FileRecords.TimestampAndOffset> maybeTimestampAndOffset1 = remoteLogManager.findOffsetByTimestamp(tp, ts, startOffset, leaderEpochFileCache);
+        assertEquals(Optional.empty(), maybeTimestampAndOffset1);
+
+        Optional<FileRecords.TimestampAndOffset> maybeTimestampAndOffset2 = remoteLogManager.findOffsetByTimestamp(tp, ts + 2, startOffset, leaderEpochFileCache);
+        assertEquals(Optional.empty(), maybeTimestampAndOffset2);
+
+        Optional<FileRecords.TimestampAndOffset> maybeTimestampAndOffset3 = remoteLogManager.findOffsetByTimestamp(tp, ts + 3, startOffset, leaderEpochFileCache);
+        assertEquals(Optional.empty(), maybeTimestampAndOffset3);
+    }
+
+    private void doTestFindOffsetByTimestamp(long ts, long startOffset, int targetLeaderEpoch,
+                                             TreeMap<Integer, Long> validSegmentEpochs) throws IOException, RemoteStorageException {
+        TopicPartition tp = leaderTopicIdPartition.topicPartition();
+        RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid());
+
         RemoteLogSegmentMetadata segmentMetadata = mock(RemoteLogSegmentMetadata.class);
         when(segmentMetadata.remoteLogSegmentId()).thenReturn(remoteLogSegmentId);
         when(segmentMetadata.maxTimestampMs()).thenReturn(ts + 2);
         when(segmentMetadata.startOffset()).thenReturn(startOffset);
         when(segmentMetadata.endOffset()).thenReturn(startOffset + 2);
+        when(segmentMetadata.segmentLeaderEpochs()).thenReturn(validSegmentEpochs);
 
         File tpDir = new File(logDir, tp.toString());
         Files.createDirectory(tpDir.toPath());
         File txnIdxFile = new File(tpDir, "txn-index" + 
UnifiedLog.TxnIndexFileSuffix());
         txnIdxFile.createNewFile();
         
when(remoteStorageManager.fetchIndex(any(RemoteLogSegmentMetadata.class), 
any(IndexType.class)))
-            .thenAnswer(ans -> {
-                RemoteLogSegmentMetadata metadata = ans.<RemoteLogSegmentMetadata>getArgument(0);
-                IndexType indexType = ans.<IndexType>getArgument(1);
-                int maxEntries = (int) (metadata.endOffset() - metadata.startOffset());
-                OffsetIndex offsetIdx = new OffsetIndex(new File(tpDir, String.valueOf(metadata.startOffset()) + UnifiedLog.IndexFileSuffix()),
-                    metadata.startOffset(), maxEntries * 8);
-                TimeIndex timeIdx = new TimeIndex(new File(tpDir, String.valueOf(metadata.startOffset()) + UnifiedLog.TimeIndexFileSuffix()),
-                    metadata.startOffset(), maxEntries * 12);
-                switch (indexType) {
-                    case OFFSET:
-                        return new FileInputStream(offsetIdx.file());
-                    case TIMESTAMP:
-                        return new FileInputStream(timeIdx.file());
-                    case TRANSACTION:
-                        return new FileInputStream(txnIdxFile);
-                }
-                return null;
-            });
+                .thenAnswer(ans -> {
+                    RemoteLogSegmentMetadata metadata = ans.<RemoteLogSegmentMetadata>getArgument(0);
+                    IndexType indexType = ans.<IndexType>getArgument(1);
+                    int maxEntries = (int) (metadata.endOffset() - metadata.startOffset());
+                    OffsetIndex offsetIdx = new OffsetIndex(new File(tpDir, String.valueOf(metadata.startOffset()) + UnifiedLog.IndexFileSuffix()),
+                            metadata.startOffset(), maxEntries * 8);
+                    TimeIndex timeIdx = new TimeIndex(new File(tpDir, String.valueOf(metadata.startOffset()) + UnifiedLog.TimeIndexFileSuffix()),
+                            metadata.startOffset(), maxEntries * 12);
+                    switch (indexType) {
+                        case OFFSET:
+                            return Files.newInputStream(offsetIdx.file().toPath());
+                        case TIMESTAMP:
+                            return Files.newInputStream(timeIdx.file().toPath());
+                        case TRANSACTION:
+                            return Files.newInputStream(txnIdxFile.toPath());
+                    }
+                    return null;
+                });

         when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition), anyInt()))
-            .thenAnswer(ans -> {
-                int leaderEpoch = ans.<Integer>getArgument(1);
-                if (leaderEpoch == targetLeaderEpoch)
-                    return Collections.singleton(segmentMetadata).iterator();
-                else
-                    return Collections.emptyList().iterator();
-            });
-
-
+                .thenAnswer(ans -> {
+                    int leaderEpoch = ans.<Integer>getArgument(1);
+                    if (leaderEpoch == targetLeaderEpoch)
+                        return Collections.singleton(segmentMetadata).iterator();
+                    else
+                        return Collections.emptyIterator();
+                });
 
         // 3 messages are added with offset, and timestamp as below
         // startOffset   , ts-1
         // startOffset+1 , ts+1
         // startOffset+2 , ts+2
         when(remoteStorageManager.fetchLogSegment(segmentMetadata, 0))
-            .thenAnswer(a -> new ByteArrayInputStream(records(ts, startOffset, targetLeaderEpoch).buffer().array()));
+                .thenAnswer(a -> new ByteArrayInputStream(records(ts, startOffset, targetLeaderEpoch).buffer().array()));
 
-        LeaderEpochFileCache leaderEpochFileCache = new LeaderEpochFileCache(tp, checkpoint);
-        leaderEpochFileCache.assign(5, 99L);
-        leaderEpochFileCache.assign(targetLeaderEpoch, startOffset);
-        leaderEpochFileCache.assign(12, 500L);
+        when(mockLog.logEndOffset()).thenReturn(600L);

         remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition(leaderTopicIdPartition)), Collections.emptySet(), topicIds);
-        // Fetching message for timestamp `ts` will return the message with startOffset+1, and `ts+1` as there are no
-        // messages starting with the startOffset and with `ts`.
-        Optional<FileRecords.TimestampAndOffset> maybeTimestampAndOffset1 = remoteLogManager.findOffsetByTimestamp(tp, ts, startOffset, leaderEpochFileCache);
-        assertEquals(Optional.of(new FileRecords.TimestampAndOffset(ts + 1, startOffset + 1, Optional.of(targetLeaderEpoch))), maybeTimestampAndOffset1);
-
-        // Fetching message for `ts+2` will return the message with startOffset+2 and its timestamp value is `ts+2`.
-        Optional<FileRecords.TimestampAndOffset> maybeTimestampAndOffset2 = remoteLogManager.findOffsetByTimestamp(tp, ts + 2, startOffset, leaderEpochFileCache);
-        assertEquals(Optional.of(new FileRecords.TimestampAndOffset(ts + 2, startOffset + 2, Optional.of(targetLeaderEpoch))), maybeTimestampAndOffset2);
-
-        // Fetching message for `ts+3` will return None as there are no records with timestamp >= ts+3.
-        Optional<FileRecords.TimestampAndOffset> maybeTimestampAndOffset3 = remoteLogManager.findOffsetByTimestamp(tp, ts + 3, startOffset, leaderEpochFileCache);
-        assertEquals(Optional.empty(), maybeTimestampAndOffset3);
     }
 
     @Test
@@ -974,7 +1033,7 @@ public class RemoteLogManagerTest {
         MockedConstruction<KafkaMetricsGroup> mockMetricsGroupCtor = 
mockConstruction(KafkaMetricsGroup.class);
         try {
             RemoteLogManager remoteLogManager = new 
RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId,
-                time, tp -> Optional.of(mockLog), brokerTopicStats) {
+                time, tp -> Optional.of(mockLog), (topicPartition, offset) -> 
{ }, brokerTopicStats) {
                 public RemoteStorageManager createRemoteStorageManager() {
                     return remoteStorageManager;
                 }
@@ -1007,6 +1066,155 @@ public class RemoteLogManagerTest {
         }
     }
 
+    private static RemoteLogSegmentMetadata createRemoteLogSegmentMetadata(long startOffset, long endOffset, Map<Integer, Long> segmentEpochs) {
+        return new RemoteLogSegmentMetadata(
+                new RemoteLogSegmentId(new TopicIdPartition(Uuid.randomUuid(),
+                        new TopicPartition("topic", 0)), Uuid.randomUuid()),
+                startOffset, endOffset,
+                100000L,
+                1,
+                100000L,
+                1000,
+                Optional.empty(),
+                RemoteLogSegmentState.COPY_SEGMENT_FINISHED, segmentEpochs);
+    }
+
+    @Test
+    public void testBuildFilteredLeaderEpochMap() {
+        TreeMap<Integer, Long> leaderEpochToStartOffset = new TreeMap<>();
+        leaderEpochToStartOffset.put(0, 0L);
+        leaderEpochToStartOffset.put(1, 0L);
+        leaderEpochToStartOffset.put(2, 0L);
+        leaderEpochToStartOffset.put(3, 30L);
+        leaderEpochToStartOffset.put(4, 40L);
+        leaderEpochToStartOffset.put(5, 60L);
+        leaderEpochToStartOffset.put(6, 60L);
+        leaderEpochToStartOffset.put(7, 70L);
+        leaderEpochToStartOffset.put(8, 70L);
+
+        TreeMap<Integer, Long> expectedLeaderEpochs = new TreeMap<>();
+        expectedLeaderEpochs.put(2, 0L);
+        expectedLeaderEpochs.put(3, 30L);
+        expectedLeaderEpochs.put(4, 40L);
+        expectedLeaderEpochs.put(6, 60L);
+        expectedLeaderEpochs.put(8, 70L);
+
+        NavigableMap<Integer, Long> refinedLeaderEpochMap = 
RemoteLogManager.buildFilteredLeaderEpochMap(leaderEpochToStartOffset);
+        assertEquals(expectedLeaderEpochs, refinedLeaderEpochMap);
+    }
+
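
The expected map above keeps, for each distinct start offset, only the highest
epoch that maps to it. The following is a sketch of one way to compute that
result; the committed buildFilteredLeaderEpochMap may differ in detail, and the
class and method names here are illustrative:

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.NavigableMap;
    import java.util.TreeMap;

    public class FilteredLeaderEpochMapSketch {
        static NavigableMap<Integer, Long> filter(NavigableMap<Integer, Long> epochToStartOffset) {
            // Iteration is in ascending epoch order, so for duplicate start offsets
            // a later (higher) epoch overwrites the earlier one.
            Map<Long, Integer> lastEpochPerOffset = new LinkedHashMap<>();
            for (Map.Entry<Integer, Long> entry : epochToStartOffset.entrySet()) {
                lastEpochPerOffset.put(entry.getValue(), entry.getKey());
            }
            NavigableMap<Integer, Long> filtered = new TreeMap<>();
            lastEpochPerOffset.forEach((offset, epoch) -> filtered.put(epoch, offset));
            return filtered;
        }
    }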
+    @Test
+    public void testRemoteSegmentWithinLeaderEpochs() {
+        // Test whether a remote segment is within the leader epochs
+        final long logEndOffset = 90L;
+
+        TreeMap<Integer, Long> leaderEpochToStartOffset = new TreeMap<>();
+        leaderEpochToStartOffset.put(0, 0L);
+        leaderEpochToStartOffset.put(1, 10L);
+        leaderEpochToStartOffset.put(2, 20L);
+        leaderEpochToStartOffset.put(3, 30L);
+        leaderEpochToStartOffset.put(4, 40L);
+        leaderEpochToStartOffset.put(5, 50L);
+        leaderEpochToStartOffset.put(7, 70L);
+
+        // Test whether a remote segment's epochs/offsets (multiple) are within the range of leader epochs
+        TreeMap<Integer, Long> segmentEpochs1 = new TreeMap<>();
+        segmentEpochs1.put(1, 15L);
+        segmentEpochs1.put(2, 20L);
+        segmentEpochs1.put(3, 30L);
+
+        assertTrue(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(createRemoteLogSegmentMetadata(
+                15,
+                35,
+                segmentEpochs1), logEndOffset, leaderEpochToStartOffset));
+
+        // Test whether a remote segment's epochs/offsets (single) are within the range of leader epochs
+        TreeMap<Integer, Long> segmentEpochs2 = new TreeMap<>();
+        segmentEpochs2.put(1, 15L);
+        assertTrue(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(createRemoteLogSegmentMetadata(
+                15,
+                19,
+                segmentEpochs2), logEndOffset, leaderEpochToStartOffset));
+
+        // Test whether a remote segment's start offset is the same as the start offset of the respective leader epoch entry.
+        TreeMap<Integer, Long> segmentEpochs3 = new TreeMap<>();
+        segmentEpochs3.put(0, 0L); // same as leader epoch's start offset
+        assertTrue(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(createRemoteLogSegmentMetadata(
+                0,
+                5,
+                segmentEpochs3), logEndOffset, leaderEpochToStartOffset));
+
+        // Test whether a remote segment's start offset is the same as the start offset of the respective leader epoch entry.
+        TreeMap<Integer, Long> segmentEpochs4 = new TreeMap<>();
+        segmentEpochs4.put(7, 70L); // same as leader epoch's start offset
+        assertTrue(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(createRemoteLogSegmentMetadata(
+                70,
+                75,
+                segmentEpochs4), logEndOffset, leaderEpochToStartOffset));
+
+        // Test whether a remote segment's end offset is the same as the end offset of the respective leader epoch entry.
+        TreeMap<Integer, Long> segmentEpochs5 = new TreeMap<>();
+        segmentEpochs5.put(1, 15L);
+        segmentEpochs5.put(2, 20L);
+
+        assertTrue(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(createRemoteLogSegmentMetadata(
+                15,
+                29, // same as end offset for epoch 2 in leaderEpochToStartOffset
+                segmentEpochs5), logEndOffset, leaderEpochToStartOffset));
+
+        // Test whether any of the epochs is not within the leader epoch chain.
+        TreeMap<Integer, Long> segmentEpochs6 = new TreeMap<>();
+        segmentEpochs6.put(5, 55L);
+        segmentEpochs6.put(6, 60L); // epoch 6 exists here but it is missing in leaderEpochToStartOffset
+        segmentEpochs6.put(7, 70L);
+
+        assertFalse(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(createRemoteLogSegmentMetadata(
+                55,
+                85,
+                segmentEpochs6), logEndOffset, leaderEpochToStartOffset));
+
+        // Test whether an epoch existing in the remote segment does not exist in the leader epoch chain.
+        TreeMap<Integer, Long> segmentEpochs7 = new TreeMap<>();
+        segmentEpochs7.put(1, 15L);
+        segmentEpochs7.put(2, 20L); // epoch 3 is missing here which exists in leaderEpochToStartOffset
+        segmentEpochs7.put(4, 40L);
+
+        assertFalse(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(createRemoteLogSegmentMetadata(
+                15,
+                45,
+                segmentEpochs7), logEndOffset, leaderEpochToStartOffset));
+
+        // Test a remote segment having a larger end offset than the log end offset
+        TreeMap<Integer, Long> segmentEpochs8 = new TreeMap<>();
+        segmentEpochs8.put(1, 15L);
+        segmentEpochs8.put(2, 20L);
+
+        assertFalse(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(createRemoteLogSegmentMetadata(
+                15,
+                95, // larger than log end offset
+                segmentEpochs8), logEndOffset, leaderEpochToStartOffset));
+
+        // Test whether a segment's first offset is earlier than the respective epoch's start offset
+        TreeMap<Integer, Long> segmentEpochs9 = new TreeMap<>();
+        segmentEpochs9.put(1, 5L);
+        segmentEpochs9.put(2, 20L);
+
+        assertFalse(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(createRemoteLogSegmentMetadata(
+                5, // earlier than epoch 1's start offset
+                25,
+                segmentEpochs9), logEndOffset, leaderEpochToStartOffset));
+
+        // Test whether a segment's last offset is more than the respective epoch's end offset
+        TreeMap<Integer, Long> segmentEpochs10 = new TreeMap<>();
+        segmentEpochs10.put(1, 15L);
+        segmentEpochs10.put(2, 20L);
+        assertFalse(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(createRemoteLogSegmentMetadata(
+                15,
+                35, // more than epoch 2's end offset
+                segmentEpochs10), logEndOffset, leaderEpochToStartOffset));
+    }
+
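
Taken together, the ten cases above pin down the validation rule: a remote
segment qualifies only if its end offset stays below the log end offset, its
epoch entries are exactly the leader chain over that epoch range, its first
offset does not precede its first epoch's start offset, interior epoch offsets
align exactly with the chain, and its last offset does not run past its last
epoch's range. A condensed sketch consistent with those assertions (the names
and structure are assumptions, not the committed
isRemoteSegmentWithinLeaderEpochs):

    import java.util.Map;
    import java.util.NavigableMap;

    public class SegmentWithinLeaderEpochsSketch {
        static boolean isWithinLeaderEpochs(long segmentEnd,
                                            NavigableMap<Integer, Long> segmentEpochs,
                                            long logEndOffset,
                                            NavigableMap<Integer, Long> leaderEpochs) {
            if (segmentEnd >= logEndOffset) return false;       // segment runs past the log
            int firstEpoch = segmentEpochs.firstKey();
            int lastEpoch = segmentEpochs.lastKey();
            // The segment's epochs must match the leader chain over [firstEpoch, lastEpoch].
            if (!leaderEpochs.subMap(firstEpoch, true, lastEpoch, true).keySet()
                    .equals(segmentEpochs.keySet())) return false;
            for (Map.Entry<Integer, Long> entry : segmentEpochs.entrySet()) {
                long leaderStartOffset = leaderEpochs.get(entry.getKey());
                if (entry.getKey() == firstEpoch) {
                    if (entry.getValue() < leaderStartOffset) return false; // starts before its epoch
                } else if (entry.getValue() != leaderStartOffset) {
                    return false;                                           // interior epochs must align
                }
            }
            // The last epoch must still cover segmentEnd.
            Map.Entry<Integer, Long> next = leaderEpochs.higherEntry(lastEpoch);
            long lastEpochEnd = (next != null) ? next.getValue() - 1 : logEndOffset - 1;
            return segmentEnd <= lastEpochEnd;
        }
    }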
     @Test
     public void testCandidateLogSegmentsSkipsActiveSegment() {
         UnifiedLog log = mock(UnifiedLog.class);
@@ -1057,6 +1265,35 @@ public class RemoteLogManagerTest {
         assertEquals(expected, actual);
     }
 
+    @Test
+    public void testRemoteSizeData() {
+        Supplier<RemoteLogManager.RetentionSizeData>[] 
invalidRetentionSizeData =
+            new Supplier[]{
+                () -> new RemoteLogManager.RetentionSizeData(10, 0),
+                () -> new RemoteLogManager.RetentionSizeData(10, -1),
+                () -> new RemoteLogManager.RetentionSizeData(-1, 10),
+                () -> new RemoteLogManager.RetentionSizeData(-1, -1),
+                () -> new RemoteLogManager.RetentionSizeData(-1, 0)
+            };
+
+        for (Supplier<RemoteLogManager.RetentionSizeData> 
invalidRetentionSizeDataEntry : invalidRetentionSizeData) {
+            assertThrows(IllegalArgumentException.class, 
invalidRetentionSizeDataEntry::get);
+        }
+    }
+
+    @Test
+    public void testRemoteSizeTime() {
+        Supplier<RemoteLogManager.RetentionTimeData>[] 
invalidRetentionTimeData =
+            new Supplier[] {
+                () -> new RemoteLogManager.RetentionTimeData(-1, 10),
+                () -> new RemoteLogManager.RetentionTimeData(10, -1),
+            };
+
+        for (Supplier<RemoteLogManager.RetentionTimeData> 
invalidRetentionTimeDataEntry : invalidRetentionTimeData) {
+            assertThrows(IllegalArgumentException.class, 
invalidRetentionTimeDataEntry::get);
+        }
+    }
+
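
The two tests above only pin down which argument combinations are rejected with
IllegalArgumentException. A hedged sketch of constructors consistent with those
assertions (parameter names and messages are assumptions, not the committed
classes):

    public class RetentionDataSketch {
        static final class RetentionSizeData {
            RetentionSizeData(long retentionSize, long remainingBreachedSize) {
                // (10, 0), (10, -1), (-1, 10), (-1, -1), (-1, 0) must all throw
                if (retentionSize < 0)
                    throw new IllegalArgumentException("retentionSize must be non-negative: " + retentionSize);
                if (remainingBreachedSize <= 0)
                    throw new IllegalArgumentException("remainingBreachedSize must be positive: " + remainingBreachedSize);
            }
        }

        static final class RetentionTimeData {
            RetentionTimeData(long retentionMs, long cleanupUntilMs) {
                // (-1, 10) and (10, -1) must both throw
                if (retentionMs < 0 || cleanupUntilMs < 0)
                    throw new IllegalArgumentException("retentionMs and cleanupUntilMs must be non-negative");
            }
        }
    }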
     @Test
    public void testStopPartitionsWithoutDeletion() throws RemoteStorageException {
        BiConsumer<TopicPartition, Throwable> errorHandler = (topicPartition, throwable) -> fail("shouldn't be called");
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 8c98bfb9278..0104c55e4f2 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -3585,9 +3585,25 @@ class UnifiedLogTest {
       log.updateHighWatermark(90L)
       log.maybeIncrementLogStartOffset(20L, LogStartOffsetIncrementReason.SegmentDeletion)
       assertEquals(20, log.logStartOffset)
-      assertEquals(log.logStartOffset, log.localLogStartOffset())
     }
 
+  @Test
+  def testStartOffsetsRemoteLogStorageIsEnabled(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig(remoteLogStorageEnable = true)
+    val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
+
+    for (i <- 0 until 100) {
+      val records = TestUtils.singletonRecords(value = s"test$i".getBytes)
+      log.appendAsLeader(records, leaderEpoch = 0)
+    }
+
+    log.updateHighWatermark(80L)
+    val newLogStartOffset = 40L
+    log.maybeIncrementLogStartOffset(newLogStartOffset, LogStartOffsetIncrementReason.SegmentDeletion)
+    assertEquals(newLogStartOffset, log.logStartOffset)
+    assertEquals(log.logStartOffset, log.localLogStartOffset())
+  }
+
   private class MockLogOffsetsListener extends LogOffsetsListener {
     private var highWatermark: Long = -1L
 
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 868f3c76ece..d9cd8cb0ab7 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -3631,6 +3631,7 @@ class ReplicaManagerTest {
       "clusterId",
       time,
       _ => Optional.of(mockLog),
+      (TopicPartition, Long) => {},
       brokerTopicStats)
     val spyRLM = spy(remoteLogManager)
 
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java
index 55d7be4029a..4cb77449579 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.TreeMap;
@@ -398,7 +399,6 @@ public class LeaderEpochFileCache {
         }
     }
 
-    // Visible for testing
     public List<EpochEntry> epochEntries() {
         lock.readLock().lock();
         try {
@@ -408,6 +408,19 @@ public class LeaderEpochFileCache {
         }
     }
 
+    public NavigableMap<Integer, Long> epochWithOffsets() {
+        lock.readLock().lock();
+        try {
+            NavigableMap<Integer, Long> epochWithOffsets = new TreeMap<>();
+            for (EpochEntry epochEntry : epochs.values()) {
+                epochWithOffsets.put(epochEntry.epoch, epochEntry.startOffset);
+            }
+            return epochWithOffsets;
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
     private void flushTo(LeaderEpochCheckpoint leaderEpochCheckpoint) {
         lock.readLock().lock();
         try {
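
The new epochWithOffsets() above returns a point-in-time NavigableMap snapshot
of the epoch-to-start-offset chain, built under the read lock. A hypothetical
usage sketch showing how the retention path could consume it together with the
buildFilteredLeaderEpochMap covered by the tests earlier (the wiring shown is
illustrative, not the committed call site):

    import java.util.NavigableMap;

    import kafka.log.remote.RemoteLogManager;
    import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;

    public class EpochChainSnapshotSketch {
        static NavigableMap<Integer, Long> filteredChain(LeaderEpochFileCache cache) {
            // Snapshot the chain, then collapse duplicate start offsets before
            // matching remote segments against it.
            NavigableMap<Integer, Long> epochChain = cache.epochWithOffsets();
            return RemoteLogManager.buildFilteredLeaderEpochMap(epochChain);
        }
    }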
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
index f4b402b8e3b..e177dfcfcb7 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
@@ -102,9 +102,9 @@ public class LogConfig extends AbstractConfig {
 
     public static class RemoteLogConfig {
 
-        private final boolean remoteStorageEnable;
-        private final long localRetentionMs;
-        private final long localRetentionBytes;
+        public final boolean remoteStorageEnable;
+        public final long localRetentionMs;
+        public final long localRetentionBytes;
 
         private RemoteLogConfig(LogConfig config) {
             this.remoteStorageEnable = config.getBoolean(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
