This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new a86e7be Fixed initial default namespace setup with current cluster setup (#1607) a86e7be is described below commit a86e7beb91619ccff716807324051530def33f2b Author: Matteo Merli <mme...@apache.org> AuthorDate: Wed Apr 18 23:43:34 2018 -0700 Fixed initial default namespace setup with current cluster setup (#1607) --- .../apache/pulsar/PulsarClusterMetadataSetup.java | 74 +++++++++++++--------- 1 file changed, 45 insertions(+), 29 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java index 37aff4b..5dfa80d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java @@ -45,6 +45,7 @@ import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException.NodeExistsException; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -153,48 +154,63 @@ public class PulsarClusterMetadataSetup { // Create public tenant, whitelisted to use the this same cluster, along with other clusters String publicTenantPath = POLICIES_ROOT + "/" + TopicName.PUBLIC_TENANT; - TenantInfo publicTenant; - if (globalZk.exists(publicTenantPath, false) == null) { - publicTenant = new TenantInfo(Collections.emptySet(), Collections.singleton(arguments.cluster)); + + Stat stat = globalZk.exists(publicTenantPath, false); + if (stat == null) { + TenantInfo publicTenant = new TenantInfo(Collections.emptySet(), Collections.singleton(arguments.cluster)); + + try { + ZkUtils.createFullPathOptimistic(globalZk, publicTenantPath, + ObjectMapperFactory.getThreadLocal().writeValueAsBytes(publicTenant), + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } catch (NodeExistsException e) { + // Ignore + } } else { + // Update existing public tenant with new cluster byte[] content = globalZk.getData(publicTenantPath, false, null); - publicTenant = ObjectMapperFactory.getThreadLocal().readValue(content, TenantInfo.class); - publicTenant.getAllowedClusters().add(arguments.cluster); - } - byte[] publicPropertyDataJson = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(publicTenant); - try { - ZkUtils.createFullPathOptimistic( - globalZk, - publicTenantPath, - publicPropertyDataJson, - ZooDefs.Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - } catch (NodeExistsException e) { - // Ignore + TenantInfo publicTenant = ObjectMapperFactory.getThreadLocal().readValue(content, TenantInfo.class); + + // Only update z-node if the list of clusters should be modified + if (!publicTenant.getAllowedClusters().contains(arguments.cluster)) { + publicTenant.getAllowedClusters().add(arguments.cluster); + + globalZk.setData(publicTenantPath, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(publicTenant), + stat.getVersion()); + } } // Create default namespace String defaultNamespacePath = POLICIES_ROOT + "/" + TopicName.PUBLIC_TENANT + "/" + TopicName.DEFAULT_NAMESPACE; Policies policies; - if (globalZk.exists(defaultNamespacePath, false) == null) { + + stat = globalZk.exists(defaultNamespacePath, false); + if (stat == null) { policies = new Policies(); policies.bundles = getBundles(16); + policies.replication_clusters = Collections.singleton(arguments.cluster); + + try { + ZkUtils.createFullPathOptimistic( + globalZk, + defaultNamespacePath, + ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies), + ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + } catch (NodeExistsException e) { + // Ignore + } } else { byte[] content = globalZk.getData(defaultNamespacePath, false, null); policies = ObjectMapperFactory.getThreadLocal().readValue(content, Policies.class); - policies.replication_clusters.add(arguments.cluster); - } - byte[] defaultNamespaceDataJson = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies); - try { - ZkUtils.createFullPathOptimistic( - globalZk, - defaultNamespacePath, - defaultNamespaceDataJson, - ZooDefs.Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - } catch (NodeExistsException e) { - // Ignore + // Only update z-node if the list of clusters should be modified + if (!policies.replication_clusters.contains(arguments.cluster)) { + policies.replication_clusters.add(arguments.cluster); + + globalZk.setData(defaultNamespacePath, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies), + stat.getVersion()); + } } log.info("Cluster metadata for '{}' setup correctly", arguments.cluster); -- To stop receiving notification emails like this one, please contact si...@apache.org.