This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cc4e699d4cb MINOR: Minor logging and doc-related improvements in 
topic-based RLMM consumer-manager/task (#14045)
cc4e699d4cb is described below

commit cc4e699d4cb2880d05603e2e8310d28e1c9f201a
Author: Jorge Esteban Quilcate Otoya <[email protected]>
AuthorDate: Sat Jul 22 07:33:35 2023 +0300

    MINOR: Minor logging and doc-related improvements in topic-based RLMM 
consumer-manager/task (#14045)
    
    Improved logging and docs on consumer manager/task call paths.
    
    Reviewers: Luke Chen <[email protected]>, Satish Duggana 
<[email protected]>
---
 .../remote/metadata/storage/ConsumerManager.java   | 33 ++++++++++++++------
 .../log/remote/metadata/storage/ConsumerTask.java  | 15 +++++++--
 .../TopicBasedRemoteLogMetadataManager.java        | 36 +++++++---------------
 .../TopicBasedRemoteLogMetadataManagerConfig.java  |  8 +++++
 .../storage/RemoteLogSegmentLifecycleTest.java     |  8 +++--
 ...picBasedRemoteLogMetadataManagerConfigTest.java | 13 +++++---
 ...icBasedRemoteLogMetadataManagerRestartTest.java |  2 +-
 .../TopicBasedRemoteLogMetadataManagerTest.java    |  8 ++---
 8 files changed, 73 insertions(+), 50 deletions(-)

diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
index 77f83fb90b7..14ec707a2eb 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
@@ -61,7 +61,15 @@ public class ConsumerManager implements Closeable {
         //Create a task to consume messages and submit the respective events 
to RemotePartitionMetadataEventHandler.
         KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(rlmmConfig.consumerProperties());
         Path committedOffsetsPath = new File(rlmmConfig.logDir(), 
COMMITTED_OFFSETS_FILE_NAME).toPath();
-        consumerTask = new ConsumerTask(consumer, 
remotePartitionMetadataEventHandler, topicPartitioner, committedOffsetsPath, 
time, 60_000L);
+        consumerTask = new ConsumerTask(
+                consumer,
+                rlmmConfig.remoteLogMetadataTopicName(),
+                remotePartitionMetadataEventHandler,
+                topicPartitioner,
+                committedOffsetsPath,
+                time,
+                60_000L
+        );
         consumerTaskThread = KafkaThread.nonDaemon("RLMMConsumerTask", 
consumerTask);
     }
 
@@ -76,7 +84,8 @@ public class ConsumerManager implements Closeable {
     }
 
     /**
-     * Waits if necessary for the consumption to reach the offset of the given 
{@code recordMetadata}.
+     * Waits if necessary for the consumption to reach the {@code offset} of 
the given record
+     * at a certain {@code partition} of the metadata topic.
      *
      * @param recordMetadata record metadata to be checked for consumption.
      * @throws TimeoutException if this method execution did not complete with 
in the wait time configured with
@@ -87,10 +96,10 @@ public class ConsumerManager implements Closeable {
     }
 
     /**
-     * Waits if necessary for the consumption to reach the offset of the given 
{@code recordMetadata}.
+     * Waits if necessary for the consumption to reach the partition/offset of 
the given {@code RecordMetadata}
      *
      * @param recordMetadata record metadata to be checked for consumption.
-     * @param timeoutMs      wait timeout in milli seconds
+     * @param timeoutMs      wait timeout in milliseconds
      * @throws TimeoutException if this method execution did not complete with 
in the given {@code timeoutMs}.
      */
     public void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata,
@@ -98,25 +107,29 @@ public class ConsumerManager implements Closeable {
         final int partition = recordMetadata.partition();
         final long consumeCheckIntervalMs = 
Math.min(CONSUME_RECHECK_INTERVAL_MS, timeoutMs);
 
+        log.info("Waiting until consumer is caught up with the target 
partition: [{}]", partition);
+
         // If the current assignment does not have the subscription for this 
partition then return immediately.
         if (!consumerTask.isPartitionAssigned(partition)) {
-            throw new KafkaException("This consumer is not subscribed to the 
target partition " + partition + " on which message is produced.");
+            throw new KafkaException("This consumer is not assigned to the 
target partition " + partition + ". " +
+                    "Partitions currently assigned: " + 
consumerTask.metadataPartitionsAssigned());
         }
 
         final long offset = recordMetadata.offset();
         long startTimeMs = time.milliseconds();
         while (true) {
+            log.debug("Checking if partition [{}] is up to date with offset 
[{}]", partition, offset);
             long receivedOffset = 
consumerTask.receivedOffsetForPartition(partition).orElse(-1L);
             if (receivedOffset >= offset) {
                 return;
             }
 
-            log.debug("Committed offset [{}] for partition [{}], but the 
target offset: [{}],  Sleeping for [{}] to retry again",
-                      offset, partition, receivedOffset, 
consumeCheckIntervalMs);
+            log.debug("Expected offset [{}] for partition [{}], but the 
committed offset: [{}],  Sleeping for [{}] to retry again",
+                    offset, partition, receivedOffset, consumeCheckIntervalMs);
 
             if (time.milliseconds() - startTimeMs > timeoutMs) {
-                log.warn("Committed offset for partition:[{}] is : [{}], but 
the target offset: [{}] ",
-                         partition, receivedOffset, offset);
+                log.warn("Expected offset for partition:[{}] is : [{}], but 
the committed offset: [{}] ",
+                        partition, receivedOffset, offset);
                 throw new TimeoutException("Timed out in catching up with the 
expected offset by consumer.");
             }
 
@@ -126,7 +139,7 @@ public class ConsumerManager implements Closeable {
 
     @Override
     public void close() throws IOException {
-        // Consumer task will close the task and it internally closes all the 
resources including the consumer.
+        // Consumer task will close the task, and it internally closes all the 
resources including the consumer.
         Utils.closeQuietly(consumerTask, "ConsumerTask");
 
         // Wait until the consumer thread finishes.
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
index 8e0b52d71dd..2c95bf399a5 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
@@ -68,12 +68,13 @@ class ConsumerTask implements Runnable, Closeable {
 
     private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
     private final KafkaConsumer<byte[], byte[]> consumer;
+    private final String metadataTopicName;
     private final RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler;
     private final RemoteLogMetadataTopicPartitioner topicPartitioner;
     private final Time time;
 
     // It indicates whether the closing process has been started or not. If it 
is set as true,
-    // consumer will stop consuming messages and it will not allow partition 
assignments to be updated.
+    // consumer will stop consuming messages, and it will not allow partition 
assignments to be updated.
     private volatile boolean closing = false;
 
     // It indicates whether the consumer needs to assign the partitions or 
not. This is set when it is
@@ -101,12 +102,14 @@ class ConsumerTask implements Runnable, Closeable {
     private long lastSyncedTimeMs;
 
     public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer,
+                        String metadataTopicName,
                         RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
                         RemoteLogMetadataTopicPartitioner topicPartitioner,
                         Path committedOffsetsPath,
                         Time time,
                         long committedOffsetSyncIntervalMs) {
         this.consumer = Objects.requireNonNull(consumer);
+        this.metadataTopicName = Objects.requireNonNull(metadataTopicName);
         this.remotePartitionMetadataEventHandler = 
Objects.requireNonNull(remotePartitionMetadataEventHandler);
         this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
         this.time = Objects.requireNonNull(time);
@@ -143,6 +146,7 @@ class ConsumerTask implements Runnable, Closeable {
 
             // Seek to the committed offsets
             for (Map.Entry<Integer, Long> entry : committedOffsets.entrySet()) 
{
+                log.debug("Updating consumed offset: [{}] for partition [{}]", 
entry.getValue(), entry.getKey());
                 partitionToConsumedOffsets.put(entry.getKey(), 
entry.getValue());
                 consumer.seek(new 
TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, entry.getKey()), 
entry.getValue());
             }
@@ -187,6 +191,7 @@ class ConsumerTask implements Runnable, Closeable {
             } else {
                 log.debug("This event {} is skipped as the topic partition is 
not assigned for this instance.", remoteLogMetadata);
             }
+            log.debug("Updating consumed offset: [{}] for partition [{}]", 
record.offset(), record.partition());
             partitionToConsumedOffsets.put(record.partition(), 
record.offset());
         }
     }
@@ -209,7 +214,7 @@ class ConsumerTask implements Runnable, Closeable {
                     if (offset != null) {
                         
remotePartitionMetadataEventHandler.syncLogMetadataSnapshot(topicIdPartition, 
metadataPartition, offset);
                     } else {
-                        log.debug("Skipping syncup of the 
remote-log-metadata-file for partition:{} , with remote log metadata 
partition{}, and no offset",
+                        log.debug("Skipping sync-up of the 
remote-log-metadata-file for partition: [{}] , with remote log metadata 
partition{}, and no offset",
                                 topicIdPartition, metadataPartition);
                     }
                 }
@@ -313,7 +318,7 @@ class ConsumerTask implements Runnable, Closeable {
                 
updatedAssignedMetaPartitions.add(topicPartitioner.metadataPartition(tp));
             }
 
-            // Clear removed topic partitions from inmemory cache.
+            // Clear removed topic partitions from in-memory cache.
             for (TopicIdPartition removedPartition : removedPartitions) {
                 
remotePartitionMetadataEventHandler.clearTopicPartition(removedPartition);
             }
@@ -353,4 +358,8 @@ class ConsumerTask implements Runnable, Closeable {
             }
         }
     }
+
+    public Set<Integer> metadataPartitionsAssigned() {
+        return Collections.unmodifiableSet(assignedMetaPartitions);
+    }
 }
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
index 0271780174b..ffd6e145039 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
@@ -41,7 +41,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -169,7 +168,8 @@ public class TopicBasedRemoteLogMetadataManager implements 
RemoteLogMetadataMana
      *
      * @param topicIdPartition partition of the given remoteLogMetadata.
      * @param remoteLogMetadata RemoteLogMetadata to be stored.
-     * @return
+     * @return a future with acknowledge and potentially waiting also for 
consumer to catch up.
+     * This ensures cache is synchronized with backing topic.
      * @throws RemoteStorageException if there are any storage errors occur.
      */
     private CompletableFuture<Void> storeRemoteLogMetadata(TopicIdPartition 
topicIdPartition,
@@ -182,13 +182,12 @@ public class TopicBasedRemoteLogMetadataManager 
implements RemoteLogMetadataMana
             CompletableFuture<RecordMetadata> produceFuture = 
producerManager.publishMessage(remoteLogMetadata);
 
             // Create and return a `CompletableFuture` instance which 
completes when the consumer is caught up with the produced record's offset.
-            return produceFuture.thenApplyAsync(recordMetadata -> {
+            return produceFuture.thenAcceptAsync(recordMetadata -> {
                 try {
                     
consumerManager.waitTillConsumptionCatchesUp(recordMetadata);
                 } catch (TimeoutException e) {
                     throw new KafkaException(e);
                 }
-                return null;
             });
         } catch (KafkaException e) {
             if (e instanceof RetriableException) {
@@ -338,18 +337,18 @@ public class TopicBasedRemoteLogMetadataManager 
implements RemoteLogMetadataMana
                 return;
             }
 
-            log.info("Started initializing with configs: {}", configs);
+            log.info("Started configuring topic-based RLMM with configs: {}", 
configs);
 
             rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(configs);
             rlmmTopicPartitioner = new 
RemoteLogMetadataTopicPartitioner(rlmmConfig.metadataTopicPartitionsCount());
             remotePartitionMetadataStore = new 
RemotePartitionMetadataStore(new File(rlmmConfig.logDir()).toPath());
             configured = true;
-            log.info("Successfully initialized with rlmmConfig: {}", 
rlmmConfig);
+            log.info("Successfully configured topic-based RLMM with config: 
{}", rlmmConfig);
 
             // Scheduling the initialization producer/consumer managers in a 
separate thread. Required resources may
             // not yet be available now. This thread makes sure that it is 
retried at regular intervals until it is
             // successful.
-            initializationThread = 
KafkaThread.nonDaemon("RLMMInitializationThread", () -> initializeResources());
+            initializationThread = 
KafkaThread.nonDaemon("RLMMInitializationThread", this::initializeResources);
             initializationThread.start();
         } finally {
             lock.writeLock().unlock();
@@ -357,14 +356,11 @@ public class TopicBasedRemoteLogMetadataManager 
implements RemoteLogMetadataMana
     }
 
     private void initializeResources() {
-        log.info("Initializing the resources.");
+        log.info("Initializing topic-based RLMM resources");
         final NewTopic remoteLogMetadataTopicRequest = 
createRemoteLogMetadataTopicRequest();
         boolean topicCreated = false;
         long startTimeMs = time.milliseconds();
-        AdminClient adminClient = null;
-        try {
-            adminClient = AdminClient.create(rlmmConfig.producerProperties());
-
+        try (AdminClient adminClient = 
AdminClient.create(rlmmConfig.commonProperties())) {
             // Stop if it is already initialized or closing.
             while (!(initialized.get() || closing.get())) {
 
@@ -417,7 +413,7 @@ public class TopicBasedRemoteLogMetadataManager implements 
RemoteLogMetadataMana
                     }
 
                     initialized.set(true);
-                    log.info("Initialized resources successfully.");
+                    log.info("Initialized topic-based RLMM resources 
successfully");
                 } catch (Exception e) {
                     log.error("Encountered error while initializing 
producer/consumer", e);
                     return;
@@ -425,16 +421,6 @@ public class TopicBasedRemoteLogMetadataManager implements 
RemoteLogMetadataMana
                     lock.writeLock().unlock();
                 }
             }
-
-        } finally {
-            if (adminClient != null) {
-                try {
-                    adminClient.close(Duration.ofSeconds(10));
-                } catch (Exception e) {
-                    // Ignore the error.
-                    log.debug("Error occurred while closing the admin client", 
e);
-                }
-            }
         }
     }
 
@@ -515,7 +501,7 @@ public class TopicBasedRemoteLogMetadataManager implements 
RemoteLogMetadataMana
     @Override
     public void close() throws IOException {
         // Close all the resources.
-        log.info("Closing the resources.");
+        log.info("Closing topic-based RLMM resources");
         if (closing.compareAndSet(false, true)) {
             lock.writeLock().lock();
             try {
@@ -532,7 +518,7 @@ public class TopicBasedRemoteLogMetadataManager implements 
RemoteLogMetadataMana
                 Utils.closeQuietly(remotePartitionMetadataStore, 
"RemotePartitionMetadataStore");
             } finally {
                 lock.writeLock().unlock();
-                log.info("Closed the resources.");
+                log.info("Closed topic-based RLMM resources");
             }
         }
     }
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
index 7e52519f2eb..1ab57f5b8d9 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
@@ -105,6 +105,7 @@ public final class TopicBasedRemoteLogMetadataManagerConfig 
{
     private final long initializationRetryMaxTimeoutMs;
     private final long initializationRetryIntervalMs;
 
+    private Map<String, Object> commonProps;
     private Map<String, Object> consumerProps;
     private Map<String, Object> producerProps;
 
@@ -149,6 +150,8 @@ public final class TopicBasedRemoteLogMetadataManagerConfig 
{
             }
         }
 
+        commonProps = new HashMap<>(commonClientConfigs);
+
         HashMap<String, Object> allProducerConfigs = new 
HashMap<>(commonClientConfigs);
         allProducerConfigs.putAll(producerOnlyConfigs);
         producerProps = createProducerProps(allProducerConfigs);
@@ -190,6 +193,10 @@ public final class 
TopicBasedRemoteLogMetadataManagerConfig {
         return logDir;
     }
 
+    public Map<String, Object> commonProperties() {
+        return commonProps;
+    }
+
     public Map<String, Object> consumerProperties() {
         return consumerProps;
     }
@@ -232,6 +239,7 @@ public final class TopicBasedRemoteLogMetadataManagerConfig 
{
                 ", metadataTopicReplicationFactor=" + 
metadataTopicReplicationFactor +
                 ", initializationRetryMaxTimeoutMs=" + 
initializationRetryMaxTimeoutMs +
                 ", initializationRetryIntervalMs=" + 
initializationRetryIntervalMs +
+                ", commonProps=" + commonProps +
                 ", consumerProps=" + consumerProps +
                 ", producerProps=" + producerProps +
                 '}';
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
index b8af14e319e..b847e7cba3f 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
@@ -271,11 +271,13 @@ public class RemoteLogSegmentLifecycleTest {
             throws RemoteStorageException {
         // cache.listRemoteLogSegments(leaderEpoch) should contain the above 
segment.
         Iterator<RemoteLogSegmentMetadata> segmentsIter = 
remoteLogSegmentLifecycleManager.listRemoteLogSegments(leaderEpoch);
-        Assertions.assertTrue(segmentsIter.hasNext() && 
Objects.equals(segmentsIter.next(), expectedSegment));
+        Assertions.assertTrue(segmentsIter.hasNext());
+        Assertions.assertEquals(expectedSegment, segmentsIter.next());
 
         // cache.listAllRemoteLogSegments() should contain the above segment.
         Iterator<RemoteLogSegmentMetadata> allSegmentsIter = 
remoteLogSegmentLifecycleManager.listAllRemoteLogSegments();
-        Assertions.assertTrue(allSegmentsIter.hasNext() && 
Objects.equals(allSegmentsIter.next(), expectedSegment));
+        Assertions.assertTrue(allSegmentsIter.hasNext());
+        Assertions.assertEquals(expectedSegment, allSegmentsIter.next());
     }
 
     @ParameterizedTest(name = "remoteLogSegmentLifecycleManager = {0}")
@@ -285,7 +287,7 @@ public class RemoteLogSegmentLifecycleTest {
         try {
             remoteLogSegmentLifecycleManager.initialize(topicIdPartition);
 
-                // Create a segment with state COPY_SEGMENT_STARTED, and check 
for searching that segment and listing the
+            // Create a segment with state COPY_SEGMENT_STARTED, and check for 
searching that segment and listing the
             // segments.
             RemoteLogSegmentId segmentId = new 
RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());
             RemoteLogSegmentMetadata segmentMetadata = new 
RemoteLogSegmentMetadata(segmentId, 0L, 50L, -1L, BROKER_ID_0,
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
index f66253b4628..8e3985d0d5f 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
@@ -39,13 +39,12 @@ import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemo
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP;
 
 public class TopicBasedRemoteLogMetadataManagerConfigTest {
-    private static final  Logger log = 
LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerConfigTest.class);
+    private static final Logger log = 
LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerConfigTest.class);
 
     private static final String BOOTSTRAP_SERVERS = "localhost:9091";
 
     @Test
     public void testValidConfig() {
-
         Map<String, Object> commonClientConfig = new HashMap<>();
         commonClientConfig.put(CommonClientConfigs.RETRIES_CONFIG, 10);
         commonClientConfig.put(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, 
1000L);
@@ -64,11 +63,14 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest {
         
Assertions.assertEquals(props.get(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP), 
rlmmConfig.metadataTopicPartitionsCount());
 
         // Check for common client configs.
+        Assertions.assertEquals(BOOTSTRAP_SERVERS, 
rlmmConfig.commonProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
         Assertions.assertEquals(BOOTSTRAP_SERVERS, 
rlmmConfig.producerProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
         Assertions.assertEquals(BOOTSTRAP_SERVERS, 
rlmmConfig.consumerProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
 
         for (Map.Entry<String, Object> entry : commonClientConfig.entrySet()) {
             log.info("Checking config: " + entry.getKey());
+            Assertions.assertEquals(entry.getValue(),
+                                    
rlmmConfig.commonProperties().get(entry.getKey()));
             Assertions.assertEquals(entry.getValue(),
                                     
rlmmConfig.producerProperties().get(entry.getKey()));
             Assertions.assertEquals(entry.getValue(),
@@ -91,12 +93,13 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest {
     }
 
     @Test
-    public void testProducerConsumerOverridesConfig() {
+    public void testCommonProducerConsumerOverridesConfig() {
         Map.Entry<String, Long> overrideEntry = new 
AbstractMap.SimpleImmutableEntry<>(CommonClientConfigs.METADATA_MAX_AGE_CONFIG, 
60000L);
         Map<String, Object> commonClientConfig = new HashMap<>();
         commonClientConfig.put(CommonClientConfigs.RETRIES_CONFIG, 10);
         commonClientConfig.put(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, 
1000L);
-        commonClientConfig.put(overrideEntry.getKey(), 
overrideEntry.getValue());
+        Long overrideCommonPropValue = overrideEntry.getValue();
+        commonClientConfig.put(overrideEntry.getKey(), 
overrideCommonPropValue);
 
         Map<String, Object> producerConfig = new HashMap<>();
         producerConfig.put(ProducerConfig.ACKS_CONFIG, -1);
@@ -111,6 +114,8 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest {
         Map<String, Object> props = createValidConfigProps(commonClientConfig, 
producerConfig, consumerConfig);
         TopicBasedRemoteLogMetadataManagerConfig rlmmConfig = new 
TopicBasedRemoteLogMetadataManagerConfig(props);
 
+        Assertions.assertEquals(overrideCommonPropValue,
+                                
rlmmConfig.commonProperties().get(overrideEntry.getKey()));
         Assertions.assertEquals(overriddenProducerPropValue,
                                 
rlmmConfig.producerProperties().get(overrideEntry.getKey()));
         Assertions.assertEquals(overriddenConsumerPropValue,
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
index 3dd02962de5..714405a30e9 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
@@ -120,7 +120,7 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest {
         // Register these partitions to RLMM.
         
topicBasedRlmm().onPartitionLeadershipChanges(Collections.singleton(leaderTopicIdPartition),
 Collections.singleton(followerTopicIdPartition));
 
-        // Add segments for these partitions but they are not available as 
they have not yet been subscribed.
+        // Add segments for these partitions, but they are not available as 
they have not yet been subscribed.
         RemoteLogSegmentMetadata leaderSegmentMetadata = new 
RemoteLogSegmentMetadata(
                 new RemoteLogSegmentId(leaderTopicIdPartition, 
Uuid.randomUuid()),
                 0, 100, -1L, 0,
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
index 20bad28d794..a41a9a38699 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
@@ -124,15 +124,15 @@ public class TopicBasedRemoteLogMetadataManagerTest {
 
         // RemoteLogSegmentMetadata events are already published, and 
topicBasedRlmm's consumer manager will start
         // fetching those events and build the cache.
-        waitUntilConsumerCatchesup(newLeaderTopicIdPartition, 
newFollowerTopicIdPartition, 30_000L);
+        waitUntilConsumerCatchesUp(newLeaderTopicIdPartition, 
newFollowerTopicIdPartition, 30_000L);
 
         
Assertions.assertTrue(topicBasedRlmm().listRemoteLogSegments(newLeaderTopicIdPartition).hasNext());
         
Assertions.assertTrue(topicBasedRlmm().listRemoteLogSegments(newFollowerTopicIdPartition).hasNext());
     }
 
-    private void waitUntilConsumerCatchesup(TopicIdPartition 
newLeaderTopicIdPartition,
-                                          TopicIdPartition 
newFollowerTopicIdPartition,
-                                          long timeoutMs) throws 
TimeoutException {
+    private void waitUntilConsumerCatchesUp(TopicIdPartition 
newLeaderTopicIdPartition,
+                                            TopicIdPartition 
newFollowerTopicIdPartition,
+                                            long timeoutMs) throws 
TimeoutException {
         int leaderMetadataPartition = 
topicBasedRlmm().metadataPartition(newLeaderTopicIdPartition);
         int followerMetadataPartition = 
topicBasedRlmm().metadataPartition(newFollowerTopicIdPartition);
 

Reply via email to