This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git
commit 2d6c32ae1b73050a0492b70fe7e85f64c3fd9395 Author: wangfan <[email protected]> AuthorDate: Sat Jul 30 19:29:58 2022 +0800 simplify-register-update-api --- .gitignore | 1 + README.md | 40 ++++----- common/pom.xml | 2 +- .../schema/registry/common/dto/BaseDto.java | 29 +------ .../RegisterSchemaRequest.java} | 32 +++++--- .../RegisterSchemaResponse.java} | 20 +++-- .../UpdateSchemaRequest.java} | 23 +++--- .../UpdateSchemaResponse.java} | 20 +++-- .../registry/common/json/JsonConverterImpl.java | 3 +- .../schema/registry/common/model/AuditInfo.java | 12 +++ .../registry/common/model/SchemaDetailInfo.java | 7 +- .../schema/registry/common/model/SchemaInfo.java | 21 ++++- .../registry/common/model/SchemaRecordInfo.java | 7 +- .../registry/common/model/SchemaStorageInfo.java | 1 - core/pom.xml | 16 ++-- .../registry/core/api/v1/SchemaController.java | 29 ++++--- .../registry/core/dependency/DependencyHelper.java | 8 ++ .../registry/core/service/SchemaService.java | 11 +-- .../registry/core/service/SchemaServiceImpl.java | 94 ++++++++++++---------- core/src/main/resources/application.properties | 2 +- pom.xml | 2 +- storage-rocketmq/pom.xml | 2 +- .../registry/storage/rocketmq/RocketmqClient.java | 2 + .../rocketmq/configs/RocketmqConfigConstants.java | 2 +- .../src/main/resources/rocketmq.properties | 2 +- war/pom.xml | 2 +- 26 files changed, 213 insertions(+), 177 deletions(-) diff --git a/.gitignore b/.gitignore index fc92691..1d712aa 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ target/ log/ log/* tools/ +schema-registry/ ### STS ### .apt_generated diff --git a/README.md b/README.md index a8a515f..6a08db8 100644 --- a/README.md +++ b/README.md @@ -16,9 +16,9 @@ Getting started #### Installation ```shell -git clone [email protected]:apache/rocketmq-schema-registry.git -cd rocketmq-schema-registry -./mvnw clean package +$ git clone [email protected]:apache/rocketmq-schema-registry.git +$ cd rocketmq-schema-registry +$ mvn clean package ``` #### Prepare storage layer @@ -41,18 +41,18 @@ $ nohup sh mqnamesrv & $ nohup sh bin/mqbroker -n localhost:9876 & ``` -#### Edit configuration +#### Edit configuration (Optional) * Config storage local cache path ```shell -$ storage.local.cache.path="" >> schema-storage-rocketmq/src/main/resources +$ echo "storage.local.cache.path=${user.dir}" >> storage-rocketmq/src/main/resources/rocketmq.properties ``` #### Deployment & Running locally -Take the build JAR in target/build/schema-register.jar and run `java -cp schema-register.jar` to start service. +Take the build JAR in core/target/ and run `java -jar rocketmq-schema-registry-core-0.0.3-SNAPSHOT.jar` to start service. -The REST API can be accessed from http://localhost:8080/schema-registry/v1 +Then REST API can be accessed from http://localhost:8080/schema-registry/v1 Swagger API documentation can be accessed from http://localhost:8080/swagger-ui/index.html @@ -80,31 +80,31 @@ API Reference ```shell # Register new schema on specified subject with default cluster and tenant -$ curl -X post -H "Content-Type: application/json" \ ---data '{"details":{"schemaRecords":[{"idl":"{\"type\":\"record\",\"name\":\"Demo\",\"namespace\":\"com.schema.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"id\",\"type\":\"int\"}]}"}]}, "meta":{"compatibility":"BACKWARD","namespace":"com.schema.example","owner":"test","schemaName":"Demo","tenant":"default","type":"AVRO"}}' \ -http://localhost:8080/schema-registry/v1/subject/{subject-name}/schema/{schema-name} +$ curl -X POST -H "Content-Type: application/json" \ +-d '{"schemaIdl":"{\"type\":\"record\",\"name\":\"SchemaName\",\"namespace\":\"rocketmq.schema.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}"}' \ +http://localhost:8080/schema-registry/v1/subject/RMQTopic/schema/SchemaName # Register new schema with cluster specified cluster and tenant -$ curl -X post -H "Content-Type: application/json" \ ---data '{"details":{"schemaRecords":[{"idl":"{\"type\":\"record\",\"name\":\"Demo\",\"namespace\":\"com.schema.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"id\",\"type\":\"int\"}]}"}]}, "meta":{"compatibility":"BACKWARD","namespace":"com.schema.example","owner":"test","schemaName":"Demo","tenant":"default","type":"AVRO"}}' \ -http://localhost:8080/schema-registry/v1/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/{schema-name} +$ curl -X POST -H "Content-Type: application/json" \ +-d '{"schema": "{\"type\": \"string\"}"}' \ +http://localhost:8080/schema-registry/v1/cluster/default/tenant/default/subject/RMQTopic2/schema/Text # Delete schema all version -$ curl -X delete http://localhost:8080/schema-registry/v1/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema +$ curl -X DELETE http://localhost:8080/schema-registry/v1/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema # Update schema and generate a new version, you can also use default cluster and tenant like register interface -$ curl -X put -H "Content-Type: application/json" \ ---data '{"details":{"schemaRecords":[{"idl":"{\"type\":\"record\",\"name\":\"Demo\",\"namespace\":\"com.schema.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"type\",\"type\":\"string\"}]}"}]}, "meta":{"compatibility":"BACKWARD","namespace":"com.schema.example","owner":"test","schemaName":"Demo","tenant":"default","type":"AVRO"}}' \ -http://localhost:8080/schema-registry/v1/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/{schema-name} +$ curl -X PUT -H "Content-Type: application/json" \ +-d '{"schemaIdl":"{\"type\":\"record\",\"name\":\"SchemaName\",\"namespace\":\"rocketmq.schema.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"id\",\"type\":\"string\",\"default\":\"0\"}]}"}' \ +http://localhost:8080/schema-registry/v1/subject/RMQTopic/schema/SchemaName # Get binding schema version by subject with specified cluster and tenant, , you can also use default cluster and tenant like register interface -$ curl -X GET http://localhost:8081/schema-registry/v1/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema +$ curl -X GET http://localhost:8080/schema-registry/v1/subject/RMQTopic/schema # Get schema record by specified version -$ curl -X GET http://localhost:8081/schema-registry/v1/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/versions/{version} +$ curl -X GET http://localhost:8080/schema-registry/v1/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/versions/{version} # Get all schema record -$ curl -X GET http://localhost:8081/schema-registry/v1/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/versions +$ curl -X GET http://localhost:8080/schema-registry/v1/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/versions ``` Contribute diff --git a/common/pom.xml b/common/pom.xml index 5e8883a..7bcfc08 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -22,7 +22,7 @@ <parent> <artifactId>rocketmq-schema-registry-all</artifactId> <groupId>org.apache.rocketmq</groupId> - <version>0.0.2-SNAPSHOT</version> + <version>0.0.3-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/BaseDto.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/BaseDto.java index e7445ac..0e11939 100644 --- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/BaseDto.java +++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/BaseDto.java @@ -38,32 +38,5 @@ public abstract class BaseDto implements Serializable { public String toString() { return JSON_CONVERTER.toString(this); } -// -// /** -// * Deserialize data from the input stream. -// * -// * @param inputStream input stream -// * @return Json ObjectNode -// * @throws IOException exception deserializing the stream -// */ -// @Nullable -// public static ObjectNode deserializeObjectNode( -// @Nonnull @NonNull final ObjectInputStream inputStream -// ) throws IOException { -// return JSON_CONVERTER.deserializeObjectNode(inputStream); -// } -// -// /** -// * Serialize data in the output stream. -// * -// * @param outputStream output stream -// * @param jsonObject jsonObject -// * @throws IOException exception serializing the json -// */ -// public static void serializeObjectNode( -// @Nonnull @NonNull final ObjectOutputStream outputStream, -// @Nullable final ObjectNode jsonObject -// ) throws IOException { -// JSON_CONVERTER.serializeObjectNode(outputStream, jsonObject); -// } + } diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaStorageInfo.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/RegisterSchemaRequest.java similarity index 55% copy from common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaStorageInfo.java copy to common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/RegisterSchemaRequest.java index 491924e..6e1e514 100644 --- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaStorageInfo.java +++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/RegisterSchemaRequest.java @@ -15,28 +15,38 @@ * limitations under the License. */ -package org.apache.rocketmq.schema.registry.common.model; +package org.apache.rocketmq.schema.registry.common.dto; import io.swagger.annotations.ApiModelProperty; -import java.io.Serializable; -import java.util.Map; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +import org.apache.rocketmq.schema.registry.common.model.Compatibility; +import org.apache.rocketmq.schema.registry.common.model.SchemaType; @Data @EqualsAndHashCode(callSuper = false) @Builder @NoArgsConstructor @AllArgsConstructor -public class SchemaStorageInfo implements Serializable { - private static final long serialVersionUID = -6655281552098217740L; - - private String serdeProtocol; - // TODO delete? - private String serializationLib; - private Map<String, String> serdeInfo; - private String uri; +public class RegisterSchemaRequest extends BaseDto { + private static final long serialVersionUID = 7890248374919863930L; + + @ApiModelProperty(value = "First IDL of this schema", example = "{\"type\": \"string\"}", required = true) + private String schemaIdl; + + @ApiModelProperty(value = "Schema type") + private SchemaType schemaType = SchemaType.AVRO; + + @ApiModelProperty(value = "Schema owner", example = "li") + private String owner = ""; + + @ApiModelProperty(value = "Schema compatibility") + private Compatibility compatibility = Compatibility.BACKWARD; + + @ApiModelProperty(value = "Schema description", example = "my first schema") + private String desc = ""; + } diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/AuditInfo.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/RegisterSchemaResponse.java similarity index 73% copy from common/src/main/java/org/apache/rocketmq/schema/registry/common/model/AuditInfo.java copy to common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/RegisterSchemaResponse.java index 66cbd5e..5e82c10 100644 --- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/AuditInfo.java +++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/RegisterSchemaResponse.java @@ -15,11 +15,9 @@ * limitations under the License. */ -package org.apache.rocketmq.schema.registry.common.model; +package org.apache.rocketmq.schema.registry.common.dto; import io.swagger.annotations.ApiModelProperty; -import java.io.Serializable; -import java.util.Date; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; @@ -29,14 +27,14 @@ import lombok.NoArgsConstructor; @Data @EqualsAndHashCode(callSuper = false) @Builder -@AllArgsConstructor @NoArgsConstructor -public class AuditInfo implements Serializable { - private static final long serialVersionUID = 2258089775496856662L; +@AllArgsConstructor +public class RegisterSchemaResponse extends BaseDto { + private static final long serialVersionUID = 5961720146684473323L; + + @ApiModelProperty(value = "Schema unique id", required = true) + private long schemaId; - private String desc; - private String createdBy; - private Date createdTime; - private String lastModifiedBy; - private Date lastModifiedTime; + @ApiModelProperty(value = "Version of this schema record") + private long version; } diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaStorageInfo.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/UpdateSchemaRequest.java similarity index 62% copy from common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaStorageInfo.java copy to common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/UpdateSchemaRequest.java index 491924e..7d22f19 100644 --- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaStorageInfo.java +++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/UpdateSchemaRequest.java @@ -15,28 +15,31 @@ * limitations under the License. */ -package org.apache.rocketmq.schema.registry.common.model; +package org.apache.rocketmq.schema.registry.common.dto; import io.swagger.annotations.ApiModelProperty; -import java.io.Serializable; -import java.util.Map; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +import org.apache.rocketmq.schema.registry.common.model.Compatibility; +import org.apache.rocketmq.schema.registry.common.model.SchemaType; @Data @EqualsAndHashCode(callSuper = false) @Builder @NoArgsConstructor @AllArgsConstructor -public class SchemaStorageInfo implements Serializable { - private static final long serialVersionUID = -6655281552098217740L; +public class UpdateSchemaRequest extends BaseDto { + private static final long serialVersionUID = 2966846398643703675L; - private String serdeProtocol; - // TODO delete? - private String serializationLib; - private Map<String, String> serdeInfo; - private String uri; + @ApiModelProperty(value = "Update IDL of this schema", example = "{\"type\": \"int\"}", required = true) + private String schemaIdl; + + @ApiModelProperty(value = "Schema owner", example = "li") + private String owner = ""; + + @ApiModelProperty(value = "Schema description", example = "update schema") + private String desc = ""; } diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/AuditInfo.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/UpdateSchemaResponse.java similarity index 73% copy from common/src/main/java/org/apache/rocketmq/schema/registry/common/model/AuditInfo.java copy to common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/UpdateSchemaResponse.java index 66cbd5e..cb3f54f 100644 --- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/AuditInfo.java +++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/dto/UpdateSchemaResponse.java @@ -15,11 +15,9 @@ * limitations under the License. */ -package org.apache.rocketmq.schema.registry.common.model; +package org.apache.rocketmq.schema.registry.common.dto; import io.swagger.annotations.ApiModelProperty; -import java.io.Serializable; -import java.util.Date; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; @@ -29,14 +27,14 @@ import lombok.NoArgsConstructor; @Data @EqualsAndHashCode(callSuper = false) @Builder -@AllArgsConstructor @NoArgsConstructor -public class AuditInfo implements Serializable { - private static final long serialVersionUID = 2258089775496856662L; +@AllArgsConstructor +public class UpdateSchemaResponse extends BaseDto { + private static final long serialVersionUID = 822296169833367618L; + + @ApiModelProperty(value = "Schema unique id", required = true) + private long schemaId; - private String desc; - private String createdBy; - private Date createdTime; - private String lastModifiedBy; - private Date lastModifiedTime; + @ApiModelProperty(value = "Version of this schema record") + private long version; } diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/json/JsonConverterImpl.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/json/JsonConverterImpl.java index 03b1f1f..c3e3f89 100644 --- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/json/JsonConverterImpl.java +++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/json/JsonConverterImpl.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.schema.registry.common.json; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.gson.Gson; +import com.google.gson.JsonObject; import java.nio.charset.StandardCharsets; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -89,7 +90,7 @@ public class JsonConverterImpl implements JsonConverter { @Override public String toString(Object o) { - return null; + return gson.toJson(o); } @Override diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/AuditInfo.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/AuditInfo.java index 66cbd5e..7f7b274 100644 --- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/AuditInfo.java +++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/AuditInfo.java @@ -39,4 +39,16 @@ public class AuditInfo implements Serializable { private Date createdTime; private String lastModifiedBy; private Date lastModifiedTime; + + public void createBy(String user, String desc) { + this.desc = desc; + this.createdBy = user; + this.lastModifiedBy = user; + this.createdTime = this.lastModifiedTime = new Date(System.currentTimeMillis()); + } + + public void updateBy(String user) { + this.lastModifiedBy = user; + this.lastModifiedTime = new Date(System.currentTimeMillis()); + } } diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaDetailInfo.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaDetailInfo.java index aa93dbf..2f1c820 100644 --- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaDetailInfo.java +++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaDetailInfo.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.schema.registry.common.model; import com.fasterxml.jackson.annotation.JsonProperty; import java.io.Serializable; +import java.util.ArrayList; import java.util.List; import lombok.AllArgsConstructor; import lombok.Builder; @@ -34,7 +35,11 @@ import org.apache.rocketmq.schema.registry.common.exception.SchemaException; public class SchemaDetailInfo implements Serializable { private static final long serialVersionUID = 3113021009662503334L; - private List<SchemaRecordInfo> schemaRecords; + private List<SchemaRecordInfo> schemaRecords = new ArrayList<>(); + + public SchemaDetailInfo(SchemaRecordInfo firstSchemaRecord) { + this.schemaRecords.add(firstSchemaRecord); + } public SchemaRecordInfo lastRecord() { if (schemaRecords == null || schemaRecords.isEmpty()) { diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaInfo.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaInfo.java index 6fc1ecf..1ff8489 100644 --- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaInfo.java +++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaInfo.java @@ -17,12 +17,14 @@ package org.apache.rocketmq.schema.registry.common.model; +import java.util.HashMap; import java.util.Map; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +import org.apache.rocketmq.schema.registry.common.QualifiedName; @Data @Builder @@ -32,13 +34,23 @@ import lombok.NoArgsConstructor; public class SchemaInfo extends BaseInfo { private static final long serialVersionUID = -5143258312429353896L; - private SchemaMetaInfo meta; + private SchemaMetaInfo meta = new SchemaMetaInfo(); - private SchemaDetailInfo details; + private SchemaDetailInfo details = new SchemaDetailInfo(); - private SchemaStorageInfo storage; + private SchemaStorageInfo storage = new SchemaStorageInfo(); - private Map<String, String> extras; + private Map<String, String> extras = new HashMap<>(); + + public SchemaInfo(final QualifiedName qualifiedName, + final AuditInfo audit, + final SchemaMetaInfo meta, + final SchemaDetailInfo details + ) { + super(qualifiedName, audit); + this.meta = meta; + this.details = details; + } public String getSchemaName() { return getQualifiedName().getSchema(); @@ -77,6 +89,7 @@ public class SchemaInfo extends BaseInfo { return getLastRecord().getVersion(); } + @Deprecated public void setLastRecordVersion(long version) { getLastRecord().setVersion(version); } diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaRecordInfo.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaRecordInfo.java index 02af194..0cfe3a0 100644 --- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaRecordInfo.java +++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaRecordInfo.java @@ -33,17 +33,14 @@ public class SchemaRecordInfo implements Serializable { private String schema; private long schemaId; - private long version; + private long version = 1L; private String idl; private Dependency dependency; - private List<SubjectInfo> subjects; + private List<SubjectInfo> subjects = new ArrayList<>(); private SchemaType type; // private List<FieldInfo> fields; public void bindSubject(final SubjectInfo subjectInfo) { - if (getSubjects() == null) { - setSubjects(new ArrayList<>()); - } getSubjects().add(subjectInfo); } diff --git a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaStorageInfo.java b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaStorageInfo.java index 491924e..e85d44c 100644 --- a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaStorageInfo.java +++ b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaStorageInfo.java @@ -35,7 +35,6 @@ public class SchemaStorageInfo implements Serializable { private static final long serialVersionUID = -6655281552098217740L; private String serdeProtocol; - // TODO delete? private String serializationLib; private Map<String, String> serdeInfo; private String uri; diff --git a/core/pom.xml b/core/pom.xml index 0b6ae01..b6f4ac6 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -5,7 +5,7 @@ <parent> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-schema-registry-all</artifactId> - <version>0.0.2-SNAPSHOT</version> + <version>0.0.3-SNAPSHOT</version> </parent> <packaging>jar</packaging> @@ -22,7 +22,7 @@ <dependency> <groupId>org.apache.rocketmq</groupId> - <artifactId>schema-storage-rocketmq</artifactId> + <artifactId>storage-rocketmq</artifactId> <version>${project.version}</version> </dependency> </dependencies> @@ -70,12 +70,12 @@ <include>**/*.*</include> </includes> </resource> - <resource> - <directory>../storage-rocketmq/src/main/resources</directory> - <includes> - <include>**/*.*</include> - </includes> - </resource> +<!-- <resource>--> +<!-- <directory>../storage-rocketmq/src/main/resources</directory>--> +<!-- <includes>--> +<!-- <include>**/*.*</include>--> +<!-- </includes>--> +<!-- </resource>--> </resources> </build> diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java index 97fe8bb..01e5501 100644 --- a/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java +++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java @@ -16,8 +16,12 @@ import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.schema.registry.common.QualifiedName; +import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaRequest; +import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaResponse; import org.apache.rocketmq.schema.registry.common.dto.SchemaDto; import org.apache.rocketmq.schema.registry.common.dto.SchemaRecordDto; +import org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaRequest; +import org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaResponse; import org.apache.rocketmq.schema.registry.core.api.RequestProcessor; import org.apache.rocketmq.schema.registry.core.service.SchemaService; import org.springframework.beans.factory.annotation.Autowired; @@ -90,15 +94,15 @@ public class SchemaController { ) } ) - public SchemaDto registerSchema( + public RegisterSchemaResponse registerSchema( @ApiParam(value = "The subject of the schema", required = true) @PathVariable(value = "subject-name") final String subject, @ApiParam(value = "The name of the schema", required = true) @PathVariable(value = "schema-name") final String schemaName, @ApiParam(value = "The schema detail", required = true) - @RequestBody final SchemaDto schemaDto + @RequestBody final RegisterSchemaRequest registerSchemaRequest ) { - return registerSchema(DEFAULT_CLUSTER, DEFAULT_TENANT, subject, schemaName, schemaDto); + return registerSchema(DEFAULT_CLUSTER, DEFAULT_TENANT, subject, schemaName, registerSchemaRequest); } @RequestMapping( @@ -123,7 +127,7 @@ public class SchemaController { ) } ) - public SchemaDto registerSchema( + public RegisterSchemaResponse registerSchema( @ApiParam(value = "The cluster of the subject", required = true) @PathVariable(value = "cluster-name") final String cluster, @ApiParam(value = "The tenant of the schema", required = true) @@ -133,17 +137,16 @@ public class SchemaController { @ApiParam(value = "The name of the schema", required = true) @PathVariable(value = "schema-name") final String schemaName, @ApiParam(value = "The schema detail", required = true) - @RequestBody final SchemaDto schemaDto + @RequestBody final RegisterSchemaRequest registerSchemaDto ) { // TODO: support register by sql final QualifiedName name = new QualifiedName(cluster, tenant, subject, schemaName); - schemaDto.setQualifiedName(name); return this.requestProcessor.processRequest( name, "register", () -> { - return this.schemaService.register(name, schemaDto); + return this.schemaService.register(name, registerSchemaDto); } ); } @@ -246,15 +249,15 @@ public class SchemaController { ) } ) - public SchemaDto updateSchema( + public UpdateSchemaResponse updateSchema( @ApiParam(value = "The subject of the schema", required = true) @PathVariable(value = "subject-name") final String subject, @ApiParam(value = "The name of the schema", required = true) @PathVariable(value = "schema-name") final String schemaName, @ApiParam(value = "The schema detail", required = true) - @RequestBody final SchemaDto schemaDto + @RequestBody final UpdateSchemaRequest updateSchemaRequest ) { - return updateSchema(DEFAULT_CLUSTER, DEFAULT_TENANT, subject, schemaName, schemaDto); + return updateSchema(DEFAULT_CLUSTER, DEFAULT_TENANT, subject, schemaName, updateSchemaRequest); } @RequestMapping( @@ -278,7 +281,7 @@ public class SchemaController { ) } ) - public SchemaDto updateSchema( + public UpdateSchemaResponse updateSchema( @ApiParam(value = "The cluster of the subject", required = true) @PathVariable(value = "cluster-name") final String cluster, @ApiParam(value = "The tenant of the schema", required = true) @@ -288,13 +291,13 @@ public class SchemaController { @ApiParam(value = "The name of the schema", required = true) @PathVariable(value = "schema-name") final String schemaName, @ApiParam(value = "The schema detail", required = true) - @RequestBody final SchemaDto schemaDto + @RequestBody final UpdateSchemaRequest updateSchemaRequest ) { QualifiedName name = new QualifiedName(cluster, tenant, subject, schemaName); return this.requestProcessor.processRequest( name, "updateSchema", - () -> this.schemaService.update(name, schemaDto) + () -> this.schemaService.update(name, updateSchemaRequest) ); } diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/dependency/DependencyHelper.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/dependency/DependencyHelper.java index f095476..1c4bab2 100644 --- a/core/src/main/java/org/apache/rocketmq/schema/registry/core/dependency/DependencyHelper.java +++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/dependency/DependencyHelper.java @@ -17,6 +17,8 @@ package org.apache.rocketmq.schema.registry.core.dependency; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; import java.io.File; import java.io.FileWriter; import java.io.IOException; @@ -64,6 +66,12 @@ public class DependencyHelper { public DependencyHelper(final String jdkPath, final String parentDir, final SchemaInfo schemaInfo) { this.schemaName = schemaInfo.getSchemaName(); + JsonObject object = JsonParser.parseString(schemaInfo.getLastRecordIdl()).getAsJsonObject(); + if (object.get("namespace") == null) { + throw new SchemaException("namespace field should not be empty"); + } + + schemaInfo.getMeta().setNamespace(object.get("namespace").getAsString()); this.dependency = Dependency.builder() .groupId(schemaInfo.getNamespace()) .artifactId(schemaInfo.getSchemaName()) diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaService.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaService.java index 84c70cc..a468595 100644 --- a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaService.java +++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaService.java @@ -18,12 +18,13 @@ package org.apache.rocketmq.schema.registry.core.service; import java.util.List; -import java.util.Optional; import org.apache.rocketmq.schema.registry.common.QualifiedName; import org.apache.rocketmq.schema.registry.common.dto.BaseDto; -import org.apache.rocketmq.schema.registry.common.dto.SchemaDto; +import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaRequest; +import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaResponse; import org.apache.rocketmq.schema.registry.common.dto.SchemaRecordDto; -import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo; +import org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaRequest; +import org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaResponse; public interface SchemaService<T extends BaseDto> { @@ -34,7 +35,7 @@ public interface SchemaService<T extends BaseDto> { * @param dto register resource information * @return registered schema object */ - T register(QualifiedName qualifiedName, T dto); + RegisterSchemaResponse register(QualifiedName qualifiedName, RegisterSchemaRequest dto); /** * Register the schema. @@ -43,7 +44,7 @@ public interface SchemaService<T extends BaseDto> { * @param dto update information * @return updated schema object */ - T update(QualifiedName qualifiedName, T dto); + UpdateSchemaResponse update(QualifiedName qualifiedName, UpdateSchemaRequest dto); /** * Deletes the schema. diff --git a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java index 9d368b9..f06ec76 100644 --- a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java +++ b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java @@ -27,14 +27,21 @@ import org.apache.rocketmq.schema.registry.common.QualifiedName; import org.apache.rocketmq.schema.registry.common.auth.AccessControlService; import org.apache.rocketmq.schema.registry.common.context.RequestContext; import org.apache.rocketmq.schema.registry.common.context.RequestContextManager; +import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaRequest; +import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaResponse; import org.apache.rocketmq.schema.registry.common.dto.SchemaDto; import org.apache.rocketmq.schema.registry.common.dto.SchemaRecordDto; +import org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaRequest; +import org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaResponse; import org.apache.rocketmq.schema.registry.common.exception.SchemaCompatibilityException; import org.apache.rocketmq.schema.registry.common.exception.SchemaException; import org.apache.rocketmq.schema.registry.common.exception.SchemaExistException; import org.apache.rocketmq.schema.registry.common.exception.SchemaNotFoundException; +import org.apache.rocketmq.schema.registry.common.model.AuditInfo; import org.apache.rocketmq.schema.registry.common.model.Dependency; +import org.apache.rocketmq.schema.registry.common.model.SchemaDetailInfo; import org.apache.rocketmq.schema.registry.common.model.SchemaInfo; +import org.apache.rocketmq.schema.registry.common.model.SchemaMetaInfo; import org.apache.rocketmq.schema.registry.common.model.SchemaOperation; import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo; import org.apache.rocketmq.schema.registry.common.model.SubjectInfo; @@ -78,28 +85,36 @@ public class SchemaServiceImpl implements SchemaService<SchemaDto> { * {@inheritDoc} */ @Override - public SchemaDto register(QualifiedName qualifiedName, SchemaDto schemaDto) { + public RegisterSchemaResponse register(QualifiedName qualifiedName, RegisterSchemaRequest registerSchemaDto) { final RequestContext requestContext = RequestContextManager.getContext(); log.info("register get request context: " + requestContext); - schemaDto.setQualifiedName(qualifiedName); - checkSchemaValid(schemaDto); - checkSchemaExist(qualifiedName); - // TODO: add user and ak sk accessController.checkPermission("", qualifiedName.getTenant(), SchemaOperation.REGISTER); - SchemaInfo current = storageServiceProxy.get(qualifiedName, config.isCacheEnabled()); - if (current != null) { - throw new SchemaExistException(qualifiedName); - } + checkSchemaExist(qualifiedName); + + final AuditInfo audit = new AuditInfo(); + audit.createBy(registerSchemaDto.getOwner(), registerSchemaDto.getDesc()); + + long schemaId = idGenerator.nextId(); + final SchemaMetaInfo meta = new SchemaMetaInfo(); + meta.setCompatibility(registerSchemaDto.getCompatibility()); + meta.setOwner(registerSchemaDto.getOwner()); + meta.setType(registerSchemaDto.getSchemaType()); + meta.setSchemaName(qualifiedName.getSchema()); + meta.setTenant(qualifiedName.getTenant()); + meta.setUniqueId(schemaId); - SchemaInfo schemaInfo = storageUtil.convertFromSchemaDto(schemaDto); - schemaInfo.setUniqueId(idGenerator.nextId()); - schemaInfo.setLastRecordVersion(1L); - schemaInfo.getLastRecord().setSchema(qualifiedName.schemaFullName()); - schemaInfo.getLastRecord().setType(schemaInfo.getMeta().getType()); - schemaInfo.getLastRecord().bindSubject(qualifiedName.subjectInfo()); + final SchemaRecordInfo firstRecord = new SchemaRecordInfo(); + firstRecord.setSchema(qualifiedName.schemaFullName()); + firstRecord.setSchemaId(schemaId); + firstRecord.setType(registerSchemaDto.getSchemaType()); + firstRecord.setIdl(registerSchemaDto.getSchemaIdl()); + firstRecord.bindSubject(qualifiedName.subjectInfo()); + + final SchemaDetailInfo details = new SchemaDetailInfo(firstRecord); + final SchemaInfo schemaInfo = new SchemaInfo(qualifiedName, audit, meta, details); if (config.isUploadEnabled()) { // TODO: async upload to speed up register operation and keep atomic with register @@ -109,18 +124,16 @@ public class SchemaServiceImpl implements SchemaService<SchemaDto> { log.info("Creating schema info {}: {}", qualifiedName, schemaInfo); storageServiceProxy.register(qualifiedName, schemaInfo); - return storageUtil.convertToSchemaDto(schemaInfo); + return new RegisterSchemaResponse(schemaInfo.getUniqueId(), schemaInfo.getLastRecordVersion()); } /** * {@inheritDoc} */ @Override - public SchemaDto update(QualifiedName qualifiedName, SchemaDto schemaDto) { + public UpdateSchemaResponse update(QualifiedName qualifiedName, UpdateSchemaRequest updateSchemaRequest) { final RequestContext requestContext = RequestContextManager.getContext(); - log.info("update get request context: " + requestContext); - - schemaDto.setQualifiedName(qualifiedName); + log.info("update request context: " + requestContext); this.accessController.checkPermission("", "", SchemaOperation.UPDATE); @@ -129,39 +142,38 @@ public class SchemaServiceImpl implements SchemaService<SchemaDto> { throw new SchemaNotFoundException("Schema " + qualifiedName + " not exist, ignored update."); } - SchemaInfo update = storageUtil.convertFromSchemaDto(schemaDto); + current.getAudit().updateBy(updateSchemaRequest.getOwner()); + + final SchemaRecordInfo updateRecord = new SchemaRecordInfo(); + updateRecord.setSchema(qualifiedName.schemaFullName()); + updateRecord.setSchemaId(current.getUniqueId()); + updateRecord.setType(current.getSchemaType()); + updateRecord.setIdl(updateSchemaRequest.getSchemaIdl()); + updateRecord.bindSubject(qualifiedName.subjectInfo()); + updateRecord.setVersion(current.getLastRecordVersion() + 1); - if (update.getDetails() != null) { - SubjectInfo subjectInfo = qualifiedName.subjectInfo(); - SchemaRecordInfo updateRecord = update.getDetails().lastRecord(); - updateRecord.setVersion(current.getLastRecordVersion() + 1); - updateRecord.setSchema(qualifiedName.schemaFullName()); - updateRecord.setSchemaId(current.getUniqueId()); - updateRecord.bindSubject(subjectInfo); - current.getLastRecord().unbindSubject(subjectInfo); + final List<SchemaRecordInfo> updateRecords = new ArrayList<>(current.getDetails().getSchemaRecords()); + updateRecords.add(updateRecord); - List<SchemaRecordInfo> currentRecords = new ArrayList<>(current.getDetails().getSchemaRecords()); - currentRecords.add(updateRecord); - update.getDetails().setSchemaRecords(currentRecords); + final SchemaInfo update = new SchemaInfo(); + update.getDetails().setSchemaRecords(updateRecords); + + if (current.getQualifiedName() != null) { + update.setQualifiedName(current.getQualifiedName()); } - if (update.getMeta() == null) { + if (current.getMeta() != null) { update.setMeta(current.getMeta()); } - if (update.getStorage() == null) { + if (current.getStorage() != null) { update.setStorage(current.getStorage()); } - if (update.getExtras() == null) { + if (update.getExtras() != null) { update.setExtras(current.getExtras()); } - if (update.getAudit() == null) { - update.setAudit(current.getAudit()); - } - -// checkSchemaValid(schemaDto); CommonUtil.validateCompatibility(update, current, current.getMeta().getCompatibility()); if (config.isUploadEnabled()) { @@ -171,7 +183,7 @@ public class SchemaServiceImpl implements SchemaService<SchemaDto> { log.info("Updating schema info {}: {}", qualifiedName, update); storageServiceProxy.update(qualifiedName, update); - return storageUtil.convertToSchemaDto(update); + return new UpdateSchemaResponse(updateRecord.getSchemaId(), updateRecord.getVersion()); } /** diff --git a/core/src/main/resources/application.properties b/core/src/main/resources/application.properties index 022e3dc..44a196c 100644 --- a/core/src/main/resources/application.properties +++ b/core/src/main/resources/application.properties @@ -2,7 +2,7 @@ spring.profiles.active = dev spring.application.name = rocketmq-schema-registry server.port=8080 -logging.file.name = log/app.log +logging.file.name = schema-registry/log/app.log schema.dependency.upload-enabled = false #schema.dependency.jdk-path diff --git a/pom.xml b/pom.xml index 425de31..e955a22 100644 --- a/pom.xml +++ b/pom.xml @@ -11,7 +11,7 @@ <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-schema-registry-all</artifactId> - <version>0.0.2-SNAPSHOT</version> + <version>0.0.3-SNAPSHOT</version> <packaging>pom</packaging> <name>rocketmq-schema-registry-${project.version}</name> <description>rocketmq-schema-registry</description> diff --git a/storage-rocketmq/pom.xml b/storage-rocketmq/pom.xml index b933223..a2a2c17 100644 --- a/storage-rocketmq/pom.xml +++ b/storage-rocketmq/pom.xml @@ -22,7 +22,7 @@ <parent> <artifactId>rocketmq-schema-registry-all</artifactId> <groupId>org.apache.rocketmq</groupId> - <version>0.0.2-SNAPSHOT</version> + <version>0.0.3-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> diff --git a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java index f671a7b..ccd9749 100644 --- a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java +++ b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java @@ -53,6 +53,7 @@ import org.apache.rocketmq.schema.registry.common.json.JsonConverterImpl; import org.apache.rocketmq.schema.registry.common.model.SchemaInfo; import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo; import org.apache.rocketmq.schema.registry.common.model.SubjectInfo; +import org.apache.rocketmq.schema.registry.common.utils.CommonUtil; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; @@ -157,6 +158,7 @@ public class RocketmqClient { private void startLocalCache() { try { + CommonUtil.mkdir(cachePath); List<byte[]> cfs = RocksDB.listColumnFamilies(options, cachePath); if (cfs.size() <= 1) { List<byte[]> columnFamilies = Arrays.asList(STORAGE_ROCKSDB_SCHEMA_COLUMN_FAMILY, diff --git a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/RocketmqConfigConstants.java b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/RocketmqConfigConstants.java index 4ec11b2..17e1263 100644 --- a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/RocketmqConfigConstants.java +++ b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/RocketmqConfigConstants.java @@ -38,7 +38,7 @@ public class RocketmqConfigConstants { public static final String STORAGE_ROCKETMQ_TOPIC_DEFAULT = "RMQ_SYS_schema_registry_storage"; public static final String STORAGE_LOCAL_CACHE_PATH = "storage.local.cache.path"; - public static final String STORAGE_LOCAL_CACHE_PATH_DEFAULT = "/tmp/schema-registry/cache"; + public static final String STORAGE_LOCAL_CACHE_PATH_DEFAULT = "schema-registry/cache"; public static final byte[] STORAGE_ROCKSDB_SCHEMA_DEFAULT_FAMILY = "default".getBytes(StandardCharsets.UTF_8); public static final byte[] STORAGE_ROCKSDB_SCHEMA_COLUMN_FAMILY = "schema".getBytes(StandardCharsets.UTF_8); diff --git a/storage-rocketmq/src/main/resources/rocketmq.properties b/storage-rocketmq/src/main/resources/rocketmq.properties index 5070713..2a6e6a3 100644 --- a/storage-rocketmq/src/main/resources/rocketmq.properties +++ b/storage-rocketmq/src/main/resources/rocketmq.properties @@ -15,5 +15,5 @@ # limitations under the License. # -storage.type=rocketmq +#storage.type=rocketmq #storage.local.cache.path \ No newline at end of file diff --git a/war/pom.xml b/war/pom.xml index cb7b4cb..4109000 100644 --- a/war/pom.xml +++ b/war/pom.xml @@ -22,7 +22,7 @@ <parent> <artifactId>rocketmq-schema-registry-all</artifactId> <groupId>org.apache.rocketmq</groupId> - <version>0.0.2-SNAPSHOT</version> + <version>0.0.3-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion>
