This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0ae1d228796 KAFKA-15135: fix(storage): pass endpoint configurations as 
client common to TBRLMM (#13938)
0ae1d228796 is described below

commit 0ae1d228796e100f371cd1f20d826547b6231113
Author: Jorge Esteban Quilcate Otoya <[email protected]>
AuthorDate: Sun Jul 2 20:16:15 2023 -0500

    KAFKA-15135: fix(storage): pass endpoint configurations as client common to 
TBRLMM (#13938)
    
    Pass endpoint properties from RLM to TBRLMM and validate those are not 
ignored.
    
    Reviewers: Luke Chen <[email protected]>
---
 core/src/main/java/kafka/log/remote/RemoteLogManager.java           | 6 ++++--
 core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java       | 5 +++--
 .../storage/TopicBasedRemoteLogMetadataManagerConfigTest.java       | 5 ++++-
 3 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 160c82b0981..cb58414ab1d 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -105,6 +105,8 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
+
 /**
  * This class is responsible for
  * - initializing `RemoteStorageManager` and `RemoteLogMetadataManager` 
instances
@@ -237,8 +239,8 @@ public class RemoteLogManager implements Closeable {
         rlmmProps.put(KafkaConfig.LogDirProp(), logDir);
         rlmmProps.put("cluster.id", clusterId);
         endpoint.ifPresent(e -> {
-            rlmmProps.put("bootstrap.servers", e.host() + ":" + e.port());
-            rlmmProps.put("security.protocol", e.securityProtocol().name);
+            rlmmProps.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + 
"bootstrap.servers", e.host() + ":" + e.port());
+            rlmmProps.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + 
"security.protocol", e.securityProtocol().name);
         });
 
         remoteLogMetadataManager.configure(rlmmProps);
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java 
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 7d534c89a7d..f7a6f1c4a9e 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -84,6 +84,7 @@ import java.util.Properties;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 
+import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -220,8 +221,8 @@ public class RemoteLogManagerTest {
 
         ArgumentCaptor<Map<String, Object>> capture = 
ArgumentCaptor.forClass(Map.class);
         verify(remoteLogMetadataManager, 
times(1)).configure(capture.capture());
-        assertEquals(host + ":" + port, 
capture.getValue().get("bootstrap.servers"));
-        assertEquals(securityProtocol, 
capture.getValue().get("security.protocol"));
+        assertEquals(host + ":" + port, 
capture.getValue().get(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + 
"bootstrap.servers"));
+        assertEquals(securityProtocol, 
capture.getValue().get(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + 
"security.protocol"));
         assertEquals(clusterId, capture.getValue().get("cluster.id"));
         assertEquals(brokerId, 
capture.getValue().get(KafkaConfig.BrokerIdProp()));
     }
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
index 3785c8d9b7d..f66253b4628 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
@@ -64,6 +64,9 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest {
         
Assertions.assertEquals(props.get(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP), 
rlmmConfig.metadataTopicPartitionsCount());
 
         // Check for common client configs.
+        Assertions.assertEquals(BOOTSTRAP_SERVERS, 
rlmmConfig.producerProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
+        Assertions.assertEquals(BOOTSTRAP_SERVERS, 
rlmmConfig.consumerProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
+
         for (Map.Entry<String, Object> entry : commonClientConfig.entrySet()) {
             log.info("Checking config: " + entry.getKey());
             Assertions.assertEquals(entry.getValue(),
@@ -118,7 +121,7 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest {
                                                        Map<String, Object> 
producerConfig,
                                                        Map<String, Object> 
consumerConfig) {
         Map<String, Object> props = new HashMap<>();
-        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
BOOTSTRAP_SERVERS);
+        props.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + 
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
         props.put(BROKER_ID, 1);
         props.put(LOG_DIR, TestUtils.tempDirectory().getAbsolutePath());
 

Reply via email to