BewareMyPower commented on a change in pull request #10600:
URL: https://github.com/apache/pulsar/pull/10600#discussion_r633199881
##########
File path:
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
##########
@@ -360,4 +368,35 @@ private static CreateMode getCreateMode(EnumSet<CreateOption> options) {
public long getZkSessionId() {
return zkc.getSessionId();
}
+
+ @Override
+ public CompletableFuture<Void> initializeCluster() {
+ if (this.metadataURL == null) {
+ return FutureUtil.failedFuture(new MetadataStoreException("metadataURL is not set"));
+ }
+ if (this.metadataStoreConfig == null) {
+ return FutureUtil.failedFuture(new MetadataStoreException("metadataStoreConfig is not set"));
+ }
+ int chrootIndex = metadataURL.indexOf("/");
+ if (chrootIndex > 0) {
+ String chrootPath = metadataURL.substring(chrootIndex);
+ String zkConnectForChrootCreation = metadataURL.substring(0, chrootIndex);
+ try (ZooKeeper chrootZk = ZooKeeperClient.newBuilder()
+ .connectString(zkConnectForChrootCreation)
+ .sessionTimeoutMs(metadataStoreConfig.getSessionTimeoutMillis())
+ .connectRetryPolicy(
+ new BoundExponentialBackoffRetryPolicy(metadataStoreConfig.getSessionTimeoutMillis(),
+ metadataStoreConfig.getSessionTimeoutMillis(), 0))
+ .build()) {
+ if (chrootZk.exists(chrootPath, false) == null) {
+ ZkUtils.createFullPathOptimistic(chrootZk, chrootPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ log.info("Created zookeeper chroot path {} successfully", chrootPath);
+ }
+ } catch (Exception e) {
+ return FutureUtil.failedFuture(e);
+ }
+ }
+ return CompletableFuture.completedFuture(null);
Review comment:
IMO there's no need to return a future because it's not an asynchronous
API.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
##########
@@ -338,6 +380,32 @@ static void createNamespaceIfAbsent(ZooKeeper configStoreZk, NamespaceName names
}
}
+ static void createNamespaceIfAbsent(MetadataStore configStore, NamespaceName namespaceName, String cluster)
Review comment:
This method is referenced in `PulsarInitialNamespaceSetup`. I think it's
better to open a new PR for setting up namespaces with the MetadataStore API.
##########
File path:
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
##########
@@ -360,4 +368,35 @@ private static CreateMode getCreateMode(EnumSet<CreateOption> options) {
public long getZkSessionId() {
return zkc.getSessionId();
}
+
+ @Override
+ public CompletableFuture<Void> initializeCluster() {
+ if (this.metadataURL == null) {
+ return FutureUtil.failedFuture(new MetadataStoreException("metadataURL is not set"));
+ }
+ if (this.metadataStoreConfig == null) {
+ return FutureUtil.failedFuture(new MetadataStoreException("metadataStoreConfig is not set"));
+ }
+ int chrootIndex = metadataURL.indexOf("/");
+ if (chrootIndex > 0) {
+ String chrootPath = metadataURL.substring(chrootIndex);
+ String zkConnectForChrootCreation = metadataURL.substring(0, chrootIndex);
+ try (ZooKeeper chrootZk = ZooKeeperClient.newBuilder()
+ .connectString(zkConnectForChrootCreation)
+ .sessionTimeoutMs(metadataStoreConfig.getSessionTimeoutMillis())
+ .connectRetryPolicy(
+ new BoundExponentialBackoffRetryPolicy(metadataStoreConfig.getSessionTimeoutMillis(),
+ metadataStoreConfig.getSessionTimeoutMillis(), 0))
+ .build()) {
+ if (chrootZk.exists(chrootPath, false) == null) {
+ ZkUtils.createFullPathOptimistic(chrootZk, chrootPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ log.info("Created zookeeper chroot path {} successfully", chrootPath);
+ }
+ } catch (Exception e) {
+ return FutureUtil.failedFuture(e);
+ }
+ }
+ return CompletableFuture.completedFuture(null);
Review comment:
I mean `MetadataStoreLifecycle#initializeCluster` should not be an
asynchronous API. Otherwise it should be renamed to `initializeClusterAsync`
and the associated synchronous API `initializeCluster` needs to be added as
well.
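
A minimal sketch of the naming convention I have in mind. The `Lifecycle`
interface below is a hypothetical stand-in, not the actual
`MetadataStoreLifecycle` definition, and blocking via `join()` is only one
possible way to derive the synchronous variant:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class LifecycleSketch {
    /** Hypothetical stand-in for MetadataStoreLifecycle (an assumption, not Pulsar's interface). */
    interface Lifecycle {
        // Asynchronous variant: the Async suffix signals that a future is returned.
        CompletableFuture<Void> initializeClusterAsync();

        // Synchronous variant: blocks until the asynchronous call completes.
        // A failure surfaces as an unchecked CompletionException wrapping the cause.
        default void initializeCluster() {
            initializeClusterAsync().join();
        }
    }

    public static void main(String[] args) {
        // An implementation that completes immediately.
        Lifecycle ok = () -> CompletableFuture.completedFuture(null);
        ok.initializeCluster();

        // An implementation whose future fails.
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("metadataURL is not set"));
        Lifecycle bad = () -> failed;
        try {
            bad.initializeCluster();
        } catch (CompletionException e) {
            System.out.println("failed: " + e.getCause().getMessage());
        }
    }
}
```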
##########
File path:
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
##########
@@ -360,4 +368,35 @@ private static CreateMode getCreateMode(EnumSet<CreateOption> options) {
public long getZkSessionId() {
return zkc.getSessionId();
}
+
+ @Override
+ public CompletableFuture<Void> initializeCluster() {
+ if (this.metadataURL == null) {
+ return FutureUtil.failedFuture(new MetadataStoreException("metadataURL is not set"));
+ }
+ if (this.metadataStoreConfig == null) {
+ return FutureUtil.failedFuture(new MetadataStoreException("metadataStoreConfig is not set"));
+ }
+ int chrootIndex = metadataURL.indexOf("/");
+ if (chrootIndex > 0) {
+ String chrootPath = metadataURL.substring(chrootIndex);
+ String zkConnectForChrootCreation = metadataURL.substring(0, chrootIndex);
+ try (ZooKeeper chrootZk = ZooKeeperClient.newBuilder()
+ .connectString(zkConnectForChrootCreation)
+ .sessionTimeoutMs(metadataStoreConfig.getSessionTimeoutMillis())
+ .connectRetryPolicy(
+ new BoundExponentialBackoffRetryPolicy(metadataStoreConfig.getSessionTimeoutMillis(),
+ metadataStoreConfig.getSessionTimeoutMillis(), 0))
+ .build()) {
+ if (chrootZk.exists(chrootPath, false) == null) {
+ ZkUtils.createFullPathOptimistic(chrootZk, chrootPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ log.info("Created zookeeper chroot path {} successfully", chrootPath);
+ }
+ } catch (Exception e) {
+ return FutureUtil.failedFuture(e);
+ }
+ }
+ return CompletableFuture.completedFuture(null);
Review comment:
I agree with @eolivelli that if you're going to design an asynchronous
API, it's better to keep using CompletableFuture. I suggested a synchronous
API here only because your current implementation is synchronous @fantapsody
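
For illustration only: if the method keeps its `CompletableFuture` return
type, one possible way to make it non-blocking is to run the blocking work on
an executor. This is a rough sketch under assumptions: `createChrootBlocking`
is a hypothetical placeholder for the ZooKeeper chroot creation in the diff,
and the executor choice is arbitrary.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncInitSketch {
    // Hypothetical blocking step standing in for the chroot creation in the diff.
    static void createChrootBlocking(String chrootPath) {
        if (chrootPath == null || !chrootPath.startsWith("/")) {
            throw new IllegalArgumentException("invalid chroot path: " + chrootPath);
        }
        // The real code would talk to ZooKeeper here.
    }

    // Runs the blocking step on the given executor, so the caller gets the
    // future back immediately; an exception thrown by the step fails the future.
    static CompletableFuture<Void> initializeClusterAsync(String chrootPath, ExecutorService executor) {
        return CompletableFuture.runAsync(() -> createChrootBlocking(chrootPath), executor);
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            initializeClusterAsync("/pulsar", executor).join();
            System.out.println("initialized");
        } finally {
            executor.shutdown();
        }
    }
}
```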
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]