This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c13f18  [schema] Introduce schema data validator (#4360)
6c13f18 is described below

commit 6c13f189423892b1c670e2a4791f16895ea23c4a
Author: Sijie Guo <[email protected]>
AuthorDate: Thu Jun 20 19:46:17 2019 -0700

    [schema] Introduce schema data validator (#4360)
    
    *Motivation*
    
    Currently the schema data is only validated in the compatibility checker. If 
the schema data is uploaded through the admin API and is the first version of 
the schema, no validation is performed.
    
    *Changes*
    
    Add a schema data validator that validates the schema data before it is 
stored in the schema storage.
---
 .../pulsar/broker/admin/v2/SchemasResource.java    |  88 ++++++-----
 .../broker/service/BrokerServiceException.java     |  22 ++-
 .../org/apache/pulsar/broker/service/Consumer.java |   2 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |   8 +-
 .../schema/KeyValueSchemaCompatibilityCheck.java   |   2 +-
 .../service/schema/SchemaRegistryService.java      |   4 +-
 .../service/schema/SchemaRegistryServiceImpl.java  |   1 +
 .../IncompatibleSchemaException.java               |  10 +-
 .../InvalidSchemaDataException.java}               |  18 ++-
 .../SchemaException.java}                          |  20 ++-
 .../validator/PrimitiveSchemaDataValidator.java    |  45 ++++++
 .../schema/validator/SchemaDataValidator.java      |  87 +++++++++++
 ...hemaRegistryServiceWithSchemaDataValidator.java |  98 +++++++++++++
 .../validator/StringSchemaDataValidator.java       |  53 +++++++
 .../validator/StructSchemaDataValidator.java       |  77 ++++++++++
 .../schema/validator/SchemaDataValidatorTest.java  | 133 +++++++++++++++++
 ...RegistryServiceWithSchemaDataValidatorTest.java | 163 +++++++++++++++++++++
 .../api/SimpleTypedProducerConsumerTest.java       |  68 +++------
 .../org/apache/pulsar/client/admin/Schemas.java    |   9 ++
 .../pulsar/client/admin/internal/SchemasImpl.java  |  52 +++++--
 20 files changed, 828 insertions(+), 132 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
index eb7cc64..1e8c8e9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.admin.v2;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.Objects.isNull;
 import static org.apache.commons.lang.StringUtils.defaultIfEmpty;
 import static org.apache.pulsar.common.util.Codec.decode;
@@ -48,9 +49,11 @@ import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import org.apache.pulsar.broker.admin.AdminResource;
-import org.apache.pulsar.broker.service.schema.IncompatibleSchemaException;
+import 
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
 import org.apache.pulsar.broker.service.schema.LongSchemaVersion;
 import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
+import 
org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
+import 
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
@@ -85,7 +88,7 @@ public class SchemasResource extends AdminResource {
         this.clock = clock;
     }
 
