This is an automated email from the ASF dual-hosted git repository.
satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cc4e699d4cb MINOR: Minor logging and doc related improvements in
topic-based RLMM consumer-manager/task (#14045)
cc4e699d4cb is described below
commit cc4e699d4cb2880d05603e2e8310d28e1c9f201a
Author: Jorge Esteban Quilcate Otoya <[email protected]>
AuthorDate: Sat Jul 22 07:33:35 2023 +0300
MINOR: Minor logging and doc related improvements in topic-based RLMM
consumer-manager/task (#14045)
Improved logging and docs on consumer manager/task call paths.
Reviewers: Luke Chen <[email protected]>, Satish Duggana
<[email protected]>
---
.../remote/metadata/storage/ConsumerManager.java | 33 ++++++++++++++------
.../log/remote/metadata/storage/ConsumerTask.java | 15 +++++++--
.../TopicBasedRemoteLogMetadataManager.java | 36 +++++++---------------
.../TopicBasedRemoteLogMetadataManagerConfig.java | 8 +++++
.../storage/RemoteLogSegmentLifecycleTest.java | 8 +++--
...picBasedRemoteLogMetadataManagerConfigTest.java | 13 +++++---
...icBasedRemoteLogMetadataManagerRestartTest.java | 2 +-
.../TopicBasedRemoteLogMetadataManagerTest.java | 8 ++---
8 files changed, 73 insertions(+), 50 deletions(-)
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
index 77f83fb90b7..14ec707a2eb 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
@@ -61,7 +61,15 @@ public class ConsumerManager implements Closeable {
//Create a task to consume messages and submit the respective events
to RemotePartitionMetadataEventHandler.
KafkaConsumer<byte[], byte[]> consumer = new
KafkaConsumer<>(rlmmConfig.consumerProperties());
Path committedOffsetsPath = new File(rlmmConfig.logDir(),
COMMITTED_OFFSETS_FILE_NAME).toPath();
- consumerTask = new ConsumerTask(consumer,
remotePartitionMetadataEventHandler, topicPartitioner, committedOffsetsPath,
time, 60_000L);
+ consumerTask = new ConsumerTask(
+ consumer,
+ rlmmConfig.remoteLogMetadataTopicName(),
+ remotePartitionMetadataEventHandler,
+ topicPartitioner,
+ committedOffsetsPath,
+ time,
+ 60_000L
+ );
consumerTaskThread = KafkaThread.nonDaemon("RLMMConsumerTask",
consumerTask);
}
@@ -76,7 +84,8 @@ public class ConsumerManager implements Closeable {
}
/**
- * Waits if necessary for the consumption to reach the offset of the given
{@code recordMetadata}.
+ * Waits if necessary for the consumption to reach the {@code offset} of
the given record
+ * at a certain {@code partition} of the metadata topic.
*
* @param recordMetadata record metadata to be checked for consumption.
* @throws TimeoutException if this method execution did not complete with
in the wait time configured with
@@ -87,10 +96,10 @@ public class ConsumerManager implements Closeable {
}
/**
- * Waits if necessary for the consumption to reach the offset of the given
{@code recordMetadata}.
+ * Waits if necessary for the consumption to reach the partition/offset of
the given {@code RecordMetadata}
*
* @param recordMetadata record metadata to be checked for consumption.
- * @param timeoutMs wait timeout in milli seconds
+ * @param timeoutMs wait timeout in milliseconds
* @throws TimeoutException if this method execution did not complete with
in the given {@code timeoutMs}.
*/
public void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata,
@@ -98,25 +107,29 @@ public class ConsumerManager implements Closeable {
final int partition = recordMetadata.partition();
final long consumeCheckIntervalMs =
Math.min(CONSUME_RECHECK_INTERVAL_MS, timeoutMs);
+ log.info("Waiting until consumer is caught up with the target
partition: [{}]", partition);
+
// If the current assignment does not have the subscription for this
partition then return immediately.
if (!consumerTask.isPartitionAssigned(partition)) {
- throw new KafkaException("This consumer is not subscribed to the
target partition " + partition + " on which message is produced.");
+ throw new KafkaException("This consumer is not assigned to the
target partition " + partition + ". " +
+ "Partitions currently assigned: " +
consumerTask.metadataPartitionsAssigned());
}
final long offset = recordMetadata.offset();
long startTimeMs = time.milliseconds();
while (true) {
+ log.debug("Checking if partition [{}] is up to date with offset
[{}]", partition, offset);
long receivedOffset =
consumerTask.receivedOffsetForPartition(partition).orElse(-1L);
if (receivedOffset >= offset) {
return;
}
- log.debug("Committed offset [{}] for partition [{}], but the
target offset: [{}], Sleeping for [{}] to retry again",
- offset, partition, receivedOffset,
consumeCheckIntervalMs);
+ log.debug("Expected offset [{}] for partition [{}], but the
committed offset: [{}], Sleeping for [{}] to retry again",
+ offset, partition, receivedOffset, consumeCheckIntervalMs);
if (time.milliseconds() - startTimeMs > timeoutMs) {
- log.warn("Committed offset for partition:[{}] is : [{}], but
the target offset: [{}] ",
- partition, receivedOffset, offset);
+ log.warn("Expected offset for partition:[{}] is : [{}], but
the committed offset: [{}] ",
+ partition, receivedOffset, offset);
throw new TimeoutException("Timed out in catching up with the
expected offset by consumer.");
}
@@ -126,7 +139,7 @@ public class ConsumerManager implements Closeable {
@Override
public void close() throws IOException {
- // Consumer task will close the task and it internally closes all the
resources including the consumer.
+ // Closing the consumer task also closes all the resources it holds,
including the consumer.
Utils.closeQuietly(consumerTask, "ConsumerTask");
// Wait until the consumer thread finishes.
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
index 8e0b52d71dd..2c95bf399a5 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
@@ -68,12 +68,13 @@ class ConsumerTask implements Runnable, Closeable {
private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
private final KafkaConsumer<byte[], byte[]> consumer;
+ private final String metadataTopicName;
private final RemotePartitionMetadataEventHandler
remotePartitionMetadataEventHandler;
private final RemoteLogMetadataTopicPartitioner topicPartitioner;
private final Time time;
// It indicates whether the closing process has been started or not. If it
is set as true,
- // consumer will stop consuming messages and it will not allow partition
assignments to be updated.
+ // consumer will stop consuming messages, and it will not allow partition
assignments to be updated.
private volatile boolean closing = false;
// It indicates whether the consumer needs to assign the partitions or
not. This is set when it is
@@ -101,12 +102,14 @@ class ConsumerTask implements Runnable, Closeable {
private long lastSyncedTimeMs;
public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer,
+ String metadataTopicName,
RemotePartitionMetadataEventHandler
remotePartitionMetadataEventHandler,
RemoteLogMetadataTopicPartitioner topicPartitioner,
Path committedOffsetsPath,
Time time,
long committedOffsetSyncIntervalMs) {
this.consumer = Objects.requireNonNull(consumer);
+ this.metadataTopicName = Objects.requireNonNull(metadataTopicName);
this.remotePartitionMetadataEventHandler =
Objects.requireNonNull(remotePartitionMetadataEventHandler);
this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
this.time = Objects.requireNonNull(time);
@@ -143,6 +146,7 @@ class ConsumerTask implements Runnable, Closeable {
// Seek to the committed offsets
for (Map.Entry<Integer, Long> entry : committedOffsets.entrySet())
{
+ log.debug("Updating consumed offset: [{}] for partition [{}]",
entry.getValue(), entry.getKey());
partitionToConsumedOffsets.put(entry.getKey(),
entry.getValue());
consumer.seek(new
TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, entry.getKey()),
entry.getValue());
}
@@ -187,6 +191,7 @@ class ConsumerTask implements Runnable, Closeable {
} else {
log.debug("This event {} is skipped as the topic partition is
not assigned for this instance.", remoteLogMetadata);
}
+ log.debug("Updating consumed offset: [{}] for partition [{}]",
record.offset(), record.partition());
partitionToConsumedOffsets.put(record.partition(),
record.offset());
}
}
@@ -209,7 +214,7 @@ class ConsumerTask implements Runnable, Closeable {
if (offset != null) {
remotePartitionMetadataEventHandler.syncLogMetadataSnapshot(topicIdPartition,
metadataPartition, offset);
} else {
- log.debug("Skipping syncup of the
remote-log-metadata-file for partition:{} , with remote log metadata
partition{}, and no offset",
+ log.debug("Skipping sync-up of the
remote-log-metadata-file for partition: [{}] , with remote log metadata
partition{}, and no offset",
topicIdPartition, metadataPartition);
}
}
@@ -313,7 +318,7 @@ class ConsumerTask implements Runnable, Closeable {
updatedAssignedMetaPartitions.add(topicPartitioner.metadataPartition(tp));
}
- // Clear removed topic partitions from inmemory cache.
+ // Clear removed topic partitions from in-memory cache.
for (TopicIdPartition removedPartition : removedPartitions) {
remotePartitionMetadataEventHandler.clearTopicPartition(removedPartition);
}
@@ -353,4 +358,8 @@ class ConsumerTask implements Runnable, Closeable {
}
}
}
+
+ public Set<Integer> metadataPartitionsAssigned() {
+ return Collections.unmodifiableSet(assignedMetaPartitions);
+ }
}
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
index 0271780174b..ffd6e145039 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
@@ -41,7 +41,6 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
-import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -169,7 +168,8 @@ public class TopicBasedRemoteLogMetadataManager implements
RemoteLogMetadataMana
*
* @param topicIdPartition partition of the given remoteLogMetadata.
* @param remoteLogMetadata RemoteLogMetadata to be stored.
- * @return
+ * @return a future that completes once the record is acknowledged and, if
needed, the consumer has caught up with it.
+ * This ensures the cache is synchronized with the backing topic.
* @throws RemoteStorageException if there are any storage errors occur.
*/
private CompletableFuture<Void> storeRemoteLogMetadata(TopicIdPartition
topicIdPartition,
@@ -182,13 +182,12 @@ public class TopicBasedRemoteLogMetadataManager
implements RemoteLogMetadataMana
CompletableFuture<RecordMetadata> produceFuture =
producerManager.publishMessage(remoteLogMetadata);
// Create and return a `CompletableFuture` instance which
completes when the consumer is caught up with the produced record's offset.
- return produceFuture.thenApplyAsync(recordMetadata -> {
+ return produceFuture.thenAcceptAsync(recordMetadata -> {
try {
consumerManager.waitTillConsumptionCatchesUp(recordMetadata);
} catch (TimeoutException e) {
throw new KafkaException(e);
}
- return null;
});
} catch (KafkaException e) {
if (e instanceof RetriableException) {
@@ -338,18 +337,18 @@ public class TopicBasedRemoteLogMetadataManager
implements RemoteLogMetadataMana
return;
}
- log.info("Started initializing with configs: {}", configs);
+ log.info("Started configuring topic-based RLMM with configs: {}",
configs);
rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(configs);
rlmmTopicPartitioner = new
RemoteLogMetadataTopicPartitioner(rlmmConfig.metadataTopicPartitionsCount());
remotePartitionMetadataStore = new
RemotePartitionMetadataStore(new File(rlmmConfig.logDir()).toPath());
configured = true;
- log.info("Successfully initialized with rlmmConfig: {}",
rlmmConfig);
+ log.info("Successfully configured topic-based RLMM with config:
{}", rlmmConfig);
// Scheduling the initialization producer/consumer managers in a
separate thread. Required resources may
// not yet be available now. This thread makes sure that it is
retried at regular intervals until it is
// successful.
- initializationThread =
KafkaThread.nonDaemon("RLMMInitializationThread", () -> initializeResources());
+ initializationThread =
KafkaThread.nonDaemon("RLMMInitializationThread", this::initializeResources);
initializationThread.start();
} finally {
lock.writeLock().unlock();
@@ -357,14 +356,11 @@ public class TopicBasedRemoteLogMetadataManager
implements RemoteLogMetadataMana
}
private void initializeResources() {
- log.info("Initializing the resources.");
+ log.info("Initializing topic-based RLMM resources");
final NewTopic remoteLogMetadataTopicRequest =
createRemoteLogMetadataTopicRequest();
boolean topicCreated = false;
long startTimeMs = time.milliseconds();
- AdminClient adminClient = null;
- try {
- adminClient = AdminClient.create(rlmmConfig.producerProperties());
-
+ try (AdminClient adminClient =
AdminClient.create(rlmmConfig.commonProperties())) {
// Stop if it is already initialized or closing.
while (!(initialized.get() || closing.get())) {
@@ -417,7 +413,7 @@ public class TopicBasedRemoteLogMetadataManager implements
RemoteLogMetadataMana
}
initialized.set(true);
- log.info("Initialized resources successfully.");
+ log.info("Initialized topic-based RLMM resources
successfully");
} catch (Exception e) {
log.error("Encountered error while initializing
producer/consumer", e);
return;
@@ -425,16 +421,6 @@ public class TopicBasedRemoteLogMetadataManager implements
RemoteLogMetadataMana
lock.writeLock().unlock();
}
}
-
- } finally {
- if (adminClient != null) {
- try {
- adminClient.close(Duration.ofSeconds(10));
- } catch (Exception e) {
- // Ignore the error.
- log.debug("Error occurred while closing the admin client",
e);
- }
- }
}
}
@@ -515,7 +501,7 @@ public class TopicBasedRemoteLogMetadataManager implements
RemoteLogMetadataMana
@Override
public void close() throws IOException {
// Close all the resources.
- log.info("Closing the resources.");
+ log.info("Closing topic-based RLMM resources");
if (closing.compareAndSet(false, true)) {
lock.writeLock().lock();
try {
@@ -532,7 +518,7 @@ public class TopicBasedRemoteLogMetadataManager implements
RemoteLogMetadataMana
Utils.closeQuietly(remotePartitionMetadataStore,
"RemotePartitionMetadataStore");
} finally {
lock.writeLock().unlock();
- log.info("Closed the resources.");
+ log.info("Closed topic-based RLMM resources");
}
}
}
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
index 7e52519f2eb..1ab57f5b8d9 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
@@ -105,6 +105,7 @@ public final class TopicBasedRemoteLogMetadataManagerConfig
{
private final long initializationRetryMaxTimeoutMs;
private final long initializationRetryIntervalMs;
+ private Map<String, Object> commonProps;
private Map<String, Object> consumerProps;
private Map<String, Object> producerProps;
@@ -149,6 +150,8 @@ public final class TopicBasedRemoteLogMetadataManagerConfig
{
}
}
+ commonProps = new HashMap<>(commonClientConfigs);
+
HashMap<String, Object> allProducerConfigs = new
HashMap<>(commonClientConfigs);
allProducerConfigs.putAll(producerOnlyConfigs);
producerProps = createProducerProps(allProducerConfigs);
@@ -190,6 +193,10 @@ public final class
TopicBasedRemoteLogMetadataManagerConfig {
return logDir;
}
+ public Map<String, Object> commonProperties() {
+ return commonProps;
+ }
+
public Map<String, Object> consumerProperties() {
return consumerProps;
}
@@ -232,6 +239,7 @@ public final class TopicBasedRemoteLogMetadataManagerConfig
{
", metadataTopicReplicationFactor=" +
metadataTopicReplicationFactor +
", initializationRetryMaxTimeoutMs=" +
initializationRetryMaxTimeoutMs +
", initializationRetryIntervalMs=" +
initializationRetryIntervalMs +
+ ", commonProps=" + commonProps +
", consumerProps=" + consumerProps +
", producerProps=" + producerProps +
'}';
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
index b8af14e319e..b847e7cba3f 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
@@ -271,11 +271,13 @@ public class RemoteLogSegmentLifecycleTest {
throws RemoteStorageException {
// cache.listRemoteLogSegments(leaderEpoch) should contain the above
segment.
Iterator<RemoteLogSegmentMetadata> segmentsIter =
remoteLogSegmentLifecycleManager.listRemoteLogSegments(leaderEpoch);
- Assertions.assertTrue(segmentsIter.hasNext() &&
Objects.equals(segmentsIter.next(), expectedSegment));
+ Assertions.assertTrue(segmentsIter.hasNext());
+ Assertions.assertEquals(expectedSegment, segmentsIter.next());
// cache.listAllRemoteLogSegments() should contain the above segment.
Iterator<RemoteLogSegmentMetadata> allSegmentsIter =
remoteLogSegmentLifecycleManager.listAllRemoteLogSegments();
- Assertions.assertTrue(allSegmentsIter.hasNext() &&
Objects.equals(allSegmentsIter.next(), expectedSegment));
+ Assertions.assertTrue(allSegmentsIter.hasNext());
+ Assertions.assertEquals(expectedSegment, allSegmentsIter.next());
}
@ParameterizedTest(name = "remoteLogSegmentLifecycleManager = {0}")
@@ -285,7 +287,7 @@ public class RemoteLogSegmentLifecycleTest {
try {
remoteLogSegmentLifecycleManager.initialize(topicIdPartition);
- // Create a segment with state COPY_SEGMENT_STARTED, and check
for searching that segment and listing the
+ // Create a segment with state COPY_SEGMENT_STARTED, and check for
searching that segment and listing the
// segments.
RemoteLogSegmentId segmentId = new
RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());
RemoteLogSegmentMetadata segmentMetadata = new
RemoteLogSegmentMetadata(segmentId, 0L, 50L, -1L, BROKER_ID_0,
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
index f66253b4628..8e3985d0d5f 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
@@ -39,13 +39,12 @@ import static
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemo
import static
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP;
public class TopicBasedRemoteLogMetadataManagerConfigTest {
- private static final Logger log =
LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerConfigTest.class);
+ private static final Logger log =
LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerConfigTest.class);
private static final String BOOTSTRAP_SERVERS = "localhost:9091";
@Test
public void testValidConfig() {
-
Map<String, Object> commonClientConfig = new HashMap<>();
commonClientConfig.put(CommonClientConfigs.RETRIES_CONFIG, 10);
commonClientConfig.put(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG,
1000L);
@@ -64,11 +63,14 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest {
Assertions.assertEquals(props.get(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP),
rlmmConfig.metadataTopicPartitionsCount());
// Check for common client configs.
+ Assertions.assertEquals(BOOTSTRAP_SERVERS,
rlmmConfig.commonProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
Assertions.assertEquals(BOOTSTRAP_SERVERS,
rlmmConfig.producerProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
Assertions.assertEquals(BOOTSTRAP_SERVERS,
rlmmConfig.consumerProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
for (Map.Entry<String, Object> entry : commonClientConfig.entrySet()) {
log.info("Checking config: " + entry.getKey());
+ Assertions.assertEquals(entry.getValue(),
+
rlmmConfig.commonProperties().get(entry.getKey()));
Assertions.assertEquals(entry.getValue(),
rlmmConfig.producerProperties().get(entry.getKey()));
Assertions.assertEquals(entry.getValue(),
@@ -91,12 +93,13 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest {
}
@Test
- public void testProducerConsumerOverridesConfig() {
+ public void testCommonProducerConsumerOverridesConfig() {
Map.Entry<String, Long> overrideEntry = new
AbstractMap.SimpleImmutableEntry<>(CommonClientConfigs.METADATA_MAX_AGE_CONFIG,
60000L);
Map<String, Object> commonClientConfig = new HashMap<>();
commonClientConfig.put(CommonClientConfigs.RETRIES_CONFIG, 10);
commonClientConfig.put(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG,
1000L);
- commonClientConfig.put(overrideEntry.getKey(),
overrideEntry.getValue());
+ Long overrideCommonPropValue = overrideEntry.getValue();
+ commonClientConfig.put(overrideEntry.getKey(),
overrideCommonPropValue);
Map<String, Object> producerConfig = new HashMap<>();
producerConfig.put(ProducerConfig.ACKS_CONFIG, -1);
@@ -111,6 +114,8 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest {
Map<String, Object> props = createValidConfigProps(commonClientConfig,
producerConfig, consumerConfig);
TopicBasedRemoteLogMetadataManagerConfig rlmmConfig = new
TopicBasedRemoteLogMetadataManagerConfig(props);
+ Assertions.assertEquals(overrideCommonPropValue,
+
rlmmConfig.commonProperties().get(overrideEntry.getKey()));
Assertions.assertEquals(overriddenProducerPropValue,
rlmmConfig.producerProperties().get(overrideEntry.getKey()));
Assertions.assertEquals(overriddenConsumerPropValue,
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
index 3dd02962de5..714405a30e9 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
@@ -120,7 +120,7 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest {
// Register these partitions to RLMM.
topicBasedRlmm().onPartitionLeadershipChanges(Collections.singleton(leaderTopicIdPartition),
Collections.singleton(followerTopicIdPartition));
- // Add segments for these partitions but they are not available as
they have not yet been subscribed.
+ // Add segments for these partitions, but they are not available as
they have not yet been subscribed.
RemoteLogSegmentMetadata leaderSegmentMetadata = new
RemoteLogSegmentMetadata(
new RemoteLogSegmentId(leaderTopicIdPartition,
Uuid.randomUuid()),
0, 100, -1L, 0,
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
index 20bad28d794..a41a9a38699 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
@@ -124,15 +124,15 @@ public class TopicBasedRemoteLogMetadataManagerTest {
// RemoteLogSegmentMetadata events are already published, and
topicBasedRlmm's consumer manager will start
// fetching those events and build the cache.
- waitUntilConsumerCatchesup(newLeaderTopicIdPartition,
newFollowerTopicIdPartition, 30_000L);
+ waitUntilConsumerCatchesUp(newLeaderTopicIdPartition,
newFollowerTopicIdPartition, 30_000L);
Assertions.assertTrue(topicBasedRlmm().listRemoteLogSegments(newLeaderTopicIdPartition).hasNext());
Assertions.assertTrue(topicBasedRlmm().listRemoteLogSegments(newFollowerTopicIdPartition).hasNext());
}
- private void waitUntilConsumerCatchesup(TopicIdPartition
newLeaderTopicIdPartition,
- TopicIdPartition
newFollowerTopicIdPartition,
- long timeoutMs) throws
TimeoutException {
+ private void waitUntilConsumerCatchesUp(TopicIdPartition
newLeaderTopicIdPartition,
+ TopicIdPartition
newFollowerTopicIdPartition,
+ long timeoutMs) throws
TimeoutException {
int leaderMetadataPartition =
topicBasedRlmm().metadataPartition(newLeaderTopicIdPartition);
int followerMetadataPartition =
topicBasedRlmm().metadataPartition(newFollowerTopicIdPartition);