This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 16da5f78e113778a80bcb26ec80d7e6eb0fcce58
Author: fengyubiao <[email protected]>
AuthorDate: Mon Jul 15 10:04:48 2024 +0800

    [fix][broker] Fix stuck when enable topic level replication and build remote admin fails (#23028)
    
    (cherry picked from commit 88ebe785dbdab239104981453a9bd0e4a7e896d3)
---
 .../apache/pulsar/broker/admin/AdminResource.java  | 15 +++++++--
 .../broker/service/OneWayReplicatorTest.java       | 39 ++++++++++++++++++++--
 2 files changed, 49 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index f6be219aba3..7121a627d83 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -47,6 +47,7 @@ import org.apache.pulsar.broker.resources.ClusterResources;
 import org.apache.pulsar.broker.service.plugin.InvalidEntryFilterException;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.internal.TopicsImpl;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
@@ -622,7 +623,7 @@ public abstract class AdminResource extends 
PulsarWebResource {
             });
     }
 
-    protected Map<String, CompletableFuture<Void>> 
internalCreatePartitionedTopicToReplicatedClustersInBackground(
+    protected Map<String, CompletableFuture<Void>> 
internalCreatePartitionedTopicToReplicatedClustersInBackground (
             Set<String> clusters, int numPartitions) {
         final String shortTopicName = topicName.getPartitionedTopicName();
         Map<String, CompletableFuture<Void>> tasksForAllClusters = new 
HashMap<>();
@@ -641,9 +642,17 @@ public abstract class AdminResource extends 
PulsarWebResource {
                     createRemoteTopicFuture.completeExceptionally(new 
RestException(ex1));
                     return;
                 }
+                PulsarAdmin remotePulsarAdmin;
+                try {
+                    remotePulsarAdmin = 
pulsar().getBrokerService().getClusterPulsarAdmin(cluster, clusterData);
+                } catch (Exception ex) {
+                    log.error("[{}] [{}] An un-expected error occurs when 
trying to create remote pulsar admin for"
+                            + " cluster {}", clientAppId(), topicName, 
cluster, ex);
+                    createRemoteTopicFuture.completeExceptionally(new 
RestException(ex));
+                    return;
+                }
                 // Get cluster data success.
-                TopicsImpl topics =
-                        (TopicsImpl) 
pulsar().getBrokerService().getClusterPulsarAdmin(cluster, 
clusterData).topics();
+                TopicsImpl topics = (TopicsImpl) remotePulsarAdmin.topics();
                 topics.createPartitionedTopicAsync(shortTopicName, 
numPartitions, true, null)
                         .whenComplete((ignore, ex2) -> {
                     if (ex2 == null) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index f0f699a7e16..fef9c00ebff 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -36,7 +36,9 @@ import java.lang.reflect.Method;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Optional;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -55,6 +57,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.resources.ClusterResources;
 import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -70,11 +73,13 @@ import org.apache.pulsar.client.impl.ProducerBuilderImpl;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
 import org.awaitility.reflect.WhiteboxImpl;
@@ -915,6 +920,36 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
         admin1.topics().setReplicationClusters(topic, Arrays.asList(cluster1, 
cluster2));
     }
 
+    @Test(timeOut = 30 * 1000)
+    public void testCreateRemoteAdminFailed() throws Exception {
+        final TenantInfo tenantInfo = 
admin1.tenants().getTenantInfo(defaultTenant);
+        final String ns1 = defaultTenant + "/ns_" + 
UUID.randomUUID().toString().replace("-", "");
+        final String randomClusterName = "c_" + 
UUID.randomUUID().toString().replace("-", "");
+        final String topic = BrokerTestUtil.newUniqueName(ns1 + "/tp");
+        admin1.namespaces().createNamespace(ns1);
+        admin1.topics().createPartitionedTopic(topic, 2);
+
+        // Inject an invalid cluster: its ClusterData is built with every field left unset.
+        ClusterResources clusterResources = 
broker1.getPulsar().getPulsarResources().getClusterResources();
+        clusterResources.createCluster(randomClusterName, 
ClusterData.builder().build());
+        Set<String> allowedClusters = new 
HashSet<>(tenantInfo.getAllowedClusters());
+        allowedClusters.add(randomClusterName);
+        admin1.tenants().updateTenant(defaultTenant, 
TenantInfo.builder().adminRoles(tenantInfo.getAdminRoles())
+                .allowedClusters(allowedClusters).build());
+
+        // Verify the call fails with an error; the test's timeOut would catch it hanging instead.
+        try {
+            admin1.topics().setReplicationClusters(topic, 
Arrays.asList(cluster1, randomClusterName));
+            fail("Expected a error due to empty fields");
+        } catch (Exception ex) {
+            // Expected: replicating to the cluster with empty ClusterData raises an error.
+        }
+
+        // Cleanup: delete the partitioned topic and restore the tenant's original info.
+        admin1.topics().deletePartitionedTopic(topic);
+        admin1.tenants().updateTenant(defaultTenant, tenantInfo);
+    }
+
     @Test
     public void testConfigReplicationStartAt() throws Exception {
         // Initialize.

Reply via email to