This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 26b47ffbcdc [improve][schema] Change update schema auth from tenant to produce (#18074)
26b47ffbcdc is described below

commit 26b47ffbcdc7f91425ed1ff1cc6cd4d7644a2451
Author: congbo <[email protected]>
AuthorDate: Wed Oct 19 15:54:10 2022 +0800

    [improve][schema] Change update schema auth from tenant to produce (#18074)
---
 .../apache/pulsar/broker/admin/AdminResource.java  | 54 +++++++++++-----------
 .../broker/admin/impl/SchemasResourceBase.java     |  4 +-
 .../broker/admin/AdminApiSchemaWithAuthTest.java   |  9 ++++
 3 files changed, 38 insertions(+), 29 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 645c804af2f..e2c44a80d4a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -742,33 +742,7 @@ public abstract class AdminResource extends PulsarWebResource {
         return validateTopicPolicyOperationAsync(topicName,
                 PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
                 PolicyOperation.READ)
-                .thenCompose((__) -> {
-                    CompletableFuture<SchemaCompatibilityStrategy> future;
-                    if (config().isTopicLevelPoliciesEnabled()) {
-                        future = getTopicPoliciesAsyncWithRetry(topicName)
-                                .thenApply(op -> 
op.map(TopicPolicies::getSchemaCompatibilityStrategy).orElse(null));
-                    } else {
-                        future = CompletableFuture.completedFuture(null);
-                    }
-
-                    return 
future.thenCompose((topicSchemaCompatibilityStrategy) -> {
-                        if 
(!SchemaCompatibilityStrategy.isUndefined(topicSchemaCompatibilityStrategy)) {
-                            return 
CompletableFuture.completedFuture(topicSchemaCompatibilityStrategy);
-                        }
-                        return 
getNamespacePoliciesAsync(namespaceName).thenApply(policies -> {
-                            SchemaCompatibilityStrategy 
schemaCompatibilityStrategy =
-                                    policies.schema_compatibility_strategy;
-                            if 
(SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
-                                schemaCompatibilityStrategy = 
SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
-                                        
policies.schema_auto_update_compatibility_strategy);
-                                if 
(SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
-                                    schemaCompatibilityStrategy = 
pulsar().getConfig().getSchemaCompatibilityStrategy();
-                                }
-                            }
-                            return schemaCompatibilityStrategy;
-                        });
-                    });
-                }).whenComplete((__, ex) -> {
+                .thenCompose((__) -> 
getSchemaCompatibilityStrategyAsyncWithoutAuth()).whenComplete((__, ex) -> {
                     if (ex != null) {
                         log.error("[{}] Failed to get schema compatibility 
strategy of topic {} {}",
                                 clientAppId(), topicName, ex);
@@ -776,6 +750,32 @@ public abstract class AdminResource extends PulsarWebResource {
                 });
     }
 
+    protected CompletableFuture<SchemaCompatibilityStrategy> 
getSchemaCompatibilityStrategyAsyncWithoutAuth() {
+        CompletableFuture<SchemaCompatibilityStrategy> future = 
CompletableFuture.completedFuture(null);
+        if (config().isTopicLevelPoliciesEnabled()) {
+            future = getTopicPoliciesAsyncWithRetry(topicName)
+                    .thenApply(op -> 
op.map(TopicPolicies::getSchemaCompatibilityStrategy).orElse(null));
+        }
+
+        return future.thenCompose((topicSchemaCompatibilityStrategy) -> {
+            if 
(!SchemaCompatibilityStrategy.isUndefined(topicSchemaCompatibilityStrategy)) {
+                return 
CompletableFuture.completedFuture(topicSchemaCompatibilityStrategy);
+            }
+            return getNamespacePoliciesAsync(namespaceName).thenApply(policies 
-> {
+                SchemaCompatibilityStrategy schemaCompatibilityStrategy =
+                        policies.schema_compatibility_strategy;
+                if 
(SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
+                    schemaCompatibilityStrategy = 
SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
+                            
policies.schema_auto_update_compatibility_strategy);
+                    if 
(SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
+                        schemaCompatibilityStrategy = 
pulsar().getConfig().getSchemaCompatibilityStrategy();
+                    }
+                }
+                return schemaCompatibilityStrategy;
+            });
+        });
+    }
+
     @CanIgnoreReturnValue
     public static <T> T checkNotNull(T reference) {
         return Objects.requireNonNull(reference);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
index 0254ff395ba..76af5825143 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
@@ -114,8 +114,8 @@ public class SchemasResourceBase extends AdminResource {
     }
 
     public CompletableFuture<SchemaVersion> postSchemaAsync(PostSchemaPayload 
payload, boolean authoritative) {
-        return validateDestinationAndAdminOperationAsync(authoritative)
-                .thenCompose(__ -> getSchemaCompatibilityStrategyAsync())
+        return validateOwnershipAndOperationAsync(authoritative, 
TopicOperation.PRODUCE)
+                .thenCompose(__ -> 
getSchemaCompatibilityStrategyAsyncWithoutAuth())
                 .thenCompose(schemaCompatibilityStrategy -> {
                     byte[] data;
                     if (SchemaType.KEY_VALUE.name().equals(payload.getType())) 
{
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
index 46830b05204..4de4d905e49 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
@@ -58,6 +58,8 @@ public class AdminApiSchemaWithAuthTest extends MockedPulsarServiceBaseTest {
     private static final String ADMIN_TOKEN = 
Jwts.builder().setSubject("admin").signWith(SECRET_KEY).compact();
     private static final String CONSUME_TOKEN = 
Jwts.builder().setSubject("consumer").signWith(SECRET_KEY).compact();
 
+    private static final String PRODUCE_TOKEN = 
Jwts.builder().setSubject("producer").signWith(SECRET_KEY).compact();
+
     @BeforeMethod
     @Override
     public void setup() throws Exception {
@@ -108,11 +110,18 @@ public class AdminApiSchemaWithAuthTest extends MockedPulsarServiceBaseTest {
                 .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : 
brokerUrlTls.toString())
                 .authentication(AuthenticationToken.class.getName(), 
CONSUME_TOKEN)
                 .build();
+
+        PulsarAdmin adminWithProducePermission = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : 
brokerUrlTls.toString())
+                .authentication(AuthenticationToken.class.getName(), 
PRODUCE_TOKEN)
+                .build();
         admin.topics().grantPermission(topicName, "consumer", 
EnumSet.of(AuthAction.consume));
         admin.topics().grantPermission(topicName, "producer", 
EnumSet.of(AuthAction.produce));
 
         SchemaInfo si = Schema.BOOL.getSchemaInfo();
+        assertThrows(PulsarAdminException.class, () -> 
adminWithConsumePermission.schemas().getSchemaInfo(topicName));
         assertThrows(PulsarAdminException.class, () -> 
adminWithoutPermission.schemas().createSchema(topicName, si));
+        adminWithProducePermission.schemas().createSchema(topicName, si);
         adminWithAdminPermission.schemas().createSchema(topicName, si);
 
         assertThrows(PulsarAdminException.class, () -> 
adminWithoutPermission.schemas().getSchemaInfo(topicName));

Reply via email to