This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a86e7be  Fixed initial default namespace setup with current cluster 
setup (#1607)
a86e7be is described below

commit a86e7beb91619ccff716807324051530def33f2b
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Wed Apr 18 23:43:34 2018 -0700

    Fixed initial default namespace setup with current cluster setup (#1607)
---
 .../apache/pulsar/PulsarClusterMetadataSetup.java  | 74 +++++++++++++---------
 1 file changed, 45 insertions(+), 29 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
index 37aff4b..5dfa80d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
@@ -45,6 +45,7 @@ import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -153,48 +154,63 @@ public class PulsarClusterMetadataSetup {
 
         // Create public tenant, whitelisted to use the this same cluster, 
along with other clusters
         String publicTenantPath = POLICIES_ROOT + "/" + 
TopicName.PUBLIC_TENANT;
-        TenantInfo publicTenant;
-        if (globalZk.exists(publicTenantPath, false) == null) {
-            publicTenant = new TenantInfo(Collections.emptySet(), 
Collections.singleton(arguments.cluster));
+
+        Stat stat = globalZk.exists(publicTenantPath, false);
+        if (stat == null) {
+            TenantInfo publicTenant = new TenantInfo(Collections.emptySet(), 
Collections.singleton(arguments.cluster));
+
+            try {
+                ZkUtils.createFullPathOptimistic(globalZk, publicTenantPath,
+                        
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(publicTenant),
+                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            } catch (NodeExistsException e) {
+                // Ignore
+            }
         } else {
+            // Update existing public tenant with new cluster
             byte[] content = globalZk.getData(publicTenantPath, false, null);
-            publicTenant = 
ObjectMapperFactory.getThreadLocal().readValue(content, TenantInfo.class);
-            publicTenant.getAllowedClusters().add(arguments.cluster);
-        }
-        byte[] publicPropertyDataJson = 
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(publicTenant);
-        try {
-            ZkUtils.createFullPathOptimistic(
-                globalZk,
-                publicTenantPath,
-                publicPropertyDataJson,
-                ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                CreateMode.PERSISTENT);
-        } catch (NodeExistsException e) {
-            // Ignore
+            TenantInfo publicTenant = 
ObjectMapperFactory.getThreadLocal().readValue(content, TenantInfo.class);
+
+            // Only update z-node if the list of clusters should be modified
+            if 
(!publicTenant.getAllowedClusters().contains(arguments.cluster)) {
+                publicTenant.getAllowedClusters().add(arguments.cluster);
+
+                globalZk.setData(publicTenantPath, 
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(publicTenant),
+                        stat.getVersion());
+            }
         }
 
         // Create default namespace
         String defaultNamespacePath = POLICIES_ROOT + "/" + 
TopicName.PUBLIC_TENANT + "/" + TopicName.DEFAULT_NAMESPACE;
         Policies policies;
-        if (globalZk.exists(defaultNamespacePath, false) == null) {
+
+        stat = globalZk.exists(defaultNamespacePath, false);
+        if (stat == null) {
             policies = new Policies();
             policies.bundles = getBundles(16);
+            policies.replication_clusters = 
Collections.singleton(arguments.cluster);
+
+            try {
+                ZkUtils.createFullPathOptimistic(
+                    globalZk,
+                    defaultNamespacePath,
+                    
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies),
+                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+            } catch (NodeExistsException e) {
+                // Ignore
+            }
         } else {
             byte[] content = globalZk.getData(defaultNamespacePath, false, 
null);
             policies = ObjectMapperFactory.getThreadLocal().readValue(content, 
Policies.class);
-            policies.replication_clusters.add(arguments.cluster);
-        }
 
-        byte[] defaultNamespaceDataJson = 
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies);
-        try {
-            ZkUtils.createFullPathOptimistic(
-                globalZk,
-                defaultNamespacePath,
-                defaultNamespaceDataJson,
-                ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                CreateMode.PERSISTENT);
-        } catch (NodeExistsException e) {
-            // Ignore
+            // Only update z-node if the list of clusters should be modified
+            if (!policies.replication_clusters.contains(arguments.cluster)) {
+                policies.replication_clusters.add(arguments.cluster);
+
+                globalZk.setData(defaultNamespacePath, 
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies),
+                        stat.getVersion());
+            }
         }
 
         log.info("Cluster metadata for '{}' setup correctly", 
arguments.cluster);

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.

Reply via email to