This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c1d5162 Fix authorization error if partition number of partitioned
topic is updated. (#10300) (#10333)
c1d5162 is described below
commit c1d516254f656e029d6daf6f4ca8914d20ff0d1b
Author: Shen Liu <[email protected]>
AuthorDate: Thu May 6 08:31:08 2021 +0800
Fix authorization error if partition number of partitioned topic is
updated. (#10300) (#10333)
Fixes #10300
### Motivation
Fix the bug where, after the partition number of a partitioned topic that has
a topic-level auth policy is updated, new producers/consumers of this topic
get an authorization error.
### Modifications
In
[`org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider#checkPermission`](https://github.com/apache/pulsar/blob/889b9b8e5efc62d2d0cbc761205fba5759c97af0/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java#L394),
if the current `topicName` is a partition of a partitioned topic, also check the
permissions of its parent partitioned topic.
---
.../authorization/PulsarAuthorizationProvider.java | 21 ++++-
.../server/ProxyWithJwtAuthorizationTest.java | 91 ++++++++++++++++++++++
2 files changed, 110 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index c7dd2f4..f0118b6 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -434,16 +434,33 @@ public class PulsarAuthorizationProvider implements
AuthorizationProvider {
return;
}
}
+
+ // If the partition number of the partitioned topic having
topic level policy is updated,
+ // the new sub partitions may not inherit the policy of
the partitioned topic.
+ // So we also check the permission of the partitioned topic.
+ // For https://github.com/apache/pulsar/issues/10300
+ if (topicName.isPartitioned()) {
+ topicRoles =
policies.get().auth_policies.destination_auth.get(topicName.getPartitionedTopicName());
+ if (topicRoles != null) {
+ // Topic has custom policy
+ Set<AuthAction> topicActions =
topicRoles.get(role);
+ if (topicActions != null &&
topicActions.contains(action)) {
+ // The role has topic level permission
+ permissionFuture.complete(true);
+ return;
+ }
+ }
+ }
}
permissionFuture.complete(false);
}).exceptionally(ex -> {
- log.warn("Client with Role - {} failed to get permissions for
topic - {}. {}", role, topicName,
+ log.warn("Client with Role - {} failed to get permissions for
topic - {}. {}", role, topicName,
ex.getMessage());
permissionFuture.completeExceptionally(ex);
return null;
});
} catch (Exception e) {
- log.warn("Client with Role - {} failed to get permissions for
topic - {}. {}", role, topicName,
+ log.warn("Client with Role - {} failed to get permissions for
topic - {}. {}", role, topicName,
e.getMessage());
permissionFuture.completeExceptionally(e);
}
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
index f683adf..4a7d0b8 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
@@ -203,6 +203,97 @@ public class ProxyWithJwtAuthorizationTest extends
ProducerConsumerBase {
/**
* <pre>
+ * 1. Create a 2-partition topic and grant produce/consume permission to
client role.
+ * 2. Use producer/consumer with client role to process the topic, which
is fine.
+ * 3. Update the topic partition number to 4.
+ * 4. Use new producer/consumer with client role to process the topic.
+ * 5. Broker should authorize producer/consumer normally.
+ * </pre>
+ */
+ @Test
+ public void testUpdatePartitionNumAndReconnect() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ startProxy();
+ createAdminClient();
+ PulsarClient proxyClient =
createPulsarClient(proxyService.getServiceUrl(), PulsarClient.builder());
+
+ String clusterName = "proxy-authorization";
+ String namespaceName = "my-property/my-ns";
+ String topicName = "persistent://my-property/my-ns/my-topic1";
+ String subscriptionName = "my-subscriber-name";
+
+ admin.clusters().createCluster(clusterName, new
ClusterData(brokerUrl.toString()));
+
+ admin.tenants().createTenant("my-property",
+ new TenantInfo(Sets.newHashSet(),
Sets.newHashSet(clusterName)));
+ admin.namespaces().createNamespace(namespaceName);
+ admin.topics().createPartitionedTopic(topicName, 2);
+ admin.topics().grantPermission(topicName, CLIENT_ROLE,
+ Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+
+ Consumer<byte[]> consumer = proxyClient.newConsumer()
+ .topic(topicName)
+ .subscriptionName(subscriptionName).subscribe();
+
+ Producer<byte[]> producer = proxyClient.newProducer(Schema.BYTES)
+ .topic(topicName).create();
+ final int MSG_NUM = 10;
+ Set<String> messageSet = Sets.newHashSet();
+ for (int i = 0; i < MSG_NUM; i++) {
+ String message = "my-message-" + i;
+ messageSet.add(message);
+ producer.send(message.getBytes());
+ }
+
+ Message<byte[]> msg;
+ Set<String> receivedMessageSet = Sets.newHashSet();
+ for (int i = 0; i < MSG_NUM; i++) {
+ msg = consumer.receive(5, TimeUnit.SECONDS);
+ String receivedMessage = new String(msg.getData());
+ log.debug("Received message: [{}]", receivedMessage);
+ String expectedMessage = "my-message-" + i;
+ receivedMessageSet.add(expectedMessage);
+ consumer.acknowledgeAsync(msg);
+ }
+ Assert.assertEquals(messageSet, receivedMessageSet);
+ consumer.close();
+ producer.close();
+
+ // update partition num
+ admin.topics().updatePartitionedTopic(topicName, 4);
+
+ // produce/consume the topic again
+ consumer = proxyClient.newConsumer()
+ .topic(topicName)
+ .subscriptionName(subscriptionName).subscribe();
+ producer = proxyClient.newProducer(Schema.BYTES)
+ .topic(topicName).create();
+
+ messageSet.clear();
+ for (int i = 0; i < MSG_NUM; i++) {
+ String message = "my-message-" + i;
+ messageSet.add(message);
+ producer.send(message.getBytes());
+ }
+
+ receivedMessageSet.clear();
+ for (int i = 0; i < MSG_NUM; i++) {
+ msg = consumer.receive(5, TimeUnit.SECONDS);
+ String receivedMessage = new String(msg.getData());
+ log.debug("Received message: [{}]", receivedMessage);
+ String expectedMessage = "my-message-" + i;
+ receivedMessageSet.add(expectedMessage);
+ consumer.acknowledgeAsync(msg);
+ }
+ Assert.assertEquals(messageSet, receivedMessageSet);
+ consumer.close();
+ producer.close();
+ log.info("-- Exiting {} test --", methodName);
+ }
+
+ /**
+ * <pre>
* It verifies jwt + Authentication + Authorization (client -> proxy ->
broker).
* It also test `SubscriptionAuthMode.Prefix` mode.
*