This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git
commit 4550c9a2562e3bcfaac327067c0837239068b1e2 Author: huitong <[email protected]> AuthorDate: Tue Jul 19 16:37:17 2022 +0800 fix delete schema --- .../schema/registry/common/QualifiedName.java | 4 +- .../registry/storage/rocketmq/RocketmqClient.java | 71 ++++++++++++++-------- .../rocketmq/RocketmqStorageClientImpl.java | 20 +++--- 3 files changed, 55 insertions(+), 40 deletions(-) diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java index e945909..5cda52b 100644 --- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java +++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java @@ -71,11 +71,11 @@ public class QualifiedName implements Serializable { } public String fullName() { - return cluster + '/' + tenant + '/' + subject + '/' + schema + '/' + version; + return cluster + '/' + tenant + '/' + subject + '/' + schema; } public String schemaFullName() { - return schema + '/' + version; + return schema; } public String subjectFullName() { diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java index 8e1a21f..1d07084 100644 --- a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java +++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java @@ -27,6 +27,7 @@ import java.util.Properties; import java.util.Set; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; @@ -245,10 +246,8 @@ public class RocketmqClient { // TODO: next query on other machine may can't found schema in cache since send is async with receive public SchemaInfo registerSchema(SchemaInfo schema) { - byte[] subjectFullName = converter.toBytes(schema.subjectFullName()); byte[] schemaFullName = converter.toBytes(schema.schemaFullName()); byte[] schemaInfo = converter.toJsonAsBytes(schema); - byte[] lastRecord = converter.toJsonAsBytes(schema.getLastRecord()); try { synchronized (this) { @@ -260,9 +259,6 @@ public class RocketmqClient { if (!result.getSendStatus().equals(SendStatus.SEND_OK)) { throw new SchemaException("Register schema: " + schema.getQualifiedName() + " failed: " + result.getSendStatus()); } - - cache.put(schemaCfHandle(), schemaFullName, schemaInfo); - cache.put(subjectCfHandle(), subjectFullName, lastRecord); } return schema; @@ -273,24 +269,22 @@ public class RocketmqClient { } } - public void delete(QualifiedName name) { - byte[] schemaFullName = converter.toBytes(name.schemaFullName()); + public void deleteBySubject(QualifiedName name) { + + SchemaInfo schemaInfo = getSchemaInfoBySubject(name.subjectFullName()); + if (schemaInfo == null) { + throw new SchemaNotFoundException(name); + } try { synchronized (this) { - byte[] schemaInfoBytes = cache.get(schemaCfHandle(), schemaFullName); - if (schemaInfoBytes == null) { - throw new SchemaNotFoundException(name); - } + byte[] schemaFullName = converter.toBytes(schemaInfo.schemaFullName()); Message msg = new Message(storageTopic, "", DELETE_KEYS, schemaFullName); SendResult result = producer.send(msg); if (!result.getSendStatus().equals(SendStatus.SEND_OK)) { throw new SchemaException("Delete schema: " + name + " failed: " + result.getSendStatus()); } - - cache.delete(schemaCfHandle(), schemaFullName); - deleteAllSubject(converter.fromJson(schemaInfoBytes, SchemaInfo.class)); } } catch (SchemaException e) { throw e; @@ -299,10 +293,36 @@ public class RocketmqClient { } } + public void deleteByVersion(QualifiedName name) { + + SchemaInfo schemaInfo = getSchemaInfoBySubject(name.subjectFullName()); + if (schemaInfo == null || schemaInfo.getDetails() == null || schemaInfo.getDetails().getSchemaRecords() == null) { + throw new SchemaNotFoundException(name); + } + List<SchemaRecordInfo> schemaRecords = schemaInfo.getDetails().getSchemaRecords(); + schemaRecords.removeIf(record -> record.getVersion() == name.getVersion()); + if (CollectionUtils.isEmpty(schemaRecords)) { + deleteBySubject(name); + } + byte[] schemaInfoBytes = converter.toJsonAsBytes(schemaInfo); + + try { + synchronized (this) { + Message msg = new Message(storageTopic, "", schemaInfo.schemaFullName(), schemaInfoBytes); + SendResult result = producer.send(msg); + if (result.getSendStatus() != SendStatus.SEND_OK) { + throw new SchemaException("Update " + name + " failed: " + result.getSendStatus()); + } + } + } catch (SchemaException e) { + throw e; + } catch (Exception e) { + throw new SchemaException("Update schema " + name + " failed", e); + } + } + public SchemaInfo updateSchema(SchemaInfo update) { - byte[] schemaFullName = converter.toBytes(update.schemaFullName()); byte[] schemaInfo = converter.toJsonAsBytes(update); - byte[] lastRecord = converter.toJsonAsBytes(update.getLastRecord()); try { synchronized (this) { @@ -311,15 +331,6 @@ public class RocketmqClient { if (result.getSendStatus() != SendStatus.SEND_OK) { throw new SchemaException("Update " + update.getQualifiedName() + " failed: " + result.getSendStatus()); } - - cache.put(schemaCfHandle(), schemaFullName, schemaInfo); - update.getLastRecord().getSubjects().forEach(subject -> { - try { - cache.put(subjectCfHandle(), converter.toBytes(subject.fullName()), lastRecord); - } catch (RocksDBException e) { - throw new SchemaException("Update schema: " + update.getQualifiedName() + " failed", e); - } - }); } return update; } catch (SchemaException e) { @@ -346,6 +357,16 @@ public class RocketmqClient { } } + public SchemaInfo getSchemaInfoBySubject(String subjectFullName) { + byte[] lastRecordBytes = getBySubject(subjectFullName); + if (lastRecordBytes == null) { + return null; + } + SchemaRecordInfo lastRecord = converter.fromJson(lastRecordBytes, SchemaRecordInfo.class); + byte[] result = getSchema(lastRecord.getSchema()); + return result == null ? null : converter.fromJson(result, SchemaInfo.class); + } + private void init(Properties props) { this.storageTopic = props.getProperty(STORAGE_ROCKETMQ_TOPIC, STORAGE_ROCKETMQ_TOPIC_DEFAULT); this.cachePath = props.getProperty(STORAGE_LOCAL_CACHE_PATH, STORAGE_LOCAL_CACHE_PATH_DEFAULT); diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java index 2fa82ff..2b0987e 100644 --- a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java +++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java @@ -64,7 +64,11 @@ public class RocketmqStorageClientImpl implements RocketmqStorageClient { */ @Override public void delete(QualifiedName qualifiedName) { - rocketmqClient.delete(qualifiedName); + if (qualifiedName.getVersion() == null) { + rocketmqClient.deleteBySubject(qualifiedName); + } else { + rocketmqClient.deleteByVersion(qualifiedName); + } } /** @@ -101,7 +105,7 @@ public class RocketmqStorageClientImpl implements RocketmqStorageClient { } // schema version is given - SchemaInfo schemaInfo = getSchemaInfoBySubject(qualifiedName.subjectFullName()); + SchemaInfo schemaInfo = rocketmqClient.getSchemaInfoBySubject(qualifiedName.subjectFullName()); if (schemaInfo == null || schemaInfo.getDetails() == null || schemaInfo.getDetails().getSchemaRecords() == null) { return null; } @@ -112,20 +116,10 @@ public class RocketmqStorageClientImpl implements RocketmqStorageClient { @Override public List<SchemaRecordInfo> listBySubject(QualifiedName qualifiedName) { - SchemaInfo schemaInfo = getSchemaInfoBySubject(qualifiedName.subjectFullName()); + SchemaInfo schemaInfo = rocketmqClient.getSchemaInfoBySubject(qualifiedName.subjectFullName()); if (schemaInfo == null || schemaInfo.getDetails() == null) { return null; } return schemaInfo.getDetails().getSchemaRecords(); } - - private SchemaInfo getSchemaInfoBySubject(String subjectFullName) { - byte[] lastRecordBytes = rocketmqClient.getBySubject(subjectFullName); - if (lastRecordBytes == null) { - return null; - } - SchemaRecordInfo lastRecord = jsonConverter.fromJson(lastRecordBytes, SchemaRecordInfo.class); - byte[] result = rocketmqClient.getSchema(lastRecord.getSchema()); - return result == null ? null : jsonConverter.fromJson(result, SchemaInfo.class); - } }
