This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git


The following commit(s) were added to refs/heads/main by this push:
     new e8fd852  supplement avro compatibility checker
     new 7a91cf5  Merge pull request #32 from MatrixHB/optimize_compati
e8fd852 is described below

commit e8fd852dbce5dc1af96f2587928a79c89ef8d426
Author: huitong <[email protected]>
AuthorDate: Thu Aug 11 15:59:48 2022 +0800

    supplement avro compatibility checker
---
 README.md                                          |  2 +-
 .../schema/registry/common/QualifiedName.java      | 23 ++++---
 .../schema/registry/common/model/SchemaInfo.java   | 11 ++++
 .../schema/registry/common/utils/CommonUtil.java   | 47 +++-----------
 .../core/compatibility/AvroSchemaValidator.java    | 75 ++++++++++++++++++++++
 .../core/compatibility/CompatibilityChecker.java   | 35 ++++++++++
 .../core/compatibility/SchemaValidator.java        | 25 ++++++++
 .../registry/core/service/SchemaServiceImpl.java   | 12 ++--
 .../registry/storage/rocketmq/RocketmqClient.java  |  3 +-
 9 files changed, 176 insertions(+), 57 deletions(-)

diff --git a/README.md b/README.md
index 33219df..1c0790e 100644
--- a/README.md
+++ b/README.md
@@ -3,7 +3,7 @@ RocketMQ Schema Registry
 
 RocketMQ Schema Registry is a Topic Schema's management center. It provides a RESTful interface for register,
 delete, update, get and reference schema to subject(RocketMQ Topic). By associating Schema with subject, the New 
