This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 1572f9193f9 [fix][broker] Fix stuck when enable topic level
replication and build remote admin fails (#23028)
1572f9193f9 is described below
commit 1572f9193f9ccf04193ea2a0dee3a3a751090e13
Author: fengyubiao <[email protected]>
AuthorDate: Mon Jul 15 10:04:48 2024 +0800
[fix][broker] Fix stuck when enable topic level replication and build
remote admin fails (#23028)
(cherry picked from commit 88ebe785dbdab239104981453a9bd0e4a7e896d3)
---
.../apache/pulsar/broker/admin/AdminResource.java | 15 ++++++++--
.../broker/service/OneWayReplicatorTest.java | 35 ++++++++++++++++++++++
2 files changed, 47 insertions(+), 3 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 514709b58a9..38e8bdeaaac 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.service.plugin.InvalidEntryFilterException;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
@@ -630,7 +631,7 @@ public abstract class AdminResource extends
PulsarWebResource {
});
}
- protected Map<String, CompletableFuture<Void>>
internalCreatePartitionedTopicToReplicatedClustersInBackground(
+ protected Map<String, CompletableFuture<Void>>
internalCreatePartitionedTopicToReplicatedClustersInBackground (
Set<String> clusters, int numPartitions) {
final String shortTopicName = topicName.getPartitionedTopicName();
Map<String, CompletableFuture<Void>> tasksForAllClusters = new
HashMap<>();
@@ -649,9 +650,17 @@ public abstract class AdminResource extends
PulsarWebResource {
createRemoteTopicFuture.completeExceptionally(new
RestException(ex1));
return;
}
+ PulsarAdmin remotePulsarAdmin;
+ try {
+ remotePulsarAdmin =
pulsar().getBrokerService().getClusterPulsarAdmin(cluster, clusterData);
+ } catch (Exception ex) {
+ log.error("[{}] [{}] An un-expected error occurs when
trying to create remote pulsar admin for"
+ + " cluster {}", clientAppId(), topicName,
cluster, ex);
+ createRemoteTopicFuture.completeExceptionally(new
RestException(ex));
+ return;
+ }
// Get cluster data success.
- TopicsImpl topics =
- (TopicsImpl)
pulsar().getBrokerService().getClusterPulsarAdmin(cluster,
clusterData).topics();
+ TopicsImpl topics = (TopicsImpl) remotePulsarAdmin.topics();
topics.createPartitionedTopicAsync(shortTopicName,
numPartitions, true, null)
.whenComplete((ignore, ex2) -> {
if (ex2 == null) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index a14b2abcde6..393277be307 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -34,7 +34,9 @@ import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.Optional;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
@@ -53,6 +55,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -70,6 +73,8 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
@@ -900,4 +905,34 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
});
}
}
+
+ @Test(timeOut = 30 * 1000)
+ public void testCreateRemoteAdminFailed() throws Exception {
+ final TenantInfo tenantInfo =
admin1.tenants().getTenantInfo(defaultTenant);
+ final String ns1 = defaultTenant + "/ns_" +
UUID.randomUUID().toString().replace("-", "");
+ final String randomClusterName = "c_" +
UUID.randomUUID().toString().replace("-", "");
+ final String topic = BrokerTestUtil.newUniqueName(ns1 + "/tp");
+ admin1.namespaces().createNamespace(ns1);
+ admin1.topics().createPartitionedTopic(topic, 2);
+
+ // Register a cluster whose data has no fields set, then allow the tenant to use it.
+ ClusterResources clusterResources =
broker1.getPulsar().getPulsarResources().getClusterResources();
+ clusterResources.createCluster(randomClusterName,
ClusterData.builder().build());
+ Set<String> allowedClusters = new
HashSet<>(tenantInfo.getAllowedClusters());
+ allowedClusters.add(randomClusterName);
+ admin1.tenants().updateTenant(defaultTenant,
TenantInfo.builder().adminRoles(tenantInfo.getAdminRoles())
+ .allowedClusters(allowedClusters).build());
+
+ // Verify: replicating to that cluster must raise an error before the test timeout expires.
+ try {
+ admin1.topics().setReplicationClusters(topic,
Arrays.asList(cluster1, randomClusterName));
+ fail("Expected a error due to empty fields");
+ } catch (Exception ex) {
+ // Expected: any exception from the call counts as the anticipated failure.
+ }
+
+ // Cleanup: delete the partitioned topic and restore the tenant's original info.
+ admin1.topics().deletePartitionedTopic(topic);
+ admin1.tenants().updateTenant(defaultTenant, tenantInfo);
+ }
}