This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git


The following commit(s) were added to refs/heads/main by this push:
     new 5384504  client
     new 528c6b4  Merge pull request #18 from ferrirW/client-apadpt
5384504 is described below

commit 53845042074ac8e6dbe83e373a3a3b75c8a73fbe
Author: fan <[email protected]>
AuthorDate: Sun Jul 31 17:44:01 2022 +0800

    client
---
 .../client/NormalSchemaRegistryClient.java         | 41 +++++----
 .../registry/client/SchemaRegistryClient.java      | 33 ++++----
 .../client/SchemaRegistryClientFactory.java        | 30 +++++++
 .../client/exceptions/SerializationException.java  | 34 ++++++++
 .../schema/registry/client/rest/RestService.java   | 77 +++++++++--------
 .../serializer/AbstractAvroDeserializer.java       | 81 ++++++++++++++++++
 .../client/serializer/AbstractAvroSerializer.java  | 97 ++++++++++++++++++++++
 .../client/serializer/AvroDeserializer.java        | 43 ++++++++++
 .../registry/client/serializer/AvroSerializer.java | 48 +++++++++++
 .../registry/client/serializer/Deserializer.java   | 29 +++++++
 .../registry/client/serializer/Serializer.java     | 30 +++++++
 11 files changed, 477 insertions(+), 66 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/NormalSchemaRegistryClient.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/NormalSchemaRegistryClient.java
index 263a55c..b70295e 100644
--- a/client/src/main/java/org/apache/rocketmq/schema/registry/client/NormalSchemaRegistryClient.java
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/NormalSchemaRegistryClient.java
@@ -21,8 +21,13 @@ import java.io.IOException;
 import java.util.List;
 import org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
 import org.apache.rocketmq.schema.registry.client.rest.RestService;
-import org.apache.rocketmq.schema.registry.common.dto.SchemaDto;
+import org.apache.rocketmq.schema.registry.common.dto.DeleteSchemeResponse;
+import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
+import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaRequest;
+import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaResponse;
 import org.apache.rocketmq.schema.registry.common.dto.SchemaRecordDto;