-RocketMQ client can send a structured date directly. User no longer need to care about the details of serialization
+RocketMQ client can send structured data directly. User no longer need to care about the details of serialization
 and deserialization.
 
 Schema struct can change, it will generate a new version number with each update. During schema evolution, the 
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java
index f9d48a0..87cacee 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java
@@ -80,18 +80,17 @@ public class QualifiedName implements Serializable {
 
     @Override
     public String toString() {
-        final StringBuilder sb = new StringBuilder("{");
-        sb.append("\"cluster\":\"")
-            .append(cluster).append('\"');
-        sb.append("\"tenant\":\"")
-            .append(tenant).append('\"');
-        sb.append(",\"subject\":\"")
-            .append(subject).append('\"');
-        sb.append(",\"name\":\"")
-            .append(schema).append('\"');
-        sb.append(",\"version\":\"")
-            .append(version).append('\"');
-        sb.append('}');
+        final StringBuilder sb = new StringBuilder("(");
+        sb.append("cluster=")
+            .append(cluster).append(", ");
+        sb.append("tenant=")
+            .append(tenant).append(", ");
+        sb.append("subject=")
+            .append(subject).append(", ");
+        sb.append("name=")
+            .append(schema).append(", ");
+        sb.append("version=")
+            .append(version).append(")");
         return sb.toString();
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaInfo.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaInfo.java
index 6dbf86a..92477b1 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaInfo.java
@@ -17,9 +17,13 @@
 
 package org.apache.rocketmq.schema.registry.common.model;
 
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
@@ -103,4 +107,11 @@ public class SchemaInfo extends BaseInfo {
             getAudit().setLastModifiedTime(date);
         }
     }
+
+    public List<String> getAllRecordIdlInOrder() {
+        List<String> recordIdlList = getDetails().getSchemaRecords()
+            .stream().map(SchemaRecordInfo::getIdl).collect(Collectors.toList());
+        Collections.reverse(recordIdlList);
+        return recordIdlList;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/CommonUtil.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/CommonUtil.java
index f1b9c70..48323f9 100644
--- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/CommonUtil.java
+++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/CommonUtil.java
@@ -46,16 +46,10 @@ import javax.tools.JavaFileObject;
 import javax.tools.StandardJavaFileManager;
 import javax.tools.ToolProvider;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaValidationException;
-import org.apache.avro.SchemaValidator;
-import org.apache.avro.SchemaValidatorBuilder;
 import org.apache.rocketmq.schema.registry.common.QualifiedName;
 import org.apache.rocketmq.schema.registry.common.dto.SchemaDto;
 import org.apache.rocketmq.schema.registry.common.exception.SchemaCompatibilityException;
 import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
-import org.apache.rocketmq.schema.registry.common.model.Compatibility;
-import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
 
 @Slf4j
 public class CommonUtil {
@@ -214,21 +208,17 @@ public class CommonUtil {
             } catch (Throwable e) {
                 log.error("Read from file error", e);
             } finally {
-                if (br != null) {
-                    try {
-                        br.close();
+                try {
+                    br.close();
 
-                    } catch (IOException ioException) {
-                        log.error("", ioException);
-                    }
+                } catch (IOException ioException) {
+                    log.error("", ioException);
                 }
-                if (inputStreamReader != null) {
-                    try {
-                        inputStreamReader.close();
+                try {
+                    inputStreamReader.close();
 
-                    } catch (IOException ioException) {
-                        log.error("", ioException);
-                    }
+                } catch (IOException ioException) {
+                    log.error("", ioException);
                 }
             }
         }
@@ -240,26 +230,9 @@ public class CommonUtil {
                 break;
             case JSON:
                 throw new SchemaCompatibilityException("Unsupported schema type: " + schemaDto.getMeta().getType());
-        }
-    }
-
-    public static void validateCompatibility(SchemaInfo update, SchemaInfo current,
-        Compatibility expectCompatibility) {
-        switch (update.getMeta().getType()) {
-            case AVRO:
-                SchemaValidator validator = new SchemaValidatorBuilder().canReadStrategy().validateLatest();
-                try {
-                    Schema toValidate = new Schema.Parser().parse(update.getLastRecordIdl());
-                    List<Schema> existing = new ArrayList<>();
-                    existing.add(new Schema.Parser().parse(current.getLastRecordIdl()));
-                    validator.validate(toValidate, existing);
-                } catch (SchemaValidationException e) {
-                    throw new SchemaCompatibilityException("Schema compatibility validation failed", e);
-                }
-                break;
             default:
-                throw new SchemaCompatibilityException("Unsupported schema type: " + update.getMeta().getType());
+                throw new SchemaCompatibilityException("Unexpected value: " + schemaDto.getMeta().getType());
         }
-
     }
+
 }
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/compatibility/AvroSchemaValidator.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/compatibility/AvroSchemaValidator.java
new file mode 100644
index 0000000..ad38c3e
--- /dev/null
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/compatibility/AvroSchemaValidator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.core.compatibility;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaValidator;
+import org.apache.avro.SchemaValidationException;
+import org.apache.avro.SchemaValidatorBuilder;
+import org.apache.rocketmq.schema.registry.common.exception.SchemaCompatibilityException;
+import org.apache.rocketmq.schema.registry.common.model.Compatibility;
+import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
+
+import static org.apache.rocketmq.schema.registry.common.model.Compatibility.BACKWARD;
+import static org.apache.rocketmq.schema.registry.common.model.Compatibility.BACKWARD_TRANSITIVE;
+import static org.apache.rocketmq.schema.registry.common.model.Compatibility.FORWARD;
+import static org.apache.rocketmq.schema.registry.common.model.Compatibility.FORWARD_TRANSITIVE;
+import static org.apache.rocketmq.schema.registry.common.model.Compatibility.FULL;
+import static org.apache.rocketmq.schema.registry.common.model.Compatibility.FULL_TRANSITIVE;
+
+public class AvroSchemaValidator implements org.apache.rocketmq.schema.registry.core.compatibility.SchemaValidator {
+
+    private static final Map<Compatibility, SchemaValidator> SCHEMA_VALIDATOR_CACHE = new HashMap<>(6);
+
+    public AvroSchemaValidator() {
+        SCHEMA_VALIDATOR_CACHE.putIfAbsent(BACKWARD,
+            new SchemaValidatorBuilder().canReadStrategy().validateLatest());
+        SCHEMA_VALIDATOR_CACHE.putIfAbsent(BACKWARD_TRANSITIVE,
+            new SchemaValidatorBuilder().canReadStrategy().validateAll());
+        SCHEMA_VALIDATOR_CACHE.putIfAbsent(FORWARD,
+            new SchemaValidatorBuilder().canBeReadStrategy().validateLatest());
+        SCHEMA_VALIDATOR_CACHE.putIfAbsent(FORWARD_TRANSITIVE,
+            new SchemaValidatorBuilder().canBeReadStrategy().validateAll());
+        SCHEMA_VALIDATOR_CACHE.putIfAbsent(FULL,
+            new SchemaValidatorBuilder().mutualReadStrategy().validateLatest());
+        SCHEMA_VALIDATOR_CACHE.putIfAbsent(FULL_TRANSITIVE,
+            new SchemaValidatorBuilder().mutualReadStrategy().validateAll());
+    }
+
+    @Override
+    public void validate(SchemaInfo update, SchemaInfo current) {
+        org.apache.avro.SchemaValidator validator =
+            SCHEMA_VALIDATOR_CACHE.get(current.getMeta().getCompatibility());
+        try {
+            Schema toValidate = new Schema.Parser().parse(update.getLastRecordIdl());
+            List<Schema> existing = new ArrayList<>();
+            for (String schemaIdl : current.getAllRecordIdlInOrder()) {
+                existing.add(new Schema.Parser().parse(schemaIdl));
+            }
+            validator.validate(toValidate, existing);
+        } catch (SchemaValidationException e) {
+            throw new SchemaCompatibilityException("Schema compatibility validation failed", e);
+        }
+    }
+
+}
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/compatibility/CompatibilityChecker.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/compatibility/CompatibilityChecker.java
new file mode 100644
index 0000000..20062d3
--- /dev/null
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/compatibility/CompatibilityChecker.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.core.compatibility;
+
+import org.apache.rocketmq.schema.registry.common.exception.SchemaCompatibilityException;
+import org.apache.rocketmq.schema.registry.common.model.SchemaType;
+
+public class CompatibilityChecker {
+
+    private static final SchemaValidator SCHEMA_VALIDATOR_AVRO = new AvroSchemaValidator();
+
+    public static SchemaValidator getValidator(SchemaType schemaType) {
+        switch (schemaType) {
+            case AVRO:
+                return SCHEMA_VALIDATOR_AVRO;
+            default:
+                throw new SchemaCompatibilityException("Unsupported schema type: " + schemaType);
+        }
+    }
+}
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/compatibility/SchemaValidator.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/compatibility/SchemaValidator.java
new file mode 100644
index 0000000..d3b25d9
--- /dev/null
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/compatibility/SchemaValidator.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.core.compatibility;
+
+import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
+
+public interface SchemaValidator {
+
+    void validate(SchemaInfo update, SchemaInfo current);
+}
diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
index 2c2dca5..7f80549 100644
--- a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
+++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
@@ -50,6 +50,7 @@ import org.apache.rocketmq.schema.registry.common.storage.StorageServiceProxy;
 import org.apache.rocketmq.schema.registry.common.utils.CommonUtil;
 import org.apache.rocketmq.schema.registry.common.utils.IdGenerator;
 import org.apache.rocketmq.schema.registry.common.utils.StorageUtil;
+import org.apache.rocketmq.schema.registry.core.compatibility.CompatibilityChecker;
 import org.apache.rocketmq.schema.registry.core.dependency.DependencyService;
 
 @Slf4j
@@ -139,7 +140,7 @@ public class SchemaServiceImpl implements SchemaService<SchemaDto> {
 
         SchemaInfo current = storageServiceProxy.get(qualifiedName);
         if (current == null) {
-            throw new SchemaNotFoundException("Schema " + qualifiedName.fullName() + " not exist, ignored update.");
+            throw new SchemaNotFoundException("Schema " + qualifiedName.toString() + " not exist, ignored update.");
         }
 
         final SchemaRecordInfo updateRecord = new SchemaRecordInfo();
@@ -177,7 +178,8 @@ public class SchemaServiceImpl implements SchemaService<SchemaDto> {
             update.getAudit().updateBy(updateSchemaRequest.getOwner(), updateSchemaRequest.getDesc());
         }
 
-        CommonUtil.validateCompatibility(update, current, current.getMeta().getCompatibility());
+        // check compatibility
+        CompatibilityChecker.getValidator(update.getMeta().getType()).validate(update, current);
 
         if (config.isUploadEnabled()) {
             Dependency dependency = dependencyService.compile(update);
@@ -201,7 +203,7 @@ public class SchemaServiceImpl implements SchemaService<SchemaDto> {
 
         SchemaRecordInfo current = storageServiceProxy.getBySubject(qualifiedName);
         if (current == null) {
-            throw new SchemaNotFoundException("Schema " + qualifiedName.fullName() + " not exist, ignored update.");
+            throw new SchemaNotFoundException("Schema " + qualifiedName.toString() + " not exist, ignored update.");
         }
 
         log.info("delete schema {}", qualifiedName);
@@ -244,7 +246,7 @@ public class SchemaServiceImpl implements SchemaService<SchemaDto> {
 
         SchemaRecordInfo recordInfo = storageServiceProxy.getBySubject(qualifiedName);
         if (recordInfo == null) {
-            throw new SchemaException("Subject: " + qualifiedName.fullName() + " not exist");
+            throw new SchemaException("Schema: " + qualifiedName.toString() + " not exist");
         }
 
         log.info("get schema by subject: {}", qualifiedName.getSubject());
@@ -260,7 +262,7 @@ public class SchemaServiceImpl implements SchemaService<SchemaDto> {
 
         List<SchemaRecordInfo> recordInfos = storageServiceProxy.listBySubject(qualifiedName);
         if (recordInfos == null) {
-            throw new SchemaException("Subject: " + qualifiedName.fullName() + " not exist");
+            throw new SchemaException("Schema: " + qualifiedName.toString() + " not exist");
         }
 
         log.info("list schema by subject: {}", qualifiedName.getSubject());
diff --git a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
index e13f2af..063816d 100644
--- a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
+++ b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
@@ -51,7 +51,6 @@ import org.apache.rocketmq.schema.registry.common.exception.SchemaExistException
 import org.apache.rocketmq.schema.registry.common.exception.SchemaNotFoundException;
 import org.apache.rocketmq.schema.registry.common.json.JsonConverter;
 import org.apache.rocketmq.schema.registry.common.json.JsonConverterImpl;
-import org.apache.rocketmq.schema.registry.common.model.SchemaDetailInfo;
 import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
 import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo;
 import org.apache.rocketmq.schema.registry.common.model.SubjectInfo;
@@ -251,6 +250,7 @@ public class RocketmqClient {
                     boolean isSchemaDeleted = Boolean.parseBoolean(msg.getUserProperty(DELETE_KEYS));
                     if (isSchemaDeleted) {
                         // delete
+                        log.info("receive delete schema msg, schema = {}", update);
                         deleteAllSubject(update);
                         cache.delete(schemaCfHandle(), schemaFullName);
                     }
@@ -333,7 +333,6 @@ public class RocketmqClient {
         try {
             synchronized (this) {
                 schemaInfo.setLastModifiedTime(new Date());
-                schemaInfo.setDetails(new SchemaDetailInfo());
                 Message msg = new Message(storageTopic, "", schemaInfo.schemaFullName(), converter.toJsonAsBytes(schemaInfo));
                 msg.putUserProperty(DELETE_KEYS, "true");
                 SendResult result = sendOrderMessageToRocketmq(msg);

Reply via email to