This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3d7c1127019 [fix][broker] Fix etcd cluster error and add test for etcd cluster (#16309)
3d7c1127019 is described below
commit 3d7c112701932aef1f6c9a6213e313a251de9a21
Author: Lan <[email protected]>
AuthorDate: Tue Aug 2 10:46:35 2022 +0800
[fix][broker] Fix etcd cluster error and add test for etcd cluster (#16309)
---
.../pulsar/metadata/impl/EtcdMetadataStore.java | 3 +-
.../metadata/impl/EtcdMetadataStoreTest.java | 58 ++++++++++++++++++++++
2 files changed, 60 insertions(+), 1 deletion(-)
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java
index 0073db0c8d7..bbe7034d990 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java
@@ -126,7 +126,8 @@ public class EtcdMetadataStore extends AbstractBatchedMetadataStore {
private Client newEtcdClient(String metadataURL, MetadataStoreConfig conf)
throws IOException {
String etcdUrl = metadataURL.replaceFirst(ETCD_SCHEME_IDENTIFIER, "");
- ClientBuilder clientBuilder = Client.builder().endpoints(etcdUrl);
+ ClientBuilder clientBuilder = Client.builder()
+ .endpoints(etcdUrl.split(","));
if (StringUtils.isNotEmpty(conf.getConfigFilePath())) {
try (InputStream inputStream =
Files.newInputStream(Paths.get(conf.getConfigFilePath()))) {
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/EtcdMetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/EtcdMetadataStoreTest.java
index 180d835830a..fd1b8ac4552 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/EtcdMetadataStoreTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/EtcdMetadataStoreTest.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.common.io.Resources;
import io.etcd.jetcd.launcher.EtcdCluster;
import io.etcd.jetcd.launcher.EtcdClusterFactory;
+
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
@@ -40,6 +41,63 @@ import org.testng.annotations.Test;
@Slf4j
public class EtcdMetadataStoreTest {
+ @Test
+ public void testCluster() throws Exception {
+ @Cleanup
+ EtcdCluster etcdCluster = EtcdClusterFactory.buildCluster("test-cluster", 3, false);
+ etcdCluster.start();
+
+ EtcdConfig etcdConfig = EtcdConfig.builder().useTls(false)
+ .tlsProvider(null)
+ .authority("etcd0")
+ .build();
+
+ Path etcdConfigPath = Files.createTempFile("etcd_config_cluster", ".yml");
+ new ObjectMapper(new YAMLFactory()).writeValue(etcdConfigPath.toFile(), etcdConfig);
+
+ String metadataURL =
+ "etcd:" + etcdCluster.getClientEndpoints().stream().map(URI::toString).collect(Collectors.joining(","));
+
+ @Cleanup
+ MetadataStore store = MetadataStoreFactory.create(metadataURL,
+ MetadataStoreConfig.builder().configFilePath(etcdConfigPath.toString()).build());
+
+ store.put("/test", "value".getBytes(StandardCharsets.UTF_8), Optional.empty()).join();
+
+ assertTrue(store.exists("/test").join());
+
+ }
+
+ @Test
+ public void testClusterWithTls() throws Exception {
+ @Cleanup
+ EtcdCluster etcdCluster = EtcdClusterFactory.buildCluster("test-cluster", 3, true);
+ etcdCluster.start();
+
+ EtcdConfig etcdConfig = EtcdConfig.builder().useTls(true)
+ .tlsProvider(null)
+ .authority("etcd0")
+ .tlsTrustCertsFilePath(Resources.getResource("ssl/cert/ca.pem").getPath())
+ .tlsKeyFilePath(Resources.getResource("ssl/cert/client-key-pk8.pem").getPath())
+ .tlsCertificateFilePath(Resources.getResource("ssl/cert/client.pem").getPath())
+ .build();
+
+ Path etcdConfigPath = Files.createTempFile("etcd_config_cluster_ssl", ".yml");
+ new ObjectMapper(new YAMLFactory()).writeValue(etcdConfigPath.toFile(), etcdConfig);
+
+ String metadataURL =
+ "etcd:" + etcdCluster.getClientEndpoints().stream().map(URI::toString).collect(Collectors.joining(","));
+
+ @Cleanup
+ MetadataStore store = MetadataStoreFactory.create(metadataURL,
+ MetadataStoreConfig.builder().configFilePath(etcdConfigPath.toString()).build());
+
+ store.put("/test", "value".getBytes(StandardCharsets.UTF_8), Optional.empty()).join();
+
+ assertTrue(store.exists("/test").join());
+
+ }
+
@Test
public void testTlsInstance() throws Exception {
@Cleanup