This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch bewaremypower/2.8-pick-16043
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a651b42dd6901d4a6ddb2d37ce220814e7881c09
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Jun 14 17:03:37 2022 +0800

    [improve][broker] Avoid reconnection when a partitioned topic was created concurrently (#16043)
    
    (cherry picked from commit 2a7a8555c0b0296bcaa6a757a8646b8f65185ac6)
    
    In addition to #16043, this PR fixes https://github.com/apache/pulsar/issues/16861
---
 .../pulsar/broker/resources/BaseResources.java     | 12 ++-
 .../broker/resources/NamespaceResources.java       | 17 +++-
 .../pulsar/broker/resources/PulsarResources.java   |  2 +
 .../pulsar/broker/resources/TopicResources.java    | 53 ++++++++++++
 .../pulsar/broker/service/BrokerService.java       | 97 ++++++++++++++++------
 .../pulsar/common/naming/SystemTopicNames.java     | 70 ++++++++++++++++
 6 files changed, 225 insertions(+), 26 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
index 8016bcef314..5b195d9dcb1 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
@@ -25,6 +25,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+
+import com.google.common.base.Joiner;
 import lombok.Getter;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -38,6 +40,8 @@ import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
  */
 public class BaseResources<T> {
 
+    protected static final String BASE_POLICIES_PATH = "/admin/policies";
+
     @Getter
     private final MetadataStoreExtended store;
     @Getter
@@ -164,4 +168,10 @@ public class BaseResources<T> {
     public CompletableFuture<Boolean> existsAsync(String path) {
         return cache.exists(path);
     }
-}
\ No newline at end of file
+
+    protected static String joinPath(String... parts) {
+        StringBuilder sb = new StringBuilder();
+        Joiner.on('/').appendTo(sb, parts);
+        return sb.toString();
+    }
+}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
index 58d493ee171..f4d876d2534 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
@@ -21,8 +21,12 @@ package org.apache.pulsar.broker.resources;
 import com.fasterxml.jackson.core.type.TypeReference;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
 import lombok.Getter;
 
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
 import org.apache.pulsar.common.policies.data.Policies;
@@ -43,6 +47,10 @@ public class NamespaceResources extends BaseResources<Policies> {
         partitionedTopicResources = new 
PartitionedTopicResources(configurationStore, operationTimeoutSec);
     }
 
+    public CompletableFuture<Optional<Policies>> 
getPoliciesAsync(NamespaceName ns) {
+        return getCache().get(joinPath(BASE_POLICIES_PATH, ns.toString()));
+    }
+
     public static class IsolationPolicyResources extends 
BaseResources<Map<String, NamespaceIsolationDataImpl>> {
         public IsolationPolicyResources(MetadataStoreExtended store, int 
operationTimeoutSec) {
             super(store, new TypeReference<Map<String, 
NamespaceIsolationDataImpl>>() {
@@ -56,8 +64,15 @@ public class NamespaceResources extends BaseResources<Policies> {
     }
 
     public static class PartitionedTopicResources extends 
BaseResources<PartitionedTopicMetadata> {
+        private static final String PARTITIONED_TOPIC_PATH = 
"/admin/partitioned-topics";
+
         public PartitionedTopicResources(MetadataStoreExtended 
configurationStore, int operationTimeoutSec) {
             super(configurationStore, PartitionedTopicMetadata.class, 
operationTimeoutSec);
         }
+
+        public CompletableFuture<Void> createPartitionedTopicAsync(TopicName 
tn, PartitionedTopicMetadata tm) {
+            return createAsync(joinPath(PARTITIONED_TOPIC_PATH, 
tn.getNamespace(), tn.getDomain().value(),
+                    tn.getEncodedLocalName()), tm);
+        }
     }
-}
\ No newline at end of file
+}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
index fa4853a22c5..5209a795acf 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
@@ -38,6 +38,7 @@ public class PulsarResources {
     private DynamicConfigurationResources dynamicConfigResources;
     private LocalPoliciesResources localPolicies;
     private LoadManagerReportResources loadReportResources;
+    private TopicResources topicResources;
     private Optional<MetadataStoreExtended> localMetadataStore;
     private Optional<MetadataStoreExtended> configurationMetadataStore;
 
@@ -56,6 +57,7 @@ public class PulsarResources {
             dynamicConfigResources = new 
DynamicConfigurationResources(localMetadataStore, operationTimeoutSec);
             localPolicies = new LocalPoliciesResources(localMetadataStore, 
operationTimeoutSec);
             loadReportResources = new 
LoadManagerReportResources(localMetadataStore, operationTimeoutSec);
+            topicResources = new TopicResources(localMetadataStore);
         }
         this.localMetadataStore = Optional.ofNullable(localMetadataStore);
         this.configurationMetadataStore = 
Optional.ofNullable(configurationMetadataStore);
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
new file mode 100644
index 00000000000..d25b308d086
--- /dev/null
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resources;
+
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.metadata.api.MetadataStore;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.pulsar.common.util.Codec.decode;
+
+public class TopicResources {
+    private static final String MANAGED_LEDGER_PATH = "/managed-ledgers";
+
+    private final MetadataStore store;
+
+    public TopicResources(MetadataStore store) {
+        this.store = store;
+    }
+
+    public CompletableFuture<List<String>> getExistingPartitions(TopicName 
topic) {
+        return getExistingPartitions(topic.getNamespaceObject(), 
topic.getDomain());
+    }
+
+    public CompletableFuture<List<String>> getExistingPartitions(NamespaceName 
ns, TopicDomain domain) {
+        String topicPartitionPath = MANAGED_LEDGER_PATH + "/" + ns + "/" + 
domain;
+        return store.getChildren(topicPartitionPath).thenApply(topics ->
+                topics.stream()
+                        .map(s -> String.format("%s://%s/%s", domain.value(), 
ns, decode(s)))
+                        .collect(Collectors.toList())
+        );
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index bdaded637b6..e91e462f7df 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -141,6 +141,7 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
 import org.apache.pulsar.common.naming.NamespaceBundles;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
@@ -2412,16 +2413,39 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
                 .thenCompose(topicExists -> {
                     return fetchPartitionedTopicMetadataAsync(topicName)
                             .thenCompose(metadata -> {
+                                CompletableFuture<PartitionedTopicMetadata> 
future = new CompletableFuture<>();
+
                                 // If topic is already exist, creating 
partitioned topic is not allowed.
                                 if (metadata.partitions == 0
                                         && !topicExists
                                         && !topicName.isPartitioned()
                                         && 
pulsar.getBrokerService().isAllowAutoTopicCreation(topicName)
                                         && 
pulsar.getBrokerService().isDefaultTopicTypePartitioned(topicName)) {
-                                    return 
pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName);
+
+                                    
pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName)
+                                            .thenAccept(md -> 
future.complete(md))
+                                            .exceptionally(ex -> {
+                                                if (ex.getCause()
+                                                        instanceof 
MetadataStoreException.AlreadyExistsException) {
+                                                    // The partitioned topic 
might be created concurrently
+                                                    
fetchPartitionedTopicMetadataAsync(topicName)
+                                                            
.whenComplete((metadata2, ex2) -> {
+                                                                if (ex2 == 
null) {
+                                                                    
future.complete(metadata2);
+                                                                } else {
+                                                                    
future.completeExceptionally(ex2);
+                                                                }
+                                                            });
+                                                } else {
+                                                    
future.completeExceptionally(ex);
+                                                }
+                                                return null;
+                                            });
                                 } else {
-                                    return 
CompletableFuture.completedFuture(metadata);
+                                    future.complete(metadata);
                                 }
+
+                                return future;
                             });
                 });
     }
@@ -2436,28 +2460,17 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
                 "Number of partitions should be less than or equal to " + 
maxPartitions);
 
         PartitionedTopicMetadata configMetadata = new 
PartitionedTopicMetadata(defaultNumPartitions);
-        CompletableFuture<PartitionedTopicMetadata> partitionedTopicFuture = 
futureWithDeadline();
-
-        if (!checkMaxTopicsPerNamespace(topicName, defaultNumPartitions, 
partitionedTopicFuture)) {
-            return partitionedTopicFuture;
-        }
-
-        try {
-            PartitionedTopicResources partitionResources = 
pulsar.getPulsarResources().getNamespaceResources()
-                    .getPartitionedTopicResources();
-            partitionResources.createAsync(partitionedTopicPath(topicName), 
configMetadata).thenAccept((r) -> {
-                log.info("partitioned metadata successfully created for {}", 
topicName);
-                partitionedTopicFuture.complete(configMetadata);
-            }).exceptionally(ex -> {
-                partitionedTopicFuture.completeExceptionally(ex.getCause());
-                return null;
-            });
-        } catch (Exception e) {
-            log.error("Failed to create default partitioned topic.", e);
-            return FutureUtil.failedFuture(e);
-        }
 
-        return partitionedTopicFuture;
+        return checkMaxTopicsPerNamespace(topicName, defaultNumPartitions)
+                .thenCompose(__ -> {
+                    PartitionedTopicResources partitionResources = 
pulsar.getPulsarResources().getNamespaceResources()
+                            .getPartitionedTopicResources();
+                    return 
partitionResources.createPartitionedTopicAsync(topicName, configMetadata)
+                            .thenApply(v -> {
+                                log.info("partitioned metadata successfully 
created for {}", topicName);
+                                return configMetadata;
+                            });
+                });
     }
 
     public CompletableFuture<PartitionedTopicMetadata> 
fetchPartitionedTopicMetadataAsync(TopicName topicName) {
@@ -2727,6 +2740,11 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
         return SystemTopicClient.isSystemTopic(TopicName.get(topic));
     }
 
+    public boolean isSystemTopic(TopicName topicName) {
+        return 
NamespaceService.isSystemServiceNamespace(topicName.getNamespace())
+                || SystemTopicNames.isSystemTopic(topicName);
+    }
+
     /**
      * Get {@link TopicPolicies} for the parameterized topic.
      * @param topicName
@@ -2744,8 +2762,39 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
         }
     }
 
+    private CompletableFuture<Void> checkMaxTopicsPerNamespace(TopicName 
topicName, int numPartitions) {
+        return pulsar.getPulsarResources().getNamespaceResources()
+                .getPoliciesAsync(topicName.getNamespaceObject())
+                .thenCompose(optPolicies -> {
+                    int maxTopicsPerNamespace = optPolicies.map(p -> 
p.max_topics_per_namespace)
+                            
.orElse(pulsar.getConfig().getMaxTopicsPerNamespace());
+
+                    if (maxTopicsPerNamespace > 0 && 
!isSystemTopic(topicName)) {
+                        return 
pulsar().getPulsarResources().getTopicResources()
+                                .getExistingPartitions(topicName)
+                                .thenCompose(topics -> {
+                                    // exclude created system topic
+                                    long topicsCount = topics.stream()
+                                            .filter(t -> 
!isSystemTopic(TopicName.get(t)))
+                                            .count();
+                                    if (topicsCount + numPartitions > 
maxTopicsPerNamespace) {
+                                        log.error("Failed to create persistent 
topic {}, "
+                                                + "exceed maximum number of 
topics in namespace", topicName);
+                                        return FutureUtil.failedFuture(
+                                                new 
RestException(Response.Status.PRECONDITION_FAILED,
+                                                        "Exceed maximum number 
of topics in namespace."));
+                                    } else {
+                                        return 
CompletableFuture.completedFuture(null);
+                                    }
+                                });
+                    } else {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                });
+    }
+
     private <T> boolean checkMaxTopicsPerNamespace(TopicName topicName, int 
numPartitions,
-                                            CompletableFuture<T> topicFuture) {
+                                                   CompletableFuture<T> 
topicFuture) {
         Integer maxTopicsPerNamespace;
         try {
             maxTopicsPerNamespace = 
pulsar.getConfigurationCache().policiesCache()
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
new file mode 100644
index 00000000000..72ec16752c6
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.naming;
+
+import com.google.common.collect.Sets;
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * Encapsulate the parsing of the completeTopicName name.
+ */
+public class SystemTopicNames {
+
+    /**
+     * Local topic name for the namespace events.
+     */
+    public static final String NAMESPACE_EVENTS_LOCAL_NAME = "__change_events";
+
+    /**
+     * Local topic name for the transaction buffer snapshot.
+     */
+    public static final String TRANSACTION_BUFFER_SNAPSHOT = 
"__transaction_buffer_snapshot";
+
+
+    public static final String PENDING_ACK_STORE_SUFFIX = 
"__transaction_pending_ack";
+
+    /**
+     * The set of all local topic names declared above.
+     */
+    public static final Set<String> EVENTS_TOPIC_NAMES =
+            
Collections.unmodifiableSet(Sets.newHashSet(NAMESPACE_EVENTS_LOCAL_NAME, 
TRANSACTION_BUFFER_SNAPSHOT));
+
+    public static final TopicName TRANSACTION_COORDINATOR_ASSIGN = 
TopicName.get(TopicDomain.persistent.value(),
+            NamespaceName.SYSTEM_NAMESPACE, "transaction_coordinator_assign");
+
+    public static final TopicName TRANSACTION_COORDINATOR_LOG = 
TopicName.get(TopicDomain.persistent.value(),
+            NamespaceName.SYSTEM_NAMESPACE, "__transaction_log_");
+
+    public static boolean isEventSystemTopic(TopicName topicName) {
+        return 
EVENTS_TOPIC_NAMES.contains(TopicName.get(topicName.getPartitionedTopicName()).getLocalName());
+    }
+
+    public static boolean isTransactionInternalName(TopicName topicName) {
+        String topic = topicName.toString();
+        return topic.startsWith(TRANSACTION_COORDINATOR_ASSIGN.toString())
+                || topic.startsWith(TRANSACTION_COORDINATOR_LOG.toString())
+                || topic.endsWith(PENDING_ACK_STORE_SUFFIX);
+    }
+
+    public static boolean isSystemTopic(TopicName topicName) {
+        TopicName nonePartitionedTopicName = 
TopicName.get(topicName.getPartitionedTopicName());
+        return isEventSystemTopic(nonePartitionedTopicName) || 
isTransactionInternalName(nonePartitionedTopicName);
+    }
+}

Reply via email to