+import org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaRequest;
+import org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaResponse;
 
 public class NormalSchemaRegistryClient implements SchemaRegistryClient {
 
@@ -33,54 +38,54 @@ public class NormalSchemaRegistryClient implements SchemaRegistryClient {
     }
 
     @Override
-    public SchemaDto registerSchema(String subject, String schemaName,
-        SchemaDto schemaDto) throws RestClientException, IOException {
-        return restService.registerSchema(subject, schemaName, schemaDto);
+    public RegisterSchemaResponse registerSchema(String subject, String schemaName,
+        RegisterSchemaRequest request) throws RestClientException, IOException {
+        return restService.registerSchema(subject, schemaName, request);
     }
 
     @Override
-    public SchemaDto registerSchema(String clusterName, String tenant, String subjectName,
-        String schemaName, SchemaDto schemaDto) throws IOException, RestClientException {
-        return restService.registerSchema(clusterName, tenant, subjectName, schemaName, schemaDto);
+    public RegisterSchemaResponse registerSchema(String clusterName, String tenant, String subjectName,
+        String schemaName, RegisterSchemaRequest request) throws IOException, RestClientException {
+        return restService.registerSchema(clusterName, tenant, subjectName, schemaName, request);
     }
 
     @Override
-    public SchemaDto deleteSchema(String cluster, String tenant,
+    public DeleteSchemeResponse deleteSchema(String cluster, String tenant,
         String subject) throws IOException, RestClientException {
         return restService.deleteSchema(cluster, tenant, subject);
     }
 
     @Override
-    public SchemaDto deleteSchema(String cluster, String tenant, String subject,
+    public DeleteSchemeResponse deleteSchema(String cluster, String tenant, String subject,
         long version) throws IOException, RestClientException {
         return restService.deleteSchema(cluster, tenant, subject, version);
     }
 
     @Override
-    public SchemaDto updateSchema(String subject, String schemaName,
-        SchemaDto schemaDto) throws RestClientException, IOException {
-        return restService.updateSchema(subject, schemaName, schemaDto);
+    public UpdateSchemaResponse updateSchema(String subject, String schemaName,
+        UpdateSchemaRequest request) throws RestClientException, IOException {
+        return restService.updateSchema(subject, schemaName, request);
     }
 
     @Override
-    public SchemaDto updateSchema(String cluster, String tenant, String subjectName,
-        String schemaName, SchemaDto schemaDto) throws IOException, RestClientException {
-        return restService.updateSchema(cluster, tenant, subjectName, schemaName, schemaDto);
+    public UpdateSchemaResponse updateSchema(String cluster, String tenant, String subjectName,
+        String schemaName, UpdateSchemaRequest request) throws IOException, RestClientException {
+        return restService.updateSchema(cluster, tenant, subjectName, schemaName, request);
     }
 
     @Override
-    public SchemaRecordDto getSchemaBySubject(String subject) throws RestClientException, IOException {
+    public GetSchemaResponse getSchemaBySubject(String subject) throws RestClientException, IOException {
         return restService.getSchemaBySubject(subject);
     }
 
     @Override
-    public SchemaRecordDto getSchemaBySubject(String cluster, String tenant,
+    public GetSchemaResponse getSchemaBySubject(String cluster, String tenant,
         String subject) throws RestClientException, IOException {
         return restService.getSchemaBySubject(cluster, tenant, subject);
     }
 
     @Override
-    public SchemaRecordDto getSchemaBySubjectAndVersion(String cluster, String tenant, String subject,
+    public GetSchemaResponse getSchemaBySubjectAndVersion(String cluster, String tenant, String subject,
         long version) throws IOException, RestClientException {
         return restService.getSchemaBySubject(cluster, tenant, subject, version);
     }
diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClient.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClient.java
index bf7c3d4..23b60b1 100644
--- a/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClient.java
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClient.java
@@ -20,34 +20,39 @@ package org.apache.rocketmq.schema.registry.client;
 import java.io.IOException;
 import java.util.List;
 import org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
-import org.apache.rocketmq.schema.registry.common.dto.SchemaDto;
+import org.apache.rocketmq.schema.registry.common.dto.DeleteSchemeResponse;
+import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
+import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaRequest;
+import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaResponse;
 import org.apache.rocketmq.schema.registry.common.dto.SchemaRecordDto;
+import org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaRequest;
+import org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaResponse;
 
 public interface SchemaRegistryClient {
 
-    SchemaDto registerSchema(String subject, String schemaName,
-        SchemaDto schemaDto) throws RestClientException, IOException;
+    RegisterSchemaResponse registerSchema(String subject, String schemaName,
+        RegisterSchemaRequest request) throws RestClientException, IOException;
 
-    SchemaDto registerSchema(String clusterName, String tenant, String subjectName, String schemaName,
-        SchemaDto schemaDto) throws IOException, RestClientException;
+    RegisterSchemaResponse registerSchema(String clusterName, String tenant, String subjectName, String schemaName,
+        RegisterSchemaRequest request) throws IOException, RestClientException;
 
-    SchemaDto deleteSchema(String cluster, String tenant, String subject) throws IOException, RestClientException;
+    DeleteSchemeResponse deleteSchema(String cluster, String tenant, String subject) throws IOException, RestClientException;
 
-    SchemaDto deleteSchema(String cluster, String tenant, String subject,
+    DeleteSchemeResponse deleteSchema(String cluster, String tenant, String subject,
         long version) throws IOException, RestClientException;
 
-    SchemaDto updateSchema(String subject, String schemaName,
-        SchemaDto schemaDto) throws RestClientException, IOException;
+    UpdateSchemaResponse updateSchema(String subject, String schemaName,
+        UpdateSchemaRequest request) throws RestClientException, IOException;
 
-    SchemaDto updateSchema(String cluster, String tenant, String subjectName, String schemaName,
-        SchemaDto schemaDto) throws IOException, RestClientException;
+    UpdateSchemaResponse updateSchema(String cluster, String tenant, String subjectName, String schemaName,
+        UpdateSchemaRequest request) throws IOException, RestClientException;
 
-    SchemaRecordDto getSchemaBySubject(String subject) throws RestClientException, IOException;
+    GetSchemaResponse getSchemaBySubject(String subject) throws RestClientException, IOException;
 
-    SchemaRecordDto getSchemaBySubject(String cluster, String tenant,
+    GetSchemaResponse getSchemaBySubject(String cluster, String tenant,
         String subject) throws IOException, RestClientException;
 
-    SchemaRecordDto getSchemaBySubjectAndVersion(String cluster, String tenant, String subject,
+    GetSchemaResponse getSchemaBySubjectAndVersion(String cluster, String tenant, String subject,
         long version) throws IOException, RestClientException;
 
     List<SchemaRecordDto> getSchemaListBySubject(String cluster, String tenant,
diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClientFactory.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClientFactory.java
new file mode 100644
index 0000000..fbe6b7a
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClientFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client;
+
+import org.apache.rocketmq.schema.registry.client.rest.RestService;
+
+import java.util.Map;
+
+public class SchemaRegistryClientFactory {
+
+    public static SchemaRegistryClient newClient(String baseUrl, Map<String, String> map) {
+        RestService restService = null == map ? new RestService(baseUrl) : new RestService(baseUrl, map);
+        return new NormalSchemaRegistryClient(restService);
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/exceptions/SerializationException.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/exceptions/SerializationException.java
new file mode 100644
index 0000000..d21a2ae
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/exceptions/SerializationException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client.exceptions;
+
+public class SerializationException extends RuntimeException {
+    public SerializationException(){}
+
+    public SerializationException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+
+    public SerializationException(String msg) {
+        super(msg);
+    }
+
+    public SerializationException(Throwable cause) {
+        super(cause);
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/RestService.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/RestService.java
index df49076..f0138f5 100644
--- a/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/RestService.java
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/RestService.java
@@ -25,19 +25,28 @@ import java.util.List;
 import java.util.Map;
 import org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
 import org.apache.rocketmq.schema.registry.client.util.HttpUtil;
-import org.apache.rocketmq.schema.registry.common.dto.SchemaDto;
+import org.apache.rocketmq.schema.registry.common.dto.DeleteSchemeResponse;
+import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
+import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaRequest;
+import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaResponse;
 import org.apache.rocketmq.schema.registry.common.dto.SchemaRecordDto;
+import org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaRequest;
+import org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaResponse;
 
 public class RestService {
-    private static final TypeReference<SchemaDto> SCHEMA_DTO_TYPE_REFERENCE =
-        new TypeReference<SchemaDto>() {
-        };
-    private static final TypeReference<SchemaRecordDto> SCHEMA_RECORD_DTO_TYPE_REFERENCE =
-        new TypeReference<SchemaRecordDto>() {
-        };
+    private static final TypeReference<RegisterSchemaResponse> REGISTER_SCHEMA_DTO_TYPE_REFERENCE =
+        new TypeReference<RegisterSchemaResponse>() { };
+
+    private static final TypeReference<UpdateSchemaResponse> UPDATE_SCHEMA_DTO_TYPE_REFERENCE =
+        new TypeReference<UpdateSchemaResponse>() { };
+
+    private static final TypeReference<DeleteSchemeResponse> DELETE_SCHEMA_DTO_TYPE_REFERENCE =
+        new TypeReference<DeleteSchemeResponse>() { };
+
+    private static final TypeReference<GetSchemaResponse> GET_SCHEMA_DTO_TYPE_REFERENCE =
+        new TypeReference<GetSchemaResponse>() { };
     private static final TypeReference<List<SchemaRecordDto>> SCHEMA_RECORD_DTO_TYPE_LIST_REFERENCE =
-        new TypeReference<List<SchemaRecordDto>>() {
-        };
+        new TypeReference<List<SchemaRecordDto>>() { };
 
     public static ObjectMapper jsonParser = JacksonMapper.INSTANCE;
 
@@ -60,70 +69,70 @@ public class RestService {
         this.httpHeaders = httpHeaders;
     }
 
-    public SchemaDto registerSchema(String subject, String schemaName,
-        SchemaDto schemaDto) throws IOException, RestClientException {
+    public RegisterSchemaResponse registerSchema(String subject, String schemaName,
+        RegisterSchemaRequest request) throws IOException, RestClientException {
         UrlBuilder urlBuilder = UrlBuilder.fromPath("/subject/{subject-name}/schema/{schema-name}");
         String path = HttpUtil.buildRequestUrl(baseUri, urlBuilder.build(subject, schemaName).toString());
-        String data = jsonParser.writeValueAsString(schemaDto);
-        return HttpUtil.sendHttpRequest(path, HTTP_POST, data, httpHeaders, SCHEMA_DTO_TYPE_REFERENCE);
+        String data = jsonParser.writeValueAsString(request);
+        return HttpUtil.sendHttpRequest(path, HTTP_POST, data, httpHeaders, REGISTER_SCHEMA_DTO_TYPE_REFERENCE);
     }
 
-    public SchemaDto registerSchema(String clusterName, String tenant, String subjectName,
-        String schemaName, SchemaDto schemaDto) throws IOException, RestClientException {
+    public RegisterSchemaResponse registerSchema(String clusterName, String tenant, String subjectName,
+        String schemaName, RegisterSchemaRequest request) throws IOException, RestClientException {
         UrlBuilder urlBuilder = UrlBuilder.fromPath("/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/{schema-name}");
         String path = HttpUtil.buildRequestUrl(baseUri, urlBuilder.build(clusterName, tenant, subjectName, schemaName).toString());
-        String data = jsonParser.writeValueAsString(schemaDto);
-        return HttpUtil.sendHttpRequest(path, HTTP_POST, data, httpHeaders, SCHEMA_DTO_TYPE_REFERENCE);
+        String data = jsonParser.writeValueAsString(request);
+        return HttpUtil.sendHttpRequest(path, HTTP_POST, data, httpHeaders, REGISTER_SCHEMA_DTO_TYPE_REFERENCE);
     }
 
-    public SchemaDto deleteSchema(String cluster, String tenant,
+    public DeleteSchemeResponse deleteSchema(String cluster, String tenant,
         String subject) throws RestClientException, IOException {
         UrlBuilder urlBuilder = UrlBuilder.fromPath("/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema");
         String path = HttpUtil.buildRequestUrl(baseUri, urlBuilder.build(cluster, tenant, subject).toString());
-        return HttpUtil.sendHttpRequest(path, HTTP_DELETE, null, httpHeaders, SCHEMA_DTO_TYPE_REFERENCE);
+        return HttpUtil.sendHttpRequest(path, HTTP_DELETE, null, httpHeaders, DELETE_SCHEMA_DTO_TYPE_REFERENCE);
     }
 
-    public SchemaDto deleteSchema(String cluster, String tenant, String subject,
+    public DeleteSchemeResponse deleteSchema(String cluster, String tenant, String subject,
         long version) throws IOException, RestClientException {
         UrlBuilder urlBuilder = UrlBuilder.fromPath("/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/versions/{version}");
         String path = HttpUtil.buildRequestUrl(baseUri, urlBuilder.build(cluster, tenant, subject, version).toString());
-        return HttpUtil.sendHttpRequest(path, HTTP_DELETE, null, httpHeaders, SCHEMA_DTO_TYPE_REFERENCE);
+        return HttpUtil.sendHttpRequest(path, HTTP_DELETE, null, httpHeaders, DELETE_SCHEMA_DTO_TYPE_REFERENCE);
     }
 
-    public SchemaDto updateSchema(String subject, String schemaName,
-        SchemaDto schemaDto) throws IOException, RestClientException {
+    public UpdateSchemaResponse updateSchema(String subject, String schemaName,
+        UpdateSchemaRequest request) throws IOException, RestClientException {
         UrlBuilder urlBuilder = UrlBuilder.fromPath("/subject/{subject-name}/schema/{schema-name}");
         String path = HttpUtil.buildRequestUrl(baseUri, urlBuilder.build(subject, schemaName).toString());
-        String data = jsonParser.writeValueAsString(schemaDto);
-        return HttpUtil.sendHttpRequest(path, HTTP_PUT, data, httpHeaders, SCHEMA_DTO_TYPE_REFERENCE);
+        String data = jsonParser.writeValueAsString(request);
+        return HttpUtil.sendHttpRequest(path, HTTP_PUT, data, httpHeaders, UPDATE_SCHEMA_DTO_TYPE_REFERENCE);
     }
 
-    public SchemaDto updateSchema(String cluster, String tenant, String subject,
-        String schemaName, SchemaDto schemaDto) throws IOException, RestClientException {
+    public UpdateSchemaResponse updateSchema(String cluster, String tenant, String subject,
+        String schemaName, UpdateSchemaRequest schemaDto) throws IOException, RestClientException {
         UrlBuilder urlBuilder = UrlBuilder.fromPath("/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/{schema-name}");
         String path = HttpUtil.buildRequestUrl(baseUri, urlBuilder.build(cluster, tenant, subject, schemaName).toString());
         String data = jsonParser.writeValueAsString(schemaDto);
-        return HttpUtil.sendHttpRequest(path, HTTP_PUT, data, httpHeaders, SCHEMA_DTO_TYPE_REFERENCE);
+        return HttpUtil.sendHttpRequest(path, HTTP_PUT, data, httpHeaders, UPDATE_SCHEMA_DTO_TYPE_REFERENCE);
     }
 
-    public SchemaRecordDto getSchemaBySubject(String subject) throws RestClientException, IOException {
+    public GetSchemaResponse getSchemaBySubject(String subject) throws RestClientException, IOException {
         UrlBuilder urlBuilder = UrlBuilder.fromPath("/subject/{subject-name}/schema");
         String path = HttpUtil.buildRequestUrl(baseUri, urlBuilder.build(subject).toString());
-        return HttpUtil.sendHttpRequest(path, HTTP_GET, null, httpHeaders, SCHEMA_RECORD_DTO_TYPE_REFERENCE);
+        return HttpUtil.sendHttpRequest(path, HTTP_GET, null, httpHeaders, GET_SCHEMA_DTO_TYPE_REFERENCE);
     }
 
-    public SchemaRecordDto getSchemaBySubject(String cluster, String tenant,
+    public GetSchemaResponse getSchemaBySubject(String cluster, String tenant,
         String subject) throws IOException, RestClientException {
         UrlBuilder urlBuilder = UrlBuilder.fromPath("/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema");
         String path = HttpUtil.buildRequestUrl(baseUri, urlBuilder.build(cluster, tenant, subject).toString());
-        return HttpUtil.sendHttpRequest(path, HTTP_GET, null, httpHeaders, SCHEMA_RECORD_DTO_TYPE_REFERENCE);
+        return HttpUtil.sendHttpRequest(path, HTTP_GET, null, httpHeaders, GET_SCHEMA_DTO_TYPE_REFERENCE);
     }
 
-    public SchemaRecordDto getSchemaBySubject(String cluster, String tenant, String subject,
+    public GetSchemaResponse getSchemaBySubject(String cluster, String tenant, String subject,
         long version) throws IOException, RestClientException {
         UrlBuilder urlBuilder = UrlBuilder.fromPath("/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/versions/{version}");
         String path = HttpUtil.buildRequestUrl(baseUri, urlBuilder.build(cluster, tenant, subject, version).toString());
-        return HttpUtil.sendHttpRequest(path, HTTP_GET, null, httpHeaders, SCHEMA_RECORD_DTO_TYPE_REFERENCE);
+        return HttpUtil.sendHttpRequest(path, HTTP_GET, null, httpHeaders, GET_SCHEMA_DTO_TYPE_REFERENCE);
     }
 
     public List<SchemaRecordDto> getSchemaListBySubject(String cluster, String tenant,
diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AbstractAvroDeserializer.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AbstractAvroDeserializer.java
new file mode 100644
index 0000000..c727706
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AbstractAvroDeserializer.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client.serializer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
+import org.apache.rocketmq.schema.registry.client.exceptions.SerializationException;
+import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class AbstractAvroDeserializer<T> {
+
+    Logger log = LoggerFactory.getLogger(AvroDeserializer.class);
+
+    private final DecoderFactory decoderFactory = DecoderFactory.get();
+    protected SchemaRegistryClient schemaRegistry;
+
+    protected T deserializeImpl(String subject, byte[] payload)
+            throws SerializationException {
+        if (schemaRegistry == null) {
+            throw new SerializationException("please initialize the schema registry client first");
+        }
+        if (payload == null) {
+            return null;
+        }
+
+        try {
+            GetSchemaResponse response = schemaRegistry.getSchemaBySubject(subject);
+            Schema schema = new Schema.Parser().parse(response.getIdl());
+            return avroDecode(payload, schema);
+        } catch (RestClientException | IOException e) {
+            throw new RuntimeException(e);
+        }
+
+    }
+
+    public T avroDecode(byte[] input, Schema schema) throws IOException {
+        ByteArrayInputStream bais = new ByteArrayInputStream(input);
+        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bais, null);
+
+        ByteBuffer buffer = ByteBuffer.allocate(16);
+        try {
+            decoder.readBytes(buffer);
+        } catch (Exception e) {
+            log.error("read bytes error: ", e);
+        }
+
+        long id = buffer.getLong();
+        long version = buffer.getLong();
+
+        DatumReader<T> datumReader = new SpecificDatumReader<T>(schema);
+        T originMessage = datumReader.read(null, decoder);
+        return originMessage;
+    }
+
+}
diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AbstractAvroSerializer.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AbstractAvroSerializer.java
new file mode 100644
index 0000000..09fe5f9
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AbstractAvroSerializer.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client.serializer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
+import org.apache.rocketmq.schema.registry.client.exceptions.SerializationException;
+import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutionException;
+
+public class AbstractAvroSerializer<T> {
+
+    private static final int SCHEMA_ID_LENGTH = 64;
+    private static final int SCHEMA_VERSION_LENGTH = 64;
+    protected SchemaRegistryClient schemaRegistry;
+    private final EncoderFactory encoderFactory = EncoderFactory.get();
+
+    protected byte[] serializeImpl(
+            String subject, T originMessage)
+            throws SerializationException {
+        if (schemaRegistry == null) {
+            throw new SerializationException("please initialize the schema registry client first");
+        }
+
+        if (originMessage == null) {
+            return null;
+        }
+
+        try {
+
+        } catch (Exception e) {
+            throw new SerializationException("get schema by subject failed", e);
+        }
+
+        try {
+            GetSchemaResponse response = getSchemaBySubject(subject);
+            long schemaId = response.getSchemaId();
+            long schemaVersion = response.getVersion();
+            String schemaIdl = response.getIdl();
+            Schema schema = new Schema.Parser().parse(schemaIdl);
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            out.write(ByteBuffer.allocate(SCHEMA_ID_LENGTH).putLong(schemaId).array());
+            out.write(ByteBuffer.allocate(SCHEMA_VERSION_LENGTH).putLong(schemaVersion).array());
+            writeDatum(out, originMessage, schema);
+
+            byte[] bytes = out.toByteArray();
+            out.close();
+            return bytes;
+        } catch (ExecutionException e) {
+            throw new SerializationException("serialize Avro message failed", e.getCause());
+        } catch (IOException | RuntimeException e) {
+            throw new SerializationException("serialize Avro message failed", e);
+        } catch (RestClientException e) {
+            throw new SerializationException("get schema by subject failed", e);
+        }
+    }
+
+    private void writeDatum(ByteArrayOutputStream out, Object originMessage, Schema schema)
+            throws ExecutionException, IOException {
+        BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out, null);
+
+        DatumWriter<Object> datumWriter = new SpecificDatumWriter<>(schema);
+        datumWriter.write(originMessage, encoder);
+        encoder.flush();
+        datumWriter.write(originMessage, encoder);
+        encoder.flush();
+    }
+
+    private GetSchemaResponse getSchemaBySubject(String subject) throws RestClientException, IOException {
+        return schemaRegistry.getSchemaBySubject(subject);
+    }
+
+}
diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroDeserializer.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroDeserializer.java
new file mode 100644
index 0000000..406eec7
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroDeserializer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client.serializer;
+
+import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+
+import java.util.Map;
+
+/**
+ * {@link Deserializer} implementation that delegates decoding to
+ * {@code deserializeImpl} inherited from {@link AbstractAvroDeserializer}.
+ *
+ * @param <T> type the decoded object is cast to
+ */
+public class AvroDeserializer<T> extends AbstractAvroDeserializer implements Deserializer<T> {
+
+    /** Creates a deserializer without assigning a schema registry client. */
+    public AvroDeserializer() {
+    }
+
+    /**
+     * Creates a deserializer that uses the given schema registry client.
+     *
+     * @param schemaRegistryClient client stored in the inherited {@code schemaRegistry} field
+     */
+    public AvroDeserializer(SchemaRegistryClient schemaRegistryClient) {
+        schemaRegistry = schemaRegistryClient;
+    }
+
+    /** Currently a no-op; {@code configs} is not read. */
+    @Override
+    public void configure(Map<String, ?> configs) {
+    }
+
+    /**
+     * Decodes {@code bytes} for {@code subject} and casts the result to {@code T}.
+     *
+     * @param subject subject passed through to {@code deserializeImpl}
+     * @param bytes   serialized payload passed through to {@code deserializeImpl}
+     * @return the decoded object, cast to {@code T}
+     */
+    @Override
+    public T deserialize(String subject, byte[] bytes) {
+        // T is erased at runtime, so this cast cannot be checked here; a mismatch
+        // surfaces as a ClassCastException at the caller's use site.
+        @SuppressWarnings("unchecked")
+        T decoded = (T) deserializeImpl(subject, bytes);
+        return decoded;
+    }
+
+    /** Currently a no-op. */
+    @Override
+    public void close() {
+    }
+}
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroSerializer.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroSerializer.java
new file mode 100644
index 0000000..9276e8b
--- /dev/null
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroSerializer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client.serializer;
+
+import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+
+import java.util.Map;
+
+/**
+ * {@link Serializer} implementation that delegates encoding to
+ * {@code serializeImpl} inherited from {@link AbstractAvroSerializer}.
+ *
+ * @param <T> type of message accepted by {@link #serialize(String, Object)}
+ */
+public class AvroSerializer<T> extends AbstractAvroSerializer implements Serializer<T> {
+
+    /** Creates a serializer without assigning a schema registry client. */
+    public AvroSerializer() {
+    }
+
+    /**
+     * Creates a serializer that uses the given schema registry client.
+     *
+     * @param schemaRegistryClient client stored in the inherited {@code schemaRegistry} field
+     */
+    public AvroSerializer(SchemaRegistryClient schemaRegistryClient) {
+        schemaRegistry = schemaRegistryClient;
+    }
+
+    /** Forwards {@code configs} to the interface's default {@code configure}. */
+    @Override
+    public void configure(Map<String, ?> configs) {
+        Serializer.super.configure(configs);
+    }
+
+    /**
+     * Serializes {@code originMessage} for {@code subject}.
+     *
+     * @param subject       subject passed through to {@code serializeImpl}
+     * @param originMessage message to encode; may be {@code null}
+     * @return the result of {@code serializeImpl}, or {@code null} when
+     *         {@code originMessage} is {@code null}
+     */
+    @Override
+    public byte[] serialize(String subject, T originMessage) {
+        return originMessage == null ? null : serializeImpl(subject, originMessage);
+    }
+
+    /** Currently a no-op. */
+    @Override
+    public void close() {
+    }
+}
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/Deserializer.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/Deserializer.java
new file mode 100644
index 0000000..81ee7a5
--- /dev/null
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/Deserializer.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client.serializer;
+
+import java.io.Closeable;
+import java.util.Map;
+
+/**
+ * Converts serialized bytes back into an object of type {@code T}.
+ *
+ * @param <T> type produced by {@link #deserialize(String, byte[])}
+ */
+public interface Deserializer<T> extends Closeable {
+
+    /**
+     * Applies configuration to this deserializer; the default does nothing.
+     *
+     * @param configs configuration entries
+     */
+    default void configure(Map<String, ?> configs) {
+    }
+
+    /**
+     * Deserializes {@code bytes} for the given {@code subject}.
+     *
+     * @param subject subject the payload belongs to
+     * @param bytes   serialized payload
+     * @return the deserialized object
+     */
+    T deserialize(String subject, byte[] bytes);
+
+    /**
+     * Closes this deserializer; the default does nothing and, unlike
+     * {@link Closeable#close()}, declares no checked exception.
+     */
+    @Override
+    default void close() {
+    }
+}
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/Serializer.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/Serializer.java
new file mode 100644
index 0000000..0b01c75
--- /dev/null
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/Serializer.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client.serializer;
+
+import java.io.Closeable;
+import java.util.Map;
+
+/**
+ * Converts a message of type {@code T} into its serialized byte form.
+ *
+ * @param <T> type accepted by {@link #serialize(String, Object)}
+ */
+public interface Serializer<T> extends Closeable {
+
+    /**
+     * Applies configuration to this serializer; the default does nothing.
+     *
+     * @param configs configuration entries
+     */
+    default void configure(Map<String, ?> configs) {
+    }
+
+    /**
+     * Serializes {@code originMessage} for the given {@code subject}.
+     *
+     * @param subject       subject the message belongs to
+     * @param originMessage message to serialize
+     * @return the serialized bytes
+     */
+    byte[] serialize(String subject, T originMessage);
+
+    /**
+     * Closes this serializer; the default does nothing and declares no
+     * checked exception.
+     */
+    default void close() {
+    }
+}

Reply via email to