This is an automated email from the ASF dual-hosted git repository.
satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ff3e6842ff9 KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions (#14127)
ff3e6842ff9 is described below
commit ff3e6842ff99a600fc02e69ebefb09eef93decb3
Author: Abhijeet Kumar <[email protected]>
AuthorDate: Sat Aug 26 05:52:26 2023 +0530
KAFKA-15181: Wait for RemoteLogMetadataCache to initialize after assigning partitions (#14127)
This PR adds the following changes to the `TopicBasedRemoteLogMetadataManager`:
1. Added a guard in RemoteLogMetadataCache so that an incoming request is
served from the cache only if the corresponding user-topic-partition is
initialized (see the sketch below).
2. Improved error handling in the ConsumerTask thread so that it is not
killed when there are errors in reading the internal topic.
3. ConsumerTask initialization now handles the case when there are no
records to read.
It also includes some other minor changes.
Added unit tests for the changes.
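For readers skimming the patch, item 1 boils down to a latch-based readiness
guard. The snippet below is a minimal, self-contained sketch of that pattern,
not the production code: the class name InitializationGuardedCache and the
IllegalStateException are illustrative stand-ins, whereas the actual change
(see the RemoteLogMetadataCache and RemotePartitionMetadataStore hunks in the
diff) adds the CountDownLatch to RemoteLogMetadataCache and throws the
retriable ReplicaNotAvailableException so that callers retry.

    import java.util.concurrent.CountDownLatch;

    // Sketch only: a cache that refuses reads until it is marked initialized.
    class InitializationGuardedCache {
        private final CountDownLatch initializedLatch = new CountDownLatch(1);

        // Called by the consumer task once it has caught up with the
        // corresponding metadata partition.
        public void markInitialized() {
            initializedLatch.countDown();
        }

        public boolean isInitialized() {
            return initializedLatch.getCount() == 0;
        }

        // Read path: serve from the cache only after initialization completes.
        public String lookup(String key) {
            if (!isInitialized()) {
                // The real patch throws the retriable
                // ReplicaNotAvailableException here instead.
                throw new IllegalStateException("Cache not initialized yet; retry later");
            }
            return "value-for-" + key; // actual lookup elided in this sketch
        }
    }

In the patch, ConsumerTask marks a user partition initialized only once its
read offset has caught up with the end offset of the metadata partition (or
the partition is empty), which is what the new maybeMarkUserPartitionsAsReady
logic checks.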
Co-authored-by: Kamal Chandraprakash <[email protected]>
Reviewers: Luke Chen <[email protected]>, Jorge Esteban Quilcate Otoya
<[email protected]>, Christo Lolov <[email protected]>, Satish Duggana
<[email protected]>
---
.../remote/metadata/storage/ConsumerManager.java | 32 +-
.../log/remote/metadata/storage/ConsumerTask.java | 546 ++++++++++++---------
.../metadata/storage/RemoteLogMetadataCache.java | 11 +
.../RemotePartitionMetadataEventHandler.java | 5 +
.../storage/RemotePartitionMetadataStore.java | 19 +
.../TopicBasedRemoteLogMetadataManager.java | 20 +-
.../remote/metadata/storage/ConsumerTaskTest.java | 417 ++++++++++++++++
.../TopicBasedRemoteLogMetadataManagerHarness.java | 10 +-
...ogMetadataManagerMultipleSubscriptionsTest.java | 178 +++++++
...icBasedRemoteLogMetadataManagerRestartTest.java | 34 +-
.../TopicBasedRemoteLogMetadataManagerTest.java | 10 +-
11 files changed, 990 insertions(+), 292 deletions(-)
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
index 14ec707a2eb..186cbb17c56 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
@@ -27,9 +27,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
-import java.io.File;
import java.io.IOException;
-import java.nio.file.Path;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeoutException;
@@ -41,8 +39,6 @@ import java.util.concurrent.TimeoutException;
*/
public class ConsumerManager implements Closeable {
- public static final String COMMITTED_OFFSETS_FILE_NAME =
"_rlmm_committed_offsets";
-
private static final Logger log =
LoggerFactory.getLogger(ConsumerManager.class);
private static final long CONSUME_RECHECK_INTERVAL_MS = 50L;
@@ -60,15 +56,13 @@ public class ConsumerManager implements Closeable {
//Create a task to consume messages and submit the respective events
to RemotePartitionMetadataEventHandler.
KafkaConsumer<byte[], byte[]> consumer = new
KafkaConsumer<>(rlmmConfig.consumerProperties());
- Path committedOffsetsPath = new File(rlmmConfig.logDir(),
COMMITTED_OFFSETS_FILE_NAME).toPath();
consumerTask = new ConsumerTask(
- consumer,
- rlmmConfig.remoteLogMetadataTopicName(),
- remotePartitionMetadataEventHandler,
- topicPartitioner,
- committedOffsetsPath,
- time,
- 60_000L
+ remotePartitionMetadataEventHandler,
+ topicPartitioner,
+ consumer,
+ 100L,
+ 300_000L,
+ time
);
consumerTaskThread = KafkaThread.nonDaemon("RLMMConsumerTask",
consumerTask);
}
@@ -110,7 +104,7 @@ public class ConsumerManager implements Closeable {
log.info("Waiting until consumer is caught up with the target
partition: [{}]", partition);
// If the current assignment does not have the subscription for this
partition then return immediately.
- if (!consumerTask.isPartitionAssigned(partition)) {
+ if (!consumerTask.isMetadataPartitionAssigned(partition)) {
throw new KafkaException("This consumer is not assigned to the
target partition " + partition + ". " +
"Partitions currently assigned: " +
consumerTask.metadataPartitionsAssigned());
}
@@ -119,17 +113,17 @@ public class ConsumerManager implements Closeable {
long startTimeMs = time.milliseconds();
while (true) {
log.debug("Checking if partition [{}] is up to date with offset
[{}]", partition, offset);
- long receivedOffset =
consumerTask.receivedOffsetForPartition(partition).orElse(-1L);
- if (receivedOffset >= offset) {
+ long readOffset =
consumerTask.readOffsetForMetadataPartition(partition).orElse(-1L);
+ if (readOffset >= offset) {
return;
}
log.debug("Expected offset [{}] for partition [{}], but the
committed offset: [{}], Sleeping for [{}] to retry again",
- offset, partition, receivedOffset, consumeCheckIntervalMs);
+ offset, partition, readOffset, consumeCheckIntervalMs);
if (time.milliseconds() - startTimeMs > timeoutMs) {
log.warn("Expected offset for partition:[{}] is : [{}], but
the committed offset: [{}] ",
- partition, receivedOffset, offset);
+ partition, readOffset, offset);
throw new TimeoutException("Timed out in catching up with the
expected offset by consumer.");
}
@@ -158,7 +152,7 @@ public class ConsumerManager implements Closeable {
consumerTask.removeAssignmentsForPartitions(partitions);
}
- public Optional<Long> receivedOffsetForPartition(int metadataPartition) {
- return consumerTask.receivedOffsetForPartition(metadataPartition);
+ public Optional<Long> readOffsetForPartition(int metadataPartition) {
+ return consumerTask.readOffsetForMetadataPartition(metadataPartition);
}
}
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
index 2c95bf399a5..b53c4ee3374 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
@@ -16,12 +16,13 @@
*/
package org.apache.kafka.server.log.remote.metadata.storage;
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Time;
import
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
@@ -30,8 +31,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
-import java.io.IOException;
-import java.nio.file.Path;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
@@ -64,302 +63,403 @@ import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemo
class ConsumerTask implements Runnable, Closeable {
private static final Logger log =
LoggerFactory.getLogger(ConsumerTask.class);
- private static final long POLL_INTERVAL_MS = 100L;
-
private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
- private final KafkaConsumer<byte[], byte[]> consumer;
- private final String metadataTopicName;
+ private final Consumer<byte[], byte[]> consumer;
private final RemotePartitionMetadataEventHandler
remotePartitionMetadataEventHandler;
private final RemoteLogMetadataTopicPartitioner topicPartitioner;
+ // The timeout for the consumer to poll records from the remote log
metadata topic.
+ private final long pollTimeoutMs;
private final Time time;
- // It indicates whether the closing process has been started or not. If it
is set as true,
- // consumer will stop consuming messages, and it will not allow partition
assignments to be updated.
- private volatile boolean closing = false;
-
- // It indicates whether the consumer needs to assign the partitions or
not. This is set when it is
- // determined that the consumer needs to be assigned with the updated
partitions.
- private volatile boolean assignPartitions = false;
+ // It indicates whether the ConsumerTask is closed or not.
+ private volatile boolean isClosed = false;
+ // It indicates whether the user topic partition assignment to the
consumer has changed or not. If the assignment
+ // has changed, the consumer will eventually start tracking the newly
assigned partitions and stop tracking the
+ // ones it is no longer assigned to.
+ // The initial value is set to true to wait for partition assignment on
the first execution; otherwise thread will
+ // be busy without actually doing anything
+ private volatile boolean hasAssignmentChanged = true;
// It represents a lock for any operations related to the
assignedTopicPartitions.
private final Object assignPartitionsLock = new Object();
// Remote log metadata topic partitions that consumer is assigned to.
- private volatile Set<Integer> assignedMetaPartitions =
Collections.emptySet();
+ private volatile Set<Integer> assignedMetadataPartitions =
Collections.emptySet();
// User topic partitions that this broker is a leader/follower for.
- private Set<TopicIdPartition> assignedTopicPartitions =
Collections.emptySet();
+ private volatile Map<TopicIdPartition, UserTopicIdPartition>
assignedUserTopicIdPartitions = Collections.emptyMap();
+ private volatile Set<TopicIdPartition>
processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();
- // Map of remote log metadata topic partition to consumed offsets.
Received consumer records
- // may or may not have been processed based on the assigned topic
partitions.
- private final Map<Integer, Long> partitionToConsumedOffsets = new
ConcurrentHashMap<>();
+ private long uninitializedAt;
+ private boolean isAllUserTopicPartitionsInitialized;
- // Map of remote log metadata topic partition to processed offsets that
were synced in committedOffsetsFile.
- private Map<Integer, Long> lastSyncedPartitionToConsumedOffsets =
Collections.emptyMap();
+ // Map of remote log metadata topic partition to consumed offsets.
+ private final Map<Integer, Long> readOffsetsByMetadataPartition = new
ConcurrentHashMap<>();
+ private final Map<TopicIdPartition, Long> readOffsetsByUserTopicPartition
= new HashMap<>();
- private final long committedOffsetSyncIntervalMs;
- private CommittedOffsetsFile committedOffsetsFile;
- private long lastSyncedTimeMs;
+ private Map<TopicPartition, StartAndEndOffsetHolder>
offsetHolderByMetadataPartition = new HashMap<>();
+ private boolean hasLastOffsetsFetchFailed = false;
+ private long lastFailedFetchOffsetsTimestamp;
+ // The interval between retries to fetch the start and end offsets for the
metadata partitions after a failed fetch.
+ private final long offsetFetchRetryIntervalMs;
- public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer,
- String metadataTopicName,
- RemotePartitionMetadataEventHandler
remotePartitionMetadataEventHandler,
+ public ConsumerTask(RemotePartitionMetadataEventHandler
remotePartitionMetadataEventHandler,
RemoteLogMetadataTopicPartitioner topicPartitioner,
- Path committedOffsetsPath,
- Time time,
- long committedOffsetSyncIntervalMs) {
- this.consumer = Objects.requireNonNull(consumer);
- this.metadataTopicName = Objects.requireNonNull(metadataTopicName);
+ Consumer<byte[], byte[]> consumer,
+ long pollTimeoutMs,
+ long offsetFetchRetryIntervalMs,
+ Time time) {
+ this.consumer = consumer;
this.remotePartitionMetadataEventHandler =
Objects.requireNonNull(remotePartitionMetadataEventHandler);
this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
+ this.pollTimeoutMs = pollTimeoutMs;
+ this.offsetFetchRetryIntervalMs = offsetFetchRetryIntervalMs;
this.time = Objects.requireNonNull(time);
- this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs;
-
- initializeConsumerAssignment(committedOffsetsPath);
- }
-
- private void initializeConsumerAssignment(Path committedOffsetsPath) {
- try {
- committedOffsetsFile = new
CommittedOffsetsFile(committedOffsetsPath.toFile());
- } catch (IOException e) {
- throw new KafkaException(e);
- }
-
- Map<Integer, Long> committedOffsets = Collections.emptyMap();
- try {
- // Load committed offset and assign them in the consumer.
- committedOffsets = committedOffsetsFile.readEntries();
- } catch (IOException e) {
- // Ignore the error and consumer consumes from the earliest offset.
- log.error("Encountered error while building committed offsets from
the file. " +
- "Consumer will consume from the earliest offset
for the assigned partitions.", e);
- }
-
- if (!committedOffsets.isEmpty()) {
- // Assign topic partitions from the earlier committed offsets file.
- Set<Integer> earlierAssignedPartitions = committedOffsets.keySet();
- assignedMetaPartitions =
Collections.unmodifiableSet(earlierAssignedPartitions);
- Set<TopicPartition> metadataTopicPartitions =
earlierAssignedPartitions.stream()
-
.map(x -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, x))
-
.collect(Collectors.toSet());
- consumer.assign(metadataTopicPartitions);
-
- // Seek to the committed offsets
- for (Map.Entry<Integer, Long> entry : committedOffsets.entrySet())
{
- log.debug("Updating consumed offset: [{}] for partition [{}]",
entry.getValue(), entry.getKey());
- partitionToConsumedOffsets.put(entry.getKey(),
entry.getValue());
- consumer.seek(new
TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, entry.getKey()),
entry.getValue());
- }
-
- lastSyncedPartitionToConsumedOffsets =
Collections.unmodifiableMap(committedOffsets);
- }
+ this.uninitializedAt = time.milliseconds();
}
@Override
public void run() {
- log.info("Started Consumer task thread.");
- lastSyncedTimeMs = time.milliseconds();
- try {
- while (!closing) {
- maybeWaitForPartitionsAssignment();
+ log.info("Starting consumer task thread.");
+ while (!isClosed) {
+ try {
+ if (hasAssignmentChanged) {
+ maybeWaitForPartitionAssignments();
+ }
log.trace("Polling consumer to receive remote log metadata
topic records");
- ConsumerRecords<byte[], byte[]> consumerRecords =
consumer.poll(Duration.ofMillis(POLL_INTERVAL_MS));
+ final ConsumerRecords<byte[], byte[]> consumerRecords =
consumer.poll(Duration.ofMillis(pollTimeoutMs));
for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
processConsumerRecord(record);
}
-
- maybeSyncCommittedDataAndOffsets(false);
+ maybeMarkUserPartitionsAsReady();
+ } catch (final WakeupException ex) {
+ // ignore logging the error
+ isClosed = true;
+ } catch (final RetriableException ex) {
+ log.warn("Retriable error occurred while processing the
records. Retrying...", ex);
+ } catch (final Exception ex) {
+ isClosed = true;
+ log.error("Error occurred while processing the records", ex);
}
- } catch (Exception e) {
- log.error("Error occurred in consumer task, close:[{}]", closing,
e);
- } finally {
- maybeSyncCommittedDataAndOffsets(true);
- closeConsumer();
- log.info("Exiting from consumer task thread");
}
+ try {
+ consumer.close(Duration.ofSeconds(30));
+ } catch (final Exception e) {
+ log.error("Error encountered while closing the consumer", e);
+ }
+ log.info("Exited from consumer task thread");
}
private void processConsumerRecord(ConsumerRecord<byte[], byte[]> record) {
- // Taking assignPartitionsLock here as updateAssignmentsForPartitions
changes assignedTopicPartitions
- // and also calls
remotePartitionMetadataEventHandler.clearTopicPartition(removedPartition) for
the removed
- // partitions.
- RemoteLogMetadata remoteLogMetadata =
serde.deserialize(record.value());
- synchronized (assignPartitionsLock) {
- if
(assignedTopicPartitions.contains(remoteLogMetadata.topicIdPartition())) {
-
remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata);
- } else {
- log.debug("This event {} is skipped as the topic partition is
not assigned for this instance.", remoteLogMetadata);
- }
- log.debug("Updating consumed offset: [{}] for partition [{}]",
record.offset(), record.partition());
- partitionToConsumedOffsets.put(record.partition(),
record.offset());
+ final RemoteLogMetadata remoteLogMetadata =
serde.deserialize(record.value());
+ if (shouldProcess(remoteLogMetadata, record.offset())) {
+
remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata);
+
readOffsetsByUserTopicPartition.put(remoteLogMetadata.topicIdPartition(),
record.offset());
+ } else {
+ log.debug("The event {} is skipped because it is either already
processed or not assigned to this consumer", remoteLogMetadata);
}
+ log.debug("Updating consumed offset: [{}] for partition [{}]",
record.offset(), record.partition());
+ readOffsetsByMetadataPartition.put(record.partition(),
record.offset());
}
- private void maybeSyncCommittedDataAndOffsets(boolean forceSync) {
- // Return immediately if there is no consumption from last time.
- boolean noConsumedOffsetUpdates =
partitionToConsumedOffsets.equals(lastSyncedPartitionToConsumedOffsets);
- if (noConsumedOffsetUpdates || !forceSync && time.milliseconds() -
lastSyncedTimeMs < committedOffsetSyncIntervalMs) {
- log.debug("Skip syncing committed offsets,
noConsumedOffsetUpdates: {}, forceSync: {}", noConsumedOffsetUpdates,
forceSync);
+ private boolean shouldProcess(final RemoteLogMetadata metadata, final long
recordOffset) {
+ final TopicIdPartition tpId = metadata.topicIdPartition();
+ final Long readOffset = readOffsetsByUserTopicPartition.get(tpId);
+ return processedAssignmentOfUserTopicIdPartitions.contains(tpId) &&
(readOffset == null || readOffset < recordOffset);
+ }
+
+ private void maybeMarkUserPartitionsAsReady() {
+ if (isAllUserTopicPartitionsInitialized) {
return;
}
-
- try {
- // Need to take lock on assignPartitionsLock as
assignedTopicPartitions might
- // get updated by other threads.
- synchronized (assignPartitionsLock) {
- for (TopicIdPartition topicIdPartition :
assignedTopicPartitions) {
- int metadataPartition =
topicPartitioner.metadataPartition(topicIdPartition);
- Long offset =
partitionToConsumedOffsets.get(metadataPartition);
- if (offset != null) {
-
remotePartitionMetadataEventHandler.syncLogMetadataSnapshot(topicIdPartition,
metadataPartition, offset);
+ maybeFetchStartAndEndOffsets();
+ boolean isAllInitialized = true;
+ for (final UserTopicIdPartition utp :
assignedUserTopicIdPartitions.values()) {
+ if (utp.isAssigned && !utp.isInitialized) {
+ final Integer metadataPartition = utp.metadataPartition;
+ final StartAndEndOffsetHolder holder =
offsetHolderByMetadataPartition.get(toRemoteLogPartition(metadataPartition));
+ // The offset-holder can be null, when the recent assignment
wasn't picked up by the consumer.
+ if (holder != null) {
+ final Long readOffset =
readOffsetsByMetadataPartition.getOrDefault(metadataPartition, -1L);
+ // 1) The end-offset was fetched only once during
reassignment. The metadata-partition can receive
+ // new stream of records, so the consumer can read records
more than the last-fetched end-offset.
+ // 2) When the internal topic becomes empty due to breach
by size/time/start-offset, then there
+ // are no records to read.
+ if (readOffset + 1 >= holder.endOffset ||
holder.endOffset.equals(holder.startOffset)) {
+ markInitialized(utp);
} else {
- log.debug("Skipping sync-up of the
remote-log-metadata-file for partition: [{}] , with remote log metadata
partition{}, and no offset",
- topicIdPartition, metadataPartition);
+ log.debug("The user-topic-partition {} could not be
marked initialized since the read-offset is {} " +
+ "but the end-offset is {} for the
metadata-partition {}", utp, readOffset, holder.endOffset,
+ metadataPartition);
}
+ } else {
+ log.debug("The offset-holder is null for the
metadata-partition {}. The consumer may not have picked" +
+ " up the recent assignment", metadataPartition);
}
-
- // Write partitionToConsumedOffsets into committed offsets
file as we do not want to process them again
- // in case of restarts.
- committedOffsetsFile.writeEntries(partitionToConsumedOffsets);
- lastSyncedPartitionToConsumedOffsets = new
HashMap<>(partitionToConsumedOffsets);
}
-
- lastSyncedTimeMs = time.milliseconds();
- } catch (IOException e) {
- throw new KafkaException("Error encountered while writing
committed offsets to a local file", e);
+ isAllInitialized = isAllInitialized && utp.isAssigned &&
utp.isInitialized;
}
- }
-
- private void closeConsumer() {
- log.info("Closing the consumer instance");
- try {
- consumer.close(Duration.ofSeconds(30));
- } catch (Exception e) {
- log.error("Error encountered while closing the consumer", e);
+ if (isAllInitialized) {
+ log.info("Initialized for all the {} assigned user-partitions
mapped to the {} meta-partitions in {} ms",
+ assignedUserTopicIdPartitions.size(),
assignedMetadataPartitions.size(),
+ time.milliseconds() - uninitializedAt);
}
+ isAllUserTopicPartitionsInitialized = isAllInitialized;
}
- private void maybeWaitForPartitionsAssignment() {
- Set<Integer> assignedMetaPartitionsSnapshot = Collections.emptySet();
+ void maybeWaitForPartitionAssignments() throws InterruptedException {
+ // Snapshots of the metadata-partition and user-topic-partition are
used to reduce the scope of the
+ // synchronization block.
+ // 1) LEADER_AND_ISR and STOP_REPLICA requests adds / removes the
user-topic-partitions from the request
+ // handler threads. Those threads should not be blocked for a long
time, therefore scope of the
+ // synchronization block is reduced to bare minimum.
+ // 2) Note that the consumer#position, consumer#seekToBeginning,
consumer#seekToEnd and the other consumer APIs
+ // response times are un-predictable. Those should not be kept in
the synchronization block.
+ final Set<Integer> metadataPartitionSnapshot = new HashSet<>();
+ final Set<UserTopicIdPartition> assignedUserTopicIdPartitionsSnapshot
= new HashSet<>();
synchronized (assignPartitionsLock) {
- // If it is closing, return immediately. This should be inside the
assignPartitionsLock as the closing is updated
- // in close() method with in the same lock to avoid any race
conditions.
- if (closing) {
- return;
+ while (!isClosed && assignedUserTopicIdPartitions.isEmpty()) {
+ log.debug("Waiting for remote log metadata partitions to be
assigned");
+ assignPartitionsLock.wait();
}
-
- while (assignedMetaPartitions.isEmpty()) {
- // If no partitions are assigned, wait until they are assigned.
- log.debug("Waiting for assigned remote log metadata
partitions..");
- try {
- // No timeout is set here, as it is always notified. Even
when it is closed, the race can happen
- // between the thread calling this method and the thread
calling close(). We should have a check
- // for closing as that might have been set and notified
with assignPartitionsLock by `close`
- // method.
- assignPartitionsLock.wait();
-
- if (closing) {
- return;
- }
- } catch (InterruptedException e) {
- throw new KafkaException(e);
- }
- }
-
- if (assignPartitions) {
- assignedMetaPartitionsSnapshot = new
HashSet<>(assignedMetaPartitions);
- // Removing unassigned meta partitions from
partitionToConsumedOffsets and partitionToCommittedOffsets
- partitionToConsumedOffsets.entrySet().removeIf(entry ->
!assignedMetaPartitions.contains(entry.getKey()));
-
- assignPartitions = false;
+ if (!isClosed && hasAssignmentChanged) {
+ assignedUserTopicIdPartitions.values().forEach(utp -> {
+ metadataPartitionSnapshot.add(utp.metadataPartition);
+ assignedUserTopicIdPartitionsSnapshot.add(utp);
+ });
+ hasAssignmentChanged = false;
}
}
-
- if (!assignedMetaPartitionsSnapshot.isEmpty()) {
- executeReassignment(assignedMetaPartitionsSnapshot);
+ if (!metadataPartitionSnapshot.isEmpty()) {
+ final Set<TopicPartition> remoteLogPartitions =
toRemoteLogPartitions(metadataPartitionSnapshot);
+ consumer.assign(remoteLogPartitions);
+ this.assignedMetadataPartitions =
Collections.unmodifiableSet(metadataPartitionSnapshot);
+ // for newly assigned user-partitions, read from the beginning of
the corresponding metadata partition
+ final Set<TopicPartition> seekToBeginOffsetPartitions =
assignedUserTopicIdPartitionsSnapshot
+ .stream()
+ .filter(utp -> !utp.isAssigned)
+ .map(utp -> toRemoteLogPartition(utp.metadataPartition))
+ .collect(Collectors.toSet());
+ consumer.seekToBeginning(seekToBeginOffsetPartitions);
+ // for other metadata partitions, read from the offset where the
processing left last time.
+ remoteLogPartitions.stream()
+ .filter(tp -> !seekToBeginOffsetPartitions.contains(tp) &&
+ readOffsetsByMetadataPartition.containsKey(tp.partition()))
+ .forEach(tp -> consumer.seek(tp,
readOffsetsByMetadataPartition.get(tp.partition())));
+ Set<TopicIdPartition> processedAssignmentPartitions = new
HashSet<>();
+ // mark all the user-topic-partitions as assigned to the consumer.
+ assignedUserTopicIdPartitionsSnapshot.forEach(utp -> {
+ if (!utp.isAssigned) {
+ // Note that there can be a race between `remove` and
`add` partition assignment. Calling the
+ // `maybeLoadPartition` here again to be sure that the
partition gets loaded on the handler.
+
remotePartitionMetadataEventHandler.maybeLoadPartition(utp.topicIdPartition);
+ utp.isAssigned = true;
+ }
+ processedAssignmentPartitions.add(utp.topicIdPartition);
+ });
+ processedAssignmentOfUserTopicIdPartitions = new
HashSet<>(processedAssignmentPartitions);
+
clearResourcesForUnassignedUserTopicPartitions(processedAssignmentPartitions);
+ isAllUserTopicPartitionsInitialized = false;
+ uninitializedAt = time.milliseconds();
+ fetchStartAndEndOffsets();
}
}
- private void executeReassignment(Set<Integer>
assignedMetaPartitionsSnapshot) {
- Set<TopicPartition> assignedMetaTopicPartitions =
- assignedMetaPartitionsSnapshot.stream()
- .map(partitionNum -> new
TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, partitionNum))
- .collect(Collectors.toSet());
- log.info("Reassigning partitions to consumer task [{}]",
assignedMetaTopicPartitions);
- consumer.assign(assignedMetaTopicPartitions);
+ private void
clearResourcesForUnassignedUserTopicPartitions(Set<TopicIdPartition>
assignedPartitions) {
+ // Note that there can be previously assigned user-topic-partitions
where no records are there to read
+ // (eg) none of the segments for a partition were uploaded. Those
partition resources won't be cleared.
+ // It can be fixed later when required since they are empty resources.
+ Set<TopicIdPartition> unassignedPartitions =
readOffsetsByUserTopicPartition.keySet()
+ .stream()
+ .filter(e -> !assignedPartitions.contains(e))
+ .collect(Collectors.toSet());
+ unassignedPartitions.forEach(unassignedPartition -> {
+
remotePartitionMetadataEventHandler.clearTopicPartition(unassignedPartition);
+ readOffsetsByUserTopicPartition.remove(unassignedPartition);
+ });
+ log.info("Unassigned user-topic-partitions: {}",
unassignedPartitions.size());
+ }
+
+ public void addAssignmentsForPartitions(final Set<TopicIdPartition>
partitions) {
+ updateAssignments(Objects.requireNonNull(partitions),
Collections.emptySet());
+ }
+
+ public void removeAssignmentsForPartitions(final Set<TopicIdPartition>
partitions) {
+ updateAssignments(Collections.emptySet(),
Objects.requireNonNull(partitions));
}
- public void addAssignmentsForPartitions(Set<TopicIdPartition> partitions) {
- updateAssignmentsForPartitions(partitions, Collections.emptySet());
+ private void updateAssignments(final Set<TopicIdPartition> addedPartitions,
+ final Set<TopicIdPartition>
removedPartitions) {
+ log.info("Updating assignments for partitions added: {} and removed:
{}", addedPartitions, removedPartitions);
+ if (!addedPartitions.isEmpty() || !removedPartitions.isEmpty()) {
+ synchronized (assignPartitionsLock) {
+ // Make a copy of the existing assignments and update the copy.
+ final Map<TopicIdPartition, UserTopicIdPartition>
updatedUserPartitions = new HashMap<>(assignedUserTopicIdPartitions);
+ addedPartitions.forEach(tpId ->
updatedUserPartitions.putIfAbsent(tpId, newUserTopicIdPartition(tpId)));
+ removedPartitions.forEach(updatedUserPartitions::remove);
+ if
(!updatedUserPartitions.equals(assignedUserTopicIdPartitions)) {
+ assignedUserTopicIdPartitions =
Collections.unmodifiableMap(updatedUserPartitions);
+ hasAssignmentChanged = true;
+ log.debug("Assigned user-topic-partitions: {}",
assignedUserTopicIdPartitions);
+ assignPartitionsLock.notifyAll();
+ }
+ }
+ }
}
- public void removeAssignmentsForPartitions(Set<TopicIdPartition>
partitions) {
- updateAssignmentsForPartitions(Collections.emptySet(), partitions);
+ public Optional<Long> readOffsetForMetadataPartition(final int partition) {
+ return
Optional.ofNullable(readOffsetsByMetadataPartition.get(partition));
}
- private void updateAssignmentsForPartitions(Set<TopicIdPartition>
addedPartitions,
- Set<TopicIdPartition>
removedPartitions) {
- log.info("Updating assignments for addedPartitions: {} and
removedPartition: {}", addedPartitions, removedPartitions);
+ public boolean isMetadataPartitionAssigned(final int partition) {
+ return assignedMetadataPartitions.contains(partition);
+ }
- Objects.requireNonNull(addedPartitions, "addedPartitions must not be
null");
- Objects.requireNonNull(removedPartitions, "removedPartitions must not
be null");
+ public boolean isUserPartitionAssigned(final TopicIdPartition partition) {
+ final UserTopicIdPartition utp =
assignedUserTopicIdPartitions.get(partition);
+ return utp != null && utp.isAssigned;
+ }
- if (addedPartitions.isEmpty() && removedPartitions.isEmpty()) {
- return;
+ @Override
+ public void close() {
+ if (!isClosed) {
+ log.info("Closing the instance");
+ synchronized (assignPartitionsLock) {
+ isClosed = true;
+
assignedUserTopicIdPartitions.values().forEach(this::markInitialized);
+ consumer.wakeup();
+ assignPartitionsLock.notifyAll();
+ }
}
+ }
- synchronized (assignPartitionsLock) {
- Set<TopicIdPartition> updatedReassignedPartitions = new
HashSet<>(assignedTopicPartitions);
- updatedReassignedPartitions.addAll(addedPartitions);
- updatedReassignedPartitions.removeAll(removedPartitions);
- Set<Integer> updatedAssignedMetaPartitions = new HashSet<>();
- for (TopicIdPartition tp : updatedReassignedPartitions) {
-
updatedAssignedMetaPartitions.add(topicPartitioner.metadataPartition(tp));
- }
+ public Set<Integer> metadataPartitionsAssigned() {
+ return Collections.unmodifiableSet(assignedMetadataPartitions);
+ }
+
+ private void fetchStartAndEndOffsets() {
+ try {
+ final Set<TopicPartition> uninitializedPartitions =
assignedUserTopicIdPartitions.values().stream()
+ .filter(utp -> utp.isAssigned && !utp.isInitialized)
+ .map(utp -> toRemoteLogPartition(utp.metadataPartition))
+ .collect(Collectors.toSet());
+ // Removing the previous offset holder if it exists. During
reassignment, if the list-offset
+ // call to `earliest` and `latest` offset fails, then we should
not use the previous values.
+ uninitializedPartitions.forEach(tp ->
offsetHolderByMetadataPartition.remove(tp));
+ if (!uninitializedPartitions.isEmpty()) {
+ Map<TopicPartition, Long> endOffsets =
consumer.endOffsets(uninitializedPartitions);
+ Map<TopicPartition, Long> startOffsets =
consumer.beginningOffsets(uninitializedPartitions);
+ offsetHolderByMetadataPartition = endOffsets.entrySet()
+ .stream()
+ .collect(Collectors.toMap(Map.Entry::getKey,
+ e -> new
StartAndEndOffsetHolder(startOffsets.get(e.getKey()), e.getValue())));
- // Clear removed topic partitions from in-memory cache.
- for (TopicIdPartition removedPartition : removedPartitions) {
-
remotePartitionMetadataEventHandler.clearTopicPartition(removedPartition);
}
+ hasLastOffsetsFetchFailed = false;
+ } catch (final RetriableException ex) {
+ // ignore LEADER_NOT_AVAILABLE error, this can happen when the
partition leader is not yet assigned.
+ hasLastOffsetsFetchFailed = true;
+ lastFailedFetchOffsetsTimestamp = time.milliseconds();
+ }
+ }
- assignedTopicPartitions =
Collections.unmodifiableSet(updatedReassignedPartitions);
- log.debug("Assigned topic partitions: {}",
assignedTopicPartitions);
+ private void maybeFetchStartAndEndOffsets() {
+ // If the leader for a `__remote_log_metadata` partition is not
available, then the call to `ListOffsets`
+ // will fail after the default timeout of 1 min. Added a delay between
the retries to prevent the thread from
+ // aggressively fetching the list offsets. During this time, the
recently reassigned user-topic-partitions
+ // won't be marked as initialized.
+ if (hasLastOffsetsFetchFailed && lastFailedFetchOffsetsTimestamp +
offsetFetchRetryIntervalMs < time.milliseconds()) {
+ fetchStartAndEndOffsets();
+ }
+ }
- if (!updatedAssignedMetaPartitions.equals(assignedMetaPartitions))
{
- assignedMetaPartitions =
Collections.unmodifiableSet(updatedAssignedMetaPartitions);
- log.debug("Assigned metadata topic partitions: {}",
assignedMetaPartitions);
+ private UserTopicIdPartition newUserTopicIdPartition(final
TopicIdPartition tpId) {
+ return new UserTopicIdPartition(tpId,
topicPartitioner.metadataPartition(tpId));
+ }
- assignPartitions = true;
- assignPartitionsLock.notifyAll();
- } else {
- log.debug("No change in assigned metadata topic partitions:
{}", assignedMetaPartitions);
- }
+ private void markInitialized(final UserTopicIdPartition utp) {
+ // Silently not initialize the utp
+ if (!utp.isAssigned) {
+ log.warn("Tried to initialize a UTP: {} that was not yet
assigned!", utp);
+ return;
+ }
+ if (!utp.isInitialized) {
+
remotePartitionMetadataEventHandler.markInitialized(utp.topicIdPartition);
+ utp.isInitialized = true;
}
}
- public Optional<Long> receivedOffsetForPartition(int partition) {
- return Optional.ofNullable(partitionToConsumedOffsets.get(partition));
+ static Set<TopicPartition> toRemoteLogPartitions(final Set<Integer>
partitions) {
+ return partitions.stream()
+ .map(ConsumerTask::toRemoteLogPartition)
+ .collect(Collectors.toSet());
}
- public boolean isPartitionAssigned(int partition) {
- return assignedMetaPartitions.contains(partition);
+ static TopicPartition toRemoteLogPartition(int partition) {
+ return new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, partition);
}
- public void close() {
- if (!closing) {
- synchronized (assignPartitionsLock) {
- // Closing should be updated only after acquiring the lock to
avoid race in
- // maybeWaitForPartitionsAssignment() where it waits on
assignPartitionsLock. It should not wait
- // if the closing is already set.
- closing = true;
- consumer.wakeup();
- assignPartitionsLock.notifyAll();
- }
+ static class UserTopicIdPartition {
+ private final TopicIdPartition topicIdPartition;
+ private final Integer metadataPartition;
+ // The `utp` will be initialized once it reads all the existing events
from the remote log metadata topic.
+ boolean isInitialized;
+ // denotes whether this `utp` is assigned to the consumer
+ boolean isAssigned;
+
+ /**
+ * UserTopicIdPartition denotes the user topic-partitions for which
this broker acts as a leader/follower of.
+ *
+ * @param tpId the unique topic partition identifier
+ * @param metadataPartition the remote log metadata partition mapped
for this user-topic-partition.
+ */
+ public UserTopicIdPartition(final TopicIdPartition tpId, final Integer
metadataPartition) {
+ this.topicIdPartition = Objects.requireNonNull(tpId);
+ this.metadataPartition = Objects.requireNonNull(metadataPartition);
+ this.isInitialized = false;
+ this.isAssigned = false;
+ }
+
+ @Override
+ public String toString() {
+ return "UserTopicIdPartition{" +
+ "topicIdPartition=" + topicIdPartition +
+ ", metadataPartition=" + metadataPartition +
+ ", isInitialized=" + isInitialized +
+ ", isAssigned=" + isAssigned +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ UserTopicIdPartition that = (UserTopicIdPartition) o;
+ return topicIdPartition.equals(that.topicIdPartition) &&
metadataPartition.equals(that.metadataPartition);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(topicIdPartition, metadataPartition);
}
}
- public Set<Integer> metadataPartitionsAssigned() {
- return Collections.unmodifiableSet(assignedMetaPartitions);
+ static class StartAndEndOffsetHolder {
+ Long startOffset;
+ Long endOffset;
+
+ public StartAndEndOffsetHolder(Long startOffset, Long endOffset) {
+ this.startOffset = startOffset;
+ this.endOffset = endOffset;
+ }
+
+ @Override
+ public String toString() {
+ return "StartAndEndOffsetHolder{" +
+ "startOffset=" + startOffset +
+ ", endOffset=" + endOffset +
+ '}';
+ }
}
-}
+}
\ No newline at end of file
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
index 8c89df3df2c..758a024e25c 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
@@ -32,6 +32,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
/**
* This class provides an in-memory cache of remote log segment metadata. This
maintains the lineage of segments
@@ -104,6 +105,16 @@ public class RemoteLogMetadataCache {
// https://issues.apache.org/jira/browse/KAFKA-12641
protected final ConcurrentMap<Integer, RemoteLogLeaderEpochState>
leaderEpochEntries = new ConcurrentHashMap<>();
+ private final CountDownLatch initializedLatch = new CountDownLatch(1);
+
+ public void markInitialized() {
+ initializedLatch.countDown();
+ }
+
+ public boolean isInitialized() {
+ return initializedLatch.getCount() == 0;
+ }
+
/**
* Returns {@link RemoteLogSegmentMetadata} if it exists for the given
leader-epoch containing the offset and with
* {@link RemoteLogSegmentState#COPY_SEGMENT_FINISHED} state, else returns
{@link Optional#empty()}.
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataEventHandler.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataEventHandler.java
index c92a51ecaca..f4f43b0d883 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataEventHandler.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataEventHandler.java
@@ -50,4 +50,9 @@ public abstract class RemotePartitionMetadataEventHandler {
public abstract void clearTopicPartition(TopicIdPartition
topicIdPartition);
+ public abstract void markInitialized(TopicIdPartition partition);
+
+ public abstract boolean isInitialized(TopicIdPartition partition);
+
+ public abstract void maybeLoadPartition(TopicIdPartition partition);
}
\ No newline at end of file
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
index 7051d184aad..f9394eee99f 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
@@ -18,6 +18,7 @@ package org.apache.kafka.server.log.remote.metadata.storage;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ReplicaNotAvailableException;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
@@ -151,6 +152,12 @@ public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHa
throw new RemoteResourceNotFoundException("No resource found for
partition: " + topicIdPartition);
}
+ if (!remoteLogMetadataCache.isInitialized()) {
+ // Throwing a retriable ReplicaNotAvailableException here for
clients retry. We can introduce a new more
+ // appropriate exception with a KIP in the future.
+ throw new ReplicaNotAvailableException("Remote log metadata cache
is not initialized for partition: " + topicIdPartition);
+ }
+
return remoteLogMetadataCache;
}
@@ -180,9 +187,21 @@ public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHa
idToRemoteLogMetadataCache = Collections.emptyMap();
}
+ @Override
public void maybeLoadPartition(TopicIdPartition partition) {
idToRemoteLogMetadataCache.computeIfAbsent(partition,
topicIdPartition -> new
FileBasedRemoteLogMetadataCache(topicIdPartition,
partitionLogDirectory(topicIdPartition.topicPartition())));
}
+ @Override
+ public void markInitialized(TopicIdPartition partition) {
+ idToRemoteLogMetadataCache.get(partition).markInitialized();
+ log.trace("Remote log components are initialized for user-partition:
{}", partition);
+ }
+
+ @Override
+ public boolean isInitialized(TopicIdPartition topicIdPartition) {
+ RemoteLogMetadataCache metadataCache =
idToRemoteLogMetadataCache.get(topicIdPartition);
+ return metadataCache != null && metadataCache.isInitialized();
+ }
}
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
index 4b02b9b6763..e1bf145bbd8 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
@@ -84,7 +84,7 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
private RemotePartitionMetadataStore remotePartitionMetadataStore;
private volatile TopicBasedRemoteLogMetadataManagerConfig rlmmConfig;
- private volatile RemoteLogMetadataTopicPartitioner rlmmTopicPartitioner;
+ private volatile RemoteLogMetadataTopicPartitioner rlmTopicPartitioner;
private final Set<TopicIdPartition> pendingAssignPartitions =
Collections.synchronizedSet(new HashSet<>());
private volatile boolean initializationFailed;
@@ -260,12 +260,12 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
}
public int metadataPartition(TopicIdPartition topicIdPartition) {
- return rlmmTopicPartitioner.metadataPartition(topicIdPartition);
+ return rlmTopicPartitioner.metadataPartition(topicIdPartition);
}
// Visible For Testing
- public Optional<Long> receivedOffsetForPartition(int metadataPartition) {
- return consumerManager.receivedOffsetForPartition(metadataPartition);
+ public Optional<Long> readOffsetForPartition(int metadataPartition) {
+ return consumerManager.readOffsetForPartition(metadataPartition);
}
@Override
@@ -357,7 +357,7 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
log.info("Started configuring topic-based RLMM with configs: {}",
configs);
rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(configs);
- rlmmTopicPartitioner = new
RemoteLogMetadataTopicPartitioner(rlmmConfig.metadataTopicPartitionsCount());
+ rlmTopicPartitioner = new
RemoteLogMetadataTopicPartitioner(rlmmConfig.metadataTopicPartitionsCount());
remotePartitionMetadataStore = new
RemotePartitionMetadataStore(new File(rlmmConfig.logDir()).toPath());
configured = true;
log.info("Successfully configured topic-based RLMM with config:
{}", rlmmConfig);
@@ -416,8 +416,8 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
// Create producer and consumer managers.
lock.writeLock().lock();
try {
- producerManager = new ProducerManager(rlmmConfig,
rlmmTopicPartitioner);
- consumerManager = new ConsumerManager(rlmmConfig,
remotePartitionMetadataStore, rlmmTopicPartitioner, time);
+ producerManager = new ProducerManager(rlmmConfig,
rlmTopicPartitioner);
+ consumerManager = new ConsumerManager(rlmmConfig,
remotePartitionMetadataStore, rlmTopicPartitioner, time);
if (startConsumerThread) {
consumerManager.startConsumerThread();
} else {
@@ -509,10 +509,8 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
}
// Visible for testing.
- public void startConsumerThread() {
- if (consumerManager != null) {
- consumerManager.startConsumerThread();
- }
+ void setRlmTopicPartitioner(RemoteLogMetadataTopicPartitioner
rlmTopicPartitioner) {
+ this.rlmTopicPartitioner = Objects.requireNonNull(rlmTopicPartitioner);
}
@Override
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java
new file mode 100644
index 00000000000..2b36c4bb039
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.SystemTime;
+import
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import
org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static
org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.UserTopicIdPartition;
+import static
org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.toRemoteLogPartition;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class ConsumerTaskTest {
+
+ private final int numMetadataTopicPartitions = 5;
+ private final RemoteLogMetadataTopicPartitioner partitioner = new
RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions);
+ private final DummyEventHandler handler = new DummyEventHandler();
+ private final Set<TopicPartition> remoteLogPartitions = IntStream.range(0,
numMetadataTopicPartitions).boxed()
+ .map(ConsumerTask::toRemoteLogPartition).collect(Collectors.toSet());
+ private final Uuid topicId = Uuid.randomUuid();
+ private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+
+ private ConsumerTask consumerTask;
+ private MockConsumer<byte[], byte[]> consumer;
+ private Thread thread;
+
+ @BeforeEach
+ public void beforeEach() {
+ final Map<TopicPartition, Long> offsets = remoteLogPartitions.stream()
+ .collect(Collectors.toMap(Function.identity(), e -> 0L));
+ consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+ consumer.updateBeginningOffsets(offsets);
+ consumerTask = new ConsumerTask(handler, partitioner, consumer, 10L,
300_000L, new SystemTime());
+ thread = new Thread(consumerTask);
+ }
+
+ @AfterEach
+ public void afterEach() throws InterruptedException {
+ if (thread != null) {
+ assertDoesNotThrow(() -> consumerTask.close(), "Close method threw
exception");
+ thread.join(10_000);
+ assertFalse(thread.isAlive(), "Consumer task thread is still
alive");
+ }
+ }
+
+ /**
+ * Tests that the consumer task shuts down gracefully when there were no
assignments.
+ */
+ @Test
+ public void testCloseOnNoAssignment() throws InterruptedException {
+ thread.start();
+ Thread.sleep(10);
+ assertDoesNotThrow(() -> consumerTask.close(), "Close method threw
exception");
+ }
+
+ @Test
+ public void testIdempotentClose() {
+ thread.start();
+ consumerTask.close();
+ consumerTask.close();
+ }
+
+ @Test
+ public void testUserTopicIdPartitionEquals() {
+ final TopicIdPartition tpId = new TopicIdPartition(topicId, new
TopicPartition("sample", 0));
+ final UserTopicIdPartition utp1 = new UserTopicIdPartition(tpId,
partitioner.metadataPartition(tpId));
+ final UserTopicIdPartition utp2 = new UserTopicIdPartition(tpId,
partitioner.metadataPartition(tpId));
+ utp1.isInitialized = true;
+ utp1.isAssigned = true;
+
+ assertFalse(utp2.isInitialized);
+ assertFalse(utp2.isAssigned);
+ assertEquals(utp1, utp2);
+ }
+
+ @Test
+ public void testAddAssignmentsForPartitions() throws InterruptedException {
+ final List<TopicIdPartition> idPartitions = getIdPartitions("sample",
3);
+ final Map<TopicPartition, Long> endOffsets = idPartitions.stream()
+ .map(idp ->
toRemoteLogPartition(partitioner.metadataPartition(idp)))
+ .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) ->
b));
+ consumer.updateEndOffsets(endOffsets);
+ consumerTask.addAssignmentsForPartitions(new HashSet<>(idPartitions));
+ thread.start();
+ for (final TopicIdPartition idPartition : idPartitions) {
+ TestUtils.waitForCondition(() ->
consumerTask.isUserPartitionAssigned(idPartition), "Timed out waiting for " +
idPartition + " to be assigned");
+
assertTrue(consumerTask.isMetadataPartitionAssigned(partitioner.metadataPartition(idPartition)));
+ assertTrue(handler.isPartitionLoaded.get(idPartition));
+ }
+ }
+
+ @Test
+ public void testRemoveAssignmentsForPartitions() throws
InterruptedException {
+ final List<TopicIdPartition> allPartitions = getIdPartitions("sample",
3);
+ final Map<TopicPartition, Long> endOffsets = allPartitions.stream()
+ .map(idp ->
toRemoteLogPartition(partitioner.metadataPartition(idp)))
+ .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) ->
b));
+ consumer.updateEndOffsets(endOffsets);
+ consumerTask.addAssignmentsForPartitions(new HashSet<>(allPartitions));
+ thread.start();
+
+ final TopicIdPartition tpId = allPartitions.get(0);
+ TestUtils.waitForCondition(() ->
consumerTask.isUserPartitionAssigned(tpId), "Timed out waiting for " + tpId + "
to be assigned");
+ addRecord(consumer, partitioner.metadataPartition(tpId), tpId, 0);
+ TestUtils.waitForCondition(() ->
consumerTask.readOffsetForMetadataPartition(partitioner.metadataPartition(tpId)).isPresent(),
+ "Couldn't read record");
+
+ final Set<TopicIdPartition> removePartitions =
Collections.singleton(tpId);
+ consumerTask.removeAssignmentsForPartitions(removePartitions);
+ for (final TopicIdPartition idPartition : allPartitions) {
+ final TestCondition condition = () ->
removePartitions.contains(idPartition) ==
!consumerTask.isUserPartitionAssigned(idPartition);
+ TestUtils.waitForCondition(condition, "Timed out waiting for " +
idPartition + " to be removed");
+ }
+ for (TopicIdPartition removePartition : removePartitions) {
+ TestUtils.waitForCondition(() ->
handler.isPartitionCleared.containsKey(removePartition),
+ "Timed out waiting for " + removePartition + " to be cleared");
+ }
+ }
+
+ @Test
+ public void testConcurrentPartitionAssignments() throws
InterruptedException, ExecutionException {
+ final List<TopicIdPartition> allPartitions = getIdPartitions("sample",
100);
+ final Map<TopicPartition, Long> endOffsets = allPartitions.stream()
+ .map(idp ->
toRemoteLogPartition(partitioner.metadataPartition(idp)))
+ .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) ->
b));
+ consumer.updateEndOffsets(endOffsets);
+
+ final AtomicBoolean isAllPartitionsAssigned = new AtomicBoolean(false);
+ CountDownLatch latch = new CountDownLatch(1);
+ Thread assignor = new Thread(() -> {
+ int partitionsAssigned = 0;
+ for (TopicIdPartition partition : allPartitions) {
+ if (partitionsAssigned == 50) {
+ // Once half the topic partitions are assigned, wait for
the consumer to catch up. This ensures
+ // that the consumer is already running when the rest of
the partitions are assigned.
+ try {
+ latch.await(1, TimeUnit.MINUTES);
+ } catch (InterruptedException e) {
+ fail(e.getMessage());
+ }
+ }
+
consumerTask.addAssignmentsForPartitions(Collections.singleton(partition));
+ partitionsAssigned++;
+ }
+ isAllPartitionsAssigned.set(true);
+ });
+ Runnable consumerRunnable = () -> {
+ try {
+ while (!isAllPartitionsAssigned.get()) {
+ consumerTask.maybeWaitForPartitionAssignments();
+ latch.countDown();
+ }
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ };
+
+ ExecutorService consumerExecutor = Executors.newSingleThreadExecutor();
+ Future<?> future = consumerExecutor.submit(consumerRunnable);
+ assignor.start();
+
+ assignor.join();
+ future.get();
+ }
+
+ @Test
+ public void testCanProcessRecord() throws InterruptedException {
+ final Uuid topicId = Uuid.fromString("Bp9TDduJRGa9Q5rlvCJOxg");
+ final TopicIdPartition tpId0 = new TopicIdPartition(topicId, new
TopicPartition("sample", 0));
+ final TopicIdPartition tpId1 = new TopicIdPartition(topicId, new
TopicPartition("sample", 1));
+ final TopicIdPartition tpId2 = new TopicIdPartition(topicId, new
TopicPartition("sample", 2));
+ assertEquals(partitioner.metadataPartition(tpId0),
partitioner.metadataPartition(tpId1));
+ assertEquals(partitioner.metadataPartition(tpId0),
partitioner.metadataPartition(tpId2));
+
+ final int metadataPartition = partitioner.metadataPartition(tpId0);
+
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition),
0L));
+ final Set<TopicIdPartition> assignments = Collections.singleton(tpId0);
+ consumerTask.addAssignmentsForPartitions(assignments);
+ thread.start();
+ TestUtils.waitForCondition(() ->
consumerTask.isUserPartitionAssigned(tpId0), "Timed out waiting for " + tpId0 +
" to be assigned");
+
+ addRecord(consumer, metadataPartition, tpId0, 0);
+ addRecord(consumer, metadataPartition, tpId0, 1);
+ TestUtils.waitForCondition(() ->
consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)),
"Couldn't read record");
+ assertEquals(2, handler.metadataCounter);
+
+ // should only read the tpId1 records
+ consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId1));
+ TestUtils.waitForCondition(() ->
consumerTask.isUserPartitionAssigned(tpId1), "Timed out waiting for " + tpId1 +
" to be assigned");
+ addRecord(consumer, metadataPartition, tpId1, 2);
+ TestUtils.waitForCondition(() ->
consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(2L)),
"Couldn't read record");
+ assertEquals(3, handler.metadataCounter);
+
+ // shouldn't read tpId2 records because it's not assigned
+ addRecord(consumer, metadataPartition, tpId2, 3);
+ TestUtils.waitForCondition(() ->
consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(3L)),
"Couldn't read record");
+ assertEquals(3, handler.metadataCounter);
+ }
+
+ @Test
+ public void testMaybeMarkUserPartitionsAsReady() throws
InterruptedException {
+ final TopicIdPartition tpId = getIdPartitions("hello", 1).get(0);
+ final int metadataPartition = partitioner.metadataPartition(tpId);
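+ // With an end offset of 2 and no records yet consumed, the partition should be marked initialized only after the consumer catches up to that offset.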
+
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition),
2L));
+ consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
+ thread.start();
+
+ TestUtils.waitForCondition(() ->
consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be
assigned");
+
assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition));
+ assertFalse(handler.isPartitionInitialized.containsKey(tpId));
+ IntStream.range(0, 5).forEach(offset -> addRecord(consumer,
metadataPartition, tpId, offset));
+ TestUtils.waitForCondition(() ->
consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(4L)),
"Couldn't read record");
+ assertTrue(handler.isPartitionInitialized.get(tpId));
+ }
+
+ @ParameterizedTest
+ @CsvSource(value = {"0, 0", "500, 500"})
+ public void testMaybeMarkUserPartitionAsReadyWhenTopicIsEmpty(long
beginOffset,
+ long
endOffset) throws InterruptedException {
+ final TopicIdPartition tpId = getIdPartitions("world", 1).get(0);
+ final int metadataPartition = partitioner.metadataPartition(tpId);
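+ // Beginning and end offsets are equal, so the metadata partition is empty; the user partition should be marked initialized without consuming any records.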
+
consumer.updateBeginningOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition),
beginOffset));
+
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition),
endOffset));
+ consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
+ thread.start();
+
+ TestUtils.waitForCondition(() ->
consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be
assigned");
+
assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition));
+ TestUtils.waitForCondition(() ->
handler.isPartitionInitialized.containsKey(tpId),
+ "should have initialized the partition");
+
assertFalse(consumerTask.readOffsetForMetadataPartition(metadataPartition).isPresent());
+ }
+
+ @Test
+ public void testConcurrentAccess() throws InterruptedException {
+ thread.start();
+ final CountDownLatch latch = new CountDownLatch(1);
+ final TopicIdPartition tpId = getIdPartitions("concurrent", 1).get(0);
+
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(partitioner.metadataPartition(tpId)),
0L));
+ final Thread assignmentThread = new Thread(() -> {
+ try {
+ latch.await();
+
consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
+ } catch (final InterruptedException e) {
+ fail("Shouldn't have thrown an exception");
+ }
+ });
+ final Thread closeThread = new Thread(() -> {
+ try {
+ latch.await();
+ consumerTask.close();
+ } catch (final InterruptedException e) {
+ fail("Shouldn't have thrown an exception");
+ }
+ });
+ assignmentThread.start();
+ closeThread.start();
+
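+ // Release both threads at once so that adding an assignment races with closing the task; neither should throw.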
+ latch.countDown();
+ assignmentThread.join();
+ closeThread.join();
+ }
+
+ @Test
+ public void testConsumerShouldNotCloseOnRetriableError() throws
InterruptedException {
+ final TopicIdPartition tpId = getIdPartitions("world", 1).get(0);
+ final int metadataPartition = partitioner.metadataPartition(tpId);
+
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition),
1L));
+ consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
+ thread.start();
+
+ TestUtils.waitForCondition(() ->
consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be
assigned");
+
assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition));
+
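+ // Inject retriable errors between records; the consumer task should retry instead of closing and eventually consume both records.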
+ consumer.setPollException(new LeaderNotAvailableException("leader not
available!"));
+ addRecord(consumer, metadataPartition, tpId, 0);
+ consumer.setPollException(new TimeoutException("Not able to complete
the operation within the timeout"));
+ addRecord(consumer, metadataPartition, tpId, 1);
+
+ TestUtils.waitForCondition(() ->
consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)),
"Couldn't read record");
+ assertEquals(2, handler.metadataCounter);
+ }
+
+ @Test
+ public void testConsumerShouldCloseOnNonRetriableError() throws
InterruptedException {
+ final TopicIdPartition tpId = getIdPartitions("world", 1).get(0);
+ final int metadataPartition = partitioner.metadataPartition(tpId);
+
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition),
1L));
+ consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
+ thread.start();
+
+ TestUtils.waitForCondition(() ->
consumerTask.isUserPartitionAssigned(tpId), "Waiting for " + tpId + " to be
assigned");
+
assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition));
+
+ consumer.setPollException(new AuthorizationException("Unauthorized to
read the topic!"));
+ TestUtils.waitForCondition(() -> consumer.closed(), "Should close the consumer on a non-retriable error");
+ }
+
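+ // Serializes a RemoteLogSegmentMetadata for the given user partition and appends it to the mock consumer at the given offset of the metadata partition.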
+ private void addRecord(final MockConsumer<byte[], byte[]> consumer,
+ final int metadataPartition,
+ final TopicIdPartition idPartition,
+ final long recordOffset) {
+ final RemoteLogSegmentId segmentId = new
RemoteLogSegmentId(idPartition, Uuid.randomUuid());
+ final RemoteLogMetadata metadata = new
RemoteLogSegmentMetadata(segmentId, 0L, 1L, 0L, 0, 0L, 1,
Collections.singletonMap(0, 0L));
+ final ConsumerRecord<byte[], byte[]> record = new
ConsumerRecord<>(TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME,
metadataPartition, recordOffset, null, serde.serialize(metadata));
+ consumer.addRecord(record);
+ }
+
+ private List<TopicIdPartition> getIdPartitions(final String topic, final
int partitionCount) {
+ final List<TopicIdPartition> idPartitions = new ArrayList<>();
+ for (int partition = 0; partition < partitionCount; partition++) {
+ idPartitions.add(new TopicIdPartition(topicId, new
TopicPartition(topic, partition)));
+ }
+ return idPartitions;
+ }
+
+ private static class DummyEventHandler extends
RemotePartitionMetadataEventHandler {
+ private int metadataCounter = 0;
+ private final Map<TopicIdPartition, Boolean> isPartitionInitialized =
new HashMap<>();
+ private final Map<TopicIdPartition, Boolean> isPartitionLoaded = new
HashMap<>();
+ private final Map<TopicIdPartition, Boolean> isPartitionCleared = new
HashMap<>();
+
+ @Override
+ protected void handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata
remoteLogSegmentMetadata) {
+ metadataCounter++;
+ }
+
+ @Override
+ protected void
handleRemoteLogSegmentMetadataUpdate(RemoteLogSegmentMetadataUpdate
remoteLogSegmentMetadataUpdate) {
+ }
+
+ @Override
+ protected void
handleRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata
remotePartitionDeleteMetadata) {
+ }
+
+ @Override
+ public void syncLogMetadataSnapshot(TopicIdPartition topicIdPartition,
int metadataPartition, Long metadataPartitionOffset) {
+ }
+
+ @Override
+ public void clearTopicPartition(TopicIdPartition topicIdPartition) {
+ isPartitionCleared.put(topicIdPartition, true);
+ }
+
+ @Override
+ public void markInitialized(TopicIdPartition partition) {
+ isPartitionInitialized.put(partition, true);
+ }
+
+ @Override
+ public boolean isInitialized(TopicIdPartition partition) {
+ return true;
+ }
+
+ @Override
+ public void maybeLoadPartition(TopicIdPartition partition) {
+ isPartitionLoaded.put(partition, true);
+ }
+ }
+}
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java
index e39d872744a..abad6ea7676 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java
@@ -63,11 +63,12 @@ public class TopicBasedRemoteLogMetadataManagerHarness
extends IntegrationTestHa
// Call setup to start the cluster.
super.setUp(new EmptyTestInfo());
- initializeRemoteLogMetadataManager(topicIdPartitions,
startConsumerThread);
+ initializeRemoteLogMetadataManager(topicIdPartitions,
startConsumerThread, null);
}
public void initializeRemoteLogMetadataManager(Set<TopicIdPartition>
topicIdPartitions,
- boolean
startConsumerThread) {
+ boolean startConsumerThread,
+
RemoteLogMetadataTopicPartitioner remoteLogMetadataTopicPartitioner) {
String logDir =
TestUtils.tempDirectory("rlmm_segs_").getAbsolutePath();
topicBasedRemoteLogMetadataManager = new
TopicBasedRemoteLogMetadataManager(startConsumerThread) {
@Override
@@ -104,6 +105,9 @@ public class TopicBasedRemoteLogMetadataManagerHarness
extends IntegrationTestHa
log.debug("TopicBasedRemoteLogMetadataManager configs after adding
overridden properties: {}", configs);
topicBasedRemoteLogMetadataManager.configure(configs);
+ if (remoteLogMetadataTopicPartitioner != null) {
+
topicBasedRemoteLogMetadataManager.setRlmTopicPartitioner(remoteLogMetadataTopicPartitioner);
+ }
try {
waitUntilInitialized(60_000);
} catch (TimeoutException e) {
@@ -145,4 +149,4 @@ public class TopicBasedRemoteLogMetadataManagerHarness
extends IntegrationTestHa
public void closeRemoteLogMetadataManager() {
Utils.closeQuietly(topicBasedRemoteLogMetadataManager,
"TopicBasedRemoteLogMetadataManager");
}
-}
\ No newline at end of file
+}
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
new file mode 100644
index 00000000000..3386b94f895
--- /dev/null
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+
+import kafka.utils.EmptyTestInfo;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for
usages of JavaConverters
+public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
+ private static final Logger log =
LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.class);
+
+ private static final int SEG_SIZE = 1024 * 1024;
+
+ private final Time time = new MockTime(1);
+ private final TopicBasedRemoteLogMetadataManagerHarness
remoteLogMetadataManagerHarness = new
TopicBasedRemoteLogMetadataManagerHarness();
+
+ private TopicBasedRemoteLogMetadataManager rlmm() {
+ return remoteLogMetadataManagerHarness.remoteLogMetadataManager();
+ }
+
+ @BeforeEach
+ public void setup() {
+ // Start the cluster only.
+ remoteLogMetadataManagerHarness.setUp(new EmptyTestInfo());
+ }
+
+ @AfterEach
+ public void teardown() throws IOException {
+ remoteLogMetadataManagerHarness.close();
+ }
+
+ @Test
+ public void testMultiplePartitionSubscriptions() throws Exception {
+ // Create topics.
+ String leaderTopic = "leader";
+ HashMap<Object, Seq<Object>> assignedLeaderTopicReplicas = new
HashMap<>();
+ List<Object> leaderTopicReplicas = new ArrayList<>();
+ // Set broker id 0 as the first entry which is taken as the leader.
+ leaderTopicReplicas.add(0);
+ leaderTopicReplicas.add(1);
+ leaderTopicReplicas.add(2);
+ assignedLeaderTopicReplicas.put(0,
JavaConverters.asScalaBuffer(leaderTopicReplicas));
+ remoteLogMetadataManagerHarness.createTopicWithAssignment(leaderTopic,
+ JavaConverters.mapAsScalaMap(assignedLeaderTopicReplicas),
+ remoteLogMetadataManagerHarness.listenerName());
+
+ String followerTopic = "follower";
+ HashMap<Object, Seq<Object>> assignedFollowerTopicReplicas = new
HashMap<>();
+ List<Object> followerTopicReplicas = new ArrayList<>();
+ // Set broker id 1 as the first entry which is taken as the leader.
+ followerTopicReplicas.add(1);
+ followerTopicReplicas.add(2);
+ followerTopicReplicas.add(0);
+ assignedFollowerTopicReplicas.put(0,
JavaConverters.asScalaBuffer(followerTopicReplicas));
+ remoteLogMetadataManagerHarness.createTopicWithAssignment(
+ followerTopic,
JavaConverters.mapAsScalaMap(assignedFollowerTopicReplicas),
+ remoteLogMetadataManagerHarness.listenerName());
+
+ String topicWithNoMessages = "no-messages-topic";
+ HashMap<Object, Seq<Object>> assignedTopicReplicas = new HashMap<>();
+ List<Object> noMessagesTopicReplicas = new ArrayList<>();
+ // Set broker id 1 as the first entry which is taken as the leader.
+ noMessagesTopicReplicas.add(1);
+ noMessagesTopicReplicas.add(2);
+ noMessagesTopicReplicas.add(0);
+ assignedTopicReplicas.put(0,
JavaConverters.asScalaBuffer(noMessagesTopicReplicas));
+ remoteLogMetadataManagerHarness.createTopicWithAssignment(
+ topicWithNoMessages,
JavaConverters.mapAsScalaMap(assignedTopicReplicas),
+ remoteLogMetadataManagerHarness.listenerName());
+
+ final TopicIdPartition leaderTopicIdPartition = new
TopicIdPartition(Uuid.randomUuid(), new TopicPartition(leaderTopic, 0));
+ final TopicIdPartition followerTopicIdPartition = new
TopicIdPartition(Uuid.randomUuid(), new TopicPartition(followerTopic, 0));
+ final TopicIdPartition emptyTopicIdPartition = new
TopicIdPartition(Uuid.randomUuid(), new TopicPartition(topicWithNoMessages, 0));
+
+ RemoteLogMetadataTopicPartitioner partitioner = new
RemoteLogMetadataTopicPartitioner(10) {
+ @Override
+ public int metadataPartition(TopicIdPartition topicIdPartition) {
+ // Always return partition 0 except for emptyTopicIdPartition, so that any new user
+ // partition (other than emptyTopicIdPartition) added to RLMM uses the same metadata partition.
+ // That results in the secondary consumer assignment.
+ if (emptyTopicIdPartition.equals(topicIdPartition)) {
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+ };
+
+
remoteLogMetadataManagerHarness.initializeRemoteLogMetadataManager(Collections.emptySet(),
true, partitioner);
+
+ // Add segments for these partitions; an exception is expected because the user partitions are not yet registered.
+ // The messages get published to the respective metadata topic partitions, but the ConsumerManager is not
+ // subscribed to those partitions until the user partitions are registered.
+ RemoteLogSegmentMetadata leaderSegmentMetadata = new
RemoteLogSegmentMetadata(new RemoteLogSegmentId(leaderTopicIdPartition,
Uuid.randomUuid()),
+ 0, 100, -1L, 0,
+ time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
+ ExecutionException exception =
Assertions.assertThrows(ExecutionException.class, () ->
rlmm().addRemoteLogSegmentMetadata(leaderSegmentMetadata).get());
+ Assertions.assertEquals("org.apache.kafka.common.KafkaException: This
consumer is not assigned to the target partition 0. Partitions currently
assigned: []",
+ exception.getMessage());
+
+ RemoteLogSegmentMetadata followerSegmentMetadata = new
RemoteLogSegmentMetadata(new RemoteLogSegmentId(followerTopicIdPartition,
Uuid.randomUuid()),
+ 0, 100, -1L, 0,
+ time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
+ exception = Assertions.assertThrows(ExecutionException.class, () ->
rlmm().addRemoteLogSegmentMetadata(followerSegmentMetadata).get());
+ Assertions.assertEquals("org.apache.kafka.common.KafkaException: This
consumer is not assigned to the target partition 0. Partitions currently
assigned: []",
+ exception.getMessage());
+
+ // `listRemoteLogSegments` should throw an exception as these topic partitions are not yet registered.
+ Assertions.assertThrows(RemoteStorageException.class, () ->
rlmm().listRemoteLogSegments(leaderTopicIdPartition));
+ Assertions.assertThrows(RemoteStorageException.class, () ->
rlmm().listRemoteLogSegments(followerTopicIdPartition));
+
+
rlmm().onPartitionLeadershipChanges(Collections.singleton(leaderTopicIdPartition),
+ Collections.emptySet());
+
+ // The RemoteLogSegmentMetadata events are already published; topicBasedRlmm's consumer manager will start
+ // fetching those events and building the cache.
+ waitUntilConsumerCatchesUp(30_000L);
+ // The leader partition's metadata would have been received since it is registered, but the follower partition
+ // is not yet registered, hence listing its segments throws an exception.
+
Assertions.assertTrue(rlmm().listRemoteLogSegments(leaderTopicIdPartition).hasNext());
+ Assertions.assertThrows(RemoteStorageException.class, () ->
rlmm().listRemoteLogSegments(followerTopicIdPartition));
+
+ // Register follower partition
+
rlmm().onPartitionLeadershipChanges(Collections.singleton(emptyTopicIdPartition),
+ Collections.singleton(followerTopicIdPartition));
+
+ // In this state, all the metadata should be available in RLMM for
both leader and follower partitions.
+ TestUtils.waitForCondition(() ->
rlmm().listRemoteLogSegments(leaderTopicIdPartition).hasNext(), "No segments
found");
+ TestUtils.waitForCondition(() ->
rlmm().listRemoteLogSegments(followerTopicIdPartition).hasNext(), "No segments
found");
+ }
+
+ private void waitUntilConsumerCatchesUp(long timeoutMs) throws
TimeoutException, InterruptedException {
+ TestUtils.waitForCondition(() -> {
+ // If both the leader and follower partitions are mapped to the
same metadata partition which is 0, it
+ // should have at least 2 messages. That means, read offset should
be >= 1 (including duplicate messages if any).
+ return rlmm().readOffsetForPartition(0).orElse(-1L) >= 1;
+ }, timeoutMs, "Consumer did not catch up");
+ }
+}
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
index 714405a30e9..46ffeba2142 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
@@ -31,18 +31,14 @@ import org.junit.jupiter.api.Test;
import scala.collection.JavaConverters;
import scala.collection.Seq;
-import java.io.File;
import java.io.IOException;
-import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
-import static
org.apache.kafka.server.log.remote.metadata.storage.ConsumerManager.COMMITTED_OFFSETS_FILE_NAME;
import static
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR;
@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for
usages of JavaConverters
@@ -69,7 +65,7 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest {
}
private void startTopicBasedRemoteLogMetadataManagerHarness(boolean
startConsumerThread) {
-
remoteLogMetadataManagerHarness.initializeRemoteLogMetadataManager(Collections.emptySet(),
startConsumerThread);
+
remoteLogMetadataManagerHarness.initializeRemoteLogMetadataManager(Collections.emptySet(),
startConsumerThread, null);
}
@AfterEach
@@ -136,9 +132,8 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest {
// Stop TopicBasedRemoteLogMetadataManager only.
stopTopicBasedRemoteLogMetadataManagerHarness();
- // Start TopicBasedRemoteLogMetadataManager but do not start consumer
thread to check whether the stored metadata is
- // loaded successfully or not.
- startTopicBasedRemoteLogMetadataManagerHarness(false);
+ // Start TopicBasedRemoteLogMetadataManager
+ startTopicBasedRemoteLogMetadataManagerHarness(true);
// Register these partitions to RLMM, which loads the respective
metadata snapshots.
topicBasedRlmm().onPartitionLeadershipChanges(Collections.singleton(leaderTopicIdPartition),
Collections.singleton(followerTopicIdPartition));
@@ -148,29 +143,6 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest
{
topicBasedRlmm().listRemoteLogSegments(leaderTopicIdPartition)));
Assertions.assertTrue(TestUtils.sameElementsWithoutOrder(Collections.singleton(followerSegmentMetadata).iterator(),
topicBasedRlmm().listRemoteLogSegments(followerTopicIdPartition)));
- // Check whether the check-pointed consumer offsets are stored or not.
- Path committedOffsetsPath = new File(logDir,
COMMITTED_OFFSETS_FILE_NAME).toPath();
- Assertions.assertTrue(committedOffsetsPath.toFile().exists());
- CommittedOffsetsFile committedOffsetsFile = new
CommittedOffsetsFile(committedOffsetsPath.toFile());
-
- int metadataPartition1 =
topicBasedRlmm().metadataPartition(leaderTopicIdPartition);
- int metadataPartition2 =
topicBasedRlmm().metadataPartition(followerTopicIdPartition);
- Optional<Long> receivedOffsetForPartition1 =
topicBasedRlmm().receivedOffsetForPartition(metadataPartition1);
- Optional<Long> receivedOffsetForPartition2 =
topicBasedRlmm().receivedOffsetForPartition(metadataPartition2);
- Assertions.assertTrue(receivedOffsetForPartition1.isPresent());
- Assertions.assertTrue(receivedOffsetForPartition2.isPresent());
-
- // Make sure these offsets are at least 0.
- Assertions.assertTrue(receivedOffsetForPartition1.get() >= 0);
- Assertions.assertTrue(receivedOffsetForPartition2.get() >= 0);
-
- // Check the stored entries and the offsets that were set on consumer
are the same.
- Map<Integer, Long> partitionToOffset =
committedOffsetsFile.readEntries();
- Assertions.assertEquals(partitionToOffset.get(metadataPartition1),
receivedOffsetForPartition1.get());
- Assertions.assertEquals(partitionToOffset.get(metadataPartition2),
receivedOffsetForPartition2.get());
-
- // Start Consumer thread
- topicBasedRlmm().startConsumerThread();
// Add one more segment
RemoteLogSegmentMetadata leaderSegmentMetadata2 = new
RemoteLogSegmentMetadata(
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
index eaf62edfea7..96e48de8a73 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
@@ -149,17 +149,17 @@ public class TopicBasedRemoteLogMetadataManagerTest {
}
// If both the leader and follower partitions are mapped to the
same metadata partition then it should have at least
- // 2 messages. That means, received offset should be >= 1
(including duplicate messages if any).
+ // 2 messages. That means, read offset should be >= 1 (including
duplicate messages if any).
if (leaderMetadataPartition == followerMetadataPartition) {
- if
(topicBasedRlmm().receivedOffsetForPartition(leaderMetadataPartition).orElse(-1L)
>= 1) {
+ if
(topicBasedRlmm().readOffsetForPartition(leaderMetadataPartition).orElse(-1L)
>= 1) {
break;
}
} else {
// If the leader partition and the follower partition are
mapped to different metadata partitions then
- // each of those metadata partitions will have at least 1
message. That means, received offset should
+ // each of those metadata partitions will have at least 1
message. That means, read offset should
// be >= 0 (including duplicate messages if any).
- if
(topicBasedRlmm().receivedOffsetForPartition(leaderMetadataPartition).orElse(-1L)
>= 0 ||
-
topicBasedRlmm().receivedOffsetForPartition(followerMetadataPartition).orElse(-1L)
>= 0) {
+ if
(topicBasedRlmm().readOffsetForPartition(leaderMetadataPartition).orElse(-1L)
>= 0 ||
+
topicBasedRlmm().readOffsetForPartition(followerMetadataPartition).orElse(-1L)
>= 0) {
break;
}
}