This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git
commit bccec3ffeab1648d744c0958e2af5cf20f7f8aff Author: huitong <[email protected]> AuthorDate: Wed Jul 20 16:42:47 2022 +0800 optimize controller --- .../schema/registry/common/QualifiedName.java | 8 +- .../schema/registry/common/dto/SubjectDto.java | 3 + .../schema/registry/common/model/SubjectInfo.java | 5 +- .../schema/registry/common/utils/CommonUtil.java | 7 +- .../registry/core/api/v1/SchemaController.java | 135 +++++++++++++++++++-- .../registry/core/service/SchemaServiceImpl.java | 29 ++--- .../registry/storage/rocketmq/RocketmqClient.java | 22 +++- 7 files changed, 169 insertions(+), 40 deletions(-) diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java index f97c2d0..06b5059 100644 --- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java +++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java @@ -34,10 +34,6 @@ import org.apache.rocketmq.schema.registry.common.model.SubjectInfo; public class QualifiedName implements Serializable { private static final long serialVersionUID = 2266514833942841209L; - public static final String DEFAULT_TENANT = "default"; - - public static final String DEFAULT_CLUSTER = "cluster"; - private String cluster; private String tenant; private String subject; @@ -71,7 +67,7 @@ public class QualifiedName implements Serializable { } public SubjectInfo subjectInfo() { - return new SubjectInfo(cluster, subject); + return new SubjectInfo(cluster, tenant, subject); } public String fullName() { @@ -79,7 +75,7 @@ public class QualifiedName implements Serializable { } public String schemaFullName() { - return schema; + return tenant + '/' + schema; } public String subjectFullName() { diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SubjectDto.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SubjectDto.java index 2ff3ad2..4d19d5f 100644 --- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SubjectDto.java +++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/SubjectDto.java @@ -35,6 +35,9 @@ public class SubjectDto { @ApiModelProperty(value = "Cluster of this subject", required = true) private String cluster; + @ApiModelProperty(value = "Tenant of this subject", required = true) + private String tenant; + @ApiModelProperty(value = "Name of this subject", required = true) private String subject; } diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SubjectInfo.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SubjectInfo.java index 9790f13..cc61275 100644 --- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SubjectInfo.java +++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SubjectInfo.java @@ -33,10 +33,11 @@ public class SubjectInfo implements Serializable { private static final long serialVersionUID = -92808722007777844L; private String cluster; + private String tenant; private String subject; public String fullName() { - return cluster + '/' + subject; + return cluster + '/' + tenant + '/' + subject; } @Override @@ -44,6 +45,8 @@ public class SubjectInfo implements Serializable { final StringBuilder sb = new StringBuilder("{"); sb.append("\"cluster\":\"") .append(cluster).append('\"'); + sb.append("\"tenant\":\"") + .append(tenant).append('\"'); sb.append(",\"subject\":\"") .append(subject).append('\"'); sb.append('}'); diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/CommonUtil.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/CommonUtil.java index ddc2731..a29d6e6 100644 --- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/CommonUtil.java +++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/utils/CommonUtil.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.schema.registry.common.utils; +import com.google.common.base.Preconditions; import com.google.common.base.Strings; import java.io.BufferedReader; import java.io.File; @@ -60,9 +61,9 @@ import org.apache.rocketmq.schema.registry.common.model.SchemaInfo; public class CommonUtil { public static void validateName(QualifiedName qualifiedName) { -// Preconditions.checkNotNull(qualifiedName.getTenant(), "Tenant is null"); -// Preconditions.checkNotNull(qualifiedName.getSubject(), "Subject is null"); -// Preconditions.checkNotNull(qualifiedName.getName(), "Schema name is null"); + Preconditions.checkNotNull(qualifiedName.getTenant(), "Tenant is null"); + Preconditions.checkNotNull(qualifiedName.getSubject(), "Subject is null"); + Preconditions.checkNotNull(qualifiedName.getSchema(), "Schema name is null"); } public static boolean isQualifiedNameEmpty(QualifiedName qualifiedName) { diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java index 335294c..558d652 100644 --- a/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java +++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java @@ -46,11 +46,13 @@ import org.springframework.web.bind.annotation.RestController; @Slf4j public class SchemaController { - private final String cluster; - private final String tenant; private final RequestProcessor requestProcessor; private final SchemaService<SchemaDto> schemaService; + public static final String DEFAULT_TENANT = "default"; + + public static final String DEFAULT_CLUSTER = "default"; + /** * Constructor. * @@ -62,8 +64,6 @@ public class SchemaController { final RequestProcessor requestProcessor, final SchemaService<SchemaDto> schemaService ) { - this.cluster = QualifiedName.DEFAULT_CLUSTER; - this.tenant = QualifiedName.DEFAULT_TENANT; this.requestProcessor = requestProcessor; this.schemaService = schemaService; } @@ -97,6 +97,43 @@ public class SchemaController { @PathVariable("schema-name") final String schemaName, @ApiParam(value = "The schema detail", required = true) @RequestBody final SchemaDto schemaDto + ) { + return registerSchema(DEFAULT_CLUSTER, DEFAULT_TENANT, subject, schemaName, schemaDto); + } + + @RequestMapping( + method = RequestMethod.POST, + path = "/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/{schema-name}", + consumes = MediaType.APPLICATION_JSON_VALUE + ) + @ResponseStatus(HttpStatus.CREATED) + @ApiOperation( + value = "Register a new schema", + notes = "Return success if there were no errors registering the schema" + ) + @ApiResponses( + { + @ApiResponse( + code = HttpURLConnection.HTTP_CREATED, + message = "The schema was registered" + ), + @ApiResponse( + code = HttpURLConnection.HTTP_NOT_FOUND, + message = "The requested schema cannot be registered" + ) + } + ) + public SchemaDto registerSchema( + @ApiParam(value = "The cluster of the subject", required = true) + @PathVariable(value = "cluster-name") final String cluster, + @ApiParam(value = "The tenant of the schema", required = true) + @PathVariable(value = "tenant-name") final String tenant, + @ApiParam(value = "The subject of the schema", required = true) + @PathVariable(name = "subject-name") final String subject, + @ApiParam(value = "The name of the schema", required = true) + @PathVariable("schema-name") final String schemaName, + @ApiParam(value = "The schema detail", required = true) + @RequestBody final SchemaDto schemaDto ) { // TODO: support register by sql final QualifiedName name = new QualifiedName(cluster, tenant, subject, schemaName); @@ -112,7 +149,7 @@ public class SchemaController { } @RequestMapping( - path = "/subject/{subject-name}/schema", + path = "/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema", method = RequestMethod.DELETE ) @ResponseStatus(HttpStatus.OK) @@ -133,6 +170,10 @@ public class SchemaController { } ) public SchemaDto deleteSchema( + @ApiParam(value = "The cluster of the subject", required = true) + @PathVariable(value = "cluster-name") final String cluster, + @ApiParam(value = "The tenant of the schema", required = true) + @PathVariable(value = "tenant-name") final String tenant, @ApiParam(value = "The subject of the schema", required = true) @PathVariable("subject-name") final String subject ) { @@ -145,7 +186,7 @@ public class SchemaController { } @RequestMapping( - path = "/subject/{subject-name}/schema/versions/{version}", + path = "/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/versions/{version}", method = RequestMethod.DELETE ) @ResponseStatus(HttpStatus.OK) @@ -166,6 +207,10 @@ public class SchemaController { } ) public SchemaDto deleteSchema( + @ApiParam(value = "The cluster of the subject", required = true) + @PathVariable(value = "cluster-name") final String cluster, + @ApiParam(value = "The tenant of the schema", required = true) + @PathVariable(value = "tenant-name") final String tenant, @ApiParam(value = "The subject of the schema", required = true) @PathVariable("subject-name") final String subject, @ApiParam(value = "The version of the schema", required = true) @@ -208,6 +253,42 @@ public class SchemaController { @PathVariable("schema-name") final String schemaName, @ApiParam(value = "The schema detail", required = true) @RequestBody final SchemaDto schemaDto + ) { + return updateSchema(DEFAULT_CLUSTER, DEFAULT_TENANT, subject, schemaName, schemaDto); + } + + @RequestMapping( + path = "/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/{schema-name}", + method = RequestMethod.PUT, + consumes = MediaType.APPLICATION_JSON_VALUE + ) + @ApiOperation( + value = "Update schema and generate new schema version", + notes = "Update the given schema" + ) + @ApiResponses( + { + @ApiResponse( + code = HttpURLConnection.HTTP_OK, + message = "Update schema success" + ), + @ApiResponse( + code = HttpURLConnection.HTTP_NOT_FOUND, + message = "The requested schema cannot be found" + ) + } + ) + public SchemaDto updateSchema( + @ApiParam(value = "The cluster of the subject", required = true) + @PathVariable(value = "cluster-name") final String cluster, + @ApiParam(value = "The tenant of the schema", required = true) + @PathVariable(value = "tenant-name") final String tenant, + @ApiParam(value = "The subject of the schema", required = true) + @PathVariable(name = "subject-name") final String subject, + @ApiParam(value = "The name of the schema", required = true) + @PathVariable("schema-name") final String schemaName, + @ApiParam(value = "The schema detail", required = true) + @RequestBody final SchemaDto schemaDto ) { QualifiedName name = new QualifiedName(cluster, tenant, subject, schemaName); return this.requestProcessor.processRequest( @@ -240,6 +321,36 @@ public class SchemaController { public SchemaRecordDto getSchemaBySubject( @ApiParam(value = "The name of the subject", required = true) @PathVariable("subject-name") String subject + ) { + return getSchemaBySubject(DEFAULT_CLUSTER, DEFAULT_CLUSTER, subject); + } + + @RequestMapping( + method = RequestMethod.GET, + path = "/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema" + ) + @ApiOperation( + value = "Schema information", + notes = "Schema information with the latest version under the subject") + @ApiResponses( + { + @ApiResponse( + code = HttpURLConnection.HTTP_OK, + message = "The schema is returned" + ), + @ApiResponse( + code = HttpURLConnection.HTTP_NOT_FOUND, + message = "The requested tenant or schema cannot be found" + ) + } + ) + public SchemaRecordDto getSchemaBySubject( + @ApiParam(value = "The cluster of the subject", required = true) + @PathVariable(value = "cluster-name") final String cluster, + @ApiParam(value = "The tenant of the schema", required = true) + @PathVariable(value = "tenant-name") final String tenant, + @ApiParam(value = "The name of the subject", required = true) + @PathVariable("subject-name") String subject ) { QualifiedName name = new QualifiedName(cluster, tenant, subject, null); log.info("Request for get schema for subject: {}", name.subjectFullName()); @@ -252,7 +363,7 @@ public class SchemaController { @RequestMapping( method = RequestMethod.GET, - path = "/subject/{subject-name}/schema/versions/{version}" + path = "/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/versions/{version}" ) @ApiOperation( value = "Schema information", @@ -270,6 +381,10 @@ public class SchemaController { } ) public SchemaRecordDto getSchemaBySubject( + @ApiParam(value = "The cluster of the subject", required = true) + @PathVariable(value = "cluster-name") final String cluster, + @ApiParam(value = "The tenant of the schema", required = true) + @PathVariable(value = "tenant-name") final String tenant, @ApiParam(value = "The name of the subject", required = true) @PathVariable("subject-name") String subject, @ApiParam(value = "The version of the schema", required = true) @@ -285,7 +400,7 @@ public class SchemaController { @RequestMapping( method = RequestMethod.GET, - path = "/subject/{subject-name}/schema/versions" + path = "/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/versions" ) @ApiOperation( value = "Schema information", @@ -303,6 +418,10 @@ public class SchemaController { } ) public List<SchemaRecordDto> getSchemaListBySubject( + @ApiParam(value = "The cluster of the subject", required = true) + @PathVariable(value = "cluster-name") final String cluster, + @ApiParam(value = "The tenant of the schema", required = true) + @PathVariable(value = "tenant-name") final String tenant, @ApiParam(value = "The name of the subject", required = true) @PathVariable("subject-name") String subject ) { diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java index 5f72d34..9d368b9 100644 --- a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java +++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java @@ -17,34 +17,33 @@ package org.apache.rocketmq.schema.registry.core.service; -import com.google.common.base.Strings; import java.util.ArrayList; import java.util.List; -import java.util.Optional; import java.util.stream.Collectors; +import com.google.common.base.Strings; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.schema.registry.common.QualifiedName; -import org.apache.rocketmq.schema.registry.common.context.RequestContext; import org.apache.rocketmq.schema.registry.common.auth.AccessControlService; +import org.apache.rocketmq.schema.registry.common.context.RequestContext; +import org.apache.rocketmq.schema.registry.common.context.RequestContextManager; +import org.apache.rocketmq.schema.registry.common.dto.SchemaDto; import org.apache.rocketmq.schema.registry.common.dto.SchemaRecordDto; +import org.apache.rocketmq.schema.registry.common.exception.SchemaCompatibilityException; import org.apache.rocketmq.schema.registry.common.exception.SchemaException; import org.apache.rocketmq.schema.registry.common.exception.SchemaExistException; -import org.apache.rocketmq.schema.registry.common.exception.SchemaCompatibilityException; +import org.apache.rocketmq.schema.registry.common.exception.SchemaNotFoundException; import org.apache.rocketmq.schema.registry.common.model.Dependency; +import org.apache.rocketmq.schema.registry.common.model.SchemaInfo; +import org.apache.rocketmq.schema.registry.common.model.SchemaOperation; import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo; import org.apache.rocketmq.schema.registry.common.model.SubjectInfo; import org.apache.rocketmq.schema.registry.common.properties.GlobalConfig; -import org.apache.rocketmq.schema.registry.common.dto.SchemaDto; -import org.apache.rocketmq.schema.registry.common.model.SchemaInfo; -import org.apache.rocketmq.schema.registry.common.model.SchemaOperation; -import org.apache.rocketmq.schema.registry.common.exception.SchemaNotFoundException; -import org.apache.rocketmq.schema.registry.common.context.RequestContextManager; -import org.apache.rocketmq.schema.registry.common.utils.IdGenerator; -import org.apache.rocketmq.schema.registry.core.dependency.DependencyService; import org.apache.rocketmq.schema.registry.common.storage.StorageServiceProxy; import org.apache.rocketmq.schema.registry.common.utils.CommonUtil; +import org.apache.rocketmq.schema.registry.common.utils.IdGenerator; import org.apache.rocketmq.schema.registry.common.utils.StorageUtil; +import org.apache.rocketmq.schema.registry.core.dependency.DependencyService; @Slf4j public class SchemaServiceImpl implements SchemaService<SchemaDto> { @@ -83,6 +82,7 @@ public class SchemaServiceImpl implements SchemaService<SchemaDto> { final RequestContext requestContext = RequestContextManager.getContext(); log.info("register get request context: " + requestContext); + schemaDto.setQualifiedName(qualifiedName); checkSchemaValid(schemaDto); checkSchemaExist(qualifiedName); @@ -120,6 +120,8 @@ public class SchemaServiceImpl implements SchemaService<SchemaDto> { final RequestContext requestContext = RequestContextManager.getContext(); log.info("update get request context: " + requestContext); + schemaDto.setQualifiedName(qualifiedName); + this.accessController.checkPermission("", "", SchemaOperation.UPDATE); SchemaInfo current = storageServiceProxy.get(qualifiedName, config.isCacheEnabled()); @@ -156,14 +158,9 @@ public class SchemaServiceImpl implements SchemaService<SchemaDto> { } if (update.getAudit() == null) { - // todo update.setAudit(current.getAudit()); } - if (update.getQualifiedName() == null) { - update.setQualifiedName(current.getQualifiedName()); - } - // checkSchemaValid(schemaDto); CommonUtil.validateCompatibility(update, current, current.getMeta().getCompatibility()); diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java index 913c360..5af299d 100644 --- a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java +++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java @@ -103,8 +103,8 @@ public class RocketmqClient { public RocketmqClient(Properties props) { init(props); createStorageTopic(); - startRemoteStorage(); startLocalCache(); + startRemoteStorage(); } private void createStorageTopic() { @@ -198,17 +198,25 @@ public class RocketmqClient { @Override public void run() { - List<MessageExt> msgList = scheduleConsumer.poll(1000); - if (CollectionUtils.isNotEmpty(msgList)) { - msgList.forEach(this::consumeMessage); + try { + List<MessageExt> msgList = scheduleConsumer.poll(1000); + if (CollectionUtils.isNotEmpty(msgList)) { + msgList.forEach(this::consumeMessage); + } + scheduleConsumer.commitSync(); + } catch (Exception e) { + log.error("consume message exception, consume offset may not commit"); } } private void consumeMessage(MessageExt msg) { + if (msg.getKeys() == null) { + return; + } synchronized (this) { try { log.info("receive msg, the content is {}", new String(msg.getBody())); - if (DELETE_KEYS.equals(msg.getKeys())) { + if (msg.getKeys().equals(DELETE_KEYS)) { // delete byte[] schemaFullName = msg.getBody(); byte[] schemaInfoBytes = cache.get(schemaCfHandle(), schemaFullName); @@ -230,6 +238,7 @@ public class RocketmqClient { } else { SchemaInfo current = converter.fromJson(result, SchemaInfo.class); if (current.getLastRecordVersion() == update.getLastRecordVersion()) { + log.info("Schema version is the same, no need to update."); return; } if (current.getLastRecordVersion() > update.getLastRecordVersion()) { @@ -248,7 +257,8 @@ public class RocketmqClient { } } } catch (Throwable e) { - throw new SchemaException("Rebuild schema cache failed", e); + log.error("Update schema cache failed, msg {}", new String(msg.getBody()), e); + throw new SchemaException("Update schema " + msg.getKeys() + " failed.", e); } } }
