This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch bewaremypower/2.8-pick-16043 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a651b42dd6901d4a6ddb2d37ce220814e7881c09 Author: Yunze Xu <[email protected]> AuthorDate: Tue Jun 14 17:03:37 2022 +0800 [improve][broker] Avoid reconnection when a partitioned topic was created concurrently (#16043) (cherry picked from commit 2a7a8555c0b0296bcaa6a757a8646b8f65185ac6) In addition to #16043, this PR fixes https://github.com/apache/pulsar/issues/16861 --- .../pulsar/broker/resources/BaseResources.java | 12 ++- .../broker/resources/NamespaceResources.java | 17 +++- .../pulsar/broker/resources/PulsarResources.java | 2 + .../pulsar/broker/resources/TopicResources.java | 53 ++++++++++++ .../pulsar/broker/service/BrokerService.java | 97 ++++++++++++++++------ .../pulsar/common/naming/SystemTopicNames.java | 70 ++++++++++++++++ 6 files changed, 225 insertions(+), 26 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java index 8016bcef314..5b195d9dcb1 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java @@ -25,6 +25,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.Function; + +import com.google.common.base.Joiner; import lombok.Getter; import org.apache.pulsar.metadata.api.MetadataCache; import org.apache.pulsar.metadata.api.MetadataStoreException; @@ -38,6 +40,8 @@ import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; */ public class BaseResources<T> { + protected static final String BASE_POLICIES_PATH = "/admin/policies"; + @Getter private final MetadataStoreExtended store; @Getter @@ -164,4 +168,10 @@ public class BaseResources<T> { public CompletableFuture<Boolean> existsAsync(String path) { return cache.exists(path); } -} \ No newline at end of file + + protected static String joinPath(String... parts) { + StringBuilder sb = new StringBuilder(); + Joiner.on('/').appendTo(sb, parts); + return sb.toString(); + } +} diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java index 58d493ee171..f4d876d2534 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java @@ -21,8 +21,12 @@ package org.apache.pulsar.broker.resources; import com.fasterxml.jackson.core.type.TypeReference; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; + import lombok.Getter; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl; import org.apache.pulsar.common.policies.data.Policies; @@ -43,6 +47,10 @@ public class NamespaceResources extends BaseResources<Policies> { partitionedTopicResources = new PartitionedTopicResources(configurationStore, operationTimeoutSec); } + public CompletableFuture<Optional<Policies>> getPoliciesAsync(NamespaceName ns) { + return getCache().get(joinPath(BASE_POLICIES_PATH, ns.toString())); + } + public static class IsolationPolicyResources extends BaseResources<Map<String, NamespaceIsolationDataImpl>> { public IsolationPolicyResources(MetadataStoreExtended store, int operationTimeoutSec) { super(store, new TypeReference<Map<String, NamespaceIsolationDataImpl>>() { @@ -56,8 +64,15 @@ public class NamespaceResources extends BaseResources<Policies> { } public static class PartitionedTopicResources extends BaseResources<PartitionedTopicMetadata> { + private static final String PARTITIONED_TOPIC_PATH = "/admin/partitioned-topics"; + public PartitionedTopicResources(MetadataStoreExtended configurationStore, int operationTimeoutSec) { super(configurationStore, PartitionedTopicMetadata.class, operationTimeoutSec); } + + public CompletableFuture<Void> createPartitionedTopicAsync(TopicName tn, PartitionedTopicMetadata tm) { + return createAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(), + tn.getEncodedLocalName()), tm); + } } -} \ No newline at end of file +} diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java index fa4853a22c5..5209a795acf 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java @@ -38,6 +38,7 @@ public class PulsarResources { private DynamicConfigurationResources dynamicConfigResources; private LocalPoliciesResources localPolicies; private LoadManagerReportResources loadReportResources; + private TopicResources topicResources; private Optional<MetadataStoreExtended> localMetadataStore; private Optional<MetadataStoreExtended> configurationMetadataStore; @@ -56,6 +57,7 @@ public class PulsarResources { dynamicConfigResources = new DynamicConfigurationResources(localMetadataStore, operationTimeoutSec); localPolicies = new LocalPoliciesResources(localMetadataStore, operationTimeoutSec); loadReportResources = new LoadManagerReportResources(localMetadataStore, operationTimeoutSec); + topicResources = new TopicResources(localMetadataStore); } this.localMetadataStore = Optional.ofNullable(localMetadataStore); this.configurationMetadataStore = Optional.ofNullable(configurationMetadataStore); diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java new file mode 100644 index 00000000000..d25b308d086 --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.resources; + +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.metadata.api.MetadataStore; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static org.apache.pulsar.common.util.Codec.decode; + +public class TopicResources { + private static final String MANAGED_LEDGER_PATH = "/managed-ledgers"; + + private final MetadataStore store; + + public TopicResources(MetadataStore store) { + this.store = store; + } + + public CompletableFuture<List<String>> getExistingPartitions(TopicName topic) { + return getExistingPartitions(topic.getNamespaceObject(), topic.getDomain()); + } + + public CompletableFuture<List<String>> getExistingPartitions(NamespaceName ns, TopicDomain domain) { + String topicPartitionPath = MANAGED_LEDGER_PATH + "/" + ns + "/" + domain; + return store.getChildren(topicPartitionPath).thenApply(topics -> + topics.stream() + .map(s -> String.format("%s://%s/%s", domain.value(), ns, decode(s))) + .collect(Collectors.toList()) + ); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index bdaded637b6..e91e462f7df 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -141,6 +141,7 @@ import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; import org.apache.pulsar.common.naming.NamespaceBundles; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; @@ -2412,16 +2413,39 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies .thenCompose(topicExists -> { return fetchPartitionedTopicMetadataAsync(topicName) .thenCompose(metadata -> { + CompletableFuture<PartitionedTopicMetadata> future = new CompletableFuture<>(); + // If topic is already exist, creating partitioned topic is not allowed. if (metadata.partitions == 0 && !topicExists && !topicName.isPartitioned() && pulsar.getBrokerService().isAllowAutoTopicCreation(topicName) && pulsar.getBrokerService().isDefaultTopicTypePartitioned(topicName)) { - return pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName); + + pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName) + .thenAccept(md -> future.complete(md)) + .exceptionally(ex -> { + if (ex.getCause() + instanceof MetadataStoreException.AlreadyExistsException) { + // The partitioned topic might be created concurrently + fetchPartitionedTopicMetadataAsync(topicName) + .whenComplete((metadata2, ex2) -> { + if (ex2 == null) { + future.complete(metadata2); + } else { + future.completeExceptionally(ex2); + } + }); + } else { + future.completeExceptionally(ex); + } + return null; + }); } else { - return CompletableFuture.completedFuture(metadata); + future.complete(metadata); } + + return future; }); }); } @@ -2436,28 +2460,17 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies "Number of partitions should be less than or equal to " + maxPartitions); PartitionedTopicMetadata configMetadata = new PartitionedTopicMetadata(defaultNumPartitions); - CompletableFuture<PartitionedTopicMetadata> partitionedTopicFuture = futureWithDeadline(); - - if (!checkMaxTopicsPerNamespace(topicName, defaultNumPartitions, partitionedTopicFuture)) { - return partitionedTopicFuture; - } - - try { - PartitionedTopicResources partitionResources = pulsar.getPulsarResources().getNamespaceResources() - .getPartitionedTopicResources(); - partitionResources.createAsync(partitionedTopicPath(topicName), configMetadata).thenAccept((r) -> { - log.info("partitioned metadata successfully created for {}", topicName); - partitionedTopicFuture.complete(configMetadata); - }).exceptionally(ex -> { - partitionedTopicFuture.completeExceptionally(ex.getCause()); - return null; - }); - } catch (Exception e) { - log.error("Failed to create default partitioned topic.", e); - return FutureUtil.failedFuture(e); - } - return partitionedTopicFuture; + return checkMaxTopicsPerNamespace(topicName, defaultNumPartitions) + .thenCompose(__ -> { + PartitionedTopicResources partitionResources = pulsar.getPulsarResources().getNamespaceResources() + .getPartitionedTopicResources(); + return partitionResources.createPartitionedTopicAsync(topicName, configMetadata) + .thenApply(v -> { + log.info("partitioned metadata successfully created for {}", topicName); + return configMetadata; + }); + }); } public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataAsync(TopicName topicName) { @@ -2727,6 +2740,11 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies return SystemTopicClient.isSystemTopic(TopicName.get(topic)); } + public boolean isSystemTopic(TopicName topicName) { + return NamespaceService.isSystemServiceNamespace(topicName.getNamespace()) + || SystemTopicNames.isSystemTopic(topicName); + } + /** * Get {@link TopicPolicies} for the parameterized topic. * @param topicName @@ -2744,8 +2762,39 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies } } + private CompletableFuture<Void> checkMaxTopicsPerNamespace(TopicName topicName, int numPartitions) { + return pulsar.getPulsarResources().getNamespaceResources() + .getPoliciesAsync(topicName.getNamespaceObject()) + .thenCompose(optPolicies -> { + int maxTopicsPerNamespace = optPolicies.map(p -> p.max_topics_per_namespace) + .orElse(pulsar.getConfig().getMaxTopicsPerNamespace()); + + if (maxTopicsPerNamespace > 0 && !isSystemTopic(topicName)) { + return pulsar().getPulsarResources().getTopicResources() + .getExistingPartitions(topicName) + .thenCompose(topics -> { + // exclude created system topic + long topicsCount = topics.stream() + .filter(t -> !isSystemTopic(TopicName.get(t))) + .count(); + if (topicsCount + numPartitions > maxTopicsPerNamespace) { + log.error("Failed to create persistent topic {}, " + + "exceed maximum number of topics in namespace", topicName); + return FutureUtil.failedFuture( + new RestException(Response.Status.PRECONDITION_FAILED, + "Exceed maximum number of topics in namespace.")); + } else { + return CompletableFuture.completedFuture(null); + } + }); + } else { + return CompletableFuture.completedFuture(null); + } + }); + } + private <T> boolean checkMaxTopicsPerNamespace(TopicName topicName, int numPartitions, - CompletableFuture<T> topicFuture) { + CompletableFuture<T> topicFuture) { Integer maxTopicsPerNamespace; try { maxTopicsPerNamespace = pulsar.getConfigurationCache().policiesCache() diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java new file mode 100644 index 00000000000..72ec16752c6 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.naming; + +import com.google.common.collect.Sets; +import java.util.Collections; +import java.util.Set; + +/** + * Encapsulate the parsing of the completeTopicName name. + */ +public class SystemTopicNames { + + /** + * Local topic name for the namespace events. + */ + public static final String NAMESPACE_EVENTS_LOCAL_NAME = "__change_events"; + + /** + * Local topic name for the transaction buffer snapshot. + */ + public static final String TRANSACTION_BUFFER_SNAPSHOT = "__transaction_buffer_snapshot"; + + + public static final String PENDING_ACK_STORE_SUFFIX = "__transaction_pending_ack"; + + /** + * The set of all local topic names declared above. + */ + public static final Set<String> EVENTS_TOPIC_NAMES = + Collections.unmodifiableSet(Sets.newHashSet(NAMESPACE_EVENTS_LOCAL_NAME, TRANSACTION_BUFFER_SNAPSHOT)); + + public static final TopicName TRANSACTION_COORDINATOR_ASSIGN = TopicName.get(TopicDomain.persistent.value(), + NamespaceName.SYSTEM_NAMESPACE, "transaction_coordinator_assign"); + + public static final TopicName TRANSACTION_COORDINATOR_LOG = TopicName.get(TopicDomain.persistent.value(), + NamespaceName.SYSTEM_NAMESPACE, "__transaction_log_"); + + public static boolean isEventSystemTopic(TopicName topicName) { + return EVENTS_TOPIC_NAMES.contains(TopicName.get(topicName.getPartitionedTopicName()).getLocalName()); + } + + public static boolean isTransactionInternalName(TopicName topicName) { + String topic = topicName.toString(); + return topic.startsWith(TRANSACTION_COORDINATOR_ASSIGN.toString()) + || topic.startsWith(TRANSACTION_COORDINATOR_LOG.toString()) + || topic.endsWith(PENDING_ACK_STORE_SUFFIX); + } + + public static boolean isSystemTopic(TopicName topicName) { + TopicName nonePartitionedTopicName = TopicName.get(topicName.getPartitionedTopicName()); + return isEventSystemTopic(nonePartitionedTopicName) || isTransactionInternalName(nonePartitionedTopicName); + } +}