-    private long getLongSchemaVersion(SchemaVersion schemaVersion) {
+    private static long getLongSchemaVersion(SchemaVersion schemaVersion) {
         if (schemaVersion instanceof LongSchemaVersion) {
             return ((LongSchemaVersion) schemaVersion).getVersion();
         } else {
@@ -116,29 +119,7 @@ public class SchemasResource extends AdminResource {
         String schemaId = buildSchemaId(tenant, namespace, topic);
         pulsar().getSchemaRegistryService().getSchema(schemaId)
             .handle((schema, error) -> {
-                if (isNull(error)) {
-                    if (isNull(schema)) {
-                        
response.resume(Response.status(Response.Status.NOT_FOUND).build());
-                    } else if (schema.schema.isDeleted()) {
-                        
response.resume(Response.status(Response.Status.NOT_FOUND).build());
-                    } else {
-                        response.resume(
-                            Response.ok()
-                                .encoding(MediaType.APPLICATION_JSON)
-                                .entity(GetSchemaResponse.builder()
-                                    
.version(getLongSchemaVersion(schema.version))
-                                    .type(schema.schema.getType())
-                                    .timestamp(schema.schema.getTimestamp())
-                                    .data(new String(schema.schema.getData()))
-                                    .properties(schema.schema.getProps())
-                                    .build()
-                                )
-                                .build()
-                        );
-                    }
-                } else {
-                    response.resume(error);
-                }
+                handleGetSchemaResponse(response, schema, error);
                 return null;
             });
     }
@@ -170,32 +151,38 @@ public class SchemasResource extends AdminResource {
         SchemaVersion v = 
pulsar().getSchemaRegistryService().versionFromBytes(bbVersion.array());
         pulsar().getSchemaRegistryService().getSchema(schemaId, v)
             .handle((schema, error) -> {
-                if (isNull(error)) {
-                    if (isNull(schema)) {
-                        
response.resume(Response.status(Response.Status.NOT_FOUND).build());
-                    } else if (schema.schema.isDeleted()) {
-                        
response.resume(Response.status(Response.Status.NOT_FOUND).build());
-                    } else {
-                        response.resume(
-                            Response.ok()
-                                .encoding(MediaType.APPLICATION_JSON)
-                                .entity(GetSchemaResponse.builder()
-                                    
.version(getLongSchemaVersion(schema.version))
-                                    .type(schema.schema.getType())
-                                    .timestamp(schema.schema.getTimestamp())
-                                    .data(new String(schema.schema.getData()))
-                                    .properties(schema.schema.getProps())
-                                    .build()
-                                ).build()
-                        );
-                    }
-                } else {
-                    response.resume(error);
-                }
+                handleGetSchemaResponse(response, schema, error);
                 return null;
             });
     }
 
+    private static void handleGetSchemaResponse(AsyncResponse response,
+                                                SchemaAndMetadata schema, 
Throwable error) {
+        if (isNull(error)) {
+            if (isNull(schema)) {
+                
response.resume(Response.status(Response.Status.NOT_FOUND).build());
+            } else if (schema.schema.isDeleted()) {
+                
response.resume(Response.status(Response.Status.NOT_FOUND).build());
+            } else {
+                response.resume(
+                    Response.ok()
+                        .encoding(MediaType.APPLICATION_JSON)
+                        .entity(GetSchemaResponse.builder()
+                            .version(getLongSchemaVersion(schema.version))
+                            .type(schema.schema.getType())
+                            .timestamp(schema.schema.getTimestamp())
+                            .data(new String(schema.schema.getData(), UTF_8))
+                            .properties(schema.schema.getProps())
+                            .build()
+                        ).build()
+                );
+            }
+        } else {
+            response.resume(error);
+        }
+
+    }
+
     @DELETE
     @Path("/{tenant}/{namespace}/{topic}/schema")
     @Produces(MediaType.APPLICATION_JSON)
@@ -244,7 +231,9 @@ public class SchemasResource extends AdminResource {
         @ApiResponse(code = 401, message = "Client is not authorized or Don't 
have admin permission"),
         @ApiResponse(code = 403, message = "Client is not authenticated"),
         @ApiResponse(code = 404, message = "Tenant or Namespace or Topic 
doesn't exist"),
+        @ApiResponse(code = 409, message = "Incompatible schema"),
         @ApiResponse(code = 412, message = "Failed to find the ownership for 
the topic"),
+        @ApiResponse(code = 422, message = "Invalid schema data"),
     })
     public void postSchema(
         @PathParam("tenant") String tenant,
@@ -287,6 +276,11 @@ public class SchemasResource extends AdminResource {
         ).exceptionally(error -> {
             if (error instanceof IncompatibleSchemaException) {
                 
response.resume(Response.status(Response.Status.CONFLICT).build());
+            } else if (error instanceof InvalidSchemaDataException) {
+                response.resume(Response.status(
+                    422, /* Unprocessable Entity */
+                    error.getMessage()
+                ).build());
             } else {
                 response.resume(
                     Response.serverError().build()
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index c3f6909..2e28700 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -18,7 +18,8 @@
  */
 package org.apache.pulsar.broker.service;
 
-import org.apache.pulsar.broker.service.schema.IncompatibleSchemaException;
+import 
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
+import 
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
 
@@ -37,6 +38,10 @@ public class BrokerServiceException extends Exception {
         super(t);
     }
 
+    public BrokerServiceException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
     public static class ConsumerBusyException extends BrokerServiceException {
         public ConsumerBusyException(String msg) {
             super(msg);
@@ -154,6 +159,10 @@ public class BrokerServiceException extends Exception {
     }
 
     public static PulsarApi.ServerError getClientErrorCode(Throwable t) {
+        return getClientErrorCode(t, true);
+    }
+
+    private static PulsarApi.ServerError getClientErrorCode(Throwable t, 
boolean checkCauseIfUnknown) {
         if (t instanceof ServerMetadataException) {
             return PulsarApi.ServerError.MetadataError;
         } else if (t instanceof NamingException) {
@@ -171,12 +180,19 @@ public class BrokerServiceException extends Exception {
         } else if (t instanceof ServiceUnitNotReadyException || t instanceof 
TopicFencedException
                 || t instanceof SubscriptionFencedException) {
             return PulsarApi.ServerError.ServiceNotReady;
-        } else if (t instanceof IncompatibleSchemaException) {
+        } else if (t instanceof IncompatibleSchemaException
+            || t instanceof InvalidSchemaDataException) {
+            // for backward compatible with old clients, invalid schema data
+            // is treated as "incompatible schema".
             return PulsarApi.ServerError.IncompatibleSchema;
         } else if (t instanceof ConsumerAssignException) {
             return ServerError.ConsumerAssignError;
         } else {
-            return PulsarApi.ServerError.UnknownError;
+            if (checkCauseIfUnknown) {
+                return getClientErrorCode(t.getCause(), false);
+            } else {
+                return PulsarApi.ServerError.UnknownError;
+            }
         }
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 9276892..678347a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -312,7 +312,7 @@ public class Consumer {
         }).exceptionally(exception -> {
             log.warn("Unsubscribe failed for {}", subscription, exception);
             ctx.writeAndFlush(
-                    Commands.newError(requestId, 
BrokerServiceException.getClientErrorCode(exception.getCause()),
+                    Commands.newError(requestId, 
BrokerServiceException.getClientErrorCode(exception),
                             exception.getCause().getMessage()));
             return null;
         });
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index a29c9a4..d8de814 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -58,7 +58,7 @@ import 
org.apache.pulsar.broker.authentication.AuthenticationState;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
-import org.apache.pulsar.broker.service.schema.IncompatibleSchemaException;
+import 
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -738,7 +738,7 @@ public class ServerCnx extends PulsarHandler {
                                     // back to client, only if not completed 
already.
                                     if 
(consumerFuture.completeExceptionally(exception)) {
                                         
ctx.writeAndFlush(Commands.newError(requestId,
-                                                
BrokerServiceException.getClientErrorCode(exception.getCause()),
+                                                
BrokerServiceException.getClientErrorCode(exception),
                                                 
exception.getCause().getMessage()));
                                     }
                                     consumers.remove(consumerId, 
consumerFuture);
@@ -927,7 +927,7 @@ public class ServerCnx extends PulsarHandler {
 
                             schemaVersionFuture.exceptionally(exception -> {
                                 ctx.writeAndFlush(Commands.newError(requestId,
-                                        
BrokerServiceException.getClientErrorCode(exception.getCause()),
+                                        
BrokerServiceException.getClientErrorCode(exception),
                                         exception.getMessage()));
                                 producers.remove(producerId, producerFuture);
                                 return null;
@@ -1455,7 +1455,7 @@ public class ServerCnx extends PulsarHandler {
             future.getNow(null);
         } catch (Exception e) {
             if (e.getCause() instanceof BrokerServiceException) {
-                error = 
BrokerServiceException.getClientErrorCode((BrokerServiceException) 
e.getCause());
+                error = 
BrokerServiceException.getClientErrorCode(e.getCause());
             }
         }
         return error;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheck.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheck.java
index 2426f3e..c5fa364 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheck.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheck.java
@@ -39,7 +39,7 @@ public class KeyValueSchemaCompatibilityCheck implements 
SchemaCompatibilityChec
         this.checkers = checkers;
     }
 
-    private KeyValue<SchemaData, SchemaData> 
decodeKeyValueSchemaData(SchemaData schemaData) {
+    public static KeyValue<SchemaData, SchemaData> 
decodeKeyValueSchemaData(SchemaData schemaData) {
         KeyValue<SchemaInfo, SchemaInfo> schemaInfoKeyValue =
             
KeyValueSchemaInfo.decodeKeyValueSchemaInfo(schemaData.toSchemaInfo());
         return new KeyValue<>(
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
index baa57a4..73244cf 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import 
org.apache.pulsar.broker.service.schema.validator.SchemaRegistryServiceWithSchemaDataValidator;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,7 +59,8 @@ public interface SchemaRegistryService extends SchemaRegistry 
{
 
             schemaStorage.start();
 
-            return new SchemaRegistryServiceImpl(schemaStorage, checkers);
+            return SchemaRegistryServiceWithSchemaDataValidator.of(
+                new SchemaRegistryServiceImpl(schemaStorage, checkers));
         } catch (Exception e) {
             log.warn("Unable to create schema registry storage, defaulting to 
empty storage: {}", e);
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index 5c38c8a..57246f0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -40,6 +40,7 @@ import java.util.stream.Collectors;
 import javax.validation.constraints.NotNull;
 
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import 
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
 import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.schema.SchemaType;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/IncompatibleSchemaException.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/IncompatibleSchemaException.java
similarity index 78%
copy from 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/IncompatibleSchemaException.java
copy to 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/IncompatibleSchemaException.java
index 975ba0d..e045deb 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/IncompatibleSchemaException.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/IncompatibleSchemaException.java
@@ -16,9 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.service.schema;
+package org.apache.pulsar.broker.service.schema.exceptions;
+
+/**
+ * Exception is thrown when an incompatible schema is used.
+ */
+public class IncompatibleSchemaException extends SchemaException {
+
+    private static final long serialVersionUID = -6013970359956508359L;
 
-public class IncompatibleSchemaException extends Exception {
     public IncompatibleSchemaException() {
         super("Incompatible schema used");
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/IncompatibleSchemaException.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/InvalidSchemaDataException.java
similarity index 66%
copy from 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/IncompatibleSchemaException.java
copy to 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/InvalidSchemaDataException.java
index 975ba0d..a6a5808 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/IncompatibleSchemaException.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/InvalidSchemaDataException.java
@@ -16,14 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.service.schema;
+package org.apache.pulsar.broker.service.schema.exceptions;
 
-public class IncompatibleSchemaException extends Exception {
-    public IncompatibleSchemaException() {
-        super("Incompatible schema used");
-    }
+/**
+ * Exception thrown when the schema data is not in a valid form.
+ */
+public class InvalidSchemaDataException extends SchemaException {
 
-    public IncompatibleSchemaException(String message) {
+    private static final long serialVersionUID = -2846364736743783766L;
+
+    public InvalidSchemaDataException(String message) {
         super(message);
     }
+
+    public InvalidSchemaDataException(String message, Throwable cause) {
+        super(message, cause);
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/IncompatibleSchemaException.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/SchemaException.java
similarity index 66%
rename from 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/IncompatibleSchemaException.java
rename to 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/SchemaException.java
index 975ba0d..54a22e9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/IncompatibleSchemaException.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/SchemaException.java
@@ -16,14 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.service.schema;
+package org.apache.pulsar.broker.service.schema.exceptions;
 
-public class IncompatibleSchemaException extends Exception {
-    public IncompatibleSchemaException() {
-        super("Incompatible schema used");
-    }
+import org.apache.pulsar.broker.service.BrokerServiceException;
+
+/**
+ * Schema related exceptions.
+ */
+public class SchemaException extends BrokerServiceException {
 
-    public IncompatibleSchemaException(String message) {
+    private static final long serialVersionUID = -6587520779026691815L;
+
+    public SchemaException(String message) {
         super(message);
     }
+
+    public SchemaException(String message, Throwable cause) {
+        super(message, cause);
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/PrimitiveSchemaDataValidator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/PrimitiveSchemaDataValidator.java
new file mode 100644
index 0000000..346a6d0
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/PrimitiveSchemaDataValidator.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema.validator;
+
+import 
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
+import org.apache.pulsar.common.protocol.schema.SchemaData;
+
+/**
+ * Validate if the primitive schema is in expected form.
+ */
+class PrimitiveSchemaDataValidator implements SchemaDataValidator {
+
+    public static PrimitiveSchemaDataValidator of() {
+        return INSTANCE;
+    }
+
+    private static final PrimitiveSchemaDataValidator INSTANCE = new 
PrimitiveSchemaDataValidator();
+
+    private PrimitiveSchemaDataValidator() {}
+
+    @Override
+    public void validate(SchemaData schemaData) throws 
InvalidSchemaDataException {
+        byte[] data = schemaData.getData();
+        if (null != data && data.length > 0) {
+            throw new InvalidSchemaDataException("Invalid schema definition 
data for primitive schemas :"
+                + "length of schema data should be zero, but " + data.length + 
" bytes is found");
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java
new file mode 100644
index 0000000..0c44612
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema.validator;
+
+import 
org.apache.pulsar.broker.service.schema.KeyValueSchemaCompatibilityCheck;
+import 
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
+import org.apache.pulsar.common.protocol.schema.SchemaData;
+import org.apache.pulsar.common.schema.KeyValue;
+
+/**
+ * A validator to validate the schema data is well formed.
+ */
+public interface SchemaDataValidator {
+
+    /**
+     * Validate if the schema data is well formed.
+     *
+     * @param schemaData schema data to validate
+     * @throws InvalidSchemaDataException if the schema data is not in a valid 
form.
+     */
+    static void validateSchemaData(SchemaData schemaData) throws 
InvalidSchemaDataException {
+        switch (schemaData.getType()) {
+            case AVRO:
+            case JSON:
+            case PROTOBUF:
+                StructSchemaDataValidator.of().validate(schemaData);
+                break;
+            case STRING:
+                StringSchemaDataValidator.of().validate(schemaData);
+                break;
+            case BOOLEAN:
+            case INT8:
+            case INT16:
+            case INT32:
+            case INT64:
+            case FLOAT:
+            case DOUBLE:
+            case DATE:
+            case TIME:
+            case TIMESTAMP:
+                PrimitiveSchemaDataValidator.of().validate(schemaData);
+                break;
+            case NONE:
+            case BYTES:
+                // `NONE` and `BYTES` schema is not stored
+                break;
+            case AUTO:
+            case AUTO_CONSUME:
+            case AUTO_PUBLISH:
+                throw new InvalidSchemaDataException(
+                    "Schema " + schemaData.getType() + " is a client-side 
schema type");
+            case KEY_VALUE:
+                KeyValue<SchemaData, SchemaData> kvSchema =
+                    
KeyValueSchemaCompatibilityCheck.decodeKeyValueSchemaData(schemaData);
+                validateSchemaData(kvSchema.getKey());
+                validateSchemaData(kvSchema.getValue());
+                break;
+            default:
+                throw new InvalidSchemaDataException("Unknown schema type : " 
+ schemaData.getType());
+        }
+    }
+
+    /**
+     * Validate a schema data is in a valid form.
+     *
+     * @param schemaData schema data to validate
+     * @throws InvalidSchemaDataException if the schema data is not in a valid 
form.
+     */
+    void validate(SchemaData schemaData) throws InvalidSchemaDataException;
+
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java
new file mode 100644
index 0000000..f0ca3fc
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema.validator;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
+import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
+import 
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
+import org.apache.pulsar.common.protocol.schema.SchemaData;
+import org.apache.pulsar.common.protocol.schema.SchemaVersion;
+import org.apache.pulsar.common.util.FutureUtil;
+
+/**
+ * A {@link SchemaRegistryService} wrapper that validates schema data.
+ */
+public class SchemaRegistryServiceWithSchemaDataValidator implements 
SchemaRegistryService {
+
+    public static SchemaRegistryServiceWithSchemaDataValidator 
of(SchemaRegistryService service) {
+        return new SchemaRegistryServiceWithSchemaDataValidator(service);
+    }
+
+    private final SchemaRegistryService service;
+
+    private SchemaRegistryServiceWithSchemaDataValidator(SchemaRegistryService 
service) {
+        this.service = service;
+    }
+
+    @Override
+    public void close() throws Exception {
+        this.service.close();
+    }
+
+    @Override
+    public CompletableFuture<SchemaAndMetadata> getSchema(String schemaId) {
+        return this.service.getSchema(schemaId);
+    }
+
+    @Override
+    public CompletableFuture<SchemaAndMetadata> getSchema(String schemaId, 
SchemaVersion version) {
+        return this.service.getSchema(schemaId, version);
+    }
+
+    @Override
+    public CompletableFuture<List<CompletableFuture<SchemaAndMetadata>>> 
getAllSchemas(String schemaId) {
+        return this.service.getAllSchemas(schemaId);
+    }
+
+    @Override
+    public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId,
+                                                              SchemaData 
schema,
+                                                              
SchemaCompatibilityStrategy strategy) {
+        try {
+            SchemaDataValidator.validateSchemaData(schema);
+        } catch (InvalidSchemaDataException e) {
+            return FutureUtil.failedFuture(e);
+        }
+        return service.putSchemaIfAbsent(schemaId, schema, strategy);
+    }
+
+    @Override
+    public CompletableFuture<SchemaVersion> deleteSchema(String schemaId, 
String user) {
+        return service.deleteSchema(schemaId, user);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> isCompatible(String schemaId,
+                                                   SchemaData schema,
+                                                   SchemaCompatibilityStrategy 
strategy) {
+        try {
+            SchemaDataValidator.validateSchemaData(schema);
+        } catch (InvalidSchemaDataException e) {
+            return FutureUtil.failedFuture(e);
+        }
+        return service.isCompatible(schemaId, schema, strategy);
+    }
+
+    @Override
+    public SchemaVersion versionFromBytes(byte[] version) {
+        return service.versionFromBytes(version);
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StringSchemaDataValidator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StringSchemaDataValidator.java
new file mode 100644
index 0000000..4368b14
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StringSchemaDataValidator.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema.validator;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import 
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
+import org.apache.pulsar.common.protocol.schema.SchemaData;
+
+/**
+ * Validates that the string schema is in the expected form.
+ */
+class StringSchemaDataValidator implements SchemaDataValidator {
+
+    public static final StringSchemaDataValidator of() {
+        return INSTANCE;
+    }
+
+    private static final StringSchemaDataValidator INSTANCE = new 
StringSchemaDataValidator();
+
+    private static final String PY_NONE_SCHEMA_INFO = "null";
+
+    private StringSchemaDataValidator() {}
+
+    @Override
+    public void validate(SchemaData schemaData) throws 
InvalidSchemaDataException {
+        byte[] data = schemaData.getData();
+        if (null != data && data.length > 0) {
+            // Python sends the string 'null' as schema data
+            String schemaDataStr = new String(data, UTF_8);
+            if (!PY_NONE_SCHEMA_INFO.equals(schemaDataStr)) {
+                throw new InvalidSchemaDataException("Invalid schema 
definition data for string schema : '"
+                    + schemaDataStr + "'");
+            }
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java
new file mode 100644
index 0000000..1eeed83
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema.validator;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
+import java.io.IOException;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaParseException;
+import 
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
+import org.apache.pulsar.common.protocol.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+
+/**
+ * Validates that the struct schema is in the expected form.
+ */
+class StructSchemaDataValidator implements SchemaDataValidator {
+
+    public static StructSchemaDataValidator of() {
+        return INSTANCE;
+    }
+
+    private static final StructSchemaDataValidator INSTANCE = new 
StructSchemaDataValidator();
+
+    private StructSchemaDataValidator() {}
+
+    @Override
+    public void validate(SchemaData schemaData) throws 
InvalidSchemaDataException {
+        byte[] data = schemaData.getData();
+
+        try {
+            Schema.Parser avroSchemaParser = new Schema.Parser();
+            avroSchemaParser.parse(new String(data, UTF_8));
+        } catch (SchemaParseException e) {
+            if (schemaData.getType() == SchemaType.JSON) {
+                // JsonSchema was previously used to store the definition of a JSON schema,
+                // so for backward compatibility we need to try
+                // decoding the schema data with JsonSchema
+                ObjectMapper objectMapper = 
ObjectMapperFactory.getThreadLocal();
+                try {
+                    objectMapper.readValue(data, JsonSchema.class);
+                } catch (IOException ioe) {
+                    throwInvalidSchemaDataException(schemaData, ioe);
+                }
+            } else {
+                throwInvalidSchemaDataException(schemaData, e);
+            }
+        } catch (Exception e) {
+            throwInvalidSchemaDataException(schemaData, e);
+        }
+    }
+
+    private static void throwInvalidSchemaDataException(SchemaData schemaData,
+                                                        Throwable cause) 
throws InvalidSchemaDataException {
+        throw new InvalidSchemaDataException("Invalid schema definition data 
for "
+            + schemaData.getType() + " schema", cause);
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java
new file mode 100644
index 0000000..998d363
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema.validator;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
+import 
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.protocol.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class SchemaDataValidatorTest {
+
+    private static class Foo {
+        int field;
+    }
+
+    @DataProvider(name = "primitiveSchemas")
+    public static Object[][] primitiveSchemas() {
+        return new Object[][] {
+            { SchemaType.STRING },
+            { SchemaType.BOOLEAN },
+            { SchemaType.INT8 },
+            { SchemaType.INT16 },
+            { SchemaType.INT32 },
+            { SchemaType.INT64 },
+            { SchemaType.FLOAT },
+            { SchemaType.DOUBLE },
+            { SchemaType.DATE },
+            { SchemaType.TIME },
+            { SchemaType.TIMESTAMP },
+        };
+    }
+
+    @DataProvider(name = "clientSchemas")
+    public static Object[][] clientSchemas() {
+        return new Object[][] {
+            { SchemaType.AUTO_CONSUME },
+            { SchemaType.AUTO_PUBLISH },
+            { SchemaType.AUTO },
+        };
+    }
+
+    @DataProvider(name = "structSchemas")
+    public static Object[][] structSchemas() {
+        return new Object[][] {
+            { SchemaType.AVRO },
+            { SchemaType.JSON },
+            { SchemaType.PROTOBUF },
+        };
+    }
+
+    @Test(dataProvider = "primitiveSchemas")
+    public void testPrimitiveValidatorSuccess(SchemaType type) throws 
Exception {
+        SchemaData data = SchemaData.builder()
+            .type(type)
+            .data(new byte[0])
+            .build();
+        SchemaDataValidator.validateSchemaData(data);
+    }
+
+    @Test(dataProvider = "primitiveSchemas", expectedExceptions = 
InvalidSchemaDataException.class)
+    public void testPrimitiveValidatorInvalid(SchemaType type) throws 
Exception {
+        SchemaData data = SchemaData.builder()
+            .type(type)
+            .data(new byte[10])
+            .build();
+        SchemaDataValidator.validateSchemaData(data);
+    }
+
+    @Test(dataProvider = "clientSchemas", expectedExceptions = 
InvalidSchemaDataException.class)
+    public void testValidateClientSchemas(SchemaType type) throws Exception {
+        SchemaData data = SchemaData.builder()
+            .type(type)
+            .data(new byte[0])
+            .build();
+        SchemaDataValidator.validateSchemaData(data);
+    }
+
+    @Test(dataProvider = "structSchemas")
+    public void testStructValidatorSuccess(SchemaType type) throws Exception {
+        Schema<Foo> schema = Schema.AVRO(Foo.class);
+        SchemaData data = SchemaData.builder()
+            .type(type)
+            .data(schema.getSchemaInfo().getSchema())
+            .build();
+        SchemaDataValidator.validateSchemaData(data);
+    }
+
+    @Test(dataProvider = "structSchemas", expectedExceptions = 
InvalidSchemaDataException.class)
+    public void testStructValidatorInvalid(SchemaType type) throws Exception {
+        SchemaData data = SchemaData.builder()
+            .type(type)
+            .data("bad-schema".getBytes(UTF_8))
+            .build();
+        SchemaDataValidator.validateSchemaData(data);
+    }
+
+    @Test
+    public void testJsonSchemaTypeWithJsonSchemaData() throws Exception {
+        ObjectMapper mapper = ObjectMapperFactory.getThreadLocal();
+        SchemaData data = SchemaData.builder()
+            .type(SchemaType.JSON)
+            .data(
+                mapper.writeValueAsBytes(
+                    new JsonSchemaGenerator(mapper)
+                    .generateSchema(Foo.class)))
+            .build();
+        SchemaDataValidator.validateSchemaData(data);
+    }
+
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidatorTest.java
new file mode 100644
index 0000000..aa1cfbb
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidatorTest.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema.validator;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
+import 
org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
+import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
+import 
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
+import org.apache.pulsar.common.protocol.schema.SchemaData;
+import org.apache.pulsar.common.protocol.schema.SchemaVersion;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test {@link SchemaRegistryServiceWithSchemaDataValidator}.
+ */
+public class SchemaRegistryServiceWithSchemaDataValidatorTest {
+
+    private SchemaRegistryService underlyingService;
+    private SchemaRegistryServiceWithSchemaDataValidator service;
+
+    @BeforeMethod
+    public void setup() {
+        this.underlyingService = mock(SchemaRegistryService.class);
+        this.service = 
SchemaRegistryServiceWithSchemaDataValidator.of(underlyingService);
+    }
+
+    @Test
+    public void testGetLatestSchema() {
+        String schemaId = "test-schema-id";
+        CompletableFuture<SchemaAndMetadata> getFuture = new 
CompletableFuture<>();
+        when(underlyingService.getSchema(eq(schemaId))).thenReturn(getFuture);
+        assertSame(getFuture, service.getSchema(schemaId));
+        verify(underlyingService, times(1)).getSchema(eq(schemaId));
+    }
+
+    @Test
+    public void testGetSchemaByVersion() {
+        String schemaId = "test-schema-id";
+        CompletableFuture<SchemaAndMetadata> getFuture = new 
CompletableFuture<>();
+        when(underlyingService.getSchema(eq(schemaId), 
any(SchemaVersion.class)))
+            .thenReturn(getFuture);
+        assertSame(getFuture, service.getSchema(schemaId, 
SchemaVersion.Latest));
+        verify(underlyingService, times(1))
+            .getSchema(eq(schemaId), same(SchemaVersion.Latest));
+    }
+
+    @Test
+    public void testDeleteSchema() {
+        String schemaId = "test-schema-id";
+        String user = "test-user";
+        CompletableFuture<SchemaVersion> deleteFuture = new 
CompletableFuture<>();
+        when(underlyingService.deleteSchema(eq(schemaId), eq(user)))
+            .thenReturn(deleteFuture);
+        assertSame(deleteFuture, service.deleteSchema(schemaId, user));
+        verify(underlyingService, times(1))
+            .deleteSchema(eq(schemaId), eq(user));
+    }
+
+    @Test
+    public void testIsCompatibleWithGoodSchemaData() {
+        String schemaId = "test-schema-id";
+        SchemaCompatibilityStrategy strategy = 
SchemaCompatibilityStrategy.FULL;
+        CompletableFuture<Boolean> future = new CompletableFuture<>();
+        when(underlyingService.isCompatible(eq(schemaId), 
any(SchemaData.class), eq(strategy)))
+            .thenReturn(future);
+        SchemaData schemaData = SchemaData.builder()
+            .type(SchemaType.BOOLEAN)
+            .data(new byte[0])
+            .build();
+        assertSame(future, service.isCompatible(schemaId, schemaData, 
strategy));
+        verify(underlyingService, times(1))
+            .isCompatible(eq(schemaId), same(schemaData), eq(strategy));
+    }
+
+    @Test
+    public void testIsCompatibleWithBadSchemaData() {
+        String schemaId = "test-schema-id";
+        SchemaCompatibilityStrategy strategy = 
SchemaCompatibilityStrategy.FULL;
+        CompletableFuture<Boolean> future = new CompletableFuture<>();
+        when(underlyingService.isCompatible(eq(schemaId), 
any(SchemaData.class), eq(strategy)))
+            .thenReturn(future);
+        SchemaData schemaData = SchemaData.builder()
+            .type(SchemaType.BOOLEAN)
+            .data(new byte[10])
+            .build();
+        try {
+            service.isCompatible(schemaId, schemaData, strategy).get();
+            fail("Should fail isCompatible check");
+        } catch (Exception e) {
+            assertTrue(e.getCause() instanceof InvalidSchemaDataException);
+        }
+        verify(underlyingService, times(0))
+            .isCompatible(eq(schemaId), same(schemaData), eq(strategy));
+    }
+
+    @Test
+    public void testPutSchemaIfAbsentWithGoodSchemaData() {
+        String schemaId = "test-schema-id";
+        SchemaCompatibilityStrategy strategy = 
SchemaCompatibilityStrategy.FULL;
+        CompletableFuture<SchemaVersion> future = new CompletableFuture<>();
+        when(underlyingService.putSchemaIfAbsent(eq(schemaId), 
any(SchemaData.class), eq(strategy)))
+            .thenReturn(future);
+        SchemaData schemaData = SchemaData.builder()
+            .type(SchemaType.BOOLEAN)
+            .data(new byte[0])
+            .build();
+        assertSame(future, service.putSchemaIfAbsent(schemaId, schemaData, 
strategy));
+        verify(underlyingService, times(1))
+            .putSchemaIfAbsent(eq(schemaId), same(schemaData), eq(strategy));
+    }
+
+    @Test
+    public void testPutSchemaIfAbsentWithBadSchemaData() {
+        String schemaId = "test-schema-id";
+        SchemaCompatibilityStrategy strategy = 
SchemaCompatibilityStrategy.FULL;
+        CompletableFuture<SchemaVersion> future = new CompletableFuture<>();
+        when(underlyingService.putSchemaIfAbsent(eq(schemaId), 
any(SchemaData.class), eq(strategy)))
+            .thenReturn(future);
+        SchemaData schemaData = SchemaData.builder()
+            .type(SchemaType.BOOLEAN)
+            .data(new byte[10])
+            .build();
+        try {
+            service.putSchemaIfAbsent(schemaId, schemaData, strategy).get();
+            fail("Should fail putSchemaIfAbsent");
+        } catch (Exception e) {
+            assertTrue(e.getCause() instanceof InvalidSchemaDataException);
+        }
+        verify(underlyingService, times(0))
+            .putSchemaIfAbsent(eq(schemaId), same(schemaData), eq(strategy));
+    }
+
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
index 9aa3573..1631850 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.api;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 import com.google.common.base.MoreObjects;
@@ -37,6 +38,7 @@ import lombok.Cleanup;
 
 import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
 import org.apache.pulsar.broker.service.schema.SchemaRegistry;
+import 
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
@@ -156,62 +158,32 @@ public class SimpleTypedProducerConsumerTest extends 
ProducerConsumerBase {
     }
 
     @Test
-    public void testJsonConsumerWithWrongCorruptedSchema() throws Exception {
+    public void testWrongCorruptedSchema() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
         byte[] randomSchemaBytes = "hello".getBytes();
 
-        pulsar.getSchemaRegistryService()
-            .putSchemaIfAbsent("my-property/my-ns/my-topic1",
-                SchemaData.builder()
-                    .type(SchemaType.JSON)
-                    .isDeleted(false)
-                    .timestamp(Clock.systemUTC().millis())
-                    .user("me")
-                    .data(randomSchemaBytes)
-                    .props(Collections.emptyMap())
-                    .build(),
-                SchemaCompatibilityStrategy.FULL
-            ).get();
-
-        Consumer<JsonEncodedPojo> consumer = pulsarClient
-            
.newConsumer(JSONSchema.of(SchemaDefinition.<JsonEncodedPojo>builder().withPojo(JsonEncodedPojo.class).build()))
-            .topic("persistent://my-property/use/my-ns/my-topic1")
-            .subscriptionName("my-subscriber-name")
-            .subscribe();
-
-        log.info("-- Exiting {} test --", methodName);
-    }
-
-    @Test
-    public void testJsonProducerWithWrongCorruptedSchema() throws Exception {
-        log.info("-- Starting {} test --", methodName);
-
-        byte[] randomSchemaBytes = "hello".getBytes();
-
-        pulsar.getSchemaRegistryService()
-            .putSchemaIfAbsent("my-property/my-ns/my-topic1",
-                SchemaData.builder()
-                    .type(SchemaType.JSON)
-                    .isDeleted(false)
-                    .timestamp(Clock.systemUTC().millis())
-                    .user("me")
-                    .data(randomSchemaBytes)
-                    .props(Collections.emptyMap())
-                    .build(),
-                SchemaCompatibilityStrategy.FULL
-            ).get();
-
-        Producer<JsonEncodedPojo> producer = pulsarClient
-            
.newProducer(JSONSchema.of(SchemaDefinition.<JsonEncodedPojo>builder().withPojo(JsonEncodedPojo.class).build()))
-            .topic("persistent://my-property/use/my-ns/my-topic1")
-            .create();
-
+        try {
+            pulsar.getSchemaRegistryService()
+                .putSchemaIfAbsent("my-property/my-ns/my-topic1",
+                    SchemaData.builder()
+                        .type(SchemaType.JSON)
+                        .isDeleted(false)
+                        .timestamp(Clock.systemUTC().millis())
+                        .user("me")
+                        .data(randomSchemaBytes)
+                        .props(Collections.emptyMap())
+                        .build(),
+                    SchemaCompatibilityStrategy.FULL
+                ).get();
+            fail("Should fail to add corrupted schema data");
+        } catch (Exception e) {
+            assertTrue(e.getCause() instanceof InvalidSchemaDataException);
+        }
 
         log.info("-- Exiting {} test --", methodName);
     }
 
-
     @Test
     public void testProtobufProducerAndConsumer() throws Exception {
         log.info("-- Starting {} test --", methodName);
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Schemas.java 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Schemas.java
index abc8d16..9be3c63 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Schemas.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Schemas.java
@@ -54,6 +54,15 @@ public interface Schemas {
     void deleteSchema(String topic) throws PulsarAdminException;
 
     /**
+     * Create a schema for a given <tt>topic</tt> with the provided schema 
info.
+     *
+     * @param topic topic name, in fully qualified format
+     * @param schemaInfo schema info
+     * @throws PulsarAdminException
+     */
+    void createSchema(String topic, SchemaInfo schemaInfo) throws 
PulsarAdminException;
+
+    /**
      * Create a schema for a given <tt>topic</tt>.
      *
      * @param topic topic name, in fully qualified format
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
index 9d06676..cce5cd2 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.admin.internal;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+
 import javax.ws.rs.client.Entity;
 import javax.ws.rs.client.WebTarget;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -44,12 +46,7 @@ public class SchemasImpl extends BaseResource implements 
Schemas {
         try {
             TopicName tn = TopicName.get(topic);
             GetSchemaResponse response = 
request(schemaPath(tn)).get(GetSchemaResponse.class);
-            SchemaInfo info = new SchemaInfo();
-            info.setSchema(response.getData().getBytes());
-            info.setType(response.getType());
-            info.setProperties(response.getProperties());
-            info.setName(tn.getLocalName());
-            return info;
+            return convertGetSchemaResponseToSchemaInfo(tn, response);
         } catch (Exception e) {
             throw getApiException(e);
         }
@@ -59,13 +56,9 @@ public class SchemasImpl extends BaseResource implements 
Schemas {
     public SchemaInfo getSchemaInfo(String topic, long version) throws 
PulsarAdminException {
         try {
             TopicName tn = TopicName.get(topic);
-            GetSchemaResponse response = 
request(schemaPath(tn).path(Long.toString(version))).get(GetSchemaResponse.class);
-            SchemaInfo info = new SchemaInfo();
-            info.setSchema(response.getData().getBytes());
-            info.setType(response.getType());
-            info.setProperties(response.getProperties());
-            info.setName(tn.getLocalName());
-            return info;
+            GetSchemaResponse response = 
request(schemaPath(tn).path(Long.toString(version)))
+                .get(GetSchemaResponse.class);
+            return convertGetSchemaResponseToSchemaInfo(tn, response);
         } catch (Exception e) {
             throw getApiException(e);
         }
@@ -82,6 +75,18 @@ public class SchemasImpl extends BaseResource implements 
Schemas {
     }
 
     @Override
+    public void createSchema(String topic, SchemaInfo schemaInfo) throws 
PulsarAdminException {
+        PostSchemaPayload payload = new PostSchemaPayload();
+        payload.setType(schemaInfo.getType().name());
+        payload.setProperties(schemaInfo.getProperties());
+        // for backward compatibility, we convert `bytes` to `string`;
+        // we can consider fixing this in a new version of the REST endpoint
+        
payload.setSchema(convertSchemaDataToStringLegacy(schemaInfo.getSchema()));
+
+        createSchema(topic, payload);
+    }
+
+    @Override
     public void createSchema(String topic, PostSchemaPayload payload) throws 
PulsarAdminException {
         try {
             TopicName tn = TopicName.get(topic);
@@ -99,4 +104,25 @@ public class SchemasImpl extends BaseResource implements 
Schemas {
             .path(topicName.getEncodedLocalName())
             .path("schema");
     }
+
+    // the util function converts `GetSchemaResponse` to `SchemaInfo`
+    static SchemaInfo convertGetSchemaResponseToSchemaInfo(TopicName tn,
+                                                           GetSchemaResponse 
response) {
+        SchemaInfo info = new SchemaInfo();
+        info.setSchema(response.getData().getBytes(UTF_8));
+        info.setType(response.getType());
+        info.setProperties(response.getProperties());
+        info.setName(tn.getLocalName());
+        return info;
+    }
+
+
+    // this util function exists for backward compatibility
+    static String convertSchemaDataToStringLegacy(byte[] schemaData) {
+        if (null == schemaData) {
+            return "";
+        }
+
+        return new String(schemaData, UTF_8);
+    }
 }

Reply via email to