fantapsody commented on a change in pull request #10600:
URL: https://github.com/apache/pulsar/pull/10600#discussion_r633215384
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
##########
@@ -308,6 +324,32 @@ static void createTenantIfAbsent(ZooKeeper configStoreZk, String tenant, String
         }
     }
+    static void createTenantIfAbsent(MetadataStore configStore, String tenant, String cluster) throws IOException,
Review comment:
As @BewareMyPower pointed out, these methods are still used in
`PulsarInitialNamespaceSetup` and `PulsarTransactionCoordinatorMetadataSetup`.
I plan to open more PRs to fix these commands as well and then remove these
methods altogether.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
##########
@@ -338,6 +380,32 @@ static void createNamespaceIfAbsent(ZooKeeper configStoreZk, NamespaceName names
         }
     }
+    static void createNamespaceIfAbsent(MetadataStore configStore, NamespaceName namespaceName, String cluster)
Review comment:
Yes, as I explained above.
##########
File path:
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
##########
@@ -360,4 +368,35 @@ private static CreateMode getCreateMode(EnumSet<CreateOption> options) {
     public long getZkSessionId() {
         return zkc.getSessionId();
     }
+
+    @Override
+    public CompletableFuture<Void> initializeCluster() {
+        if (this.metadataURL == null) {
+            return FutureUtil.failedFuture(new MetadataStoreException("metadataURL is not set"));
+        }
+        if (this.metadataStoreConfig == null) {
+            return FutureUtil.failedFuture(new MetadataStoreException("metadataStoreConfig is not set"));
+        }
+        int chrootIndex = metadataURL.indexOf("/");
+        if (chrootIndex > 0) {
+            String chrootPath = metadataURL.substring(chrootIndex);
+            String zkConnectForChrootCreation = metadataURL.substring(0, chrootIndex);
+            try (ZooKeeper chrootZk = ZooKeeperClient.newBuilder()
+                    .connectString(zkConnectForChrootCreation)
+                    .sessionTimeoutMs(metadataStoreConfig.getSessionTimeoutMillis())
+                    .connectRetryPolicy(
+                            new BoundExponentialBackoffRetryPolicy(metadataStoreConfig.getSessionTimeoutMillis(),
+                                    metadataStoreConfig.getSessionTimeoutMillis(), 0))
+                    .build()) {
+                if (chrootZk.exists(chrootPath, false) == null) {
+                    ZkUtils.createFullPathOptimistic(chrootZk, chrootPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.PERSISTENT);
+                    log.info("Created zookeeper chroot path {} successfully", chrootPath);
+                }
+            } catch (Exception e) {
+                return FutureUtil.failedFuture(e);
+            }
+        }
+        return CompletableFuture.completedFuture(null);
Review comment:
Understood. It seems the
[`MetadataStore`](https://github.com/apache/pulsar/blob/master/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java#L53)
APIs follow a different pattern: all I/O operations are asynchronous by
default. So I think `MetadataStoreLifecycle#initializeCluster` should be
designed the same way.
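To illustrate the asynchronous-by-default pattern, here is a minimal sketch. `SketchStore`, `InMemoryStore`, and `setup` are hypothetical stand-ins invented for this example; they are not the real `MetadataStore` or `MetadataStoreLifecycle` interfaces, and the path used is arbitrary. The point is only that when every operation returns a `CompletableFuture`, initialization composes with later calls without blocking the caller.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class AsyncStoreSketch {
    // Hypothetical interface for illustration; NOT the real MetadataStore API.
    interface SketchStore {
        CompletableFuture<Void> initializeCluster();
        CompletableFuture<Void> put(String path, String value);
    }

    // Toy in-memory implementation so the sketch runs without ZooKeeper.
    static class InMemoryStore implements SketchStore {
        final Map<String, String> data = new ConcurrentHashMap<>();
        volatile boolean initialized;

        @Override
        public CompletableFuture<Void> initializeCluster() {
            // Runs on the common pool; the caller is never blocked.
            return CompletableFuture.runAsync(() -> initialized = true);
        }

        @Override
        public CompletableFuture<Void> put(String path, String value) {
            if (!initialized) {
                // Report the error through the future instead of throwing.
                CompletableFuture<Void> failed = new CompletableFuture<>();
                failed.completeExceptionally(new IllegalStateException("store is not initialized"));
                return failed;
            }
            data.put(path, value);
            return CompletableFuture.completedFuture(null);
        }
    }

    // Initialization chains with later operations via thenCompose.
    static CompletableFuture<Void> setup(SketchStore store) {
        return store.initializeCluster()
                .thenCompose(ignore -> store.put("/admin/clusters/demo", "{}"));
    }

    public static void main(String[] args) {
        InMemoryStore store = new InMemoryStore();
        setup(store).join(); // block only at the very edge of the program
        System.out.println(store.data);
    }
}
```

With this shape, a command-line tool can block once with `join()` at the top level, while a broker could keep the whole chain non-blocking.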
##########
File path:
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
##########
@@ -360,4 +368,35 @@ private static CreateMode getCreateMode(EnumSet<CreateOption> options) {
     public long getZkSessionId() {
         return zkc.getSessionId();
     }
+
+    @Override
+    public CompletableFuture<Void> initializeCluster() {
+        if (this.metadataURL == null) {
+            return FutureUtil.failedFuture(new MetadataStoreException("metadataURL is not set"));
+        }
+        if (this.metadataStoreConfig == null) {
+            return FutureUtil.failedFuture(new MetadataStoreException("metadataStoreConfig is not set"));
+        }
+        int chrootIndex = metadataURL.indexOf("/");
+        if (chrootIndex > 0) {
+            String chrootPath = metadataURL.substring(chrootIndex);
+            String zkConnectForChrootCreation = metadataURL.substring(0, chrootIndex);
+            try (ZooKeeper chrootZk = ZooKeeperClient.newBuilder()
+                    .connectString(zkConnectForChrootCreation)
+                    .sessionTimeoutMs(metadataStoreConfig.getSessionTimeoutMillis())
+                    .connectRetryPolicy(
+                            new BoundExponentialBackoffRetryPolicy(metadataStoreConfig.getSessionTimeoutMillis(),
+                                    metadataStoreConfig.getSessionTimeoutMillis(), 0))
+                    .build()) {
+                if (chrootZk.exists(chrootPath, false) == null) {
+                    ZkUtils.createFullPathOptimistic(chrootZk, chrootPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.PERSISTENT);
+                    log.info("Created zookeeper chroot path {} successfully", chrootPath);
+                }
+            } catch (Exception e) {
+                return FutureUtil.failedFuture(e);
+            }
+        }
+        return CompletableFuture.completedFuture(null);
Review comment:
That's true. The implementation is synchronous because I wanted to keep this
piece of code simple and reuse the previous code; the API is declared
asynchronous to leave room for future optimizations if needed.
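As a rough sketch of that trade-off, the snippet below puts a synchronous body behind a `CompletableFuture` signature. The class and both method names are hypothetical, and the ZooKeeper call is replaced by a comment so the example runs standalone. Only the `indexOf("/")` / `substring` split mirrors the diff above.

```java
import java.util.concurrent.CompletableFuture;

public class SyncBehindAsyncSketch {
    // Splits "host:port[,host:port]/chroot" into {connectString, chrootPath}.
    // chrootPath is null when the URL has no chroot suffix.
    static String[] splitChroot(String metadataUrl) {
        int chrootIndex = metadataUrl.indexOf("/");
        if (chrootIndex > 0) {
            return new String[] {
                metadataUrl.substring(0, chrootIndex),
                metadataUrl.substring(chrootIndex)
            };
        }
        return new String[] { metadataUrl, null };
    }

    // Asynchronous signature, synchronous body: the work finishes before the
    // method returns, so the future is already completed (or failed).
    static CompletableFuture<String> initialize(String metadataUrl) {
        if (metadataUrl == null) {
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException("metadataURL is not set"));
            return failed;
        }
        try {
            String chrootPath = splitChroot(metadataUrl)[1];
            // A blocking "create the chroot if absent" call would go here.
            return CompletableFuture.completedFuture(chrootPath);
        } catch (Exception e) {
            // Surface failures through the future rather than by throwing.
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
    }

    public static void main(String[] args) {
        System.out.println(initialize("zk1:2181,zk2:2181/pulsar").join());
    }
}
```

Callers already code against the future, so the body could later be moved onto an executor or rewritten with non-blocking calls without changing the signature.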