This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 34d56dc  KAFKA-12802 Added a file-based cache for consumed remote log 
metadata for each partition to avoid consuming it again in case of broker restarts. 
(#11058)
34d56dc is described below

commit 34d56dc8d00bd27955eb9bb6ac01d5ae7f134dbd
Author: Satish Duggana <sati...@apache.org>
AuthorDate: Mon Oct 11 22:54:55 2021 +0530

    KAFKA-12802 Added a file-based cache for consumed remote log metadata for 
each partition to avoid consuming it again in case of broker restarts. (#11058)
    
    Added snapshots of the consumed remote log metadata for each partition to 
avoid re-consuming it after broker restarts. These snapshots are stored in 
the respective topic partition log directories.
    
    Reviewers: Kowshik Prakasam <kpraka...@confluent.io>, Cong Ding 
<c...@ccding.com>, Jun Rao <jun...@gmail.com>
---
 checkstyle/import-control.xml                      |   5 +
 .../remote/storage/RemoteLogMetadataManager.java   |   2 +-
 .../remote/storage/RemoteLogSegmentMetadata.java   |  18 +-
 .../log/remote/storage/RemoteLogSegmentState.java  |   2 +-
 .../metadata/storage/CommittedOffsetsFile.java     |  86 +++++++
 .../remote/metadata/storage/ConsumerManager.java   |  10 +-
 .../log/remote/metadata/storage/ConsumerTask.java  | 157 ++++++++++--
 .../storage/FileBasedRemoteLogMetadataCache.java   | 109 +++++++++
 .../storage/RemoteLogLeaderEpochState.java         |  24 +-
 .../metadata/storage/RemoteLogMetadataCache.java   |  74 +++---
 .../storage/RemoteLogMetadataSnapshotFile.java     | 267 +++++++++++++++++++++
 .../storage/RemoteLogSegmentMetadataSnapshot.java  | 209 ++++++++++++++++
 .../RemotePartitionMetadataEventHandler.java       |  10 +
 .../storage/RemotePartitionMetadataStore.java      |  63 ++++-
 .../TopicBasedRemoteLogMetadataManager.java        |  48 +++-
 .../TopicBasedRemoteLogMetadataManagerConfig.java  |  11 +
 .../serialization/RemoteLogMetadataSerde.java      |   5 +
 .../RemoteLogSegmentMetadataSnapshotTransform.java |  73 ++++++
 .../RemoteLogSegmentMetadataSnapshotRecord.json    |  92 +++++++
 .../FileBasedRemoteLogMetadataCacheTest.java       |  86 +++++++
 .../storage/RemoteLogMetadataSnapshotFileTest.java |  82 +++++++
 .../storage/RemoteLogSegmentLifecycleTest.java     |  16 +-
 ...picBasedRemoteLogMetadataManagerConfigTest.java |   6 +-
 .../TopicBasedRemoteLogMetadataManagerHarness.java |  48 +++-
 ...icBasedRemoteLogMetadataManagerRestartTest.java | 183 ++++++++++++++
 .../TopicBasedRemoteLogMetadataManagerTest.java    |  56 +++--
 ...RemoteLogMetadataManagerWrapperWithHarness.java |  25 +-
 .../storage/RemoteLogMetadataManagerTest.java      |   1 -
 28 files changed, 1626 insertions(+), 142 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index ba3087b..89d3484 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -305,6 +305,11 @@
       <allow pkg="org.apache.kafka.server.common" />
       <allow pkg="org.apache.kafka.server.log" />
       <allow pkg="org.apache.kafka.test" />
+
+      <subpackage name="remote">
+        <allow pkg="scala.collection" />
+      </subpackage>
+
     </subpackage>
   </subpackage>
 
diff --git 
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
 
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
index 1bf04b3..9a29746 100644
--- 
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
+++ 
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
@@ -73,7 +73,7 @@ public interface RemoteLogMetadataManager extends 
Configurable, Closeable {
      * <p>
      * <pre>
      * +---------------------+            +----------------------+
-     * |COPY_SEGMENT_STARTED |----------->|COPY_SEGMENT_FINISHED |
+     * |COPY_SEGMENT_STARTED |-----------&gt;|COPY_SEGMENT_FINISHED |
      * +-------------------+-+            +--+-------------------+
      *                     |                 |
      *                     |                 |
diff --git 
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
 
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
index 2290fd9..e0cbb79 100644
--- 
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
+++ 
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
@@ -87,15 +87,15 @@ public class RemoteLogSegmentMetadata extends 
RemoteLogMetadata {
      * @param state               State of the respective segment of 
remoteLogSegmentId.
      * @param segmentLeaderEpochs leader epochs occurred within this segment.
      */
-    private RemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId,
-                                     long startOffset,
-                                     long endOffset,
-                                     long maxTimestampMs,
-                                     int brokerId,
-                                     long eventTimestampMs,
-                                     int segmentSizeInBytes,
-                                     RemoteLogSegmentState state,
-                                     Map<Integer, Long> segmentLeaderEpochs) {
+    public RemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId,
+                                    long startOffset,
+                                    long endOffset,
+                                    long maxTimestampMs,
+                                    int brokerId,
+                                    long eventTimestampMs,
+                                    int segmentSizeInBytes,
+                                    RemoteLogSegmentState state,
+                                    Map<Integer, Long> segmentLeaderEpochs) {
         super(brokerId, eventTimestampMs);
         this.remoteLogSegmentId = Objects.requireNonNull(remoteLogSegmentId, 
"remoteLogSegmentId can not be null");
         this.state = Objects.requireNonNull(state, "state can not be null");
diff --git 
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentState.java
 
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentState.java
index bf0befe..c618321 100644
--- 
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentState.java
+++ 
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentState.java
@@ -34,7 +34,7 @@ import java.util.stream.Collectors;
  * <p>
  * <pre>
  * +---------------------+            +----------------------+
- * |COPY_SEGMENT_STARTED |----------> |COPY_SEGMENT_FINISHED |
+ * |COPY_SEGMENT_STARTED |----------&gt; |COPY_SEGMENT_FINISHED |
  * +-------------------+-+            +--+-------------------+
  *                     |                 |
  *                     |                 |
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/CommittedOffsetsFile.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/CommittedOffsetsFile.java
new file mode 100644
index 0000000..1eddc0b
--- /dev/null
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/CommittedOffsetsFile.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Pattern;
+
+/**
+ * This class represents a file containing the committed offsets of remote log 
metadata partitions.
+ */
+public class CommittedOffsetsFile {
+    private static final int CURRENT_VERSION = 0;
+    private static final String SEPARATOR = " ";
+
+    private static final Pattern MINIMUM_ONE_WHITESPACE = 
Pattern.compile("\\s+");
+    private final CheckpointFile<Map.Entry<Integer, Long>> checkpointFile;
+
+    CommittedOffsetsFile(File offsetsFile) throws IOException {
+        CheckpointFile.EntryFormatter<Map.Entry<Integer, Long>> formatter = 
new EntryFormatter();
+        checkpointFile = new CheckpointFile<>(offsetsFile, CURRENT_VERSION, 
formatter);
+    }
+
+    private static class EntryFormatter implements 
CheckpointFile.EntryFormatter<Map.Entry<Integer, Long>> {
+
+        @Override
+        public String toString(Map.Entry<Integer, Long> entry) {
+            // Each entry is stored in a new line as <partition-num offset>
+            return entry.getKey() + SEPARATOR + entry.getValue();
+        }
+
+        @Override
+        public Optional<Map.Entry<Integer, Long>> fromString(String line) {
+            String[] strings = MINIMUM_ONE_WHITESPACE.split(line);
+            if (strings.length != 2) {
+                return Optional.empty();
+            }
+
+            try {
+                return Optional.of(Utils.mkEntry(Integer.parseInt(strings[0]), 
Long.parseLong(strings[1])));
+            } catch (NumberFormatException e) {
+                return Optional.empty();
+            }
+
+        }
+    }
+
+    public synchronized void writeEntries(Map<Integer, Long> committedOffsets) 
throws IOException {
+        checkpointFile.write(committedOffsets.entrySet());
+    }
+
+    public synchronized Map<Integer, Long> readEntries() throws IOException {
+        List<Map.Entry<Integer, Long>> entries = checkpointFile.read();
+        Map<Integer, Long> partitionToOffsets = new HashMap<>(entries.size());
+        for (Map.Entry<Integer, Long> entry : entries) {
+            Long existingValue = partitionToOffsets.put(entry.getKey(), 
entry.getValue());
+            if (existingValue != null) {
+                throw new IOException("Multiple entries exist for key: " + 
entry.getKey());
+            }
+        }
+
+        return partitionToOffsets;
+    }
+}
\ No newline at end of file
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
index 4f29c87..77f83fb 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
@@ -27,7 +27,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
+import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
@@ -39,6 +41,8 @@ import java.util.concurrent.TimeoutException;
  */
 public class ConsumerManager implements Closeable {
 
+    public static final String COMMITTED_OFFSETS_FILE_NAME = 
"_rlmm_committed_offsets";
+
     private static final Logger log = 
LoggerFactory.getLogger(ConsumerManager.class);
     private static final long CONSUME_RECHECK_INTERVAL_MS = 50L;
 
@@ -49,14 +53,15 @@ public class ConsumerManager implements Closeable {
 
     public ConsumerManager(TopicBasedRemoteLogMetadataManagerConfig rlmmConfig,
                            RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
-                           RemoteLogMetadataTopicPartitioner 
rlmmTopicPartitioner,
+                           RemoteLogMetadataTopicPartitioner topicPartitioner,
                            Time time) {
         this.rlmmConfig = rlmmConfig;
         this.time = time;
 
         //Create a task to consume messages and submit the respective events 
to RemotePartitionMetadataEventHandler.
         KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(rlmmConfig.consumerProperties());
-        consumerTask = new ConsumerTask(consumer, 
remotePartitionMetadataEventHandler, rlmmTopicPartitioner);
+        Path committedOffsetsPath = new File(rlmmConfig.logDir(), 
COMMITTED_OFFSETS_FILE_NAME).toPath();
+        consumerTask = new ConsumerTask(consumer, 
remotePartitionMetadataEventHandler, topicPartitioner, committedOffsetsPath, 
time, 60_000L);
         consumerTaskThread = KafkaThread.nonDaemon("RLMMConsumerTask", 
consumerTask);
     }
 
@@ -64,6 +69,7 @@ public class ConsumerManager implements Closeable {
         try {
             // Start a thread to continuously consume records from topic 
partitions.
             consumerTaskThread.start();
+            log.info("RLMM Consumer task thread is started");
         } catch (Exception e) {
             throw new KafkaException("Error encountered while initializing and 
scheduling ConsumerTask thread", e);
         }
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
index 7691c6f..2509a44 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Time;
 import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
 import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
@@ -29,8 +30,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
+import java.io.IOException;
+import java.nio.file.Path;
 import java.time.Duration;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
@@ -66,14 +70,17 @@ class ConsumerTask implements Runnable, Closeable {
     private final KafkaConsumer<byte[], byte[]> consumer;
     private final RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler;
     private final RemoteLogMetadataTopicPartitioner topicPartitioner;
+    private final Time time;
 
     // It indicates whether the closing process has been started or not. If it 
is set as true,
     // consumer will stop consuming messages and it will not allow partition 
assignments to be updated.
     private volatile boolean closing = false;
+
     // It indicates whether the consumer needs to assign the partitions or 
not. This is set when it is
     // determined that the consumer needs to be assigned with the updated 
partitions.
     private volatile boolean assignPartitions = false;
 
+    // It represents a lock for any operations related to the 
assignedTopicPartitions.
     private final Object assignPartitionsLock = new Object();
 
     // Remote log metadata topic partitions that consumer is assigned to.
@@ -82,44 +89,143 @@ class ConsumerTask implements Runnable, Closeable {
     // User topic partitions that this broker is a leader/follower for.
     private Set<TopicIdPartition> assignedTopicPartitions = 
Collections.emptySet();
 
-    // Map of remote log metadata topic partition to consumed offsets.
+    // Map of remote log metadata topic partition to consumed offsets. 
Received consumer records
+    // may or may not have been processed based on the assigned topic 
partitions.
     private final Map<Integer, Long> partitionToConsumedOffsets = new 
ConcurrentHashMap<>();
 
+    // Map of remote log metadata topic partition to processed offsets that 
were synced in committedOffsetsFile.
+    private Map<Integer, Long> lastSyncedPartitionToConsumedOffsets = 
Collections.emptyMap();
+
+    private final long committedOffsetSyncIntervalMs;
+    private CommittedOffsetsFile committedOffsetsFile;
+    private long lastSyncedTimeMs;
+
     public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer,
                         RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
-                        RemoteLogMetadataTopicPartitioner topicPartitioner) {
-        Objects.requireNonNull(consumer);
-        Objects.requireNonNull(remotePartitionMetadataEventHandler);
-        Objects.requireNonNull(topicPartitioner);
-
-        this.consumer = consumer;
-        this.remotePartitionMetadataEventHandler = 
remotePartitionMetadataEventHandler;
-        this.topicPartitioner = topicPartitioner;
+                        RemoteLogMetadataTopicPartitioner topicPartitioner,
+                        Path committedOffsetsPath,
+                        Time time,
+                        long committedOffsetSyncIntervalMs) {
+        this.consumer = Objects.requireNonNull(consumer);
+        this.remotePartitionMetadataEventHandler = 
Objects.requireNonNull(remotePartitionMetadataEventHandler);
+        this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
+        this.time = Objects.requireNonNull(time);
+        this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs;
+
+        initializeConsumerAssignment(committedOffsetsPath);
+    }
+
+    private void initializeConsumerAssignment(Path committedOffsetsPath) {
+        try {
+            committedOffsetsFile = new 
CommittedOffsetsFile(committedOffsetsPath.toFile());
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+
+        Map<Integer, Long> committedOffsets = Collections.emptyMap();
+        try {
+            // Load committed offset and assign them in the consumer.
+            committedOffsets = committedOffsetsFile.readEntries();
+        } catch (IOException e) {
+            // Ignore the error and consumer consumes from the earliest offset.
+            log.error("Encountered error while building committed offsets from 
the file. " +
+                              "Consumer will consume from the earliest offset 
for the assigned partitions.", e);
+        }
+
+        if (!committedOffsets.isEmpty()) {
+            // Assign topic partitions from the earlier committed offsets file.
+            Set<Integer> earlierAssignedPartitions = committedOffsets.keySet();
+            assignedMetaPartitions = 
Collections.unmodifiableSet(earlierAssignedPartitions);
+            Set<TopicPartition> metadataTopicPartitions = 
earlierAssignedPartitions.stream()
+                                                                               
    .map(x -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, x))
+                                                                               
    .collect(Collectors.toSet());
+            consumer.assign(metadataTopicPartitions);
+
+            // Seek to the committed offsets
+            for (Map.Entry<Integer, Long> entry : committedOffsets.entrySet()) 
{
+                partitionToConsumedOffsets.put(entry.getKey(), 
entry.getValue());
+                consumer.seek(new 
TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, entry.getKey()), 
entry.getValue());
+            }
+
+            lastSyncedPartitionToConsumedOffsets = 
Collections.unmodifiableMap(committedOffsets);
+        }
     }
 
     @Override
     public void run() {
         log.info("Started Consumer task thread.");
+        lastSyncedTimeMs = time.milliseconds();
         try {
             while (!closing) {
                 maybeWaitForPartitionsAssignment();
 
                 log.info("Polling consumer to receive remote log metadata 
topic records");
-                ConsumerRecords<byte[], byte[]> consumerRecords
-                        = consumer.poll(Duration.ofMillis(POLL_INTERVAL_MS));
+                ConsumerRecords<byte[], byte[]> consumerRecords = 
consumer.poll(Duration.ofMillis(POLL_INTERVAL_MS));
                 for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
-                    handleRemoteLogMetadata(serde.deserialize(record.value()));
-                    partitionToConsumedOffsets.put(record.partition(), 
record.offset());
+                    processConsumerRecord(record);
                 }
+
+                maybeSyncCommittedDataAndOffsets(false);
             }
         } catch (Exception e) {
             log.error("Error occurred in consumer task, close:[{}]", closing, 
e);
         } finally {
+            maybeSyncCommittedDataAndOffsets(true);
             closeConsumer();
             log.info("Exiting from consumer task thread");
         }
     }
 
+    private void processConsumerRecord(ConsumerRecord<byte[], byte[]> record) {
+        // Taking assignPartitionsLock here as updateAssignmentsForPartitions 
changes assignedTopicPartitions
+        // and also calls 
remotePartitionMetadataEventHandler.clearTopicPartition(removedPartition) for 
the removed
+        // partitions.
+        RemoteLogMetadata remoteLogMetadata = 
serde.deserialize(record.value());
+        synchronized (assignPartitionsLock) {
+            if 
(assignedTopicPartitions.contains(remoteLogMetadata.topicIdPartition())) {
+                
remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata);
+            } else {
+                log.debug("This event {} is skipped as the topic partition is 
not assigned for this instance.", remoteLogMetadata);
+            }
+            partitionToConsumedOffsets.put(record.partition(), 
record.offset());
+        }
+    }
+
+    private void maybeSyncCommittedDataAndOffsets(boolean forceSync) {
+        // Return immediately if there is no consumption from last time.
+        boolean noConsumedOffsetUpdates = 
partitionToConsumedOffsets.equals(lastSyncedPartitionToConsumedOffsets);
+        if (noConsumedOffsetUpdates || !forceSync && time.milliseconds() - 
lastSyncedTimeMs < committedOffsetSyncIntervalMs) {
+            log.debug("Skip syncing committed offsets, 
noConsumedOffsetUpdates: {}, forceSync: {}", noConsumedOffsetUpdates, 
forceSync);
+            return;
+        }
+
+        try {
+            // Need to take lock on assignPartitionsLock as 
assignedTopicPartitions might
+            // get updated by other threads.
+            synchronized (assignPartitionsLock) {
+                for (TopicIdPartition topicIdPartition : 
assignedTopicPartitions) {
+                    int metadataPartition = 
topicPartitioner.metadataPartition(topicIdPartition);
+                    Long offset = 
partitionToConsumedOffsets.get(metadataPartition);
+                    if (offset != null) {
+                        
remotePartitionMetadataEventHandler.syncLogMetadataSnapshot(topicIdPartition, 
metadataPartition, offset);
+                    } else {
+                        log.debug("Skipping syncup of the 
remote-log-metadata-file for partition:{} , with remote log metadata 
partition{}, and no offset",
+                                topicIdPartition, metadataPartition);
+                    }
+                }
+
+                // Write partitionToConsumedOffsets into committed offsets 
file as we do not want to process them again
+                // in case of restarts.
+                committedOffsetsFile.writeEntries(partitionToConsumedOffsets);
+                lastSyncedPartitionToConsumedOffsets = new 
HashMap<>(partitionToConsumedOffsets);
+            }
+
+            lastSyncedTimeMs = time.milliseconds();
+        } catch (IOException e) {
+            throw new KafkaException("Error encountered while writing 
committed offsets to a local file", e);
+        }
+    }
+
     private void closeConsumer() {
         log.info("Closing the consumer instance");
         try {
@@ -158,6 +264,9 @@ class ConsumerTask implements Runnable, Closeable {
 
             if (assignPartitions) {
                 assignedMetaPartitionsSnapshot = new 
HashSet<>(assignedMetaPartitions);
+                // Removing unassigned meta partitions from 
partitionToConsumedOffsets and partitionToCommittedOffsets
+                partitionToConsumedOffsets.entrySet().removeIf(entry -> 
!assignedMetaPartitions.contains(entry.getKey()));
+
                 assignPartitions = false;
             }
         }
@@ -167,18 +276,11 @@ class ConsumerTask implements Runnable, Closeable {
         }
     }
 
-    private void handleRemoteLogMetadata(RemoteLogMetadata remoteLogMetadata) {
-        if 
(assignedTopicPartitions.contains(remoteLogMetadata.topicIdPartition())) {
-            
remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata);
-        } else {
-            log.debug("This event {} is skipped as the topic partition is not 
assigned for this instance.", remoteLogMetadata);
-        }
-    }
-
     private void executeReassignment(Set<Integer> 
assignedMetaPartitionsSnapshot) {
-        Set<TopicPartition> assignedMetaTopicPartitions = 
assignedMetaPartitionsSnapshot.stream()
-                .map(partitionNum -> new 
TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, partitionNum))
-                .collect(Collectors.toSet());
+        Set<TopicPartition> assignedMetaTopicPartitions =
+                assignedMetaPartitionsSnapshot.stream()
+                                              .map(partitionNum -> new 
TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, partitionNum))
+                                              .collect(Collectors.toSet());
         log.info("Reassigning partitions to consumer task [{}]", 
assignedMetaTopicPartitions);
         consumer.assign(assignedMetaTopicPartitions);
     }
@@ -210,12 +312,19 @@ class ConsumerTask implements Runnable, Closeable {
             for (TopicIdPartition tp : updatedReassignedPartitions) {
                 
updatedAssignedMetaPartitions.add(topicPartitioner.metadataPartition(tp));
             }
+
+            // Clear removed topic partitions from inmemory cache.
+            for (TopicIdPartition removedPartition : removedPartitions) {
+                
remotePartitionMetadataEventHandler.clearTopicPartition(removedPartition);
+            }
+
             assignedTopicPartitions = 
Collections.unmodifiableSet(updatedReassignedPartitions);
             log.debug("Assigned topic partitions: {}", 
assignedTopicPartitions);
 
             if (!updatedAssignedMetaPartitions.equals(assignedMetaPartitions)) 
{
                 assignedMetaPartitions = 
Collections.unmodifiableSet(updatedAssignedMetaPartitions);
                 log.debug("Assigned metadata topic partitions: {}", 
assignedMetaPartitions);
+
                 assignPartitions = true;
                 assignPartitionsLock.notifyAll();
             } else {
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCache.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCache.java
new file mode 100644
index 0000000..15e4562
--- /dev/null
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCache.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This is a wrapper around {@link RemoteLogMetadataCache} providing a file 
based snapshot of
+ * {@link RemoteLogMetadataCache} for the given {@code topicIdPartition}. 
Snapshot is stored in the given
+ * {@code partitionDir}.
+ */
+public class FileBasedRemoteLogMetadataCache extends RemoteLogMetadataCache {
+    private static final Logger log = 
LoggerFactory.getLogger(FileBasedRemoteLogMetadataCache.class);
+    private final RemoteLogMetadataSnapshotFile snapshotFile;
+    private final TopicIdPartition topicIdPartition;
+
+    public FileBasedRemoteLogMetadataCache(TopicIdPartition topicIdPartition,
+                                           Path partitionDir) {
+        if (!partitionDir.toFile().exists() || 
!partitionDir.toFile().isDirectory()) {
+            throw new KafkaException("Given partition directory:" + 
partitionDir + " must be an existing directory.");
+        }
+
+        this.topicIdPartition = topicIdPartition;
+        snapshotFile = new RemoteLogMetadataSnapshotFile(partitionDir);
+
+        try {
+            snapshotFile.read().ifPresent(snapshot -> 
loadRemoteLogSegmentMetadata(snapshot));
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    protected void 
loadRemoteLogSegmentMetadata(RemoteLogMetadataSnapshotFile.Snapshot snapshot) {
+        log.info("Loading snapshot for partition {} is: {}", topicIdPartition, 
snapshot);
+        for (RemoteLogSegmentMetadataSnapshot metadataSnapshot : 
snapshot.remoteLogSegmentMetadataSnapshots()) {
+            switch (metadataSnapshot.state()) {
+                case COPY_SEGMENT_STARTED:
+                    
addCopyInProgressSegment(createRemoteLogSegmentMetadata(metadataSnapshot));
+                    break;
+                case COPY_SEGMENT_FINISHED:
+                    
handleSegmentWithCopySegmentFinishedState(createRemoteLogSegmentMetadata(metadataSnapshot));
+                    break;
+                case DELETE_SEGMENT_STARTED:
+                    
handleSegmentWithDeleteSegmentStartedState(createRemoteLogSegmentMetadata(metadataSnapshot));
+                    break;
+                case DELETE_SEGMENT_FINISHED:
+                default:
+                    throw new IllegalArgumentException("Given 
remoteLogSegmentMetadata has invalid state: " + metadataSnapshot);
+            }
+        }
+    }
+
+    private RemoteLogSegmentMetadata 
createRemoteLogSegmentMetadata(RemoteLogSegmentMetadataSnapshot snapshot) {
+        return new RemoteLogSegmentMetadata(new 
RemoteLogSegmentId(topicIdPartition, snapshot.segmentId()), 
snapshot.startOffset(),
+                                            snapshot.endOffset(), 
snapshot.maxTimestampMs(), snapshot.brokerId(), snapshot.eventTimestampMs(),
+                                            snapshot.segmentSizeInBytes(), 
snapshot.state(), snapshot.segmentLeaderEpochs());
+    }
+
+    /**
+     * Flushes the in-memory state to the snapshot file.
+     *
+     * @param metadataPartition       remote log metadata partition from which 
the messages have been consumed for the given
+     *                                user topic partition.
+     * @param metadataPartitionOffset remote log metadata partition offset up 
to which the messages have been consumed.
+     * @throws IOException if any errors occurred while writing the snapshot 
to the file.
+     */
+    public void flushToFile(int metadataPartition,
+                            Long metadataPartitionOffset) throws IOException {
+        List<RemoteLogSegmentMetadataSnapshot> snapshots = new 
ArrayList<>(idToSegmentMetadata.size());
+        for (RemoteLogLeaderEpochState state : leaderEpochEntries.values()) {
+            // Add unreferenced segments first, as to maintain the order when 
these segments are again read from
+            // the snapshot to build RemoteLogMetadataCache.
+            for (RemoteLogSegmentId id : state.unreferencedSegmentIds()) {
+                
snapshots.add(RemoteLogSegmentMetadataSnapshot.create(idToSegmentMetadata.get(id)));
+            }
+            
+            // Add referenced segments.
+            for (RemoteLogSegmentId id : state.referencedSegmentIds()) {
+                
snapshots.add(RemoteLogSegmentMetadataSnapshot.create(idToSegmentMetadata.get(id)));
+            }
+        }
+
+        snapshotFile.write(new 
RemoteLogMetadataSnapshotFile.Snapshot(metadataPartition, 
metadataPartitionOffset, snapshots));
+    }
+}
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java
index 7535aff..d5787dd 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java
@@ -117,8 +117,7 @@ class RemoteLogLeaderEpochState {
         }
     }
 
-    void handleSegmentWithDeleteSegmentStartedState(Long startOffset, 
RemoteLogSegmentId remoteLogSegmentId,
-                                                    Long leaderEpochEndOffset) 
{
+    void handleSegmentWithDeleteSegmentStartedState(Long startOffset, 
RemoteLogSegmentId remoteLogSegmentId) {
         // Remove the offset mappings as this segment is getting deleted.
         offsetToId.remove(startOffset, remoteLogSegmentId);
 
@@ -127,8 +126,7 @@ class RemoteLogLeaderEpochState {
         unreferencedSegmentIds.add(remoteLogSegmentId);
     }
 
-    void handleSegmentWithDeleteSegmentFinishedState(long startOffset, 
RemoteLogSegmentId remoteLogSegmentId,
-                                                     Long 
leaderEpochEndOffset) {
+    void handleSegmentWithDeleteSegmentFinishedState(RemoteLogSegmentId 
remoteLogSegmentId) {
         // It completely removes the tracking of this segment as it is 
considered as deleted.
         unreferencedSegmentIds.remove(remoteLogSegmentId);
     }
@@ -149,6 +147,14 @@ class RemoteLogLeaderEpochState {
         return entry == null ? null : entry.getValue();
     }
 
+    Collection<RemoteLogSegmentId> unreferencedSegmentIds() {
+        return Collections.unmodifiableCollection(unreferencedSegmentIds);
+    }
+
+    Collection<RemoteLogSegmentId> referencedSegmentIds() {
+        return Collections.unmodifiableCollection(offsetToId.values());
+    }
+
     /**
      * Action interface to act on remote log segment transition for the given 
{@link RemoteLogLeaderEpochState}.
      */
@@ -158,15 +164,15 @@ class RemoteLogLeaderEpochState {
         /**
          * Performs this operation with the given {@code 
remoteLogLeaderEpochState}.
          *
+         * @param leaderEpoch               leader epoch value
          * @param remoteLogLeaderEpochState In-memory state of the segments 
for a leader epoch.
          * @param startOffset               start offset of the segment.
          * @param segmentId                 segment id.
-         * @param leaderEpochEndOffset      end offset for the given leader 
epoch.
          */
-        void accept(RemoteLogLeaderEpochState remoteLogLeaderEpochState,
-                    Long startOffset,
-                    RemoteLogSegmentId segmentId,
-                    Long leaderEpochEndOffset);
+        void accept(int leaderEpoch,
+                    RemoteLogLeaderEpochState remoteLogLeaderEpochState,
+                    long startOffset,
+                    RemoteLogSegmentId segmentId);
     }
 
 }
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
index 5853d7f..ed88bc4 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
@@ -95,14 +95,14 @@ public class RemoteLogMetadataCache {
     private static final Logger log = 
LoggerFactory.getLogger(RemoteLogMetadataCache.class);
 
     // It contains all the segment-id to metadata mappings which did not reach 
the terminal state viz DELETE_SEGMENT_FINISHED.
-    private final ConcurrentMap<RemoteLogSegmentId, RemoteLogSegmentMetadata> 
idToSegmentMetadata
+    protected final ConcurrentMap<RemoteLogSegmentId, 
RemoteLogSegmentMetadata> idToSegmentMetadata
             = new ConcurrentHashMap<>();
 
     // It contains leader epoch to the respective entry containing the state.
     // TODO We are not clearing the entry for epoch when 
RemoteLogLeaderEpochState becomes empty. This will be addressed
     // later. We will look into it when we integrate these APIs along with 
RemoteLogManager changes.
     // https://issues.apache.org/jira/browse/KAFKA-12641
-    private final ConcurrentMap<Integer, RemoteLogLeaderEpochState> 
leaderEpochEntries = new ConcurrentHashMap<>();
+    protected final ConcurrentMap<Integer, RemoteLogLeaderEpochState> 
leaderEpochEntries = new ConcurrentHashMap<>();
 
     /**
      * Returns {@link RemoteLogSegmentMetadata} if it exists for the given 
leader-epoch containing the offset and with
@@ -161,77 +161,73 @@ public class RemoteLogMetadataCache {
                 throw new IllegalArgumentException("metadataUpdate: " + 
metadataUpdate + " with state " + RemoteLogSegmentState.COPY_SEGMENT_STARTED +
                                                    " can not be updated");
             case COPY_SEGMENT_FINISHED:
-                handleSegmentWithCopySegmentFinishedState(metadataUpdate, 
existingMetadata);
+                
handleSegmentWithCopySegmentFinishedState(existingMetadata.createWithUpdates(metadataUpdate));
                 break;
             case DELETE_SEGMENT_STARTED:
-                handleSegmentWithDeleteSegmentStartedState(metadataUpdate, 
existingMetadata);
+                
handleSegmentWithDeleteSegmentStartedState(existingMetadata.createWithUpdates(metadataUpdate));
                 break;
             case DELETE_SEGMENT_FINISHED:
-                handleSegmentWithDeleteSegmentFinishedState(metadataUpdate, 
existingMetadata);
+                
handleSegmentWithDeleteSegmentFinishedState(existingMetadata.createWithUpdates(metadataUpdate));
                 break;
             default:
                 throw new IllegalArgumentException("Metadata with the state " 
+ targetState + " is not supported");
         }
     }
 
-    private void 
handleSegmentWithCopySegmentFinishedState(RemoteLogSegmentMetadataUpdate 
metadataUpdate,
-                                                           
RemoteLogSegmentMetadata existingMetadata) {
-        log.debug("Adding remote log segment metadata to leader epoch mappings 
with update: [{}]", metadataUpdate);
-
-        doHandleSegmentStateTransitionForLeaderEpochs(existingMetadata,
-                
RemoteLogLeaderEpochState::handleSegmentWithCopySegmentFinishedState);
+    protected final void 
handleSegmentWithCopySegmentFinishedState(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata) {
+        doHandleSegmentStateTransitionForLeaderEpochs(remoteLogSegmentMetadata,
+                                                      (leaderEpoch, 
remoteLogLeaderEpochState, startOffset, segmentId) -> {
+                                                          long 
leaderEpochEndOffset = highestOffsetForEpoch(leaderEpoch,
+                                                                               
                             remoteLogSegmentMetadata);
+                                                          
remoteLogLeaderEpochState.handleSegmentWithCopySegmentFinishedState(startOffset,
+                                                                               
                                               segmentId,
+                                                                               
                                               leaderEpochEndOffset);
+                                                      });
 
         // Put the entry with the updated metadata.
-        idToSegmentMetadata.put(existingMetadata.remoteLogSegmentId(),
-                existingMetadata.createWithUpdates(metadataUpdate));
+        idToSegmentMetadata.put(remoteLogSegmentMetadata.remoteLogSegmentId(), 
remoteLogSegmentMetadata);
     }
 
-    private void 
handleSegmentWithDeleteSegmentStartedState(RemoteLogSegmentMetadataUpdate 
metadataUpdate,
-                                                            
RemoteLogSegmentMetadata existingMetadata) {
-        log.debug("Cleaning up the state for : [{}]", metadataUpdate);
+    protected final void 
handleSegmentWithDeleteSegmentStartedState(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata) {
+        log.debug("Cleaning up the state for : [{}]", 
remoteLogSegmentMetadata);
 
-        doHandleSegmentStateTransitionForLeaderEpochs(existingMetadata,
-                
RemoteLogLeaderEpochState::handleSegmentWithDeleteSegmentStartedState);
+        doHandleSegmentStateTransitionForLeaderEpochs(remoteLogSegmentMetadata,
+                                                      (leaderEpoch, 
remoteLogLeaderEpochState, startOffset, segmentId) ->
+                                                              
remoteLogLeaderEpochState.handleSegmentWithDeleteSegmentStartedState(startOffset,
 segmentId));
 
         // Put the entry with the updated metadata.
-        idToSegmentMetadata.put(existingMetadata.remoteLogSegmentId(),
-                existingMetadata.createWithUpdates(metadataUpdate));
+        idToSegmentMetadata.put(remoteLogSegmentMetadata.remoteLogSegmentId(), 
remoteLogSegmentMetadata);
     }
 
-    private void 
handleSegmentWithDeleteSegmentFinishedState(RemoteLogSegmentMetadataUpdate 
metadataUpdate,
-                                                             
RemoteLogSegmentMetadata existingMetadata) {
-        log.debug("Removing the entry as it reached the terminal state: [{}]", 
metadataUpdate);
+    private void 
handleSegmentWithDeleteSegmentFinishedState(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata) {
+        log.debug("Removing the entry as it reached the terminal state: [{}]", 
remoteLogSegmentMetadata);
 
-        doHandleSegmentStateTransitionForLeaderEpochs(existingMetadata,
-                
RemoteLogLeaderEpochState::handleSegmentWithDeleteSegmentFinishedState);
+        doHandleSegmentStateTransitionForLeaderEpochs(remoteLogSegmentMetadata,
+                                                      (leaderEpoch, 
remoteLogLeaderEpochState, startOffset, segmentId) ->
+                                                              
remoteLogLeaderEpochState.handleSegmentWithDeleteSegmentFinishedState(segmentId));
 
         // Remove the segment's id to metadata mapping because this segment is 
considered as deleted and it cleared all
         // the state of this segment in the cache.
-        idToSegmentMetadata.remove(existingMetadata.remoteLogSegmentId());
+        
idToSegmentMetadata.remove(remoteLogSegmentMetadata.remoteLogSegmentId());
     }
 
-    private void 
doHandleSegmentStateTransitionForLeaderEpochs(RemoteLogSegmentMetadata 
existingMetadata,
+    private void 
doHandleSegmentStateTransitionForLeaderEpochs(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata,
                                                                
RemoteLogLeaderEpochState.Action action) {
-        RemoteLogSegmentId remoteLogSegmentId = 
existingMetadata.remoteLogSegmentId();
-        Map<Integer, Long> leaderEpochToOffset = 
existingMetadata.segmentLeaderEpochs();
+        RemoteLogSegmentId remoteLogSegmentId = 
remoteLogSegmentMetadata.remoteLogSegmentId();
+        Map<Integer, Long> leaderEpochToOffset = 
remoteLogSegmentMetadata.segmentLeaderEpochs();
 
         // Go through all the leader epochs and apply the given action.
         for (Map.Entry<Integer, Long> entry : leaderEpochToOffset.entrySet()) {
             Integer leaderEpoch = entry.getKey();
             Long startOffset = entry.getValue();
-            RemoteLogLeaderEpochState remoteLogLeaderEpochState = 
leaderEpochEntries.get(leaderEpoch);
-
-            if (remoteLogLeaderEpochState == null) {
-                throw new IllegalStateException("RemoteLogLeaderEpochState 
does not exist for the leader epoch: "
-                                                + leaderEpoch);
-            } else {
-                long leaderEpochEndOffset = highestOffsetForEpoch(leaderEpoch, 
existingMetadata);
-                action.accept(remoteLogLeaderEpochState, startOffset, 
remoteLogSegmentId, leaderEpochEndOffset);
-            }
+            // leaderEpochEntries will not contain an entry for the epoch yet when restoring the metadata from a
+            // snapshot, so the entry is created on demand.
+            RemoteLogLeaderEpochState remoteLogLeaderEpochState = 
leaderEpochEntries.computeIfAbsent(
+                    leaderEpoch, x -> new RemoteLogLeaderEpochState());
+            action.accept(leaderEpoch, remoteLogLeaderEpochState, startOffset, 
remoteLogSegmentId);
         }
     }
 
-    private long highestOffsetForEpoch(Integer leaderEpoch, 
RemoteLogSegmentMetadata segmentMetadata) {
+    private static long highestOffsetForEpoch(Integer leaderEpoch, 
RemoteLogSegmentMetadata segmentMetadata) {
         // Compute the highest offset for the leader epoch with in the segment
         NavigableMap<Integer, Long> epochToOffset = 
segmentMetadata.segmentLeaderEpochs();
         Map.Entry<Integer, Long> nextEntry = 
epochToOffset.higherEntry(leaderEpoch);
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFile.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFile.java
new file mode 100644
index 0000000..cee77ee
--- /dev/null
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFile.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.Utils;
+import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This class represents the remote log data snapshot stored in a file for a 
specific topic partition. This is used by
+ * {@link TopicBasedRemoteLogMetadataManager} to store the remote log metadata 
received for a specific partition from
+ * remote log metadata topic. This will avoid reading the remote log metadata 
messages from the topic again when a
+ * broker restarts.
+ */
+public class RemoteLogMetadataSnapshotFile {
+    private static final Logger log = 
LoggerFactory.getLogger(RemoteLogMetadataSnapshotFile.class);
+
+    public static final String COMMITTED_LOG_METADATA_SNAPSHOT_FILE_NAME = 
"remote_log_snapshot";
+
+    // File format:
+    // <header>[<entry>...]
+    // header: 
<version:short><metadata-partition:int><metadata-partition-offset:long><entries-size:int>
+    // entry: <entry-length><entry-bytes>
+
+    // header size: 2 (version) + 4 (partition num) + 8 (offset) + 4 (entries 
size) = 18
+    private static final int HEADER_SIZE = 18;
+
+    private final File metadataStoreFile;
+    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+
+    /**
+     * Creates a CommittedLogMetadataSnapshotFile instance backed by a file 
with the name `remote_log_snapshot` in
+     * the given {@code metadataStoreDir}. It creates the file if it does not 
exist.
+     *
+     * @param metadataStoreDir directory in which the snapshot file to be 
created.
+     */
+    RemoteLogMetadataSnapshotFile(Path metadataStoreDir) {
+        this.metadataStoreFile = new File(metadataStoreDir.toFile(), 
COMMITTED_LOG_METADATA_SNAPSHOT_FILE_NAME);
+
+        // Create an empty file if it does not exist.
+        try {
+            boolean newFileCreated = metadataStoreFile.createNewFile();
+            log.info("Remote log metadata snapshot file: [{}], newFileCreated: 
[{}]", metadataStoreFile, newFileCreated);
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    /**
+     * Writes the given snapshot replacing the earlier snapshot data.
+     *
+     * @param snapshot Snapshot to be stored.
+     * @throws IOException if there4 is any error in writing the given 
snapshot to the file.
+     */
+    public synchronized void write(Snapshot snapshot) throws IOException {
+        Path newMetadataSnapshotFilePath = new 
File(metadataStoreFile.getAbsolutePath() + ".tmp").toPath();
+        try (FileChannel fileChannel = 
FileChannel.open(newMetadataSnapshotFilePath,
+                                                        
StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
+
+            // header: 
<version:short><metadata-partition:int><metadata-partition-offset:long>
+            ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE);
+
+            // Write version
+            headerBuffer.putShort(snapshot.version());
+
+            // Write metadata partition and metadata partition offset
+            headerBuffer.putInt(snapshot.metadataPartition());
+
+            // Write metadata partition offset
+            headerBuffer.putLong(snapshot.metadataPartitionOffset());
+
+            // Write entries size
+            Collection<RemoteLogSegmentMetadataSnapshot> metadataSnapshots = 
snapshot.remoteLogSegmentMetadataSnapshots();
+            headerBuffer.putInt(metadataSnapshots.size());
+
+            // Write header
+            headerBuffer.flip();
+            fileChannel.write(headerBuffer);
+
+            // Write each entry
+            ByteBuffer lenBuffer = ByteBuffer.allocate(4);
+            for (RemoteLogSegmentMetadataSnapshot metadataSnapshot : 
metadataSnapshots) {
+                final byte[] serializedBytes = 
serde.serialize(metadataSnapshot);
+                // entry format: <entry-length><entry-bytes>
+
+                // Write entry length
+                lenBuffer.putInt(serializedBytes.length);
+                lenBuffer.flip();
+                fileChannel.write(lenBuffer);
+                lenBuffer.rewind();
+
+                // Write entry bytes
+                fileChannel.write(ByteBuffer.wrap(serializedBytes));
+            }
+
+            fileChannel.force(true);
+        }
+
+        Utils.atomicMoveWithFallback(newMetadataSnapshotFilePath, 
metadataStoreFile.toPath());
+    }
+
+    /**
+     * @return the Snapshot if it exists.
+     * @throws IOException if there is any error in reading the stored 
snapshot.
+     */
+    public synchronized Optional<Snapshot> read() throws IOException {
+
+        // Checking for empty files.
+        if (metadataStoreFile.length() == 0) {
+            return Optional.empty();
+        }
+
+        try (ReadableByteChannel channel = Channels.newChannel(new 
FileInputStream(metadataStoreFile))) {
+
+            // header: 
<version:short><metadata-partition:int><metadata-partition-offset:long>
+            // Read header
+            ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE);
+            channel.read(headerBuffer);
+            headerBuffer.rewind();
+            short version = headerBuffer.getShort();
+            int metadataPartition = headerBuffer.getInt();
+            long metadataPartitionOffset = headerBuffer.getLong();
+            int metadataSnapshotsSize = headerBuffer.getInt();
+
+            List<RemoteLogSegmentMetadataSnapshot> result = new 
ArrayList<>(metadataSnapshotsSize);
+            ByteBuffer lenBuffer = ByteBuffer.allocate(4);
+            int lenBufferReadCt;
+            while ((lenBufferReadCt = channel.read(lenBuffer)) > 0) {
+                lenBuffer.rewind();
+
+                if (lenBufferReadCt != lenBuffer.capacity()) {
+                    throw new IOException("Invalid amount of data read for the 
length of an entry, file may have been corrupted.");
+                }
+
+                // entry format: <entry-length><entry-bytes>
+
+                // Read the length of each entry
+                final int len = lenBuffer.getInt();
+                lenBuffer.rewind();
+
+                // Read the entry
+                ByteBuffer data = ByteBuffer.allocate(len);
+                final int read = channel.read(data);
+                if (read != len) {
+                    throw new IOException("Invalid amount of data read, file 
may have been corrupted.");
+                }
+
+                // We are always adding RemoteLogSegmentMetadata only as you 
can see in #write() method.
+                // Did not add a specific serde for RemoteLogSegmentMetadata 
and reusing RemoteLogMetadataSerde
+                final RemoteLogSegmentMetadataSnapshot 
remoteLogSegmentMetadata =
+                        (RemoteLogSegmentMetadataSnapshot) 
serde.deserialize(data.array());
+                result.add(remoteLogSegmentMetadata);
+            }
+
+            if (metadataSnapshotsSize != result.size()) {
+                throw new IOException("Unexpected entries in the snapshot 
file. Expected size: " + metadataSnapshotsSize
+                                              + ", but found: " + 
result.size());
+            }
+
+            return Optional.of(new Snapshot(version, metadataPartition, 
metadataPartitionOffset, result));
+        }
+    }
+
+    /**
+     * This class represents the collection of remote log metadata for a 
specific topic partition.
+     */
+    public static final class Snapshot {
+        private static final short CURRENT_VERSION = 0;
+
+        private final short version;
+        private final int metadataPartition;
+        private final long metadataPartitionOffset;
+        private final Collection<RemoteLogSegmentMetadataSnapshot> 
remoteLogSegmentMetadataSnapshots;
+
+        public Snapshot(int metadataPartition,
+                        long metadataPartitionOffset,
+                        Collection<RemoteLogSegmentMetadataSnapshot> 
remoteLogSegmentMetadataSnapshots) {
+            this(CURRENT_VERSION, metadataPartition, metadataPartitionOffset, 
remoteLogSegmentMetadataSnapshots);
+        }
+
+        public Snapshot(short version,
+                        int metadataPartition,
+                        long metadataPartitionOffset,
+                        Collection<RemoteLogSegmentMetadataSnapshot> 
remoteLogSegmentMetadataSnapshots) {
+            // We will add multiple version support in future if needed. For 
now, the only supported version is CURRENT_VERSION viz 0.
+            if (version != CURRENT_VERSION) {
+                throw new IllegalArgumentException("Unexpected version 
received: " + version);
+            }
+            this.version = version;
+            this.metadataPartition = metadataPartition;
+            this.metadataPartitionOffset = metadataPartitionOffset;
+            this.remoteLogSegmentMetadataSnapshots = 
remoteLogSegmentMetadataSnapshots;
+        }
+
+        public short version() {
+            return version;
+        }
+
+        public int metadataPartition() {
+            return metadataPartition;
+        }
+
+        public long metadataPartitionOffset() {
+            return metadataPartitionOffset;
+        }
+
+        public Collection<RemoteLogSegmentMetadataSnapshot> 
remoteLogSegmentMetadataSnapshots() {
+            return remoteLogSegmentMetadataSnapshots;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (!(o instanceof Snapshot)) return false;
+            Snapshot snapshot = (Snapshot) o;
+            return version == snapshot.version && metadataPartition == 
snapshot.metadataPartition
+                    && metadataPartitionOffset == 
snapshot.metadataPartitionOffset
+                    && Objects.equals(remoteLogSegmentMetadataSnapshots, 
snapshot.remoteLogSegmentMetadataSnapshots);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(version, metadataPartition, 
metadataPartitionOffset, remoteLogSegmentMetadataSnapshots);
+        }
+
+        @Override
+        public String toString() {
+            return "Snapshot{" +
+                    "version=" + version +
+                    ", metadataPartition=" + metadataPartition +
+                    ", metadataPartitionOffset=" + metadataPartitionOffset +
+                    ", remoteLogSegmentMetadataSnapshotsSize" + 
remoteLogSegmentMetadataSnapshots.size() +
+                    '}';
+        }
+    }
+}
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java
new file mode 100644
index 0000000..e7292c2
--- /dev/null
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.TreeMap;
+
+/**
+ * This class represents the entry containing the metadata about a remote log 
segment. This is similar to
+ * {@link RemoteLogSegmentMetadata} but it does not contain topic partition 
information. This class keeps
+ * only remote log segment ID but not the topic partition.
+ *
+ * This class is used in storing the snapshot of remote log metadata for a 
specific topic partition as mentioned
+ * in {@link RemoteLogMetadataSnapshotFile.Snapshot}.
+ */
+public class RemoteLogSegmentMetadataSnapshot extends RemoteLogMetadata {
+
+    /**
+     * Universally unique remote log segment id.
+     */
+    private final Uuid segmentId;
+
+    /**
+     * Start offset of this segment.
+     */
+    private final long startOffset;
+
+    /**
+     * End offset of this segment.
+     */
+    private final long endOffset;
+
+    /**
+     * Maximum timestamp in milli seconds in the segment
+     */
+    private final long maxTimestampMs;
+
+    /**
+     * LeaderEpoch vs offset for messages within this segment.
+     */
+    private final NavigableMap<Integer, Long> segmentLeaderEpochs;
+
+    /**
+     * Size of the segment in bytes.
+     */
+    private final int segmentSizeInBytes;
+
+    /**
+     * It indicates the state in which the action is executed on this segment.
+     */
+    private final RemoteLogSegmentState state;
+
+    /**
+     * Creates an instance with the given metadata of remote log segment.
+     * <p>
+     * {@code segmentLeaderEpochs} can not be empty. If all the records in 
this segment belong to the same leader epoch
+     * then it should have an entry with epoch mapping to start-offset of this 
segment.
+     *
+     * @param segmentId                  Universally unique remote log segment 
id.
+     * @param startOffset         Start offset of this segment (inclusive).
+     * @param endOffset           End offset of this segment (inclusive).
+     * @param maxTimestampMs      Maximum timestamp in milli seconds in this 
segment.
+     * @param brokerId            Broker id from which this event is generated.
+     * @param eventTimestampMs    Epoch time in milli seconds at which the 
remote log segment is copied to the remote tier storage.
+     * @param segmentSizeInBytes  Size of this segment in bytes.
+     * @param state               State of the respective segment of 
remoteLogSegmentId.
+     * @param segmentLeaderEpochs leader epochs occurred within this segment.
+     */
+    public RemoteLogSegmentMetadataSnapshot(Uuid segmentId,
+                                            long startOffset,
+                                            long endOffset,
+                                            long maxTimestampMs,
+                                            int brokerId,
+                                            long eventTimestampMs,
+                                            int segmentSizeInBytes,
+                                            RemoteLogSegmentState state,
+                                            Map<Integer, Long> 
segmentLeaderEpochs) {
+        super(brokerId, eventTimestampMs);
+        this.segmentId = Objects.requireNonNull(segmentId, "remoteLogSegmentId 
can not be null");
+        this.state = Objects.requireNonNull(state, "state can not be null");
+
+        this.startOffset = startOffset;
+        this.endOffset = endOffset;
+        this.maxTimestampMs = maxTimestampMs;
+        this.segmentSizeInBytes = segmentSizeInBytes;
+
+        if (segmentLeaderEpochs == null || segmentLeaderEpochs.isEmpty()) {
+            throw new IllegalArgumentException("segmentLeaderEpochs can not be 
null or empty");
+        }
+
+        this.segmentLeaderEpochs = Collections.unmodifiableNavigableMap(new 
TreeMap<>(segmentLeaderEpochs));
+    }
+
+    public static RemoteLogSegmentMetadataSnapshot 
create(RemoteLogSegmentMetadata metadata) {
+        return new 
RemoteLogSegmentMetadataSnapshot(metadata.remoteLogSegmentId().id(), 
metadata.startOffset(), metadata.endOffset(),
+                                                    metadata.maxTimestampMs(), 
metadata.brokerId(), metadata.eventTimestampMs(),
+                                                    
metadata.segmentSizeInBytes(), metadata.state(), 
metadata.segmentLeaderEpochs());
+    }
+
+    /**
+     * @return unique id of this segment.
+     */
+    public Uuid segmentId() {
+        return segmentId;
+    }
+
+    /**
+     * @return Start offset of this segment (inclusive).
+     */
+    public long startOffset() {
+        return startOffset;
+    }
+
+    /**
+     * @return End offset of this segment (inclusive).
+     */
+    public long endOffset() {
+        return endOffset;
+    }
+
+    /**
+     * @return Total size of this segment in bytes.
+     */
+    public int segmentSizeInBytes() {
+        return segmentSizeInBytes;
+    }
+
+    /**
+     * @return Maximum timestamp in milli seconds of a record within this 
segment.
+     */
+    public long maxTimestampMs() {
+        return maxTimestampMs;
+    }
+
+    /**
+     * @return Map of leader epoch vs offset for the records available in this 
segment.
+     */
+    public NavigableMap<Integer, Long> segmentLeaderEpochs() {
+        return segmentLeaderEpochs;
+    }
+
+    /**
+     * Returns the current state of this remote log segment. It can be any of 
the below
+     * <ul>
+     *     {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}
+     *     {@link RemoteLogSegmentState#COPY_SEGMENT_FINISHED}
+     *     {@link RemoteLogSegmentState#DELETE_SEGMENT_STARTED}
+     *     {@link RemoteLogSegmentState#DELETE_SEGMENT_FINISHED}
+     * </ul>
+     */
+    public RemoteLogSegmentState state() {
+        return state;
+    }
+
+    @Override
+    public TopicIdPartition topicIdPartition() {
+        throw new UnsupportedOperationException("This metadata does not have 
topic partition with it.");
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (!(o instanceof RemoteLogSegmentMetadataSnapshot)) return false;
+        RemoteLogSegmentMetadataSnapshot that = 
(RemoteLogSegmentMetadataSnapshot) o;
+        return startOffset == that.startOffset && endOffset == that.endOffset 
&& maxTimestampMs == that.maxTimestampMs && segmentSizeInBytes == 
that.segmentSizeInBytes && Objects.equals(
+                segmentId, that.segmentId) && 
Objects.equals(segmentLeaderEpochs, that.segmentLeaderEpochs) && state == 
that.state;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(segmentId, startOffset, endOffset, maxTimestampMs, 
segmentLeaderEpochs, segmentSizeInBytes, state);
+    }
+
+    @Override
+    public String toString() {
+        return "RemoteLogSegmentMetadataSnapshot{" +
+                "segmentId=" + segmentId +
+                ", startOffset=" + startOffset +
+                ", endOffset=" + endOffset +
+                ", maxTimestampMs=" + maxTimestampMs +
+                ", segmentLeaderEpochs=" + segmentLeaderEpochs +
+                ", segmentSizeInBytes=" + segmentSizeInBytes +
+                ", state=" + state +
+                '}';
+    }
+}
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataEventHandler.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataEventHandler.java
index 862defe..c92a51e 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataEventHandler.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataEventHandler.java
@@ -16,11 +16,14 @@
  */
 package org.apache.kafka.server.log.remote.metadata.storage;
 
+import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
 import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
 import 
org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
 
+import java.io.IOException;
+
 public abstract class RemotePartitionMetadataEventHandler {
 
     public void handleRemoteLogMetadata(RemoteLogMetadata remoteLogMetadata) {
@@ -40,4 +43,11 @@ public abstract class RemotePartitionMetadataEventHandler {
     protected abstract void 
handleRemoteLogSegmentMetadataUpdate(RemoteLogSegmentMetadataUpdate 
remoteLogSegmentMetadataUpdate);
 
     protected abstract void 
handleRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata 
remotePartitionDeleteMetadata);
+
+    /**
+     * Syncs the remote log metadata held for {@code topicIdPartition} to its local snapshot, together with the
+     * metadata topic partition and offset that the snapshot reflects.
+     *
+     * @param topicIdPartition        partition whose remote log metadata is synced
+     * @param metadataPartition       metadata topic partition recorded with the snapshot (presumably the one that
+     *                                carries this partition's events - confirm against callers)
+     * @param metadataPartitionOffset offset within {@code metadataPartition} recorded with the snapshot (assumed to
+     *                                be the last consumed offset - confirm); NOTE(review): boxed {@code Long},
+     *                                nullability is not specified here
+     * @throws IOException if syncing fails with an I/O error
+     */
+    public abstract void syncLogMetadataSnapshot(TopicIdPartition topicIdPartition,
+                                                 int metadataPartition,
+                                                 Long metadataPartitionOffset) throws IOException;
+
+    /**
+     * Clears the state held for {@code topicIdPartition}; exactly what is dropped is up to the implementation.
+     */
+    public abstract void clearTopicPartition(TopicIdPartition topicIdPartition);
+
+
 }
\ No newline at end of file
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
index aebef84..7051d18 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.server.log.remote.metadata.storage;
 
 import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
 import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
@@ -28,7 +29,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
+import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
@@ -42,21 +45,36 @@ import java.util.concurrent.ConcurrentHashMap;
 public class RemotePartitionMetadataStore extends 
RemotePartitionMetadataEventHandler implements Closeable {
     private static final Logger log = 
LoggerFactory.getLogger(RemotePartitionMetadataStore.class);
 
+    private final Path logDir;
+
     private Map<TopicIdPartition, RemotePartitionDeleteMetadata> 
idToPartitionDeleteMetadata =
             new ConcurrentHashMap<>();
 
-    private Map<TopicIdPartition, RemoteLogMetadataCache> 
idToRemoteLogMetadataCache =
+    private Map<TopicIdPartition, FileBasedRemoteLogMetadataCache> 
idToRemoteLogMetadataCache =
             new ConcurrentHashMap<>();
 
+    /**
+     * Creates a store whose per-partition metadata caches are rooted under {@code logDir}.
+     *
+     * @param logDir base directory; each partition's cache uses the "topic-partition" sub-directory of it
+     * @throws NullPointerException if {@code logDir} is null
+     */
+    public RemotePartitionMetadataStore(Path logDir) {
+        // Fail fast here instead of with an NPE later, when the first partition is loaded.
+        this.logDir = Objects.requireNonNull(logDir, "logDir can not be null");
+    }
+
+    /**
+     * Adds {@code remoteLogSegmentMetadata} as a copy-in-progress segment to the cache of its partition.
+     *
+     * @throws IllegalStateException if no cache has been loaded for the segment's partition
+     */
     @Override
     public void handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
         log.debug("Adding remote log segment : [{}]", remoteLogSegmentMetadata);
 
-        RemoteLogSegmentId remoteLogSegmentId = remoteLogSegmentMetadata.remoteLogSegmentId();
+        final RemoteLogSegmentId remoteLogSegmentId = remoteLogSegmentMetadata.remoteLogSegmentId();
+        TopicIdPartition topicIdPartition = remoteLogSegmentId.topicIdPartition();
+
+        // The cache is expected to exist already: it is created when the partition is assigned
+        // (see maybeLoadPartition).
+        RemoteLogMetadataCache remoteLogMetadataCache = idToRemoteLogMetadataCache.get(topicIdPartition);
+        if (remoteLogMetadataCache != null) {
+            remoteLogMetadataCache.addCopyInProgressSegment(remoteLogSegmentMetadata);
+        } else {
+            throw new IllegalStateException("No partition metadata found for : " + topicIdPartition);
+        }
+    }
 
-        idToRemoteLogMetadataCache
-                .computeIfAbsent(remoteLogSegmentId.topicIdPartition(), id -> 
new RemoteLogMetadataCache())
-                .addCopyInProgressSegment(remoteLogSegmentMetadata);
+    /**
+     * Returns the directory {@code <logDir>/<topic>-<partition>} for {@code topicPartition}.
+     */
+    private Path partitionLogDirectory(TopicPartition topicPartition) {
+        final String partitionDirName = topicPartition.topic() + "-" + topicPartition.partition();
+        final File partitionDir = new File(logDir.toFile(), partitionDirName);
+        return partitionDir.toPath();
     }
 
     @Override
@@ -69,10 +87,10 @@ public class RemotePartitionMetadataStore extends 
RemotePartitionMetadataEventHa
             try {
                 
remoteLogMetadataCache.updateRemoteLogSegmentMetadata(rlsmUpdate);
             } catch (RemoteResourceNotFoundException e) {
-                log.error("Error occurred while updating the remote log 
segment.");
+                log.warn("Error occurred while updating the remote log 
segment.", e);
             }
         } else {
-            log.error("No partition metadata found for : " + topicIdPartition);
+            throw new IllegalStateException("No partition metadata found for : 
" + topicIdPartition);
         }
     }
 
@@ -91,6 +109,27 @@ public class RemotePartitionMetadataStore extends 
RemotePartitionMetadataEventHa
         }
     }
 
+    /**
+     * Flushes the partition's metadata cache via {@code flushToFile}, passing the metadata partition and offset.
+     * Nothing is flushed when delete metadata has been recorded for the partition or when no cache is loaded.
+     */
+    @Override
+    public void syncLogMetadataSnapshot(TopicIdPartition topicIdPartition,
+                                        int metadataPartition,
+                                        Long metadataPartitionOffset) throws IOException {
+        final RemotePartitionDeleteMetadata deleteMetadata = idToPartitionDeleteMetadata.get(topicIdPartition);
+        if (deleteMetadata != null) {
+            log.info("Skipping syncing of metadata snapshot as remote partition [{}] is with state: [{}] ", topicIdPartition,
+                     deleteMetadata);
+            return;
+        }
+
+        final FileBasedRemoteLogMetadataCache cache = idToRemoteLogMetadataCache.get(topicIdPartition);
+        if (cache == null) {
+            // No cache loaded for this partition, so there is nothing to flush.
+            return;
+        }
+        cache.flushToFile(metadataPartition, metadataPartitionOffset);
+    }
+
+    /**
+     * Removes the metadata cache held for {@code topicIdPartition} from this store.
+     * Only the map entry is removed here: snapshot files on disk and any recorded partition delete metadata
+     * are left untouched.
+     */
+    @Override
+    public void clearTopicPartition(TopicIdPartition topicIdPartition) {
+        idToRemoteLogMetadataCache.remove(topicIdPartition);
+    }
+
     public Iterator<RemoteLogSegmentMetadata> 
listRemoteLogSegments(TopicIdPartition topicIdPartition)
             throws RemoteStorageException {
         Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be 
null");
@@ -105,9 +144,9 @@ public class RemotePartitionMetadataStore extends 
RemotePartitionMetadataEventHa
         return 
getRemoteLogMetadataCache(topicIdPartition).listRemoteLogSegments(leaderEpoch);
     }
 
-    private RemoteLogMetadataCache getRemoteLogMetadataCache(TopicIdPartition 
topicIdPartition)
+    private FileBasedRemoteLogMetadataCache 
getRemoteLogMetadataCache(TopicIdPartition topicIdPartition)
             throws RemoteResourceNotFoundException {
-        RemoteLogMetadataCache remoteLogMetadataCache = 
idToRemoteLogMetadataCache.get(topicIdPartition);
+        FileBasedRemoteLogMetadataCache remoteLogMetadataCache = 
idToRemoteLogMetadataCache.get(topicIdPartition);
         if (remoteLogMetadataCache == null) {
             throw new RemoteResourceNotFoundException("No resource found for 
partition: " + topicIdPartition);
         }
@@ -140,4 +179,10 @@ public class RemotePartitionMetadataStore extends 
RemotePartitionMetadataEventHa
         idToPartitionDeleteMetadata = Collections.emptyMap();
         idToRemoteLogMetadataCache = Collections.emptyMap();
     }
+
+    /**
+     * Ensures a file based metadata cache exists for {@code partition}, creating it over the partition's log
+     * directory on first call. Later calls for the same partition leave the existing cache in place.
+     */
+    public void maybeLoadPartition(TopicIdPartition partition) {
+        idToRemoteLogMetadataCache.computeIfAbsent(partition, tip -> {
+            Path partitionDir = partitionLogDirectory(tip.topicPartition());
+            return new FileBasedRemoteLogMetadataCache(tip, partitionDir);
+        });
+    }
+
 }
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
index b6bfd64..0271780 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
@@ -39,6 +39,7 @@ import 
org.apache.kafka.server.log.remote.storage.RemoteStorageException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.Collections;
@@ -72,6 +73,7 @@ public class TopicBasedRemoteLogMetadataManager implements 
RemoteLogMetadataMana
     private final AtomicBoolean closing = new AtomicBoolean(false);
     private final AtomicBoolean initialized = new AtomicBoolean(false);
     private final Time time = Time.SYSTEM;
+    private final boolean startConsumerThread;
 
     private Thread initializationThread;
     private volatile ProducerManager producerManager;
@@ -81,12 +83,21 @@ public class TopicBasedRemoteLogMetadataManager implements 
RemoteLogMetadataMana
     // requests calling different methods which use the resources like 
producer/consumer managers.
     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
-    private final RemotePartitionMetadataStore remotePartitionMetadataStore = 
new RemotePartitionMetadataStore();
+    private RemotePartitionMetadataStore remotePartitionMetadataStore;
     private volatile TopicBasedRemoteLogMetadataManagerConfig rlmmConfig;
     private volatile RemoteLogMetadataTopicPartitioner rlmmTopicPartitioner;
     private final Set<TopicIdPartition> pendingAssignPartitions = 
Collections.synchronizedSet(new HashSet<>());
     private volatile boolean initializationFailed;
 
+    /**
+     * Creates a manager that starts its consumer thread when its resources are initialized.
+     */
+    public TopicBasedRemoteLogMetadataManager() {
+        this(true);
+    }
+
+    /**
+     * Visible for testing.
+     *
+     * @param startConsumerThread whether resource initialization starts the consumer thread; when false, a test
+     *                            can start it later via {@link #startConsumerThread()}
+     */
+    public TopicBasedRemoteLogMetadataManager(boolean startConsumerThread) {
+        this.startConsumerThread = startConsumerThread;
+    }
+
     @Override
     public CompletableFuture<Void> 
addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata)
             throws RemoteStorageException {
@@ -267,26 +278,34 @@ public class TopicBasedRemoteLogMetadataManager 
implements RemoteLogMetadataMana
         log.info("Received leadership notifications with leader partitions {} 
and follower partitions {}",
                  leaderPartitions, followerPartitions);
 
-        HashSet<TopicIdPartition> allPartitions = new 
HashSet<>(leaderPartitions);
-        allPartitions.addAll(followerPartitions);
         lock.readLock().lock();
         try {
             if (closing.get()) {
                 throw new IllegalStateException("This instance is in closing 
state");
             }
 
+            HashSet<TopicIdPartition> allPartitions = new 
HashSet<>(leaderPartitions);
+            allPartitions.addAll(followerPartitions);
             if (!initialized.get()) {
                 // If it is not yet initialized, then keep them as pending 
partitions and assign them
                 // when it is initialized successfully in 
initializeResources().
                 this.pendingAssignPartitions.addAll(allPartitions);
             } else {
-                consumerManager.addAssignmentsForPartitions(allPartitions);
+                assignPartitions(allPartitions);
             }
         } finally {
             lock.readLock().unlock();
         }
     }
 
+    /**
+     * Makes sure every given partition has a metadata cache loaded in the metadata store, then registers the
+     * partitions with the consumer manager.
+     */
+    private void assignPartitions(Set<TopicIdPartition> allPartitions) {
+        // Caches are loaded before handing partitions to the consumer manager: RemotePartitionMetadataStore
+        // throws IllegalStateException for segment events of partitions that have no cache.
+        for (TopicIdPartition topicIdPartition : allPartitions) {
+            remotePartitionMetadataStore.maybeLoadPartition(topicIdPartition);
+        }
+        consumerManager.addAssignmentsForPartitions(allPartitions);
+    }
+
     @Override
     public void onStopPartitions(Set<TopicIdPartition> partitions) {
         lock.readLock().lock();
@@ -323,6 +342,7 @@ public class TopicBasedRemoteLogMetadataManager implements 
RemoteLogMetadataMana
 
             rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(configs);
             rlmmTopicPartitioner = new 
RemoteLogMetadataTopicPartitioner(rlmmConfig.metadataTopicPartitionsCount());
+            remotePartitionMetadataStore = new 
RemotePartitionMetadataStore(new File(rlmmConfig.logDir()).toPath());
             configured = true;
             log.info("Successfully initialized with rlmmConfig: {}", 
rlmmConfig);
 
@@ -385,10 +405,14 @@ public class TopicBasedRemoteLogMetadataManager 
implements RemoteLogMetadataMana
                 try {
                     producerManager = new ProducerManager(rlmmConfig, 
rlmmTopicPartitioner);
                     consumerManager = new ConsumerManager(rlmmConfig, 
remotePartitionMetadataStore, rlmmTopicPartitioner, time);
-                    consumerManager.startConsumerThread();
+                    if (startConsumerThread) {
+                        consumerManager.startConsumerThread();
+                    } else {
+                        log.info("RLMM Consumer task thread is not configured 
to be started.");
+                    }
 
                     if (!pendingAssignPartitions.isEmpty()) {
-                        
consumerManager.addAssignmentsForPartitions(pendingAssignPartitions);
+                        assignPartitions(pendingAssignPartitions);
                         pendingAssignPartitions.clear();
                     }
 
@@ -476,6 +500,18 @@ public class TopicBasedRemoteLogMetadataManager implements 
RemoteLogMetadataMana
         }
     }
 
+    /**
+     * Visible for testing: returns the current config, or {@code null} until the manager has been configured.
+     */
+    public TopicBasedRemoteLogMetadataManagerConfig config() {
+        return rlmmConfig;
+    }
+
+    /**
+     * Visible for testing: starts the consumer thread if the consumer manager has already been created,
+     * otherwise does nothing.
+     */
+    public void startConsumerThread() {
+        final ConsumerManager manager = consumerManager;
+        if (manager == null) {
+            return;
+        }
+        manager.startConsumerThread();
+    }
+
     @Override
     public void close() throws IOException {
         // Close all the resources.
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
index 3fd6b2b..7e52519 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
@@ -74,6 +74,7 @@ public final class TopicBasedRemoteLogMetadataManagerConfig {
     public static final String REMOTE_LOG_METADATA_PRODUCER_PREFIX = 
"remote.log.metadata.producer.";
     public static final String REMOTE_LOG_METADATA_CONSUMER_PREFIX = 
"remote.log.metadata.consumer.";
     public static final String BROKER_ID = "broker.id";
+    public static final String LOG_DIR = "log.dir";
 
     private static final String REMOTE_LOG_METADATA_CLIENT_PREFIX = 
"__remote_log_metadata_client";
 
@@ -97,6 +98,7 @@ public final class TopicBasedRemoteLogMetadataManagerConfig {
 
     private final String clientIdPrefix;
     private final int metadataTopicPartitionsCount;
+    private final String logDir;
     private final long consumeWaitMs;
     private final long metadataTopicRetentionMs;
     private final short metadataTopicReplicationFactor;
@@ -111,6 +113,11 @@ public final class 
TopicBasedRemoteLogMetadataManagerConfig {
 
         Map<String, Object> parsedConfigs = CONFIG.parse(props);
 
+        logDir = (String) props.get(LOG_DIR);
+        if (logDir == null || logDir.isEmpty()) {
+            throw new IllegalArgumentException(LOG_DIR + " config must not be 
null or empty.");
+        }
+
         metadataTopicPartitionsCount = (int) 
parsedConfigs.get(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP);
         metadataTopicReplicationFactor = (short) 
parsedConfigs.get(REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP);
         metadataTopicRetentionMs = (long) 
parsedConfigs.get(REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP);
@@ -179,6 +186,10 @@ public final class 
TopicBasedRemoteLogMetadataManagerConfig {
         return initializationRetryIntervalMs;
     }
 
+    /**
+     * Returns the value of the {@code log.dir} config, which the constructor requires to be non-null and non-empty.
+     */
+    public String logDir() {
+        return logDir;
+    }
+
     public Map<String, Object> consumerProperties() {
         return consumerProps;
     }
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java
index 94975b9..4a63b56 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java
@@ -19,8 +19,10 @@ package 
org.apache.kafka.server.log.remote.metadata.storage.serialization;
 import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.serialization.BytesApiMessageSerde;
+import 
org.apache.kafka.server.log.remote.metadata.storage.RemoteLogSegmentMetadataSnapshot;
 import 
org.apache.kafka.server.log.remote.metadata.storage.generated.MetadataRecordType;
 import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord;
+import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataSnapshotRecord;
 import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataUpdateRecord;
 import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemotePartitionDeleteMetadataRecord;
 import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
@@ -39,6 +41,7 @@ public class RemoteLogMetadataSerde {
     private static final short REMOTE_LOG_SEGMENT_METADATA_API_KEY = new 
RemoteLogSegmentMetadataRecord().apiKey();
     private static final short REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY = 
new RemoteLogSegmentMetadataUpdateRecord().apiKey();
     private static final short REMOTE_PARTITION_DELETE_API_KEY = new 
RemotePartitionDeleteMetadataRecord().apiKey();
+    private static final short REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY = 
new RemoteLogSegmentMetadataSnapshotRecord().apiKey();
 
     private final Map<String, Short> remoteLogStorageClassToApiKey;
     private final Map<Short, RemoteLogMetadataTransform> keyToTransform;
@@ -64,6 +67,7 @@ public class RemoteLogMetadataSerde {
         map.put(REMOTE_LOG_SEGMENT_METADATA_API_KEY, new 
RemoteLogSegmentMetadataTransform());
         map.put(REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY, new 
RemoteLogSegmentMetadataUpdateTransform());
         map.put(REMOTE_PARTITION_DELETE_API_KEY, new 
RemotePartitionDeleteMetadataTransform());
+        map.put(REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY, new 
RemoteLogSegmentMetadataSnapshotTransform());
         return map;
     }
 
@@ -72,6 +76,7 @@ public class RemoteLogMetadataSerde {
         map.put(RemoteLogSegmentMetadata.class.getName(), 
REMOTE_LOG_SEGMENT_METADATA_API_KEY);
         map.put(RemoteLogSegmentMetadataUpdate.class.getName(), 
REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY);
         map.put(RemotePartitionDeleteMetadata.class.getName(), 
REMOTE_PARTITION_DELETE_API_KEY);
+        map.put(RemoteLogSegmentMetadataSnapshot.class.getName(), 
REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY);
         return map;
     }
 
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransform.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransform.java
new file mode 100644
index 0000000..bd613f8
--- /dev/null
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransform.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serialization;
+
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import 
org.apache.kafka.server.log.remote.metadata.storage.RemoteLogSegmentMetadataSnapshot;
+import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataSnapshotRecord;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Converts {@link RemoteLogSegmentMetadataSnapshot} instances to and from
+ * {@link RemoteLogSegmentMetadataSnapshotRecord} messages.
+ */
+public class RemoteLogSegmentMetadataSnapshotTransform implements RemoteLogMetadataTransform<RemoteLogSegmentMetadataSnapshot> {
+
+    /**
+     * Serializes every field of {@code segmentMetadata} into a record, tagged with the record's highest
+     * supported version.
+     */
+    @Override
+    public ApiMessageAndVersion toApiMessageAndVersion(RemoteLogSegmentMetadataSnapshot segmentMetadata) {
+        RemoteLogSegmentMetadataSnapshotRecord record = new RemoteLogSegmentMetadataSnapshotRecord()
+                .setSegmentId(segmentMetadata.segmentId())
+                .setStartOffset(segmentMetadata.startOffset())
+                .setEndOffset(segmentMetadata.endOffset())
+                .setBrokerId(segmentMetadata.brokerId())
+                .setEventTimestampMs(segmentMetadata.eventTimestampMs())
+                .setMaxTimestampMs(segmentMetadata.maxTimestampMs())
+                .setSegmentSizeInBytes(segmentMetadata.segmentSizeInBytes())
+                .setSegmentLeaderEpochs(createSegmentLeaderEpochsEntry(segmentMetadata.segmentLeaderEpochs()))
+                .setRemoteLogSegmentState(segmentMetadata.state().id());
+
+        return new ApiMessageAndVersion(record, record.highestSupportedVersion());
+    }
+
+    /**
+     * Converts a leader epoch to start offset map into the record's leader epoch entries.
+     */
+    private List<RemoteLogSegmentMetadataSnapshotRecord.SegmentLeaderEpochEntry> createSegmentLeaderEpochsEntry(Map<Integer, Long> leaderEpochs) {
+        return leaderEpochs.entrySet().stream()
+                .map(entry -> new RemoteLogSegmentMetadataSnapshotRecord.SegmentLeaderEpochEntry()
+                        .setLeaderEpoch(entry.getKey())
+                        .setOffset(entry.getValue()))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Rebuilds a snapshot from a record as produced by {@link #toApiMessageAndVersion}.
+     *
+     * @throws ClassCastException if the wrapped message is not a RemoteLogSegmentMetadataSnapshotRecord
+     */
+    @Override
+    public RemoteLogSegmentMetadataSnapshot fromApiMessageAndVersion(ApiMessageAndVersion apiMessageAndVersion) {
+        RemoteLogSegmentMetadataSnapshotRecord record = (RemoteLogSegmentMetadataSnapshotRecord) apiMessageAndVersion.message();
+        Map<Integer, Long> segmentLeaderEpochs = new HashMap<>();
+        for (RemoteLogSegmentMetadataSnapshotRecord.SegmentLeaderEpochEntry segmentLeaderEpoch : record.segmentLeaderEpochs()) {
+            segmentLeaderEpochs.put(segmentLeaderEpoch.leaderEpoch(), segmentLeaderEpoch.offset());
+        }
+
+        return new RemoteLogSegmentMetadataSnapshot(record.segmentId(),
+                                                    record.startOffset(),
+                                                    record.endOffset(),
+                                                    record.maxTimestampMs(),
+                                                    record.brokerId(),
+                                                    record.eventTimestampMs(),
+                                                    record.segmentSizeInBytes(),
+                                                    RemoteLogSegmentState.forId(record.remoteLogSegmentState()),
+                                                    segmentLeaderEpochs);
+    }
+
+}
diff --git 
a/storage/src/main/resources/message/RemoteLogSegmentMetadataSnapshotRecord.json
 
b/storage/src/main/resources/message/RemoteLogSegmentMetadataSnapshotRecord.json
new file mode 100644
index 0000000..dbb2913
--- /dev/null
+++ 
b/storage/src/main/resources/message/RemoteLogSegmentMetadataSnapshotRecord.json
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 3,
+  "type": "metadata",
+  "name": "RemoteLogSegmentMetadataSnapshotRecord",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    {
+      "name": "SegmentId",
+      "type": "uuid",
+      "versions": "0+",
+      "about": "Unique identifier of the log segment"
+    },
+    {
+      "name": "StartOffset",
+      "type": "int64",
+      "versions": "0+",
+      "about": "Start offset of the segment."
+    },
+    {
+      "name": "EndOffset",
+      "type": "int64",
+      "versions": "0+",
+      "about": "End offset of the segment."
+    },
+    {
+      "name": "BrokerId",
+      "type": "int32",
+      "versions": "0+",
+      "about": "Broker (controller or leader) id from which this event is 
created or updated."
+    },
+    {
+      "name": "MaxTimestampMs",
+      "type": "int64",
+      "versions": "0+",
+      "about": "Maximum timestamp within this segment."
+    },
+    {
+      "name": "EventTimestampMs",
+      "type": "int64",
+      "versions": "0+",
+      "about": "Event timestamp of this segment."
+    },
+    {
+      "name": "SegmentLeaderEpochs",
+      "type": "[]SegmentLeaderEpochEntry",
+      "versions": "0+",
+      "about": "Leader epochs of this segment.",
+      "fields": [
+        {
+          "name": "LeaderEpoch",
+          "type": "int32",
+          "versions": "0+",
+          "about": "Leader epoch"
+        },
+        {
+          "name": "Offset",
+          "type": "int64",
+          "versions": "0+",
+          "about": "Start offset for the leader epoch"
+        }
+      ]
+    },
+    {
+      "name": "SegmentSizeInBytes",
+      "type": "int32",
+      "versions": "0+",
+      "about": "Segment size in bytes"
+    },
+    {
+      "name": "RemoteLogSegmentState",
+      "type": "int8",
+      "versions": "0+",
+      "about": "State of the remote log segment"
+    }
+  ]
+}
\ No newline at end of file
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCacheTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCacheTest.java
new file mode 100644
index 0000000..5f77417
--- /dev/null
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCacheTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link FileBasedRemoteLogMetadataCache}.
+ */
+public class FileBasedRemoteLogMetadataCacheTest {
+
+    /**
+     * Verifies that when a segment is superseded by another segment with the same leader epoch and start offset,
+     * the newer segment is returned both by the live cache and by a cache reloaded from the flushed snapshot,
+     * i.e. the superseded (unreferenced) segment does not reappear after loading.
+     */
+    @Test
+    public void testFileBasedRemoteLogMetadataCacheWithUnreferencedSegments() throws Exception {
+        TopicIdPartition partition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("test", 0));
+        int brokerId = 0;
+        Path path = TestUtils.tempDirectory().toPath();
+
+        // Create file based metadata cache.
+        FileBasedRemoteLogMetadataCache cache = new FileBasedRemoteLogMetadataCache(partition, path);
+
+        // Add a segment with start offset as 0 for leader epoch 0.
+        RemoteLogSegmentId segmentId1 = new RemoteLogSegmentId(partition, Uuid.randomUuid());
+        RemoteLogSegmentMetadata metadata1 = new RemoteLogSegmentMetadata(segmentId1,
+                0, 100, System.currentTimeMillis(), brokerId, System.currentTimeMillis(),
+                1024 * 1024, Collections.singletonMap(0, 0L));
+        cache.addCopyInProgressSegment(metadata1);
+        RemoteLogSegmentMetadataUpdate metadataUpdate1 = new RemoteLogSegmentMetadataUpdate(segmentId1, System.currentTimeMillis(),
+                RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+        cache.updateRemoteLogSegmentMetadata(metadataUpdate1);
+        Optional<RemoteLogSegmentMetadata> receivedMetadata = cache.remoteLogSegmentMetadata(0, 0L);
+        assertTrue(receivedMetadata.isPresent());
+        assertEquals(metadata1.createWithUpdates(metadataUpdate1), receivedMetadata.get());
+
+        // Add a new segment with start offset as 0 for leader epoch 0, which should replace the earlier segment.
+        RemoteLogSegmentId segmentId2 = new RemoteLogSegmentId(partition, Uuid.randomUuid());
+        RemoteLogSegmentMetadata metadata2 = new RemoteLogSegmentMetadata(segmentId2,
+                0, 900, System.currentTimeMillis(), brokerId, System.currentTimeMillis(),
+                1024 * 1024, Collections.singletonMap(0, 0L));
+        cache.addCopyInProgressSegment(metadata2);
+        RemoteLogSegmentMetadataUpdate metadataUpdate2 = new RemoteLogSegmentMetadataUpdate(segmentId2, System.currentTimeMillis(),
+                RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+        cache.updateRemoteLogSegmentMetadata(metadataUpdate2);
+
+        // Fetch segment for leader epoch:0 and start offset:0, it should be the newly added segment.
+        Optional<RemoteLogSegmentMetadata> receivedMetadata2 = cache.remoteLogSegmentMetadata(0, 0L);
+        assertTrue(receivedMetadata2.isPresent());
+        assertEquals(metadata2.createWithUpdates(metadataUpdate2), receivedMetadata2.get());
+        // Flush the cache to the file.
+        cache.flushToFile(0, 0L);
+
+        // Create a new cache with loading from the stored path.
+        FileBasedRemoteLogMetadataCache loadedCache = new FileBasedRemoteLogMetadataCache(partition, path);
+
+        // Fetch segment for leader epoch:0 and start offset:0, it should be metadata2.
+        // This ensures that the ordering of metadata is preserved after loading from the stored snapshots.
+        Optional<RemoteLogSegmentMetadata> receivedMetadataAfterLoad = loadedCache.remoteLogSegmentMetadata(0, 0L);
+        assertTrue(receivedMetadataAfterLoad.isPresent());
+        assertEquals(metadata2.createWithUpdates(metadataUpdate2), receivedMetadataAfterLoad.get());
+    }
+}
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFileTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFileTest.java
new file mode 100644
index 0000000..1b46028
--- /dev/null
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFileTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+
+public class RemoteLogMetadataSnapshotFileTest {
+
+    @Test
+    public void testEmptyCommittedLogMetadataFile() throws Exception {
+        File metadataStoreDir = TestUtils.tempDirectory("_rlmm_committed");
+        RemoteLogMetadataSnapshotFile snapshotFile = new 
RemoteLogMetadataSnapshotFile(metadataStoreDir.toPath());
+
+        // There should be no snapshot as nothing has been written into it.
+        Assertions.assertFalse(snapshotFile.read().isPresent());
+    }
+
+    @Test
+    public void testEmptySnapshotWithCommittedLogMetadataFile() throws 
Exception {
+        File metadataStoreDir = TestUtils.tempDirectory("_rlmm_committed");
+        RemoteLogMetadataSnapshotFile snapshotFile = new 
RemoteLogMetadataSnapshotFile(metadataStoreDir.toPath());
+
+        snapshotFile.write(new RemoteLogMetadataSnapshotFile.Snapshot(0, 0L, 
Collections.emptyList()));
+
+        // There should be an empty snapshot as the written snapshot did not 
have any remote log segment metadata.
+        Assertions.assertTrue(snapshotFile.read().isPresent());
+        
Assertions.assertTrue(snapshotFile.read().get().remoteLogSegmentMetadataSnapshots().isEmpty());
+    }
+
+    @Test
+    public void testWriteReadCommittedLogMetadataFile() throws Exception {
+        File metadataStoreDir = TestUtils.tempDirectory("_rlmm_committed");
+        RemoteLogMetadataSnapshotFile snapshotFile = new 
RemoteLogMetadataSnapshotFile(metadataStoreDir.toPath());
+
+        List<RemoteLogSegmentMetadataSnapshot> remoteLogSegmentMetadatas = new 
ArrayList<>();
+        long startOffset = 0;
+        for (int i = 0; i < 100; i++) {
+            long endOffset = startOffset + 100L;
+            remoteLogSegmentMetadatas.add(
+                    new RemoteLogSegmentMetadataSnapshot(Uuid.randomUuid(), 
startOffset, endOffset,
+                                                         
System.currentTimeMillis(), 1, 100, 1024,
+                                                         
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, Collections.singletonMap(i, 
startOffset)));
+            startOffset = endOffset + 1;
+        }
+
+        RemoteLogMetadataSnapshotFile.Snapshot snapshot = new 
RemoteLogMetadataSnapshotFile.Snapshot(0, 120,
+                                                                               
                      remoteLogSegmentMetadatas);
+        snapshotFile.write(snapshot);
+
+        Optional<RemoteLogMetadataSnapshotFile.Snapshot> maybeReadSnapshot = 
snapshotFile.read();
+        Assertions.assertTrue(maybeReadSnapshot.isPresent());
+
+        Assertions.assertEquals(snapshot, maybeReadSnapshot.get());
+        Assertions.assertEquals(new 
HashSet<>(snapshot.remoteLogSegmentMetadataSnapshots()),
+                                new 
HashSet<>(maybeReadSnapshot.get().remoteLogSegmentMetadataSnapshots()));
+    }
+}
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
index 53d79a0..8272b8d 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
@@ -417,7 +417,7 @@ public class RemoteLogSegmentLifecycleTest {
      * This is passed to {@link 
#testRemoteLogSegmentLifeCycle(RemoteLogSegmentLifecycleManager)} to test
      * {@code RemoteLogMetadataCache} for several lifecycle operations.
      * <p>
-     * This starts a Kafka cluster with {@link #initialize(Set)} with {@link 
#brokerCount()} no of servers. It also
+     * This starts a Kafka cluster with {@link #initialize(Set, boolean)} 
with {@link #brokerCount()} number of servers. It also
      * creates the remote log metadata topic required for {@code 
TopicBasedRemoteLogMetadataManager}. This cluster will
      * be stopped by invoking {@link #close()}.
      */
@@ -428,14 +428,14 @@ public class RemoteLogSegmentLifecycleTest {
         @Override
         public synchronized void initialize(TopicIdPartition topicIdPartition) 
{
             this.topicIdPartition = topicIdPartition;
-            super.initialize(Collections.singleton(topicIdPartition));
+            super.initialize(Collections.singleton(topicIdPartition), true);
         }
 
         @Override
         public void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata 
segmentMetadata) throws RemoteStorageException {
             try {
                 // Wait until the segment is added successfully.
-                
topicBasedRlmm().addRemoteLogSegmentMetadata(segmentMetadata).get();
+                
remoteLogMetadataManager().addRemoteLogSegmentMetadata(segmentMetadata).get();
             } catch (Exception e) {
                 throw new RemoteStorageException(e);
             }
@@ -445,7 +445,7 @@ public class RemoteLogSegmentLifecycleTest {
         public void 
updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate 
segmentMetadataUpdate) throws RemoteStorageException {
             try {
                 // Wait until the segment is updated successfully.
-                
topicBasedRlmm().updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get();
+                
remoteLogMetadataManager().updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get();
             } catch (Exception e) {
                 throw new RemoteStorageException(e);
             }
@@ -453,23 +453,23 @@ public class RemoteLogSegmentLifecycleTest {
 
         @Override
         public Optional<Long> highestOffsetForEpoch(int leaderEpoch) throws 
RemoteStorageException {
-            return topicBasedRlmm().highestOffsetForEpoch(topicIdPartition, 
leaderEpoch);
+            return 
remoteLogMetadataManager().highestOffsetForEpoch(topicIdPartition, leaderEpoch);
         }
 
         @Override
         public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(int 
leaderEpoch,
                                                                            
long offset) throws RemoteStorageException {
-            return topicBasedRlmm().remoteLogSegmentMetadata(topicIdPartition, 
leaderEpoch, offset);
+            return 
remoteLogMetadataManager().remoteLogSegmentMetadata(topicIdPartition, 
leaderEpoch, offset);
         }
 
         @Override
         public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(int 
leaderEpoch) throws RemoteStorageException {
-            return topicBasedRlmm().listRemoteLogSegments(topicIdPartition, 
leaderEpoch);
+            return 
remoteLogMetadataManager().listRemoteLogSegments(topicIdPartition, leaderEpoch);
         }
 
         @Override
         public Iterator<RemoteLogSegmentMetadata> listAllRemoteLogSegments() 
throws RemoteStorageException {
-            return topicBasedRlmm().listRemoteLogSegments(topicIdPartition);
+            return 
remoteLogMetadataManager().listRemoteLogSegments(topicIdPartition);
         }
 
         @Override
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
index 7e1930d..3785c8d 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.server.log.remote.metadata.storage;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
@@ -28,6 +29,8 @@ import java.util.AbstractMap;
 import java.util.HashMap;
 import java.util.Map;
 
+import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.BROKER_ID;
+import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR;
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_CONSUMER_PREFIX;
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_PRODUCER_PREFIX;
@@ -116,7 +119,8 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest {
                                                        Map<String, Object> 
consumerConfig) {
         Map<String, Object> props = new HashMap<>();
         props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
BOOTSTRAP_SERVERS);
-        props.put("broker.id", 1);
+        props.put(BROKER_ID, 1);
+        props.put(LOG_DIR, TestUtils.tempDirectory().getAbsolutePath());
 
         props.put(REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP, (short) 
3);
         props.put(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP, 10);
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java
index 6bc56b5..31dd43f 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java
@@ -18,24 +18,32 @@ package org.apache.kafka.server.log.remote.metadata.storage;
 
 import kafka.api.IntegrationTestHarness;
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
 
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.BROKER_ID;
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
+import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR;
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP;
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP;
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP;
 
+/**
+ * A test harness class that brings up 3 brokers and registers {@link 
TopicBasedRemoteLogMetadataManager} on the broker with id 0.
+ */
 public class TopicBasedRemoteLogMetadataManagerHarness extends 
IntegrationTestHarness {
     private static final Logger log = 
LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerHarness.class);
 
@@ -49,16 +57,42 @@ public class TopicBasedRemoteLogMetadataManagerHarness 
extends IntegrationTestHa
         return Collections.emptyMap();
     }
 
-    public void initialize(Set<TopicIdPartition> topicIdPartitions) {
+    public void initialize(Set<TopicIdPartition> topicIdPartitions,
+                           boolean startConsumerThread) {
         // Call setup to start the cluster.
         super.setUp();
 
-        topicBasedRemoteLogMetadataManager = new 
TopicBasedRemoteLogMetadataManager();
+        initializeRemoteLogMetadataManager(topicIdPartitions, 
startConsumerThread);
+    }
+
+    public void initializeRemoteLogMetadataManager(Set<TopicIdPartition> 
topicIdPartitions,
+                                                   boolean 
startConsumerThread) {
+        String logDir = 
TestUtils.tempDirectory("rlmm_segs_").getAbsolutePath();
+        topicBasedRemoteLogMetadataManager = new 
TopicBasedRemoteLogMetadataManager(startConsumerThread) {
+            @Override
+            public void onPartitionLeadershipChanges(Set<TopicIdPartition> 
leaderPartitions,
+                                                     Set<TopicIdPartition> 
followerPartitions) {
+                Set<TopicIdPartition> allReplicas = new 
HashSet<>(leaderPartitions);
+                allReplicas.addAll(followerPartitions);
+                // Make sure the topic partition dirs exist as the topics 
might not have been created on this broker.
+                for (TopicIdPartition topicIdPartition : allReplicas) {
+                    // Create partition directory in the log directory created 
by topicBasedRemoteLogMetadataManager.
+                    File partitionDir = new File(new File(config().logDir()), 
topicIdPartition.topicPartition().topic() + "-" + 
topicIdPartition.topicPartition().partition());
+                    partitionDir.mkdirs();
+                    if (!partitionDir.exists()) {
+                        throw new KafkaException("Partition directory:[" + 
partitionDir + "] could not be created successfully.");
+                    }
+                }
+
+                super.onPartitionLeadershipChanges(leaderPartitions, 
followerPartitions);
+            }
+        };
 
         // Initialize TopicBasedRemoteLogMetadataManager.
         Map<String, Object> configs = new HashMap<>();
         configs.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + 
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerList());
         configs.put(BROKER_ID, 0);
+        configs.put(LOG_DIR, logDir);
         configs.put(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP, 
METADATA_TOPIC_PARTITIONS_COUNT);
         configs.put(REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP, 
METADATA_TOPIC_REPLICATION_FACTOR);
         configs.put(REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP, 
METADATA_TOPIC_RETENTION_MS);
@@ -72,7 +106,7 @@ public class TopicBasedRemoteLogMetadataManagerHarness 
extends IntegrationTestHa
         try {
             waitUntilInitialized(60_000);
         } catch (TimeoutException e) {
-            throw new RuntimeException(e);
+            throw new KafkaException(e);
         }
 
         
topicBasedRemoteLogMetadataManager.onPartitionLeadershipChanges(topicIdPartitions,
 Collections.emptySet());
@@ -96,14 +130,18 @@ public class TopicBasedRemoteLogMetadataManagerHarness 
extends IntegrationTestHa
         return 3;
     }
 
-    protected TopicBasedRemoteLogMetadataManager topicBasedRlmm() {
+    protected TopicBasedRemoteLogMetadataManager remoteLogMetadataManager() {
         return topicBasedRemoteLogMetadataManager;
     }
 
     public void close() throws IOException {
-        Utils.closeQuietly(topicBasedRemoteLogMetadataManager, 
"TopicBasedRemoteLogMetadataManager");
+        closeRemoteLogMetadataManager();
 
         // Stop the servers and zookeeper.
         tearDown();
     }
+
+    public void closeRemoteLogMetadataManager() {
+        Utils.closeQuietly(topicBasedRemoteLogMetadataManager, 
"TopicBasedRemoteLogMetadataManager");
+    }
 }
\ No newline at end of file
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
new file mode 100644
index 0000000..2c7baf8
--- /dev/null
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.kafka.server.log.remote.metadata.storage.ConsumerManager.COMMITTED_OFFSETS_FILE_NAME;
+import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR;
+
+@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for 
usages of JavaConverters
+public class TopicBasedRemoteLogMetadataManagerRestartTest {
+
+    private static final int SEG_SIZE = 1024 * 1024;
+
+    private final Time time = new MockTime(1);
+    private final String logDir = 
TestUtils.tempDirectory("_rlmm_segs_").getAbsolutePath();
+
+    private TopicBasedRemoteLogMetadataManagerHarness 
remoteLogMetadataManagerHarness;
+
+    @BeforeEach
+    public void setup() {
+        // Start the cluster and initialize TopicBasedRemoteLogMetadataManager.
+        remoteLogMetadataManagerHarness = new 
TopicBasedRemoteLogMetadataManagerHarness() {
+            protected Map<String, Object> 
overrideRemoteLogMetadataManagerProps() {
+                Map<String, Object> props = new HashMap<>();
+                props.put(LOG_DIR, logDir);
+                return props;
+            }
+        };
+        remoteLogMetadataManagerHarness.initialize(Collections.emptySet(), 
true);
+    }
+
+    private void startTopicBasedRemoteLogMetadataManagerHarness(boolean 
startConsumerThread) {
+        
remoteLogMetadataManagerHarness.initializeRemoteLogMetadataManager(Collections.emptySet(),
 startConsumerThread);
+    }
+
+    @AfterEach
+    public void teardown() throws IOException {
+        if (remoteLogMetadataManagerHarness != null) {
+            remoteLogMetadataManagerHarness.close();
+        }
+    }
+
+    private void stopTopicBasedRemoteLogMetadataManagerHarness() throws 
IOException {
+        remoteLogMetadataManagerHarness.closeRemoteLogMetadataManager();
+    }
+
+    private TopicBasedRemoteLogMetadataManager topicBasedRlmm() {
+        return remoteLogMetadataManagerHarness.remoteLogMetadataManager();
+    }
+
+    @Test
+    public void testRLMMAPIsAfterRestart() throws Exception {
+        // Create topics.
+        String leaderTopic = "new-leader";
+        HashMap<Object, Seq<Object>> assignedLeaderTopicReplicas = new 
HashMap<>();
+        List<Object> leaderTopicReplicas = new ArrayList<>();
+        // Set broker id 0 as the first entry which is taken as the leader.
+        leaderTopicReplicas.add(0);
+        leaderTopicReplicas.add(1);
+        leaderTopicReplicas.add(2);
+        assignedLeaderTopicReplicas.put(0, 
JavaConverters.asScalaBuffer(leaderTopicReplicas));
+        remoteLogMetadataManagerHarness.createTopic(leaderTopic, 
JavaConverters.mapAsScalaMap(assignedLeaderTopicReplicas));
+
+        String followerTopic = "new-follower";
+        HashMap<Object, Seq<Object>> assignedFollowerTopicReplicas = new 
HashMap<>();
+        List<Object> followerTopicReplicas = new ArrayList<>();
+        // Set broker id 1 as the first entry which is taken as the leader.
+        followerTopicReplicas.add(1);
+        followerTopicReplicas.add(2);
+        followerTopicReplicas.add(0);
+        assignedFollowerTopicReplicas.put(0, 
JavaConverters.asScalaBuffer(followerTopicReplicas));
+        remoteLogMetadataManagerHarness.createTopic(followerTopic, 
JavaConverters.mapAsScalaMap(assignedFollowerTopicReplicas));
+
+        final TopicIdPartition leaderTopicIdPartition = new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition(leaderTopic, 0));
+        final TopicIdPartition followerTopicIdPartition = new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition(followerTopic, 0));
+
+        // Register these partitions to RLMM.
+        
topicBasedRlmm().onPartitionLeadershipChanges(Collections.singleton(leaderTopicIdPartition),
 Collections.singleton(followerTopicIdPartition));
+
+        // Add segments for these partitions and wait until each one is 
added successfully.
+        RemoteLogSegmentMetadata leaderSegmentMetadata = new 
RemoteLogSegmentMetadata(
+                new RemoteLogSegmentId(leaderTopicIdPartition, 
Uuid.randomUuid()),
+                0, 100, -1L, 0,
+                time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 
0L));
+        
topicBasedRlmm().addRemoteLogSegmentMetadata(leaderSegmentMetadata).get();
+
+        RemoteLogSegmentMetadata followerSegmentMetadata = new 
RemoteLogSegmentMetadata(
+                new RemoteLogSegmentId(followerTopicIdPartition, 
Uuid.randomUuid()),
+                0, 100, -1L, 0,
+                time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 
0L));
+        
topicBasedRlmm().addRemoteLogSegmentMetadata(followerSegmentMetadata).get();
+
+        // Stop TopicBasedRemoteLogMetadataManager only.
+        stopTopicBasedRemoteLogMetadataManagerHarness();
+
+        // Start TopicBasedRemoteLogMetadataManager but do not start consumer 
thread to check whether the stored metadata is
+        // loaded successfully or not.
+        startTopicBasedRemoteLogMetadataManagerHarness(false);
+
+        // Register these partitions to RLMM, which loads the respective 
metadata snapshots.
+        
topicBasedRlmm().onPartitionLeadershipChanges(Collections.singleton(leaderTopicIdPartition),
 Collections.singleton(followerTopicIdPartition));
+
+        // Check for the stored entries from the earlier run.
+        
Assertions.assertTrue(TestUtils.sameElementsWithoutOrder(Collections.singleton(leaderSegmentMetadata).iterator(),
+                                                                 
topicBasedRlmm().listRemoteLogSegments(leaderTopicIdPartition)));
+        
Assertions.assertTrue(TestUtils.sameElementsWithoutOrder(Collections.singleton(followerSegmentMetadata).iterator(),
+                                                                 
topicBasedRlmm().listRemoteLogSegments(followerTopicIdPartition)));
+        // Check whether the check-pointed consumer offsets are stored or not.
+        Path committedOffsetsPath = new File(logDir, 
COMMITTED_OFFSETS_FILE_NAME).toPath();
+        Assertions.assertTrue(committedOffsetsPath.toFile().exists());
+        CommittedOffsetsFile committedOffsetsFile = new 
CommittedOffsetsFile(committedOffsetsPath.toFile());
+
+        int metadataPartition1 = 
topicBasedRlmm().metadataPartition(leaderTopicIdPartition);
+        int metadataPartition2 = 
topicBasedRlmm().metadataPartition(followerTopicIdPartition);
+        Optional<Long> receivedOffsetForPartition1 = 
topicBasedRlmm().receivedOffsetForPartition(metadataPartition1);
+        Optional<Long> receivedOffsetForPartition2 = 
topicBasedRlmm().receivedOffsetForPartition(metadataPartition2);
+        Assertions.assertTrue(receivedOffsetForPartition1.isPresent());
+        Assertions.assertTrue(receivedOffsetForPartition2.isPresent());
+
+        // Make sure these offsets are at least 0.
+        Assertions.assertTrue(receivedOffsetForPartition1.get() >= 0);
+        Assertions.assertTrue(receivedOffsetForPartition2.get() >= 0);
+
+        // Check that the stored entries match the offsets that were set on 
the consumer.
+        Map<Integer, Long> partitionToOffset = 
committedOffsetsFile.readEntries();
+        Assertions.assertEquals(partitionToOffset.get(metadataPartition1), 
receivedOffsetForPartition1.get());
+        Assertions.assertEquals(partitionToOffset.get(metadataPartition2), 
receivedOffsetForPartition2.get());
+
+        // Start Consumer thread
+        topicBasedRlmm().startConsumerThread();
+
+        // Add one more segment
+        RemoteLogSegmentMetadata leaderSegmentMetadata2 = new 
RemoteLogSegmentMetadata(
+                new RemoteLogSegmentId(leaderTopicIdPartition, 
Uuid.randomUuid()),
+                101, 200, -1L, 0,
+                time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 
101L));
+        
topicBasedRlmm().addRemoteLogSegmentMetadata(leaderSegmentMetadata2).get();
+
+        // Check that both the stored segment and recently added segment are 
available.
+        
Assertions.assertTrue(TestUtils.sameElementsWithoutOrder(Arrays.asList(leaderSegmentMetadata,
 leaderSegmentMetadata2).iterator(),
+                                                                 
topicBasedRlmm().listRemoteLogSegments(leaderTopicIdPartition)));
+    }
+
+}
\ No newline at end of file
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
index dce2c26..89b25c6 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
@@ -31,31 +31,29 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
 import java.util.concurrent.TimeoutException;
 
-import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP;
-
+@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for 
usages of JavaConverters
 public class TopicBasedRemoteLogMetadataManagerTest {
     private static final Logger log = 
LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerTest.class);
 
     private static final int SEG_SIZE = 1024 * 1024;
 
     private final Time time = new MockTime(1);
-    private final TopicBasedRemoteLogMetadataManagerHarness 
remoteLogMetadataManagerHarness = new 
TopicBasedRemoteLogMetadataManagerHarness() {
-        @Override
-        protected Map<String, Object> overrideRemoteLogMetadataManagerProps() {
-            return 
Collections.singletonMap(REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP, 5000L);
-        }
-    };
+    private final TopicBasedRemoteLogMetadataManagerHarness 
remoteLogMetadataManagerHarness = new 
TopicBasedRemoteLogMetadataManagerHarness();
 
     @BeforeEach
     public void setup() {
         // Start the cluster and initialize TopicBasedRemoteLogMetadataManager.
-        remoteLogMetadataManagerHarness.initialize(Collections.emptySet());
+        remoteLogMetadataManagerHarness.initialize(Collections.emptySet(), 
true);
     }
 
     @AfterEach
@@ -64,13 +62,41 @@ public class TopicBasedRemoteLogMetadataManagerTest {
     }
 
     public TopicBasedRemoteLogMetadataManager topicBasedRlmm() {
-        return remoteLogMetadataManagerHarness.topicBasedRlmm();
+        return remoteLogMetadataManagerHarness.remoteLogMetadataManager();
+    }
+
+    @Test
+    public void testWithNoAssignedPartitions() throws Exception {
+        // This test checks the simple lifecycle of 
TopicBasedRemoteLogMetadataManager without assigning any leader/follower 
partitions.
+        // This should close successfully releasing the resources.
+        log.info("Not assigning any partitions on 
TopicBasedRemoteLogMetadataManager");
     }
 
     @Test
     public void testNewPartitionUpdates() throws Exception {
-        final TopicIdPartition newLeaderTopicIdPartition = new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition("new-leader", 0));
-        final TopicIdPartition newFollowerTopicIdPartition = new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition("new-follower", 0));
+        // Create topics.
+        String leaderTopic = "new-leader";
+        HashMap<Object, Seq<Object>> assignedLeaderTopicReplicas = new 
HashMap<>();
+        List<Object> leaderTopicReplicas = new ArrayList<>();
+        // Set broker id 0 as the first entry which is taken as the leader.
+        leaderTopicReplicas.add(0);
+        leaderTopicReplicas.add(1);
+        leaderTopicReplicas.add(2);
+        assignedLeaderTopicReplicas.put(0, 
JavaConverters.asScalaBuffer(leaderTopicReplicas));
+        remoteLogMetadataManagerHarness.createTopic(leaderTopic, 
JavaConverters.mapAsScalaMap(assignedLeaderTopicReplicas));
+
+        String followerTopic = "new-follower";
+        HashMap<Object, Seq<Object>> assignedFollowerTopicReplicas = new 
HashMap<>();
+        List<Object> followerTopicReplicas = new ArrayList<>();
+        // Set broker id 1 as the first entry which is taken as the leader.
+        followerTopicReplicas.add(1);
+        followerTopicReplicas.add(2);
+        followerTopicReplicas.add(0);
+        assignedFollowerTopicReplicas.put(0, 
JavaConverters.asScalaBuffer(followerTopicReplicas));
+        remoteLogMetadataManagerHarness.createTopic(followerTopic, 
JavaConverters.mapAsScalaMap(assignedFollowerTopicReplicas));
+
+        final TopicIdPartition newLeaderTopicIdPartition = new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition(leaderTopic, 0));
+        final TopicIdPartition newFollowerTopicIdPartition = new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition(followerTopic, 0));
 
         // Add segments for these partitions but an exception is received as 
they have not yet been subscribed.
         // These messages would have been published to the respective metadata 
topic partitions but the ConsumerManager
@@ -101,8 +127,8 @@ public class TopicBasedRemoteLogMetadataManagerTest {
     }
 
     private void waitUntilConsumerCatchesup(TopicIdPartition 
newLeaderTopicIdPartition,
-                                            TopicIdPartition 
newFollowerTopicIdPartition,
-                                            long timeoutMs) throws 
TimeoutException {
+                                          TopicIdPartition 
newFollowerTopicIdPartition,
+                                          long timeoutMs) throws 
TimeoutException {
         int leaderMetadataPartition = 
topicBasedRlmm().metadataPartition(newLeaderTopicIdPartition);
         int followerMetadataPartition = 
topicBasedRlmm().metadataPartition(newFollowerTopicIdPartition);
 
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerWrapperWithHarness.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerWrapperWithHarness.java
index 9c25fb0..ef9287d 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerWrapperWithHarness.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerWrapperWithHarness.java
@@ -37,63 +37,64 @@ public class 
TopicBasedRemoteLogMetadataManagerWrapperWithHarness implements Rem
 
     @Override
     public CompletableFuture<Void> 
addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) 
throws RemoteStorageException {
-        return 
remoteLogMetadataManagerHarness.topicBasedRlmm().addRemoteLogSegmentMetadata(remoteLogSegmentMetadata);
+        return 
remoteLogMetadataManagerHarness.remoteLogMetadataManager().addRemoteLogSegmentMetadata(remoteLogSegmentMetadata);
     }
 
     @Override
     public CompletableFuture<Void> 
updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate 
remoteLogSegmentMetadataUpdate) throws RemoteStorageException {
-        return 
remoteLogMetadataManagerHarness.topicBasedRlmm().updateRemoteLogSegmentMetadata(remoteLogSegmentMetadataUpdate);
+        return 
remoteLogMetadataManagerHarness.remoteLogMetadataManager().updateRemoteLogSegmentMetadata(remoteLogSegmentMetadataUpdate);
     }
 
     @Override
     public Optional<RemoteLogSegmentMetadata> 
remoteLogSegmentMetadata(TopicIdPartition topicIdPartition,
                                                                        int 
epochForOffset,
                                                                        long 
offset) throws RemoteStorageException {
-        return 
remoteLogMetadataManagerHarness.topicBasedRlmm().remoteLogSegmentMetadata(topicIdPartition,
 epochForOffset, offset);
+        return 
remoteLogMetadataManagerHarness.remoteLogMetadataManager().remoteLogSegmentMetadata(topicIdPartition,
 epochForOffset, offset);
     }
 
     @Override
     public Optional<Long> highestOffsetForEpoch(TopicIdPartition 
topicIdPartition,
                                                 int leaderEpoch) throws 
RemoteStorageException {
-        return 
remoteLogMetadataManagerHarness.topicBasedRlmm().highestOffsetForEpoch(topicIdPartition,
 leaderEpoch);
+        return 
remoteLogMetadataManagerHarness.remoteLogMetadataManager().highestOffsetForEpoch(topicIdPartition,
 leaderEpoch);
     }
 
     @Override
     public CompletableFuture<Void> 
putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata 
remotePartitionDeleteMetadata) throws RemoteStorageException {
-        return 
remoteLogMetadataManagerHarness.topicBasedRlmm().putRemotePartitionDeleteMetadata(remotePartitionDeleteMetadata);
+        return 
remoteLogMetadataManagerHarness.remoteLogMetadataManager().putRemotePartitionDeleteMetadata(remotePartitionDeleteMetadata);
     }
 
     @Override
     public Iterator<RemoteLogSegmentMetadata> 
listRemoteLogSegments(TopicIdPartition topicIdPartition) throws 
RemoteStorageException {
-        return 
remoteLogMetadataManagerHarness.topicBasedRlmm().listRemoteLogSegments(topicIdPartition);
+        return 
remoteLogMetadataManagerHarness.remoteLogMetadataManager().listRemoteLogSegments(topicIdPartition);
     }
 
     @Override
     public Iterator<RemoteLogSegmentMetadata> 
listRemoteLogSegments(TopicIdPartition topicIdPartition,
                                                                     int 
leaderEpoch) throws RemoteStorageException {
-        return 
remoteLogMetadataManagerHarness.topicBasedRlmm().listRemoteLogSegments(topicIdPartition,
 leaderEpoch);
+        return 
remoteLogMetadataManagerHarness.remoteLogMetadataManager().listRemoteLogSegments(topicIdPartition,
 leaderEpoch);
     }
 
     @Override
     public void onPartitionLeadershipChanges(Set<TopicIdPartition> 
leaderPartitions,
                                              Set<TopicIdPartition> 
followerPartitions) {
-        
remoteLogMetadataManagerHarness.topicBasedRlmm().onPartitionLeadershipChanges(leaderPartitions,
 followerPartitions);
+
+        
remoteLogMetadataManagerHarness.remoteLogMetadataManager().onPartitionLeadershipChanges(leaderPartitions,
 followerPartitions);
     }
 
     @Override
     public void onStopPartitions(Set<TopicIdPartition> partitions) {
-        
remoteLogMetadataManagerHarness.topicBasedRlmm().onStopPartitions(partitions);
+        
remoteLogMetadataManagerHarness.remoteLogMetadataManager().onStopPartitions(partitions);
     }
 
     @Override
     public void close() throws IOException {
-        remoteLogMetadataManagerHarness.topicBasedRlmm().close();
+        remoteLogMetadataManagerHarness.remoteLogMetadataManager().close();
     }
 
     @Override
     public void configure(Map<String, ?> configs) {
         // This will make sure the cluster is up and 
TopicBasedRemoteLogMetadataManager is initialized.
-        remoteLogMetadataManagerHarness.initialize(Collections.emptySet());
-        remoteLogMetadataManagerHarness.topicBasedRlmm().configure(configs);
+        remoteLogMetadataManagerHarness.initialize(Collections.emptySet(), 
true);
+        
remoteLogMetadataManagerHarness.remoteLogMetadataManager().configure(configs);
     }
 }
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
index 4bec4ed..95521c4 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
@@ -90,7 +90,6 @@ public class RemoteLogMetadataManagerTest {
             remoteLogMetadataManager.configure(Collections.emptyMap());
             
remoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(TP0),
 Collections.emptySet());
 
-
             // Create remote log segment metadata and add them to RLMM.
 
             // segment 0

Reply via email to