This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 26b47ffbcdc [improve][schema] Change update schema auth from tenant to produce (#18074)
26b47ffbcdc is described below
commit 26b47ffbcdc7f91425ed1ff1cc6cd4d7644a2451
Author: congbo <[email protected]>
AuthorDate: Wed Oct 19 15:54:10 2022 +0800
[improve][schema] Change update schema auth from tenant to produce (#18074)
---
.../apache/pulsar/broker/admin/AdminResource.java | 54 +++++++++++-----------
.../broker/admin/impl/SchemasResourceBase.java | 4 +-
.../broker/admin/AdminApiSchemaWithAuthTest.java | 9 ++++
3 files changed, 38 insertions(+), 29 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 645c804af2f..e2c44a80d4a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -742,33 +742,7 @@ public abstract class AdminResource extends PulsarWebResource {
return validateTopicPolicyOperationAsync(topicName,
PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
PolicyOperation.READ)
- .thenCompose((__) -> {
- CompletableFuture<SchemaCompatibilityStrategy> future;
- if (config().isTopicLevelPoliciesEnabled()) {
- future = getTopicPoliciesAsyncWithRetry(topicName)
- .thenApply(op ->
op.map(TopicPolicies::getSchemaCompatibilityStrategy).orElse(null));
- } else {
- future = CompletableFuture.completedFuture(null);
- }
-
- return
future.thenCompose((topicSchemaCompatibilityStrategy) -> {
- if
(!SchemaCompatibilityStrategy.isUndefined(topicSchemaCompatibilityStrategy)) {
- return
CompletableFuture.completedFuture(topicSchemaCompatibilityStrategy);
- }
- return
getNamespacePoliciesAsync(namespaceName).thenApply(policies -> {
- SchemaCompatibilityStrategy
schemaCompatibilityStrategy =
- policies.schema_compatibility_strategy;
- if
(SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
- schemaCompatibilityStrategy =
SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
-
policies.schema_auto_update_compatibility_strategy);
- if
(SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
- schemaCompatibilityStrategy =
pulsar().getConfig().getSchemaCompatibilityStrategy();
- }
- }
- return schemaCompatibilityStrategy;
- });
- });
- }).whenComplete((__, ex) -> {
+ .thenCompose((__) ->
getSchemaCompatibilityStrategyAsyncWithoutAuth()).whenComplete((__, ex) -> {
if (ex != null) {
log.error("[{}] Failed to get schema compatibility
strategy of topic {} {}",
clientAppId(), topicName, ex);
@@ -776,6 +750,32 @@ public abstract class AdminResource extends PulsarWebResource {
});
}
+ protected CompletableFuture<SchemaCompatibilityStrategy>
getSchemaCompatibilityStrategyAsyncWithoutAuth() {
+ CompletableFuture<SchemaCompatibilityStrategy> future =
CompletableFuture.completedFuture(null);
+ if (config().isTopicLevelPoliciesEnabled()) {
+ future = getTopicPoliciesAsyncWithRetry(topicName)
+ .thenApply(op ->
op.map(TopicPolicies::getSchemaCompatibilityStrategy).orElse(null));
+ }
+
+ return future.thenCompose((topicSchemaCompatibilityStrategy) -> {
+ if
(!SchemaCompatibilityStrategy.isUndefined(topicSchemaCompatibilityStrategy)) {
+ return
CompletableFuture.completedFuture(topicSchemaCompatibilityStrategy);
+ }
+ return getNamespacePoliciesAsync(namespaceName).thenApply(policies
-> {
+ SchemaCompatibilityStrategy schemaCompatibilityStrategy =
+ policies.schema_compatibility_strategy;
+ if
(SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
+ schemaCompatibilityStrategy =
SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
+
policies.schema_auto_update_compatibility_strategy);
+ if
(SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
+ schemaCompatibilityStrategy =
pulsar().getConfig().getSchemaCompatibilityStrategy();
+ }
+ }
+ return schemaCompatibilityStrategy;
+ });
+ });
+ }
+
@CanIgnoreReturnValue
public static <T> T checkNotNull(T reference) {
return Objects.requireNonNull(reference);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
index 0254ff395ba..76af5825143 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
@@ -114,8 +114,8 @@ public class SchemasResourceBase extends AdminResource {
}
public CompletableFuture<SchemaVersion> postSchemaAsync(PostSchemaPayload
payload, boolean authoritative) {
- return validateDestinationAndAdminOperationAsync(authoritative)
- .thenCompose(__ -> getSchemaCompatibilityStrategyAsync())
+ return validateOwnershipAndOperationAsync(authoritative,
TopicOperation.PRODUCE)
+ .thenCompose(__ ->
getSchemaCompatibilityStrategyAsyncWithoutAuth())
.thenCompose(schemaCompatibilityStrategy -> {
byte[] data;
if (SchemaType.KEY_VALUE.name().equals(payload.getType()))
{
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
index 46830b05204..4de4d905e49 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
@@ -58,6 +58,8 @@ public class AdminApiSchemaWithAuthTest extends MockedPulsarServiceBaseTest {
private static final String ADMIN_TOKEN =
Jwts.builder().setSubject("admin").signWith(SECRET_KEY).compact();
private static final String CONSUME_TOKEN =
Jwts.builder().setSubject("consumer").signWith(SECRET_KEY).compact();
+ private static final String PRODUCE_TOKEN =
Jwts.builder().setSubject("producer").signWith(SECRET_KEY).compact();
+
@BeforeMethod
@Override
public void setup() throws Exception {
@@ -108,11 +110,18 @@ public class AdminApiSchemaWithAuthTest extends MockedPulsarServiceBaseTest {
.serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() :
brokerUrlTls.toString())
.authentication(AuthenticationToken.class.getName(),
CONSUME_TOKEN)
.build();
+
+ PulsarAdmin adminWithProducePermission = PulsarAdmin.builder()
+ .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() :
brokerUrlTls.toString())
+ .authentication(AuthenticationToken.class.getName(),
PRODUCE_TOKEN)
+ .build();
admin.topics().grantPermission(topicName, "consumer",
EnumSet.of(AuthAction.consume));
admin.topics().grantPermission(topicName, "producer",
EnumSet.of(AuthAction.produce));
SchemaInfo si = Schema.BOOL.getSchemaInfo();
+ assertThrows(PulsarAdminException.class, () ->
adminWithConsumePermission.schemas().getSchemaInfo(topicName));
assertThrows(PulsarAdminException.class, () ->
adminWithoutPermission.schemas().createSchema(topicName, si));
+ adminWithProducePermission.schemas().createSchema(topicName, si);
adminWithAdminPermission.schemas().createSchema(topicName, si);
assertThrows(PulsarAdminException.class, () ->
adminWithoutPermission.schemas().getSchemaInfo(topicName));