This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a813c52931a8078a959b06cdba750419bc71dad5 Author: Rajan Dhabalia <[email protected]> AuthorDate: Thu Nov 11 07:32:35 2021 -0800 [pulsar-broker] Handle lookup redirect for V1-topics with different cluster (#12743) (cherry picked from commit 25cbfadbf0a1aba8c149c978063902500ab08379) --- .../authorization/PulsarAuthorizationProvider.java | 23 ++++++++++++---------- .../pulsar/broker/resources/ClusterResources.java | 4 ++++ .../pulsar/broker/service/ReplicatorTest.java | 21 ++++++++++++++++++++ 3 files changed, 38 insertions(+), 10 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java index b6b1bafd..b50d7de 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java @@ -364,20 +364,23 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider { } private CompletableFuture<Boolean> checkAuthorization(TopicName topicName, String role, AuthAction action) { - return checkPermission(topicName, role, action) - .thenApply(isPermission -> isPermission && checkCluster(topicName)); + return checkPermission(topicName, role, action). + thenApply(isPermission -> isPermission). + thenCompose(permission -> permission ? checkCluster(topicName) : + CompletableFuture.completedFuture(false)); } - private boolean checkCluster(TopicName topicName) { + private CompletableFuture<Boolean> checkCluster(TopicName topicName) { if (topicName.isGlobal() || conf.getClusterName().equals(topicName.getCluster())) { - return true; - } else { - if (log.isDebugEnabled()) { - log.debug("Topic [{}] does not belong to local cluster [{}]", topicName.toString(), - conf.getClusterName()); - } - return false; + return CompletableFuture.completedFuture(true); } + if (log.isDebugEnabled()) { + log.debug("Topic [{}] does not belong to local cluster [{}]", topicName.toString(), conf.getClusterName()); + } + return pulsarResources.getClusterResources().listAsync() + .thenApply(clusters -> { + return clusters.contains(topicName.getCluster()); + }); } public CompletableFuture<Boolean> checkPermission(TopicName topicName, String role, AuthAction action) { diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java index 1a3cf89..a4ee27a 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java @@ -42,6 +42,10 @@ public class ClusterResources extends BaseResources<ClusterData> { this.failureDomainResources = new FailureDomainResources(store, FailureDomainImpl.class, operationTimeoutSec); } + public CompletableFuture<Set<String>> listAsync() { + return getChildrenAsync(BASE_CLUSTERS_PATH).thenApply(list -> new HashSet<>(list)); + } + public Set<String> list() throws MetadataStoreException { return new HashSet<>(super.getChildren(BASE_CLUSTERS_PATH)); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 04f96ec..87e2730 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -1310,6 +1310,27 @@ public class ReplicatorTest extends ReplicatorTestBase { }); } + @Test + public void testLookupAnotherCluster() throws Exception { + log.info("--- Starting ReplicatorTest::testLookupAnotherCluster ---"); + + String namespace = "pulsar/r2/cross-cluster-ns"; + admin1.namespaces().createNamespace(namespace); + final TopicName topicName = TopicName + .get("persistent://" + namespace + "/topic"); + + @Cleanup + PulsarClient client1 = PulsarClient.builder() + .serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS) + .build(); + Producer<byte[]> producer = client1.newProducer().topic(topicName.toString()) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + + producer.close(); + } + private void checkListContainExpectedTopic(PulsarAdmin admin, String namespace, List<String> expectedTopicList) { // wait non-partitioned topics replicators created finished final List<String> list = new ArrayList<>();
