This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git
commit b1a1fc95e5fddedfd0b6607f591b8e03e9cda3da Author: huitong <[email protected]> AuthorDate: Mon Jul 18 17:22:09 2022 +0800 add getSchema method and fix storage consumer --- .DS_Store | Bin 0 -> 6148 bytes .../schema/registry/common/QualifiedName.java | 21 ++- .../exception/SchemaAuthorizedException.java | 5 + .../exception/SchemaCompatibilityException.java | 4 + .../registry/common/exception/SchemaException.java | 5 + .../common/exception/SchemaExistException.java | 4 + .../common/exception/SchemaNotFoundException.java | 4 + .../registry/common/storage/StorageService.java | 6 + .../common/storage/StorageServiceProxy.java | 19 ++- core/pom.xml | 8 - .../registry/core/api/v1/SchemaController.java | 79 ++++++++- .../core/expection}/RequestExceptionHandler.java | 15 +- .../registry/core/service/SchemaService.java | 9 + .../registry/core/service/SchemaServiceImpl.java | 19 +++ schema-storage-rocketmq/pom.xml | 6 + .../registry/storage/rocketmq/RocketmqClient.java | 182 ++++++++++++++------- .../storage/rocketmq/RocketmqStorageClient.java | 11 ++ .../rocketmq/RocketmqStorageClientImpl.java | 42 ++++- .../storage/rocketmq/RocketmqStorageService.java | 7 + .../rocketmq/configs/RocketmqConfigConstants.java | 2 +- .../src/main/resources/rocketmq.properties | 2 +- 21 files changed, 352 insertions(+), 98 deletions(-) diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000..a1ac880 Binary files /dev/null and b/.DS_Store differ diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java index 39c812f..0fb34bf 100644 --- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java +++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java @@ -38,6 +38,7 @@ public class QualifiedName implements Serializable { private String tenant; private String subject; private String schema; + private Long version; public QualifiedName( @Nullable final String cluster, @@ -51,16 +52,30 @@ public class QualifiedName implements Serializable { this.schema = schema; } + public QualifiedName( + @Nullable final String cluster, + @Nullable final String tenant, + @Nullable final String subject, + @Nullable final String schema, + @Nullable final Long version + ) { + this.cluster= cluster; + this.tenant= tenant; + this.subject= subject; + this.schema = schema; + this.version = version; + } + public SubjectInfo subjectInfo() { return new SubjectInfo(cluster, subject); } public String fullName() { - return cluster + '/' + tenant + '/' + subject + '/' + schema; + return cluster + '/' + tenant + '/' + subject + '/' + schema + '/' + version; } public String schemaFullName() { - return tenant + '/' + schema; + return tenant + '/' + schema + '/' + version; } public String subjectFullName() { @@ -78,6 +93,8 @@ public class QualifiedName implements Serializable { .append(subject).append('\"'); sb.append(",\"name\":\"") .append(schema).append('\"'); + sb.append(",\"version\":\"") + .append(version).append('\"'); sb.append('}'); return sb.toString(); } diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaAuthorizedException.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaAuthorizedException.java index b190017..c1cea96 100644 --- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaAuthorizedException.java +++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaAuthorizedException.java @@ -17,9 +17,14 @@ package org.apache.rocketmq.schema.registry.common.exception; +import lombok.Getter; + +@Getter public class SchemaAuthorizedException extends SchemaException { private static final long serialVersionUID = 204882338833006991L; + private final int errorCode = 40101; + public SchemaAuthorizedException(final String tenant, final String schemaName) { this(String.format("Schema: %s/%s not found, please check your configuration.", tenant, schemaName)); } diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaCompatibilityException.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaCompatibilityException.java index c694cb0..cf2aafd 100644 --- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaCompatibilityException.java +++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaCompatibilityException.java @@ -17,11 +17,15 @@ package org.apache.rocketmq.schema.registry.common.exception; +import lombok.Getter; import org.apache.rocketmq.schema.registry.common.QualifiedName; +@Getter public class SchemaCompatibilityException extends SchemaException { private static final long serialVersionUID = 2602020608319903212L; + private final int errorCode = 40901; + public SchemaCompatibilityException(final QualifiedName qualifiedName) { this(String.format("Schema: %s validate failed.", qualifiedName)); } diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaException.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaException.java index 524b54c..a95fb90 100644 --- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaException.java +++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaException.java @@ -17,8 +17,13 @@ package org.apache.rocketmq.schema.registry.common.exception; +import lombok.Getter; + +@Getter public class SchemaException extends RuntimeException { + private final int errorCode = 50001; + /** Constructor. */ public SchemaException() { super(); diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaExistException.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaExistException.java index bcea88c..462f0a0 100644 --- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaExistException.java +++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaExistException.java @@ -17,11 +17,15 @@ package org.apache.rocketmq.schema.registry.common.exception; +import lombok.Getter; import org.apache.rocketmq.schema.registry.common.QualifiedName; +@Getter public class SchemaExistException extends SchemaException { private static final long serialVersionUID = -9177284523006645052L; + private final int errorCode = 40401; + public SchemaExistException(final QualifiedName qualifiedName) { this(String.format("Schema: %s is exist, please check your configuration.", qualifiedName)); } diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaNotFoundException.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaNotFoundException.java index 12bcedb..0a3b7b5 100644 --- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaNotFoundException.java +++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/SchemaNotFoundException.java @@ -17,11 +17,15 @@ package org.apache.rocketmq.schema.registry.common.exception; +import lombok.Getter; import org.apache.rocketmq.schema.registry.common.QualifiedName; +@Getter public class SchemaNotFoundException extends SchemaException { private static final long serialVersionUID = 554251224980156176L; + private final int errorCode = 40402; + public SchemaNotFoundException(final QualifiedName qualifiedName) { this(String.format("Schema: %s not found, please check your configuration.", qualifiedName)); } diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java index cdcc36b..7e57d1f 100644 --- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java +++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java @@ -17,6 +17,8 @@ package org.apache.rocketmq.schema.registry.common.storage; +import java.util.List; + import org.apache.rocketmq.schema.registry.common.QualifiedName; import org.apache.rocketmq.schema.registry.common.context.StorageServiceContext; import org.apache.rocketmq.schema.registry.common.model.BaseInfo; @@ -78,4 +80,8 @@ public interface StorageService<T extends BaseInfo> { default SchemaRecordInfo getBySubject(final StorageServiceContext context, final QualifiedName name) { throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT); } + + default List<SchemaRecordInfo> listBySubject(final StorageServiceContext context, final QualifiedName name) { + throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT); + } } diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java index 4ff223c..bd134d8 100644 --- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java +++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java @@ -17,6 +17,8 @@ package org.apache.rocketmq.schema.registry.common.storage; +import java.util.List; + import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.schema.registry.common.QualifiedName; import org.apache.rocketmq.schema.registry.common.context.RequestContext; @@ -68,7 +70,7 @@ public class StorageServiceProxy { * * @param name Qualified name with tenant / name of schema */ - @CacheEvict(key = "'schema.' + #name.getTenant() + '/' + #name.schema()") + @CacheEvict(key = "'schema.' + #name.getTenant() + '/' + #name.getSchema()") public void delete(final QualifiedName name) { final RequestContext requestContext = RequestContextManager.getContext(); final StorageServiceContext storageServiceContext = storageUtil.convertToStorageServiceContext(requestContext); @@ -84,7 +86,7 @@ public class StorageServiceProxy { * @param schemaInfo schema information instance * @return true if errors after this should be ignored. */ - @CacheEvict(key = "'schema.' + #name.getTenant() + '/' + #name.schema()") + @CacheEvict(key = "'schema.' + #name.getTenant() + '/' + #name.getSchema()") public SchemaInfo update(final QualifiedName name, final SchemaInfo schemaInfo) { final RequestContext requestContext = RequestContextManager.getContext(); final StorageServiceContext storageServiceContext = storageUtil.convertToStorageServiceContext(requestContext); @@ -101,7 +103,7 @@ public class StorageServiceProxy { * @param useCache if schema can be retrieved from cache * @return schema information instance */ - @Cacheable(key = "'schema.' + #name.getTenant() + '/' + #name.schema()", condition = "#useCache") + @Cacheable(key = "'schema.' + #name.getTenant() + '/' + #name.getSchema()", condition = "#useCache") public SchemaInfo get(final QualifiedName name, final boolean useCache) { final RequestContext requestContext = RequestContextManager.getContext(); final StorageServiceContext storageServiceContext = storageUtil.convertToStorageServiceContext(requestContext); @@ -110,7 +112,7 @@ public class StorageServiceProxy { return storageService.get(storageServiceContext, name); } - @Cacheable(key = "'subject.' + #subject", condition = "#useCache") + @Cacheable(key = "'subject.' + #name.getSubject() + '/' + #name.getVersion()", condition = "#useCache && #name.getVersion() != null") public SchemaRecordInfo getBySubject(final QualifiedName name, final boolean useCache) { final RequestContext requestContext = RequestContextManager.getContext(); final StorageServiceContext storageServiceContext = storageUtil.convertToStorageServiceContext(requestContext); @@ -118,4 +120,13 @@ public class StorageServiceProxy { return storageService.getBySubject(storageServiceContext, name); } + + @Cacheable(key = "'subject.' + #name.getSubject()", condition = "#useCache") + public List<SchemaRecordInfo> listBySubject(final QualifiedName name, final boolean useCache) { + final RequestContext requestContext = RequestContextManager.getContext(); + final StorageServiceContext storageServiceContext = storageUtil.convertToStorageServiceContext(requestContext); + final StorageService<SchemaInfo> storageService = storageManager.getStorageService(); + + return storageService.listBySubject(storageServiceContext, name); + } } diff --git a/core/pom.xml b/core/pom.xml index 51a2647..2d2f203 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -30,14 +30,6 @@ <version>0.0.2-SNAPSHOT</version> </dependency> - <dependency> - <groupId>com.sun</groupId> - <artifactId>tools</artifactId> - <version>1.8</version> - <scope>system</scope> - <systemPath>${java.home}/../lib/tools.jar</systemPath> - </dependency> - </dependencies> <build> diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java index c575e1d..109775b 100644 --- a/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java +++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java @@ -12,6 +12,7 @@ import io.swagger.annotations.ApiParam; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; import java.net.HttpURLConnection; +import java.util.List; import java.util.Optional; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.schema.registry.common.QualifiedName; @@ -296,11 +297,11 @@ public class SchemaController { @RequestMapping( method = RequestMethod.GET, - path = "/subject/{subject-name}" + path = "/subject/{subject-name}/schema" ) @ApiOperation( value = "Schema information", - notes = "Schema information for the given schema name under the subject") + notes = "Schema information with the latest version under the subject") @ApiResponses( { @ApiResponse( @@ -314,7 +315,7 @@ public class SchemaController { } ) public SchemaRecordDto getSchemaBySubject( - @ApiParam(value = "The name of the schema", required = true) + @ApiParam(value = "The name of the subject", required = true) @PathVariable("subject-name") String subject ) { return getSchemaBySubject("default", subject); @@ -322,11 +323,11 @@ public class SchemaController { @RequestMapping( method = RequestMethod.GET, - path = "/cluster/{cluster-name}/subject/{subject-name}" + path = "/cluster/{cluster-name}/subject/{subject-name}/schema" ) @ApiOperation( value = "Schema information", - notes = "Schema information for the given schema name under the subject") + notes = "Schema information with the latest version under the subject") @ApiResponses( { @ApiResponse( @@ -352,4 +353,72 @@ public class SchemaController { () -> schemaService.getBySubject(name) ); } + + @RequestMapping( + method = RequestMethod.GET, + path = "/cluster/{cluster-name}/subject/{subject-name}/schema/versions/{version}" + ) + @ApiOperation( + value = "Schema information", + notes = "Schema information with the given version under the subject") + @ApiResponses( + { + @ApiResponse( + code = HttpURLConnection.HTTP_OK, + message = "The schema is returned" + ), + @ApiResponse( + code = HttpURLConnection.HTTP_NOT_FOUND, + message = "The requested tenant or schema cannot be found" + ) + } + ) + public SchemaRecordDto getSchemaBySubject( + @ApiParam(value = "The cluster of the subject", required = true) + @PathVariable("cluster-name") String cluster, + @ApiParam(value = "The name of the subject", required = true) + @PathVariable("subject-name") String subject, + @ApiParam(value = "The version of the schema", required = true) + @PathVariable("version") String version + ) { + QualifiedName name = new QualifiedName(cluster, null, subject, null, Long.parseLong(version)); + + return this.requestProcessor.processRequest( + "getSchemaBySubject", + () -> schemaService.getBySubject(name) + ); + } + + @RequestMapping( + method = RequestMethod.GET, + path = "/cluster/{cluster-name}/subject/{subject-name}/schema/versions" + ) + @ApiOperation( + value = "Schema information", + notes = "Schema information with a list of versions under the subject") + @ApiResponses( + { + @ApiResponse( + code = HttpURLConnection.HTTP_OK, + message = "The schema is returned" + ), + @ApiResponse( + code = HttpURLConnection.HTTP_NOT_FOUND, + message = "The requested tenant or schema cannot be found" + ) + } + ) + public List<SchemaRecordDto> getSchemaListBySubject( + @ApiParam(value = "The cluster of the subject", required = true) + @PathVariable("cluster-name") String cluster, + @ApiParam(value = "The name of the subject", required = true) + @PathVariable("subject-name") String subject + ) { + QualifiedName name = new QualifiedName(cluster, null, subject, null); + + return this.requestProcessor.processRequest( + "getSchemaListBySubject", + () -> schemaService.listBySubject(name) + ); + } } diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/RequestExceptionHandler.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/expection/RequestExceptionHandler.java similarity index 80% rename from common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/RequestExceptionHandler.java rename to core/src/main/java/org/apache/rocketmq/schema/registry/core/expection/RequestExceptionHandler.java index 0c0cb29..04a74f3 100644 --- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/exception/RequestExceptionHandler.java +++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/expection/RequestExceptionHandler.java @@ -15,11 +15,12 @@ * limitations under the License. */ -package org.apache.rocketmq.schema.registry.common.exception; +package org.apache.rocketmq.schema.registry.core.expection; import javax.servlet.http.HttpServletResponse; import java.io.IOException; +import org.apache.rocketmq.schema.registry.common.exception.SchemaException; import org.springframework.http.HttpStatus; import org.springframework.web.bind.annotation.ExceptionHandler; import org.springframework.web.bind.annotation.RestControllerAdvice; @@ -37,21 +38,13 @@ public class RequestExceptionHandler { * @param e The inner exception to handle * @throws IOException on error in sending error */ - @ExceptionHandler({SchemaException.class}) + @ExceptionHandler(SchemaException.class) public void handleException( final HttpServletResponse response, final SchemaException e ) throws IOException { - final int status; - - if (e instanceof SchemaNotFoundException) { - status = HttpStatus.NOT_FOUND.value(); - } else { - status = HttpStatus.INTERNAL_SERVER_ERROR.value(); - } - log.error("Global handle SchemaException: " + e.getMessage(), e); - response.sendError(status, e.getMessage()); + response.sendError(e.getErrorCode(), e.getMessage()); } } diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaService.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaService.java index 2618c29..84c70cc 100644 --- a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaService.java +++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaService.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.schema.registry.core.service; +import java.util.List; import java.util.Optional; import org.apache.rocketmq.schema.registry.common.QualifiedName; import org.apache.rocketmq.schema.registry.common.dto.BaseDto; @@ -67,4 +68,12 @@ public interface SchemaService<T extends BaseDto> { * @return schema object with the schemaName */ SchemaRecordDto getBySubject(QualifiedName qualifiedName); + + /** + * Query the schema object with the given subject name. + * + * @param qualifiedName subject of the schema binding + * @return schema object with the schemaName + */ + List<SchemaRecordDto> listBySubject(QualifiedName qualifiedName); } diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java index 4f4bfb2..56efb9c 100644 --- a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java +++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java @@ -21,6 +21,8 @@ import com.google.common.base.Strings; import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; + import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.schema.registry.common.QualifiedName; import org.apache.rocketmq.schema.registry.common.context.RequestContext; @@ -236,6 +238,23 @@ public class SchemaServiceImpl implements SchemaService<SchemaDto> { return storageUtil.convertToSchemaRecordDto(recordInfo); } + @Override + public List<SchemaRecordDto> listBySubject(QualifiedName qualifiedName) { + final RequestContext requestContext = RequestContextManager.getContext(); + log.info("register get request context: " + requestContext); + + // CommonUtil.validateName(qualifiedName); + this.accessController.checkPermission("", qualifiedName.getSubject(), SchemaOperation.GET); + + List<SchemaRecordInfo> recordInfos = storageServiceProxy.listBySubject(qualifiedName, config.isCacheEnabled()); + if (recordInfos == null) { + throw new SchemaException("Subject: " + qualifiedName + " not exist"); + } + + log.info("list schema by subject: {}", qualifiedName.getSubject()); + return recordInfos.stream().map(storageUtil::convertToSchemaRecordDto).collect(Collectors.toList()); + } + private void checkSchemaExist(final QualifiedName qualifiedName) { if (storageServiceProxy.get(qualifiedName, config.isCacheEnabled()) != null) { throw new SchemaExistException(qualifiedName); diff --git a/schema-storage-rocketmq/pom.xml b/schema-storage-rocketmq/pom.xml index d91306f..a3f4b17 100644 --- a/schema-storage-rocketmq/pom.xml +++ b/schema-storage-rocketmq/pom.xml @@ -40,6 +40,12 @@ <artifactId>rocketmq-client</artifactId> <version>4.9.3</version> </dependency> + + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-tools</artifactId> + <version>4.9.3</version> + </dependency> </dependencies> <properties> diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java index 1dbba4a..8e1a21f 100644 --- a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java +++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java @@ -19,12 +19,16 @@ package org.apache.rocketmq.schema.registry.storage.rocketmq; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; @@ -33,8 +37,14 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.body.ClusterInfo; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.schema.registry.common.QualifiedName; import org.apache.rocketmq.schema.registry.common.exception.SchemaException; import org.apache.rocketmq.schema.registry.common.exception.SchemaExistException; @@ -44,6 +54,7 @@ import org.apache.rocketmq.schema.registry.common.json.JsonConverterImpl; import org.apache.rocketmq.schema.registry.common.model.SchemaInfo; import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo; import org.apache.rocketmq.schema.registry.common.model.SubjectInfo; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.DBOptions; @@ -68,9 +79,9 @@ import static org.apache.rocketmq.schema.registry.storage.rocketmq.configs.Rocke @Slf4j public class RocketmqClient { - private Properties properties; private DefaultMQProducer producer; - private DefaultMQPushConsumer scheduleConsumer; + private DefaultLitePullConsumer scheduleConsumer; + private DefaultMQAdminExt mqAdminExt; private String storageTopic; private String cachePath; private JsonConverter converter; @@ -89,10 +100,40 @@ public class RocketmqClient { public RocketmqClient(Properties props) { init(props); + createStorageTopic(); startRemoteStorage(); startLocalCache(); } + private void createStorageTopic() { + + try { + mqAdminExt.start(); + + try { + ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo(); + HashMap<String, BrokerData> brokerAddrTable = clusterInfo.getBrokerAddrTable(); + for (BrokerData brokerData : brokerAddrTable.values()) { + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setTopicName(storageTopic); + topicConfig.setReadQueueNums(8); + topicConfig.setWriteQueueNums(8); + // TODO compact topic (TopicAttributes) + String brokerAddr = brokerData.selectBrokerAddr(); + mqAdminExt.createAndUpdateTopicConfig(brokerAddr, topicConfig); + } + } catch (Exception e) { + throw new SchemaException("Failed to create storage rocketmq topic", e); + } finally { + mqAdminExt.shutdown(); + } + + } catch (MQClientException e) { + throw new SchemaException("Rocketmq admin tool start failed", e); + } + + } + private void startLocalCache() { try { List<byte[]> cfs = RocksDB.listColumnFamilies(options, cachePath); @@ -131,66 +172,77 @@ public class RocketmqClient { try { producer.start(); - scheduleConsumer.subscribe(storageTopic, "*"); - scheduleConsumer.registerMessageListener(new MessageListenerConcurrently() { - @Override - public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { - msgs.forEach(msg -> { - synchronized (this) { - try { - if (msg.getKeys().equals(DELETE_KEYS)) { - // delete - byte[] schemaFullName = msg.getBody(); - byte[] schemaInfoBytes = cache.get(schemaCfHandle(), schemaFullName); - if (schemaInfoBytes != null) { - deleteAllSubject(converter.fromJson(schemaInfoBytes, SchemaInfo.class)); - cache.delete(schemaCfHandle(), schemaFullName); - } - } else { - byte[] schemaFullName = converter.toBytes(msg.getKeys()); - byte[] schemaInfoBytes = msg.getBody(); - SchemaInfo update = converter.fromJson(schemaInfoBytes, SchemaInfo.class); - byte[] lastRecordBytes = converter.toJsonAsBytes(update.getLastRecord()); - - byte[] result = cache.get(schemaCfHandle(), schemaFullName); - if (result == null) { - // register - cache.put(schemaCfHandle(), schemaFullName, schemaInfoBytes); - cache.put(subjectCfHandle(), converter.toBytes(update.subjectFullName()), lastRecordBytes); - } else { - SchemaInfo current = converter.fromJson(result, SchemaInfo.class); - if (current.getLastRecordVersion() == update.getLastRecordVersion()) { - return; - } - if (current.getLastRecordVersion() > update.getLastRecordVersion()) { - throw new SchemaException("Schema version is invalid, update: " - + update.getLastRecordVersion() + ", but current: " + current.getLastRecordVersion()); - } - - cache.put(schemaCfHandle(), schemaFullName, schemaInfoBytes); - update.getLastRecord().getSubjects().forEach(subject -> { - try { - cache.put(subjectCfHandle(), converter.toBytes(subject.fullName()), lastRecordBytes); - } catch (RocksDBException e) { - throw new SchemaException("Update schema: " + update.getQualifiedName() + " failed.", e); - } - }); - } - } - } catch (Throwable e) { - throw new SchemaException("Rebuild schema cache failed", e); - } - } - }); - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + scheduleConsumer.setPullThreadNums(4); + scheduleConsumer.start(); + + Collection<MessageQueue> messageQueueList = scheduleConsumer.fetchMessageQueues(storageTopic); + scheduleConsumer.assign(messageQueueList); + messageQueueList.forEach(mq -> { + try { + scheduleConsumer.seekToBegin(mq); + } catch (MQClientException e) { + e.printStackTrace(); } }); - scheduleConsumer.start(); + while (true) { + List<MessageExt> msgList = scheduleConsumer.poll(1000); + if (msgList != null) { + msgList.forEach(this::consumeMessage); + } + } } catch (MQClientException e) { throw new SchemaException("Rocketmq client start failed", e); } } + private void consumeMessage(MessageExt msg) { + synchronized (this) { + try { + if (msg.getKeys().equals(DELETE_KEYS)) { + // delete + byte[] schemaFullName = msg.getBody(); + byte[] schemaInfoBytes = cache.get(schemaCfHandle(), schemaFullName); + if (schemaInfoBytes != null) { + deleteAllSubject(converter.fromJson(schemaInfoBytes, SchemaInfo.class)); + cache.delete(schemaCfHandle(), schemaFullName); + } + } else { + byte[] schemaFullName = converter.toBytes(msg.getKeys()); + byte[] schemaInfoBytes = msg.getBody(); + SchemaInfo update = converter.fromJson(schemaInfoBytes, SchemaInfo.class); + byte[] lastRecordBytes = converter.toJsonAsBytes(update.getLastRecord()); + + byte[] result = cache.get(schemaCfHandle(), schemaFullName); + if (result == null) { + // register + cache.put(schemaCfHandle(), schemaFullName, schemaInfoBytes); + cache.put(subjectCfHandle(), converter.toBytes(update.subjectFullName()), lastRecordBytes); + } else { + SchemaInfo current = converter.fromJson(result, SchemaInfo.class); + if (current.getLastRecordVersion() == update.getLastRecordVersion()) { + return; + } + if (current.getLastRecordVersion() > update.getLastRecordVersion()) { + throw new SchemaException("Schema version is invalid, update: " + + update.getLastRecordVersion() + ", but current: " + current.getLastRecordVersion()); + } + + cache.put(schemaCfHandle(), schemaFullName, schemaInfoBytes); + update.getLastRecord().getSubjects().forEach(subject -> { + try { + cache.put(subjectCfHandle(), converter.toBytes(subject.fullName()), lastRecordBytes); + } catch (RocksDBException e) { + throw new SchemaException("Update schema: " + update.getQualifiedName() + " failed.", e); + } + }); + } + } + } catch (Throwable e) { + throw new SchemaException("Rebuild schema cache failed", e); + } + } + } + // TODO: next query on other machine may can't found schema in cache since send is async with receive public SchemaInfo registerSchema(SchemaInfo schema) { byte[] subjectFullName = converter.toBytes(schema.subjectFullName()); @@ -277,25 +329,24 @@ public class RocketmqClient { } } - public byte[] getSchema(QualifiedName qualifiedName) { + public byte[] getSchema(String schemaFullName) { try { // TODO: get from rocketmq topic if cache not contain - return cache.get(schemaCfHandle(), converter.toBytes(qualifiedName.schemaFullName())); + return cache.get(schemaCfHandle(), converter.toBytes(schemaFullName)); } catch (RocksDBException e) { - throw new SchemaException("Get schema " + qualifiedName + " failed", e); + throw new SchemaException("Get schema " + schemaFullName + " failed", e); } } - public byte[] getBySubject(QualifiedName qualifiedName) { + public byte[] getBySubject(String subjectFullName) { try { - return cache.get(subjectCfHandle(), converter.toBytes(qualifiedName.subjectFullName())); + return cache.get(subjectCfHandle(), converter.toBytes(subjectFullName)); } catch (RocksDBException e) { - throw new SchemaException("Get by subject " + qualifiedName + " failed", e); + throw new SchemaException("Get by subject " + subjectFullName + " failed", e); } } private void init(Properties props) { - this.properties = props; this.storageTopic = props.getProperty(STORAGE_ROCKETMQ_TOPIC, STORAGE_ROCKETMQ_TOPIC_DEFAULT); this.cachePath = props.getProperty(STORAGE_LOCAL_CACHE_PATH, STORAGE_LOCAL_CACHE_PATH_DEFAULT); @@ -307,7 +358,7 @@ public class RocketmqClient { props.getProperty(STORAGE_ROCKETMQ_NAMESRV, STORAGE_ROCKETMQ_NAMESRV_DEFAULT) ); - this.scheduleConsumer = new DefaultMQPushConsumer( + this.scheduleConsumer = new DefaultLitePullConsumer( props.getProperty(STORAGE_ROCKETMQ_CONSUMER_GROUP, STORAGE_ROCKETMQ_CONSUMER_GROUP_DEFAULT) ); @@ -315,6 +366,11 @@ public class RocketmqClient { props.getProperty(STORAGE_ROCKETMQ_NAMESRV, STORAGE_ROCKETMQ_NAMESRV_DEFAULT) ); + this.mqAdminExt = new DefaultMQAdminExt(); + mqAdminExt.setNamesrvAddr( + props.getProperty(STORAGE_ROCKETMQ_NAMESRV, STORAGE_ROCKETMQ_NAMESRV_DEFAULT) + ); + this.converter = new JsonConverterImpl(); } diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClient.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClient.java index 7eabe27..ecb3fb6 100644 --- a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClient.java +++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClient.java @@ -17,6 +17,8 @@ package org.apache.rocketmq.schema.registry.storage.rocketmq; +import java.util.List; + import org.apache.rocketmq.schema.registry.common.QualifiedName; import org.apache.rocketmq.schema.registry.common.model.SchemaInfo; import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo; @@ -72,4 +74,13 @@ public interface RocketmqStorageClient { default SchemaRecordInfo getBySubject(QualifiedName qualifiedName) { throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT); } + + /** + * list all versions of rocketmq schema entity by subject. + * + * @param qualifiedName schema name + */ + default List<SchemaRecordInfo> listBySubject(QualifiedName qualifiedName) { + throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT); + } } diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java index 8b4752f..2fa82ff 100644 --- a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java +++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java @@ -18,8 +18,14 @@ package org.apache.rocketmq.schema.registry.storage.rocketmq; import java.io.File; +import java.util.List; +import java.util.Map; import java.util.Properties; +import java.util.function.Function; +import java.util.stream.Collectors; + import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.schema.registry.common.QualifiedName; import org.apache.rocketmq.schema.registry.common.context.StoragePluginContext; import org.apache.rocketmq.schema.registry.common.json.JsonConverter; @@ -78,7 +84,7 @@ public class RocketmqStorageClientImpl implements RocketmqStorageClient { */ @Override public SchemaInfo getSchema(QualifiedName qualifiedName) { - byte[] result = rocketmqClient.getSchema(qualifiedName); + byte[] result = rocketmqClient.getSchema(qualifiedName.schemaFullName()); return result == null ? null : jsonConverter.fromJson(result, SchemaInfo.class); } @@ -89,7 +95,37 @@ public class RocketmqStorageClientImpl implements RocketmqStorageClient { */ @Override public SchemaRecordInfo getBySubject(QualifiedName qualifiedName) { - byte[] result = rocketmqClient.getBySubject(qualifiedName); - return result == null ? null : jsonConverter.fromJson(result, SchemaRecordInfo.class); + if (qualifiedName.getVersion() == null) { + byte[] result = rocketmqClient.getBySubject(qualifiedName.subjectFullName()); + return result == null ? null : jsonConverter.fromJson(result, SchemaRecordInfo.class); + } + + // schema version is given + SchemaInfo schemaInfo = getSchemaInfoBySubject(qualifiedName.subjectFullName()); + if (schemaInfo == null || schemaInfo.getDetails() == null || schemaInfo.getDetails().getSchemaRecords() == null) { + return null; + } + Map<Long, SchemaRecordInfo> versionSchemaMap = schemaInfo.getDetails().getSchemaRecords() + .stream().collect(Collectors.toMap(SchemaRecordInfo::getVersion, Function.identity())); + return versionSchemaMap.get(qualifiedName.getVersion()); + } + + @Override + public List<SchemaRecordInfo> listBySubject(QualifiedName qualifiedName) { + SchemaInfo schemaInfo = getSchemaInfoBySubject(qualifiedName.subjectFullName()); + if (schemaInfo == null || schemaInfo.getDetails() == null) { + return null; + } + return schemaInfo.getDetails().getSchemaRecords(); + } + + private SchemaInfo getSchemaInfoBySubject(String subjectFullName) { + byte[] lastRecordBytes = rocketmqClient.getBySubject(subjectFullName); + if (lastRecordBytes == null) { + return null; + } + SchemaRecordInfo lastRecord = jsonConverter.fromJson(lastRecordBytes, SchemaRecordInfo.class); + byte[] result = rocketmqClient.getSchema(lastRecord.getSchema()); + return result == null ? null : jsonConverter.fromJson(result, SchemaInfo.class); } } diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageService.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageService.java index f15345c..ac0b2e9 100644 --- a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageService.java +++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageService.java @@ -17,6 +17,8 @@ package org.apache.rocketmq.schema.registry.storage.rocketmq; +import java.util.List; + import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.schema.registry.common.QualifiedName; import org.apache.rocketmq.schema.registry.common.model.SchemaInfo; @@ -77,4 +79,9 @@ public class RocketmqStorageService implements StorageService<SchemaInfo> { public SchemaRecordInfo getBySubject(StorageServiceContext context, QualifiedName name) { return storageClient.getBySubject(name); } + + @Override + public List<SchemaRecordInfo> listBySubject(StorageServiceContext context, QualifiedName name) { + return storageClient.listBySubject(name); + } } diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/RocketmqConfigConstants.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/RocketmqConfigConstants.java index e541364..31e6744 100644 --- a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/RocketmqConfigConstants.java +++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/RocketmqConfigConstants.java @@ -35,7 +35,7 @@ public class RocketmqConfigConstants { public static final String STORAGE_ROCKETMQ_NAMESRV_DEFAULT = "localhost:9876"; public static final String STORAGE_ROCKETMQ_TOPIC = "storage.rocketmq.topic"; - public static final String STORAGE_ROCKETMQ_TOPIC_DEFAULT = "schema_registry_storage"; + public static final String STORAGE_ROCKETMQ_TOPIC_DEFAULT = "RMQ_SYS_schema_registry_storage"; public static final String STORAGE_LOCAL_CACHE_PATH = "storage.local.cache.path"; public static final String STORAGE_LOCAL_CACHE_PATH_DEFAULT = "/tmp/schema-registry/cache"; diff --git a/schema-storage-rocketmq/src/main/resources/rocketmq.properties b/schema-storage-rocketmq/src/main/resources/rocketmq.properties index 5070713..3a94c6a 100644 --- a/schema-storage-rocketmq/src/main/resources/rocketmq.properties +++ b/schema-storage-rocketmq/src/main/resources/rocketmq.properties @@ -16,4 +16,4 @@ # storage.type=rocketmq -#storage.local.cache.path \ No newline at end of file +storage.local.cache.path=/Users/xyb/app/schema-registry/cache \ No newline at end of file
