This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git
commit 3abfa0201efce802bd4a9b1e50315f4601a27e5a Author: huitong <[email protected]> AuthorDate: Tue Jul 19 22:10:43 2022 +0800 fix storage client --- .DS_Store | Bin 6148 -> 0 bytes .../registry/common/dto/SchemaRecordDto.java | 3 +- .../registry/common/model/SchemaRecordInfo.java | 2 +- .../common/storage/StorageServiceProxy.java | 8 +- .../registry/core/api/v1/SchemaController.java | 7 +- .../registry/core/service/SchemaServiceImpl.java | 2 +- .../registry/storage/rocketmq/RocketmqClient.java | 112 ++++++++++++--------- .../src/main/resources/rocketmq.properties | 2 +- 8 files changed, 77 insertions(+), 59 deletions(-) diff --git a/.DS_Store b/.DS_Store deleted file mode 100644 index a1ac880..0000000 Binary files a/.DS_Store and /dev/null differ diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SchemaRecordDto.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SchemaRecordDto.java index f844188..585629c 100644 --- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SchemaRecordDto.java +++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SchemaRecordDto.java @@ -25,6 +25,7 @@ import lombok.Data; import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; import org.apache.rocketmq.schema.registry.common.model.Dependency; +import org.apache.rocketmq.schema.registry.common.model.SchemaType; @Data @EqualsAndHashCode(callSuper = false) @@ -52,5 +53,5 @@ public class SchemaRecordDto { private List<SubjectDto> subjects; @ApiModelProperty(value = "Schema type") - private String type; + private SchemaType type; } diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaRecordInfo.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaRecordInfo.java index 2ec4d53..02af194 100644 --- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaRecordInfo.java +++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaRecordInfo.java @@ -37,7 +37,7 @@ public class SchemaRecordInfo implements Serializable { private String idl; private Dependency dependency; private List<SubjectInfo> subjects; - private String type; + private SchemaType type; // private List<FieldInfo> fields; public void bindSubject(final SubjectInfo subjectInfo) { diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java index bd134d8..41feb2c 100644 --- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java +++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java @@ -70,7 +70,7 @@ public class StorageServiceProxy { * * @param name Qualified name with tenant / name of schema */ - @CacheEvict(key = "'schema.' + #name.getTenant() + '/' + #name.getSchema()") + @CacheEvict(key = "'schema.' + #name.getSchema()") public void delete(final QualifiedName name) { final RequestContext requestContext = RequestContextManager.getContext(); final StorageServiceContext storageServiceContext = storageUtil.convertToStorageServiceContext(requestContext); @@ -86,7 +86,7 @@ public class StorageServiceProxy { * @param schemaInfo schema information instance * @return true if errors after this should be ignored. */ - @CacheEvict(key = "'schema.' + #name.getTenant() + '/' + #name.getSchema()") + @CacheEvict(key = "'schema.' + #name.getSchema()") public SchemaInfo update(final QualifiedName name, final SchemaInfo schemaInfo) { final RequestContext requestContext = RequestContextManager.getContext(); final StorageServiceContext storageServiceContext = storageUtil.convertToStorageServiceContext(requestContext); @@ -103,7 +103,7 @@ public class StorageServiceProxy { * @param useCache if schema can be retrieved from cache * @return schema information instance */ - @Cacheable(key = "'schema.' + #name.getTenant() + '/' + #name.getSchema()", condition = "#useCache") + @Cacheable(key = "'schema.' + #name.getSchema()", condition = "#useCache") public SchemaInfo get(final QualifiedName name, final boolean useCache) { final RequestContext requestContext = RequestContextManager.getContext(); final StorageServiceContext storageServiceContext = storageUtil.convertToStorageServiceContext(requestContext); @@ -112,7 +112,6 @@ public class StorageServiceProxy { return storageService.get(storageServiceContext, name); } - @Cacheable(key = "'subject.' + #name.getSubject() + '/' + #name.getVersion()", condition = "#useCache && #name.getVersion() != null") public SchemaRecordInfo getBySubject(final QualifiedName name, final boolean useCache) { final RequestContext requestContext = RequestContextManager.getContext(); final StorageServiceContext storageServiceContext = storageUtil.convertToStorageServiceContext(requestContext); @@ -121,7 +120,6 @@ public class StorageServiceProxy { return storageService.getBySubject(storageServiceContext, name); } - @Cacheable(key = "'subject.' + #name.getSubject()", condition = "#useCache") public List<SchemaRecordInfo> listBySubject(final QualifiedName name, final boolean useCache) { final RequestContext requestContext = RequestContextManager.getContext(); final StorageServiceContext storageServiceContext = storageUtil.convertToStorageServiceContext(requestContext); diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java index edbe75d..335294c 100644 --- a/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java +++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java @@ -6,19 +6,18 @@ package org.apache.rocketmq.schema.registry.core.api.v1; +import java.net.HttpURLConnection; +import java.util.List; + import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; -import java.net.HttpURLConnection; -import java.util.List; -import java.util.Optional; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.schema.registry.common.QualifiedName; import org.apache.rocketmq.schema.registry.common.dto.SchemaDto; import org.apache.rocketmq.schema.registry.common.dto.SchemaRecordDto; -import org.apache.rocketmq.schema.registry.common.exception.SchemaNotFoundException; import org.apache.rocketmq.schema.registry.core.api.RequestProcessor; import org.apache.rocketmq.schema.registry.core.service.SchemaService; import org.springframework.beans.factory.annotation.Autowired; diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java index 4073431..5f72d34 100644 --- a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java +++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java @@ -98,7 +98,7 @@ public class SchemaServiceImpl implements SchemaService<SchemaDto> { schemaInfo.setUniqueId(idGenerator.nextId()); schemaInfo.setLastRecordVersion(1L); schemaInfo.getLastRecord().setSchema(qualifiedName.schemaFullName()); - schemaInfo.getLastRecord().setType(schemaInfo.getMeta().getType().name()); + schemaInfo.getLastRecord().setType(schemaInfo.getMeta().getType()); schemaInfo.getLastRecord().bindSubject(qualifiedName.subjectInfo()); if (config.isUploadEnabled()) { diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java index 1a25063..913c360 100644 --- a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java +++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java @@ -24,6 +24,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; @@ -33,6 +36,7 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; @@ -48,6 +52,7 @@ import org.apache.rocketmq.schema.registry.common.json.JsonConverterImpl; import org.apache.rocketmq.schema.registry.common.model.SchemaInfo; import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo; import org.apache.rocketmq.schema.registry.common.model.SubjectInfo; +import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; @@ -83,6 +88,9 @@ public class RocketmqClient { private final List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList<>(); private final Map<String, ColumnFamilyHandle> cfHandleMap = new HashMap<>(); + private ScheduledExecutorService scheduledExecutorService; + + private static final Integer PULL_TASK_INTERVAL = 5 * 1000; /** * RocksDB for cache @@ -178,61 +186,70 @@ public class RocketmqClient { e.printStackTrace(); } }); - while (true) { - List<MessageExt> msgList = scheduleConsumer.poll(1000); - if (msgList != null) { - msgList.forEach(this::consumeMessage); - } - } + this.scheduledExecutorService.scheduleAtFixedRate(new RocketmqStoragePullTask(), + 0, PULL_TASK_INTERVAL, TimeUnit.MILLISECONDS); + } catch (MQClientException e) { throw new SchemaException("Rocketmq client start failed", e); } } - private void consumeMessage(MessageExt msg) { - synchronized (this) { - try { - if (msg.getKeys().equals(DELETE_KEYS)) { - // delete - byte[] schemaFullName = msg.getBody(); - byte[] schemaInfoBytes = cache.get(schemaCfHandle(), schemaFullName); - if (schemaInfoBytes != null) { - deleteAllSubject(converter.fromJson(schemaInfoBytes, SchemaInfo.class)); - cache.delete(schemaCfHandle(), schemaFullName); - } - } else { - byte[] schemaFullName = converter.toBytes(msg.getKeys()); - byte[] schemaInfoBytes = msg.getBody(); - SchemaInfo update = converter.fromJson(schemaInfoBytes, SchemaInfo.class); - byte[] lastRecordBytes = converter.toJsonAsBytes(update.getLastRecord()); - - byte[] result = cache.get(schemaCfHandle(), schemaFullName); - if (result == null) { - // register - cache.put(schemaCfHandle(), schemaFullName, schemaInfoBytes); - cache.put(subjectCfHandle(), converter.toBytes(update.subjectFullName()), lastRecordBytes); - } else { - SchemaInfo current = converter.fromJson(result, SchemaInfo.class); - if (current.getLastRecordVersion() == update.getLastRecordVersion()) { - return; - } - if (current.getLastRecordVersion() > update.getLastRecordVersion()) { - throw new SchemaException("Schema version is invalid, update: " - + update.getLastRecordVersion() + ", but current: " + current.getLastRecordVersion()); - } + public class RocketmqStoragePullTask implements Runnable { + + @Override + public void run() { + List<MessageExt> msgList = scheduleConsumer.poll(1000); + if (CollectionUtils.isNotEmpty(msgList)) { + msgList.forEach(this::consumeMessage); + } + } - cache.put(schemaCfHandle(), schemaFullName, schemaInfoBytes); - update.getLastRecord().getSubjects().forEach(subject -> { - try { - cache.put(subjectCfHandle(), converter.toBytes(subject.fullName()), lastRecordBytes); - } catch (RocksDBException e) { - throw new SchemaException("Update schema: " + update.getQualifiedName() + " failed.", e); + private void consumeMessage(MessageExt msg) { + synchronized (this) { + try { + log.info("receive msg, the content is {}", new String(msg.getBody())); + if (DELETE_KEYS.equals(msg.getKeys())) { + // delete + byte[] schemaFullName = msg.getBody(); + byte[] schemaInfoBytes = cache.get(schemaCfHandle(), schemaFullName); + if (schemaInfoBytes != null) { + deleteAllSubject(converter.fromJson(schemaInfoBytes, SchemaInfo.class)); + cache.delete(schemaCfHandle(), schemaFullName); + } + } else { + byte[] schemaFullName = converter.toBytes(msg.getKeys()); + byte[] schemaInfoBytes = msg.getBody(); + SchemaInfo update = converter.fromJson(schemaInfoBytes, SchemaInfo.class); + byte[] lastRecordBytes = converter.toJsonAsBytes(update.getLastRecord()); + + byte[] result = cache.get(schemaCfHandle(), schemaFullName); + if (result == null) { + // register + cache.put(schemaCfHandle(), schemaFullName, schemaInfoBytes); + cache.put(subjectCfHandle(), converter.toBytes(update.subjectFullName()), lastRecordBytes); + } else { + SchemaInfo current = converter.fromJson(result, SchemaInfo.class); + if (current.getLastRecordVersion() == update.getLastRecordVersion()) { + return; + } + if (current.getLastRecordVersion() > update.getLastRecordVersion()) { + throw new SchemaException("Schema version is invalid, update: " + + update.getLastRecordVersion() + ", but current: " + current.getLastRecordVersion()); } - }); + + cache.put(schemaCfHandle(), schemaFullName, schemaInfoBytes); + update.getLastRecord().getSubjects().forEach(subject -> { + try { + cache.put(subjectCfHandle(), converter.toBytes(subject.fullName()), lastRecordBytes); + } catch (RocksDBException e) { + throw new SchemaException("Update schema: " + update.getQualifiedName() + " failed.", e); + } + }); + } } + } catch (Throwable e) { + throw new SchemaException("Rebuild schema cache failed", e); } - } catch (Throwable e) { - throw new SchemaException("Rebuild schema cache failed", e); } } } @@ -386,6 +403,9 @@ public class RocketmqClient { ); this.converter = new JsonConverterImpl(); + + this.scheduledExecutorService = + Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RocketmqStoragePullTask")); } private ColumnFamilyHandle schemaCfHandle() { diff --git a/schema-storage-rocketmq/src/main/resources/rocketmq.properties b/schema-storage-rocketmq/src/main/resources/rocketmq.properties index 3a94c6a..5070713 100644 --- a/schema-storage-rocketmq/src/main/resources/rocketmq.properties +++ b/schema-storage-rocketmq/src/main/resources/rocketmq.properties @@ -16,4 +16,4 @@ # storage.type=rocketmq -storage.local.cache.path=/Users/xyb/app/schema-registry/cache \ No newline at end of file +#storage.local.cache.path \ No newline at end of file
