This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0ae1d228796 KAFKA-15135: fix(storage): pass endpoint configurations as
client common to TBRLMM (#13938)
0ae1d228796 is described below
commit 0ae1d228796e100f371cd1f20d826547b6231113
Author: Jorge Esteban Quilcate Otoya <[email protected]>
AuthorDate: Sun Jul 2 20:16:15 2023 -0500
KAFKA-15135: fix(storage): pass endpoint configurations as client common to
TBRLMM (#13938)
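Pass the endpoint properties (bootstrap.servers, security.protocol) from
RemoteLogManager (RLM) to TopicBasedRemoteLogMetadataManager (TBRLMM) under
the common client config prefix, and add tests validating that they are not
ignored.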
Reviewers: Luke Chen <[email protected]>
---
core/src/main/java/kafka/log/remote/RemoteLogManager.java | 6 ++++--
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java | 5 +++--
.../storage/TopicBasedRemoteLogMetadataManagerConfigTest.java | 5 ++++-
3 files changed, 11 insertions(+), 5 deletions(-)
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 160c82b0981..cb58414ab1d 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -105,6 +105,8 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
+
/**
* This class is responsible for
* - initializing `RemoteStorageManager` and `RemoteLogMetadataManager`
instances
@@ -237,8 +239,8 @@ public class RemoteLogManager implements Closeable {
rlmmProps.put(KafkaConfig.LogDirProp(), logDir);
rlmmProps.put("cluster.id", clusterId);
endpoint.ifPresent(e -> {
- rlmmProps.put("bootstrap.servers", e.host() + ":" + e.port());
- rlmmProps.put("security.protocol", e.securityProtocol().name);
+ rlmmProps.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX +
"bootstrap.servers", e.host() + ":" + e.port());
+ rlmmProps.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX +
"security.protocol", e.securityProtocol().name);
});
remoteLogMetadataManager.configure(rlmmProps);
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 7d534c89a7d..f7a6f1c4a9e 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -84,6 +84,7 @@ import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
+import static
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -220,8 +221,8 @@ public class RemoteLogManagerTest {
ArgumentCaptor<Map<String, Object>> capture =
ArgumentCaptor.forClass(Map.class);
verify(remoteLogMetadataManager,
times(1)).configure(capture.capture());
- assertEquals(host + ":" + port,
capture.getValue().get("bootstrap.servers"));
- assertEquals(securityProtocol,
capture.getValue().get("security.protocol"));
+ assertEquals(host + ":" + port,
capture.getValue().get(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX +
"bootstrap.servers"));
+ assertEquals(securityProtocol,
capture.getValue().get(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX +
"security.protocol"));
assertEquals(clusterId, capture.getValue().get("cluster.id"));
assertEquals(brokerId,
capture.getValue().get(KafkaConfig.BrokerIdProp()));
}
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
index 3785c8d9b7d..f66253b4628 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
@@ -64,6 +64,9 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest {
Assertions.assertEquals(props.get(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP),
rlmmConfig.metadataTopicPartitionsCount());
// Check for common client configs.
+ Assertions.assertEquals(BOOTSTRAP_SERVERS,
rlmmConfig.producerProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
+ Assertions.assertEquals(BOOTSTRAP_SERVERS,
rlmmConfig.consumerProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
+
for (Map.Entry<String, Object> entry : commonClientConfig.entrySet()) {
log.info("Checking config: " + entry.getKey());
Assertions.assertEquals(entry.getValue(),
@@ -118,7 +121,7 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest {
Map<String, Object>
producerConfig,
Map<String, Object>
consumerConfig) {
Map<String, Object> props = new HashMap<>();
- props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
BOOTSTRAP_SERVERS);
+ props.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX +
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(BROKER_ID, 1);
props.put(LOG_DIR, TestUtils.tempDirectory().getAbsolutePath());