This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git
The following commit(s) were added to refs/heads/main by this push:
new e8fd852 supplement avro compatibility checker
new 7a91cf5 Merge pull request #32 from MatrixHB/optimize_compati
e8fd852 is described below
commit e8fd852dbce5dc1af96f2587928a79c89ef8d426
Author: huitong <[email protected]>
AuthorDate: Thu Aug 11 15:59:48 2022 +0800
supplement avro compatibility checker
---
README.md | 2 +-
.../schema/registry/common/QualifiedName.java | 23 ++++---
.../schema/registry/common/model/SchemaInfo.java | 11 ++++
.../schema/registry/common/utils/CommonUtil.java | 47 +++-----------
.../core/compatibility/AvroSchemaValidator.java | 75 ++++++++++++++++++++++
.../core/compatibility/CompatibilityChecker.java | 35 ++++++++++
.../core/compatibility/SchemaValidator.java | 25 ++++++++
.../registry/core/service/SchemaServiceImpl.java | 12 ++--
.../registry/storage/rocketmq/RocketmqClient.java | 3 +-
9 files changed, 176 insertions(+), 57 deletions(-)
diff --git a/README.md b/README.md
index 33219df..1c0790e 100644
--- a/README.md
+++ b/README.md
@@ -3,7 +3,7 @@ RocketMQ Schema Registry
RocketMQ Schema Registry is a Topic Schema's management center. It provides a
RESTful interface for register,
delete, update, get and reference schema to subject(RocketMQ Topic). By
associating Schema with subject, the New
-RocketMQ client can send a structured date directly. User no longer need to
care about the details of serialization
+RocketMQ client can send structured data directly. User no longer need to care
about the details of serialization
and deserialization.
Schema struct can change, it will generate a new version number with each
update. During schema evolution, the
diff --git
a/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java
b/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java
index f9d48a0..87cacee 100644
---
a/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java
+++
b/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java
@@ -80,18 +80,17 @@ public class QualifiedName implements Serializable {
@Override
public String toString() {
- final StringBuilder sb = new StringBuilder("{");
- sb.append("\"cluster\":\"")
- .append(cluster).append('\"');
- sb.append("\"tenant\":\"")
- .append(tenant).append('\"');
- sb.append(",\"subject\":\"")
- .append(subject).append('\"');
- sb.append(",\"name\":\"")
- .append(schema).append('\"');
- sb.append(",\"version\":\"")
- .append(version).append('\"');
- sb.append('}');
+ final StringBuilder sb = new StringBuilder("(");
+ sb.append("cluster=")
+ .append(cluster).append(", ");
+ sb.append("tenant=")
+ .append(tenant).append(", ");
+ sb.append("subject=")
+ .append(subject).append(", ");
+ sb.append("name=")
+ .append(schema).append(", ");
+ sb.append("version=")
+ .append(version).append(")");
return sb.toString();
}
}
diff --git
a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaInfo.java
b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaInfo.java
index 6dbf86a..92477b1 100644
---
a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaInfo.java
+++
b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaInfo.java
@@ -17,9 +17,13 @@
package org.apache.rocketmq.schema.registry.common.model;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@@ -103,4 +107,11 @@ public class SchemaInfo extends BaseInfo {
getAudit().setLastModifiedTime(date);
}
}
+
+ public List<String> getAllRecordIdlInOrder() {
+ List<String> recordIdlList = getDetails().getSchemaRecords()
+
.stream().map(SchemaRecordInfo::getIdl).collect(Collectors.toList());
+ Collections.reverse(recordIdlList);
+ return recordIdlList;
+ }
}
diff --git
a/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/CommonUtil.java
b/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/CommonUtil.java
index f1b9c70..48323f9 100644
---
a/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/CommonUtil.java
+++
b/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/CommonUtil.java
@@ -46,16 +46,10 @@ import javax.tools.JavaFileObject;
import javax.tools.StandardJavaFileManager;
import javax.tools.ToolProvider;
import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaValidationException;
-import org.apache.avro.SchemaValidator;
-import org.apache.avro.SchemaValidatorBuilder;
import org.apache.rocketmq.schema.registry.common.QualifiedName;
import org.apache.rocketmq.schema.registry.common.dto.SchemaDto;
import
org.apache.rocketmq.schema.registry.common.exception.SchemaCompatibilityException;
import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
-import org.apache.rocketmq.schema.registry.common.model.Compatibility;
-import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
@Slf4j
public class CommonUtil {
@@ -214,21 +208,17 @@ public class CommonUtil {
} catch (Throwable e) {
log.error("Read from file error", e);
} finally {
- if (br != null) {
- try {
- br.close();
+ try {
+ br.close();
- } catch (IOException ioException) {
- log.error("", ioException);
- }
+ } catch (IOException ioException) {
+ log.error("", ioException);
}
- if (inputStreamReader != null) {
- try {
- inputStreamReader.close();
+ try {
+ inputStreamReader.close();
- } catch (IOException ioException) {
- log.error("", ioException);
- }
+ } catch (IOException ioException) {
+ log.error("", ioException);
}
}
}
@@ -240,26 +230,9 @@ public class CommonUtil {
break;
case JSON:
throw new SchemaCompatibilityException("Unsupported schema
type: " + schemaDto.getMeta().getType());
- }
- }
-
- public static void validateCompatibility(SchemaInfo update, SchemaInfo
current,
- Compatibility expectCompatibility) {
- switch (update.getMeta().getType()) {
- case AVRO:
- SchemaValidator validator = new
SchemaValidatorBuilder().canReadStrategy().validateLatest();
- try {
- Schema toValidate = new
Schema.Parser().parse(update.getLastRecordIdl());
- List<Schema> existing = new ArrayList<>();
- existing.add(new
Schema.Parser().parse(current.getLastRecordIdl()));
- validator.validate(toValidate, existing);
- } catch (SchemaValidationException e) {
- throw new SchemaCompatibilityException("Schema
compatibility validation failed", e);
- }
- break;
default:
- throw new SchemaCompatibilityException("Unsupported schema
type: " + update.getMeta().getType());
+ throw new SchemaCompatibilityException("Unexpected value: " +
schemaDto.getMeta().getType());
}
-
}
+
}
diff --git
a/core/src/main/java/org/apache/rocketmq/schema/registry/core/compatibility/AvroSchemaValidator.java
b/core/src/main/java/org/apache/rocketmq/schema/registry/core/compatibility/AvroSchemaValidator.java
new file mode 100644
index 0000000..ad38c3e
--- /dev/null
+++
b/core/src/main/java/org/apache/rocketmq/schema/registry/core/compatibility/AvroSchemaValidator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.core.compatibility;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaValidator;
+import org.apache.avro.SchemaValidationException;
+import org.apache.avro.SchemaValidatorBuilder;
+import
org.apache.rocketmq.schema.registry.common.exception.SchemaCompatibilityException;
+import org.apache.rocketmq.schema.registry.common.model.Compatibility;
+import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
+
+import static
org.apache.rocketmq.schema.registry.common.model.Compatibility.BACKWARD;
+import static
org.apache.rocketmq.schema.registry.common.model.Compatibility.BACKWARD_TRANSITIVE;
+import static
org.apache.rocketmq.schema.registry.common.model.Compatibility.FORWARD;
+import static
org.apache.rocketmq.schema.registry.common.model.Compatibility.FORWARD_TRANSITIVE;
+import static
org.apache.rocketmq.schema.registry.common.model.Compatibility.FULL;
+import static
org.apache.rocketmq.schema.registry.common.model.Compatibility.FULL_TRANSITIVE;
+
+public class AvroSchemaValidator implements
org.apache.rocketmq.schema.registry.core.compatibility.SchemaValidator {
+
+ private static final Map<Compatibility, SchemaValidator>
SCHEMA_VALIDATOR_CACHE = new HashMap<>(6);
+
+ public AvroSchemaValidator() {
+ SCHEMA_VALIDATOR_CACHE.putIfAbsent(BACKWARD,
+ new SchemaValidatorBuilder().canReadStrategy().validateLatest());
+ SCHEMA_VALIDATOR_CACHE.putIfAbsent(BACKWARD_TRANSITIVE,
+ new SchemaValidatorBuilder().canReadStrategy().validateAll());
+ SCHEMA_VALIDATOR_CACHE.putIfAbsent(FORWARD,
+ new SchemaValidatorBuilder().canBeReadStrategy().validateLatest());
+ SCHEMA_VALIDATOR_CACHE.putIfAbsent(FORWARD_TRANSITIVE,
+ new SchemaValidatorBuilder().canBeReadStrategy().validateAll());
+ SCHEMA_VALIDATOR_CACHE.putIfAbsent(FULL,
+ new
SchemaValidatorBuilder().mutualReadStrategy().validateLatest());
+ SCHEMA_VALIDATOR_CACHE.putIfAbsent(FULL_TRANSITIVE,
+ new SchemaValidatorBuilder().mutualReadStrategy().validateAll());
+ }
+
+ @Override
+ public void validate(SchemaInfo update, SchemaInfo current) {
+ org.apache.avro.SchemaValidator validator =
+ SCHEMA_VALIDATOR_CACHE.get(current.getMeta().getCompatibility());
+ try {
+ Schema toValidate = new
Schema.Parser().parse(update.getLastRecordIdl());
+ List<Schema> existing = new ArrayList<>();
+ for (String schemaIdl : current.getAllRecordIdlInOrder()) {
+ existing.add(new Schema.Parser().parse(schemaIdl));
+ }
+ validator.validate(toValidate, existing);
+ } catch (SchemaValidationException e) {
+ throw new SchemaCompatibilityException("Schema compatibility
validation failed", e);
+ }
+ }
+
+}
diff --git
a/core/src/main/java/org/apache/rocketmq/schema/registry/core/compatibility/CompatibilityChecker.java
b/core/src/main/java/org/apache/rocketmq/schema/registry/core/compatibility/CompatibilityChecker.java
new file mode 100644
index 0000000..20062d3
--- /dev/null
+++
b/core/src/main/java/org/apache/rocketmq/schema/registry/core/compatibility/CompatibilityChecker.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.core.compatibility;
+
+import
org.apache.rocketmq.schema.registry.common.exception.SchemaCompatibilityException;
+import org.apache.rocketmq.schema.registry.common.model.SchemaType;
+
+public class CompatibilityChecker {
+
+ private static final SchemaValidator SCHEMA_VALIDATOR_AVRO = new
AvroSchemaValidator();
+
+ public static SchemaValidator getValidator(SchemaType schemaType) {
+ switch (schemaType) {
+ case AVRO:
+ return SCHEMA_VALIDATOR_AVRO;
+ default:
+ throw new SchemaCompatibilityException("Unsupported schema
type: " + schemaType);
+ }
+ }
+}
diff --git
a/core/src/main/java/org/apache/rocketmq/schema/registry/core/compatibility/SchemaValidator.java
b/core/src/main/java/org/apache/rocketmq/schema/registry/core/compatibility/SchemaValidator.java
new file mode 100644
index 0000000..d3b25d9
--- /dev/null
+++
b/core/src/main/java/org/apache/rocketmq/schema/registry/core/compatibility/SchemaValidator.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.core.compatibility;
+
+import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
+
+public interface SchemaValidator {
+
+ void validate(SchemaInfo update, SchemaInfo current);
+}
diff --git
a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
index 2c2dca5..7f80549 100644
---
a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
+++
b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
@@ -50,6 +50,7 @@ import
org.apache.rocketmq.schema.registry.common.storage.StorageServiceProxy;
import org.apache.rocketmq.schema.registry.common.utils.CommonUtil;
import org.apache.rocketmq.schema.registry.common.utils.IdGenerator;
import org.apache.rocketmq.schema.registry.common.utils.StorageUtil;
+import
org.apache.rocketmq.schema.registry.core.compatibility.CompatibilityChecker;
import org.apache.rocketmq.schema.registry.core.dependency.DependencyService;
@Slf4j
@@ -139,7 +140,7 @@ public class SchemaServiceImpl implements
SchemaService<SchemaDto> {
SchemaInfo current = storageServiceProxy.get(qualifiedName);
if (current == null) {
- throw new SchemaNotFoundException("Schema " +
qualifiedName.fullName() + " not exist, ignored update.");
+ throw new SchemaNotFoundException("Schema " +
qualifiedName.toString() + " not exist, ignored update.");
}
final SchemaRecordInfo updateRecord = new SchemaRecordInfo();
@@ -177,7 +178,8 @@ public class SchemaServiceImpl implements
SchemaService<SchemaDto> {
update.getAudit().updateBy(updateSchemaRequest.getOwner(),
updateSchemaRequest.getDesc());
}
- CommonUtil.validateCompatibility(update, current,
current.getMeta().getCompatibility());
+ // check compatibility
+
CompatibilityChecker.getValidator(update.getMeta().getType()).validate(update,
current);
if (config.isUploadEnabled()) {
Dependency dependency = dependencyService.compile(update);
@@ -201,7 +203,7 @@ public class SchemaServiceImpl implements
SchemaService<SchemaDto> {
SchemaRecordInfo current =
storageServiceProxy.getBySubject(qualifiedName);
if (current == null) {
- throw new SchemaNotFoundException("Schema " +
qualifiedName.fullName() + " not exist, ignored update.");
+ throw new SchemaNotFoundException("Schema " +
qualifiedName.toString() + " not exist, ignored update.");
}
log.info("delete schema {}", qualifiedName);
@@ -244,7 +246,7 @@ public class SchemaServiceImpl implements
SchemaService<SchemaDto> {
SchemaRecordInfo recordInfo =
storageServiceProxy.getBySubject(qualifiedName);
if (recordInfo == null) {
- throw new SchemaException("Subject: " + qualifiedName.fullName() +
" not exist");
+ throw new SchemaException("Schema: " + qualifiedName.toString() +
" not exist");
}
log.info("get schema by subject: {}", qualifiedName.getSubject());
@@ -260,7 +262,7 @@ public class SchemaServiceImpl implements
SchemaService<SchemaDto> {
List<SchemaRecordInfo> recordInfos =
storageServiceProxy.listBySubject(qualifiedName);
if (recordInfos == null) {
- throw new SchemaException("Subject: " + qualifiedName.fullName() +
" not exist");
+ throw new SchemaException("Schema: " + qualifiedName.toString() +
" not exist");
}
log.info("list schema by subject: {}", qualifiedName.getSubject());
diff --git
a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
index e13f2af..063816d 100644
---
a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
+++
b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
@@ -51,7 +51,6 @@ import
org.apache.rocketmq.schema.registry.common.exception.SchemaExistException
import
org.apache.rocketmq.schema.registry.common.exception.SchemaNotFoundException;
import org.apache.rocketmq.schema.registry.common.json.JsonConverter;
import org.apache.rocketmq.schema.registry.common.json.JsonConverterImpl;
-import org.apache.rocketmq.schema.registry.common.model.SchemaDetailInfo;
import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo;
import org.apache.rocketmq.schema.registry.common.model.SubjectInfo;
@@ -251,6 +250,7 @@ public class RocketmqClient {
boolean isSchemaDeleted =
Boolean.parseBoolean(msg.getUserProperty(DELETE_KEYS));
if (isSchemaDeleted) {
// delete
+ log.info("receive delete schema msg, schema = {}",
update);
deleteAllSubject(update);
cache.delete(schemaCfHandle(), schemaFullName);
}
@@ -333,7 +333,6 @@ public class RocketmqClient {
try {
synchronized (this) {
schemaInfo.setLastModifiedTime(new Date());
- schemaInfo.setDetails(new SchemaDetailInfo());
Message msg = new Message(storageTopic, "",
schemaInfo.schemaFullName(), converter.toJsonAsBytes(schemaInfo));
msg.putUserProperty(DELETE_KEYS, "true");
SendResult result = sendOrderMessageToRocketmq(msg);