eolivelli commented on a change in pull request #10715:
URL: https://github.com/apache/pulsar/pull/10715#discussion_r642935020



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
##########
@@ -1022,6 +1025,28 @@ public ZooKeeper getZkClient() {
         return this.localZooKeeperConnectionProvider.getLocalZooKeeper();
     }
 
+    // get bookkeeper's zookeeper
+    public ZooKeeper getBookieZkClient() {

Review comment:
       I would name this method "createBookieZkClient" because you are actually 
creating it

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
##########
@@ -1022,6 +1025,28 @@ public ZooKeeper getZkClient() {
         return this.localZooKeeperConnectionProvider.getLocalZooKeeper();
     }
 
+    // get bookkeeper's zookeeper
+    public ZooKeeper getBookieZkClient() {
+        ZooKeeper zkClient = null;
+        String bookkeeperMetadataServiceUri = 
config.getBookkeeperMetadataServiceUri();
+        if (StringUtils.isBlank(bookkeeperMetadataServiceUri)) {
+            bookkeeperMetadataServiceUri = 
PulsarService.bookieMetadataServiceUri(config);
+        }
+
+        URI uri = URI.create(bookkeeperMetadataServiceUri);
+        String path = ZkUtils.trimLedgersDefaultRootPath(uri.getPath());
+        String bookieZkConnect = StringUtils.replace(uri.getAuthority(), ";", 
",") + path;
+
+        int zkTimeout = (int) config.getZooKeeperSessionTimeoutMillis();
+        try {
+            zkClient = 
ZooKeeperClient.newBuilder().connectString(bookieZkConnect)
+                    .sessionTimeoutMs(zkTimeout).build();
+        } catch (Exception e) {
+            LOG.error("Error creating bookie zookeeper client with {} for 
bookie.", bookieZkConnect, e);

Review comment:
       in this case we are returning `null` 
   should we throw an exception and fail ?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
##########
@@ -224,10 +229,22 @@ public static void main(String[] args) throws Exception {
             initializer.initializeCluster(bkMetadataServiceUri.getUri(), 
arguments.numStreamStorageContainers);
         }
 
-        if 
(!localStore.exists(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).get()) {
-            createMetadataNode(localStore, 
ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, "{}".getBytes());
+        // exist the existing-bk-metadata-service-uri or 
bookkeeper-metadata-service-uri,
+        // should create metadata on the bookkeeper side
+        if (existBk) {
+            URI bkUri = URI.create(uriStr);
+            String bkZKStr = StringUtils.replace(bkUri.getAuthority(), ";", 
",") + bkUri.getPath();

Review comment:
       +1

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
##########
@@ -1022,6 +1025,28 @@ public ZooKeeper getZkClient() {
         return this.localZooKeeperConnectionProvider.getLocalZooKeeper();
     }
 
+    // get bookkeeper's zookeeper
+    public ZooKeeper getBookieZkClient() {
+        ZooKeeper zkClient = null;
+        String bookkeeperMetadataServiceUri = 
config.getBookkeeperMetadataServiceUri();
+        if (StringUtils.isBlank(bookkeeperMetadataServiceUri)) {
+            bookkeeperMetadataServiceUri = 
PulsarService.bookieMetadataServiceUri(config);
+        }
+
+        URI uri = URI.create(bookkeeperMetadataServiceUri);
+        String path = ZkUtils.trimLedgersDefaultRootPath(uri.getPath());
+        String bookieZkConnect = StringUtils.replace(uri.getAuthority(), ";", 
",") + path;

Review comment:
       this line requires to use a common utility method as above




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to