This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ff3e6842ff9 KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions (#14127)
ff3e6842ff9 is described below

commit ff3e6842ff99a600fc02e69ebefb09eef93decb3
Author: Abhijeet Kumar <[email protected]>
AuthorDate: Sat Aug 26 05:52:26 2023 +0530

    KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions (#14127)
    
    This PR adds the following changes to the `TopicBasedRemoteLogMetadataManager`:
    
    1. Added a guard in RemoteLogMetadataCache so that an incoming request is served from the cache if and only if the corresponding user-topic-partition is initialized (a minimal sketch of this guard follows the summary below).
    2. Improved error handling in the ConsumerTask thread so that it is not killed when there are errors reading the internal topic.
    3. ConsumerTask initialization now handles the case when there are no records to read.
    Also includes some other minor changes.
    
    Added unit tests for the changes.
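    
    A minimal, self-contained sketch of the initialization guard described in change (1), assuming the
    CountDownLatch-based approach shown in the diff below; the class name used here is illustrative and
    not part of the patch itself:
    
        import java.util.concurrent.CountDownLatch;
    
        // Simplified stand-in for the guard added to RemoteLogMetadataCache in this patch.
        class InitializationGuard {
            private final CountDownLatch initializedLatch = new CountDownLatch(1);
    
            // Invoked once the ConsumerTask has read all existing metadata records
            // for the corresponding user-topic-partition.
            void markInitialized() {
                initializedLatch.countDown();
            }
    
            // Requests are served from the cache only when this returns true; otherwise
            // the patch throws a retriable ReplicaNotAvailableException so clients retry.
            boolean isInitialized() {
                return initializedLatch.getCount() == 0;
            }
        }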
    
    Co-authored-by: Kamal Chandraprakash <[email protected]>
    
    Reviewers: Luke Chen <[email protected]>, Jorge Esteban Quilcate Otoya <[email protected]>, Christo Lolov <[email protected]>, Satish Duggana <[email protected]>
---
 .../remote/metadata/storage/ConsumerManager.java   |  32 +-
 .../log/remote/metadata/storage/ConsumerTask.java  | 546 ++++++++++++---------
 .../metadata/storage/RemoteLogMetadataCache.java   |  11 +
 .../RemotePartitionMetadataEventHandler.java       |   5 +
 .../storage/RemotePartitionMetadataStore.java      |  19 +
 .../TopicBasedRemoteLogMetadataManager.java        |  20 +-
 .../remote/metadata/storage/ConsumerTaskTest.java  | 417 ++++++++++++++++
 .../TopicBasedRemoteLogMetadataManagerHarness.java |  10 +-
 ...ogMetadataManagerMultipleSubscriptionsTest.java | 178 +++++++
 ...icBasedRemoteLogMetadataManagerRestartTest.java |  34 +-
 .../TopicBasedRemoteLogMetadataManagerTest.java    |  10 +-
 11 files changed, 990 insertions(+), 292 deletions(-)

diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
index 14ec707a2eb..186cbb17c56 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
@@ -27,9 +27,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
-import java.io.File;
 import java.io.IOException;
-import java.nio.file.Path;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
@@ -41,8 +39,6 @@ import java.util.concurrent.TimeoutException;
  */
 public class ConsumerManager implements Closeable {
 
-    public static final String COMMITTED_OFFSETS_FILE_NAME = 
"_rlmm_committed_offsets";
-
     private static final Logger log = 
LoggerFactory.getLogger(ConsumerManager.class);
     private static final long CONSUME_RECHECK_INTERVAL_MS = 50L;
 
@@ -60,15 +56,13 @@ public class ConsumerManager implements Closeable {
 
         //Create a task to consume messages and submit the respective events 
to RemotePartitionMetadataEventHandler.
         KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(rlmmConfig.consumerProperties());
-        Path committedOffsetsPath = new File(rlmmConfig.logDir(), 
COMMITTED_OFFSETS_FILE_NAME).toPath();
         consumerTask = new ConsumerTask(
-                consumer,
-                rlmmConfig.remoteLogMetadataTopicName(),
-                remotePartitionMetadataEventHandler,
-                topicPartitioner,
-                committedOffsetsPath,
-                time,
-                60_000L
+            remotePartitionMetadataEventHandler,
+            topicPartitioner,
+            consumer,
+            100L,
+            300_000L,
+            time
         );
         consumerTaskThread = KafkaThread.nonDaemon("RLMMConsumerTask", 
consumerTask);
     }
@@ -110,7 +104,7 @@ public class ConsumerManager implements Closeable {
         log.info("Waiting until consumer is caught up with the target 
partition: [{}]", partition);
 
         // If the current assignment does not have the subscription for this 
partition then return immediately.
-        if (!consumerTask.isPartitionAssigned(partition)) {
+        if (!consumerTask.isMetadataPartitionAssigned(partition)) {
             throw new KafkaException("This consumer is not assigned to the 
target partition " + partition + ". " +
                     "Partitions currently assigned: " + 
consumerTask.metadataPartitionsAssigned());
         }
@@ -119,17 +113,17 @@ public class ConsumerManager implements Closeable {
         long startTimeMs = time.milliseconds();
         while (true) {
             log.debug("Checking if partition [{}] is up to date with offset 
[{}]", partition, offset);
-            long receivedOffset = 
consumerTask.receivedOffsetForPartition(partition).orElse(-1L);
-            if (receivedOffset >= offset) {
+            long readOffset = 
consumerTask.readOffsetForMetadataPartition(partition).orElse(-1L);
+            if (readOffset >= offset) {
                 return;
             }
 
             log.debug("Expected offset [{}] for partition [{}], but the 
committed offset: [{}],  Sleeping for [{}] to retry again",
-                    offset, partition, receivedOffset, consumeCheckIntervalMs);
+                    offset, partition, readOffset, consumeCheckIntervalMs);
 
             if (time.milliseconds() - startTimeMs > timeoutMs) {
                 log.warn("Expected offset for partition:[{}] is : [{}], but 
the committed offset: [{}] ",
-                        partition, receivedOffset, offset);
+                        partition, readOffset, offset);
                 throw new TimeoutException("Timed out in catching up with the 
expected offset by consumer.");
             }
 
@@ -158,7 +152,7 @@ public class ConsumerManager implements Closeable {
         consumerTask.removeAssignmentsForPartitions(partitions);
     }
 
-    public Optional<Long> receivedOffsetForPartition(int metadataPartition) {
-        return consumerTask.receivedOffsetForPartition(metadataPartition);
+    public Optional<Long> readOffsetForPartition(int metadataPartition) {
+        return consumerTask.readOffsetForMetadataPartition(metadataPartition);
     }
 }
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
index 2c95bf399a5..b53c4ee3374 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
@@ -16,12 +16,13 @@
  */
 package org.apache.kafka.server.log.remote.metadata.storage;
 
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.utils.Time;
 import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
 import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
@@ -30,8 +31,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
-import java.io.IOException;
-import java.nio.file.Path;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
@@ -64,302 +63,403 @@ import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemo
 class ConsumerTask implements Runnable, Closeable {
     private static final Logger log = 
LoggerFactory.getLogger(ConsumerTask.class);
 
-    private static final long POLL_INTERVAL_MS = 100L;
-
     private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-    private final KafkaConsumer<byte[], byte[]> consumer;
-    private final String metadataTopicName;
+    private final Consumer<byte[], byte[]> consumer;
     private final RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler;
     private final RemoteLogMetadataTopicPartitioner topicPartitioner;
+    // The timeout for the consumer to poll records from the remote log 
metadata topic.
+    private final long pollTimeoutMs;
     private final Time time;
 
-    // It indicates whether the closing process has been started or not. If it 
is set as true,
-    // consumer will stop consuming messages, and it will not allow partition 
assignments to be updated.
-    private volatile boolean closing = false;
-
-    // It indicates whether the consumer needs to assign the partitions or 
not. This is set when it is
-    // determined that the consumer needs to be assigned with the updated 
partitions.
-    private volatile boolean assignPartitions = false;
+    // It indicates whether the ConsumerTask is closed or not.
+    private volatile boolean isClosed = false;
+    // It indicates whether the user topic partition assignment to the 
consumer has changed or not. If the assignment
+    // has changed, the consumer will eventually start tracking the newly 
assigned partitions and stop tracking the
+    // ones it is no longer assigned to.
+    // The initial value is set to true to wait for partition assignment on 
the first execution; otherwise thread will
+    // be busy without actually doing anything
+    private volatile boolean hasAssignmentChanged = true;
 
     // It represents a lock for any operations related to the 
assignedTopicPartitions.
     private final Object assignPartitionsLock = new Object();
 
     // Remote log metadata topic partitions that consumer is assigned to.
-    private volatile Set<Integer> assignedMetaPartitions = 
Collections.emptySet();
+    private volatile Set<Integer> assignedMetadataPartitions = 
Collections.emptySet();
 
     // User topic partitions that this broker is a leader/follower for.
-    private Set<TopicIdPartition> assignedTopicPartitions = 
Collections.emptySet();
+    private volatile Map<TopicIdPartition, UserTopicIdPartition> 
assignedUserTopicIdPartitions = Collections.emptyMap();
+    private volatile Set<TopicIdPartition> 
processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();
 
-    // Map of remote log metadata topic partition to consumed offsets. 
Received consumer records
-    // may or may not have been processed based on the assigned topic 
partitions.
-    private final Map<Integer, Long> partitionToConsumedOffsets = new 
ConcurrentHashMap<>();
+    private long uninitializedAt;
+    private boolean isAllUserTopicPartitionsInitialized;
 
-    // Map of remote log metadata topic partition to processed offsets that 
were synced in committedOffsetsFile.
-    private Map<Integer, Long> lastSyncedPartitionToConsumedOffsets = 
Collections.emptyMap();
+    // Map of remote log metadata topic partition to consumed offsets.
+    private final Map<Integer, Long> readOffsetsByMetadataPartition = new 
ConcurrentHashMap<>();
+    private final Map<TopicIdPartition, Long> readOffsetsByUserTopicPartition 
= new HashMap<>();
 
-    private final long committedOffsetSyncIntervalMs;
-    private CommittedOffsetsFile committedOffsetsFile;
-    private long lastSyncedTimeMs;
+    private Map<TopicPartition, StartAndEndOffsetHolder> 
offsetHolderByMetadataPartition = new HashMap<>();
+    private boolean hasLastOffsetsFetchFailed = false;
+    private long lastFailedFetchOffsetsTimestamp;
+    // The interval between retries to fetch the start and end offsets for the 
metadata partitions after a failed fetch.
+    private final long offsetFetchRetryIntervalMs;
 
-    public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer,
-                        String metadataTopicName,
-                        RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
+    public ConsumerTask(RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
                         RemoteLogMetadataTopicPartitioner topicPartitioner,
-                        Path committedOffsetsPath,
-                        Time time,
-                        long committedOffsetSyncIntervalMs) {
-        this.consumer = Objects.requireNonNull(consumer);
-        this.metadataTopicName = Objects.requireNonNull(metadataTopicName);
+                        Consumer<byte[], byte[]> consumer,
+                        long pollTimeoutMs,
+                        long offsetFetchRetryIntervalMs,
+                        Time time) {
+        this.consumer = consumer;
         this.remotePartitionMetadataEventHandler = 
Objects.requireNonNull(remotePartitionMetadataEventHandler);
         this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
+        this.pollTimeoutMs = pollTimeoutMs;
+        this.offsetFetchRetryIntervalMs = offsetFetchRetryIntervalMs;
         this.time = Objects.requireNonNull(time);
-        this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs;
-
-        initializeConsumerAssignment(committedOffsetsPath);
-    }
-
-    private void initializeConsumerAssignment(Path committedOffsetsPath) {
-        try {
-            committedOffsetsFile = new 
CommittedOffsetsFile(committedOffsetsPath.toFile());
-        } catch (IOException e) {
-            throw new KafkaException(e);
-        }
-
-        Map<Integer, Long> committedOffsets = Collections.emptyMap();
-        try {
-            // Load committed offset and assign them in the consumer.
-            committedOffsets = committedOffsetsFile.readEntries();
-        } catch (IOException e) {
-            // Ignore the error and consumer consumes from the earliest offset.
-            log.error("Encountered error while building committed offsets from 
the file. " +
-                              "Consumer will consume from the earliest offset 
for the assigned partitions.", e);
-        }
-
-        if (!committedOffsets.isEmpty()) {
-            // Assign topic partitions from the earlier committed offsets file.
-            Set<Integer> earlierAssignedPartitions = committedOffsets.keySet();
-            assignedMetaPartitions = 
Collections.unmodifiableSet(earlierAssignedPartitions);
-            Set<TopicPartition> metadataTopicPartitions = 
earlierAssignedPartitions.stream()
-                                                                               
    .map(x -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, x))
-                                                                               
    .collect(Collectors.toSet());
-            consumer.assign(metadataTopicPartitions);
-
-            // Seek to the committed offsets
-            for (Map.Entry<Integer, Long> entry : committedOffsets.entrySet()) 
{
-                log.debug("Updating consumed offset: [{}] for partition [{}]", 
entry.getValue(), entry.getKey());
-                partitionToConsumedOffsets.put(entry.getKey(), 
entry.getValue());
-                consumer.seek(new 
TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, entry.getKey()), 
entry.getValue());
-            }
-
-            lastSyncedPartitionToConsumedOffsets = 
Collections.unmodifiableMap(committedOffsets);
-        }
+        this.uninitializedAt = time.milliseconds();
     }
 
     @Override
     public void run() {
-        log.info("Started Consumer task thread.");
-        lastSyncedTimeMs = time.milliseconds();
-        try {
-            while (!closing) {
-                maybeWaitForPartitionsAssignment();
+        log.info("Starting consumer task thread.");
+        while (!isClosed) {
+            try {
+                if (hasAssignmentChanged) {
+                    maybeWaitForPartitionAssignments();
+                }
 
                 log.trace("Polling consumer to receive remote log metadata 
topic records");
-                ConsumerRecords<byte[], byte[]> consumerRecords = 
consumer.poll(Duration.ofMillis(POLL_INTERVAL_MS));
+                final ConsumerRecords<byte[], byte[]> consumerRecords = 
consumer.poll(Duration.ofMillis(pollTimeoutMs));
                 for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
                     processConsumerRecord(record);
                 }
-
-                maybeSyncCommittedDataAndOffsets(false);
+                maybeMarkUserPartitionsAsReady();
+            } catch (final WakeupException ex) {
+                // ignore logging the error
+                isClosed = true;
+            } catch (final RetriableException ex) {
+                log.warn("Retriable error occurred while processing the 
records. Retrying...", ex);
+            } catch (final Exception ex) {
+                isClosed = true;
+                log.error("Error occurred while processing the records", ex);
             }
-        } catch (Exception e) {
-            log.error("Error occurred in consumer task, close:[{}]", closing, 
e);
-        } finally {
-            maybeSyncCommittedDataAndOffsets(true);
-            closeConsumer();
-            log.info("Exiting from consumer task thread");
         }
+        try {
+            consumer.close(Duration.ofSeconds(30));
+        } catch (final Exception e) {
+            log.error("Error encountered while closing the consumer", e);
+        }
+        log.info("Exited from consumer task thread");
     }
 
     private void processConsumerRecord(ConsumerRecord<byte[], byte[]> record) {
-        // Taking assignPartitionsLock here as updateAssignmentsForPartitions 
changes assignedTopicPartitions
-        // and also calls 
remotePartitionMetadataEventHandler.clearTopicPartition(removedPartition) for 
the removed
-        // partitions.
-        RemoteLogMetadata remoteLogMetadata = 
serde.deserialize(record.value());
-        synchronized (assignPartitionsLock) {
-            if 
(assignedTopicPartitions.contains(remoteLogMetadata.topicIdPartition())) {
-                
remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata);
-            } else {
-                log.debug("This event {} is skipped as the topic partition is 
not assigned for this instance.", remoteLogMetadata);
-            }
-            log.debug("Updating consumed offset: [{}] for partition [{}]", 
record.offset(), record.partition());
-            partitionToConsumedOffsets.put(record.partition(), 
record.offset());
+        final RemoteLogMetadata remoteLogMetadata = 
serde.deserialize(record.value());
+        if (shouldProcess(remoteLogMetadata, record.offset())) {
+            
remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata);
+            
readOffsetsByUserTopicPartition.put(remoteLogMetadata.topicIdPartition(), 
record.offset());
+        } else {
+            log.debug("The event {} is skipped because it is either already 
processed or not assigned to this consumer", remoteLogMetadata);
         }
+        log.debug("Updating consumed offset: [{}] for partition [{}]", 
record.offset(), record.partition());
+        readOffsetsByMetadataPartition.put(record.partition(), 
record.offset());
     }
 
-    private void maybeSyncCommittedDataAndOffsets(boolean forceSync) {
-        // Return immediately if there is no consumption from last time.
-        boolean noConsumedOffsetUpdates = 
partitionToConsumedOffsets.equals(lastSyncedPartitionToConsumedOffsets);
-        if (noConsumedOffsetUpdates || !forceSync && time.milliseconds() - 
lastSyncedTimeMs < committedOffsetSyncIntervalMs) {
-            log.debug("Skip syncing committed offsets, 
noConsumedOffsetUpdates: {}, forceSync: {}", noConsumedOffsetUpdates, 
forceSync);
+    private boolean shouldProcess(final RemoteLogMetadata metadata, final long 
recordOffset) {
+        final TopicIdPartition tpId = metadata.topicIdPartition();
+        final Long readOffset = readOffsetsByUserTopicPartition.get(tpId);
+        return processedAssignmentOfUserTopicIdPartitions.contains(tpId) && 
(readOffset == null || readOffset < recordOffset);
+    }
+
+    private void maybeMarkUserPartitionsAsReady() {
+        if (isAllUserTopicPartitionsInitialized) {
             return;
         }
-
-        try {
-            // Need to take lock on assignPartitionsLock as 
assignedTopicPartitions might
-            // get updated by other threads.
-            synchronized (assignPartitionsLock) {
-                for (TopicIdPartition topicIdPartition : 
assignedTopicPartitions) {
-                    int metadataPartition = 
topicPartitioner.metadataPartition(topicIdPartition);
-                    Long offset = 
partitionToConsumedOffsets.get(metadataPartition);
-                    if (offset != null) {
-                        
remotePartitionMetadataEventHandler.syncLogMetadataSnapshot(topicIdPartition, 
metadataPartition, offset);
+        maybeFetchStartAndEndOffsets();
+        boolean isAllInitialized = true;
+        for (final UserTopicIdPartition utp : 
assignedUserTopicIdPartitions.values()) {
+            if (utp.isAssigned && !utp.isInitialized) {
+                final Integer metadataPartition = utp.metadataPartition;
+                final StartAndEndOffsetHolder holder = 
offsetHolderByMetadataPartition.get(toRemoteLogPartition(metadataPartition));
+                // The offset-holder can be null, when the recent assignment 
wasn't picked up by the consumer.
+                if (holder != null) {
+                    final Long readOffset = 
readOffsetsByMetadataPartition.getOrDefault(metadataPartition, -1L);
+                    // 1) The end-offset was fetched only once during 
reassignment. The metadata-partition can receive
+                    // new stream of records, so the consumer can read records 
more than the last-fetched end-offset.
+                    // 2) When the internal topic becomes empty due to breach 
by size/time/start-offset, then there
+                    // are no records to read.
+                    if (readOffset + 1 >= holder.endOffset || 
holder.endOffset.equals(holder.startOffset)) {
+                        markInitialized(utp);
                     } else {
-                        log.debug("Skipping sync-up of the 
remote-log-metadata-file for partition: [{}] , with remote log metadata 
partition{}, and no offset",
-                                topicIdPartition, metadataPartition);
+                        log.debug("The user-topic-partition {} could not be 
marked initialized since the read-offset is {} " +
+                                "but the end-offset is {} for the 
metadata-partition {}", utp, readOffset, holder.endOffset,
+                            metadataPartition);
                     }
+                } else {
+                    log.debug("The offset-holder is null for the 
metadata-partition {}. The consumer may not have picked" +
+                            " up the recent assignment", metadataPartition);
                 }
-
-                // Write partitionToConsumedOffsets into committed offsets 
file as we do not want to process them again
-                // in case of restarts.
-                committedOffsetsFile.writeEntries(partitionToConsumedOffsets);
-                lastSyncedPartitionToConsumedOffsets = new 
HashMap<>(partitionToConsumedOffsets);
             }
-
-            lastSyncedTimeMs = time.milliseconds();
-        } catch (IOException e) {
-            throw new KafkaException("Error encountered while writing 
committed offsets to a local file", e);
+            isAllInitialized = isAllInitialized && utp.isAssigned && 
utp.isInitialized;
         }
-    }
-
-    private void closeConsumer() {
-        log.info("Closing the consumer instance");
-        try {
-            consumer.close(Duration.ofSeconds(30));
-        } catch (Exception e) {
-            log.error("Error encountered while closing the consumer", e);
+        if (isAllInitialized) {
+            log.info("Initialized for all the {} assigned user-partitions 
mapped to the {} meta-partitions in {} ms",
+                assignedUserTopicIdPartitions.size(), 
assignedMetadataPartitions.size(),
+                time.milliseconds() - uninitializedAt);
         }
+        isAllUserTopicPartitionsInitialized = isAllInitialized;
     }
 
-    private void maybeWaitForPartitionsAssignment() {
-        Set<Integer> assignedMetaPartitionsSnapshot = Collections.emptySet();
+    void maybeWaitForPartitionAssignments() throws InterruptedException {
+        // Snapshots of the metadata-partition and user-topic-partition are 
used to reduce the scope of the
+        // synchronization block.
+        // 1) LEADER_AND_ISR and STOP_REPLICA requests adds / removes the 
user-topic-partitions from the request
+        //    handler threads. Those threads should not be blocked for a long 
time, therefore scope of the
+        //    synchronization block is reduced to bare minimum.
+        // 2) Note that the consumer#position, consumer#seekToBeginning, 
consumer#seekToEnd and the other consumer APIs
+        //    response times are un-predictable. Those should not be kept in 
the synchronization block.
+        final Set<Integer> metadataPartitionSnapshot = new HashSet<>();
+        final Set<UserTopicIdPartition> assignedUserTopicIdPartitionsSnapshot 
= new HashSet<>();
         synchronized (assignPartitionsLock) {
-            // If it is closing, return immediately. This should be inside the 
assignPartitionsLock as the closing is updated
-            // in close() method with in the same lock to avoid any race 
conditions.
-            if (closing) {
-                return;
+            while (!isClosed && assignedUserTopicIdPartitions.isEmpty()) {
+                log.debug("Waiting for remote log metadata partitions to be 
assigned");
+                assignPartitionsLock.wait();
             }
-
-            while (assignedMetaPartitions.isEmpty()) {
-                // If no partitions are assigned, wait until they are assigned.
-                log.debug("Waiting for assigned remote log metadata 
partitions..");
-                try {
-                    // No timeout is set here, as it is always notified. Even 
when it is closed, the race can happen
-                    // between the thread calling this method and the thread 
calling close(). We should have a check
-                    // for closing as that might have been set and notified 
with assignPartitionsLock by `close`
-                    // method.
-                    assignPartitionsLock.wait();
-
-                    if (closing) {
-                        return;
-                    }
-                } catch (InterruptedException e) {
-                    throw new KafkaException(e);
-                }
-            }
-
-            if (assignPartitions) {
-                assignedMetaPartitionsSnapshot = new 
HashSet<>(assignedMetaPartitions);
-                // Removing unassigned meta partitions from 
partitionToConsumedOffsets and partitionToCommittedOffsets
-                partitionToConsumedOffsets.entrySet().removeIf(entry -> 
!assignedMetaPartitions.contains(entry.getKey()));
-
-                assignPartitions = false;
+            if (!isClosed && hasAssignmentChanged) {
+                assignedUserTopicIdPartitions.values().forEach(utp -> {
+                    metadataPartitionSnapshot.add(utp.metadataPartition);
+                    assignedUserTopicIdPartitionsSnapshot.add(utp);
+                });
+                hasAssignmentChanged = false;
             }
         }
-
-        if (!assignedMetaPartitionsSnapshot.isEmpty()) {
-            executeReassignment(assignedMetaPartitionsSnapshot);
+        if (!metadataPartitionSnapshot.isEmpty()) {
+            final Set<TopicPartition> remoteLogPartitions = 
toRemoteLogPartitions(metadataPartitionSnapshot);
+            consumer.assign(remoteLogPartitions);
+            this.assignedMetadataPartitions = 
Collections.unmodifiableSet(metadataPartitionSnapshot);
+            // for newly assigned user-partitions, read from the beginning of 
the corresponding metadata partition
+            final Set<TopicPartition> seekToBeginOffsetPartitions = 
assignedUserTopicIdPartitionsSnapshot
+                .stream()
+                .filter(utp -> !utp.isAssigned)
+                .map(utp -> toRemoteLogPartition(utp.metadataPartition))
+                .collect(Collectors.toSet());
+            consumer.seekToBeginning(seekToBeginOffsetPartitions);
+            // for other metadata partitions, read from the offset where the 
processing left last time.
+            remoteLogPartitions.stream()
+                .filter(tp -> !seekToBeginOffsetPartitions.contains(tp) &&
+                    readOffsetsByMetadataPartition.containsKey(tp.partition()))
+                .forEach(tp -> consumer.seek(tp, 
readOffsetsByMetadataPartition.get(tp.partition())));
+            Set<TopicIdPartition> processedAssignmentPartitions = new 
HashSet<>();
+            // mark all the user-topic-partitions as assigned to the consumer.
+            assignedUserTopicIdPartitionsSnapshot.forEach(utp -> {
+                if (!utp.isAssigned) {
+                    // Note that there can be a race between `remove` and 
`add` partition assignment. Calling the
+                    // `maybeLoadPartition` here again to be sure that the 
partition gets loaded on the handler.
+                    
remotePartitionMetadataEventHandler.maybeLoadPartition(utp.topicIdPartition);
+                    utp.isAssigned = true;
+                }
+                processedAssignmentPartitions.add(utp.topicIdPartition);
+            });
+            processedAssignmentOfUserTopicIdPartitions = new 
HashSet<>(processedAssignmentPartitions);
+            
clearResourcesForUnassignedUserTopicPartitions(processedAssignmentPartitions);
+            isAllUserTopicPartitionsInitialized = false;
+            uninitializedAt = time.milliseconds();
+            fetchStartAndEndOffsets();
         }
     }
 
-    private void executeReassignment(Set<Integer> 
assignedMetaPartitionsSnapshot) {
-        Set<TopicPartition> assignedMetaTopicPartitions =
-                assignedMetaPartitionsSnapshot.stream()
-                                              .map(partitionNum -> new 
TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, partitionNum))
-                                              .collect(Collectors.toSet());
-        log.info("Reassigning partitions to consumer task [{}]", 
assignedMetaTopicPartitions);
-        consumer.assign(assignedMetaTopicPartitions);
+    private void 
clearResourcesForUnassignedUserTopicPartitions(Set<TopicIdPartition> 
assignedPartitions) {
+        // Note that there can be previously assigned user-topic-partitions 
where no records are there to read
+        // (eg) none of the segments for a partition were uploaded. Those 
partition resources won't be cleared.
+        // It can be fixed later when required since they are empty resources.
+        Set<TopicIdPartition> unassignedPartitions = 
readOffsetsByUserTopicPartition.keySet()
+            .stream()
+            .filter(e -> !assignedPartitions.contains(e))
+            .collect(Collectors.toSet());
+        unassignedPartitions.forEach(unassignedPartition -> {
+            
remotePartitionMetadataEventHandler.clearTopicPartition(unassignedPartition);
+            readOffsetsByUserTopicPartition.remove(unassignedPartition);
+        });
+        log.info("Unassigned user-topic-partitions: {}", 
unassignedPartitions.size());
+    }
+
+    public void addAssignmentsForPartitions(final Set<TopicIdPartition> 
partitions) {
+        updateAssignments(Objects.requireNonNull(partitions), 
Collections.emptySet());
+    }
+
+    public void removeAssignmentsForPartitions(final Set<TopicIdPartition> 
partitions) {
+        updateAssignments(Collections.emptySet(), 
Objects.requireNonNull(partitions));
     }
 
-    public void addAssignmentsForPartitions(Set<TopicIdPartition> partitions) {
-        updateAssignmentsForPartitions(partitions, Collections.emptySet());
+    private void updateAssignments(final Set<TopicIdPartition> addedPartitions,
+                                   final Set<TopicIdPartition> 
removedPartitions) {
+        log.info("Updating assignments for partitions added: {} and removed: 
{}", addedPartitions, removedPartitions);
+        if (!addedPartitions.isEmpty() || !removedPartitions.isEmpty()) {
+            synchronized (assignPartitionsLock) {
+                // Make a copy of the existing assignments and update the copy.
+                final Map<TopicIdPartition, UserTopicIdPartition> 
updatedUserPartitions = new HashMap<>(assignedUserTopicIdPartitions);
+                addedPartitions.forEach(tpId -> 
updatedUserPartitions.putIfAbsent(tpId, newUserTopicIdPartition(tpId)));
+                removedPartitions.forEach(updatedUserPartitions::remove);
+                if 
(!updatedUserPartitions.equals(assignedUserTopicIdPartitions)) {
+                    assignedUserTopicIdPartitions = 
Collections.unmodifiableMap(updatedUserPartitions);
+                    hasAssignmentChanged = true;
+                    log.debug("Assigned user-topic-partitions: {}", 
assignedUserTopicIdPartitions);
+                    assignPartitionsLock.notifyAll();
+                }
+            }
+        }
     }
 
-    public void removeAssignmentsForPartitions(Set<TopicIdPartition> 
partitions) {
-        updateAssignmentsForPartitions(Collections.emptySet(), partitions);
+    public Optional<Long> readOffsetForMetadataPartition(final int partition) {
+        return 
Optional.ofNullable(readOffsetsByMetadataPartition.get(partition));
     }
 
-    private void updateAssignmentsForPartitions(Set<TopicIdPartition> 
addedPartitions,
-                                                Set<TopicIdPartition> 
removedPartitions) {
-        log.info("Updating assignments for addedPartitions: {} and 
removedPartition: {}", addedPartitions, removedPartitions);
+    public boolean isMetadataPartitionAssigned(final int partition) {
+        return assignedMetadataPartitions.contains(partition);
+    }
 
-        Objects.requireNonNull(addedPartitions, "addedPartitions must not be 
null");
-        Objects.requireNonNull(removedPartitions, "removedPartitions must not 
be null");
+    public boolean isUserPartitionAssigned(final TopicIdPartition partition) {
+        final UserTopicIdPartition utp = 
assignedUserTopicIdPartitions.get(partition);
+        return utp != null && utp.isAssigned;
+    }
 
-        if (addedPartitions.isEmpty() && removedPartitions.isEmpty()) {
-            return;
+    @Override
+    public void close() {
+        if (!isClosed) {
+            log.info("Closing the instance");
+            synchronized (assignPartitionsLock) {
+                isClosed = true;
+                
assignedUserTopicIdPartitions.values().forEach(this::markInitialized);
+                consumer.wakeup();
+                assignPartitionsLock.notifyAll();
+            }
         }
+    }
 
-        synchronized (assignPartitionsLock) {
-            Set<TopicIdPartition> updatedReassignedPartitions = new 
HashSet<>(assignedTopicPartitions);
-            updatedReassignedPartitions.addAll(addedPartitions);
-            updatedReassignedPartitions.removeAll(removedPartitions);
-            Set<Integer> updatedAssignedMetaPartitions = new HashSet<>();
-            for (TopicIdPartition tp : updatedReassignedPartitions) {
-                
updatedAssignedMetaPartitions.add(topicPartitioner.metadataPartition(tp));
-            }
+    public Set<Integer> metadataPartitionsAssigned() {
+        return Collections.unmodifiableSet(assignedMetadataPartitions);
+    }
+
+    private void fetchStartAndEndOffsets() {
+        try {
+            final Set<TopicPartition> uninitializedPartitions = 
assignedUserTopicIdPartitions.values().stream()
+                .filter(utp -> utp.isAssigned && !utp.isInitialized)
+                .map(utp -> toRemoteLogPartition(utp.metadataPartition))
+                .collect(Collectors.toSet());
+            // Removing the previous offset holder if it exists. During 
reassignment, if the list-offset
+            // call to `earliest` and `latest` offset fails, then we should 
not use the previous values.
+            uninitializedPartitions.forEach(tp -> 
offsetHolderByMetadataPartition.remove(tp));
+            if (!uninitializedPartitions.isEmpty()) {
+                Map<TopicPartition, Long> endOffsets = 
consumer.endOffsets(uninitializedPartitions);
+                Map<TopicPartition, Long> startOffsets = 
consumer.beginningOffsets(uninitializedPartitions);
+                offsetHolderByMetadataPartition = endOffsets.entrySet()
+                    .stream()
+                    .collect(Collectors.toMap(Map.Entry::getKey,
+                        e -> new 
StartAndEndOffsetHolder(startOffsets.get(e.getKey()), e.getValue())));
 
-            // Clear removed topic partitions from in-memory cache.
-            for (TopicIdPartition removedPartition : removedPartitions) {
-                
remotePartitionMetadataEventHandler.clearTopicPartition(removedPartition);
             }
+            hasLastOffsetsFetchFailed = false;
+        } catch (final RetriableException ex) {
+            // ignore LEADER_NOT_AVAILABLE error, this can happen when the 
partition leader is not yet assigned.
+            hasLastOffsetsFetchFailed = true;
+            lastFailedFetchOffsetsTimestamp = time.milliseconds();
+        }
+    }
 
-            assignedTopicPartitions = 
Collections.unmodifiableSet(updatedReassignedPartitions);
-            log.debug("Assigned topic partitions: {}", 
assignedTopicPartitions);
+    private void maybeFetchStartAndEndOffsets() {
+        // If the leader for a `__remote_log_metadata` partition is not 
available, then the call to `ListOffsets`
+        // will fail after the default timeout of 1 min. Added a delay between 
the retries to prevent the thread from
+        // aggressively fetching the list offsets. During this time, the 
recently reassigned user-topic-partitions
+        // won't be marked as initialized.
+        if (hasLastOffsetsFetchFailed && lastFailedFetchOffsetsTimestamp + 
offsetFetchRetryIntervalMs < time.milliseconds()) {
+            fetchStartAndEndOffsets();
+        }
+    }
 
-            if (!updatedAssignedMetaPartitions.equals(assignedMetaPartitions)) 
{
-                assignedMetaPartitions = 
Collections.unmodifiableSet(updatedAssignedMetaPartitions);
-                log.debug("Assigned metadata topic partitions: {}", 
assignedMetaPartitions);
+    private UserTopicIdPartition newUserTopicIdPartition(final 
TopicIdPartition tpId) {
+        return new UserTopicIdPartition(tpId, 
topicPartitioner.metadataPartition(tpId));
+    }
 
-                assignPartitions = true;
-                assignPartitionsLock.notifyAll();
-            } else {
-                log.debug("No change in assigned metadata topic partitions: 
{}", assignedMetaPartitions);
-            }
+    private void markInitialized(final UserTopicIdPartition utp) {
+        // Silently not initialize the utp
+        if (!utp.isAssigned) {
+            log.warn("Tried to initialize a UTP: {} that was not yet 
assigned!", utp);
+            return;
+        }
+        if (!utp.isInitialized) {
+            
remotePartitionMetadataEventHandler.markInitialized(utp.topicIdPartition);
+            utp.isInitialized = true;
         }
     }
 
-    public Optional<Long> receivedOffsetForPartition(int partition) {
-        return Optional.ofNullable(partitionToConsumedOffsets.get(partition));
+    static Set<TopicPartition> toRemoteLogPartitions(final Set<Integer> 
partitions) {
+        return partitions.stream()
+            .map(ConsumerTask::toRemoteLogPartition)
+            .collect(Collectors.toSet());
     }
 
-    public boolean isPartitionAssigned(int partition) {
-        return assignedMetaPartitions.contains(partition);
+    static TopicPartition toRemoteLogPartition(int partition) {
+        return new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, partition);
     }
 
-    public void close() {
-        if (!closing) {
-            synchronized (assignPartitionsLock) {
-                // Closing should be updated only after acquiring the lock to 
avoid race in
-                // maybeWaitForPartitionsAssignment() where it waits on 
assignPartitionsLock. It should not wait
-                // if the closing is already set.
-                closing = true;
-                consumer.wakeup();
-                assignPartitionsLock.notifyAll();
-            }
+    static class UserTopicIdPartition {
+        private final TopicIdPartition topicIdPartition;
+        private final Integer metadataPartition;
+        // The `utp` will be initialized once it reads all the existing events 
from the remote log metadata topic.
+        boolean isInitialized;
+        // denotes whether this `utp` is assigned to the consumer
+        boolean isAssigned;
+
+        /**
+         * UserTopicIdPartition denotes the user topic-partitions for which 
this broker acts as a leader/follower of.
+         *
+         * @param tpId               the unique topic partition identifier
+         * @param metadataPartition  the remote log metadata partition mapped 
for this user-topic-partition.
+         */
+        public UserTopicIdPartition(final TopicIdPartition tpId, final Integer 
metadataPartition) {
+            this.topicIdPartition = Objects.requireNonNull(tpId);
+            this.metadataPartition = Objects.requireNonNull(metadataPartition);
+            this.isInitialized = false;
+            this.isAssigned = false;
+        }
+
+        @Override
+        public String toString() {
+            return "UserTopicIdPartition{" +
+                "topicIdPartition=" + topicIdPartition +
+                ", metadataPartition=" + metadataPartition +
+                ", isInitialized=" + isInitialized +
+                ", isAssigned=" + isAssigned +
+                '}';
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            UserTopicIdPartition that = (UserTopicIdPartition) o;
+            return topicIdPartition.equals(that.topicIdPartition) && 
metadataPartition.equals(that.metadataPartition);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(topicIdPartition, metadataPartition);
         }
     }
 
-    public Set<Integer> metadataPartitionsAssigned() {
-        return Collections.unmodifiableSet(assignedMetaPartitions);
+    static class StartAndEndOffsetHolder {
+        Long startOffset;
+        Long endOffset;
+
+        public StartAndEndOffsetHolder(Long startOffset, Long endOffset) {
+            this.startOffset = startOffset;
+            this.endOffset = endOffset;
+        }
+
+        @Override
+        public String toString() {
+            return "StartAndEndOffsetHolder{" +
+                "startOffset=" + startOffset +
+                ", endOffset=" + endOffset +
+                '}';
+        }
     }
-}
+}
\ No newline at end of file
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
index 8c89df3df2c..758a024e25c 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
@@ -32,6 +32,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
 
 /**
  * This class provides an in-memory cache of remote log segment metadata. This 
maintains the lineage of segments
@@ -104,6 +105,16 @@ public class RemoteLogMetadataCache {
     // https://issues.apache.org/jira/browse/KAFKA-12641
     protected final ConcurrentMap<Integer, RemoteLogLeaderEpochState> 
leaderEpochEntries = new ConcurrentHashMap<>();
 
+    private final CountDownLatch initializedLatch = new CountDownLatch(1);
+
+    public void markInitialized() {
+        initializedLatch.countDown();
+    }
+
+    public boolean isInitialized() {
+        return initializedLatch.getCount() == 0;
+    }
+
     /**
      * Returns {@link RemoteLogSegmentMetadata} if it exists for the given 
leader-epoch containing the offset and with
      * {@link RemoteLogSegmentState#COPY_SEGMENT_FINISHED} state, else returns 
{@link Optional#empty()}.
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataEventHandler.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataEventHandler.java
index c92a51ecaca..f4f43b0d883 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataEventHandler.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataEventHandler.java
@@ -50,4 +50,9 @@ public abstract class RemotePartitionMetadataEventHandler {
 
     public abstract void clearTopicPartition(TopicIdPartition 
topicIdPartition);
 
+    public abstract void markInitialized(TopicIdPartition partition);
+
+    public abstract boolean isInitialized(TopicIdPartition partition);
+
+    public abstract void maybeLoadPartition(TopicIdPartition partition);
 }
\ No newline at end of file
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
index 7051d184aad..f9394eee99f 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
@@ -18,6 +18,7 @@ package org.apache.kafka.server.log.remote.metadata.storage;
 
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ReplicaNotAvailableException;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
 import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
@@ -151,6 +152,12 @@ public class RemotePartitionMetadataStore extends 
RemotePartitionMetadataEventHa
             throw new RemoteResourceNotFoundException("No resource found for 
partition: " + topicIdPartition);
         }
 
+        if (!remoteLogMetadataCache.isInitialized()) {
+            // Throwing a retriable ReplicaNotAvailableException here for 
clients retry. We can introduce a new more
+            // appropriate exception with a KIP in the future.
+            throw new ReplicaNotAvailableException("Remote log metadata cache 
is not initialized for partition: " + topicIdPartition);
+        }
+
         return remoteLogMetadataCache;
     }
 
@@ -180,9 +187,21 @@ public class RemotePartitionMetadataStore extends 
RemotePartitionMetadataEventHa
         idToRemoteLogMetadataCache = Collections.emptyMap();
     }
 
+    @Override
     public void maybeLoadPartition(TopicIdPartition partition) {
         idToRemoteLogMetadataCache.computeIfAbsent(partition,
             topicIdPartition -> new 
FileBasedRemoteLogMetadataCache(topicIdPartition, 
partitionLogDirectory(topicIdPartition.topicPartition())));
     }
 
+    @Override
+    public void markInitialized(TopicIdPartition partition) {
+        idToRemoteLogMetadataCache.get(partition).markInitialized();
+        log.trace("Remote log components are initialized for user-partition: 
{}", partition);
+    }
+
+    @Override
+    public boolean isInitialized(TopicIdPartition topicIdPartition) {
+        RemoteLogMetadataCache metadataCache = 
idToRemoteLogMetadataCache.get(topicIdPartition);
+        return metadataCache != null && metadataCache.isInitialized();
+    }
 }
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
index 4b02b9b6763..e1bf145bbd8 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
@@ -84,7 +84,7 @@ public class TopicBasedRemoteLogMetadataManager implements 
RemoteLogMetadataMana
 
     private RemotePartitionMetadataStore remotePartitionMetadataStore;
     private volatile TopicBasedRemoteLogMetadataManagerConfig rlmmConfig;
-    private volatile RemoteLogMetadataTopicPartitioner rlmmTopicPartitioner;
+    private volatile RemoteLogMetadataTopicPartitioner rlmTopicPartitioner;
     private final Set<TopicIdPartition> pendingAssignPartitions = 
Collections.synchronizedSet(new HashSet<>());
     private volatile boolean initializationFailed;
 
@@ -260,12 +260,12 @@ public class TopicBasedRemoteLogMetadataManager 
implements RemoteLogMetadataMana
     }
 
     public int metadataPartition(TopicIdPartition topicIdPartition) {
-        return rlmmTopicPartitioner.metadataPartition(topicIdPartition);
+        return rlmTopicPartitioner.metadataPartition(topicIdPartition);
     }
 
     // Visible For Testing
-    public Optional<Long> receivedOffsetForPartition(int metadataPartition) {
-        return consumerManager.receivedOffsetForPartition(metadataPartition);
+    public Optional<Long> readOffsetForPartition(int metadataPartition) {
+        return consumerManager.readOffsetForPartition(metadataPartition);
     }
 
     @Override
@@ -357,7 +357,7 @@ public class TopicBasedRemoteLogMetadataManager implements 
RemoteLogMetadataMana
             log.info("Started configuring topic-based RLMM with configs: {}", 
configs);
 
             rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(configs);
-            rlmmTopicPartitioner = new 
RemoteLogMetadataTopicPartitioner(rlmmConfig.metadataTopicPartitionsCount());
+            rlmTopicPartitioner = new 
RemoteLogMetadataTopicPartitioner(rlmmConfig.metadataTopicPartitionsCount());
             remotePartitionMetadataStore = new 
RemotePartitionMetadataStore(new File(rlmmConfig.logDir()).toPath());
             configured = true;
             log.info("Successfully configured topic-based RLMM with config: 
{}", rlmmConfig);
@@ -416,8 +416,8 @@ public class TopicBasedRemoteLogMetadataManager implements 
RemoteLogMetadataMana
                 // Create producer and consumer managers.
                 lock.writeLock().lock();
                 try {
-                    producerManager = new ProducerManager(rlmmConfig, 
rlmmTopicPartitioner);
-                    consumerManager = new ConsumerManager(rlmmConfig, 
remotePartitionMetadataStore, rlmmTopicPartitioner, time);
+                    producerManager = new ProducerManager(rlmmConfig, 
rlmTopicPartitioner);
+                    consumerManager = new ConsumerManager(rlmmConfig, 
remotePartitionMetadataStore, rlmTopicPartitioner, time);
                     if (startConsumerThread) {
                         consumerManager.startConsumerThread();
                     } else {
@@ -509,10 +509,8 @@ public class TopicBasedRemoteLogMetadataManager implements 
RemoteLogMetadataMana
     }
 
     // Visible for testing.
-    public void startConsumerThread() {
-        if (consumerManager != null) {
-            consumerManager.startConsumerThread();
-        }
+    void setRlmTopicPartitioner(RemoteLogMetadataTopicPartitioner 
rlmTopicPartitioner) {
+        this.rlmTopicPartitioner = Objects.requireNonNull(rlmTopicPartitioner);
     }
 
     @Override
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java
new file mode 100644
index 00000000000..2b36c4bb039
--- /dev/null
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.SystemTime;
+import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import 
org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static 
org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.UserTopicIdPartition;
+import static 
org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.toRemoteLogPartition;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class ConsumerTaskTest {
+
+    private final int numMetadataTopicPartitions = 5;
+    private final RemoteLogMetadataTopicPartitioner partitioner = new 
RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions);
+    private final DummyEventHandler handler = new DummyEventHandler();
+    private final Set<TopicPartition> remoteLogPartitions = IntStream.range(0, 
numMetadataTopicPartitions).boxed()
+        .map(ConsumerTask::toRemoteLogPartition).collect(Collectors.toSet());
+    private final Uuid topicId = Uuid.randomUuid();
+    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+
+    private ConsumerTask consumerTask;
+    private MockConsumer<byte[], byte[]> consumer;
+    private Thread thread;
+
+    @BeforeEach
+    public void beforeEach() {
+        final Map<TopicPartition, Long> offsets = remoteLogPartitions.stream()
+            .collect(Collectors.toMap(Function.identity(), e -> 0L));
+        consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        consumer.updateBeginningOffsets(offsets);
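+        // Wrap the MockConsumer (seeded with beginning offset 0 for every metadata
+        // partition) in a ConsumerTask that each test drives from a dedicated thread.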
+        consumerTask = new ConsumerTask(handler, partitioner, consumer, 10L, 
300_000L, new SystemTime());
+        thread = new Thread(consumerTask);
+    }
+
+    @AfterEach
+    public void afterEach() throws InterruptedException {
+        if (thread != null) {
+            assertDoesNotThrow(() -> consumerTask.close(), "Close method threw 
exception");
+            thread.join(10_000);
+            assertFalse(thread.isAlive(), "Consumer task thread is still 
alive");
+        }
+    }
+
+    /**
+     * Tests that the consumer task shuts down gracefully when there are no assignments.
+     */
+    @Test
+    public void testCloseOnNoAssignment() throws InterruptedException {
+        thread.start();
+        Thread.sleep(10);
+        assertDoesNotThrow(() -> consumerTask.close(), "Close method threw 
exception");
+    }
+
+    @Test
+    public void testIdempotentClose() {
+        thread.start();
+        consumerTask.close();
+        consumerTask.close();
+    }
+
+    @Test
+    public void testUserTopicIdPartitionEquals() {
+        final TopicIdPartition tpId = new TopicIdPartition(topicId, new 
TopicPartition("sample", 0));
+        final UserTopicIdPartition utp1 = new UserTopicIdPartition(tpId, 
partitioner.metadataPartition(tpId));
+        final UserTopicIdPartition utp2 = new UserTopicIdPartition(tpId, 
partitioner.metadataPartition(tpId));
+        utp1.isInitialized = true;
+        utp1.isAssigned = true;
+
+        assertFalse(utp2.isInitialized);
+        assertFalse(utp2.isAssigned);
+        assertEquals(utp1, utp2);
+    }
+
+    @Test
+    public void testAddAssignmentsForPartitions() throws InterruptedException {
+        final List<TopicIdPartition> idPartitions = getIdPartitions("sample", 
3);
+        final Map<TopicPartition, Long> endOffsets = idPartitions.stream()
+            .map(idp -> 
toRemoteLogPartition(partitioner.metadataPartition(idp)))
+            .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> 
b));
+        consumer.updateEndOffsets(endOffsets);
+        consumerTask.addAssignmentsForPartitions(new HashSet<>(idPartitions));
+        thread.start();
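+        // Every user partition should end up assigned, its metadata partition
+        // subscribed on the consumer, and its state loaded through the event handler.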
+        for (final TopicIdPartition idPartition : idPartitions) {
+            TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(idPartition), "Timed out waiting for " + 
idPartition + " to be assigned");
+            
assertTrue(consumerTask.isMetadataPartitionAssigned(partitioner.metadataPartition(idPartition)));
+            assertTrue(handler.isPartitionLoaded.get(idPartition));
+        }
+    }
+
+    @Test
+    public void testRemoveAssignmentsForPartitions() throws 
InterruptedException {
+        final List<TopicIdPartition> allPartitions = getIdPartitions("sample", 
3);
+        final Map<TopicPartition, Long> endOffsets = allPartitions.stream()
+            .map(idp -> 
toRemoteLogPartition(partitioner.metadataPartition(idp)))
+            .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> 
b));
+        consumer.updateEndOffsets(endOffsets);
+        consumerTask.addAssignmentsForPartitions(new HashSet<>(allPartitions));
+        thread.start();
+
+        final TopicIdPartition tpId = allPartitions.get(0);
+        TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(tpId), "Timed out waiting for " + tpId + " 
to be assigned");
+        addRecord(consumer, partitioner.metadataPartition(tpId), tpId, 0);
+        TestUtils.waitForCondition(() -> 
consumerTask.readOffsetForMetadataPartition(partitioner.metadataPartition(tpId)).isPresent(),
+            "Couldn't read record");
+
+        final Set<TopicIdPartition> removePartitions = 
Collections.singleton(tpId);
+        consumerTask.removeAssignmentsForPartitions(removePartitions);
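+        // Removal is asynchronous: wait until only the removed partition is
+        // unassigned and the handler has been asked to clear its state.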
+        for (final TopicIdPartition idPartition : allPartitions) {
+            final TestCondition condition = () -> 
removePartitions.contains(idPartition) == 
!consumerTask.isUserPartitionAssigned(idPartition);
+            TestUtils.waitForCondition(condition, "Timed out waiting for " + 
idPartition + " to be removed");
+        }
+        for (TopicIdPartition removePartition : removePartitions) {
+            TestUtils.waitForCondition(() -> 
handler.isPartitionCleared.containsKey(removePartition),
+                "Timed out waiting for " + removePartition + " to be cleared");
+        }
+    }
+
+    @Test
+    public void testConcurrentPartitionAssignments() throws 
InterruptedException, ExecutionException {
+        final List<TopicIdPartition> allPartitions = getIdPartitions("sample", 
100);
+        final Map<TopicPartition, Long> endOffsets = allPartitions.stream()
+            .map(idp -> 
toRemoteLogPartition(partitioner.metadataPartition(idp)))
+            .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> 
b));
+        consumer.updateEndOffsets(endOffsets);
+
+        final AtomicBoolean isAllPartitionsAssigned = new AtomicBoolean(false);
+        CountDownLatch latch = new CountDownLatch(1);
+        Thread assignor = new Thread(() -> {
+            int partitionsAssigned = 0;
+            for (TopicIdPartition partition : allPartitions) {
+                if (partitionsAssigned == 50) {
+                    // Once half the topic partitions are assigned, wait for 
the consumer to catch up. This ensures
+                    // that the consumer is already running when the rest of 
the partitions are assigned.
+                    try {
+                        latch.await(1, TimeUnit.MINUTES);
+                    } catch (InterruptedException e) {
+                        fail(e.getMessage());
+                    }
+                }
+                
consumerTask.addAssignmentsForPartitions(Collections.singleton(partition));
+                partitionsAssigned++;
+            }
+            isAllPartitionsAssigned.set(true);
+        });
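+        // Poll maybeWaitForPartitionAssignments from a separate thread so that the
+        // reassignment logic runs concurrently with the assignment updates above.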
+        Runnable consumerRunnable = () -> {
+            try {
+                while (!isAllPartitionsAssigned.get()) {
+                    consumerTask.maybeWaitForPartitionAssignments();
+                    latch.countDown();
+                }
+            } catch (Exception e) {
+                fail(e.getMessage());
+            }
+        };
+
+        ExecutorService consumerExecutor = Executors.newSingleThreadExecutor();
+        Future<?> future = consumerExecutor.submit(consumerRunnable);
+        assignor.start();
+
+        assignor.join();
+        future.get();
+    }
+
+    @Test
+    public void testCanProcessRecord() throws InterruptedException {
+        final Uuid topicId = Uuid.fromString("Bp9TDduJRGa9Q5rlvCJOxg");
+        final TopicIdPartition tpId0 = new TopicIdPartition(topicId, new 
TopicPartition("sample", 0));
+        final TopicIdPartition tpId1 = new TopicIdPartition(topicId, new 
TopicPartition("sample", 1));
+        final TopicIdPartition tpId2 = new TopicIdPartition(topicId, new 
TopicPartition("sample", 2));
+        assertEquals(partitioner.metadataPartition(tpId0), 
partitioner.metadataPartition(tpId1));
+        assertEquals(partitioner.metadataPartition(tpId0), 
partitioner.metadataPartition(tpId2));
+
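+        // All three user partitions map to the same metadata partition, so records for
+        // partitions that are not assigned to this task are read but must be skipped.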
+        final int metadataPartition = partitioner.metadataPartition(tpId0);
+        
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition),
 0L));
+        final Set<TopicIdPartition> assignments = Collections.singleton(tpId0);
+        consumerTask.addAssignmentsForPartitions(assignments);
+        thread.start();
+        TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(tpId0), "Timed out waiting for " + tpId0 + 
" to be assigned");
+
+        addRecord(consumer, metadataPartition, tpId0, 0);
+        addRecord(consumer, metadataPartition, tpId0, 1);
+        TestUtils.waitForCondition(() -> 
consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)),
 "Couldn't read record");
+        assertEquals(2, handler.metadataCounter);
+
+        // should only read the tpId1 records
+        consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId1));
+        TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(tpId1), "Timed out waiting for " + tpId1 + 
" to be assigned");
+        addRecord(consumer, metadataPartition, tpId1, 2);
+        TestUtils.waitForCondition(() -> 
consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(2L)),
 "Couldn't read record");
+        assertEquals(3, handler.metadataCounter);
+
+        // shouldn't read tpId2 records because it's not assigned
+        addRecord(consumer, metadataPartition, tpId2, 3);
+        TestUtils.waitForCondition(() -> 
consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(3L)),
 "Couldn't read record");
+        assertEquals(3, handler.metadataCounter);
+    }
+
+    @Test
+    public void testMaybeMarkUserPartitionsAsReady() throws 
InterruptedException {
+        final TopicIdPartition tpId = getIdPartitions("hello", 1).get(0);
+        final int metadataPartition = partitioner.metadataPartition(tpId);
+        
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition),
 2L));
+        consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
+        thread.start();
+
+        TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be 
assigned");
+        
assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition));
+        assertFalse(handler.isPartitionInitialized.containsKey(tpId));
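+        // The partition is not initialized yet; it should be marked initialized only
+        // once the consumer catches up to the end offset seeded above (2).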
+        IntStream.range(0, 5).forEach(offset -> addRecord(consumer, 
metadataPartition, tpId, offset));
+        TestUtils.waitForCondition(() -> 
consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(4L)),
 "Couldn't read record");
+        assertTrue(handler.isPartitionInitialized.get(tpId));
+    }
+
+    @ParameterizedTest
+    @CsvSource(value = {"0, 0", "500, 500"})
+    public void testMaybeMarkUserPartitionAsReadyWhenTopicIsEmpty(long 
beginOffset,
+                                                                  long 
endOffset) throws InterruptedException {
+        final TopicIdPartition tpId = getIdPartitions("world", 1).get(0);
+        final int metadataPartition = partitioner.metadataPartition(tpId);
+        
consumer.updateBeginningOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition),
 beginOffset));
+        
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition),
 endOffset));
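+        // Beginning and end offsets are equal, so there is nothing to consume; the
+        // partition should still be marked initialized without reading any records.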
+        consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
+        thread.start();
+
+        TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be 
assigned");
+        
assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition));
+        TestUtils.waitForCondition(() -> 
handler.isPartitionInitialized.containsKey(tpId),
+            "should have initialized the partition");
+        
assertFalse(consumerTask.readOffsetForMetadataPartition(metadataPartition).isPresent());
+    }
+
+    @Test
+    public void testConcurrentAccess() throws InterruptedException {
+        thread.start();
+        final CountDownLatch latch = new CountDownLatch(1);
+        final TopicIdPartition tpId = getIdPartitions("concurrent", 1).get(0);
+        
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(partitioner.metadataPartition(tpId)),
 0L));
+        final Thread assignmentThread = new Thread(() -> {
+            try {
+                latch.await();
+                
consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
+            } catch (final InterruptedException e) {
+                fail("Shouldn't have thrown an exception");
+            }
+        });
+        final Thread closeThread = new Thread(() -> {
+            try {
+                latch.await();
+                consumerTask.close();
+            } catch (final InterruptedException e) {
+                fail("Shouldn't have thrown an exception");
+            }
+        });
+        assignmentThread.start();
+        closeThread.start();
+
+        latch.countDown();
+        assignmentThread.join();
+        closeThread.join();
+    }
+
+    @Test
+    public void testConsumerShouldNotCloseOnRetriableError() throws 
InterruptedException {
+        final TopicIdPartition tpId = getIdPartitions("world", 1).get(0);
+        final int metadataPartition = partitioner.metadataPartition(tpId);
+        
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition),
 1L));
+        consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
+        thread.start();
+
+        TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be 
assigned");
+        
assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition));
+
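+        // Retriable poll errors must not kill the task; it keeps polling and
+        // eventually processes both records added below.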
+        consumer.setPollException(new LeaderNotAvailableException("leader not 
available!"));
+        addRecord(consumer, metadataPartition, tpId, 0);
+        consumer.setPollException(new TimeoutException("Not able to complete 
the operation within the timeout"));
+        addRecord(consumer, metadataPartition, tpId, 1);
+
+        TestUtils.waitForCondition(() -> 
consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)),
 "Couldn't read record");
+        assertEquals(2, handler.metadataCounter);
+    }
+
+    @Test
+    public void testConsumerShouldCloseOnNonRetriableError() throws 
InterruptedException {
+        final TopicIdPartition tpId = getIdPartitions("world", 1).get(0);
+        final int metadataPartition = partitioner.metadataPartition(tpId);
+        
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition),
 1L));
+        consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
+        thread.start();
+
+        TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be 
assigned");
+        
assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition));
+
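+        // A non-retriable error should stop the task and close the underlying consumer.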
+        consumer.setPollException(new AuthorizationException("Unauthorized to 
read the topic!"));
+        TestUtils.waitForCondition(() -> consumer.closed(), "Should close the consumer on a non-retriable error");
+    }
+
+    private void addRecord(final MockConsumer<byte[], byte[]> consumer,
+                           final int metadataPartition,
+                           final TopicIdPartition idPartition,
+                           final long recordOffset) {
+        final RemoteLogSegmentId segmentId = new 
RemoteLogSegmentId(idPartition, Uuid.randomUuid());
+        final RemoteLogMetadata metadata = new 
RemoteLogSegmentMetadata(segmentId, 0L, 1L, 0L, 0, 0L, 1, 
Collections.singletonMap(0, 0L));
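+        // The record key is left null; ConsumerTask filters records by the
+        // TopicIdPartition embedded in the deserialized metadata value.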
+        final ConsumerRecord<byte[], byte[]> record = new 
ConsumerRecord<>(TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME,
 metadataPartition, recordOffset, null, serde.serialize(metadata));
+        consumer.addRecord(record);
+    }
+
+    private List<TopicIdPartition> getIdPartitions(final String topic, final 
int partitionCount) {
+        final List<TopicIdPartition> idPartitions = new ArrayList<>();
+        for (int partition = 0; partition < partitionCount; partition++) {
+            idPartitions.add(new TopicIdPartition(topicId, new 
TopicPartition(topic, partition)));
+        }
+        return idPartitions;
+    }
+
+    private static class DummyEventHandler extends 
RemotePartitionMetadataEventHandler {
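+        // Test double that records which callbacks were invoked so tests can assert
+        // on load, clear, and initialization behavior without a real metadata store.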
+        private int metadataCounter = 0;
+        private final Map<TopicIdPartition, Boolean> isPartitionInitialized = 
new HashMap<>();
+        private final Map<TopicIdPartition, Boolean> isPartitionLoaded = new 
HashMap<>();
+        private final Map<TopicIdPartition, Boolean> isPartitionCleared = new 
HashMap<>();
+
+        @Override
+        protected void handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata) {
+            metadataCounter++;
+        }
+
+        @Override
+        protected void 
handleRemoteLogSegmentMetadataUpdate(RemoteLogSegmentMetadataUpdate 
remoteLogSegmentMetadataUpdate) {
+        }
+
+        @Override
+        protected void 
handleRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata 
remotePartitionDeleteMetadata) {
+        }
+
+        @Override
+        public void syncLogMetadataSnapshot(TopicIdPartition topicIdPartition, 
int metadataPartition, Long metadataPartitionOffset) {
+        }
+
+        @Override
+        public void clearTopicPartition(TopicIdPartition topicIdPartition) {
+            isPartitionCleared.put(topicIdPartition, true);
+        }
+
+        @Override
+        public void markInitialized(TopicIdPartition partition) {
+            isPartitionInitialized.put(partition, true);
+        }
+
+        @Override
+        public boolean isInitialized(TopicIdPartition partition) {
+            return true;
+        }
+
+        @Override
+        public void maybeLoadPartition(TopicIdPartition partition) {
+            isPartitionLoaded.put(partition, true);
+        }
+    }
+}
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java
index e39d872744a..abad6ea7676 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java
@@ -63,11 +63,12 @@ public class TopicBasedRemoteLogMetadataManagerHarness 
extends IntegrationTestHa
         // Call setup to start the cluster.
         super.setUp(new EmptyTestInfo());
 
-        initializeRemoteLogMetadataManager(topicIdPartitions, 
startConsumerThread);
+        initializeRemoteLogMetadataManager(topicIdPartitions, 
startConsumerThread, null);
     }
 
     public void initializeRemoteLogMetadataManager(Set<TopicIdPartition> 
topicIdPartitions,
-                                                   boolean 
startConsumerThread) {
+                                                   boolean startConsumerThread,
+                                                   
RemoteLogMetadataTopicPartitioner remoteLogMetadataTopicPartitioner) {
         String logDir = 
TestUtils.tempDirectory("rlmm_segs_").getAbsolutePath();
         topicBasedRemoteLogMetadataManager = new 
TopicBasedRemoteLogMetadataManager(startConsumerThread) {
             @Override
@@ -104,6 +105,9 @@ public class TopicBasedRemoteLogMetadataManagerHarness 
extends IntegrationTestHa
         log.debug("TopicBasedRemoteLogMetadataManager configs after adding 
overridden properties: {}", configs);
 
         topicBasedRemoteLogMetadataManager.configure(configs);
+        if (remoteLogMetadataTopicPartitioner != null) {
+            
topicBasedRemoteLogMetadataManager.setRlmTopicPartitioner(remoteLogMetadataTopicPartitioner);
+        }
         try {
             waitUntilInitialized(60_000);
         } catch (TimeoutException e) {
@@ -145,4 +149,4 @@ public class TopicBasedRemoteLogMetadataManagerHarness 
extends IntegrationTestHa
     public void closeRemoteLogMetadataManager() {
         Utils.closeQuietly(topicBasedRemoteLogMetadataManager, 
"TopicBasedRemoteLogMetadataManager");
     }
-}
\ No newline at end of file
+}
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
new file mode 100644
index 00000000000..3386b94f895
--- /dev/null
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+
+import kafka.utils.EmptyTestInfo;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for 
usages of JavaConverters
+public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
+    private static final Logger log = 
LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.class);
+
+    private static final int SEG_SIZE = 1024 * 1024;
+
+    private final Time time = new MockTime(1);
+    private final TopicBasedRemoteLogMetadataManagerHarness 
remoteLogMetadataManagerHarness = new 
TopicBasedRemoteLogMetadataManagerHarness();
+
+    private TopicBasedRemoteLogMetadataManager rlmm() {
+        return remoteLogMetadataManagerHarness.remoteLogMetadataManager();
+    }
+
+    @BeforeEach
+    public void setup() {
+        // Start the cluster only.
+        remoteLogMetadataManagerHarness.setUp(new EmptyTestInfo());
+    }
+
+    @AfterEach
+    public void teardown() throws IOException {
+        remoteLogMetadataManagerHarness.close();
+    }
+
+    @Test
+    public void testMultiplePartitionSubscriptions() throws Exception {
+        // Create topics.
+        String leaderTopic = "leader";
+        HashMap<Object, Seq<Object>> assignedLeaderTopicReplicas = new 
HashMap<>();
+        List<Object> leaderTopicReplicas = new ArrayList<>();
+        // Set broker id 0 as the first entry which is taken as the leader.
+        leaderTopicReplicas.add(0);
+        leaderTopicReplicas.add(1);
+        leaderTopicReplicas.add(2);
+        assignedLeaderTopicReplicas.put(0, 
JavaConverters.asScalaBuffer(leaderTopicReplicas));
+        remoteLogMetadataManagerHarness.createTopicWithAssignment(leaderTopic,
+            JavaConverters.mapAsScalaMap(assignedLeaderTopicReplicas),
+            remoteLogMetadataManagerHarness.listenerName());
+
+        String followerTopic = "follower";
+        HashMap<Object, Seq<Object>> assignedFollowerTopicReplicas = new 
HashMap<>();
+        List<Object> followerTopicReplicas = new ArrayList<>();
+        // Set broker id 1 as the first entry which is taken as the leader.
+        followerTopicReplicas.add(1);
+        followerTopicReplicas.add(2);
+        followerTopicReplicas.add(0);
+        assignedFollowerTopicReplicas.put(0, 
JavaConverters.asScalaBuffer(followerTopicReplicas));
+        remoteLogMetadataManagerHarness.createTopicWithAssignment(
+            followerTopic, 
JavaConverters.mapAsScalaMap(assignedFollowerTopicReplicas),
+            remoteLogMetadataManagerHarness.listenerName());
+
+        String topicWithNoMessages = "no-messages-topic";
+        HashMap<Object, Seq<Object>> assignedTopicReplicas = new HashMap<>();
+        List<Object> noMessagesTopicReplicas = new ArrayList<>();
+        // Set broker id 1 as the first entry which is taken as the leader.
+        noMessagesTopicReplicas.add(1);
+        noMessagesTopicReplicas.add(2);
+        noMessagesTopicReplicas.add(0);
+        assignedTopicReplicas.put(0, 
JavaConverters.asScalaBuffer(noMessagesTopicReplicas));
+        remoteLogMetadataManagerHarness.createTopicWithAssignment(
+            topicWithNoMessages, 
JavaConverters.mapAsScalaMap(assignedTopicReplicas),
+            remoteLogMetadataManagerHarness.listenerName());
+
+        final TopicIdPartition leaderTopicIdPartition = new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition(leaderTopic, 0));
+        final TopicIdPartition followerTopicIdPartition = new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition(followerTopic, 0));
+        final TopicIdPartition emptyTopicIdPartition = new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition(topicWithNoMessages, 0));
+
+        RemoteLogMetadataTopicPartitioner partitioner = new 
RemoteLogMetadataTopicPartitioner(10) {
+            @Override
+            public int metadataPartition(TopicIdPartition topicIdPartition) {
+                // Always return partition 0 except for emptyTopicIdPartition, so that
+                // any user partition added to RLMM maps to the same metadata partition.
+                // Registering emptyTopicIdPartition later then triggers a secondary
+                // consumer assignment.
+                if (emptyTopicIdPartition.equals(topicIdPartition)) {
+                    return 1;
+                } else {
+                    return 0;
+                }
+            }
+        };
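+        // With this partitioner, the leader and follower user partitions share metadata
+        // partition 0, while the empty topic is isolated on metadata partition 1.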
+
+        
remoteLogMetadataManagerHarness.initializeRemoteLogMetadataManager(Collections.emptySet(),
 true, partitioner);
+
+        // Add segments for these partitions; an exception is expected because they
+        // have not yet been subscribed. The messages are still published to the
+        // respective metadata topic partitions, but the ConsumerManager has not
+        // subscribed to them yet because the user partitions are not registered.
+        RemoteLogSegmentMetadata leaderSegmentMetadata = new 
RemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, 
Uuid.randomUuid()),
+            0, 100, -1L, 0,
+            time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
+        ExecutionException exception = 
Assertions.assertThrows(ExecutionException.class, () -> 
rlmm().addRemoteLogSegmentMetadata(leaderSegmentMetadata).get());
+        Assertions.assertEquals("org.apache.kafka.common.KafkaException: This 
consumer is not assigned to the target partition 0. Partitions currently 
assigned: []",
+            exception.getMessage());
+
+        RemoteLogSegmentMetadata followerSegmentMetadata = new 
RemoteLogSegmentMetadata(new RemoteLogSegmentId(followerTopicIdPartition, 
Uuid.randomUuid()),
+            0, 100, -1L, 0,
+            time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
+        exception = Assertions.assertThrows(ExecutionException.class, () -> 
rlmm().addRemoteLogSegmentMetadata(followerSegmentMetadata).get());
+        Assertions.assertEquals("org.apache.kafka.common.KafkaException: This 
consumer is not assigned to the target partition 0. Partitions currently 
assigned: []",
+            exception.getMessage());
+
+        // `listRemoteLogSegments` throws an exception as these topic partitions are
+        // not yet registered.
+        Assertions.assertThrows(RemoteStorageException.class, () -> 
rlmm().listRemoteLogSegments(leaderTopicIdPartition));
+        Assertions.assertThrows(RemoteStorageException.class, () -> 
rlmm().listRemoteLogSegments(followerTopicIdPartition));
+
+        
rlmm().onPartitionLeadershipChanges(Collections.singleton(leaderTopicIdPartition),
+            Collections.emptySet());
+
+        // RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's
+        // consumer manager will start fetching those events and building the cache.
+        waitUntilConsumerCatchesUp(30_000L);
+        // The leader partition's metadata is received because it is registered, but
+        // the follower partition is not yet registered, so listing its segments
+        // throws an exception.
+        
Assertions.assertTrue(rlmm().listRemoteLogSegments(leaderTopicIdPartition).hasNext());
+        Assertions.assertThrows(RemoteStorageException.class, () -> 
rlmm().listRemoteLogSegments(followerTopicIdPartition));
+
+        // Register the follower partition, and the empty topic partition as a leader.
+        
rlmm().onPartitionLeadershipChanges(Collections.singleton(emptyTopicIdPartition),
+            Collections.singleton(followerTopicIdPartition));
+
+        // In this state, all the metadata should be available in RLMM for 
both leader and follower partitions.
+        TestUtils.waitForCondition(() -> 
rlmm().listRemoteLogSegments(leaderTopicIdPartition).hasNext(), "No segments 
found");
+        TestUtils.waitForCondition(() -> 
rlmm().listRemoteLogSegments(followerTopicIdPartition).hasNext(), "No segments 
found");
+    }
+
+    private void waitUntilConsumerCatchesUp(long timeoutMs) throws 
TimeoutException, InterruptedException {
+        TestUtils.waitForCondition(() -> {
+            // Both the leader and follower partitions are mapped to the same metadata
+            // partition (0), so it should have at least 2 messages. That means the read
+            // offset should be >= 1 (including duplicate messages, if any).
+            return rlmm().readOffsetForPartition(0).orElse(-1L) >= 1;
+        }, timeoutMs, "Consumer did not catch up");
+    }
+}
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
index 714405a30e9..46ffeba2142 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
@@ -31,18 +31,14 @@ import org.junit.jupiter.api.Test;
 import scala.collection.JavaConverters;
 import scala.collection.Seq;
 
-import java.io.File;
 import java.io.IOException;
-import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 
-import static 
org.apache.kafka.server.log.remote.metadata.storage.ConsumerManager.COMMITTED_OFFSETS_FILE_NAME;
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR;
 
 @SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for 
usages of JavaConverters
@@ -69,7 +65,7 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest {
     }
 
     private void startTopicBasedRemoteLogMetadataManagerHarness(boolean 
startConsumerThread) {
-        
remoteLogMetadataManagerHarness.initializeRemoteLogMetadataManager(Collections.emptySet(),
 startConsumerThread);
+        
remoteLogMetadataManagerHarness.initializeRemoteLogMetadataManager(Collections.emptySet(),
 startConsumerThread, null);
     }
 
     @AfterEach
@@ -136,9 +132,8 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest {
         // Stop TopicBasedRemoteLogMetadataManager only.
         stopTopicBasedRemoteLogMetadataManagerHarness();
 
-        // Start TopicBasedRemoteLogMetadataManager but do not start consumer 
thread to check whether the stored metadata is
-        // loaded successfully or not.
-        startTopicBasedRemoteLogMetadataManagerHarness(false);
+        // Start TopicBasedRemoteLogMetadataManager with the consumer thread running.
+        startTopicBasedRemoteLogMetadataManagerHarness(true);
 
         // Register these partitions to RLMM, which loads the respective 
metadata snapshots.
         
topicBasedRlmm().onPartitionLeadershipChanges(Collections.singleton(leaderTopicIdPartition),
 Collections.singleton(followerTopicIdPartition));
@@ -148,29 +143,6 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest 
{
                                                                  
topicBasedRlmm().listRemoteLogSegments(leaderTopicIdPartition)));
         
Assertions.assertTrue(TestUtils.sameElementsWithoutOrder(Collections.singleton(followerSegmentMetadata).iterator(),
                                                                  
topicBasedRlmm().listRemoteLogSegments(followerTopicIdPartition)));
-        // Check whether the check-pointed consumer offsets are stored or not.
-        Path committedOffsetsPath = new File(logDir, 
COMMITTED_OFFSETS_FILE_NAME).toPath();
-        Assertions.assertTrue(committedOffsetsPath.toFile().exists());
-        CommittedOffsetsFile committedOffsetsFile = new 
CommittedOffsetsFile(committedOffsetsPath.toFile());
-
-        int metadataPartition1 = 
topicBasedRlmm().metadataPartition(leaderTopicIdPartition);
-        int metadataPartition2 = 
topicBasedRlmm().metadataPartition(followerTopicIdPartition);
-        Optional<Long> receivedOffsetForPartition1 = 
topicBasedRlmm().receivedOffsetForPartition(metadataPartition1);
-        Optional<Long> receivedOffsetForPartition2 = 
topicBasedRlmm().receivedOffsetForPartition(metadataPartition2);
-        Assertions.assertTrue(receivedOffsetForPartition1.isPresent());
-        Assertions.assertTrue(receivedOffsetForPartition2.isPresent());
-
-        // Make sure these offsets are at least 0.
-        Assertions.assertTrue(receivedOffsetForPartition1.get() >= 0);
-        Assertions.assertTrue(receivedOffsetForPartition2.get() >= 0);
-
-        // Check the stored entries and the offsets that were set on consumer 
are the same.
-        Map<Integer, Long> partitionToOffset = 
committedOffsetsFile.readEntries();
-        Assertions.assertEquals(partitionToOffset.get(metadataPartition1), 
receivedOffsetForPartition1.get());
-        Assertions.assertEquals(partitionToOffset.get(metadataPartition2), 
receivedOffsetForPartition2.get());
-
-        // Start Consumer thread
-        topicBasedRlmm().startConsumerThread();
 
         // Add one more segment
         RemoteLogSegmentMetadata leaderSegmentMetadata2 = new 
RemoteLogSegmentMetadata(
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
index eaf62edfea7..96e48de8a73 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
@@ -149,17 +149,17 @@ public class TopicBasedRemoteLogMetadataManagerTest {
             }
 
             // If both the leader and follower partitions are mapped to the 
same metadata partition then it should have at least
-            // 2 messages. That means, received offset should be >= 1 
(including duplicate messages if any).
+            // 2 messages. That means, read offset should be >= 1 (including 
duplicate messages if any).
             if (leaderMetadataPartition == followerMetadataPartition) {
-                if 
(topicBasedRlmm().receivedOffsetForPartition(leaderMetadataPartition).orElse(-1L)
 >= 1) {
+                if 
(topicBasedRlmm().readOffsetForPartition(leaderMetadataPartition).orElse(-1L) 
>= 1) {
                     break;
                 }
             } else {
                 // If the leader partition and the follower partition are 
mapped to different metadata partitions then
-                // each of those metadata partitions will have at least 1 
message. That means, received offset should
+                // each of those metadata partitions will have at least 1 
message. That means, read offset should
                 // be >= 0 (including duplicate messages if any).
-                if 
(topicBasedRlmm().receivedOffsetForPartition(leaderMetadataPartition).orElse(-1L)
 >= 0 ||
-                        
topicBasedRlmm().receivedOffsetForPartition(followerMetadataPartition).orElse(-1L)
 >= 0) {
+                if 
(topicBasedRlmm().readOffsetForPartition(leaderMetadataPartition).orElse(-1L) 
>= 0 ||
+                        
topicBasedRlmm().readOffsetForPartition(followerMetadataPartition).orElse(-1L) 
>= 0) {
                     break;
                 }
             }

