This is an automated email from the ASF dual-hosted git repository.

yuruguo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 95c1778e3c2 [Authorization] Fix producer/consume permission can’t get v1/schema (#16018)
95c1778e3c2 is described below

commit 95c1778e3c2a6254b81a7c8c95315473a713121f
Author: Ruguo Yu <[email protected]>
AuthorDate: Tue Aug 23 00:07:35 2022 +0800

    [Authorization] Fix producer/consume permission can’t get v1/schema (#16018)
    
    * [Authorization] Fix producer/consume permission can’t get v1/schema
---
 .../broker/admin/impl/SchemasResourceBase.java     | 207 ---------------------
 .../pulsar/broker/admin/v1/SchemasResource.java    |  96 +++++++++-
 2 files changed, 89 insertions(+), 214 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
index 175ab5ac27c..a115b26407d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
@@ -29,32 +29,23 @@ import java.time.Clock;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
-import javax.ws.rs.container.AsyncResponse;
-import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.admin.AdminResource;
 import 
org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
-import 
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
-import 
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.internal.DefaultImplementation;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.TopicOperation;
-import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
 import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse;
 import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
-import org.apache.pulsar.common.protocol.schema.IsCompatibilityResponse;
-import org.apache.pulsar.common.protocol.schema.LongSchemaVersionResponse;
 import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
-import org.apache.pulsar.common.protocol.schema.PostSchemaResponse;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.schema.LongSchemaVersion;
 import org.apache.pulsar.common.schema.SchemaType;
-import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -88,33 +79,12 @@ public class SchemasResourceBase extends AdminResource {
         }
     }
 
-    public void getSchema(boolean authoritative, AsyncResponse response) {
-        validateDestinationAndAdminOperation(authoritative);
-        String schemaId = getSchemaId();
-        
pulsar().getSchemaRegistryService().getSchema(schemaId).handle((schema, error) 
-> {
-            handleGetSchemaResponse(response, schema, error);
-            return null;
-        });
-    }
-
     public CompletableFuture<SchemaAndMetadata> getSchemaAsync(boolean 
authoritative) {
         return validateOwnershipAndOperationAsync(authoritative, 
TopicOperation.GET_METADATA)
                 .thenApply(__ -> getSchemaId())
                 .thenCompose(schemaId -> 
pulsar().getSchemaRegistryService().getSchema(schemaId));
     }
 
-    public void getSchema(boolean authoritative, String version, AsyncResponse 
response) {
-        validateDestinationAndAdminOperation(authoritative);
-        String schemaId = getSchemaId();
-        ByteBuffer bbVersion = ByteBuffer.allocate(Long.BYTES);
-        bbVersion.putLong(Long.parseLong(version));
-        SchemaVersion v = 
pulsar().getSchemaRegistryService().versionFromBytes(bbVersion.array());
-        pulsar().getSchemaRegistryService().getSchema(schemaId, 
v).handle((schema, error) -> {
-            handleGetSchemaResponse(response, schema, error);
-            return null;
-        });
-    }
-
     public CompletableFuture<SchemaAndMetadata> getSchemaAsync(boolean 
authoritative, String version) {
         return validateOwnershipAndOperationAsync(authoritative, 
TopicOperation.GET_METADATA)
                 .thenApply(__ -> getSchemaId())
@@ -127,16 +97,6 @@ public class SchemasResourceBase extends AdminResource {
                 });
     }
 
-    public void getAllSchemas(boolean authoritative, AsyncResponse response) {
-        validateDestinationAndAdminOperation(authoritative);
-
-        String schemaId = getSchemaId();
-        
pulsar().getSchemaRegistryService().trimDeletedSchemaAndGetList(schemaId).handle((schema,
 error) -> {
-            handleGetAllSchemasResponse(response, schema, error);
-            return null;
-        });
-    }
-
     public CompletableFuture<List<SchemaAndMetadata>> 
