This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6c13f18 [schema] Introduce schema data validator (#4360)
6c13f18 is described below
commit 6c13f189423892b1c670e2a4791f16895ea23c4a
Author: Sijie Guo <[email protected]>
AuthorDate: Thu Jun 20 19:46:17 2019 -0700
[schema] Introduce schema data validator (#4360)
*Motivation*
Currently, schema data is validated only in the compatibility checker. If
schema data is uploaded through the admin API and it is the first version of
the schema, no validation is performed at all.
*Changes*
Add a schema data validator that checks the schema data before it is stored
in schema storage.
---
.../pulsar/broker/admin/v2/SchemasResource.java | 88 ++++++-----
.../broker/service/BrokerServiceException.java | 22 ++-
.../org/apache/pulsar/broker/service/Consumer.java | 2 +-
.../apache/pulsar/broker/service/ServerCnx.java | 8 +-
.../schema/KeyValueSchemaCompatibilityCheck.java | 2 +-
.../service/schema/SchemaRegistryService.java | 4 +-
.../service/schema/SchemaRegistryServiceImpl.java | 1 +
.../IncompatibleSchemaException.java | 10 +-
.../InvalidSchemaDataException.java} | 18 ++-
.../SchemaException.java} | 20 ++-
.../validator/PrimitiveSchemaDataValidator.java | 45 ++++++
.../schema/validator/SchemaDataValidator.java | 87 +++++++++++
...hemaRegistryServiceWithSchemaDataValidator.java | 98 +++++++++++++
.../validator/StringSchemaDataValidator.java | 53 +++++++
.../validator/StructSchemaDataValidator.java | 77 ++++++++++
.../schema/validator/SchemaDataValidatorTest.java | 133 +++++++++++++++++
...RegistryServiceWithSchemaDataValidatorTest.java | 163 +++++++++++++++++++++
.../api/SimpleTypedProducerConsumerTest.java | 68 +++------
.../org/apache/pulsar/client/admin/Schemas.java | 9 ++
.../pulsar/client/admin/internal/SchemasImpl.java | 52 +++++--
20 files changed, 828 insertions(+), 132 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
index eb7cc64..1e8c8e9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin.v2;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.isNull;
import static org.apache.commons.lang.StringUtils.defaultIfEmpty;
import static org.apache.pulsar.common.util.Codec.decode;
@@ -48,9 +49,11 @@ import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.pulsar.broker.admin.AdminResource;
-import org.apache.pulsar.broker.service.schema.IncompatibleSchemaException;
+import
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.LongSchemaVersion;
import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
+import
org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
+import
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
@@ -85,7 +88,7 @@ public class SchemasResource extends AdminResource {
this.clock = clock;
}
- private long getLongSchemaVersion(SchemaVersion schemaVersion) {
+ private static long getLongSchemaVersion(SchemaVersion schemaVersion) {
if (schemaVersion instanceof LongSchemaVersion) {
return ((LongSchemaVersion) schemaVersion).getVersion();
} else {
@@ -116,29 +119,7 @@ public class SchemasResource extends AdminResource {
String schemaId = buildSchemaId(tenant, namespace, topic);
pulsar().getSchemaRegistryService().getSchema(schemaId)
.handle((schema, error) -> {
- if (isNull(error)) {
- if (isNull(schema)) {
-
response.resume(Response.status(Response.Status.NOT_FOUND).build());
- } else if (schema.schema.isDeleted()) {
-
response.resume(Response.status(Response.Status.NOT_FOUND).build());
- } else {
- response.resume(
- Response.ok()
- .encoding(MediaType.APPLICATION_JSON)
- .entity(GetSchemaResponse.builder()
-
.version(getLongSchemaVersion(schema.version))
- .type(schema.schema.getType())
- .timestamp(schema.schema.getTimestamp())
- .data(new String(schema.schema.getData()))
- .properties(schema.schema.getProps())
- .build()
- )
- .build()
- );
- }
- } else {
- response.resume(error);
- }
+ handleGetSchemaResponse(response, schema, error);
return null;
});
}
@@ -170,32 +151,38 @@ public class SchemasResource extends AdminResource {
SchemaVersion v =
pulsar().getSchemaRegistryService().versionFromBytes(bbVersion.array());
pulsar().getSchemaRegistryService().getSchema(schemaId, v)
.handle((schema, error) -> {
- if (isNull(error)) {
- if (isNull(schema)) {
-
response.resume(Response.status(Response.Status.NOT_FOUND).build());
- } else if (schema.schema.isDeleted()) {
-
response.resume(Response.status(Response.Status.NOT_FOUND).build());
- } else {
- response.resume(
- Response.ok()
- .encoding(MediaType.APPLICATION_JSON)
- .entity(GetSchemaResponse.builder()
-
.version(getLongSchemaVersion(schema.version))
- .type(schema.schema.getType())
- .timestamp(schema.schema.getTimestamp())
- .data(new String(schema.schema.getData()))
- .properties(schema.schema.getProps())
- .build()
- ).build()
- );
- }
- } else {
- response.resume(error);
- }
+ handleGetSchemaResponse(response, schema, error);
return null;
});
}
+ private static void handleGetSchemaResponse(AsyncResponse response,
+ SchemaAndMetadata schema,
Throwable error) {
+ if (isNull(error)) {
+ if (isNull(schema)) {
+
response.resume(Response.status(Response.Status.NOT_FOUND).build());
+ } else if (schema.schema.isDeleted()) {
+
response.resume(Response.status(Response.Status.NOT_FOUND).build());
+ } else {
+ response.resume(
+ Response.ok()
+ .encoding(MediaType.APPLICATION_JSON)
+ .entity(GetSchemaResponse.builder()
+ .version(getLongSchemaVersion(schema.version))
+ .type(schema.schema.getType())
+ .timestamp(schema.schema.getTimestamp())
+ .data(new String(schema.schema.getData(), UTF_8))
+ .properties(schema.schema.getProps())
+ .build()
+ ).build()
+ );
+ }
+ } else {
+ response.resume(error);
+ }
+
+ }
+
@DELETE
@Path("/{tenant}/{namespace}/{topic}/schema")
@Produces(MediaType.APPLICATION_JSON)
@@ -244,7 +231,9 @@ public class SchemasResource extends AdminResource {
@ApiResponse(code = 401, message = "Client is not authorized or Don't
have admin permission"),
@ApiResponse(code = 403, message = "Client is not authenticated"),
@ApiResponse(code = 404, message = "Tenant or Namespace or Topic
doesn't exist"),
+ @ApiResponse(code = 409, message = "Incompatible schema"),
@ApiResponse(code = 412, message = "Failed to find the ownership for
the topic"),
+ @ApiResponse(code = 422, message = "Invalid schema data"),
})
public void postSchema(
@PathParam("tenant") String tenant,
@@ -287,6 +276,11 @@ public class SchemasResource extends AdminResource {
).exceptionally(error -> {
if (error instanceof IncompatibleSchemaException) {
response.resume(Response.status(Response.Status.CONFLICT).build());
+ } else if (error instanceof InvalidSchemaDataException) {
+ response.resume(Response.status(
+ 422, /* Unprocessable Entity */
+ error.getMessage()
+ ).build());
} else {
response.resume(
Response.serverError().build()
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index c3f6909..2e28700 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -18,7 +18,8 @@
*/
package org.apache.pulsar.broker.service;
-import org.apache.pulsar.broker.service.schema.IncompatibleSchemaException;
+import
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
+import
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
@@ -37,6 +38,10 @@ public class BrokerServiceException extends Exception {
super(t);
}
+ public BrokerServiceException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
public static class ConsumerBusyException extends BrokerServiceException {
public ConsumerBusyException(String msg) {
super(msg);
@@ -154,6 +159,10 @@ public class BrokerServiceException extends Exception {
}
public static PulsarApi.ServerError getClientErrorCode(Throwable t) {
+ return getClientErrorCode(t, true);
+ }
+
+ private static PulsarApi.ServerError getClientErrorCode(Throwable t,
boolean checkCauseIfUnknown) {
if (t instanceof ServerMetadataException) {
return PulsarApi.ServerError.MetadataError;
} else if (t instanceof NamingException) {
@@ -171,12 +180,19 @@ public class BrokerServiceException extends Exception {
} else if (t instanceof ServiceUnitNotReadyException || t instanceof
TopicFencedException
|| t instanceof SubscriptionFencedException) {
return PulsarApi.ServerError.ServiceNotReady;
- } else if (t instanceof IncompatibleSchemaException) {
+ } else if (t instanceof IncompatibleSchemaException
+ || t instanceof InvalidSchemaDataException) {
+ // for backward compatible with old clients, invalid schema data
+ // is treated as "incompatible schema".
return PulsarApi.ServerError.IncompatibleSchema;
} else if (t instanceof ConsumerAssignException) {
return ServerError.ConsumerAssignError;
} else {
- return PulsarApi.ServerError.UnknownError;
+ if (checkCauseIfUnknown) {
+ return getClientErrorCode(t.getCause(), false);
+ } else {
+ return PulsarApi.ServerError.UnknownError;
+ }
}
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 9276892..678347a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -312,7 +312,7 @@ public class Consumer {
}).exceptionally(exception -> {
log.warn("Unsubscribe failed for {}", subscription, exception);
ctx.writeAndFlush(
- Commands.newError(requestId,
BrokerServiceException.getClientErrorCode(exception.getCause()),
+ Commands.newError(requestId,
BrokerServiceException.getClientErrorCode(exception),
exception.getCause().getMessage()));
return null;
});
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index a29c9a4..d8de814 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -58,7 +58,7 @@ import
org.apache.pulsar.broker.authentication.AuthenticationState;
import
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
-import org.apache.pulsar.broker.service.schema.IncompatibleSchemaException;
+import
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -738,7 +738,7 @@ public class ServerCnx extends PulsarHandler {
// back to client, only if not completed
already.
if
(consumerFuture.completeExceptionally(exception)) {
ctx.writeAndFlush(Commands.newError(requestId,
-
BrokerServiceException.getClientErrorCode(exception.getCause()),
+
BrokerServiceException.getClientErrorCode(exception),
exception.getCause().getMessage()));
}
consumers.remove(consumerId,
consumerFuture);
@@ -927,7 +927,7 @@ public class ServerCnx extends PulsarHandler {
schemaVersionFuture.exceptionally(exception -> {
ctx.writeAndFlush(Commands.newError(requestId,
-
BrokerServiceException.getClientErrorCode(exception.getCause()),
+
BrokerServiceException.getClientErrorCode(exception),
exception.getMessage()));
producers.remove(producerId, producerFuture);
return null;
@@ -1455,7 +1455,7 @@ public class ServerCnx extends PulsarHandler {
future.getNow(null);
} catch (Exception e) {
if (e.getCause() instanceof BrokerServiceException) {
- error =
BrokerServiceException.getClientErrorCode((BrokerServiceException)
e.getCause());
+ error =
BrokerServiceException.getClientErrorCode(e.getCause());
}
}
return error;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheck.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheck.java
index 2426f3e..c5fa364 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheck.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheck.java
@@ -39,7 +39,7 @@ public class KeyValueSchemaCompatibilityCheck implements
SchemaCompatibilityChec
this.checkers = checkers;
}
- private KeyValue<SchemaData, SchemaData>
decodeKeyValueSchemaData(SchemaData schemaData) {
+ public static KeyValue<SchemaData, SchemaData>
decodeKeyValueSchemaData(SchemaData schemaData) {
KeyValue<SchemaInfo, SchemaInfo> schemaInfoKeyValue =
KeyValueSchemaInfo.decodeKeyValueSchemaInfo(schemaData.toSchemaInfo());
return new KeyValue<>(
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
index baa57a4..73244cf 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
+import
org.apache.pulsar.broker.service.schema.validator.SchemaRegistryServiceWithSchemaDataValidator;
import org.apache.pulsar.common.schema.SchemaType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,7 +59,8 @@ public interface SchemaRegistryService extends SchemaRegistry
{
schemaStorage.start();
- return new SchemaRegistryServiceImpl(schemaStorage, checkers);
+ return SchemaRegistryServiceWithSchemaDataValidator.of(
+ new SchemaRegistryServiceImpl(schemaStorage, checkers));
} catch (Exception e) {
log.warn("Unable to create schema registry storage, defaulting to
empty storage: {}", e);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index 5c38c8a..57246f0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -40,6 +40,7 @@ import java.util.stream.Collectors;
import javax.validation.constraints.NotNull;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/IncompatibleSchemaException.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/IncompatibleSchemaException.java
similarity index 78%
copy from
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/IncompatibleSchemaException.java
copy to
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/IncompatibleSchemaException.java
index 975ba0d..e045deb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/IncompatibleSchemaException.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/IncompatibleSchemaException.java
@@ -16,9 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.service.schema;
+package org.apache.pulsar.broker.service.schema.exceptions;
+
+/**
+ * Exception is thrown when an incompatible schema is used.
+ */
+public class IncompatibleSchemaException extends SchemaException {
+
+ private static final long serialVersionUID = -6013970359956508359L;
-public class IncompatibleSchemaException extends Exception {
public IncompatibleSchemaException() {
super("Incompatible schema used");
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/IncompatibleSchemaException.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/InvalidSchemaDataException.java
similarity index 66%
copy from
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/IncompatibleSchemaException.java
copy to
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/InvalidSchemaDataException.java
index 975ba0d..a6a5808 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/IncompatibleSchemaException.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/InvalidSchemaDataException.java
@@ -16,14 +16,20 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.service.schema;
+package org.apache.pulsar.broker.service.schema.exceptions;
-public class IncompatibleSchemaException extends Exception {
- public IncompatibleSchemaException() {
- super("Incompatible schema used");
- }
+/**
+ * Exception thrown when the schema data is not in a valid form.
+ */
+public class InvalidSchemaDataException extends SchemaException {
- public IncompatibleSchemaException(String message) {
+ private static final long serialVersionUID = -2846364736743783766L;
+
+ public InvalidSchemaDataException(String message) {
super(message);
}
+
+ public InvalidSchemaDataException(String message, Throwable cause) {
+ super(message, cause);
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/IncompatibleSchemaException.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/SchemaException.java
similarity index 66%
rename from
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/IncompatibleSchemaException.java
rename to
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/SchemaException.java
index 975ba0d..54a22e9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/IncompatibleSchemaException.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/SchemaException.java
@@ -16,14 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.service.schema;
+package org.apache.pulsar.broker.service.schema.exceptions;
-public class IncompatibleSchemaException extends Exception {
- public IncompatibleSchemaException() {
- super("Incompatible schema used");
- }
+import org.apache.pulsar.broker.service.BrokerServiceException;
+
+/**
+ * Schema related exceptions.
+ */
+public class SchemaException extends BrokerServiceException {
- public IncompatibleSchemaException(String message) {
+ private static final long serialVersionUID = -6587520779026691815L;
+
+ public SchemaException(String message) {
super(message);
}
+
+ public SchemaException(String message, Throwable cause) {
+ super(message, cause);
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/PrimitiveSchemaDataValidator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/PrimitiveSchemaDataValidator.java
new file mode 100644
index 0000000..346a6d0
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/PrimitiveSchemaDataValidator.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema.validator;
+
+import
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
+import org.apache.pulsar.common.protocol.schema.SchemaData;
+
+/**
+ * Validate if the primitive schema is in expected form.
+ */
+class PrimitiveSchemaDataValidator implements SchemaDataValidator {
+
+ public static PrimitiveSchemaDataValidator of() {
+ return INSTANCE;
+ }
+
+ private static final PrimitiveSchemaDataValidator INSTANCE = new
PrimitiveSchemaDataValidator();
+
+ private PrimitiveSchemaDataValidator() {}
+
+ @Override
+ public void validate(SchemaData schemaData) throws
InvalidSchemaDataException {
+ byte[] data = schemaData.getData();
+ if (null != data && data.length > 0) {
+ throw new InvalidSchemaDataException("Invalid schema definition
data for primitive schemas :"
+ + "length of schema data should be zero, but " + data.length +
" bytes is found");
+ }
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java
new file mode 100644
index 0000000..0c44612
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema.validator;
+
+import
org.apache.pulsar.broker.service.schema.KeyValueSchemaCompatibilityCheck;
+import
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
+import org.apache.pulsar.common.protocol.schema.SchemaData;
+import org.apache.pulsar.common.schema.KeyValue;
+
+/**
+ * A validator to validate the schema data is well formed.
+ */
+public interface SchemaDataValidator {
+
+ /**
+ * Validate if the schema data is well formed.
+ *
+ * @param schemaData schema data to validate
+ * @throws InvalidSchemaDataException if the schema data is not in a valid
form.
+ */
+ static void validateSchemaData(SchemaData schemaData) throws
InvalidSchemaDataException {
+ switch (schemaData.getType()) {
+ case AVRO:
+ case JSON:
+ case PROTOBUF:
+ StructSchemaDataValidator.of().validate(schemaData);
+ break;
+ case STRING:
+ StringSchemaDataValidator.of().validate(schemaData);
+ break;
+ case BOOLEAN:
+ case INT8:
+ case INT16:
+ case INT32:
+ case INT64:
+ case FLOAT:
+ case DOUBLE:
+ case DATE:
+ case TIME:
+ case TIMESTAMP:
+ PrimitiveSchemaDataValidator.of().validate(schemaData);
+ break;
+ case NONE:
+ case BYTES:
+ // `NONE` and `BYTES` schema is not stored
+ break;
+ case AUTO:
+ case AUTO_CONSUME:
+ case AUTO_PUBLISH:
+ throw new InvalidSchemaDataException(
+ "Schema " + schemaData.getType() + " is a client-side
schema type");
+ case KEY_VALUE:
+ KeyValue<SchemaData, SchemaData> kvSchema =
+
KeyValueSchemaCompatibilityCheck.decodeKeyValueSchemaData(schemaData);
+ validateSchemaData(kvSchema.getKey());
+ validateSchemaData(kvSchema.getValue());
+ break;
+ default:
+ throw new InvalidSchemaDataException("Unknown schema type : "
+ schemaData.getType());
+ }
+ }
+
+ /**
+ * Validate a schema data is in a valid form.
+ *
+ * @param schemaData schema data to validate
+ * @throws InvalidSchemaDataException if the schema data is not in a valid
form.
+ */
+ void validate(SchemaData schemaData) throws InvalidSchemaDataException;
+
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java
new file mode 100644
index 0000000..f0ca3fc
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidator.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema.validator;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
+import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
+import
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
+import org.apache.pulsar.common.protocol.schema.SchemaData;
+import org.apache.pulsar.common.protocol.schema.SchemaVersion;
+import org.apache.pulsar.common.util.FutureUtil;
+
+/**
+ * A {@link SchemaRegistryService} wrapper that validate schema data.
+ */
+public class SchemaRegistryServiceWithSchemaDataValidator implements
SchemaRegistryService {
+
+ public static SchemaRegistryServiceWithSchemaDataValidator
of(SchemaRegistryService service) {
+ return new SchemaRegistryServiceWithSchemaDataValidator(service);
+ }
+
+ private final SchemaRegistryService service;
+
+ private SchemaRegistryServiceWithSchemaDataValidator(SchemaRegistryService
service) {
+ this.service = service;
+ }
+
+ @Override
+ public void close() throws Exception {
+ this.service.close();
+ }
+
+ @Override
+ public CompletableFuture<SchemaAndMetadata> getSchema(String schemaId) {
+ return this.service.getSchema(schemaId);
+ }
+
+ @Override
+ public CompletableFuture<SchemaAndMetadata> getSchema(String schemaId,
SchemaVersion version) {
+ return this.service.getSchema(schemaId, version);
+ }
+
+ @Override
+ public CompletableFuture<List<CompletableFuture<SchemaAndMetadata>>>
getAllSchemas(String schemaId) {
+ return this.service.getAllSchemas(schemaId);
+ }
+
+ @Override
+ public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId,
+ SchemaData
schema,
+
SchemaCompatibilityStrategy strategy) {
+ try {
+ SchemaDataValidator.validateSchemaData(schema);
+ } catch (InvalidSchemaDataException e) {
+ return FutureUtil.failedFuture(e);
+ }
+ return service.putSchemaIfAbsent(schemaId, schema, strategy);
+ }
+
+ @Override
+ public CompletableFuture<SchemaVersion> deleteSchema(String schemaId,
String user) {
+ return service.deleteSchema(schemaId, user);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isCompatible(String schemaId,
+ SchemaData schema,
+ SchemaCompatibilityStrategy
strategy) {
+ try {
+ SchemaDataValidator.validateSchemaData(schema);
+ } catch (InvalidSchemaDataException e) {
+ return FutureUtil.failedFuture(e);
+ }
+ return service.isCompatible(schemaId, schema, strategy);
+ }
+
+ @Override
+ public SchemaVersion versionFromBytes(byte[] version) {
+ return service.versionFromBytes(version);
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StringSchemaDataValidator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StringSchemaDataValidator.java
new file mode 100644
index 0000000..4368b14
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StringSchemaDataValidator.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema.validator;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
+import org.apache.pulsar.common.protocol.schema.SchemaData;
+
+/**
+ * Validate if the string schema is in expected form.
+ */
+class StringSchemaDataValidator implements SchemaDataValidator {
+
+ public static final StringSchemaDataValidator of() {
+ return INSTANCE;
+ }
+
+ private static final StringSchemaDataValidator INSTANCE = new
StringSchemaDataValidator();
+
+ private static final String PY_NONE_SCHEMA_INFO = "null";
+
+ private StringSchemaDataValidator() {}
+
+ @Override
+ public void validate(SchemaData schemaData) throws
InvalidSchemaDataException {
+ byte[] data = schemaData.getData();
+ if (null != data && data.length > 0) {
+ // python send 'null' string as schema data
+ String schemaDataStr = new String(data, UTF_8);
+ if (!PY_NONE_SCHEMA_INFO.equals(schemaDataStr)) {
+ throw new InvalidSchemaDataException("Invalid schema
definition data for string schema : '"
+ + schemaDataStr + "'");
+ }
+ }
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java
new file mode 100644
index 0000000..1eeed83
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema.validator;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
+import java.io.IOException;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaParseException;
+import
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
+import org.apache.pulsar.common.protocol.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+
+/**
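+ * Validate that the schema data of a struct schema parses as an Avro schema definition
+ * (or, for the JSON schema type, as a legacy JsonSchema definition).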
+ */
+class StructSchemaDataValidator implements SchemaDataValidator {
+
+ public static StructSchemaDataValidator of() {
+ return INSTANCE;
+ }
+
+ private static final StructSchemaDataValidator INSTANCE = new
StructSchemaDataValidator();
+
+ private StructSchemaDataValidator() {}
+
+ @Override
+ public void validate(SchemaData schemaData) throws
InvalidSchemaDataException {
+ byte[] data = schemaData.getData();
+
+ try {
+ Schema.Parser avroSchemaParser = new Schema.Parser();
+ avroSchemaParser.parse(new String(data, UTF_8));
+ } catch (SchemaParseException e) {
+ if (schemaData.getType() == SchemaType.JSON) {
+ // we used JsonSchema for storing the definition of a JSON schema,
+ // so for backward compatibility we fall back to decoding the
+ // schema data with JsonSchema when Avro parsing fails
+ ObjectMapper objectMapper =
ObjectMapperFactory.getThreadLocal();
+ try {
+ objectMapper.readValue(data, JsonSchema.class);
+ } catch (IOException ioe) {
+ throwInvalidSchemaDataException(schemaData, ioe);
+ }
+ } else {
+ throwInvalidSchemaDataException(schemaData, e);
+ }
+ } catch (Exception e) {
+ throwInvalidSchemaDataException(schemaData, e);
+ }
+ }
+
+ private static void throwInvalidSchemaDataException(SchemaData schemaData,
+ Throwable cause)
throws InvalidSchemaDataException {
+ throw new InvalidSchemaDataException("Invalid schema definition data
for "
+ + schemaData.getType() + " schema", cause);
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java
new file mode 100644
index 0000000..998d363
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema.validator;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
+import
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.protocol.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class SchemaDataValidatorTest {
+
+ private static class Foo {
+ int field;
+ }
+
+ @DataProvider(name = "primitiveSchemas")
+ public static Object[][] primitiveSchemas() {
+ return new Object[][] {
+ { SchemaType.STRING },
+ { SchemaType.BOOLEAN },
+ { SchemaType.INT8 },
+ { SchemaType.INT16 },
+ { SchemaType.INT32 },
+ { SchemaType.INT64 },
+ { SchemaType.FLOAT },
+ { SchemaType.DOUBLE },
+ { SchemaType.DATE },
+ { SchemaType.TIME },
+ { SchemaType.TIMESTAMP },
+ };
+ }
+
+ @DataProvider(name = "clientSchemas")
+ public static Object[][] clientSchemas() {
+ return new Object[][] {
+ { SchemaType.AUTO_CONSUME },
+ { SchemaType.AUTO_PUBLISH },
+ { SchemaType.AUTO },
+ };
+ }
+
+ @DataProvider(name = "structSchemas")
+ public static Object[][] structSchemas() {
+ return new Object[][] {
+ { SchemaType.AVRO },
+ { SchemaType.JSON },
+ { SchemaType.PROTOBUF },
+ };
+ }
+
+ @Test(dataProvider = "primitiveSchemas")
+ public void testPrimitiveValidatorSuccess(SchemaType type) throws
Exception {
+ SchemaData data = SchemaData.builder()
+ .type(type)
+ .data(new byte[0])
+ .build();
+ SchemaDataValidator.validateSchemaData(data);
+ }
+
+ @Test(dataProvider = "primitiveSchemas", expectedExceptions =
InvalidSchemaDataException.class)
+ public void testPrimitiveValidatorInvalid(SchemaType type) throws
Exception {
+ SchemaData data = SchemaData.builder()
+ .type(type)
+ .data(new byte[10])
+ .build();
+ SchemaDataValidator.validateSchemaData(data);
+ }
+
+ @Test(dataProvider = "clientSchemas", expectedExceptions =
InvalidSchemaDataException.class)
+ public void testValidateClientSchemas(SchemaType type) throws Exception {
+ SchemaData data = SchemaData.builder()
+ .type(type)
+ .data(new byte[0])
+ .build();
+ SchemaDataValidator.validateSchemaData(data);
+ }
+
+ @Test(dataProvider = "structSchemas")
+ public void testStructValidatorSuccess(SchemaType type) throws Exception {
+ Schema<Foo> schema = Schema.AVRO(Foo.class);
+ SchemaData data = SchemaData.builder()
+ .type(type)
+ .data(schema.getSchemaInfo().getSchema())
+ .build();
+ SchemaDataValidator.validateSchemaData(data);
+ }
+
+ @Test(dataProvider = "structSchemas", expectedExceptions =
InvalidSchemaDataException.class)
+ public void testStructValidatorInvalid(SchemaType type) throws Exception {
+ SchemaData data = SchemaData.builder()
+ .type(type)
+ .data("bad-schema".getBytes(UTF_8))
+ .build();
+ SchemaDataValidator.validateSchemaData(data);
+ }
+
+ @Test
+ public void testJsonSchemaTypeWithJsonSchemaData() throws Exception {
+ ObjectMapper mapper = ObjectMapperFactory.getThreadLocal();
+ SchemaData data = SchemaData.builder()
+ .type(SchemaType.JSON)
+ .data(
+ mapper.writeValueAsBytes(
+ new JsonSchemaGenerator(mapper)
+ .generateSchema(Foo.class)))
+ .build();
+ SchemaDataValidator.validateSchemaData(data);
+ }
+
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidatorTest.java
new file mode 100644
index 0000000..aa1cfbb
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaRegistryServiceWithSchemaDataValidatorTest.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema.validator;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
+import
org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
+import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
+import
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
+import org.apache.pulsar.common.protocol.schema.SchemaData;
+import org.apache.pulsar.common.protocol.schema.SchemaVersion;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test {@link SchemaRegistryServiceWithSchemaDataValidator}.
+ */
+public class SchemaRegistryServiceWithSchemaDataValidatorTest {
+
+ private SchemaRegistryService underlyingService;
+ private SchemaRegistryServiceWithSchemaDataValidator service;
+
+ @BeforeMethod
+ public void setup() {
+ this.underlyingService = mock(SchemaRegistryService.class);
+ this.service =
SchemaRegistryServiceWithSchemaDataValidator.of(underlyingService);
+ }
+
+ @Test
+ public void testGetLatestSchema() {
+ String schemaId = "test-schema-id";
+ CompletableFuture<SchemaAndMetadata> getFuture = new
CompletableFuture<>();
+ when(underlyingService.getSchema(eq(schemaId))).thenReturn(getFuture);
+ assertSame(getFuture, service.getSchema(schemaId));
+ verify(underlyingService, times(1)).getSchema(eq(schemaId));
+ }
+
+ @Test
+ public void testGetSchemaByVersion() {
+ String schemaId = "test-schema-id";
+ CompletableFuture<SchemaAndMetadata> getFuture = new
CompletableFuture<>();
+ when(underlyingService.getSchema(eq(schemaId),
any(SchemaVersion.class)))
+ .thenReturn(getFuture);
+ assertSame(getFuture, service.getSchema(schemaId,
SchemaVersion.Latest));
+ verify(underlyingService, times(1))
+ .getSchema(eq(schemaId), same(SchemaVersion.Latest));
+ }
+
+ @Test
+ public void testDeleteSchema() {
+ String schemaId = "test-schema-id";
+ String user = "test-user";
+ CompletableFuture<SchemaVersion> deleteFuture = new
CompletableFuture<>();
+ when(underlyingService.deleteSchema(eq(schemaId), eq(user)))
+ .thenReturn(deleteFuture);
+ assertSame(deleteFuture, service.deleteSchema(schemaId, user));
+ verify(underlyingService, times(1))
+ .deleteSchema(eq(schemaId), eq(user));
+ }
+
+ @Test
+ public void testIsCompatibleWithGoodSchemaData() {
+ String schemaId = "test-schema-id";
+ SchemaCompatibilityStrategy strategy =
SchemaCompatibilityStrategy.FULL;
+ CompletableFuture<Boolean> future = new CompletableFuture<>();
+ when(underlyingService.isCompatible(eq(schemaId),
any(SchemaData.class), eq(strategy)))
+ .thenReturn(future);
+ SchemaData schemaData = SchemaData.builder()
+ .type(SchemaType.BOOLEAN)
+ .data(new byte[0])
+ .build();
+ assertSame(future, service.isCompatible(schemaId, schemaData,
strategy));
+ verify(underlyingService, times(1))
+ .isCompatible(eq(schemaId), same(schemaData), eq(strategy));
+ }
+
+ @Test
+ public void testIsCompatibleWithBadSchemaData() {
+ String schemaId = "test-schema-id";
+ SchemaCompatibilityStrategy strategy =
SchemaCompatibilityStrategy.FULL;
+ CompletableFuture<Boolean> future = new CompletableFuture<>();
+ when(underlyingService.isCompatible(eq(schemaId),
any(SchemaData.class), eq(strategy)))
+ .thenReturn(future);
+ SchemaData schemaData = SchemaData.builder()
+ .type(SchemaType.BOOLEAN)
+ .data(new byte[10])
+ .build();
+ try {
+ service.isCompatible(schemaId, schemaData, strategy).get();
+ fail("Should fail isCompatible check");
+ } catch (Exception e) {
+ assertTrue(e.getCause() instanceof InvalidSchemaDataException);
+ }
+ verify(underlyingService, times(0))
+ .isCompatible(eq(schemaId), same(schemaData), eq(strategy));
+ }
+
+ @Test
+ public void testPutSchemaIfAbsentWithGoodSchemaData() {
+ String schemaId = "test-schema-id";
+ SchemaCompatibilityStrategy strategy =
SchemaCompatibilityStrategy.FULL;
+ CompletableFuture<SchemaVersion> future = new CompletableFuture<>();
+ when(underlyingService.putSchemaIfAbsent(eq(schemaId),
any(SchemaData.class), eq(strategy)))
+ .thenReturn(future);
+ SchemaData schemaData = SchemaData.builder()
+ .type(SchemaType.BOOLEAN)
+ .data(new byte[0])
+ .build();
+ assertSame(future, service.putSchemaIfAbsent(schemaId, schemaData,
strategy));
+ verify(underlyingService, times(1))
+ .putSchemaIfAbsent(eq(schemaId), same(schemaData), eq(strategy));
+ }
+
+ @Test
+ public void testPutSchemaIfAbsentWithBadSchemaData() {
+ String schemaId = "test-schema-id";
+ SchemaCompatibilityStrategy strategy =
SchemaCompatibilityStrategy.FULL;
+ CompletableFuture<SchemaVersion> future = new CompletableFuture<>();
+ when(underlyingService.putSchemaIfAbsent(eq(schemaId),
any(SchemaData.class), eq(strategy)))
+ .thenReturn(future);
+ SchemaData schemaData = SchemaData.builder()
+ .type(SchemaType.BOOLEAN)
+ .data(new byte[10])
+ .build();
+ try {
+ service.putSchemaIfAbsent(schemaId, schemaData, strategy).get();
+ fail("Should fail putSchemaIfAbsent");
+ } catch (Exception e) {
+ assertTrue(e.getCause() instanceof InvalidSchemaDataException);
+ }
+ verify(underlyingService, times(0))
+ .putSchemaIfAbsent(eq(schemaId), same(schemaData), eq(strategy));
+ }
+
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
index 9aa3573..1631850 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.api;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.base.MoreObjects;
@@ -37,6 +38,7 @@ import lombok.Cleanup;
import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
import org.apache.pulsar.broker.service.schema.SchemaRegistry;
+import
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.AvroSchema;
@@ -156,62 +158,32 @@ public class SimpleTypedProducerConsumerTest extends
ProducerConsumerBase {
}
@Test
- public void testJsonConsumerWithWrongCorruptedSchema() throws Exception {
+ public void testWrongCorruptedSchema() throws Exception {
log.info("-- Starting {} test --", methodName);
byte[] randomSchemaBytes = "hello".getBytes();
- pulsar.getSchemaRegistryService()
- .putSchemaIfAbsent("my-property/my-ns/my-topic1",
- SchemaData.builder()
- .type(SchemaType.JSON)
- .isDeleted(false)
- .timestamp(Clock.systemUTC().millis())
- .user("me")
- .data(randomSchemaBytes)
- .props(Collections.emptyMap())
- .build(),
- SchemaCompatibilityStrategy.FULL
- ).get();
-
- Consumer<JsonEncodedPojo> consumer = pulsarClient
-
.newConsumer(JSONSchema.of(SchemaDefinition.<JsonEncodedPojo>builder().withPojo(JsonEncodedPojo.class).build()))
- .topic("persistent://my-property/use/my-ns/my-topic1")
- .subscriptionName("my-subscriber-name")
- .subscribe();
-
- log.info("-- Exiting {} test --", methodName);
- }
-
- @Test
- public void testJsonProducerWithWrongCorruptedSchema() throws Exception {
- log.info("-- Starting {} test --", methodName);
-
- byte[] randomSchemaBytes = "hello".getBytes();
-
- pulsar.getSchemaRegistryService()
- .putSchemaIfAbsent("my-property/my-ns/my-topic1",
- SchemaData.builder()
- .type(SchemaType.JSON)
- .isDeleted(false)
- .timestamp(Clock.systemUTC().millis())
- .user("me")
- .data(randomSchemaBytes)
- .props(Collections.emptyMap())
- .build(),
- SchemaCompatibilityStrategy.FULL
- ).get();
-
- Producer<JsonEncodedPojo> producer = pulsarClient
-
.newProducer(JSONSchema.of(SchemaDefinition.<JsonEncodedPojo>builder().withPojo(JsonEncodedPojo.class).build()))
- .topic("persistent://my-property/use/my-ns/my-topic1")
- .create();
-
+ try {
+ pulsar.getSchemaRegistryService()
+ .putSchemaIfAbsent("my-property/my-ns/my-topic1",
+ SchemaData.builder()
+ .type(SchemaType.JSON)
+ .isDeleted(false)
+ .timestamp(Clock.systemUTC().millis())
+ .user("me")
+ .data(randomSchemaBytes)
+ .props(Collections.emptyMap())
+ .build(),
+ SchemaCompatibilityStrategy.FULL
+ ).get();
+ fail("Should fail to add corrupted schema data");
+ } catch (Exception e) {
+ assertTrue(e.getCause() instanceof InvalidSchemaDataException);
+ }
log.info("-- Exiting {} test --", methodName);
}
-
@Test
public void testProtobufProducerAndConsumer() throws Exception {
log.info("-- Starting {} test --", methodName);
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Schemas.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Schemas.java
index abc8d16..9be3c63 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Schemas.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Schemas.java
@@ -54,6 +54,15 @@ public interface Schemas {
void deleteSchema(String topic) throws PulsarAdminException;
/**
+ * Create a schema for a given <tt>topic</tt> with the provided schema
info.
+ *
+ * @param topic topic name, in fully qualified format
+ * @param schemaInfo schema info
+ * @throws PulsarAdminException
+ */
+ void createSchema(String topic, SchemaInfo schemaInfo) throws
PulsarAdminException;
+
+ /**
* Create a schema for a given <tt>topic</tt>.
*
* @param topic topic name, in fully qualified format
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
index 9d06676..cce5cd2 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.client.admin.internal;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -44,12 +46,7 @@ public class SchemasImpl extends BaseResource implements
Schemas {
try {
TopicName tn = TopicName.get(topic);
GetSchemaResponse response =
request(schemaPath(tn)).get(GetSchemaResponse.class);
- SchemaInfo info = new SchemaInfo();
- info.setSchema(response.getData().getBytes());
- info.setType(response.getType());
- info.setProperties(response.getProperties());
- info.setName(tn.getLocalName());
- return info;
+ return convertGetSchemaResponseToSchemaInfo(tn, response);
} catch (Exception e) {
throw getApiException(e);
}
@@ -59,13 +56,9 @@ public class SchemasImpl extends BaseResource implements
Schemas {
public SchemaInfo getSchemaInfo(String topic, long version) throws
PulsarAdminException {
try {
TopicName tn = TopicName.get(topic);
- GetSchemaResponse response =
request(schemaPath(tn).path(Long.toString(version))).get(GetSchemaResponse.class);
- SchemaInfo info = new SchemaInfo();
- info.setSchema(response.getData().getBytes());
- info.setType(response.getType());
- info.setProperties(response.getProperties());
- info.setName(tn.getLocalName());
- return info;
+ GetSchemaResponse response =
request(schemaPath(tn).path(Long.toString(version)))
+ .get(GetSchemaResponse.class);
+ return convertGetSchemaResponseToSchemaInfo(tn, response);
} catch (Exception e) {
throw getApiException(e);
}
@@ -82,6 +75,18 @@ public class SchemasImpl extends BaseResource implements
Schemas {
}
@Override
+ public void createSchema(String topic, SchemaInfo schemaInfo) throws
PulsarAdminException {
+ PostSchemaPayload payload = new PostSchemaPayload();
+ payload.setType(schemaInfo.getType().name());
+ payload.setProperties(schemaInfo.getProperties());
+ // for backward compatibility, we convert `bytes` to `string`;
+ // we can consider fixing it in a new version of the REST endpoint
+
payload.setSchema(convertSchemaDataToStringLegacy(schemaInfo.getSchema()));
+
+ createSchema(topic, payload);
+ }
+
+ @Override
public void createSchema(String topic, PostSchemaPayload payload) throws
PulsarAdminException {
try {
TopicName tn = TopicName.get(topic);
@@ -99,4 +104,25 @@ public class SchemasImpl extends BaseResource implements
Schemas {
.path(topicName.getEncodedLocalName())
.path("schema");
}
+
+ // the util function converts `GetSchemaResponse` to `SchemaInfo`
+ static SchemaInfo convertGetSchemaResponseToSchemaInfo(TopicName tn,
+ GetSchemaResponse
response) {
+ SchemaInfo info = new SchemaInfo();
+ info.setSchema(response.getData().getBytes(UTF_8));
+ info.setType(response.getType());
+ info.setProperties(response.getProperties());
+ info.setName(tn.getLocalName());
+ return info;
+ }
+
+
+ // this utility function exists for backward compatibility
+ static String convertSchemaDataToStringLegacy(byte[] schemaData) {
+ if (null == schemaData) {
+ return "";
+ }
+
+ return new String(schemaData, UTF_8);
+ }
}