This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git
commit 05bcf9d8f243572efc598dae6237afc54cb2bda3 Author: huitong <[email protected]> AuthorDate: Mon Jul 25 11:20:10 2022 +0800 fix delete version --- .../registry/common/exception/SchemaExistException.java | 2 +- .../rocketmq/schema/registry/common/model/SchemaInfo.java | 3 +++ .../schema/registry/storage/rocketmq/RocketmqClient.java | 13 ++++++++++--- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaExistException.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaExistException.java index 462f0a0..f8a9071 100644 --- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaExistException.java +++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaExistException.java @@ -27,7 +27,7 @@ public class SchemaExistException extends SchemaException { private final int errorCode = 40401; public SchemaExistException(final QualifiedName qualifiedName) { - this(String.format("Schema: %s is exist, please check your configuration.", qualifiedName)); + this(String.format("Schema: %s is exist, please check your configuration.", qualifiedName.schemaFullName())); } public SchemaExistException(final String msg) { diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaInfo.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaInfo.java index 56f1953..6fc1ecf 100644 --- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaInfo.java +++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaInfo.java @@ -81,4 +81,7 @@ public class SchemaInfo extends BaseInfo { getLastRecord().setVersion(version); } + public int getRecordCount() { + return getDetails().getSchemaRecords().size(); + } } diff --git 
a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java index 9c0bc7a..f671a7b 100644 --- a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java +++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java @@ -234,7 +234,8 @@ public class RocketmqClient { } synchronized (this) { try { - log.info("receive msg, the content is {}", new String(msg.getBody())); + log.info("receive msg, queue={}, offset={}, key={}, the content is {}", msg.getQueueId(), + msg.getQueueOffset(), msg.getKeys(), new String(msg.getBody())); if (msg.getKeys().equals(DELETE_KEYS)) { // delete byte[] schemaFullName = msg.getBody(); @@ -256,11 +257,12 @@ public class RocketmqClient { cache.put(subjectCfHandle(), converter.toBytes(update.subjectFullName()), lastRecordBytes); } else { SchemaInfo current = converter.fromJson(result, SchemaInfo.class); - if (current.getLastRecordVersion() == update.getLastRecordVersion()) { + boolean isDeleted = current.getRecordCount() > update.getRecordCount(); + if (current.getLastRecordVersion() == update.getLastRecordVersion() && !isDeleted) { log.info("Schema version is the same, no need to update."); return; } - if (current.getLastRecordVersion() > update.getLastRecordVersion()) { + if (current.getLastRecordVersion() > update.getLastRecordVersion() && !isDeleted) { throw new SchemaException("Schema version is invalid, update: " + update.getLastRecordVersion() + ", but current: " + current.getLastRecordVersion()); } @@ -338,11 +340,16 @@ public class RocketmqClient { if (schemaInfo == null || schemaInfo.getDetails() == null || schemaInfo.getDetails().getSchemaRecords() == null) { throw new SchemaNotFoundException(name); } + List<SubjectInfo> subjects = 
schemaInfo.getLastRecord().getSubjects(); List<SchemaRecordInfo> schemaRecords = schemaInfo.getDetails().getSchemaRecords(); schemaRecords.removeIf(record -> record.getVersion() == name.getVersion()); if (CollectionUtils.isEmpty(schemaRecords)) { deleteBySubject(name); } + // delete but still need bind subject + if (schemaInfo.getLastRecord().getSubjects().isEmpty()) { + schemaInfo.getLastRecord().setSubjects(subjects); + } byte[] schemaInfoBytes = converter.toJsonAsBytes(schemaInfo); try {
