This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7604d4aef67 [fix][broker] Replace sync call in async call chain of
AdminResource#internalCreatePartitionedTopic (#19399)
7604d4aef67 is described below
commit 7604d4aef670e31177d803e9f3c4a912be94267f
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Feb 2 15:03:56 2023 +0200
[fix][broker] Replace sync call in async call chain of
AdminResource#internalCreatePartitionedTopic (#19399)
---
.../apache/pulsar/broker/admin/AdminResource.java | 71 +++++++++++-----------
1 file changed, 35 insertions(+), 36 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 5a8eaab8e6b..dd3ea8535b1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -503,23 +503,6 @@ public abstract class AdminResource extends PulsarWebResource {
return getNamespacePolicies(ns);
}
- protected boolean isNamespaceReplicated(NamespaceName namespaceName) {
- return getNamespaceReplicatedClusters(namespaceName).size() > 1;
- }
-
- protected Set<String> getNamespaceReplicatedClusters(NamespaceName
namespaceName) {
- try {
- final Policies policies =
namespaceResources().getPolicies(namespaceName)
- .orElseThrow(() -> new RestException(Status.NOT_FOUND,
"Namespace does not exist"));
- return policies.replication_clusters;
- } catch (RestException re) {
- throw re;
- } catch (Exception e) {
- log.error("[{}] Failed to get namespace policies {}",
clientAppId(), namespaceName, e);
- throw new RestException(e);
- }
- }
-
protected CompletableFuture<Set<String>>
getNamespaceReplicatedClustersAsync(NamespaceName namespaceName) {
return namespaceResources().getPoliciesAsync(namespaceName)
.thenApply(policies -> {
@@ -616,26 +599,9 @@ public abstract class AdminResource extends PulsarWebResource {
.thenCompose(__ ->
provisionPartitionedTopicPath(numPartitions, createLocalTopicOnly, properties))
.thenCompose(__ -> tryCreatePartitionsAsync(numPartitions))
.thenRun(() -> {
- List<String> replicatedClusters = new ArrayList<>();
- if (!createLocalTopicOnly && topicName.isGlobal() &&
isNamespaceReplicated(namespaceName)) {
- getNamespaceReplicatedClusters(namespaceName)
- .stream()
- .filter(cluster ->
!cluster.equals(pulsar().getConfiguration().getClusterName()))
- .forEach(replicatedClusters::add);
+ if (!createLocalTopicOnly && topicName.isGlobal()) {
+ internalCreatePartitionedTopicToReplicatedClustersInBackground(numPartitions);
}
- replicatedClusters.forEach(cluster -> {
-
pulsar().getPulsarResources().getClusterResources().getClusterAsync(cluster)
- .thenAccept(clusterDataOp ->
- ((TopicsImpl)
pulsar().getBrokerService()
-
.getClusterPulsarAdmin(cluster, clusterDataOp).topics())
- .createPartitionedTopicAsync(
-
topicName.getPartitionedTopicName(), numPartitions,
- true, null))
- .exceptionally(ex -> {
- log.error("Failed to create partition
topic in cluster {}.", cluster, ex);
- return null;
- });
- });
log.info("[{}] Successfully created partitions for topic
{} in cluster {}",
clientAppId(), topicName,
pulsar().getConfiguration().getClusterName());
asyncResponse.resume(Response.noContent().build());
@@ -647,6 +613,39 @@ public abstract class AdminResource extends PulsarWebResource {
});
}
+ private void internalCreatePartitionedTopicToReplicatedClustersInBackground(int numPartitions) {
+ getNamespaceReplicatedClustersAsync(namespaceName)
+ .thenAccept(clusters -> {
+ for (String cluster : clusters) {
+ if (!cluster.equals(pulsar().getConfiguration().getClusterName())) {
+ // this call happens in the background without async composition. completion is logged.
+ pulsar().getPulsarResources().getClusterResources()
+ .getClusterAsync(cluster)
+ .thenCompose(clusterDataOp ->
+ ((TopicsImpl) pulsar().getBrokerService()
+ .getClusterPulsarAdmin(cluster,
+ clusterDataOp).topics())
+ .createPartitionedTopicAsync(
+ topicName.getPartitionedTopicName(),
+ numPartitions,
+ true, null))
+ .whenComplete((__, ex) -> {
+ if (ex != null) {
+ log.error(
+ "[{}] Failed to create partitioned topic {} in cluster {}.",
+ clientAppId(), topicName, cluster, ex);
+ } else {
+ log.info(
+ "[{}] Successfully created partitioned topic {} in "
+ + "cluster {}",
+ clientAppId(), topicName, cluster);
+ }
+ });
+ }
+ }
+ });
+ }
+
/**
* Check the exists topics contains the given topic.
* Since there are topic partitions and non-partitioned topics in Pulsar,
must ensure both partitions