showuon commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1284009383


##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##########
@@ -64,302 +65,387 @@
 class ConsumerTask implements Runnable, Closeable {
     private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class);
 
-    private static final long POLL_INTERVAL_MS = 100L;
+    static long pollIntervalMs = 100L;
 
     private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-    private final KafkaConsumer<byte[], byte[]> consumer;
-    private final String metadataTopicName;
+    private final Consumer<byte[], byte[]> consumer;
     private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler;
     private final RemoteLogMetadataTopicPartitioner topicPartitioner;
-    private final Time time;
+    private final Time time = new SystemTime();
 
+    // TODO - Update comments below
     // It indicates whether the closing process has been started or not. If it is set as true,
     // consumer will stop consuming messages, and it will not allow partition assignments to be updated.
-    private volatile boolean closing = false;
-
+    private volatile boolean isClosed = false;
     // It indicates whether the consumer needs to assign the partitions or not. This is set when it is
     // determined that the consumer needs to be assigned with the updated partitions.
-    private volatile boolean assignPartitions = false;
+    private volatile boolean isAssignmentChanged = true;
 
     // It represents a lock for any operations related to the assignedTopicPartitions.
     private final Object assignPartitionsLock = new Object();
 
     // Remote log metadata topic partitions that consumer is assigned to.
-    private volatile Set<Integer> assignedMetaPartitions = Collections.emptySet();
+    private volatile Set<Integer> assignedMetadataPartitions = Collections.emptySet();
 
     // User topic partitions that this broker is a leader/follower for.
-    private Set<TopicIdPartition> assignedTopicPartitions = Collections.emptySet();
+    private volatile Map<TopicIdPartition, UserTopicIdPartition> assignedUserTopicIdPartitions = Collections.emptyMap();
+    private volatile Set<TopicIdPartition> processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();
 
-    // Map of remote log metadata topic partition to consumed offsets. Received consumer records
-    // may or may not have been processed based on the assigned topic partitions.
-    private final Map<Integer, Long> partitionToConsumedOffsets = new ConcurrentHashMap<>();
+    private long uninitializedAt = time.milliseconds();
+    private boolean isAllUserTopicPartitionsInitialized;
 
-    // Map of remote log metadata topic partition to processed offsets that were synced in committedOffsetsFile.
-    private Map<Integer, Long> lastSyncedPartitionToConsumedOffsets = Collections.emptyMap();
+    // Map of remote log metadata topic partition to consumed offsets.
+    private final Map<Integer, Long> readOffsetsByMetadataPartition = new ConcurrentHashMap<>();
+    private final Map<TopicIdPartition, Long> readOffsetsByUserTopicPartition = new HashMap<>();
 
-    private final long committedOffsetSyncIntervalMs;
-    private CommittedOffsetsFile committedOffsetsFile;
-    private long lastSyncedTimeMs;
+    private Map<TopicPartition, BeginAndEndOffsetHolder> offsetHolderByMetadataPartition = new HashMap<>();
+    private boolean isOffsetsFetchFailed = false;
+    private long lastFailedFetchOffsetsTimestamp;
 
-    public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer,
-                        String metadataTopicName,
-                        RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
+    public ConsumerTask(RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
                         RemoteLogMetadataTopicPartitioner topicPartitioner,
-                        Path committedOffsetsPath,
-                        Time time,
-                        long committedOffsetSyncIntervalMs) {
-        this.consumer = Objects.requireNonNull(consumer);
-        this.metadataTopicName = Objects.requireNonNull(metadataTopicName);
+                        Function<Optional<String>, Consumer<byte[], byte[]>> consumerSupplier) {
+        this.consumer = consumerSupplier.apply(Optional.empty());
         this.remotePartitionMetadataEventHandler = Objects.requireNonNull(remotePartitionMetadataEventHandler);
         this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
-        this.time = Objects.requireNonNull(time);
-        this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs;
-
-        initializeConsumerAssignment(committedOffsetsPath);
-    }
-
-    private void initializeConsumerAssignment(Path committedOffsetsPath) {
-        try {
-            committedOffsetsFile = new CommittedOffsetsFile(committedOffsetsPath.toFile());
-        } catch (IOException e) {
-            throw new KafkaException(e);
-        }
-
-        Map<Integer, Long> committedOffsets = Collections.emptyMap();
-        try {
-            // Load committed offset and assign them in the consumer.
-            committedOffsets = committedOffsetsFile.readEntries();
-        } catch (IOException e) {
-            // Ignore the error and consumer consumes from the earliest offset.
-            log.error("Encountered error while building committed offsets from the file. " +
-                              "Consumer will consume from the earliest offset for the assigned partitions.", e);
-        }
-
-        if (!committedOffsets.isEmpty()) {
-            // Assign topic partitions from the earlier committed offsets file.
-            Set<Integer> earlierAssignedPartitions = committedOffsets.keySet();
-            assignedMetaPartitions = Collections.unmodifiableSet(earlierAssignedPartitions);
-            Set<TopicPartition> metadataTopicPartitions = earlierAssignedPartitions.stream()
-                                                                                   .map(x -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, x))
-                                                                                   .collect(Collectors.toSet());
-            consumer.assign(metadataTopicPartitions);
-
-            // Seek to the committed offsets
-            for (Map.Entry<Integer, Long> entry : committedOffsets.entrySet()) {
-                log.debug("Updating consumed offset: [{}] for partition [{}]", entry.getValue(), entry.getKey());
-                partitionToConsumedOffsets.put(entry.getKey(), entry.getValue());
-                consumer.seek(new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, entry.getKey()), entry.getValue());
-            }
-
-            lastSyncedPartitionToConsumedOffsets = Collections.unmodifiableMap(committedOffsets);
-        }
     }
 
     @Override
     public void run() {
-        log.info("Started Consumer task thread.");
-        lastSyncedTimeMs = time.milliseconds();
-        try {
-            while (!closing) {
-                maybeWaitForPartitionsAssignment();
+        log.info("Starting consumer task thread.");
+        while (!isClosed) {
+            try {
+                if (isAssignmentChanged) {
+                    maybeWaitForPartitionsAssignment();
+                }
 
                 log.trace("Polling consumer to receive remote log metadata topic records");
-                ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(POLL_INTERVAL_MS));
+                final ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(pollIntervalMs));
                 for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
                     processConsumerRecord(record);
                 }
-
-                maybeSyncCommittedDataAndOffsets(false);
+                maybeMarkUserPartitionsAsReady();
+            } catch (final WakeupException ex) {
+                // ignore logging the error
+                isClosed = true;
+            } catch (final RetriableException ex) {
+                log.warn("Retriable error occurred while processing the records. Retrying...", ex);
+            }  catch (final Exception ex) {

Review Comment:
   nit: additional space after `}`



##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##########
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import 
org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static 
org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.UserTopicIdPartition;
+import static 
org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.toRemoteLogPartition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class ConsumerTaskTest {
+
+    private final int numMetadataTopicPartitions = 5;
+    private final RemoteLogMetadataTopicPartitioner partitioner = new 
RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions);
+    private final DummyEventHandler handler = new DummyEventHandler();
+    private final Set<TopicPartition> remoteLogPartitions = IntStream.range(0, 
numMetadataTopicPartitions).boxed()
+        .map(ConsumerTask::toRemoteLogPartition).collect(Collectors.toSet());
+    private final Uuid topicId = Uuid.randomUuid();
+    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+
+    private ConsumerTask consumerTask;
+    private MockConsumer<byte[], byte[]> consumer;
+    private Thread thread;
+
+    @BeforeEach
+    public void beforeEach() {
+        final Map<TopicPartition, Long> offsets = remoteLogPartitions.stream()
+            .collect(Collectors.toMap(Function.identity(), e -> 0L));
+        consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        consumer.updateBeginningOffsets(offsets);
+        ConsumerTask.pollIntervalMs = 10L;
+        consumerTask = new ConsumerTask(handler, partitioner, ignored -> 
consumer);
+        thread = new Thread(consumerTask);
+    }
+
+    @AfterEach
+    public void afterEach() throws InterruptedException {
+        if (thread != null) {
+            consumerTask.close();
+            thread.join();
+        }
+    }
+
+    @Test
+    public void testCloseOnNoAssignment() throws InterruptedException {
+        thread.start();
+        Thread.sleep(10);
+    }
+
+    @Test
+    public void testIdempotentClose() {
+        thread.start();
+        consumerTask.close();
+        consumerTask.close();
+    }
+
+    @Test
+    public void testUserTopicIdPartitionEquals() {
+        final TopicIdPartition tpId = new TopicIdPartition(topicId, new 
TopicPartition("sample", 0));
+        final UserTopicIdPartition utp1 = new UserTopicIdPartition(tpId, 
partitioner.metadataPartition(tpId));
+        final UserTopicIdPartition utp2 = new UserTopicIdPartition(tpId, 
partitioner.metadataPartition(tpId));
+        utp1.isInitialized = true;
+        utp1.isAssigned = true;
+
+        assertFalse(utp2.isInitialized);
+        assertFalse(utp2.isAssigned);
+        assertEquals(utp1, utp2);
+    }
+
+    @Test
+    public void testAddAssignmentsForPartitions() throws InterruptedException {
+        final List<TopicIdPartition> idPartitions = getIdPartitions("sample", 
3);
+        final Map<TopicPartition, Long> endOffsets = idPartitions.stream()
+            .map(idp -> 
toRemoteLogPartition(partitioner.metadataPartition(idp)))
+            .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> 
b));
+        consumer.updateEndOffsets(endOffsets);
+        consumerTask.addAssignmentsForPartitions(new HashSet<>(idPartitions));
+        thread.start();
+        for (final TopicIdPartition idPartition : idPartitions) {
+            TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(idPartition), 1000, "");
+            
assertTrue(consumerTask.isMetadataPartitionAssigned(partitioner.metadataPartition(idPartition)));
+            assertTrue(handler.isPartitionLoaded.get(idPartition));
+        }
+    }
+
+    @Test
+    public void testRemoveAssignmentsForPartitions() throws 
InterruptedException {
+        final List<TopicIdPartition> allPartitions = getIdPartitions("sample", 
3);
+        final Map<TopicPartition, Long> endOffsets = allPartitions.stream()
+            .map(idp -> 
toRemoteLogPartition(partitioner.metadataPartition(idp)))
+            .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> 
b));
+        consumer.updateEndOffsets(endOffsets);
+        consumerTask.addAssignmentsForPartitions(new HashSet<>(allPartitions));
+        thread.start();
+
+        final TopicIdPartition tpId = allPartitions.get(0);
+        TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(tpId), 1000, "");
+        addRecord(consumer, partitioner.metadataPartition(tpId), tpId, 0);
+        TestUtils.waitForCondition(() -> 
consumerTask.receivedOffsetForPartition(partitioner.metadataPartition(tpId)).isPresent(),
+            1000, "Wait for the record to be consumed");
+
+        final Set<TopicIdPartition> removePartitions = 
Collections.singleton(tpId);
+        consumerTask.removeAssignmentsForPartitions(removePartitions);
+        for (final TopicIdPartition idPartition : allPartitions) {
+            final TestCondition condition = () -> 
removePartitions.contains(idPartition) == 
!consumerTask.isUserPartitionAssigned(idPartition);
+            TestUtils.waitForCondition(condition, 1000, "");
+        }
+        for (TopicIdPartition removePartition : removePartitions) {
+            TestUtils.waitForCondition(() -> 
handler.isPartitionCleared.containsKey(removePartition), 1000, "");
+        }
+    }
+
+    @Test
+    public void testConcurrentPartitionAssignments() throws InterruptedException, ExecutionException {
+        final List<TopicIdPartition> allPartitions = getIdPartitions("sample", 100);
+        final Map<TopicPartition, Long> endOffsets = allPartitions.stream()
+            .map(idp -> toRemoteLogPartition(partitioner.metadataPartition(idp)))
+            .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> b));
+        consumer.updateEndOffsets(endOffsets);
+
+        final AtomicBoolean isAllPartitionsAssigned = new AtomicBoolean(false);
+        Thread assignor = new Thread(() -> {
+            for (TopicIdPartition partition : allPartitions) {
+                consumerTask.addAssignmentsForPartitions(Collections.singleton(partition));
+            }
+            isAllPartitionsAssigned.set(true);
+        });
+        Runnable consumerRunnable = () -> {
+            try {
+                while (!isAllPartitionsAssigned.get()) {
+                    consumerTask.maybeWaitForPartitionsAssignment();
+                }
+            } catch (Exception e) {
Review Comment:
   About this test, could you explain what you're trying to verify? That concurrent partition assignment doesn't affect `maybeWaitForPartitionsAssignment()`?
   
   Note: the `assignor` thread might run first (even though it is started after `consumerRunnable`). So this can happen:
   1. `assignor` completes all the partition assignments and sets `isAllPartitionsAssigned` to true.
   2. `while (!isAllPartitionsAssigned.get())` is then false, so the consumer runnable never calls `maybeWaitForPartitionsAssignment()`.
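   
   One way to make the intent explicit and remove that start-order race (just a sketch, not part of this PR; the `CountDownLatch` is an addition, the other names are reused from the test above) is to guarantee the consumer thread enters its loop before the assignor starts, and to call `maybeWaitForPartitionsAssignment()` at least once:
   
   ```java
   // Hypothetical restructuring of the test body.
   final CountDownLatch consumerStarted = new CountDownLatch(1);
   final Runnable consumerRunnable = () -> {
       try {
           consumerStarted.countDown();
           // do-while guarantees at least one call even if the assignor finishes first
           do {
               consumerTask.maybeWaitForPartitionsAssignment();
           } while (!isAllPartitionsAssigned.get());
       } catch (final Exception e) {
           fail(e.getMessage());
       }
   };
   
   final ExecutorService consumerExecutor = Executors.newSingleThreadExecutor();
   final Future<?> future = consumerExecutor.submit(consumerRunnable);
   consumerStarted.await(); // only start assigning once the consumer loop is running
   assignor.start();
   assignor.join();
   future.get();
   ```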
   
   



##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##########
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import 
org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static 
org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.UserTopicIdPartition;
+import static 
org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.toRemoteLogPartition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class ConsumerTaskTest {
+
+    private final int numMetadataTopicPartitions = 5;
+    private final RemoteLogMetadataTopicPartitioner partitioner = new 
RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions);
+    private final DummyEventHandler handler = new DummyEventHandler();
+    private final Set<TopicPartition> remoteLogPartitions = IntStream.range(0, 
numMetadataTopicPartitions).boxed()
+        .map(ConsumerTask::toRemoteLogPartition).collect(Collectors.toSet());
+    private final Uuid topicId = Uuid.randomUuid();
+    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+
+    private ConsumerTask consumerTask;
+    private MockConsumer<byte[], byte[]> consumer;
+    private Thread thread;
+
+    @BeforeEach
+    public void beforeEach() {
+        final Map<TopicPartition, Long> offsets = remoteLogPartitions.stream()
+            .collect(Collectors.toMap(Function.identity(), e -> 0L));
+        consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        consumer.updateBeginningOffsets(offsets);
+        ConsumerTask.pollIntervalMs = 10L;
+        consumerTask = new ConsumerTask(handler, partitioner, ignored -> 
consumer);
+        thread = new Thread(consumerTask);
+    }
+
+    @AfterEach
+    public void afterEach() throws InterruptedException {
+        if (thread != null) {
+            consumerTask.close();
+            thread.join();
+        }
+    }
+
+    @Test
+    public void testCloseOnNoAssignment() throws InterruptedException {
+        thread.start();
+        Thread.sleep(10);
+    }
+
+    @Test
+    public void testIdempotentClose() {
+        thread.start();
+        consumerTask.close();
+        consumerTask.close();
+    }
+
+    @Test
+    public void testUserTopicIdPartitionEquals() {
+        final TopicIdPartition tpId = new TopicIdPartition(topicId, new 
TopicPartition("sample", 0));
+        final UserTopicIdPartition utp1 = new UserTopicIdPartition(tpId, 
partitioner.metadataPartition(tpId));
+        final UserTopicIdPartition utp2 = new UserTopicIdPartition(tpId, 
partitioner.metadataPartition(tpId));
+        utp1.isInitialized = true;
+        utp1.isAssigned = true;
+
+        assertFalse(utp2.isInitialized);
+        assertFalse(utp2.isAssigned);
+        assertEquals(utp1, utp2);
+    }
+
+    @Test
+    public void testAddAssignmentsForPartitions() throws InterruptedException {
+        final List<TopicIdPartition> idPartitions = getIdPartitions("sample", 
3);
+        final Map<TopicPartition, Long> endOffsets = idPartitions.stream()
+            .map(idp -> 
toRemoteLogPartition(partitioner.metadataPartition(idp)))
+            .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> 
b));
+        consumer.updateEndOffsets(endOffsets);
+        consumerTask.addAssignmentsForPartitions(new HashSet<>(idPartitions));
+        thread.start();
+        for (final TopicIdPartition idPartition : idPartitions) {
+            TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(idPartition), 1000, "");
+            
assertTrue(consumerTask.isMetadataPartitionAssigned(partitioner.metadataPartition(idPartition)));
+            assertTrue(handler.isPartitionLoaded.get(idPartition));
+        }
+    }
+
+    @Test
+    public void testRemoveAssignmentsForPartitions() throws InterruptedException {
+        final List<TopicIdPartition> allPartitions = getIdPartitions("sample", 3);
+        final Map<TopicPartition, Long> endOffsets = allPartitions.stream()
+            .map(idp -> toRemoteLogPartition(partitioner.metadataPartition(idp)))
+            .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> b));
+        consumer.updateEndOffsets(endOffsets);
+        consumerTask.addAssignmentsForPartitions(new HashSet<>(allPartitions));
+        thread.start();
+
+        final TopicIdPartition tpId = allPartitions.get(0);
+        TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId), 1000, "");

Review Comment:
   ditto



##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##########
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import 
org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static 
org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.UserTopicIdPartition;
+import static 
org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.toRemoteLogPartition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class ConsumerTaskTest {
+
+    private final int numMetadataTopicPartitions = 5;
+    private final RemoteLogMetadataTopicPartitioner partitioner = new 
RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions);
+    private final DummyEventHandler handler = new DummyEventHandler();
+    private final Set<TopicPartition> remoteLogPartitions = IntStream.range(0, 
numMetadataTopicPartitions).boxed()
+        .map(ConsumerTask::toRemoteLogPartition).collect(Collectors.toSet());
+    private final Uuid topicId = Uuid.randomUuid();
+    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+
+    private ConsumerTask consumerTask;
+    private MockConsumer<byte[], byte[]> consumer;
+    private Thread thread;
+
+    @BeforeEach
+    public void beforeEach() {
+        final Map<TopicPartition, Long> offsets = remoteLogPartitions.stream()
+            .collect(Collectors.toMap(Function.identity(), e -> 0L));
+        consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        consumer.updateBeginningOffsets(offsets);
+        ConsumerTask.pollIntervalMs = 10L;
+        consumerTask = new ConsumerTask(handler, partitioner, ignored -> 
consumer);
+        thread = new Thread(consumerTask);
+    }
+
+    @AfterEach
+    public void afterEach() throws InterruptedException {
+        if (thread != null) {
+            consumerTask.close();
+            thread.join();
+        }
+    }
+
+    @Test
+    public void testCloseOnNoAssignment() throws InterruptedException {
+        thread.start();
+        Thread.sleep(10);
+    }
+
+    @Test
+    public void testIdempotentClose() {
+        thread.start();
+        consumerTask.close();
+        consumerTask.close();
+    }
+
+    @Test
+    public void testUserTopicIdPartitionEquals() {
+        final TopicIdPartition tpId = new TopicIdPartition(topicId, new 
TopicPartition("sample", 0));
+        final UserTopicIdPartition utp1 = new UserTopicIdPartition(tpId, 
partitioner.metadataPartition(tpId));
+        final UserTopicIdPartition utp2 = new UserTopicIdPartition(tpId, 
partitioner.metadataPartition(tpId));
+        utp1.isInitialized = true;
+        utp1.isAssigned = true;
+
+        assertFalse(utp2.isInitialized);
+        assertFalse(utp2.isAssigned);
+        assertEquals(utp1, utp2);
+    }
+
+    @Test
+    public void testAddAssignmentsForPartitions() throws InterruptedException {
+        final List<TopicIdPartition> idPartitions = getIdPartitions("sample", 
3);
+        final Map<TopicPartition, Long> endOffsets = idPartitions.stream()
+            .map(idp -> 
toRemoteLogPartition(partitioner.metadataPartition(idp)))
+            .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> 
b));
+        consumer.updateEndOffsets(endOffsets);
+        consumerTask.addAssignmentsForPartitions(new HashSet<>(idPartitions));
+        thread.start();
+        for (final TopicIdPartition idPartition : idPartitions) {
+            TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(idPartition), 1000, "");
+            
assertTrue(consumerTask.isMetadataPartitionAssigned(partitioner.metadataPartition(idPartition)));
+            assertTrue(handler.isPartitionLoaded.get(idPartition));
+        }
+    }
+
+    @Test
+    public void testRemoveAssignmentsForPartitions() throws 
InterruptedException {
+        final List<TopicIdPartition> allPartitions = getIdPartitions("sample", 
3);
+        final Map<TopicPartition, Long> endOffsets = allPartitions.stream()
+            .map(idp -> 
toRemoteLogPartition(partitioner.metadataPartition(idp)))
+            .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> 
b));
+        consumer.updateEndOffsets(endOffsets);
+        consumerTask.addAssignmentsForPartitions(new HashSet<>(allPartitions));
+        thread.start();
+
+        final TopicIdPartition tpId = allPartitions.get(0);
+        TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(tpId), 1000, "");
+        addRecord(consumer, partitioner.metadataPartition(tpId), tpId, 0);
+        TestUtils.waitForCondition(() -> 
consumerTask.receivedOffsetForPartition(partitioner.metadataPartition(tpId)).isPresent(),
+            1000, "Wait for the record to be consumed");
+
+        final Set<TopicIdPartition> removePartitions = 
Collections.singleton(tpId);
+        consumerTask.removeAssignmentsForPartitions(removePartitions);
+        for (final TopicIdPartition idPartition : allPartitions) {
+            final TestCondition condition = () -> 
removePartitions.contains(idPartition) == 
!consumerTask.isUserPartitionAssigned(idPartition);
+            TestUtils.waitForCondition(condition, 1000, "");
+        }
+        for (TopicIdPartition removePartition : removePartitions) {
+            TestUtils.waitForCondition(() -> 
handler.isPartitionCleared.containsKey(removePartition), 1000, "");
+        }
+    }
+
+    @Test
+    public void testConcurrentPartitionAssignments() throws 
InterruptedException, ExecutionException {
+        final List<TopicIdPartition> allPartitions = getIdPartitions("sample", 
100);
+        final Map<TopicPartition, Long> endOffsets = allPartitions.stream()
+            .map(idp -> 
toRemoteLogPartition(partitioner.metadataPartition(idp)))
+            .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> 
b));
+        consumer.updateEndOffsets(endOffsets);
+
+        final AtomicBoolean isAllPartitionsAssigned = new AtomicBoolean(false);
+        Thread assignor = new Thread(() -> {
+            for (TopicIdPartition partition : allPartitions) {
+                
consumerTask.addAssignmentsForPartitions(Collections.singleton(partition));
+            }
+            isAllPartitionsAssigned.set(true);
+        });
+        Runnable consumerRunnable = () -> {
+            try {
+                while (!isAllPartitionsAssigned.get()) {
+                    consumerTask.maybeWaitForPartitionsAssignment();
+                }
+            } catch (Exception e) {
+                fail(e.getMessage());
+            }
+        };
+
+        ExecutorService consumerExecutor = Executors.newSingleThreadExecutor();
+        Future<?> future = consumerExecutor.submit(consumerRunnable);
+        assignor.start();
+
+        assignor.join();
+        future.get();
+    }
+
+    @Test
+    public void testCanProcessRecord() throws InterruptedException {
+        final Uuid topicId = Uuid.fromString("Bp9TDduJRGa9Q5rlvCJOxg");
+        final TopicIdPartition tpId0 = new TopicIdPartition(topicId, new TopicPartition("sample", 0));
+        final TopicIdPartition tpId1 = new TopicIdPartition(topicId, new TopicPartition("sample", 1));
+        final TopicIdPartition tpId2 = new TopicIdPartition(topicId, new TopicPartition("sample", 2));
+        assert partitioner.metadataPartition(tpId0) == partitioner.metadataPartition(tpId1);
+        assert partitioner.metadataPartition(tpId0) == partitioner.metadataPartition(tpId2);

Review Comment:
   We usually don't use the bare `assert` keyword like this. Could you use `assertTrue` (or `assertEquals`) directly here?
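   
   For example (a sketch of the suggested change; unlike `assert`, these run regardless of whether the JVM has assertions enabled):
   
   ```java
   assertEquals(partitioner.metadataPartition(tpId0), partitioner.metadataPartition(tpId1));
   assertEquals(partitioner.metadataPartition(tpId0), partitioner.metadataPartition(tpId2));
   ```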



##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##########
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import 
org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static 
org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.UserTopicIdPartition;
+import static 
org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.toRemoteLogPartition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class ConsumerTaskTest {
+
+    private final int numMetadataTopicPartitions = 5;
+    private final RemoteLogMetadataTopicPartitioner partitioner = new 
RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions);
+    private final DummyEventHandler handler = new DummyEventHandler();
+    private final Set<TopicPartition> remoteLogPartitions = IntStream.range(0, 
numMetadataTopicPartitions).boxed()
+        .map(ConsumerTask::toRemoteLogPartition).collect(Collectors.toSet());
+    private final Uuid topicId = Uuid.randomUuid();
+    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+
+    private ConsumerTask consumerTask;
+    private MockConsumer<byte[], byte[]> consumer;
+    private Thread thread;
+
+    @BeforeEach
+    public void beforeEach() {
+        final Map<TopicPartition, Long> offsets = remoteLogPartitions.stream()
+            .collect(Collectors.toMap(Function.identity(), e -> 0L));
+        consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        consumer.updateBeginningOffsets(offsets);
+        ConsumerTask.pollIntervalMs = 10L;
+        consumerTask = new ConsumerTask(handler, partitioner, ignored -> 
consumer);
+        thread = new Thread(consumerTask);
+    }
+
+    @AfterEach
+    public void afterEach() throws InterruptedException {
+        if (thread != null) {
+            consumerTask.close();
+            thread.join();
+        }
+    }
+
+    @Test
+    public void testCloseOnNoAssignment() throws InterruptedException {
+        thread.start();
+        Thread.sleep(10);
+    }
+
+    @Test
+    public void testIdempotentClose() {
+        thread.start();
+        consumerTask.close();
+        consumerTask.close();
+    }
+
+    @Test
+    public void testUserTopicIdPartitionEquals() {
+        final TopicIdPartition tpId = new TopicIdPartition(topicId, new 
TopicPartition("sample", 0));
+        final UserTopicIdPartition utp1 = new UserTopicIdPartition(tpId, 
partitioner.metadataPartition(tpId));
+        final UserTopicIdPartition utp2 = new UserTopicIdPartition(tpId, 
partitioner.metadataPartition(tpId));
+        utp1.isInitialized = true;
+        utp1.isAssigned = true;
+
+        assertFalse(utp2.isInitialized);
+        assertFalse(utp2.isAssigned);
+        assertEquals(utp1, utp2);
+    }
+
+    @Test
+    public void testAddAssignmentsForPartitions() throws InterruptedException {
+        final List<TopicIdPartition> idPartitions = getIdPartitions("sample", 
3);
+        final Map<TopicPartition, Long> endOffsets = idPartitions.stream()
+            .map(idp -> 
toRemoteLogPartition(partitioner.metadataPartition(idp)))
+            .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> 
b));
+        consumer.updateEndOffsets(endOffsets);
+        consumerTask.addAssignmentsForPartitions(new HashSet<>(idPartitions));
+        thread.start();
+        for (final TopicIdPartition idPartition : idPartitions) {
+            TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(idPartition), 1000, "");
+            
assertTrue(consumerTask.isMetadataPartitionAssigned(partitioner.metadataPartition(idPartition)));
+            assertTrue(handler.isPartitionLoaded.get(idPartition));
+        }
+    }
+
+    @Test
+    public void testRemoveAssignmentsForPartitions() throws 
InterruptedException {
+        final List<TopicIdPartition> allPartitions = getIdPartitions("sample", 
3);
+        final Map<TopicPartition, Long> endOffsets = allPartitions.stream()
+            .map(idp -> 
toRemoteLogPartition(partitioner.metadataPartition(idp)))
+            .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> 
b));
+        consumer.updateEndOffsets(endOffsets);
+        consumerTask.addAssignmentsForPartitions(new HashSet<>(allPartitions));
+        thread.start();
+
+        final TopicIdPartition tpId = allPartitions.get(0);
+        TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(tpId), 1000, "");
+        addRecord(consumer, partitioner.metadataPartition(tpId), tpId, 0);
+        TestUtils.waitForCondition(() -> 
consumerTask.receivedOffsetForPartition(partitioner.metadataPartition(tpId)).isPresent(),
+            1000, "Wait for the record to be consumed");
+
+        final Set<TopicIdPartition> removePartitions = 
Collections.singleton(tpId);
+        consumerTask.removeAssignmentsForPartitions(removePartitions);
+        for (final TopicIdPartition idPartition : allPartitions) {
+            final TestCondition condition = () -> 
removePartitions.contains(idPartition) == 
!consumerTask.isUserPartitionAssigned(idPartition);
+            TestUtils.waitForCondition(condition, 1000, "");
+        }
+        for (TopicIdPartition removePartition : removePartitions) {
+            TestUtils.waitForCondition(() -> 
handler.isPartitionCleared.containsKey(removePartition), 1000, "");
+        }
+    }
+
+    @Test
+    public void testConcurrentPartitionAssignments() throws 
InterruptedException, ExecutionException {
+        final List<TopicIdPartition> allPartitions = getIdPartitions("sample", 
100);
+        final Map<TopicPartition, Long> endOffsets = allPartitions.stream()
+            .map(idp -> 
toRemoteLogPartition(partitioner.metadataPartition(idp)))
+            .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> 
b));
+        consumer.updateEndOffsets(endOffsets);
+
+        final AtomicBoolean isAllPartitionsAssigned = new AtomicBoolean(false);
+        Thread assignor = new Thread(() -> {
+            for (TopicIdPartition partition : allPartitions) {
+                
consumerTask.addAssignmentsForPartitions(Collections.singleton(partition));
+            }
+            isAllPartitionsAssigned.set(true);
+        });
+        Runnable consumerRunnable = () -> {
+            try {
+                while (!isAllPartitionsAssigned.get()) {
+                    consumerTask.maybeWaitForPartitionsAssignment();
+                }
+            } catch (Exception e) {
+                fail(e.getMessage());
+            }
+        };
+
+        ExecutorService consumerExecutor = Executors.newSingleThreadExecutor();
+        Future<?> future = consumerExecutor.submit(consumerRunnable);
+        assignor.start();
+
+        assignor.join();
+        future.get();
+    }
+
+    @Test
+    public void testCanProcessRecord() throws InterruptedException {
+        final Uuid topicId = Uuid.fromString("Bp9TDduJRGa9Q5rlvCJOxg");
+        final TopicIdPartition tpId0 = new TopicIdPartition(topicId, new TopicPartition("sample", 0));
+        final TopicIdPartition tpId1 = new TopicIdPartition(topicId, new TopicPartition("sample", 1));
+        final TopicIdPartition tpId2 = new TopicIdPartition(topicId, new TopicPartition("sample", 2));
+        assert partitioner.metadataPartition(tpId0) == partitioner.metadataPartition(tpId1);
+        assert partitioner.metadataPartition(tpId0) == partitioner.metadataPartition(tpId2);
+
+        final int metadataPartition = partitioner.metadataPartition(tpId0);
+        consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), 0L));
+        final Set<TopicIdPartition> assignments = Collections.singleton(tpId0);
+        consumerTask.addAssignmentsForPartitions(assignments);
+        thread.start();
+        TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId0), 1000, "");
+
+        addRecord(consumer, metadataPartition, tpId0, 0);
+        addRecord(consumer, metadataPartition, tpId0, 1);
+        TestUtils.waitForCondition(() -> consumerTask.receivedOffsetForPartition(metadataPartition).equals(Optional.of(1L)), 1000, "Couldn't read record");
+        assertEquals(2, handler.metadataCounter);
+
+        // should only read the tpId1 records
+        consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId1));
+        TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId1), 1000, "");
+        addRecord(consumer, metadataPartition, tpId1, 2);
+        TestUtils.waitForCondition(() -> consumerTask.receivedOffsetForPartition(metadataPartition).equals(Optional.of(2L)), 1000, "Couldn't read record");
+        assertEquals(3, handler.metadataCounter);
+
+        // shouldn't read tpId2 records

Review Comment:
   Could you add a comment here to explain why records for `tpId2` can't be processed? I think it's because `tpId2` is not assigned, right?
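   
   Something along these lines would make it explicit (a sketch of the suggested comment, plus an optional guard using the same helper the test already calls):
   
   ```java
   // tpId2 maps to the same metadata partition as tpId0/tpId1, but it was never
   // added via addAssignmentsForPartitions, so its records must be skipped.
   assertFalse(consumerTask.isUserPartitionAssigned(tpId2));
   ```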



##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##########
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import 
org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static 
org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.UserTopicIdPartition;
+import static 
org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.toRemoteLogPartition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class ConsumerTaskTest {
+
+    private final int numMetadataTopicPartitions = 5;
+    private final RemoteLogMetadataTopicPartitioner partitioner = new 
RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions);
+    private final DummyEventHandler handler = new DummyEventHandler();
+    private final Set<TopicPartition> remoteLogPartitions = IntStream.range(0, 
numMetadataTopicPartitions).boxed()
+        .map(ConsumerTask::toRemoteLogPartition).collect(Collectors.toSet());
+    private final Uuid topicId = Uuid.randomUuid();
+    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+
+    private ConsumerTask consumerTask;
+    private MockConsumer<byte[], byte[]> consumer;
+    private Thread thread;
+
+    @BeforeEach
+    public void beforeEach() {
+        final Map<TopicPartition, Long> offsets = remoteLogPartitions.stream()
+            .collect(Collectors.toMap(Function.identity(), e -> 0L));
+        consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        consumer.updateBeginningOffsets(offsets);
+        ConsumerTask.pollIntervalMs = 10L;
+        consumerTask = new ConsumerTask(handler, partitioner, ignored -> 
consumer);
+        thread = new Thread(consumerTask);
+    }
+
+    @AfterEach
+    public void afterEach() throws InterruptedException {
+        if (thread != null) {
+            consumerTask.close();
+            thread.join();
+        }
+    }
+
+    @Test
+    public void testCloseOnNoAssignment() throws InterruptedException {
+        thread.start();
+        Thread.sleep(10);

Review Comment:
   Could you explain what this test is trying to verify? If we want to verify that the consumer thread shuts down when it has no assignment, shouldn't we assert that explicitly?
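   
   For example, something like this (a sketch; it assumes `ConsumerTask.close()` wakes up the polling loop so the thread exits and the underlying `MockConsumer` ends up closed):
   
   ```java
   thread.start();
   consumerTask.close();
   thread.join(5_000);
   assertFalse(thread.isAlive());
   assertTrue(consumer.closed());
   ```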



##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##########
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import 
org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static 
org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.UserTopicIdPartition;
+import static 
org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.toRemoteLogPartition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class ConsumerTaskTest {
+
+    private final int numMetadataTopicPartitions = 5;
+    private final RemoteLogMetadataTopicPartitioner partitioner = new 
RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions);
+    private final DummyEventHandler handler = new DummyEventHandler();
+    private final Set<TopicPartition> remoteLogPartitions = IntStream.range(0, 
numMetadataTopicPartitions).boxed()
+        .map(ConsumerTask::toRemoteLogPartition).collect(Collectors.toSet());
+    private final Uuid topicId = Uuid.randomUuid();
+    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+
+    private ConsumerTask consumerTask;
+    private MockConsumer<byte[], byte[]> consumer;
+    private Thread thread;
+
+    @BeforeEach
+    public void beforeEach() {
+        final Map<TopicPartition, Long> offsets = remoteLogPartitions.stream()
+            .collect(Collectors.toMap(Function.identity(), e -> 0L));
+        consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        consumer.updateBeginningOffsets(offsets);
+        ConsumerTask.pollIntervalMs = 10L;
+        consumerTask = new ConsumerTask(handler, partitioner, ignored -> 
consumer);
+        thread = new Thread(consumerTask);
+    }
+
+    @AfterEach
+    public void afterEach() throws InterruptedException {
+        if (thread != null) {
+            consumerTask.close();
+            thread.join();
+        }
+    }
+
+    @Test
+    public void testCloseOnNoAssignment() throws InterruptedException {
+        thread.start();
+        Thread.sleep(10);
+    }
+
+    @Test
+    public void testIdempotentClose() {
+        thread.start();
+        consumerTask.close();
+        consumerTask.close();
+    }
+
+    @Test
+    public void testUserTopicIdPartitionEquals() {
+        final TopicIdPartition tpId = new TopicIdPartition(topicId, new 
TopicPartition("sample", 0));
+        final UserTopicIdPartition utp1 = new UserTopicIdPartition(tpId, 
partitioner.metadataPartition(tpId));
+        final UserTopicIdPartition utp2 = new UserTopicIdPartition(tpId, 
partitioner.metadataPartition(tpId));
+        utp1.isInitialized = true;
+        utp1.isAssigned = true;
+
+        assertFalse(utp2.isInitialized);
+        assertFalse(utp2.isAssigned);
+        assertEquals(utp1, utp2);
+    }
+
+    @Test
+    public void testAddAssignmentsForPartitions() throws InterruptedException {
+        final List<TopicIdPartition> idPartitions = getIdPartitions("sample", 
3);
+        final Map<TopicPartition, Long> endOffsets = idPartitions.stream()
+            .map(idp -> 
toRemoteLogPartition(partitioner.metadataPartition(idp)))
+            .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> 
b));
+        consumer.updateEndOffsets(endOffsets);
+        consumerTask.addAssignmentsForPartitions(new HashSet<>(idPartitions));
+        thread.start();
+        for (final TopicIdPartition idPartition : idPartitions) {
+            TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(idPartition), 1000, "");
+            
assertTrue(consumerTask.isMetadataPartitionAssigned(partitioner.metadataPartition(idPartition)));
+            assertTrue(handler.isPartitionLoaded.get(idPartition));
+        }
+    }
+
+    @Test
+    public void testRemoveAssignmentsForPartitions() throws 
InterruptedException {
+        final List<TopicIdPartition> allPartitions = getIdPartitions("sample", 
3);
+        final Map<TopicPartition, Long> endOffsets = allPartitions.stream()
+            .map(idp -> 
toRemoteLogPartition(partitioner.metadataPartition(idp)))
+            .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> 
b));
+        consumer.updateEndOffsets(endOffsets);
+        consumerTask.addAssignmentsForPartitions(new HashSet<>(allPartitions));
+        thread.start();
+
+        final TopicIdPartition tpId = allPartitions.get(0);
+        TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(tpId), 1000, "");
+        addRecord(consumer, partitioner.metadataPartition(tpId), tpId, 0);
+        TestUtils.waitForCondition(() -> 
consumerTask.receivedOffsetForPartition(partitioner.metadataPartition(tpId)).isPresent(),
+            1000, "Wait for the record to be consumed");

Review Comment:
   Please use the default timeout value here and below; that is, remove the 2nd parameter.
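
   As a sketch of the suggestion, assuming the two-argument `TestUtils.waitForCondition` overload that falls back to the default wait time:
   ```java
   // Hypothetical rewrite: drop the explicit 1000 ms wait and rely on the default timeout,
   // while keeping a descriptive failure message.
   TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(idPartition),
       "Timed out waiting for " + idPartition + " to be assigned");
   ```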



##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##########
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import 
org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static 
org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.UserTopicIdPartition;
+import static 
org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.toRemoteLogPartition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class ConsumerTaskTest {
+
+    private final int numMetadataTopicPartitions = 5;
+    private final RemoteLogMetadataTopicPartitioner partitioner = new 
RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions);
+    private final DummyEventHandler handler = new DummyEventHandler();
+    private final Set<TopicPartition> remoteLogPartitions = IntStream.range(0, 
numMetadataTopicPartitions).boxed()
+        .map(ConsumerTask::toRemoteLogPartition).collect(Collectors.toSet());
+    private final Uuid topicId = Uuid.randomUuid();
+    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+
+    private ConsumerTask consumerTask;
+    private MockConsumer<byte[], byte[]> consumer;
+    private Thread thread;
+
+    @BeforeEach
+    public void beforeEach() {
+        final Map<TopicPartition, Long> offsets = remoteLogPartitions.stream()
+            .collect(Collectors.toMap(Function.identity(), e -> 0L));
+        consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        consumer.updateBeginningOffsets(offsets);
+        ConsumerTask.pollIntervalMs = 10L;
+        consumerTask = new ConsumerTask(handler, partitioner, ignored -> 
consumer);
+        thread = new Thread(consumerTask);
+    }
+
+    @AfterEach
+    public void afterEach() throws InterruptedException {
+        if (thread != null) {
+            consumerTask.close();
+            thread.join();

Review Comment:
   Could we give the `join()` call a timeout to avoid blocking forever?
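
   For example, something along these lines (the 10 second bound is only an illustrative value):
   ```java
   @AfterEach
   public void afterEach() throws InterruptedException {
       if (thread != null) {
           consumerTask.close();
           // Bound the join so a stuck consumer task cannot hang the test suite forever.
           thread.join(10_000);
       }
   }
   ```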



##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##########
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import 
org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static 
org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.UserTopicIdPartition;
+import static 
org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.toRemoteLogPartition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class ConsumerTaskTest {
+
+    private final int numMetadataTopicPartitions = 5;
+    private final RemoteLogMetadataTopicPartitioner partitioner = new 
RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions);
+    private final DummyEventHandler handler = new DummyEventHandler();
+    private final Set<TopicPartition> remoteLogPartitions = IntStream.range(0, 
numMetadataTopicPartitions).boxed()
+        .map(ConsumerTask::toRemoteLogPartition).collect(Collectors.toSet());
+    private final Uuid topicId = Uuid.randomUuid();
+    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+
+    private ConsumerTask consumerTask;
+    private MockConsumer<byte[], byte[]> consumer;
+    private Thread thread;
+
+    @BeforeEach
+    public void beforeEach() {
+        final Map<TopicPartition, Long> offsets = remoteLogPartitions.stream()
+            .collect(Collectors.toMap(Function.identity(), e -> 0L));
+        consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        consumer.updateBeginningOffsets(offsets);
+        ConsumerTask.pollIntervalMs = 10L;
+        consumerTask = new ConsumerTask(handler, partitioner, ignored -> 
consumer);
+        thread = new Thread(consumerTask);
+    }
+
+    @AfterEach
+    public void afterEach() throws InterruptedException {
+        if (thread != null) {
+            consumerTask.close();
+            thread.join();
+        }
+    }
+
+    @Test
+    public void testCloseOnNoAssignment() throws InterruptedException {
+        thread.start();
+        Thread.sleep(10);
+    }
+
+    @Test
+    public void testIdempotentClose() {
+        thread.start();
+        consumerTask.close();
+        consumerTask.close();
+    }
+
+    @Test
+    public void testUserTopicIdPartitionEquals() {
+        final TopicIdPartition tpId = new TopicIdPartition(topicId, new 
TopicPartition("sample", 0));
+        final UserTopicIdPartition utp1 = new UserTopicIdPartition(tpId, 
partitioner.metadataPartition(tpId));
+        final UserTopicIdPartition utp2 = new UserTopicIdPartition(tpId, 
partitioner.metadataPartition(tpId));
+        utp1.isInitialized = true;
+        utp1.isAssigned = true;
+
+        assertFalse(utp2.isInitialized);
+        assertFalse(utp2.isAssigned);
+        assertEquals(utp1, utp2);
+    }
+
+    @Test
+    public void testAddAssignmentsForPartitions() throws InterruptedException {
+        final List<TopicIdPartition> idPartitions = getIdPartitions("sample", 
3);
+        final Map<TopicPartition, Long> endOffsets = idPartitions.stream()
+            .map(idp -> 
toRemoteLogPartition(partitioner.metadataPartition(idp)))
+            .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> 
b));
+        consumer.updateEndOffsets(endOffsets);
+        consumerTask.addAssignmentsForPartitions(new HashSet<>(idPartitions));
+        thread.start();
+        for (final TopicIdPartition idPartition : idPartitions) {
+            TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(idPartition), 1000, "");

Review Comment:
   Why do we only wait for 1 second, and with an empty error message? Maybe we can change it to:
   `TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(idPartition), "Timed out waiting for " + idPartition + " to be assigned");`



##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+
+import kafka.utils.EmptyTestInfo;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for 
usages of JavaConverters
+public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
+    private static final Logger log = 
LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.class);
+
+    private static final int SEG_SIZE = 1024 * 1024;
+
+    private final Time time = new MockTime(1);
+    private final TopicBasedRemoteLogMetadataManagerHarness 
remoteLogMetadataManagerHarness = new 
TopicBasedRemoteLogMetadataManagerHarness();
+
+    private TopicBasedRemoteLogMetadataManager rlmm() {
+        return remoteLogMetadataManagerHarness.remoteLogMetadataManager();
+    }
+
+    @BeforeEach
+    public void setup() {
+        // Start the cluster only.
+        remoteLogMetadataManagerHarness.setUp(new EmptyTestInfo());
+    }
+
+    @AfterEach
+    public void teardown() throws IOException {
+        remoteLogMetadataManagerHarness.close();
+    }
+
+    @Test
+    public void testMultiplePartitionSubscriptions() throws Exception {
+        // Create topics.
+        String leaderTopic = "leader";
+        HashMap<Object, Seq<Object>> assignedLeaderTopicReplicas = new 
HashMap<>();
+        List<Object> leaderTopicReplicas = new ArrayList<>();
+        // Set broker id 0 as the first entry which is taken as the leader.
+        leaderTopicReplicas.add(0);
+        leaderTopicReplicas.add(1);
+        leaderTopicReplicas.add(2);
+        assignedLeaderTopicReplicas.put(0, 
JavaConverters.asScalaBuffer(leaderTopicReplicas));
+        remoteLogMetadataManagerHarness.createTopicWithAssignment(leaderTopic,
+            JavaConverters.mapAsScalaMap(assignedLeaderTopicReplicas),
+            remoteLogMetadataManagerHarness.listenerName());
+
+        String followerTopic = "follower";
+        HashMap<Object, Seq<Object>> assignedFollowerTopicReplicas = new 
HashMap<>();
+        List<Object> followerTopicReplicas = new ArrayList<>();
+        // Set broker id 1 as the first entry which is taken as the leader.
+        followerTopicReplicas.add(1);
+        followerTopicReplicas.add(2);
+        followerTopicReplicas.add(0);
+        assignedFollowerTopicReplicas.put(0, 
JavaConverters.asScalaBuffer(followerTopicReplicas));
+        remoteLogMetadataManagerHarness.createTopicWithAssignment(
+            followerTopic, 
JavaConverters.mapAsScalaMap(assignedFollowerTopicReplicas),
+            remoteLogMetadataManagerHarness.listenerName());
+
+        String topicWithNoMessages = "no-messages-topic";
+        HashMap<Object, Seq<Object>> assignedTopicReplicas = new HashMap<>();
+        List<Object> noMessagesTopicReplicas = new ArrayList<>();
+        // Set broker id 1 as the first entry which is taken as the leader.
+        noMessagesTopicReplicas.add(1);
+        noMessagesTopicReplicas.add(2);
+        noMessagesTopicReplicas.add(0);
+        assignedTopicReplicas.put(0, 
JavaConverters.asScalaBuffer(noMessagesTopicReplicas));
+        remoteLogMetadataManagerHarness.createTopicWithAssignment(
+            topicWithNoMessages, 
JavaConverters.mapAsScalaMap(assignedTopicReplicas),
+            remoteLogMetadataManagerHarness.listenerName());
+
+        final TopicIdPartition leaderTopicIdPartition = new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition(leaderTopic, 0));
+        final TopicIdPartition followerTopicIdPartition = new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition(followerTopic, 0));
+        final TopicIdPartition emptyTopicIdPartition = new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition(topicWithNoMessages, 0));
+
+        RemoteLogMetadataTopicPartitioner partitioner = new 
RemoteLogMetadataTopicPartitioner(10) {
+            @Override
+            public int metadataPartition(TopicIdPartition topicIdPartition) {
+                // Always return partition 0 except for 
noMessagesTopicIdPartition. So that, any new user
+                // partition(other than noMessagesTopicIdPartition) added to 
RLMM will use the same metadata partition.
+                // That will make the secondary consumer assignment.
+                if (emptyTopicIdPartition.equals(topicIdPartition)) {
+                    return 1;
+                } else {
+                    return 0;
+                }
+            }
+        };
+
+        
remoteLogMetadataManagerHarness.initializeRemoteLogMetadataManager(Collections.emptySet(),
 true, partitioner);
+
+        // Add segments for these partitions but an exception is received as 
they have not yet been subscribed.
+        // These messages would have been published to the respective metadata 
topic partitions but the ConsumerManager
+        // has not yet been subscribing as they are not yet registered.
+        RemoteLogSegmentMetadata leaderSegmentMetadata = new 
RemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, 
Uuid.randomUuid()),
+            0, 100, -1L, 0,
+            time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
+        Assertions.assertThrows(Exception.class, () -> 
rlmm().addRemoteLogSegmentMetadata(leaderSegmentMetadata).get());

Review Comment:
   Could you assert a specific exception type here, instead of the general `Exception.class`?
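
   For example, something like the sketch below. The concrete type is an assumption and should be verified against how `addRemoteLogSegmentMetadata` reports the failure; since the test calls `get()` on the returned future, the error would typically surface as an `ExecutionException`:
   ```java
   // Hypothetical: assert the wrapping ExecutionException and, if useful, inspect its cause.
   ExecutionException e = Assertions.assertThrows(ExecutionException.class,
       () -> rlmm().addRemoteLogSegmentMetadata(leaderSegmentMetadata).get());
   ```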



##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+
+import kafka.utils.EmptyTestInfo;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for 
usages of JavaConverters
+public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
+    private static final Logger log = 
LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.class);
+
+    private static final int SEG_SIZE = 1024 * 1024;
+
+    private final Time time = new MockTime(1);
+    private final TopicBasedRemoteLogMetadataManagerHarness 
remoteLogMetadataManagerHarness = new 
TopicBasedRemoteLogMetadataManagerHarness();
+
+    private TopicBasedRemoteLogMetadataManager rlmm() {
+        return remoteLogMetadataManagerHarness.remoteLogMetadataManager();
+    }
+
+    @BeforeEach
+    public void setup() {
+        // Start the cluster only.
+        remoteLogMetadataManagerHarness.setUp(new EmptyTestInfo());
+    }
+
+    @AfterEach
+    public void teardown() throws IOException {
+        remoteLogMetadataManagerHarness.close();
+    }
+
+    @Test
+    public void testMultiplePartitionSubscriptions() throws Exception {
+        // Create topics.
+        String leaderTopic = "leader";
+        HashMap<Object, Seq<Object>> assignedLeaderTopicReplicas = new 
HashMap<>();
+        List<Object> leaderTopicReplicas = new ArrayList<>();
+        // Set broker id 0 as the first entry which is taken as the leader.
+        leaderTopicReplicas.add(0);
+        leaderTopicReplicas.add(1);
+        leaderTopicReplicas.add(2);
+        assignedLeaderTopicReplicas.put(0, 
JavaConverters.asScalaBuffer(leaderTopicReplicas));
+        remoteLogMetadataManagerHarness.createTopicWithAssignment(leaderTopic,
+            JavaConverters.mapAsScalaMap(assignedLeaderTopicReplicas),
+            remoteLogMetadataManagerHarness.listenerName());
+
+        String followerTopic = "follower";
+        HashMap<Object, Seq<Object>> assignedFollowerTopicReplicas = new 
HashMap<>();
+        List<Object> followerTopicReplicas = new ArrayList<>();
+        // Set broker id 1 as the first entry which is taken as the leader.
+        followerTopicReplicas.add(1);
+        followerTopicReplicas.add(2);
+        followerTopicReplicas.add(0);
+        assignedFollowerTopicReplicas.put(0, 
JavaConverters.asScalaBuffer(followerTopicReplicas));
+        remoteLogMetadataManagerHarness.createTopicWithAssignment(
+            followerTopic, 
JavaConverters.mapAsScalaMap(assignedFollowerTopicReplicas),
+            remoteLogMetadataManagerHarness.listenerName());
+
+        String topicWithNoMessages = "no-messages-topic";
+        HashMap<Object, Seq<Object>> assignedTopicReplicas = new HashMap<>();
+        List<Object> noMessagesTopicReplicas = new ArrayList<>();
+        // Set broker id 1 as the first entry which is taken as the leader.
+        noMessagesTopicReplicas.add(1);
+        noMessagesTopicReplicas.add(2);
+        noMessagesTopicReplicas.add(0);
+        assignedTopicReplicas.put(0, 
JavaConverters.asScalaBuffer(noMessagesTopicReplicas));
+        remoteLogMetadataManagerHarness.createTopicWithAssignment(
+            topicWithNoMessages, 
JavaConverters.mapAsScalaMap(assignedTopicReplicas),
+            remoteLogMetadataManagerHarness.listenerName());
+
+        final TopicIdPartition leaderTopicIdPartition = new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition(leaderTopic, 0));
+        final TopicIdPartition followerTopicIdPartition = new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition(followerTopic, 0));
+        final TopicIdPartition emptyTopicIdPartition = new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition(topicWithNoMessages, 0));
+
+        RemoteLogMetadataTopicPartitioner partitioner = new 
RemoteLogMetadataTopicPartitioner(10) {
+            @Override
+            public int metadataPartition(TopicIdPartition topicIdPartition) {
+                // Always return partition 0 except for 
noMessagesTopicIdPartition. So that, any new user
+                // partition(other than noMessagesTopicIdPartition) added to 
RLMM will use the same metadata partition.
+                // That will make the secondary consumer assignment.
+                if (emptyTopicIdPartition.equals(topicIdPartition)) {
+                    return 1;
+                } else {
+                    return 0;
+                }
+            }
+        };
+
+        
remoteLogMetadataManagerHarness.initializeRemoteLogMetadataManager(Collections.emptySet(),
 true, partitioner);
+
+        // Add segments for these partitions but an exception is received as 
they have not yet been subscribed.
+        // These messages would have been published to the respective metadata 
topic partitions but the ConsumerManager
+        // has not yet been subscribing as they are not yet registered.
+        RemoteLogSegmentMetadata leaderSegmentMetadata = new 
RemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, 
Uuid.randomUuid()),
+            0, 100, -1L, 0,
+            time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
+        Assertions.assertThrows(Exception.class, () -> 
rlmm().addRemoteLogSegmentMetadata(leaderSegmentMetadata).get());
+
+        RemoteLogSegmentMetadata followerSegmentMetadata = new 
RemoteLogSegmentMetadata(new RemoteLogSegmentId(followerTopicIdPartition, 
Uuid.randomUuid()),
+            0, 100, -1L, 0,
+            time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
+        Assertions.assertThrows(Exception.class, () -> 
rlmm().addRemoteLogSegmentMetadata(followerSegmentMetadata).get());
+
+        // `listRemoteLogSegments` will receive an exception as these topic 
partitions are not yet registered.
+        Assertions.assertThrows(RemoteStorageException.class, () -> 
rlmm().listRemoteLogSegments(leaderTopicIdPartition));
+        Assertions.assertThrows(RemoteStorageException.class, () -> 
rlmm().listRemoteLogSegments(followerTopicIdPartition));
+
+        
rlmm().onPartitionLeadershipChanges(Collections.singleton(leaderTopicIdPartition),
+            Collections.emptySet());
+
+        // RemoteLogSegmentMetadata events are already published, and 
topicBasedRlmm's consumer manager will start
+        // fetching those events and build the cache.
+        waitUntilConsumerCatchesUp(30_000L);
+        // leader partitions would have received as it is registered, but 
follower partition is not yet registered,
+        // hence it throws an exception.
+        
Assertions.assertTrue(rlmm().listRemoteLogSegments(leaderTopicIdPartition).hasNext());
+        Assertions.assertThrows(RemoteStorageException.class, () -> 
rlmm().listRemoteLogSegments(followerTopicIdPartition));
+
+        // Register follower partition
+        
rlmm().onPartitionLeadershipChanges(Collections.singleton(emptyTopicIdPartition),
+            Collections.singleton(followerTopicIdPartition));
+
+        // In this state, all the metadata should be available in RLMM for 
both leader and follower partitions.
+        TestUtils.waitForCondition(() -> 
rlmm().listRemoteLogSegments(leaderTopicIdPartition).hasNext(), "No segments 
found");
+        TestUtils.waitForCondition(() -> 
rlmm().listRemoteLogSegments(followerTopicIdPartition).hasNext(), "No segments 
found");
+    }
+
+    private void waitUntilConsumerCatchesUp(long timeoutMs) throws 
TimeoutException {

Review Comment:
   Why can't we use `TestUtils.waitForCondition` here? 
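
   For example, a rough equivalent, assuming the readiness check can be expressed as a boolean condition (the exact condition to poll depends on the current method body):
   ```java
   // Hypothetical replacement for waitUntilConsumerCatchesUp(30_000L).
   TestUtils.waitForCondition(
       () -> rlmm().listRemoteLogSegments(leaderTopicIdPartition).hasNext(),
       30_000L,
       "Consumer manager did not catch up with the published metadata events");
   ```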



##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+
+import kafka.utils.EmptyTestInfo;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for 
usages of JavaConverters
+public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
+    private static final Logger log = 
LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.class);
+
+    private static final int SEG_SIZE = 1024 * 1024;
+
+    private final Time time = new MockTime(1);
+    private final TopicBasedRemoteLogMetadataManagerHarness 
remoteLogMetadataManagerHarness = new 
TopicBasedRemoteLogMetadataManagerHarness();
+
+    private TopicBasedRemoteLogMetadataManager rlmm() {
+        return remoteLogMetadataManagerHarness.remoteLogMetadataManager();
+    }
+
+    @BeforeEach
+    public void setup() {
+        // Start the cluster only.
+        remoteLogMetadataManagerHarness.setUp(new EmptyTestInfo());
+    }
+
+    @AfterEach
+    public void teardown() throws IOException {
+        remoteLogMetadataManagerHarness.close();
+    }
+
+    @Test
+    public void testMultiplePartitionSubscriptions() throws Exception {
+        // Create topics.
+        String leaderTopic = "leader";
+        HashMap<Object, Seq<Object>> assignedLeaderTopicReplicas = new 
HashMap<>();
+        List<Object> leaderTopicReplicas = new ArrayList<>();
+        // Set broker id 0 as the first entry which is taken as the leader.
+        leaderTopicReplicas.add(0);
+        leaderTopicReplicas.add(1);
+        leaderTopicReplicas.add(2);
+        assignedLeaderTopicReplicas.put(0, 
JavaConverters.asScalaBuffer(leaderTopicReplicas));
+        remoteLogMetadataManagerHarness.createTopicWithAssignment(leaderTopic,
+            JavaConverters.mapAsScalaMap(assignedLeaderTopicReplicas),
+            remoteLogMetadataManagerHarness.listenerName());
+
+        String followerTopic = "follower";
+        HashMap<Object, Seq<Object>> assignedFollowerTopicReplicas = new 
HashMap<>();
+        List<Object> followerTopicReplicas = new ArrayList<>();
+        // Set broker id 1 as the first entry which is taken as the leader.
+        followerTopicReplicas.add(1);
+        followerTopicReplicas.add(2);
+        followerTopicReplicas.add(0);
+        assignedFollowerTopicReplicas.put(0, 
JavaConverters.asScalaBuffer(followerTopicReplicas));
+        remoteLogMetadataManagerHarness.createTopicWithAssignment(
+            followerTopic, 
JavaConverters.mapAsScalaMap(assignedFollowerTopicReplicas),
+            remoteLogMetadataManagerHarness.listenerName());
+
+        String topicWithNoMessages = "no-messages-topic";
+        HashMap<Object, Seq<Object>> assignedTopicReplicas = new HashMap<>();
+        List<Object> noMessagesTopicReplicas = new ArrayList<>();
+        // Set broker id 1 as the first entry which is taken as the leader.
+        noMessagesTopicReplicas.add(1);
+        noMessagesTopicReplicas.add(2);
+        noMessagesTopicReplicas.add(0);
+        assignedTopicReplicas.put(0, 
JavaConverters.asScalaBuffer(noMessagesTopicReplicas));
+        remoteLogMetadataManagerHarness.createTopicWithAssignment(
+            topicWithNoMessages, 
JavaConverters.mapAsScalaMap(assignedTopicReplicas),
+            remoteLogMetadataManagerHarness.listenerName());
+
+        final TopicIdPartition leaderTopicIdPartition = new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition(leaderTopic, 0));
+        final TopicIdPartition followerTopicIdPartition = new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition(followerTopic, 0));
+        final TopicIdPartition emptyTopicIdPartition = new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition(topicWithNoMessages, 0));
+
+        RemoteLogMetadataTopicPartitioner partitioner = new 
RemoteLogMetadataTopicPartitioner(10) {
+            @Override
+            public int metadataPartition(TopicIdPartition topicIdPartition) {
+                // Always return partition 0 except for 
noMessagesTopicIdPartition. So that, any new user
+                // partition(other than noMessagesTopicIdPartition) added to 
RLMM will use the same metadata partition.
+                // That will make the secondary consumer assignment.
+                if (emptyTopicIdPartition.equals(topicIdPartition)) {
+                    return 1;
+                } else {
+                    return 0;
+                }
+            }
+        };
+
+        
remoteLogMetadataManagerHarness.initializeRemoteLogMetadataManager(Collections.emptySet(),
 true, partitioner);
+
+        // Add segments for these partitions but an exception is received as 
they have not yet been subscribed.
+        // These messages would have been published to the respective metadata 
topic partitions but the ConsumerManager
+        // has not yet been subscribing as they are not yet registered.
+        RemoteLogSegmentMetadata leaderSegmentMetadata = new 
RemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, 
Uuid.randomUuid()),
+            0, 100, -1L, 0,
+            time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
+        Assertions.assertThrows(Exception.class, () -> 
rlmm().addRemoteLogSegmentMetadata(leaderSegmentMetadata).get());
+
+        RemoteLogSegmentMetadata followerSegmentMetadata = new 
RemoteLogSegmentMetadata(new RemoteLogSegmentId(followerTopicIdPartition, 
Uuid.randomUuid()),
+            0, 100, -1L, 0,
+            time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
+        Assertions.assertThrows(Exception.class, () -> 
rlmm().addRemoteLogSegmentMetadata(followerSegmentMetadata).get());

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