getAllSchemasAsync(boolean authoritative) {
         return validateOwnershipAndOperationAsync(authoritative, 
TopicOperation.GET_METADATA)
                 .thenCompose(__ -> {
@@ -145,24 +105,6 @@ public class SchemasResourceBase extends AdminResource {
                 });
     }
 
-    public void deleteSchema(boolean authoritative, AsyncResponse response, 
boolean force) {
-        validateDestinationAndAdminOperation(authoritative);
-
-        String schemaId = getSchemaId();
-        pulsar().getSchemaRegistryService().deleteSchema(schemaId, 
defaultIfEmpty(clientAppId(), ""), force)
-                .handle((version, error) -> {
-                    if (isNull(error)) {
-                        response.resume(Response.ok()
-                                
.entity(DeleteSchemaResponse.builder().version(getLongSchemaVersion(version)).build())
-                                .build());
-                    } else {
-                        log.error("[{}] Failed to delete schema for topic {}", 
clientAppId(), topicName, error);
-                        response.resume(new RestException(error));
-                    }
-                    return null;
-                });
-    }
-
     public CompletableFuture<SchemaVersion> deleteSchemaAsync(boolean 
authoritative, boolean force) {
         return validateDestinationAndAdminOperationAsync(authoritative)
                 .thenCompose(__ -> {
@@ -172,61 +114,6 @@ public class SchemasResourceBase extends AdminResource {
                 });
     }
 
-    public void postSchema(PostSchemaPayload payload, boolean authoritative, 
AsyncResponse response) {
-        validateDestinationAndAdminOperation(authoritative);
-
-        
getSchemaCompatibilityStrategyAsync().thenAccept(schemaCompatibilityStrategy -> 
{
-            byte[] data;
-            if (SchemaType.KEY_VALUE.name().equals(payload.getType())) {
-                try {
-                    data = DefaultImplementation.getDefaultImplementation()
-                            
.convertKeyValueDataStringToSchemaInfoSchema(payload.getSchema().getBytes(Charsets.UTF_8));
-                } catch (IOException conversionError) {
-                    log.error("[{}] Failed to post schema for topic {}", 
clientAppId(), topicName, conversionError);
-                    response.resume(new RestException(conversionError));
-                    return;
-                }
-            } else {
-                data = payload.getSchema().getBytes(Charsets.UTF_8);
-            }
-            pulsar().getSchemaRegistryService()
-                    .putSchemaIfAbsent(getSchemaId(),
-                            
SchemaData.builder().data(data).isDeleted(false).timestamp(clock.millis())
-                                    
.type(SchemaType.valueOf(payload.getType())).user(defaultIfEmpty(clientAppId(), 
""))
-                                    .props(payload.getProperties()).build(),
-                            schemaCompatibilityStrategy)
-                    .thenAccept(version -> response.resume(
-                            
Response.accepted().entity(PostSchemaResponse.builder().version(version).build()).build()))
-                    .exceptionally(error -> {
-                        Throwable throwable = 
FutureUtil.unwrapCompletionException(error);
-                        if (throwable instanceof IncompatibleSchemaException) {
-                            response.resume(Response
-                                    
.status(Response.Status.CONFLICT.getStatusCode(), throwable.getMessage())
-                                    .build());
-                        } else if (throwable instanceof 
InvalidSchemaDataException) {
-                            response.resume(Response.status(422, /* 
Unprocessable Entity */
-                                    throwable.getMessage()).build());
-                        } else {
-                            log.error("[{}] Failed to post schema for topic 
{}", clientAppId(), topicName, throwable);
-                            response.resume(new RestException(throwable));
-                        }
-                        return null;
-                    });
-        }).exceptionally(error -> {
-            Throwable throwable = FutureUtil.unwrapCompletionException(error);
-            if (throwable instanceof RestException) {
-                // Unprocessable Entity
-                response.resume(Response
-                        .status(((RestException) 
throwable).getResponse().getStatus(), throwable.getMessage())
-                        .build());
-            } else {
-                log.error("[{}] Failed to post schema for topic {}", 
clientAppId(), topicName, throwable);
-                response.resume(new RestException(throwable));
-            }
-            return null;
-        });
-    }
-
     public CompletableFuture<SchemaVersion> postSchemaAsync(PostSchemaPayload 
payload, boolean authoritative) {
         return validateDestinationAndAdminOperationAsync(authoritative)
                 .thenCompose(__ -> getSchemaCompatibilityStrategyAsync())
@@ -254,27 +141,6 @@ public class SchemasResourceBase extends AdminResource {
                 });
     }
 
-    public void testCompatibility(PostSchemaPayload payload, boolean 
authoritative, AsyncResponse response) {
-        validateDestinationAndAdminOperation(authoritative);
-
-        String schemaId = getSchemaId();
-
-        
getSchemaCompatibilityStrategyAsync().thenCompose(schemaCompatibilityStrategy 
-> pulsar()
-                        .getSchemaRegistryService().isCompatible(schemaId,
-                                
SchemaData.builder().data(payload.getSchema().getBytes(Charsets.UTF_8)).isDeleted(false)
-                                        
.timestamp(clock.millis()).type(SchemaType.valueOf(payload.getType()))
-                                        .user(defaultIfEmpty(clientAppId(), 
"")).props(payload.getProperties()).build(),
-                                schemaCompatibilityStrategy)
-                        .thenAccept(isCompatible -> 
response.resume(Response.accepted()
-                                
.entity(IsCompatibilityResponse.builder().isCompatibility(isCompatible)
-                                        
.schemaCompatibilityStrategy(schemaCompatibilityStrategy.name()).build())
-                                .build())))
-                .exceptionally(error -> {
-                    response.resume(new 
RestException(FutureUtil.unwrapCompletionException(error)));
-                    return null;
-                });
-    }
-
     public CompletableFuture<Pair<Boolean, SchemaCompatibilityStrategy>> 
testCompatibilityAsync(
             PostSchemaPayload payload, boolean authoritative) {
         return validateDestinationAndAdminOperationAsync(authoritative)
@@ -292,26 +158,6 @@ public class SchemasResourceBase extends AdminResource {
                 });
     }
 
-    public void getVersionBySchema(PostSchemaPayload payload, boolean 
authoritative, AsyncResponse response) {
-        validateDestinationAndAdminOperation(authoritative);
-
-        String schemaId = getSchemaId();
-
-        pulsar().getSchemaRegistryService()
-                .findSchemaVersion(schemaId,
-                        
SchemaData.builder().data(payload.getSchema().getBytes(Charsets.UTF_8)).isDeleted(false)
-                                
.timestamp(clock.millis()).type(SchemaType.valueOf(payload.getType()))
-                                .user(defaultIfEmpty(clientAppId(), 
"")).props(payload.getProperties()).build())
-                .thenAccept(version -> response.resume(Response.accepted()
-                        
.entity(LongSchemaVersionResponse.builder().version(version).build()).build()))
-                .exceptionally(error -> {
-                    Throwable throwable = 
FutureUtil.unwrapCompletionException(error);
-                    log.error("[{}] Failed to get version by schema for topic 
{}", clientAppId(), topicName, throwable);
-                    response.resume(new RestException(throwable));
-                    return null;
-                });
-    }
-
     public CompletableFuture<Long> getVersionBySchemaAsync(PostSchemaPayload 
payload, boolean authoritative) {
         return validateOwnershipAndOperationAsync(authoritative, 
TopicOperation.GET_METADATA)
                 .thenCompose(__ -> {
@@ -349,25 +195,6 @@ public class SchemasResourceBase extends AdminResource {
         }
     }
 
-    protected static void handleGetSchemaResponse(AsyncResponse response, 
SchemaAndMetadata schema, Throwable error) {
-        if (isNull(error)) {
-            if (isNull(schema)) {
-                response.resume(Response.status(
-                        Response.Status.NOT_FOUND.getStatusCode(), "Schema not 
found").build());
-            } else if (schema.schema.isDeleted()) {
-                response.resume(Response.status(
-                        Response.Status.NOT_FOUND.getStatusCode(), "Schema is 
deleted").build());
-            } else {
-                
response.resume(Response.ok().encoding(MediaType.APPLICATION_JSON)
-                        
.entity(convertSchemaAndMetadataToGetSchemaResponse(schema)).build());
-            }
-        } else {
-            log.error("Failed to get schema", error);
-            response.resume(new RestException(error));
-        }
-
-    }
-
     protected GetSchemaResponse convertToSchemaResponse(SchemaAndMetadata 
schema) {
         if (isNull(schema)) {
             throw new RestException(Response.Status.NOT_FOUND.getStatusCode(), 
"Schema not found");
@@ -389,40 +216,6 @@ public class SchemasResourceBase extends AdminResource {
         }
     }
 
-    private static void handleGetAllSchemasResponse(AsyncResponse response, 
List<SchemaAndMetadata> schemas,
-            Throwable error) {
-        if (isNull(error)) {
-            if (isNull(schemas)) {
-                response.resume(Response.status(
-                        Response.Status.NOT_FOUND.getStatusCode(), "Schemas 
not found").build());
-            } else {
-                
response.resume(Response.ok().encoding(MediaType.APPLICATION_JSON)
-                        .entity(GetAllVersionsSchemaResponse.builder()
-                                .getSchemaResponses(schemas.stream()
-                                        
.map(SchemasResourceBase::convertSchemaAndMetadataToGetSchemaResponse)
-                                        .collect(Collectors.toList()))
-                                .build())
-                        .build());
-            }
-        } else {
-            log.error("Failed to get all schemas", error);
-            response.resume(new RestException(error));
-        }
-    }
-
-    private void validateDestinationAndAdminOperation(boolean authoritative) {
-        try {
-            validateAdminAccessForTenant(topicName.getTenant());
-            validateTopicOwnership(topicName, authoritative);
-        } catch (RestException e) {
-            if (e.getResponse().getStatus() == 
Response.Status.UNAUTHORIZED.getStatusCode()) {
-                throw new RestException(Response.Status.UNAUTHORIZED, 
e.getMessage());
-            } else {
-                throw e;
-            }
-        }
-    }
-
     private CompletableFuture<Void> 
validateDestinationAndAdminOperationAsync(boolean authoritative) {
         return validateTopicOwnershipAsync(topicName, authoritative)
                 .thenCompose(__ -> 
validateAdminAccessForTenantAsync(topicName.getTenant()));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/SchemasResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/SchemasResource.java
index c6e4239a3aa..13bfba35121 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/SchemasResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/SchemasResource.java
@@ -38,14 +38,20 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.admin.impl.SchemasResourceBase;
+import 
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
+import 
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
 import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
 import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse;
 import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
 import org.apache.pulsar.common.protocol.schema.IsCompatibilityResponse;
+import org.apache.pulsar.common.protocol.schema.LongSchemaVersionResponse;
 import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
 import org.apache.pulsar.common.protocol.schema.PostSchemaResponse;
 import org.apache.pulsar.common.schema.LongSchemaVersion;
+import org.apache.pulsar.common.util.FutureUtil;
 
 @Path("/schemas")
 @Api(
@@ -53,6 +59,7 @@ import org.apache.pulsar.common.schema.LongSchemaVersion;
     description = "Schemas related admin APIs",
     tags = "schemas"
 )
+@Slf4j
 public class SchemasResource extends SchemasResourceBase {
 
     public SchemasResource() {
@@ -81,7 +88,16 @@ public class SchemasResource extends SchemasResourceBase {
         @Suspended final AsyncResponse response
     ) {
         validateTopicName(tenant, cluster, namespace, topic);
-        getSchema(authoritative, response);
+        getSchemaAsync(authoritative)
+                .thenApply(schemaAndMetadata -> 
convertToSchemaResponse(schemaAndMetadata))
+                .thenApply(response::resume)
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to get schema for topic {}", 
clientAppId(), topicName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(response, ex);
+                    return null;
+                });
     }
 
     @GET
@@ -107,7 +123,17 @@ public class SchemasResource extends SchemasResourceBase {
         @Suspended final AsyncResponse response
     ) {
         validateTopicName(tenant, cluster, namespace, topic);
-        getSchema(authoritative, version, response);
+        getSchemaAsync(authoritative, version)
+                .thenApply(schemaAndMetadata -> 
convertToSchemaResponse(schemaAndMetadata))
+                .thenAccept(response::resume)
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to get schema for topic {} with 
version {}",
+                                clientAppId(), topicName, version, ex);
+                    }
+                    resumeAsyncResponseExceptionally(response, ex);
+                    return null;
+                });
     }
 
     @GET
@@ -132,7 +158,16 @@ public class SchemasResource extends SchemasResourceBase {
             @Suspended final AsyncResponse response
     ) {
         validateTopicName(tenant, cluster, namespace, topic);
-        getAllSchemas(authoritative, response);
+        getAllSchemasAsync(authoritative)
+                .thenApply(schemaAndMetadata -> 
convertToAllVersionsSchemaResponse(schemaAndMetadata))
+                .thenAccept(response::resume)
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to get all schemas for topic 
{}", clientAppId(), topicName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(response, ex);
+                    return null;
+                });
     }
 
     @DELETE
@@ -157,7 +192,17 @@ public class SchemasResource extends SchemasResourceBase {
         @Suspended final AsyncResponse response
     ) {
         validateTopicName(tenant, cluster, namespace, topic);
-        deleteSchema(authoritative, response, force);
+        deleteSchemaAsync(authoritative, force)
+                .thenAccept(version -> {
+                    
response.resume(DeleteSchemaResponse.builder().version(getLongSchemaVersion(version)).build());
+                })
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to delete schemas for topic 
{}", clientAppId(), topicName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(response, ex);
+                    return null;
+                });
     }
 
     @POST
@@ -195,7 +240,25 @@ public class SchemasResource extends SchemasResourceBase {
         @Suspended final AsyncResponse response
     ) {
         validateTopicName(tenant, cluster, namespace, topic);
-        postSchema(payload, authoritative, response);
+        postSchemaAsync(payload, authoritative)
+                .thenAccept(version -> 
response.resume(PostSchemaResponse.builder().version(version).build()))
+                .exceptionally(ex -> {
+                    Throwable root = FutureUtil.unwrapCompletionException(ex);
+                    if (root instanceof IncompatibleSchemaException) {
+                        response.resume(Response
+                                
.status(Response.Status.CONFLICT.getStatusCode(), root.getMessage())
+                                .build());
+                    } else if (root instanceof InvalidSchemaDataException) {
+                        response.resume(Response.status(422, /* Unprocessable 
Entity */
+                                root.getMessage()).build());
+                    } else {
+                        if (!isRedirectException(ex)) {
+                            log.error("[{}] Failed to post schemas for topic 
{}", clientAppId(), topicName, root);
+                        }
+                        resumeAsyncResponseExceptionally(response, ex);
+                    }
+                    return null;
+                });
     }
 
     @POST
@@ -232,7 +295,18 @@ public class SchemasResource extends SchemasResourceBase {
             @Suspended final AsyncResponse response
     ) {
         validateTopicName(tenant, cluster, namespace, topic);
-        testCompatibility(payload, authoritative, response);
+        testCompatibilityAsync(payload, authoritative)
+                .thenAccept(pair -> response.resume(Response.accepted()
+                        
.entity(IsCompatibilityResponse.builder().isCompatibility(pair.getLeft())
+                                
.schemaCompatibilityStrategy(pair.getRight().name()).build())
+                        .build()))
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to test compatibility for topic 
{}", clientAppId(), topicName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(response, ex);
+                    return null;
+                });
     }
 
     @POST
@@ -270,6 +344,14 @@ public class SchemasResource extends SchemasResourceBase {
             @Suspended final AsyncResponse response
     ) {
         validateTopicName(tenant, cluster, namespace, topic);
-        getVersionBySchema(payload, authoritative, response);
+        getVersionBySchemaAsync(payload, authoritative)
+                .thenAccept(version -> 
response.resume(LongSchemaVersionResponse.builder().version(version).build()))
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to get version by schema for 
topic {}", clientAppId(), topicName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(response, ex);
+                    return null;
+                });
     }
 }

Reply via email to