This is an automated email from the ASF dual-hosted git repository. yubiao pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 16da5f78e113778a80bcb26ec80d7e6eb0fcce58 Author: fengyubiao <[email protected]> AuthorDate: Mon Jul 15 10:04:48 2024 +0800 [fix][broker] Fix stuck when enable topic level replication and build remote admin fails (#23028) (cherry picked from commit 88ebe785dbdab239104981453a9bd0e4a7e896d3) --- .../apache/pulsar/broker/admin/AdminResource.java | 15 +++++++-- .../broker/service/OneWayReplicatorTest.java | 39 ++++++++++++++++++++-- 2 files changed, 49 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index f6be219aba3..7121a627d83 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -47,6 +47,7 @@ import org.apache.pulsar.broker.resources.ClusterResources; import org.apache.pulsar.broker.service.plugin.InvalidEntryFilterException; import org.apache.pulsar.broker.web.PulsarWebResource; import org.apache.pulsar.broker.web.RestException; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.internal.TopicsImpl; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; @@ -622,7 +623,7 @@ public abstract class AdminResource extends PulsarWebResource { }); } - protected Map<String, CompletableFuture<Void>> internalCreatePartitionedTopicToReplicatedClustersInBackground( + protected Map<String, CompletableFuture<Void>> internalCreatePartitionedTopicToReplicatedClustersInBackground ( Set<String> clusters, int numPartitions) { final String shortTopicName = topicName.getPartitionedTopicName(); Map<String, CompletableFuture<Void>> tasksForAllClusters = new HashMap<>(); @@ -641,9 +642,17 @@ public abstract class AdminResource extends PulsarWebResource { createRemoteTopicFuture.completeExceptionally(new RestException(ex1)); return; } + PulsarAdmin remotePulsarAdmin; + try { + remotePulsarAdmin = pulsar().getBrokerService().getClusterPulsarAdmin(cluster, clusterData); + } catch (Exception ex) { + log.error("[{}] [{}] An un-expected error occurs when trying to create remote pulsar admin for" + + " cluster {}", clientAppId(), topicName, cluster, ex); + createRemoteTopicFuture.completeExceptionally(new RestException(ex)); + return; + } // Get cluster data success. - TopicsImpl topics = - (TopicsImpl) pulsar().getBrokerService().getClusterPulsarAdmin(cluster, clusterData).topics(); + TopicsImpl topics = (TopicsImpl) remotePulsarAdmin.topics(); topics.createPartitionedTopicAsync(shortTopicName, numPartitions, true, null) .whenComplete((ignore, ex2) -> { if (ex2 == null) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index f0f699a7e16..fef9c00ebff 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -36,7 +36,9 @@ import java.lang.reflect.Method; import java.time.Duration; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -55,6 +57,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.resources.ClusterResources; import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -70,11 +73,13 @@ import org.apache.pulsar.client.impl.ProducerBuilderImpl; import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; +import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; -import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.util.FutureUtil; import org.awaitility.Awaitility; import org.awaitility.reflect.WhiteboxImpl; @@ -915,6 +920,36 @@ public class OneWayReplicatorTest extends OneWayReplicatorTestBase { admin1.topics().setReplicationClusters(topic, Arrays.asList(cluster1, cluster2)); } + @Test(timeOut = 30 * 1000) + public void testCreateRemoteAdminFailed() throws Exception { + final TenantInfo tenantInfo = admin1.tenants().getTenantInfo(defaultTenant); + final String ns1 = defaultTenant + "/ns_" + UUID.randomUUID().toString().replace("-", ""); + final String randomClusterName = "c_" + UUID.randomUUID().toString().replace("-", ""); + final String topic = BrokerTestUtil.newUniqueName(ns1 + "/tp"); + admin1.namespaces().createNamespace(ns1); + admin1.topics().createPartitionedTopic(topic, 2); + + // Inject a wrong cluster data which with empty fields. + ClusterResources clusterResources = broker1.getPulsar().getPulsarResources().getClusterResources(); + clusterResources.createCluster(randomClusterName, ClusterData.builder().build()); + Set<String> allowedClusters = new HashSet<>(tenantInfo.getAllowedClusters()); + allowedClusters.add(randomClusterName); + admin1.tenants().updateTenant(defaultTenant, TenantInfo.builder().adminRoles(tenantInfo.getAdminRoles()) + .allowedClusters(allowedClusters).build()); + + // Verify. + try { + admin1.topics().setReplicationClusters(topic, Arrays.asList(cluster1, randomClusterName)); + fail("Expected a error due to empty fields"); + } catch (Exception ex) { + // Expected an error. + } + + // cleanup. + admin1.topics().deletePartitionedTopic(topic); + admin1.tenants().updateTenant(defaultTenant, tenantInfo); + } + @Test public void testConfigReplicationStartAt() throws Exception { // Initialize.
