This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 25cbfad [pulsar-broker] Handle lookup redirect for V1-topics with
different cluster (#12743)
25cbfad is described below
commit 25cbfadbf0a1aba8c149c978063902500ab08379
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Thu Nov 11 07:32:35 2021 -0800
[pulsar-broker] Handle lookup redirect for V1-topics with different cluster
(#12743)
---
.../authorization/PulsarAuthorizationProvider.java | 23 ++++++++++++----------
.../pulsar/broker/resources/ClusterResources.java | 4 ++++
.../pulsar/broker/service/ReplicatorTest.java | 21 ++++++++++++++++++++
3 files changed, 38 insertions(+), 10 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index 5f37eb5..f428a47 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -367,20 +367,23 @@ public class PulsarAuthorizationProvider implements
AuthorizationProvider {
}
private CompletableFuture<Boolean> checkAuthorization(TopicName topicName,
String role, AuthAction action) {
- return checkPermission(topicName, role, action)
- .thenApply(isPermission -> isPermission &&
checkCluster(topicName));
+ return checkPermission(topicName, role, action).
+ thenApply(isPermission -> isPermission).
+ thenCompose(permission -> permission ? checkCluster(topicName)
:
+ CompletableFuture.completedFuture(false));
}
- private boolean checkCluster(TopicName topicName) {
+ private CompletableFuture<Boolean> checkCluster(TopicName topicName) {
if (topicName.isGlobal() ||
conf.getClusterName().equals(topicName.getCluster())) {
- return true;
- } else {
- if (log.isDebugEnabled()) {
- log.debug("Topic [{}] does not belong to local cluster [{}]",
topicName.toString(),
- conf.getClusterName());
- }
- return false;
+ return CompletableFuture.completedFuture(true);
}
+ if (log.isDebugEnabled()) {
+ log.debug("Topic [{}] does not belong to local cluster [{}]",
topicName.toString(), conf.getClusterName());
+ }
+ return pulsarResources.getClusterResources().listAsync()
+ .thenApply(clusters -> {
+ return clusters.contains(topicName.getCluster());
+ });
}
public CompletableFuture<Boolean> checkPermission(TopicName topicName,
String role, AuthAction action) {
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
index 1a3cf89..a4ee27a 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
@@ -42,6 +42,10 @@ public class ClusterResources extends
BaseResources<ClusterData> {
this.failureDomainResources = new FailureDomainResources(store,
FailureDomainImpl.class, operationTimeoutSec);
}
+ public CompletableFuture<Set<String>> listAsync() {
+ return getChildrenAsync(BASE_CLUSTERS_PATH).thenApply(list -> new
HashSet<>(list));
+ }
+
public Set<String> list() throws MetadataStoreException {
return new HashSet<>(super.getChildren(BASE_CLUSTERS_PATH));
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 11c8e19..36197ce 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -1263,6 +1263,27 @@ public class ReplicatorTest extends ReplicatorTestBase {
});
}
+ @Test
+ public void testLookupAnotherCluster() throws Exception {
+ log.info("--- Starting ReplicatorTest::testLookupAnotherCluster ---");
+
+ String namespace = "pulsar/r2/cross-cluster-ns";
+ admin1.namespaces().createNamespace(namespace);
+ final TopicName topicName = TopicName
+ .get("persistent://" + namespace + "/topic");
+
+ @Cleanup
+ PulsarClient client1 = PulsarClient.builder()
+ .serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
+ .build();
+ Producer<byte[]> producer =
client1.newProducer().topic(topicName.toString())
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+
+ producer.close();
+ }
+
private void checkListContainExpectedTopic(PulsarAdmin admin, String
namespace, List<String> expectedTopicList) {
// wait non-partitioned topics replicators created finished
final List<String> list = new ArrayList<>();