showuon commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1283916591


##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##########
@@ -64,302 +65,387 @@
 class ConsumerTask implements Runnable, Closeable {
     private static final Logger log = 
LoggerFactory.getLogger(ConsumerTask.class);
 
-    private static final long POLL_INTERVAL_MS = 100L;
+    static long pollIntervalMs = 100L;
 
     private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-    private final KafkaConsumer<byte[], byte[]> consumer;
-    private final String metadataTopicName;
+    private final Consumer<byte[], byte[]> consumer;
     private final RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler;
     private final RemoteLogMetadataTopicPartitioner topicPartitioner;
-    private final Time time;
+    private final Time time = new SystemTime();
 
+    // TODO - Update comments below
     // It indicates whether the closing process has been started or not. If it 
is set as true,
     // consumer will stop consuming messages, and it will not allow partition 
assignments to be updated.
-    private volatile boolean closing = false;
-
+    private volatile boolean isClosed = false;
     // It indicates whether the consumer needs to assign the partitions or 
not. This is set when it is
     // determined that the consumer needs to be assigned with the updated 
partitions.
-    private volatile boolean assignPartitions = false;
+    private volatile boolean isAssignmentChanged = true;

Review Comment:
   Could you explain why this defaults to `true`? As I understand it, we always 
need to wait for `updateAssignment` to be invoked, since that is what updates 
`assignedUserTopicIdPartitions` and `isAssignmentChanged` and releases the 
wait in `maybeWaitForPartitionsAssignment`. So defaulting 
`isAssignmentChanged` to `true` doesn't seem to make sense. WDYT?
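
A minimal sketch of the handshake described above, not the PR's actual code. `AssignmentGate`, `waitForAssignment`, and the `String` partition names are hypothetical stand-ins, and the sketch assumes the wait is entered unconditionally rather than gated on the flag as in `run()`. The flag starts as `false` and only `updateAssignment` sets it:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for ConsumerTask; only the assignment handshake is modeled.
class AssignmentGate {
    private final Object lock = new Object();
    private final Set<String> assigned = new HashSet<>();
    // Starts as false: nothing has changed until updateAssignment is called.
    private volatile boolean isAssignmentChanged = false;
    private volatile boolean isClosed = false;

    // Called from another thread, for example a request handler thread.
    void updateAssignment(Set<String> partitions) {
        synchronized (lock) {
            assigned.clear();
            assigned.addAll(partitions);
            isAssignmentChanged = true;
            lock.notifyAll(); // releases a consumer thread blocked in waitForAssignment
        }
    }

    void close() {
        synchronized (lock) {
            isClosed = true;
            lock.notifyAll(); // a closed task must not stay blocked
        }
    }

    // Blocks until an assignment exists or the task is closed, then returns a snapshot.
    Set<String> waitForAssignment() throws InterruptedException {
        synchronized (lock) {
            while (!isClosed && assigned.isEmpty()) {
                lock.wait();
            }
            isAssignmentChanged = false; // the change has now been picked up
            return new HashSet<>(assigned);
        }
    }

    boolean isAssignmentChanged() {
        return isAssignmentChanged;
    }
}
```

With this shape the initial value of the flag does not decide whether the first wait happens, which is the behaviour the question above is trying to confirm.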



##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##########
@@ -64,302 +65,387 @@
 class ConsumerTask implements Runnable, Closeable {
     private static final Logger log = 
LoggerFactory.getLogger(ConsumerTask.class);
 
-    private static final long POLL_INTERVAL_MS = 100L;
+    static long pollIntervalMs = 100L;
 
     private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-    private final KafkaConsumer<byte[], byte[]> consumer;
-    private final String metadataTopicName;
+    private final Consumer<byte[], byte[]> consumer;
     private final RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler;
     private final RemoteLogMetadataTopicPartitioner topicPartitioner;
-    private final Time time;
+    private final Time time = new SystemTime();
 
+    // TODO - Update comments below
     // It indicates whether the closing process has been started or not. If it 
is set as true,
     // consumer will stop consuming messages, and it will not allow partition 
assignments to be updated.
-    private volatile boolean closing = false;
-
+    private volatile boolean isClosed = false;
     // It indicates whether the consumer needs to assign the partitions or 
not. This is set when it is
     // determined that the consumer needs to be assigned with the updated 
partitions.
-    private volatile boolean assignPartitions = false;
+    private volatile boolean isAssignmentChanged = true;
 
     // It represents a lock for any operations related to the 
assignedTopicPartitions.
     private final Object assignPartitionsLock = new Object();
 
     // Remote log metadata topic partitions that consumer is assigned to.
-    private volatile Set<Integer> assignedMetaPartitions = 
Collections.emptySet();
+    private volatile Set<Integer> assignedMetadataPartitions = 
Collections.emptySet();
 
     // User topic partitions that this broker is a leader/follower for.
-    private Set<TopicIdPartition> assignedTopicPartitions = 
Collections.emptySet();
+    private volatile Map<TopicIdPartition, UserTopicIdPartition> 
assignedUserTopicIdPartitions = Collections.emptyMap();
+    private volatile Set<TopicIdPartition> 
processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();
 
-    // Map of remote log metadata topic partition to consumed offsets. 
Received consumer records
-    // may or may not have been processed based on the assigned topic 
partitions.
-    private final Map<Integer, Long> partitionToConsumedOffsets = new 
ConcurrentHashMap<>();
+    private long uninitializedAt = time.milliseconds();
+    private boolean isAllUserTopicPartitionsInitialized;
 
-    // Map of remote log metadata topic partition to processed offsets that 
were synced in committedOffsetsFile.
-    private Map<Integer, Long> lastSyncedPartitionToConsumedOffsets = 
Collections.emptyMap();
+    // Map of remote log metadata topic partition to consumed offsets.
+    private final Map<Integer, Long> readOffsetsByMetadataPartition = new 
ConcurrentHashMap<>();
+    private final Map<TopicIdPartition, Long> readOffsetsByUserTopicPartition 
= new HashMap<>();
 
-    private final long committedOffsetSyncIntervalMs;
-    private CommittedOffsetsFile committedOffsetsFile;
-    private long lastSyncedTimeMs;
+    private Map<TopicPartition, BeginAndEndOffsetHolder> 
offsetHolderByMetadataPartition = new HashMap<>();
+    private boolean isOffsetsFetchFailed = false;
+    private long lastFailedFetchOffsetsTimestamp;
 
-    public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer,
-                        String metadataTopicName,
-                        RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
+    public ConsumerTask(RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
                         RemoteLogMetadataTopicPartitioner topicPartitioner,
-                        Path committedOffsetsPath,
-                        Time time,
-                        long committedOffsetSyncIntervalMs) {
-        this.consumer = Objects.requireNonNull(consumer);
-        this.metadataTopicName = Objects.requireNonNull(metadataTopicName);
+                        Function<Optional<String>, Consumer<byte[], byte[]>> 
consumerSupplier) {
+        this.consumer = consumerSupplier.apply(Optional.empty());
         this.remotePartitionMetadataEventHandler = 
Objects.requireNonNull(remotePartitionMetadataEventHandler);
         this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
-        this.time = Objects.requireNonNull(time);
-        this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs;
-
-        initializeConsumerAssignment(committedOffsetsPath);
-    }
-
-    private void initializeConsumerAssignment(Path committedOffsetsPath) {
-        try {
-            committedOffsetsFile = new 
CommittedOffsetsFile(committedOffsetsPath.toFile());
-        } catch (IOException e) {
-            throw new KafkaException(e);
-        }
-
-        Map<Integer, Long> committedOffsets = Collections.emptyMap();
-        try {
-            // Load committed offset and assign them in the consumer.
-            committedOffsets = committedOffsetsFile.readEntries();
-        } catch (IOException e) {
-            // Ignore the error and consumer consumes from the earliest offset.
-            log.error("Encountered error while building committed offsets from 
the file. " +
-                              "Consumer will consume from the earliest offset 
for the assigned partitions.", e);
-        }
-
-        if (!committedOffsets.isEmpty()) {
-            // Assign topic partitions from the earlier committed offsets file.
-            Set<Integer> earlierAssignedPartitions = committedOffsets.keySet();
-            assignedMetaPartitions = 
Collections.unmodifiableSet(earlierAssignedPartitions);
-            Set<TopicPartition> metadataTopicPartitions = 
earlierAssignedPartitions.stream()
-                                                                               
    .map(x -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, x))
-                                                                               
    .collect(Collectors.toSet());
-            consumer.assign(metadataTopicPartitions);
-
-            // Seek to the committed offsets
-            for (Map.Entry<Integer, Long> entry : committedOffsets.entrySet()) 
{
-                log.debug("Updating consumed offset: [{}] for partition [{}]", 
entry.getValue(), entry.getKey());
-                partitionToConsumedOffsets.put(entry.getKey(), 
entry.getValue());
-                consumer.seek(new 
TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, entry.getKey()), 
entry.getValue());
-            }
-
-            lastSyncedPartitionToConsumedOffsets = 
Collections.unmodifiableMap(committedOffsets);
-        }
     }
 
     @Override
     public void run() {
-        log.info("Started Consumer task thread.");
-        lastSyncedTimeMs = time.milliseconds();
-        try {
-            while (!closing) {
-                maybeWaitForPartitionsAssignment();
+        log.info("Starting consumer task thread.");
+        while (!isClosed) {
+            try {
+                if (isAssignmentChanged) {
+                    maybeWaitForPartitionsAssignment();
+                }
 
                 log.trace("Polling consumer to receive remote log metadata 
topic records");
-                ConsumerRecords<byte[], byte[]> consumerRecords = 
consumer.poll(Duration.ofMillis(POLL_INTERVAL_MS));
+                final ConsumerRecords<byte[], byte[]> consumerRecords = 
consumer.poll(Duration.ofMillis(pollIntervalMs));
                 for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
                     processConsumerRecord(record);
                 }
-
-                maybeSyncCommittedDataAndOffsets(false);
+                maybeMarkUserPartitionsAsReady();
+            } catch (final WakeupException ex) {
+                // ignore logging the error
+                isClosed = true;
+            } catch (final RetriableException ex) {
+                log.warn("Retriable error occurred while processing the 
records. Retrying...", ex);
+            }  catch (final Exception ex) {
+                isClosed = true;
+                log.error("Error occurred while processing the records", ex);
             }
-        } catch (Exception e) {
-            log.error("Error occurred in consumer task, close:[{}]", closing, 
e);
-        } finally {
-            maybeSyncCommittedDataAndOffsets(true);
-            closeConsumer();
-            log.info("Exiting from consumer task thread");
         }
+        try {
+            consumer.close(Duration.ofSeconds(30));
+        } catch (final Exception e) {
+            log.error("Error encountered while closing the consumer", e);
+        }
+        log.info("Exited from consumer task thread");
     }
 
     private void processConsumerRecord(ConsumerRecord<byte[], byte[]> record) {
-        // Taking assignPartitionsLock here as updateAssignmentsForPartitions 
changes assignedTopicPartitions
-        // and also calls 
remotePartitionMetadataEventHandler.clearTopicPartition(removedPartition) for 
the removed
-        // partitions.
-        RemoteLogMetadata remoteLogMetadata = 
serde.deserialize(record.value());
-        synchronized (assignPartitionsLock) {
-            if 
(assignedTopicPartitions.contains(remoteLogMetadata.topicIdPartition())) {
-                
remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata);
-            } else {
-                log.debug("This event {} is skipped as the topic partition is 
not assigned for this instance.", remoteLogMetadata);
-            }
-            log.debug("Updating consumed offset: [{}] for partition [{}]", 
record.offset(), record.partition());
-            partitionToConsumedOffsets.put(record.partition(), 
record.offset());
+        final RemoteLogMetadata remoteLogMetadata = 
serde.deserialize(record.value());
+        if (canProcess(remoteLogMetadata, record.offset())) {
+            
remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata);
+            
readOffsetsByUserTopicPartition.put(remoteLogMetadata.topicIdPartition(), 
record.offset());
+        } else {
+            log.debug("The event {} is skipped because it is either already 
processed or not assigned to this consumer", remoteLogMetadata);
         }
+        log.debug("Updating consumed offset: [{}] for partition [{}]", 
record.offset(), record.partition());
+        readOffsetsByMetadataPartition.put(record.partition(), 
record.offset());
+    }
+
+    private boolean canProcess(final RemoteLogMetadata metadata, final long 
recordOffset) {
+        final TopicIdPartition tpId = metadata.topicIdPartition();
+        final Long readOffset = readOffsetsByUserTopicPartition.get(tpId);
+        return processedAssignmentOfUserTopicIdPartitions.contains(tpId) && 
(readOffset == null || readOffset < recordOffset);
     }
 
-    private void maybeSyncCommittedDataAndOffsets(boolean forceSync) {
-        // Return immediately if there is no consumption from last time.
-        boolean noConsumedOffsetUpdates = 
partitionToConsumedOffsets.equals(lastSyncedPartitionToConsumedOffsets);
-        if (noConsumedOffsetUpdates || !forceSync && time.milliseconds() - 
lastSyncedTimeMs < committedOffsetSyncIntervalMs) {
-            log.debug("Skip syncing committed offsets, 
noConsumedOffsetUpdates: {}, forceSync: {}", noConsumedOffsetUpdates, 
forceSync);
+    private void maybeMarkUserPartitionsAsReady() {
+        if (isAllUserTopicPartitionsInitialized) {
             return;
         }
-
-        try {
-            // Need to take lock on assignPartitionsLock as 
assignedTopicPartitions might
-            // get updated by other threads.
-            synchronized (assignPartitionsLock) {
-                for (TopicIdPartition topicIdPartition : 
assignedTopicPartitions) {
-                    int metadataPartition = 
topicPartitioner.metadataPartition(topicIdPartition);
-                    Long offset = 
partitionToConsumedOffsets.get(metadataPartition);
-                    if (offset != null) {
-                        
remotePartitionMetadataEventHandler.syncLogMetadataSnapshot(topicIdPartition, 
metadataPartition, offset);
-                    } else {
-                        log.debug("Skipping sync-up of the 
remote-log-metadata-file for partition: [{}] , with remote log metadata 
partition{}, and no offset",
-                                topicIdPartition, metadataPartition);
+        maybeFetchBeginAndEndOffsets();
+        boolean isAllInitialized = true;
+        for (final UserTopicIdPartition utp : 
assignedUserTopicIdPartitions.values()) {
+            if (utp.isAssigned && !utp.isInitialized) {
+                final Integer metadataPartition = utp.metadataPartition;
+                final BeginAndEndOffsetHolder holder = 
offsetHolderByMetadataPartition.get(toRemoteLogPartition(metadataPartition));
+                // The offset-holder can be null, when the recent assignment 
wasn't picked up by the consumer.
+                if (holder != null) {
+                    final Long readOffset = 
readOffsetsByMetadataPartition.getOrDefault(metadataPartition, -1L);
+                    // 1) The end-offset was fetched only once during 
reassignment. The metadata-partition can receive
+                    // new stream of records, so the consumer can read records 
more than the last-fetched end-offset.
+                    // 2) When the internal topic becomes empty due to breach 
by size/time/start-offset, then there
+                    // are no records to read.
+                    if (readOffset + 1 >= holder.endOffset || 
holder.endOffset.equals(holder.beginOffset)) {
+                        markInitialized(utp);
                     }
                 }

Review Comment:
   Some debug logs should be added here for troubleshooting, e.g. to answer 
why a topic partition never gets initialized.
   So, I suggest:
   1. Log at debug level when `holder` is null.
   2. Log at debug level when `holder` is not null but the partition still 
cannot be marked as initialized, e.g. `cannot mark initialized since the 
readOffset is {}, but endOffset is {}.`
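
The two cases above can be sketched as follows. This is only an illustration under stated assumptions: `InitializationCheck`, `OffsetHolder`, and `whyNotInitialized` are hypothetical names, the offsets are primitive `long`s rather than the `Long` fields in the diff, and the method returns the message instead of calling `log.debug` so that it runs without a logging dependency:

```java
import java.util.Optional;

// Hypothetical helper mirroring the condition in maybeMarkUserPartitionsAsReady.
final class InitializationCheck {

    // Hypothetical stand-in for BeginAndEndOffsetHolder from the diff.
    static final class OffsetHolder {
        final long beginOffset;
        final long endOffset;

        OffsetHolder(long beginOffset, long endOffset) {
            this.beginOffset = beginOffset;
            this.endOffset = endOffset;
        }
    }

    // Returns empty when the partition can be marked initialized,
    // otherwise the message that would be logged at debug level.
    static Optional<String> whyNotInitialized(OffsetHolder holder, long readOffset) {
        if (holder == null) {
            // Case 1: the recent assignment was not picked up by the consumer yet.
            return Optional.of("cannot mark initialized since begin and end offsets are not fetched yet");
        }
        if (readOffset + 1 >= holder.endOffset || holder.endOffset == holder.beginOffset) {
            return Optional.empty();
        }
        // Case 2: offsets are known, but the consumer has not caught up.
        return Optional.of(String.format(
            "cannot mark initialized since the readOffset is %d, but endOffset is %d",
            readOffset, holder.endOffset));
    }
}
```

In the real method each non-empty result would become a `log.debug(...)` call next to the existing `holder != null` check.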



##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##########
@@ -64,302 +65,387 @@
 class ConsumerTask implements Runnable, Closeable {
     private static final Logger log = 
LoggerFactory.getLogger(ConsumerTask.class);
 
-    private static final long POLL_INTERVAL_MS = 100L;
+    static long pollIntervalMs = 100L;
 
     private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-    private final KafkaConsumer<byte[], byte[]> consumer;
-    private final String metadataTopicName;
+    private final Consumer<byte[], byte[]> consumer;
     private final RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler;
     private final RemoteLogMetadataTopicPartitioner topicPartitioner;
-    private final Time time;
+    private final Time time = new SystemTime();
 
+    // TODO - Update comments below
     // It indicates whether the closing process has been started or not. If it 
is set as true,
     // consumer will stop consuming messages, and it will not allow partition 
assignments to be updated.
-    private volatile boolean closing = false;
-
+    private volatile boolean isClosed = false;
     // It indicates whether the consumer needs to assign the partitions or 
not. This is set when it is
     // determined that the consumer needs to be assigned with the updated 
partitions.
-    private volatile boolean assignPartitions = false;
+    private volatile boolean isAssignmentChanged = true;
 
     // It represents a lock for any operations related to the 
assignedTopicPartitions.
     private final Object assignPartitionsLock = new Object();
 
     // Remote log metadata topic partitions that consumer is assigned to.
-    private volatile Set<Integer> assignedMetaPartitions = 
Collections.emptySet();
+    private volatile Set<Integer> assignedMetadataPartitions = 
Collections.emptySet();
 
     // User topic partitions that this broker is a leader/follower for.
-    private Set<TopicIdPartition> assignedTopicPartitions = 
Collections.emptySet();
+    private volatile Map<TopicIdPartition, UserTopicIdPartition> 
assignedUserTopicIdPartitions = Collections.emptyMap();
+    private volatile Set<TopicIdPartition> 
processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();
 
-    // Map of remote log metadata topic partition to consumed offsets. 
Received consumer records
-    // may or may not have been processed based on the assigned topic 
partitions.
-    private final Map<Integer, Long> partitionToConsumedOffsets = new 
ConcurrentHashMap<>();
+    private long uninitializedAt = time.milliseconds();
+    private boolean isAllUserTopicPartitionsInitialized;
 
-    // Map of remote log metadata topic partition to processed offsets that 
were synced in committedOffsetsFile.
-    private Map<Integer, Long> lastSyncedPartitionToConsumedOffsets = 
Collections.emptyMap();
+    // Map of remote log metadata topic partition to consumed offsets.
+    private final Map<Integer, Long> readOffsetsByMetadataPartition = new 
ConcurrentHashMap<>();
+    private final Map<TopicIdPartition, Long> readOffsetsByUserTopicPartition 
= new HashMap<>();
 
-    private final long committedOffsetSyncIntervalMs;
-    private CommittedOffsetsFile committedOffsetsFile;
-    private long lastSyncedTimeMs;
+    private Map<TopicPartition, BeginAndEndOffsetHolder> 
offsetHolderByMetadataPartition = new HashMap<>();
+    private boolean isOffsetsFetchFailed = false;
+    private long lastFailedFetchOffsetsTimestamp;
 
-    public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer,
-                        String metadataTopicName,
-                        RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
+    public ConsumerTask(RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
                         RemoteLogMetadataTopicPartitioner topicPartitioner,
-                        Path committedOffsetsPath,
-                        Time time,
-                        long committedOffsetSyncIntervalMs) {
-        this.consumer = Objects.requireNonNull(consumer);
-        this.metadataTopicName = Objects.requireNonNull(metadataTopicName);
+                        Function<Optional<String>, Consumer<byte[], byte[]>> 
consumerSupplier) {
+        this.consumer = consumerSupplier.apply(Optional.empty());
         this.remotePartitionMetadataEventHandler = 
Objects.requireNonNull(remotePartitionMetadataEventHandler);
         this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
-        this.time = Objects.requireNonNull(time);
-        this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs;
-
-        initializeConsumerAssignment(committedOffsetsPath);
-    }
-
-    private void initializeConsumerAssignment(Path committedOffsetsPath) {
-        try {
-            committedOffsetsFile = new 
CommittedOffsetsFile(committedOffsetsPath.toFile());
-        } catch (IOException e) {
-            throw new KafkaException(e);
-        }
-
-        Map<Integer, Long> committedOffsets = Collections.emptyMap();
-        try {
-            // Load committed offset and assign them in the consumer.
-            committedOffsets = committedOffsetsFile.readEntries();
-        } catch (IOException e) {
-            // Ignore the error and consumer consumes from the earliest offset.
-            log.error("Encountered error while building committed offsets from 
the file. " +
-                              "Consumer will consume from the earliest offset 
for the assigned partitions.", e);
-        }
-
-        if (!committedOffsets.isEmpty()) {
-            // Assign topic partitions from the earlier committed offsets file.
-            Set<Integer> earlierAssignedPartitions = committedOffsets.keySet();
-            assignedMetaPartitions = 
Collections.unmodifiableSet(earlierAssignedPartitions);
-            Set<TopicPartition> metadataTopicPartitions = 
earlierAssignedPartitions.stream()
-                                                                               
    .map(x -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, x))
-                                                                               
    .collect(Collectors.toSet());
-            consumer.assign(metadataTopicPartitions);
-
-            // Seek to the committed offsets
-            for (Map.Entry<Integer, Long> entry : committedOffsets.entrySet()) 
{
-                log.debug("Updating consumed offset: [{}] for partition [{}]", 
entry.getValue(), entry.getKey());
-                partitionToConsumedOffsets.put(entry.getKey(), 
entry.getValue());
-                consumer.seek(new 
TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, entry.getKey()), 
entry.getValue());
-            }
-
-            lastSyncedPartitionToConsumedOffsets = 
Collections.unmodifiableMap(committedOffsets);
-        }
     }
 
     @Override
     public void run() {
-        log.info("Started Consumer task thread.");
-        lastSyncedTimeMs = time.milliseconds();
-        try {
-            while (!closing) {
-                maybeWaitForPartitionsAssignment();
+        log.info("Starting consumer task thread.");
+        while (!isClosed) {
+            try {
+                if (isAssignmentChanged) {
+                    maybeWaitForPartitionsAssignment();
+                }
 
                 log.trace("Polling consumer to receive remote log metadata 
topic records");
-                ConsumerRecords<byte[], byte[]> consumerRecords = 
consumer.poll(Duration.ofMillis(POLL_INTERVAL_MS));
+                final ConsumerRecords<byte[], byte[]> consumerRecords = 
consumer.poll(Duration.ofMillis(pollIntervalMs));
                 for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
                     processConsumerRecord(record);
                 }
-
-                maybeSyncCommittedDataAndOffsets(false);
+                maybeMarkUserPartitionsAsReady();
+            } catch (final WakeupException ex) {
+                // ignore logging the error
+                isClosed = true;
+            } catch (final RetriableException ex) {
+                log.warn("Retriable error occurred while processing the 
records. Retrying...", ex);
+            }  catch (final Exception ex) {
+                isClosed = true;
+                log.error("Error occurred while processing the records", ex);
             }
-        } catch (Exception e) {
-            log.error("Error occurred in consumer task, close:[{}]", closing, 
e);
-        } finally {
-            maybeSyncCommittedDataAndOffsets(true);
-            closeConsumer();
-            log.info("Exiting from consumer task thread");
         }
+        try {
+            consumer.close(Duration.ofSeconds(30));
+        } catch (final Exception e) {
+            log.error("Error encountered while closing the consumer", e);
+        }
+        log.info("Exited from consumer task thread");
     }
 
     private void processConsumerRecord(ConsumerRecord<byte[], byte[]> record) {
-        // Taking assignPartitionsLock here as updateAssignmentsForPartitions 
changes assignedTopicPartitions
-        // and also calls 
remotePartitionMetadataEventHandler.clearTopicPartition(removedPartition) for 
the removed
-        // partitions.
-        RemoteLogMetadata remoteLogMetadata = 
serde.deserialize(record.value());
-        synchronized (assignPartitionsLock) {
-            if 
(assignedTopicPartitions.contains(remoteLogMetadata.topicIdPartition())) {
-                
remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata);
-            } else {
-                log.debug("This event {} is skipped as the topic partition is 
not assigned for this instance.", remoteLogMetadata);
-            }
-            log.debug("Updating consumed offset: [{}] for partition [{}]", 
record.offset(), record.partition());
-            partitionToConsumedOffsets.put(record.partition(), 
record.offset());
+        final RemoteLogMetadata remoteLogMetadata = 
serde.deserialize(record.value());
+        if (canProcess(remoteLogMetadata, record.offset())) {
+            
remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata);
+            
readOffsetsByUserTopicPartition.put(remoteLogMetadata.topicIdPartition(), 
record.offset());
+        } else {
+            log.debug("The event {} is skipped because it is either already 
processed or not assigned to this consumer", remoteLogMetadata);
         }
+        log.debug("Updating consumed offset: [{}] for partition [{}]", 
record.offset(), record.partition());
+        readOffsetsByMetadataPartition.put(record.partition(), 
record.offset());
+    }
+
+    private boolean canProcess(final RemoteLogMetadata metadata, final long 
recordOffset) {
+        final TopicIdPartition tpId = metadata.topicIdPartition();
+        final Long readOffset = readOffsetsByUserTopicPartition.get(tpId);
+        return processedAssignmentOfUserTopicIdPartitions.contains(tpId) && 
(readOffset == null || readOffset < recordOffset);
     }
 
-    private void maybeSyncCommittedDataAndOffsets(boolean forceSync) {
-        // Return immediately if there is no consumption from last time.
-        boolean noConsumedOffsetUpdates = 
partitionToConsumedOffsets.equals(lastSyncedPartitionToConsumedOffsets);
-        if (noConsumedOffsetUpdates || !forceSync && time.milliseconds() - 
lastSyncedTimeMs < committedOffsetSyncIntervalMs) {
-            log.debug("Skip syncing committed offsets, 
noConsumedOffsetUpdates: {}, forceSync: {}", noConsumedOffsetUpdates, 
forceSync);
+    private void maybeMarkUserPartitionsAsReady() {
+        if (isAllUserTopicPartitionsInitialized) {
             return;
         }
-
-        try {
-            // Need to take lock on assignPartitionsLock as 
assignedTopicPartitions might
-            // get updated by other threads.
-            synchronized (assignPartitionsLock) {
-                for (TopicIdPartition topicIdPartition : 
assignedTopicPartitions) {
-                    int metadataPartition = 
topicPartitioner.metadataPartition(topicIdPartition);
-                    Long offset = 
partitionToConsumedOffsets.get(metadataPartition);
-                    if (offset != null) {
-                        
remotePartitionMetadataEventHandler.syncLogMetadataSnapshot(topicIdPartition, 
metadataPartition, offset);
-                    } else {
-                        log.debug("Skipping sync-up of the 
remote-log-metadata-file for partition: [{}] , with remote log metadata 
partition{}, and no offset",
-                                topicIdPartition, metadataPartition);
+        maybeFetchBeginAndEndOffsets();
+        boolean isAllInitialized = true;
+        for (final UserTopicIdPartition utp : 
assignedUserTopicIdPartitions.values()) {
+            if (utp.isAssigned && !utp.isInitialized) {
+                final Integer metadataPartition = utp.metadataPartition;
+                final BeginAndEndOffsetHolder holder = 
offsetHolderByMetadataPartition.get(toRemoteLogPartition(metadataPartition));
+                // The offset-holder can be null, when the recent assignment 
wasn't picked up by the consumer.
+                if (holder != null) {
+                    final Long readOffset = 
readOffsetsByMetadataPartition.getOrDefault(metadataPartition, -1L);
+                    // 1) The end-offset was fetched only once during 
reassignment. The metadata-partition can receive
+                    // new stream of records, so the consumer can read records 
more than the last-fetched end-offset.
+                    // 2) When the internal topic becomes empty due to breach 
by size/time/start-offset, then there
+                    // are no records to read.
+                    if (readOffset + 1 >= holder.endOffset || 
holder.endOffset.equals(holder.beginOffset)) {
+                        markInitialized(utp);
                     }
                 }
-
-                // Write partitionToConsumedOffsets into committed offsets 
file as we do not want to process them again
-                // in case of restarts.
-                committedOffsetsFile.writeEntries(partitionToConsumedOffsets);
-                lastSyncedPartitionToConsumedOffsets = new 
HashMap<>(partitionToConsumedOffsets);
             }
-
-            lastSyncedTimeMs = time.milliseconds();
-        } catch (IOException e) {
-            throw new KafkaException("Error encountered while writing 
committed offsets to a local file", e);
+            isAllInitialized = isAllInitialized && utp.isInitialized;
         }
-    }
-
-    private void closeConsumer() {
-        log.info("Closing the consumer instance");
-        try {
-            consumer.close(Duration.ofSeconds(30));
-        } catch (Exception e) {
-            log.error("Error encountered while closing the consumer", e);
+        if (isAllInitialized) {
+            log.info("Initialized for all the {} assigned user-partitions 
mapped to the {} meta-partitions in {} ms",
+                assignedUserTopicIdPartitions.size(), 
assignedMetadataPartitions.size(),
+                time.milliseconds() - uninitializedAt);
         }
+        isAllUserTopicPartitionsInitialized = isAllInitialized;
     }
 
-    private void maybeWaitForPartitionsAssignment() {
-        Set<Integer> assignedMetaPartitionsSnapshot = Collections.emptySet();
+    void maybeWaitForPartitionsAssignment() throws InterruptedException {
+        // Snapshots of the metadata-partition and user-topic-partition are 
used to reduce the scope of the
+        // synchronization block.
+        // 1) LEADER_AND_ISR and STOP_REPLICA requests adds / removes the 
user-topic-partitions from the request
+        //    handler threads. Those threads should not be blocked for a long 
time, therefore scope of the
+        //    synchronization block is reduced to bare minimum.
+        // 2) Note that the consumer#position, consumer#seekToBeginning, 
consumer#seekToEnd and the other consumer APIs
+        //    response times are un-predictable. Those should not be kept in 
the synchronization block.
+        final Set<Integer> metadataPartitionSnapshot = new HashSet<>();
+        final Set<UserTopicIdPartition> assignedUserTopicIdPartitionsSnapshot 
= new HashSet<>();
         synchronized (assignPartitionsLock) {
-            // If it is closing, return immediately. This should be inside the 
assignPartitionsLock as the closing is updated
-            // in close() method with in the same lock to avoid any race 
conditions.
-            if (closing) {
-                return;
+            while (!isClosed && assignedUserTopicIdPartitions.isEmpty()) {
+                log.debug("Waiting for remote log metadata partitions to be 
assigned");
+                assignPartitionsLock.wait();
             }
-
-            while (assignedMetaPartitions.isEmpty()) {
-                // If no partitions are assigned, wait until they are assigned.
-                log.debug("Waiting for assigned remote log metadata 
partitions..");
-                try {
-                    // No timeout is set here, as it is always notified. Even 
when it is closed, the race can happen
-                    // between the thread calling this method and the thread 
calling close(). We should have a check
-                    // for closing as that might have been set and notified 
with assignPartitionsLock by `close`
-                    // method.
-                    assignPartitionsLock.wait();
-
-                    if (closing) {
-                        return;
-                    }
-                } catch (InterruptedException e) {
-                    throw new KafkaException(e);
-                }
-            }
-
-            if (assignPartitions) {
-                assignedMetaPartitionsSnapshot = new 
HashSet<>(assignedMetaPartitions);
-                // Removing unassigned meta partitions from 
partitionToConsumedOffsets and partitionToCommittedOffsets
-                partitionToConsumedOffsets.entrySet().removeIf(entry -> 
!assignedMetaPartitions.contains(entry.getKey()));
-
-                assignPartitions = false;
+            if (!isClosed && isAssignmentChanged) {
+                assignedUserTopicIdPartitions.values().forEach(utp -> {
+                    metadataPartitionSnapshot.add(utp.metadataPartition);
+                    assignedUserTopicIdPartitionsSnapshot.add(utp);
+                });
+                isAssignmentChanged = false;
             }
         }
-
-        if (!assignedMetaPartitionsSnapshot.isEmpty()) {
-            executeReassignment(assignedMetaPartitionsSnapshot);
+        if (!metadataPartitionSnapshot.isEmpty()) {
+            final Set<TopicPartition> remoteLogPartitions = 
toRemoteLogPartitions(metadataPartitionSnapshot);
+            consumer.assign(remoteLogPartitions);
+            this.assignedMetadataPartitions = 
Collections.unmodifiableSet(metadataPartitionSnapshot);
+            // for newly assigned user-partitions, read from the beginning of 
the corresponding metadata partition
+            final Set<TopicPartition> seekToBeginOffsetPartitions = 
assignedUserTopicIdPartitionsSnapshot
+                .stream()
+                .filter(utp -> !utp.isAssigned)
+                .map(utp -> toRemoteLogPartition(utp.metadataPartition))
+                .collect(Collectors.toSet());
+            consumer.seekToBeginning(seekToBeginOffsetPartitions);
+            // for other metadata partitions, read from the offset where the 
processing left last time.
+            remoteLogPartitions.stream()
+                .filter(tp -> !seekToBeginOffsetPartitions.contains(tp) &&
+                    readOffsetsByMetadataPartition.containsKey(tp.partition()))
+                .forEach(tp -> consumer.seek(tp, 
readOffsetsByMetadataPartition.get(tp.partition())));
+            // mark all the user-topic-partitions as assigned to the consumer.
+            assignedUserTopicIdPartitionsSnapshot.forEach(utp -> {
+                if (!utp.isAssigned) {
+                    // Note that there can be a race between `remove` and 
`add` partition assignment. Calling the
+                    // `maybeLoadPartition` here again to be sure that the 
partition gets loaded on the handler.
+                    
remotePartitionMetadataEventHandler.maybeLoadPartition(utp.topicIdPartition);
+                    utp.isAssigned = true;
+                }
+            });
+            processedAssignmentOfUserTopicIdPartitions = 
assignedUserTopicIdPartitionsSnapshot.stream()
+                .map(utp -> utp.topicIdPartition).collect(Collectors.toSet());
+            
clearResourcesForUnassignedUserTopicPartitions(assignedUserTopicIdPartitionsSnapshot);
+            isAllUserTopicPartitionsInitialized = false;
+            uninitializedAt = time.milliseconds();
+            fetchBeginAndEndOffsets();
         }
     }
 
-    private void executeReassignment(Set<Integer> 
assignedMetaPartitionsSnapshot) {
-        Set<TopicPartition> assignedMetaTopicPartitions =
-                assignedMetaPartitionsSnapshot.stream()
-                                              .map(partitionNum -> new 
TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, partitionNum))
-                                              .collect(Collectors.toSet());
-        log.info("Reassigning partitions to consumer task [{}]", 
assignedMetaTopicPartitions);
-        consumer.assign(assignedMetaTopicPartitions);
+    private void 
clearResourcesForUnassignedUserTopicPartitions(Set<UserTopicIdPartition> 
assignedUTPs) {
+        Set<TopicIdPartition> assignedPartitions = assignedUTPs.stream()
+            .map(utp -> utp.topicIdPartition).collect(Collectors.toSet());
+        // Note that there can be previously assigned user-topic-partitions 
where no records are there to read
+        // (eg) none of the segments for a partition were uploaded. Those 
partition resources won't be cleared.
+        // It can be fixed later when required since they are empty resources.
+        Set<TopicIdPartition> unassignedPartitions = 
readOffsetsByUserTopicPartition.keySet()
+            .stream()
+            .filter(e -> !assignedPartitions.contains(e))
+            .collect(Collectors.toSet());
+        unassignedPartitions.forEach(unassignedPartition -> {
+            
remotePartitionMetadataEventHandler.clearTopicPartition(unassignedPartition);
+            readOffsetsByUserTopicPartition.remove(unassignedPartition);
+        });
+        log.info("Unassigned user-topic-partitions: {}", 
unassignedPartitions.size());
+    }
+
+    public void addAssignmentsForPartitions(final Set<TopicIdPartition> 
partitions) {
+        updateAssignments(Objects.requireNonNull(partitions), 
Collections.emptySet());
+    }
+
+    public void removeAssignmentsForPartitions(final Set<TopicIdPartition> 
partitions) {
+        updateAssignments(Collections.emptySet(), 
Objects.requireNonNull(partitions));
     }
 
-    public void addAssignmentsForPartitions(Set<TopicIdPartition> partitions) {
-        updateAssignmentsForPartitions(partitions, Collections.emptySet());
+    private void updateAssignments(final Set<TopicIdPartition> addedPartitions,
+                                   final Set<TopicIdPartition> 
removedPartitions) {
+        log.info("Updating assignments for partitions added: {} and removed: 
{}", addedPartitions, removedPartitions);
+        if (!addedPartitions.isEmpty() || !removedPartitions.isEmpty()) {
+            synchronized (assignPartitionsLock) {
+                final Map<TopicIdPartition, UserTopicIdPartition> 
idealUserPartitions = new HashMap<>(assignedUserTopicIdPartitions);
+                addedPartitions.forEach(tpId -> 
idealUserPartitions.putIfAbsent(tpId, newUserTopicIdPartition(tpId)));
+                removedPartitions.forEach(idealUserPartitions::remove);
+                if 
(!idealUserPartitions.equals(assignedUserTopicIdPartitions)) {
+                    assignedUserTopicIdPartitions = 
Collections.unmodifiableMap(idealUserPartitions);
+                    isAssignmentChanged = true;
+                }
+                if (isAssignmentChanged) {
+                    log.debug("Assigned user-topic-partitions: {}", 
assignedUserTopicIdPartitions);
+                    assignPartitionsLock.notifyAll();
+                }
+            }
+        }
     }
 
-    public void removeAssignmentsForPartitions(Set<TopicIdPartition> 
partitions) {
-        updateAssignmentsForPartitions(Collections.emptySet(), partitions);
+    public Optional<Long> receivedOffsetForPartition(final int partition) {
+        return 
Optional.ofNullable(readOffsetsByMetadataPartition.get(partition));
     }
 
-    private void updateAssignmentsForPartitions(Set<TopicIdPartition> 
addedPartitions,
-                                                Set<TopicIdPartition> 
removedPartitions) {
-        log.info("Updating assignments for addedPartitions: {} and 
removedPartition: {}", addedPartitions, removedPartitions);
+    public boolean isMetadataPartitionAssigned(final int partition) {
+        return assignedMetadataPartitions.contains(partition);
+    }
 
-        Objects.requireNonNull(addedPartitions, "addedPartitions must not be 
null");
-        Objects.requireNonNull(removedPartitions, "removedPartitions must not 
be null");
+    public boolean isUserPartitionAssigned(final TopicIdPartition partition) {
+        final UserTopicIdPartition utp = 
assignedUserTopicIdPartitions.get(partition);
+        return utp != null && utp.isAssigned;
+    }
 
-        if (addedPartitions.isEmpty() && removedPartitions.isEmpty()) {
-            return;
+    @Override
+    public void close() {
+        if (!isClosed) {
+            log.info("Closing the instance");
+            synchronized (assignPartitionsLock) {
+                isClosed = true;
+                
assignedUserTopicIdPartitions.values().forEach(this::markInitialized);
+                consumer.wakeup();
+                assignPartitionsLock.notifyAll();
+            }
         }
+    }
 
-        synchronized (assignPartitionsLock) {
-            Set<TopicIdPartition> updatedReassignedPartitions = new 
HashSet<>(assignedTopicPartitions);
-            updatedReassignedPartitions.addAll(addedPartitions);
-            updatedReassignedPartitions.removeAll(removedPartitions);
-            Set<Integer> updatedAssignedMetaPartitions = new HashSet<>();
-            for (TopicIdPartition tp : updatedReassignedPartitions) {
-                
updatedAssignedMetaPartitions.add(topicPartitioner.metadataPartition(tp));
-            }
+    public Set<Integer> metadataPartitionsAssigned() {
+        return Collections.unmodifiableSet(assignedMetadataPartitions);
+    }
+
+    private void fetchBeginAndEndOffsets() {
+        try {
+            final Set<TopicPartition> unInitializedPartitions = 
assignedUserTopicIdPartitions.values().stream()
+                .filter(utp -> utp.isAssigned && !utp.isInitialized)
+                .map(utp -> toRemoteLogPartition(utp.metadataPartition))
+                .collect(Collectors.toSet());
+            // Removing the previous offset holder if it exists. During 
reassignment, if the list-offset
+            // call to `earliest` and `latest` offset fails, then we should 
not use the previous values.
+            unInitializedPartitions.forEach(x -> 
offsetHolderByMetadataPartition.remove(x));
+            if (!unInitializedPartitions.isEmpty()) {
+                Map<TopicPartition, Long> endOffsets = 
consumer.endOffsets(unInitializedPartitions);
+                Map<TopicPartition, Long> beginOffsets = 
consumer.beginningOffsets(unInitializedPartitions);
+                offsetHolderByMetadataPartition = endOffsets.entrySet()
+                    .stream()
+                    .collect(Collectors.toMap(Map.Entry::getKey,
+                        e -> new 
BeginAndEndOffsetHolder(beginOffsets.get(e.getKey()), e.getValue())));
 
-            // Clear removed topic partitions from in-memory cache.
-            for (TopicIdPartition removedPartition : removedPartitions) {
-                
remotePartitionMetadataEventHandler.clearTopicPartition(removedPartition);
             }
+            isOffsetsFetchFailed = false;
+        } catch (final RetriableException ex) {
+            // ignore LEADER_NOT_AVAILABLE error, this can happen when the 
partition leader is not yet assigned.
+            isOffsetsFetchFailed = true;
+            lastFailedFetchOffsetsTimestamp = time.milliseconds();
+        }
+    }
 
-            assignedTopicPartitions = 
Collections.unmodifiableSet(updatedReassignedPartitions);
-            log.debug("Assigned topic partitions: {}", 
assignedTopicPartitions);
+    private void maybeFetchBeginAndEndOffsets() {
+        // If the leader for a `__remote_log_metadata` partition is not 
available, then the call to `ListOffsets`
+        // will fail after the default timeout of 1 min. Added a delay of 5 
min in between the retries to prevent the
+        // thread from aggressively fetching the list offsets. During this 
time, the recently reassigned
+        // user-topic-partitions won't be marked as initialized.
+        if (isOffsetsFetchFailed && lastFailedFetchOffsetsTimestamp + 300_000 
< time.milliseconds()) {
+            fetchBeginAndEndOffsets();
+        }
+    }
 
-            if (!updatedAssignedMetaPartitions.equals(assignedMetaPartitions)) 
{
-                assignedMetaPartitions = 
Collections.unmodifiableSet(updatedAssignedMetaPartitions);
-                log.debug("Assigned metadata topic partitions: {}", 
assignedMetaPartitions);
+    private UserTopicIdPartition newUserTopicIdPartition(final 
TopicIdPartition tpId) {
+        return new UserTopicIdPartition(tpId, 
topicPartitioner.metadataPartition(tpId));
+    }
 
-                assignPartitions = true;
-                assignPartitionsLock.notifyAll();
-            } else {
-                log.debug("No change in assigned metadata topic partitions: 
{}", assignedMetaPartitions);
-            }
+    private void markInitialized(final UserTopicIdPartition utp) {
+        // Silently not initialize the utp
+        if (!utp.isAssigned) {
+            log.warn("Tried to initialize a UTP: {} that was not yet 
assigned!", utp);
+            return;
+        }
+        if (!utp.isInitialized) {
+            
remotePartitionMetadataEventHandler.markInitialized(utp.topicIdPartition);
+            utp.isInitialized = true;
         }
     }
 
-    public Optional<Long> receivedOffsetForPartition(int partition) {
-        return Optional.ofNullable(partitionToConsumedOffsets.get(partition));
+    static Set<TopicPartition> toRemoteLogPartitions(final Set<Integer> 
partitions) {
+        return partitions.stream()
+            .map(ConsumerTask::toRemoteLogPartition)
+            .collect(Collectors.toSet());
     }
 
-    public boolean isPartitionAssigned(int partition) {
-        return assignedMetaPartitions.contains(partition);
+    static TopicPartition toRemoteLogPartition(int partition) {
+        return new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, partition);
     }
 
-    public void close() {
-        if (!closing) {
-            synchronized (assignPartitionsLock) {
-                // Closing should be updated only after acquiring the lock to 
avoid race in
-                // maybeWaitForPartitionsAssignment() where it waits on 
assignPartitionsLock. It should not wait
-                // if the closing is already set.
-                closing = true;
-                consumer.wakeup();
-                assignPartitionsLock.notifyAll();
-            }
+    static class UserTopicIdPartition {
+        private final TopicIdPartition topicIdPartition;
+        private final Integer metadataPartition;
+        // The `utp` will be initialized once it reads all the existing events 
from the remote log metadata topic.
+        boolean isInitialized;
+        // denotes whether this `utp` is assigned to the consumer
+        boolean isAssigned;
+
+        /**
+         * UserTopicIdPartition denotes the user topic-partitions for which 
this broker acts as a leader/follower of.
+         *
+         * @param tpId               the unique topic partition identifier
+         * @param metadataPartition  the remote log metadata partition mapped 
for this user-topic-partition.
+         */
+        public UserTopicIdPartition(final TopicIdPartition tpId, final Integer 
metadataPartition) {
+            this.topicIdPartition = Objects.requireNonNull(tpId);
+            this.metadataPartition = Objects.requireNonNull(metadataPartition);
+            this.isInitialized = false;
+            this.isAssigned = false;
+        }
+
+        @Override
+        public String toString() {
+            return "UserTopicIdPartition{" +
+                "topicIdPartition=" + topicIdPartition +
+                ", metadataPartition=" + metadataPartition +
+                ", isInitialized=" + isInitialized +
+                '}';

Review Comment:
   Should `toString()` also output `isAssigned`? It is the only field of 
`UserTopicIdPartition` that is missing from the string.
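
   A minimal sketch of what that could look like, not the PR's actual code. 
To keep it self-contained, a plain `String` stands in for Kafka's 
`TopicIdPartition`, and the wrapper class `UserTopicIdPartitionSketch` and its 
`main` method are hypothetical names added only for illustration:

```java
import java.util.Objects;

public class UserTopicIdPartitionSketch {

    // Simplified stand-in for the PR's UserTopicIdPartition. A String replaces
    // Kafka's TopicIdPartition (assumption) so the sketch compiles on its own.
    static class UserTopicIdPartition {
        private final String topicIdPartition;
        private final Integer metadataPartition;
        boolean isInitialized;
        boolean isAssigned;

        UserTopicIdPartition(final String tpId, final Integer metadataPartition) {
            this.topicIdPartition = Objects.requireNonNull(tpId);
            this.metadataPartition = Objects.requireNonNull(metadataPartition);
            this.isInitialized = false;
            this.isAssigned = false;
        }

        @Override
        public String toString() {
            // Same layout as the diff, with isAssigned appended as the last field.
            return "UserTopicIdPartition{" +
                "topicIdPartition=" + topicIdPartition +
                ", metadataPartition=" + metadataPartition +
                ", isInitialized=" + isInitialized +
                ", isAssigned=" + isAssigned +
                '}';
        }
    }

    public static void main(String[] args) {
        UserTopicIdPartition utp = new UserTopicIdPartition("topic-0", 3);
        utp.isAssigned = true;
        // Both flags now show up in the printed representation.
        System.out.println(utp);
    }
}
```

   With both flags in the string, the `log.warn` in `markInitialized` would 
show directly whether the UTP was assigned at the time of the call.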



##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##########
@@ -64,302 +65,387 @@
 class ConsumerTask implements Runnable, Closeable {
     private static final Logger log = 
LoggerFactory.getLogger(ConsumerTask.class);
 
-    private static final long POLL_INTERVAL_MS = 100L;
+    static long pollIntervalMs = 100L;
 
     private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-    private final KafkaConsumer<byte[], byte[]> consumer;
-    private final String metadataTopicName;
+    private final Consumer<byte[], byte[]> consumer;
     private final RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler;
     private final RemoteLogMetadataTopicPartitioner topicPartitioner;
-    private final Time time;
+    private final Time time = new SystemTime();
 
+    // TODO - Update comments below
     // It indicates whether the closing process has been started or not. If it 
is set as true,
     // consumer will stop consuming messages, and it will not allow partition 
assignments to be updated.
-    private volatile boolean closing = false;
-
+    private volatile boolean isClosed = false;
     // It indicates whether the consumer needs to assign the partitions or 
not. This is set when it is
     // determined that the consumer needs to be assigned with the updated 
partitions.
-    private volatile boolean assignPartitions = false;
+    private volatile boolean isAssignmentChanged = true;
 
     // It represents a lock for any operations related to the 
assignedTopicPartitions.
     private final Object assignPartitionsLock = new Object();
 
     // Remote log metadata topic partitions that consumer is assigned to.
-    private volatile Set<Integer> assignedMetaPartitions = 
Collections.emptySet();
+    private volatile Set<Integer> assignedMetadataPartitions = 
Collections.emptySet();
 
     // User topic partitions that this broker is a leader/follower for.
-    private Set<TopicIdPartition> assignedTopicPartitions = 
Collections.emptySet();
+    private volatile Map<TopicIdPartition, UserTopicIdPartition> 
assignedUserTopicIdPartitions = Collections.emptyMap();
+    private volatile Set<TopicIdPartition> 
processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();
 
-    // Map of remote log metadata topic partition to consumed offsets. 
Received consumer records
-    // may or may not have been processed based on the assigned topic 
partitions.
-    private final Map<Integer, Long> partitionToConsumedOffsets = new 
ConcurrentHashMap<>();
+    private long uninitializedAt = time.milliseconds();
+    private boolean isAllUserTopicPartitionsInitialized;
 
-    // Map of remote log metadata topic partition to processed offsets that 
were synced in committedOffsetsFile.
-    private Map<Integer, Long> lastSyncedPartitionToConsumedOffsets = 
Collections.emptyMap();
+    // Map of remote log metadata topic partition to consumed offsets.
+    private final Map<Integer, Long> readOffsetsByMetadataPartition = new 
ConcurrentHashMap<>();
+    private final Map<TopicIdPartition, Long> readOffsetsByUserTopicPartition 
= new HashMap<>();
 
-    private final long committedOffsetSyncIntervalMs;
-    private CommittedOffsetsFile committedOffsetsFile;
-    private long lastSyncedTimeMs;
+    private Map<TopicPartition, BeginAndEndOffsetHolder> 
offsetHolderByMetadataPartition = new HashMap<>();
+    private boolean isOffsetsFetchFailed = false;
+    private long lastFailedFetchOffsetsTimestamp;
 
-    public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer,
-                        String metadataTopicName,
-                        RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
+    public ConsumerTask(RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
                         RemoteLogMetadataTopicPartitioner topicPartitioner,
-                        Path committedOffsetsPath,
-                        Time time,
-                        long committedOffsetSyncIntervalMs) {
-        this.consumer = Objects.requireNonNull(consumer);
-        this.metadataTopicName = Objects.requireNonNull(metadataTopicName);
+                        Function<Optional<String>, Consumer<byte[], byte[]>> 
consumerSupplier) {
+        this.consumer = consumerSupplier.apply(Optional.empty());
         this.remotePartitionMetadataEventHandler = 
Objects.requireNonNull(remotePartitionMetadataEventHandler);
         this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
-        this.time = Objects.requireNonNull(time);
-        this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs;
-
-        initializeConsumerAssignment(committedOffsetsPath);
-    }
-
-    private void initializeConsumerAssignment(Path committedOffsetsPath) {
-        try {
-            committedOffsetsFile = new 
CommittedOffsetsFile(committedOffsetsPath.toFile());
-        } catch (IOException e) {
-            throw new KafkaException(e);
-        }
-
-        Map<Integer, Long> committedOffsets = Collections.emptyMap();
-        try {
-            // Load committed offset and assign them in the consumer.
-            committedOffsets = committedOffsetsFile.readEntries();
-        } catch (IOException e) {
-            // Ignore the error and consumer consumes from the earliest offset.
-            log.error("Encountered error while building committed offsets from 
the file. " +
-                              "Consumer will consume from the earliest offset 
for the assigned partitions.", e);
-        }
-
-        if (!committedOffsets.isEmpty()) {
-            // Assign topic partitions from the earlier committed offsets file.
-            Set<Integer> earlierAssignedPartitions = committedOffsets.keySet();
-            assignedMetaPartitions = 
Collections.unmodifiableSet(earlierAssignedPartitions);
-            Set<TopicPartition> metadataTopicPartitions = 
earlierAssignedPartitions.stream()
-                                                                               
    .map(x -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, x))
-                                                                               
    .collect(Collectors.toSet());
-            consumer.assign(metadataTopicPartitions);
-
-            // Seek to the committed offsets
-            for (Map.Entry<Integer, Long> entry : committedOffsets.entrySet()) 
{
-                log.debug("Updating consumed offset: [{}] for partition [{}]", 
entry.getValue(), entry.getKey());
-                partitionToConsumedOffsets.put(entry.getKey(), 
entry.getValue());
-                consumer.seek(new 
TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, entry.getKey()), 
entry.getValue());
-            }
-
-            lastSyncedPartitionToConsumedOffsets = 
Collections.unmodifiableMap(committedOffsets);
-        }
     }
 
     @Override
     public void run() {
-        log.info("Started Consumer task thread.");
-        lastSyncedTimeMs = time.milliseconds();
-        try {
-            while (!closing) {
-                maybeWaitForPartitionsAssignment();
+        log.info("Starting consumer task thread.");
+        while (!isClosed) {
+            try {
+                if (isAssignmentChanged) {
+                    maybeWaitForPartitionsAssignment();
+                }
 
                 log.trace("Polling consumer to receive remote log metadata 
topic records");
-                ConsumerRecords<byte[], byte[]> consumerRecords = 
consumer.poll(Duration.ofMillis(POLL_INTERVAL_MS));
+                final ConsumerRecords<byte[], byte[]> consumerRecords = 
consumer.poll(Duration.ofMillis(pollIntervalMs));
                 for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
                     processConsumerRecord(record);
                 }
-
-                maybeSyncCommittedDataAndOffsets(false);
+                maybeMarkUserPartitionsAsReady();
+            } catch (final WakeupException ex) {
+                // ignore logging the error
+                isClosed = true;
+            } catch (final RetriableException ex) {
+                log.warn("Retriable error occurred while processing the 
records. Retrying...", ex);
+            }  catch (final Exception ex) {
+                isClosed = true;
+                log.error("Error occurred while processing the records", ex);
             }
-        } catch (Exception e) {
-            log.error("Error occurred in consumer task, close:[{}]", closing, 
e);
-        } finally {
-            maybeSyncCommittedDataAndOffsets(true);
-            closeConsumer();
-            log.info("Exiting from consumer task thread");
         }
+        try {
+            consumer.close(Duration.ofSeconds(30));
+        } catch (final Exception e) {
+            log.error("Error encountered while closing the consumer", e);
+        }
+        log.info("Exited from consumer task thread");
     }
 
     private void processConsumerRecord(ConsumerRecord<byte[], byte[]> record) {
-        // Taking assignPartitionsLock here as updateAssignmentsForPartitions 
changes assignedTopicPartitions
-        // and also calls 
remotePartitionMetadataEventHandler.clearTopicPartition(removedPartition) for 
the removed
-        // partitions.
-        RemoteLogMetadata remoteLogMetadata = 
serde.deserialize(record.value());
-        synchronized (assignPartitionsLock) {
-            if 
(assignedTopicPartitions.contains(remoteLogMetadata.topicIdPartition())) {
-                
remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata);
-            } else {
-                log.debug("This event {} is skipped as the topic partition is 
not assigned for this instance.", remoteLogMetadata);
-            }
-            log.debug("Updating consumed offset: [{}] for partition [{}]", 
record.offset(), record.partition());
-            partitionToConsumedOffsets.put(record.partition(), 
record.offset());
+        final RemoteLogMetadata remoteLogMetadata = 
serde.deserialize(record.value());
+        if (canProcess(remoteLogMetadata, record.offset())) {
+            
remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata);
+            
readOffsetsByUserTopicPartition.put(remoteLogMetadata.topicIdPartition(), 
record.offset());
+        } else {
+            log.debug("The event {} is skipped because it is either already 
processed or not assigned to this consumer", remoteLogMetadata);
         }
+        log.debug("Updating consumed offset: [{}] for partition [{}]", 
record.offset(), record.partition());
+        readOffsetsByMetadataPartition.put(record.partition(), 
record.offset());
+    }
+
+    private boolean canProcess(final RemoteLogMetadata metadata, final long 
recordOffset) {
+        final TopicIdPartition tpId = metadata.topicIdPartition();
+        final Long readOffset = readOffsetsByUserTopicPartition.get(tpId);
+        return processedAssignmentOfUserTopicIdPartitions.contains(tpId) && 
(readOffset == null || readOffset < recordOffset);
     }
 
-    private void maybeSyncCommittedDataAndOffsets(boolean forceSync) {
-        // Return immediately if there is no consumption from last time.
-        boolean noConsumedOffsetUpdates = 
partitionToConsumedOffsets.equals(lastSyncedPartitionToConsumedOffsets);
-        if (noConsumedOffsetUpdates || !forceSync && time.milliseconds() - 
lastSyncedTimeMs < committedOffsetSyncIntervalMs) {
-            log.debug("Skip syncing committed offsets, 
noConsumedOffsetUpdates: {}, forceSync: {}", noConsumedOffsetUpdates, 
forceSync);
+    private void maybeMarkUserPartitionsAsReady() {
+        if (isAllUserTopicPartitionsInitialized) {
             return;
         }
-
-        try {
-            // Need to take lock on assignPartitionsLock as 
assignedTopicPartitions might
-            // get updated by other threads.
-            synchronized (assignPartitionsLock) {
-                for (TopicIdPartition topicIdPartition : 
assignedTopicPartitions) {
-                    int metadataPartition = 
topicPartitioner.metadataPartition(topicIdPartition);
-                    Long offset = 
partitionToConsumedOffsets.get(metadataPartition);
-                    if (offset != null) {
-                        
remotePartitionMetadataEventHandler.syncLogMetadataSnapshot(topicIdPartition, 
metadataPartition, offset);
-                    } else {
-                        log.debug("Skipping sync-up of the 
remote-log-metadata-file for partition: [{}] , with remote log metadata 
partition{}, and no offset",
-                                topicIdPartition, metadataPartition);
+        maybeFetchBeginAndEndOffsets();
+        boolean isAllInitialized = true;
+        for (final UserTopicIdPartition utp : 
assignedUserTopicIdPartitions.values()) {
+            if (utp.isAssigned && !utp.isInitialized) {
+                final Integer metadataPartition = utp.metadataPartition;
+                final BeginAndEndOffsetHolder holder = 
offsetHolderByMetadataPartition.get(toRemoteLogPartition(metadataPartition));
+                // The offset-holder can be null, when the recent assignment 
wasn't picked up by the consumer.
+                if (holder != null) {
+                    final Long readOffset = 
readOffsetsByMetadataPartition.getOrDefault(metadataPartition, -1L);
+                    // 1) The end-offset was fetched only once during 
reassignment. The metadata-partition can receive
+                    // new stream of records, so the consumer can read records 
more than the last-fetched end-offset.
+                    // 2) When the internal topic becomes empty due to breach 
by size/time/start-offset, then there
+                    // are no records to read.
+                    if (readOffset + 1 >= holder.endOffset || 
holder.endOffset.equals(holder.beginOffset)) {
+                        markInitialized(utp);
                     }
                 }
-
-                // Write partitionToConsumedOffsets into committed offsets 
file as we do not want to process them again
-                // in case of restarts.
-                committedOffsetsFile.writeEntries(partitionToConsumedOffsets);
-                lastSyncedPartitionToConsumedOffsets = new 
HashMap<>(partitionToConsumedOffsets);
             }
-
-            lastSyncedTimeMs = time.milliseconds();
-        } catch (IOException e) {
-            throw new KafkaException("Error encountered while writing 
committed offsets to a local file", e);
+            isAllInitialized = isAllInitialized && utp.isInitialized;
         }
-    }
-
-    private void closeConsumer() {
-        log.info("Closing the consumer instance");
-        try {
-            consumer.close(Duration.ofSeconds(30));
-        } catch (Exception e) {
-            log.error("Error encountered while closing the consumer", e);
+        if (isAllInitialized) {
+            log.info("Initialized for all the {} assigned user-partitions 
mapped to the {} meta-partitions in {} ms",
+                assignedUserTopicIdPartitions.size(), 
assignedMetadataPartitions.size(),
+                time.milliseconds() - uninitializedAt);
         }
+        isAllUserTopicPartitionsInitialized = isAllInitialized;
     }
 
-    private void maybeWaitForPartitionsAssignment() {
-        Set<Integer> assignedMetaPartitionsSnapshot = Collections.emptySet();
+    void maybeWaitForPartitionsAssignment() throws InterruptedException {
+        // Snapshots of the metadata-partition and user-topic-partition are 
used to reduce the scope of the
+        // synchronization block.
+        // 1) LEADER_AND_ISR and STOP_REPLICA requests adds / removes the 
user-topic-partitions from the request
+        //    handler threads. Those threads should not be blocked for a long 
time, therefore scope of the
+        //    synchronization block is reduced to bare minimum.
+        // 2) Note that the consumer#position, consumer#seekToBeginning, 
consumer#seekToEnd and the other consumer APIs
+        //    response times are un-predictable. Those should not be kept in 
the synchronization block.
+        final Set<Integer> metadataPartitionSnapshot = new HashSet<>();
+        final Set<UserTopicIdPartition> assignedUserTopicIdPartitionsSnapshot 
= new HashSet<>();
         synchronized (assignPartitionsLock) {
-            // If it is closing, return immediately. This should be inside the 
assignPartitionsLock as the closing is updated
-            // in close() method with in the same lock to avoid any race 
conditions.
-            if (closing) {
-                return;
+            while (!isClosed && assignedUserTopicIdPartitions.isEmpty()) {
+                log.debug("Waiting for remote log metadata partitions to be 
assigned");
+                assignPartitionsLock.wait();
             }
-
-            while (assignedMetaPartitions.isEmpty()) {
-                // If no partitions are assigned, wait until they are assigned.
-                log.debug("Waiting for assigned remote log metadata 
partitions..");
-                try {
-                    // No timeout is set here, as it is always notified. Even 
when it is closed, the race can happen
-                    // between the thread calling this method and the thread 
calling close(). We should have a check
-                    // for closing as that might have been set and notified 
with assignPartitionsLock by `close`
-                    // method.
-                    assignPartitionsLock.wait();
-
-                    if (closing) {
-                        return;
-                    }
-                } catch (InterruptedException e) {
-                    throw new KafkaException(e);
-                }
-            }
-
-            if (assignPartitions) {
-                assignedMetaPartitionsSnapshot = new 
HashSet<>(assignedMetaPartitions);
-                // Removing unassigned meta partitions from 
partitionToConsumedOffsets and partitionToCommittedOffsets
-                partitionToConsumedOffsets.entrySet().removeIf(entry -> 
!assignedMetaPartitions.contains(entry.getKey()));
-
-                assignPartitions = false;
+            if (!isClosed && isAssignmentChanged) {
+                assignedUserTopicIdPartitions.values().forEach(utp -> {
+                    metadataPartitionSnapshot.add(utp.metadataPartition);
+                    assignedUserTopicIdPartitionsSnapshot.add(utp);
+                });
+                isAssignmentChanged = false;
             }
         }
-
-        if (!assignedMetaPartitionsSnapshot.isEmpty()) {
-            executeReassignment(assignedMetaPartitionsSnapshot);
+        if (!metadataPartitionSnapshot.isEmpty()) {
+            final Set<TopicPartition> remoteLogPartitions = 
toRemoteLogPartitions(metadataPartitionSnapshot);
+            consumer.assign(remoteLogPartitions);
+            this.assignedMetadataPartitions = 
Collections.unmodifiableSet(metadataPartitionSnapshot);
+            // for newly assigned user-partitions, read from the beginning of 
the corresponding metadata partition
+            final Set<TopicPartition> seekToBeginOffsetPartitions = 
assignedUserTopicIdPartitionsSnapshot
+                .stream()
+                .filter(utp -> !utp.isAssigned)
+                .map(utp -> toRemoteLogPartition(utp.metadataPartition))
+                .collect(Collectors.toSet());
+            consumer.seekToBeginning(seekToBeginOffsetPartitions);
+            // for other metadata partitions, read from the offset where the 
processing left last time.
+            remoteLogPartitions.stream()
+                .filter(tp -> !seekToBeginOffsetPartitions.contains(tp) &&
+                    readOffsetsByMetadataPartition.containsKey(tp.partition()))
+                .forEach(tp -> consumer.seek(tp, 
readOffsetsByMetadataPartition.get(tp.partition())));
+            // mark all the user-topic-partitions as assigned to the consumer.
+            assignedUserTopicIdPartitionsSnapshot.forEach(utp -> {
+                if (!utp.isAssigned) {
+                    // Note that there can be a race between `remove` and 
`add` partition assignment. Calling the
+                    // `maybeLoadPartition` here again to be sure that the 
partition gets loaded on the handler.
+                    
remotePartitionMetadataEventHandler.maybeLoadPartition(utp.topicIdPartition);
+                    utp.isAssigned = true;
+                }
+            });
+            processedAssignmentOfUserTopicIdPartitions = 
assignedUserTopicIdPartitionsSnapshot.stream()
+                .map(utp -> utp.topicIdPartition).collect(Collectors.toSet());
+            
clearResourcesForUnassignedUserTopicPartitions(assignedUserTopicIdPartitionsSnapshot);
+            isAllUserTopicPartitionsInitialized = false;
+            uninitializedAt = time.milliseconds();
+            fetchBeginAndEndOffsets();
         }
     }
 
-    private void executeReassignment(Set<Integer> 
assignedMetaPartitionsSnapshot) {
-        Set<TopicPartition> assignedMetaTopicPartitions =
-                assignedMetaPartitionsSnapshot.stream()
-                                              .map(partitionNum -> new 
TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, partitionNum))
-                                              .collect(Collectors.toSet());
-        log.info("Reassigning partitions to consumer task [{}]", 
assignedMetaTopicPartitions);
-        consumer.assign(assignedMetaTopicPartitions);
+    private void 
clearResourcesForUnassignedUserTopicPartitions(Set<UserTopicIdPartition> 
assignedUTPs) {
+        Set<TopicIdPartition> assignedPartitions = assignedUTPs.stream()
+            .map(utp -> utp.topicIdPartition).collect(Collectors.toSet());
+        // Note that there can be previously assigned user-topic-partitions 
where no records are there to read
+        // (eg) none of the segments for a partition were uploaded. Those 
partition resources won't be cleared.
+        // It can be fixed later when required since they are empty resources.
+        Set<TopicIdPartition> unassignedPartitions = 
readOffsetsByUserTopicPartition.keySet()
+            .stream()
+            .filter(e -> !assignedPartitions.contains(e))
+            .collect(Collectors.toSet());
+        unassignedPartitions.forEach(unassignedPartition -> {
+            
remotePartitionMetadataEventHandler.clearTopicPartition(unassignedPartition);
+            readOffsetsByUserTopicPartition.remove(unassignedPartition);
+        });
+        log.info("Unassigned user-topic-partitions: {}", 
unassignedPartitions.size());
+    }
+
+    public void addAssignmentsForPartitions(final Set<TopicIdPartition> 
partitions) {
+        updateAssignments(Objects.requireNonNull(partitions), 
Collections.emptySet());
+    }
+
+    public void removeAssignmentsForPartitions(final Set<TopicIdPartition> 
partitions) {
+        updateAssignments(Collections.emptySet(), 
Objects.requireNonNull(partitions));
     }
 
-    public void addAssignmentsForPartitions(Set<TopicIdPartition> partitions) {
-        updateAssignmentsForPartitions(partitions, Collections.emptySet());
+    private void updateAssignments(final Set<TopicIdPartition> addedPartitions,
+                                   final Set<TopicIdPartition> 
removedPartitions) {
+        log.info("Updating assignments for partitions added: {} and removed: 
{}", addedPartitions, removedPartitions);
+        if (!addedPartitions.isEmpty() || !removedPartitions.isEmpty()) {
+            synchronized (assignPartitionsLock) {
+                final Map<TopicIdPartition, UserTopicIdPartition> 
idealUserPartitions = new HashMap<>(assignedUserTopicIdPartitions);
+                addedPartitions.forEach(tpId -> 
idealUserPartitions.putIfAbsent(tpId, newUserTopicIdPartition(tpId)));
+                removedPartitions.forEach(idealUserPartitions::remove);
+                if 
(!idealUserPartitions.equals(assignedUserTopicIdPartitions)) {
+                    assignedUserTopicIdPartitions = 
Collections.unmodifiableMap(idealUserPartitions);
+                    isAssignmentChanged = true;
+                }
+                if (isAssignmentChanged) {
+                    log.debug("Assigned user-topic-partitions: {}", 
assignedUserTopicIdPartitions);
+                    assignPartitionsLock.notifyAll();
+                }
+            }
+        }
     }
 
-    public void removeAssignmentsForPartitions(Set<TopicIdPartition> 
partitions) {
-        updateAssignmentsForPartitions(Collections.emptySet(), partitions);
+    public Optional<Long> receivedOffsetForPartition(final int partition) {
+        return 
Optional.ofNullable(readOffsetsByMetadataPartition.get(partition));
     }
 
-    private void updateAssignmentsForPartitions(Set<TopicIdPartition> 
addedPartitions,
-                                                Set<TopicIdPartition> 
removedPartitions) {
-        log.info("Updating assignments for addedPartitions: {} and 
removedPartition: {}", addedPartitions, removedPartitions);
+    public boolean isMetadataPartitionAssigned(final int partition) {
+        return assignedMetadataPartitions.contains(partition);
+    }
 
-        Objects.requireNonNull(addedPartitions, "addedPartitions must not be 
null");
-        Objects.requireNonNull(removedPartitions, "removedPartitions must not 
be null");
+    public boolean isUserPartitionAssigned(final TopicIdPartition partition) {
+        final UserTopicIdPartition utp = 
assignedUserTopicIdPartitions.get(partition);
+        return utp != null && utp.isAssigned;
+    }
 
-        if (addedPartitions.isEmpty() && removedPartitions.isEmpty()) {
-            return;
+    @Override
+    public void close() {
+        if (!isClosed) {
+            log.info("Closing the instance");
+            synchronized (assignPartitionsLock) {
+                isClosed = true;
+                
assignedUserTopicIdPartitions.values().forEach(this::markInitialized);
+                consumer.wakeup();
+                assignPartitionsLock.notifyAll();
+            }
         }
+    }
 
-        synchronized (assignPartitionsLock) {
-            Set<TopicIdPartition> updatedReassignedPartitions = new 
HashSet<>(assignedTopicPartitions);
-            updatedReassignedPartitions.addAll(addedPartitions);
-            updatedReassignedPartitions.removeAll(removedPartitions);
-            Set<Integer> updatedAssignedMetaPartitions = new HashSet<>();
-            for (TopicIdPartition tp : updatedReassignedPartitions) {
-                
updatedAssignedMetaPartitions.add(topicPartitioner.metadataPartition(tp));
-            }
+    public Set<Integer> metadataPartitionsAssigned() {
+        return Collections.unmodifiableSet(assignedMetadataPartitions);
+    }
+
+    private void fetchBeginAndEndOffsets() {
+        try {
+            final Set<TopicPartition> unInitializedPartitions = 
assignedUserTopicIdPartitions.values().stream()
+                .filter(utp -> utp.isAssigned && !utp.isInitialized)
+                .map(utp -> toRemoteLogPartition(utp.metadataPartition))
+                .collect(Collectors.toSet());
+            // Removing the previous offset holder if it exists. During 
reassignment, if the list-offset
+            // call to `earliest` and `latest` offset fails, then we should 
not use the previous values.
+            unInitializedPartitions.forEach(x -> 
offsetHolderByMetadataPartition.remove(x));
+            if (!unInitializedPartitions.isEmpty()) {
+                Map<TopicPartition, Long> endOffsets = 
consumer.endOffsets(unInitializedPartitions);
+                Map<TopicPartition, Long> beginOffsets = 
consumer.beginningOffsets(unInitializedPartitions);
+                offsetHolderByMetadataPartition = endOffsets.entrySet()
+                    .stream()
+                    .collect(Collectors.toMap(Map.Entry::getKey,
+                        e -> new 
BeginAndEndOffsetHolder(beginOffsets.get(e.getKey()), e.getValue())));
 
-            // Clear removed topic partitions from in-memory cache.
-            for (TopicIdPartition removedPartition : removedPartitions) {
-                
remotePartitionMetadataEventHandler.clearTopicPartition(removedPartition);
             }
+            isOffsetsFetchFailed = false;
+        } catch (final RetriableException ex) {
+            // ignore LEADER_NOT_AVAILABLE error, this can happen when the 
partition leader is not yet assigned.
+            isOffsetsFetchFailed = true;
+            lastFailedFetchOffsetsTimestamp = time.milliseconds();
+        }
+    }
 
-            assignedTopicPartitions = 
Collections.unmodifiableSet(updatedReassignedPartitions);
-            log.debug("Assigned topic partitions: {}", 
assignedTopicPartitions);
+    private void maybeFetchBeginAndEndOffsets() {
+        // If the leader for a `__remote_log_metadata` partition is not 
available, then the call to `ListOffsets`
+        // will fail after the default timeout of 1 min. Added a delay of 5 
min in between the retries to prevent the
+        // thread from aggressively fetching the list offsets. During this 
time, the recently reassigned
+        // user-topic-partitions won't be marked as initialized.
+        if (isOffsetsFetchFailed && lastFailedFetchOffsetsTimestamp + 300_000 
< time.milliseconds()) {
+            fetchBeginAndEndOffsets();
+        }

Review Comment:
   The 5-minute wait is too long. As the code comment says, the `ListOffsets` 
timeout is 1 minute, so why not retry after 10 seconds or so to have a better 
chance of getting the offsets? IMO, 10 seconds is long enough to elect a leader 
for a partition, so we should reduce this delay. Thoughts?



##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java:
##########
@@ -104,6 +106,22 @@ public class RemoteLogMetadataCache {
     // https://issues.apache.org/jira/browse/KAFKA-12641
     protected final ConcurrentMap<Integer, RemoteLogLeaderEpochState> 
leaderEpochEntries = new ConcurrentHashMap<>();
 
+    private final CountDownLatch initializedLatch = new CountDownLatch(1);
+
+    public void markInitialized() {
+        initializedLatch.countDown();
+    }
+
+    public void ensureInitialized() throws InterruptedException {
+        if (!initializedLatch.await(2, TimeUnit.MINUTES)) {

Review Comment:
   This will block operations like `findOffsetByTimestamp` (invoked by 
clients), and the default client request timeout is 30 seconds, so I think the 
2-minute wait is not appropriate here. Maybe 10 seconds? Or is there a reason 
we chose 2 minutes?
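
   A rough sketch of what a shorter, configurable wait could look like. 
`InitializationGate` is a hypothetical stand-in for the latch logic in 
`RemoteLogMetadataCache`; the constructor parameter and the choice of 
`TimeoutException` are assumptions, since the diff hunk does not show what the 
PR does when the wait expires.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for the latch in RemoteLogMetadataCache.
final class InitializationGate {
    private final CountDownLatch initializedLatch = new CountDownLatch(1);
    private final long timeoutMs;

    // timeoutMs is an assumed knob; the PR hard-codes 2 minutes.
    InitializationGate(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    void markInitialized() {
        initializedLatch.countDown();
    }

    // Blocks until markInitialized() has been called or the timeout elapses.
    void ensureInitialized() throws InterruptedException, TimeoutException {
        if (!initializedLatch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("Not initialized within " + timeoutMs + " ms");
        }
    }
}
```

   With something like `new InitializationGate(10_000L)`, the caller would 
fail well inside the 30-second default client request timeout instead of 
outliving it.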



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
