This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 88ebe785dbd [fix][broker] Fix stuck when enable topic level
replication and build remote admin fails (#23028)
88ebe785dbd is described below
commit 88ebe785dbdab239104981453a9bd0e4a7e896d3
Author: fengyubiao <[email protected]>
AuthorDate: Mon Jul 15 10:04:48 2024 +0800
[fix][broker] Fix stuck when enable topic level replication and build
remote admin fails (#23028)
---
.../apache/pulsar/broker/admin/AdminResource.java | 15 ++++++++--
.../broker/service/OneWayReplicatorTest.java | 35 ++++++++++++++++++++++
2 files changed, 47 insertions(+), 3 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 45455f16d4d..1f43aeaa668 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.service.plugin.InvalidEntryFilterException;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
@@ -630,7 +631,7 @@ public abstract class AdminResource extends
PulsarWebResource {
});
}
- protected Map<String, CompletableFuture<Void>>
internalCreatePartitionedTopicToReplicatedClustersInBackground(
+ protected Map<String, CompletableFuture<Void>>
internalCreatePartitionedTopicToReplicatedClustersInBackground (
Set<String> clusters, int numPartitions) {
final String shortTopicName = topicName.getPartitionedTopicName();
Map<String, CompletableFuture<Void>> tasksForAllClusters = new
HashMap<>();
@@ -649,9 +650,17 @@ public abstract class AdminResource extends
PulsarWebResource {
createRemoteTopicFuture.completeExceptionally(new
RestException(ex1));
return;
}
+ PulsarAdmin remotePulsarAdmin;
+ try {
+ remotePulsarAdmin =
pulsar().getBrokerService().getClusterPulsarAdmin(cluster, clusterData);
+ } catch (Exception ex) {
+ log.error("[{}] [{}] An un-expected error occurs when
trying to create remote pulsar admin for"
+ + " cluster {}", clientAppId(), topicName,
cluster, ex);
+ createRemoteTopicFuture.completeExceptionally(new
RestException(ex));
+ return;
+ }
// Get cluster data success.
- TopicsImpl topics =
- (TopicsImpl)
pulsar().getBrokerService().getClusterPulsarAdmin(cluster,
clusterData).topics();
+ TopicsImpl topics = (TopicsImpl) remotePulsarAdmin.topics();
topics.createPartitionedTopicAsync(shortTopicName,
numPartitions, true, null)
.whenComplete((ignore, ex2) -> {
if (ex2 == null) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index 80091c9e5eb..9aad26530df 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -38,8 +38,10 @@ import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.Optional;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
@@ -58,6 +60,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -75,7 +78,9 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
@@ -947,6 +952,36 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
admin1.topics().setReplicationClusters(topic, Arrays.asList(cluster1,
cluster2));
}
+ @Test(timeOut = 30 * 1000)
+ public void testCreateRemoteAdminFailed() throws Exception {
+ final TenantInfo tenantInfo =
admin1.tenants().getTenantInfo(defaultTenant);
+ final String ns1 = defaultTenant + "/ns_" +
UUID.randomUUID().toString().replace("-", "");
+ final String randomClusterName = "c_" +
UUID.randomUUID().toString().replace("-", "");
+ final String topic = BrokerTestUtil.newUniqueName(ns1 + "/tp");
+ admin1.namespaces().createNamespace(ns1);
+ admin1.topics().createPartitionedTopic(topic, 2);
+
+ // Inject invalid cluster data whose fields are all empty.
+ ClusterResources clusterResources =
broker1.getPulsar().getPulsarResources().getClusterResources();
+ clusterResources.createCluster(randomClusterName,
ClusterData.builder().build());
+ Set<String> allowedClusters = new
HashSet<>(tenantInfo.getAllowedClusters());
+ allowedClusters.add(randomClusterName);
+ admin1.tenants().updateTenant(defaultTenant,
TenantInfo.builder().adminRoles(tenantInfo.getAdminRoles())
+ .allowedClusters(allowedClusters).build());
+
+ // Verify.
+ try {
+ admin1.topics().setReplicationClusters(topic,
Arrays.asList(cluster1, randomClusterName));
+ fail("Expected a error due to empty fields");
+ } catch (Exception ex) {
+ // Expected an error.
+ }
+
+ // cleanup.
+ admin1.topics().deletePartitionedTopic(topic);
+ admin1.tenants().updateTenant(defaultTenant, tenantInfo);
+ }
+
@Test
public void testConfigReplicationStartAt() throws Exception {
// Initialize.