showuon commented on code in PR #14127: URL: https://github.com/apache/kafka/pull/14127#discussion_r1284009383
########## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java: ########## @@ -64,302 +65,387 @@ class ConsumerTask implements Runnable, Closeable { private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class); - private static final long POLL_INTERVAL_MS = 100L; + static long pollIntervalMs = 100L; private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde(); - private final KafkaConsumer<byte[], byte[]> consumer; - private final String metadataTopicName; + private final Consumer<byte[], byte[]> consumer; private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler; private final RemoteLogMetadataTopicPartitioner topicPartitioner; - private final Time time; + private final Time time = new SystemTime(); + // TODO - Update comments below // It indicates whether the closing process has been started or not. If it is set as true, // consumer will stop consuming messages, and it will not allow partition assignments to be updated. - private volatile boolean closing = false; - + private volatile boolean isClosed = false; // It indicates whether the consumer needs to assign the partitions or not. This is set when it is // determined that the consumer needs to be assigned with the updated partitions. - private volatile boolean assignPartitions = false; + private volatile boolean isAssignmentChanged = true; // It represents a lock for any operations related to the assignedTopicPartitions. private final Object assignPartitionsLock = new Object(); // Remote log metadata topic partitions that consumer is assigned to. - private volatile Set<Integer> assignedMetaPartitions = Collections.emptySet(); + private volatile Set<Integer> assignedMetadataPartitions = Collections.emptySet(); // User topic partitions that this broker is a leader/follower for. - private Set<TopicIdPartition> assignedTopicPartitions = Collections.emptySet(); + private volatile Map<TopicIdPartition, UserTopicIdPartition> assignedUserTopicIdPartitions = Collections.emptyMap(); + private volatile Set<TopicIdPartition> processedAssignmentOfUserTopicIdPartitions = Collections.emptySet(); - // Map of remote log metadata topic partition to consumed offsets. Received consumer records - // may or may not have been processed based on the assigned topic partitions. - private final Map<Integer, Long> partitionToConsumedOffsets = new ConcurrentHashMap<>(); + private long uninitializedAt = time.milliseconds(); + private boolean isAllUserTopicPartitionsInitialized; - // Map of remote log metadata topic partition to processed offsets that were synced in committedOffsetsFile. - private Map<Integer, Long> lastSyncedPartitionToConsumedOffsets = Collections.emptyMap(); + // Map of remote log metadata topic partition to consumed offsets. 
+ private final Map<Integer, Long> readOffsetsByMetadataPartition = new ConcurrentHashMap<>(); + private final Map<TopicIdPartition, Long> readOffsetsByUserTopicPartition = new HashMap<>(); - private final long committedOffsetSyncIntervalMs; - private CommittedOffsetsFile committedOffsetsFile; - private long lastSyncedTimeMs; + private Map<TopicPartition, BeginAndEndOffsetHolder> offsetHolderByMetadataPartition = new HashMap<>(); + private boolean isOffsetsFetchFailed = false; + private long lastFailedFetchOffsetsTimestamp; - public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer, - String metadataTopicName, - RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler, + public ConsumerTask(RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler, RemoteLogMetadataTopicPartitioner topicPartitioner, - Path committedOffsetsPath, - Time time, - long committedOffsetSyncIntervalMs) { - this.consumer = Objects.requireNonNull(consumer); - this.metadataTopicName = Objects.requireNonNull(metadataTopicName); + Function<Optional<String>, Consumer<byte[], byte[]>> consumerSupplier) { + this.consumer = consumerSupplier.apply(Optional.empty()); this.remotePartitionMetadataEventHandler = Objects.requireNonNull(remotePartitionMetadataEventHandler); this.topicPartitioner = Objects.requireNonNull(topicPartitioner); - this.time = Objects.requireNonNull(time); - this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs; - - initializeConsumerAssignment(committedOffsetsPath); - } - - private void initializeConsumerAssignment(Path committedOffsetsPath) { - try { - committedOffsetsFile = new CommittedOffsetsFile(committedOffsetsPath.toFile()); - } catch (IOException e) { - throw new KafkaException(e); - } - - Map<Integer, Long> committedOffsets = Collections.emptyMap(); - try { - // Load committed offset and assign them in the consumer. - committedOffsets = committedOffsetsFile.readEntries(); - } catch (IOException e) { - // Ignore the error and consumer consumes from the earliest offset. - log.error("Encountered error while building committed offsets from the file. " + - "Consumer will consume from the earliest offset for the assigned partitions.", e); - } - - if (!committedOffsets.isEmpty()) { - // Assign topic partitions from the earlier committed offsets file. 
- Set<Integer> earlierAssignedPartitions = committedOffsets.keySet(); - assignedMetaPartitions = Collections.unmodifiableSet(earlierAssignedPartitions); - Set<TopicPartition> metadataTopicPartitions = earlierAssignedPartitions.stream() - .map(x -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, x)) - .collect(Collectors.toSet()); - consumer.assign(metadataTopicPartitions); - - // Seek to the committed offsets - for (Map.Entry<Integer, Long> entry : committedOffsets.entrySet()) { - log.debug("Updating consumed offset: [{}] for partition [{}]", entry.getValue(), entry.getKey()); - partitionToConsumedOffsets.put(entry.getKey(), entry.getValue()); - consumer.seek(new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, entry.getKey()), entry.getValue()); - } - - lastSyncedPartitionToConsumedOffsets = Collections.unmodifiableMap(committedOffsets); - } } @Override public void run() { - log.info("Started Consumer task thread."); - lastSyncedTimeMs = time.milliseconds(); - try { - while (!closing) { - maybeWaitForPartitionsAssignment(); + log.info("Starting consumer task thread."); + while (!isClosed) { + try { + if (isAssignmentChanged) { + maybeWaitForPartitionsAssignment(); + } log.trace("Polling consumer to receive remote log metadata topic records"); - ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(POLL_INTERVAL_MS)); + final ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(pollIntervalMs)); for (ConsumerRecord<byte[], byte[]> record : consumerRecords) { processConsumerRecord(record); } - - maybeSyncCommittedDataAndOffsets(false); + maybeMarkUserPartitionsAsReady(); + } catch (final WakeupException ex) { + // ignore logging the error + isClosed = true; + } catch (final RetriableException ex) { + log.warn("Retriable error occurred while processing the records. Retrying...", ex); + } catch (final Exception ex) { Review Comment: nit: additional space after `}` ########## storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java: ########## @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.LeaderNotAvailableException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata; +import org.apache.kafka.test.TestCondition; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.UserTopicIdPartition; +import static org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.toRemoteLogPartition; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class ConsumerTaskTest { + + private final int numMetadataTopicPartitions = 5; + private final RemoteLogMetadataTopicPartitioner partitioner = new RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions); + private final DummyEventHandler handler = new DummyEventHandler(); + private final Set<TopicPartition> remoteLogPartitions = IntStream.range(0, numMetadataTopicPartitions).boxed() + .map(ConsumerTask::toRemoteLogPartition).collect(Collectors.toSet()); + private final Uuid topicId = Uuid.randomUuid(); + private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde(); + + private ConsumerTask consumerTask; + private MockConsumer<byte[], byte[]> consumer; + private Thread thread; + + @BeforeEach + public void beforeEach() { + final Map<TopicPartition, Long> offsets = remoteLogPartitions.stream() + .collect(Collectors.toMap(Function.identity(), e -> 0L)); + consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + consumer.updateBeginningOffsets(offsets); + ConsumerTask.pollIntervalMs = 10L; + consumerTask = new ConsumerTask(handler, partitioner, ignored -> consumer); + thread = new Thread(consumerTask); + } + + 
@AfterEach + public void afterEach() throws InterruptedException { + if (thread != null) { + consumerTask.close(); + thread.join(); + } + } + + @Test + public void testCloseOnNoAssignment() throws InterruptedException { + thread.start(); + Thread.sleep(10); + } + + @Test + public void testIdempotentClose() { + thread.start(); + consumerTask.close(); + consumerTask.close(); + } + + @Test + public void testUserTopicIdPartitionEquals() { + final TopicIdPartition tpId = new TopicIdPartition(topicId, new TopicPartition("sample", 0)); + final UserTopicIdPartition utp1 = new UserTopicIdPartition(tpId, partitioner.metadataPartition(tpId)); + final UserTopicIdPartition utp2 = new UserTopicIdPartition(tpId, partitioner.metadataPartition(tpId)); + utp1.isInitialized = true; + utp1.isAssigned = true; + + assertFalse(utp2.isInitialized); + assertFalse(utp2.isAssigned); + assertEquals(utp1, utp2); + } + + @Test + public void testAddAssignmentsForPartitions() throws InterruptedException { + final List<TopicIdPartition> idPartitions = getIdPartitions("sample", 3); + final Map<TopicPartition, Long> endOffsets = idPartitions.stream() + .map(idp -> toRemoteLogPartition(partitioner.metadataPartition(idp))) + .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> b)); + consumer.updateEndOffsets(endOffsets); + consumerTask.addAssignmentsForPartitions(new HashSet<>(idPartitions)); + thread.start(); + for (final TopicIdPartition idPartition : idPartitions) { + TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(idPartition), 1000, ""); + assertTrue(consumerTask.isMetadataPartitionAssigned(partitioner.metadataPartition(idPartition))); + assertTrue(handler.isPartitionLoaded.get(idPartition)); + } + } + + @Test + public void testRemoveAssignmentsForPartitions() throws InterruptedException { + final List<TopicIdPartition> allPartitions = getIdPartitions("sample", 3); + final Map<TopicPartition, Long> endOffsets = allPartitions.stream() + .map(idp -> toRemoteLogPartition(partitioner.metadataPartition(idp))) + .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> b)); + consumer.updateEndOffsets(endOffsets); + consumerTask.addAssignmentsForPartitions(new HashSet<>(allPartitions)); + thread.start(); + + final TopicIdPartition tpId = allPartitions.get(0); + TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId), 1000, ""); + addRecord(consumer, partitioner.metadataPartition(tpId), tpId, 0); + TestUtils.waitForCondition(() -> consumerTask.receivedOffsetForPartition(partitioner.metadataPartition(tpId)).isPresent(), + 1000, "Wait for the record to be consumed"); + + final Set<TopicIdPartition> removePartitions = Collections.singleton(tpId); + consumerTask.removeAssignmentsForPartitions(removePartitions); + for (final TopicIdPartition idPartition : allPartitions) { + final TestCondition condition = () -> removePartitions.contains(idPartition) == !consumerTask.isUserPartitionAssigned(idPartition); + TestUtils.waitForCondition(condition, 1000, ""); + } + for (TopicIdPartition removePartition : removePartitions) { + TestUtils.waitForCondition(() -> handler.isPartitionCleared.containsKey(removePartition), 1000, ""); + } + } + + @Test + public void testConcurrentPartitionAssignments() throws InterruptedException, ExecutionException { + final List<TopicIdPartition> allPartitions = getIdPartitions("sample", 100); + final Map<TopicPartition, Long> endOffsets = allPartitions.stream() + .map(idp -> toRemoteLogPartition(partitioner.metadataPartition(idp))) + 
.collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> b));
+        consumer.updateEndOffsets(endOffsets);
+
+        final AtomicBoolean isAllPartitionsAssigned = new AtomicBoolean(false);
+        Thread assignor = new Thread(() -> {
+            for (TopicIdPartition partition : allPartitions) {
+                consumerTask.addAssignmentsForPartitions(Collections.singleton(partition));
+            }
+            isAllPartitionsAssigned.set(true);
+        });
+        Runnable consumerRunnable = () -> {
+            try {
+                while (!isAllPartitionsAssigned.get()) {
+                    consumerTask.maybeWaitForPartitionsAssignment();
+                }
+            } catch (Exception e) {

Review Comment:
   About this test, could you explain what it is trying to verify? That concurrent partition assignment does not break `maybeWaitForPartitionsAssignment()`? Note that the `assignor` thread might run first (even though it is started after `consumerRunnable` is submitted), so the following can happen:
   1. The assignor completes all the partition assignments and sets `isAllPartitionsAssigned` to true.
   2. `while (!isAllPartitionsAssigned.get())` is then already false, so the loop never calls `maybeWaitForPartitionsAssignment()`.
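   One way to remove that race — a sketch, not code from the PR, and it assumes `maybeWaitForPartitionsAssignment()` stays visible to the test — is to gate the assignor on the consumer loop having started, using the `CountDownLatch` the test file already imports:

   ```java
   final CountDownLatch consumerStarted = new CountDownLatch(1);
   Runnable consumerRunnable = () -> {
       try {
           do {
               // Release the assignor only once we are inside the loop; the do-while
               // guarantees maybeWaitForPartitionsAssignment() runs at least once.
               consumerStarted.countDown();
               consumerTask.maybeWaitForPartitionsAssignment();
           } while (!isAllPartitionsAssigned.get());
       } catch (Exception e) {
           fail(e.getMessage());
       }
   };
   Thread assignor = new Thread(() -> {
       try {
           // Don't start assigning until the consumer loop is live.
           consumerStarted.await();
       } catch (InterruptedException e) {
           throw new RuntimeException(e);
       }
       for (TopicIdPartition partition : allPartitions) {
           consumerTask.addAssignmentsForPartitions(Collections.singleton(partition));
       }
       isAllPartitionsAssigned.set(true);
   });
   ```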
########## storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java: ##########
@@ -0,0 +1,396 @@
+        consumerTask.addAssignmentsForPartitions(new HashSet<>(allPartitions));
+        thread.start();
+
+        final TopicIdPartition tpId = allPartitions.get(0);
+        TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId), 1000, "");

Review Comment:
   ditto
########## storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java: ##########
@@ -0,0 +1,396 @@
+    @Test
+    public void testCanProcessRecord() throws InterruptedException {
+        final Uuid topicId = Uuid.fromString("Bp9TDduJRGa9Q5rlvCJOxg");
+        final TopicIdPartition tpId0 = new TopicIdPartition(topicId, new TopicPartition("sample", 0));
+        final TopicIdPartition tpId1 = new TopicIdPartition(topicId, new TopicPartition("sample", 1));
+        final TopicIdPartition tpId2 = new TopicIdPartition(topicId, new TopicPartition("sample", 2));
+        assert partitioner.metadataPartition(tpId0) == partitioner.metadataPartition(tpId1);
+        assert partitioner.metadataPartition(tpId0) == partitioner.metadataPartition(tpId2);

Review Comment:
   We usually don't use the bare `assert` keyword like this. Could you use `assertTrue` directly here?
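   For what it's worth, `assert` is a no-op unless the JVM runs with `-ea`, so the precondition would silently pass otherwise; a JUnit assertion always runs. Using the `assertEquals` already imported in this file:

   ```java
   assertEquals(partitioner.metadataPartition(tpId0), partitioner.metadataPartition(tpId1));
   assertEquals(partitioner.metadataPartition(tpId0), partitioner.metadataPartition(tpId2));
   ```

   (`assertTrue(... == ...)` works too, but `assertEquals` reports both partition ids on failure.)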
########## storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java: ##########
@@ -0,0 +1,396 @@
+        // should only read the tpId1 records
+        consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId1));
+        TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId1), 1000, "");
+        addRecord(consumer, metadataPartition, tpId1, 2);
+        TestUtils.waitForCondition(() -> consumerTask.receivedOffsetForPartition(metadataPartition).equals(Optional.of(2L)), 1000, "Couldn't read record");
+        assertEquals(3, handler.metadataCounter);
+
+        // shouldn't read tpId2 records

Review Comment:
   Could you expand the comment here to explain why no records are received for tpId2? I think it's because tpId2 is not assigned, right?
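   If that reading is right, a comment along these lines would capture it (hypothetical wording, to be confirmed against the actual behaviour):

   ```java
   // tpId2 maps to the same metadata partition, but it was never added via
   // addAssignmentsForPartitions, so the task filters out its records instead
   // of handing them to the handler.
   ```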
########## storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java: ##########
@@ -0,0 +1,396 @@
+    @Test
+    public void testCloseOnNoAssignment() throws InterruptedException {
+        thread.start();
+        Thread.sleep(10);

Review Comment:
   Could you explain what this test is trying to verify? If we want to verify that the consumer thread is closed, shouldn't we assert that explicitly?
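   A sketch of a more explicit version, assuming the intent is that `close()` terminates the task even when nothing was ever assigned (the 5-second bound is arbitrary):

   ```java
   @Test
   public void testCloseOnNoAssignment() throws InterruptedException {
       thread.start();
       consumerTask.close();   // should wake the poll loop and stop the task
       thread.join(5_000);     // bounded wait for the run loop to exit
       assertFalse(thread.isAlive(), "Consumer task thread should have terminated");
   }
   ```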
########## storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java: ##########
@@ -0,0 +1,396 @@
+        final TopicIdPartition tpId = allPartitions.get(0);
+        TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId), 1000, "");
+        addRecord(consumer, partitioner.metadataPartition(tpId), tpId, 0);
+        TestUtils.waitForCondition(() -> consumerTask.receivedOffsetForPartition(partitioner.metadataPartition(tpId)).isPresent(),
+            1000, "Wait for the record to be consumed");

Review Comment:
   Please use the default timeout value here and below; that is, remove the 2nd parameter.
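   For reference, `TestUtils` has a two-argument overload that falls back to the default maximum wait (15 seconds, if I remember correctly), so the call would become:

   ```java
   TestUtils.waitForCondition(
       () -> consumerTask.receivedOffsetForPartition(partitioner.metadataPartition(tpId)).isPresent(),
       "Wait for the record to be consumed");
   ```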
########## storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java: ##########
@@ -0,0 +1,396 @@
+    @AfterEach
+    public void afterEach() throws InterruptedException {
+        if (thread != null) {
+            consumerTask.close();
+            thread.join();

Review Comment:
   Could we give it a timeout to avoid blocking forever?
org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.UserTopicIdPartition; +import static org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.toRemoteLogPartition; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class ConsumerTaskTest { + + private final int numMetadataTopicPartitions = 5; + private final RemoteLogMetadataTopicPartitioner partitioner = new RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions); + private final DummyEventHandler handler = new DummyEventHandler(); + private final Set<TopicPartition> remoteLogPartitions = IntStream.range(0, numMetadataTopicPartitions).boxed() + .map(ConsumerTask::toRemoteLogPartition).collect(Collectors.toSet()); + private final Uuid topicId = Uuid.randomUuid(); + private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde(); + + private ConsumerTask consumerTask; + private MockConsumer<byte[], byte[]> consumer; + private Thread thread; + + @BeforeEach + public void beforeEach() { + final Map<TopicPartition, Long> offsets = remoteLogPartitions.stream() + .collect(Collectors.toMap(Function.identity(), e -> 0L)); + consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + consumer.updateBeginningOffsets(offsets); + ConsumerTask.pollIntervalMs = 10L; + consumerTask = new ConsumerTask(handler, partitioner, ignored -> consumer); + thread = new Thread(consumerTask); + } + + @AfterEach + public void afterEach() throws InterruptedException { + if (thread != null) { + consumerTask.close(); + thread.join(); + } + } + + @Test + public void testCloseOnNoAssignment() throws InterruptedException { + thread.start(); + Thread.sleep(10); + } + + @Test + public void testIdempotentClose() { + thread.start(); + consumerTask.close(); + consumerTask.close(); + } + + @Test + public void testUserTopicIdPartitionEquals() { + final TopicIdPartition tpId = new TopicIdPartition(topicId, new TopicPartition("sample", 0)); + final UserTopicIdPartition utp1 = new UserTopicIdPartition(tpId, partitioner.metadataPartition(tpId)); + final UserTopicIdPartition utp2 = new UserTopicIdPartition(tpId, partitioner.metadataPartition(tpId)); + utp1.isInitialized = true; + utp1.isAssigned = true; + + assertFalse(utp2.isInitialized); + assertFalse(utp2.isAssigned); + assertEquals(utp1, utp2); + } + + @Test + public void testAddAssignmentsForPartitions() throws InterruptedException { + final List<TopicIdPartition> idPartitions = getIdPartitions("sample", 3); + final Map<TopicPartition, Long> endOffsets = idPartitions.stream() + .map(idp -> toRemoteLogPartition(partitioner.metadataPartition(idp))) + .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> b)); + consumer.updateEndOffsets(endOffsets); + consumerTask.addAssignmentsForPartitions(new HashSet<>(idPartitions)); + thread.start(); + for (final TopicIdPartition idPartition : idPartitions) { + TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(idPartition), 1000, ""); Review Comment: Why did we only wait for 1 second, and with an empty error message?
Maybe we can change it to: `TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(idPartition), "Timed out waiting for " + idPartition + " to be assigned");` ########## storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java: ########## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.metadata.storage; + + +import kafka.utils.EmptyTestInfo; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.collection.JavaConverters; +import scala.collection.Seq; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters +public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest { + private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.class); + + private static final int SEG_SIZE = 1024 * 1024; + + private final Time time = new MockTime(1); + private final TopicBasedRemoteLogMetadataManagerHarness remoteLogMetadataManagerHarness = new TopicBasedRemoteLogMetadataManagerHarness(); + + private TopicBasedRemoteLogMetadataManager rlmm() { + return remoteLogMetadataManagerHarness.remoteLogMetadataManager(); + } + + @BeforeEach + public void setup() { + // Start the cluster only. + remoteLogMetadataManagerHarness.setUp(new EmptyTestInfo()); + } + + @AfterEach + public void teardown() throws IOException { + remoteLogMetadataManagerHarness.close(); + } + + @Test + public void testMultiplePartitionSubscriptions() throws Exception { + // Create topics. + String leaderTopic = "leader"; + HashMap<Object, Seq<Object>> assignedLeaderTopicReplicas = new HashMap<>(); + List<Object> leaderTopicReplicas = new ArrayList<>(); + // Set broker id 0 as the first entry which is taken as the leader.
+ leaderTopicReplicas.add(0); + leaderTopicReplicas.add(1); + leaderTopicReplicas.add(2); + assignedLeaderTopicReplicas.put(0, JavaConverters.asScalaBuffer(leaderTopicReplicas)); + remoteLogMetadataManagerHarness.createTopicWithAssignment(leaderTopic, + JavaConverters.mapAsScalaMap(assignedLeaderTopicReplicas), + remoteLogMetadataManagerHarness.listenerName()); + + String followerTopic = "follower"; + HashMap<Object, Seq<Object>> assignedFollowerTopicReplicas = new HashMap<>(); + List<Object> followerTopicReplicas = new ArrayList<>(); + // Set broker id 1 as the first entry which is taken as the leader. + followerTopicReplicas.add(1); + followerTopicReplicas.add(2); + followerTopicReplicas.add(0); + assignedFollowerTopicReplicas.put(0, JavaConverters.asScalaBuffer(followerTopicReplicas)); + remoteLogMetadataManagerHarness.createTopicWithAssignment( + followerTopic, JavaConverters.mapAsScalaMap(assignedFollowerTopicReplicas), + remoteLogMetadataManagerHarness.listenerName()); + + String topicWithNoMessages = "no-messages-topic"; + HashMap<Object, Seq<Object>> assignedTopicReplicas = new HashMap<>(); + List<Object> noMessagesTopicReplicas = new ArrayList<>(); + // Set broker id 1 as the first entry which is taken as the leader. + noMessagesTopicReplicas.add(1); + noMessagesTopicReplicas.add(2); + noMessagesTopicReplicas.add(0); + assignedTopicReplicas.put(0, JavaConverters.asScalaBuffer(noMessagesTopicReplicas)); + remoteLogMetadataManagerHarness.createTopicWithAssignment( + topicWithNoMessages, JavaConverters.mapAsScalaMap(assignedTopicReplicas), + remoteLogMetadataManagerHarness.listenerName()); + + final TopicIdPartition leaderTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(leaderTopic, 0)); + final TopicIdPartition followerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(followerTopic, 0)); + final TopicIdPartition emptyTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(topicWithNoMessages, 0)); + + RemoteLogMetadataTopicPartitioner partitioner = new RemoteLogMetadataTopicPartitioner(10) { + @Override + public int metadataPartition(TopicIdPartition topicIdPartition) { + // Always return partition 0 except for noMessagesTopicIdPartition. So that, any new user + // partition(other than noMessagesTopicIdPartition) added to RLMM will use the same metadata partition. + // That will make the secondary consumer assignment. + if (emptyTopicIdPartition.equals(topicIdPartition)) { + return 1; + } else { + return 0; + } + } + }; + + remoteLogMetadataManagerHarness.initializeRemoteLogMetadataManager(Collections.emptySet(), true, partitioner); + + // Add segments for these partitions but an exception is received as they have not yet been subscribed. + // These messages would have been published to the respective metadata topic partitions but the ConsumerManager + // has not yet been subscribing as they are not yet registered. + RemoteLogSegmentMetadata leaderSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()), + 0, 100, -1L, 0, + time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); + Assertions.assertThrows(Exception.class, () -> rlmm().addRemoteLogSegmentMetadata(leaderSegmentMetadata).get()); Review Comment: Could you assert a specific exception type here, instead of the general `Exception.class`?
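For instance — a sketch only, not a definitive suggestion: `Future.get()` wraps the underlying failure in a `java.util.concurrent.ExecutionException`, so the test could assert that and then check the cause. The cause type used below (`KafkaException`) is an assumption that should be verified against what the manager actually throws here:

```java
// Sketch, assuming java.util.concurrent.ExecutionException is imported.
// assertThrows returns the caught exception, so the wrapped cause can be
// inspected as well; KafkaException is an illustrative assumption.
ExecutionException e = Assertions.assertThrows(ExecutionException.class,
    () -> rlmm().addRemoteLogSegmentMetadata(leaderSegmentMetadata).get());
Assertions.assertTrue(e.getCause() instanceof org.apache.kafka.common.KafkaException);
```

That way the test can't pass on an unrelated failure, e.g. an assertion error thrown inside the lambda.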
########## storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java: ########## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.metadata.storage; + + +import kafka.utils.EmptyTestInfo; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.collection.JavaConverters; +import scala.collection.Seq; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters +public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest { + private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.class); + + private static final int SEG_SIZE = 1024 * 1024; + + private final Time time = new MockTime(1); + private final TopicBasedRemoteLogMetadataManagerHarness remoteLogMetadataManagerHarness = new TopicBasedRemoteLogMetadataManagerHarness(); + + private TopicBasedRemoteLogMetadataManager rlmm() { + return remoteLogMetadataManagerHarness.remoteLogMetadataManager(); + } + + @BeforeEach + public void setup() { + // Start the cluster only. + remoteLogMetadataManagerHarness.setUp(new EmptyTestInfo()); + } + + @AfterEach + public void teardown() throws IOException { + remoteLogMetadataManagerHarness.close(); + } + + @Test + public void testMultiplePartitionSubscriptions() throws Exception { + // Create topics. + String leaderTopic = "leader"; + HashMap<Object, Seq<Object>> assignedLeaderTopicReplicas = new HashMap<>(); + List<Object> leaderTopicReplicas = new ArrayList<>(); + // Set broker id 0 as the first entry which is taken as the leader. 
+ leaderTopicReplicas.add(0); + leaderTopicReplicas.add(1); + leaderTopicReplicas.add(2); + assignedLeaderTopicReplicas.put(0, JavaConverters.asScalaBuffer(leaderTopicReplicas)); + remoteLogMetadataManagerHarness.createTopicWithAssignment(leaderTopic, + JavaConverters.mapAsScalaMap(assignedLeaderTopicReplicas), + remoteLogMetadataManagerHarness.listenerName()); + + String followerTopic = "follower"; + HashMap<Object, Seq<Object>> assignedFollowerTopicReplicas = new HashMap<>(); + List<Object> followerTopicReplicas = new ArrayList<>(); + // Set broker id 1 as the first entry which is taken as the leader. + followerTopicReplicas.add(1); + followerTopicReplicas.add(2); + followerTopicReplicas.add(0); + assignedFollowerTopicReplicas.put(0, JavaConverters.asScalaBuffer(followerTopicReplicas)); + remoteLogMetadataManagerHarness.createTopicWithAssignment( + followerTopic, JavaConverters.mapAsScalaMap(assignedFollowerTopicReplicas), + remoteLogMetadataManagerHarness.listenerName()); + + String topicWithNoMessages = "no-messages-topic"; + HashMap<Object, Seq<Object>> assignedTopicReplicas = new HashMap<>(); + List<Object> noMessagesTopicReplicas = new ArrayList<>(); + // Set broker id 1 as the first entry which is taken as the leader. + noMessagesTopicReplicas.add(1); + noMessagesTopicReplicas.add(2); + noMessagesTopicReplicas.add(0); + assignedTopicReplicas.put(0, JavaConverters.asScalaBuffer(noMessagesTopicReplicas)); + remoteLogMetadataManagerHarness.createTopicWithAssignment( + topicWithNoMessages, JavaConverters.mapAsScalaMap(assignedTopicReplicas), + remoteLogMetadataManagerHarness.listenerName()); + + final TopicIdPartition leaderTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(leaderTopic, 0)); + final TopicIdPartition followerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(followerTopic, 0)); + final TopicIdPartition emptyTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(topicWithNoMessages, 0)); + + RemoteLogMetadataTopicPartitioner partitioner = new RemoteLogMetadataTopicPartitioner(10) { + @Override + public int metadataPartition(TopicIdPartition topicIdPartition) { + // Always return partition 0 except for noMessagesTopicIdPartition. So that, any new user + // partition(other than noMessagesTopicIdPartition) added to RLMM will use the same metadata partition. + // That will make the secondary consumer assignment. + if (emptyTopicIdPartition.equals(topicIdPartition)) { + return 1; + } else { + return 0; + } + } + }; + + remoteLogMetadataManagerHarness.initializeRemoteLogMetadataManager(Collections.emptySet(), true, partitioner); + + // Add segments for these partitions but an exception is received as they have not yet been subscribed. + // These messages would have been published to the respective metadata topic partitions but the ConsumerManager + // has not yet been subscribing as they are not yet registered. 
+ RemoteLogSegmentMetadata leaderSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()), + 0, 100, -1L, 0, + time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); + Assertions.assertThrows(Exception.class, () -> rlmm().addRemoteLogSegmentMetadata(leaderSegmentMetadata).get()); + + RemoteLogSegmentMetadata followerSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(followerTopicIdPartition, Uuid.randomUuid()), + 0, 100, -1L, 0, + time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); + Assertions.assertThrows(Exception.class, () -> rlmm().addRemoteLogSegmentMetadata(followerSegmentMetadata).get()); + + // `listRemoteLogSegments` will receive an exception as these topic partitions are not yet registered. + Assertions.assertThrows(RemoteStorageException.class, () -> rlmm().listRemoteLogSegments(leaderTopicIdPartition)); + Assertions.assertThrows(RemoteStorageException.class, () -> rlmm().listRemoteLogSegments(followerTopicIdPartition)); + + rlmm().onPartitionLeadershipChanges(Collections.singleton(leaderTopicIdPartition), + Collections.emptySet()); + + // RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start + // fetching those events and build the cache. + waitUntilConsumerCatchesUp(30_000L); + // leader partitions would have received as it is registered, but follower partition is not yet registered, + // hence it throws an exception. + Assertions.assertTrue(rlmm().listRemoteLogSegments(leaderTopicIdPartition).hasNext()); + Assertions.assertThrows(RemoteStorageException.class, () -> rlmm().listRemoteLogSegments(followerTopicIdPartition)); + + // Register follower partition + rlmm().onPartitionLeadershipChanges(Collections.singleton(emptyTopicIdPartition), + Collections.singleton(followerTopicIdPartition)); + + // In this state, all the metadata should be available in RLMM for both leader and follower partitions. + TestUtils.waitForCondition(() -> rlmm().listRemoteLogSegments(leaderTopicIdPartition).hasNext(), "No segments found"); + TestUtils.waitForCondition(() -> rlmm().listRemoteLogSegments(followerTopicIdPartition).hasNext(), "No segments found"); + } + + private void waitUntilConsumerCatchesUp(long timeoutMs) throws TimeoutException { Review Comment: Why can't we use `TestUtils.waitForCondition` here? ########## storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java: ########## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage; + + +import kafka.utils.EmptyTestInfo; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.collection.JavaConverters; +import scala.collection.Seq; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters +public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest { + private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.class); + + private static final int SEG_SIZE = 1024 * 1024; + + private final Time time = new MockTime(1); + private final TopicBasedRemoteLogMetadataManagerHarness remoteLogMetadataManagerHarness = new TopicBasedRemoteLogMetadataManagerHarness(); + + private TopicBasedRemoteLogMetadataManager rlmm() { + return remoteLogMetadataManagerHarness.remoteLogMetadataManager(); + } + + @BeforeEach + public void setup() { + // Start the cluster only. + remoteLogMetadataManagerHarness.setUp(new EmptyTestInfo()); + } + + @AfterEach + public void teardown() throws IOException { + remoteLogMetadataManagerHarness.close(); + } + + @Test + public void testMultiplePartitionSubscriptions() throws Exception { + // Create topics. + String leaderTopic = "leader"; + HashMap<Object, Seq<Object>> assignedLeaderTopicReplicas = new HashMap<>(); + List<Object> leaderTopicReplicas = new ArrayList<>(); + // Set broker id 0 as the first entry which is taken as the leader. + leaderTopicReplicas.add(0); + leaderTopicReplicas.add(1); + leaderTopicReplicas.add(2); + assignedLeaderTopicReplicas.put(0, JavaConverters.asScalaBuffer(leaderTopicReplicas)); + remoteLogMetadataManagerHarness.createTopicWithAssignment(leaderTopic, + JavaConverters.mapAsScalaMap(assignedLeaderTopicReplicas), + remoteLogMetadataManagerHarness.listenerName()); + + String followerTopic = "follower"; + HashMap<Object, Seq<Object>> assignedFollowerTopicReplicas = new HashMap<>(); + List<Object> followerTopicReplicas = new ArrayList<>(); + // Set broker id 1 as the first entry which is taken as the leader. 
+ followerTopicReplicas.add(1); + followerTopicReplicas.add(2); + followerTopicReplicas.add(0); + assignedFollowerTopicReplicas.put(0, JavaConverters.asScalaBuffer(followerTopicReplicas)); + remoteLogMetadataManagerHarness.createTopicWithAssignment( + followerTopic, JavaConverters.mapAsScalaMap(assignedFollowerTopicReplicas), + remoteLogMetadataManagerHarness.listenerName()); + + String topicWithNoMessages = "no-messages-topic"; + HashMap<Object, Seq<Object>> assignedTopicReplicas = new HashMap<>(); + List<Object> noMessagesTopicReplicas = new ArrayList<>(); + // Set broker id 1 as the first entry which is taken as the leader. + noMessagesTopicReplicas.add(1); + noMessagesTopicReplicas.add(2); + noMessagesTopicReplicas.add(0); + assignedTopicReplicas.put(0, JavaConverters.asScalaBuffer(noMessagesTopicReplicas)); + remoteLogMetadataManagerHarness.createTopicWithAssignment( + topicWithNoMessages, JavaConverters.mapAsScalaMap(assignedTopicReplicas), + remoteLogMetadataManagerHarness.listenerName()); + + final TopicIdPartition leaderTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(leaderTopic, 0)); + final TopicIdPartition followerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(followerTopic, 0)); + final TopicIdPartition emptyTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(topicWithNoMessages, 0)); + + RemoteLogMetadataTopicPartitioner partitioner = new RemoteLogMetadataTopicPartitioner(10) { + @Override + public int metadataPartition(TopicIdPartition topicIdPartition) { + // Always return partition 0 except for noMessagesTopicIdPartition. So that, any new user + // partition(other than noMessagesTopicIdPartition) added to RLMM will use the same metadata partition. + // That will make the secondary consumer assignment. + if (emptyTopicIdPartition.equals(topicIdPartition)) { + return 1; + } else { + return 0; + } + } + }; + + remoteLogMetadataManagerHarness.initializeRemoteLogMetadataManager(Collections.emptySet(), true, partitioner); + + // Add segments for these partitions but an exception is received as they have not yet been subscribed. + // These messages would have been published to the respective metadata topic partitions but the ConsumerManager + // has not yet been subscribing as they are not yet registered. + RemoteLogSegmentMetadata leaderSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()), + 0, 100, -1L, 0, + time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); + Assertions.assertThrows(Exception.class, () -> rlmm().addRemoteLogSegmentMetadata(leaderSegmentMetadata).get()); + + RemoteLogSegmentMetadata followerSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(followerTopicIdPartition, Uuid.randomUuid()), + 0, 100, -1L, 0, + time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); + Assertions.assertThrows(Exception.class, () -> rlmm().addRemoteLogSegmentMetadata(followerSegmentMetadata).get()); Review Comment: ditto -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org