This is an automated email from the ASF dual-hosted git repository.
linlin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0ffa97efaac [cleanup][broker] Remove duplicate code in the
SchemaRegistryServiceImpl that checks for existing schema and new schema types
(#19753)
0ffa97efaac is described below
commit 0ffa97efaac32f0e2b6d951966972f22f59329b5
Author: sinan liu <[email protected]>
AuthorDate: Thu Mar 16 10:01:50 2023 +0800
[cleanup][broker] Remove duplicate code in the SchemaRegistryServiceImpl
that checks for existing schema and new schema types (#19753)
---
.../service/schema/SchemaRegistryServiceImpl.java | 20 +++++++-------------
1 file changed, 7 insertions(+), 13 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index 4eb87564d0f..ae56df248d8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -348,14 +348,14 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
private void checkCompatible(SchemaAndMetadata existingSchema, SchemaData
newSchema,
SchemaCompatibilityStrategy strategy) throws
IncompatibleSchemaException {
- SchemaHash existingHash = SchemaHash.of(existingSchema.schema);
- SchemaHash newHash = SchemaHash.of(newSchema);
SchemaData existingSchemaData = existingSchema.schema;
if (newSchema.getType() != existingSchemaData.getType()) {
throw new IncompatibleSchemaException(String.format("Incompatible
schema: "
+ "exists schema type %s, new schema type %s",
existingSchemaData.getType(), newSchema.getType()));
}
+ SchemaHash existingHash = SchemaHash.of(existingSchemaData);
+ SchemaHash newHash = SchemaHash.of(newSchema);
if (!newHash.equals(existingHash)) {
compatibilityChecks.getOrDefault(newSchema.getType(),
SchemaCompatibilityCheck.DEFAULT)
.checkCompatible(existingSchemaData, newSchema, strategy);
@@ -465,17 +465,11 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService {
}
});
- if (existingSchema.schema.getType() != schema.getType()) {
- result.completeExceptionally(new
IncompatibleSchemaException(
- String.format("Incompatible schema: exists schema
type %s, new schema type %s",
- existingSchema.schema.getType(),
schema.getType())));
- } else {
- try {
- checkCompatible(existingSchema, schema, strategy);
- result.complete(null);
- } catch (IncompatibleSchemaException e) {
- result.completeExceptionally(e);
- }
+ try {
+ checkCompatible(existingSchema, schema, strategy);
+ result.complete(null);
+ } catch (IncompatibleSchemaException e) {
+ result.completeExceptionally(e);
}
return result;
} else {