This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git
The following commit(s) were added to refs/heads/main by this push:
new 5384504 client
new 528c6b4 Merge pull request #18 from ferrirW/client-apadpt
5384504 is described below
commit 53845042074ac8e6dbe83e373a3a3b75c8a73fbe
Author: fan <[email protected]>
AuthorDate: Sun Jul 31 17:44:01 2022 +0800
client
---
.../client/NormalSchemaRegistryClient.java | 41 +++++----
.../registry/client/SchemaRegistryClient.java | 33 ++++----
.../client/SchemaRegistryClientFactory.java | 30 +++++++
.../client/exceptions/SerializationException.java | 34 ++++++++
.../schema/registry/client/rest/RestService.java | 77 +++++++++--------
.../serializer/AbstractAvroDeserializer.java | 81 ++++++++++++++++++
.../client/serializer/AbstractAvroSerializer.java | 97 ++++++++++++++++++++++
.../client/serializer/AvroDeserializer.java | 43 ++++++++++
.../registry/client/serializer/AvroSerializer.java | 48 +++++++++++
.../registry/client/serializer/Deserializer.java | 29 +++++++
.../registry/client/serializer/Serializer.java | 30 +++++++
11 files changed, 477 insertions(+), 66 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/NormalSchemaRegistryClient.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/NormalSchemaRegistryClient.java
index 263a55c..b70295e 100644
--- a/client/src/main/java/org/apache/rocketmq/schema/registry/client/NormalSchemaRegistryClient.java
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/NormalSchemaRegistryClient.java
@@ -21,8 +21,13 @@ import java.io.IOException;
import java.util.List;
import
org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
import org.apache.rocketmq.schema.registry.client.rest.RestService;
-import org.apache.rocketmq.schema.registry.common.dto.SchemaDto;
+import org.apache.rocketmq.schema.registry.common.dto.DeleteSchemeResponse;
+import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
+import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaRequest;
+import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaResponse;
import org.apache.rocketmq.schema.registry.common.dto.SchemaRecordDto;
+import org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaRequest;
+import org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaResponse;
public class NormalSchemaRegistryClient implements SchemaRegistryClient {
@@ -33,54 +38,54 @@ public class NormalSchemaRegistryClient implements
SchemaRegistryClient {
}
@Override
- public SchemaDto registerSchema(String subject, String schemaName,
- SchemaDto schemaDto) throws RestClientException, IOException {
- return restService.registerSchema(subject, schemaName, schemaDto);
+ public RegisterSchemaResponse registerSchema(String subject, String
schemaName,
+ RegisterSchemaRequest request) throws RestClientException, IOException
{
+ return restService.registerSchema(subject, schemaName, request);
}
@Override
- public SchemaDto registerSchema(String clusterName, String tenant, String
subjectName,
- String schemaName, SchemaDto schemaDto) throws IOException,
RestClientException {
- return restService.registerSchema(clusterName, tenant, subjectName,
schemaName, schemaDto);
+ public RegisterSchemaResponse registerSchema(String clusterName, String
tenant, String subjectName,
+ String schemaName, RegisterSchemaRequest request) throws IOException,
RestClientException {
+ return restService.registerSchema(clusterName, tenant, subjectName,
schemaName, request);
}
@Override
- public SchemaDto deleteSchema(String cluster, String tenant,
+ public DeleteSchemeResponse deleteSchema(String cluster, String tenant,
String subject) throws IOException, RestClientException {
return restService.deleteSchema(cluster, tenant, subject);
}
@Override
- public SchemaDto deleteSchema(String cluster, String tenant, String
subject,
+ public DeleteSchemeResponse deleteSchema(String cluster, String tenant,
String subject,
long version) throws IOException, RestClientException {
return restService.deleteSchema(cluster, tenant, subject, version);
}
@Override
- public SchemaDto updateSchema(String subject, String schemaName,
- SchemaDto schemaDto) throws RestClientException, IOException {
- return restService.updateSchema(subject, schemaName, schemaDto);
+ public UpdateSchemaResponse updateSchema(String subject, String schemaName,
+ UpdateSchemaRequest request) throws RestClientException, IOException {
+ return restService.updateSchema(subject, schemaName, request);
}
@Override
- public SchemaDto updateSchema(String cluster, String tenant, String
subjectName,
- String schemaName, SchemaDto schemaDto) throws IOException,
RestClientException {
- return restService.updateSchema(cluster, tenant, subjectName,
schemaName, schemaDto);
+ public UpdateSchemaResponse updateSchema(String cluster, String tenant,
String subjectName,
+ String schemaName, UpdateSchemaRequest request) throws IOException,
RestClientException {
+ return restService.updateSchema(cluster, tenant, subjectName,
schemaName, request);
}
@Override
- public SchemaRecordDto getSchemaBySubject(String subject) throws
RestClientException, IOException {
+ public GetSchemaResponse getSchemaBySubject(String subject) throws
RestClientException, IOException {
return restService.getSchemaBySubject(subject);
}
@Override
- public SchemaRecordDto getSchemaBySubject(String cluster, String tenant,
+ public GetSchemaResponse getSchemaBySubject(String cluster, String tenant,
String subject) throws RestClientException, IOException {
return restService.getSchemaBySubject(cluster, tenant, subject);
}
@Override
- public SchemaRecordDto getSchemaBySubjectAndVersion(String cluster, String
tenant, String subject,
+ public GetSchemaResponse getSchemaBySubjectAndVersion(String cluster,
String tenant, String subject,
long version) throws IOException, RestClientException {
return restService.getSchemaBySubject(cluster, tenant, subject,
version);
}
diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClient.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClient.java
index bf7c3d4..23b60b1 100644
--- a/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClient.java
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClient.java
@@ -20,34 +20,39 @@ package org.apache.rocketmq.schema.registry.client;
import java.io.IOException;
import java.util.List;
import
org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
-import org.apache.rocketmq.schema.registry.common.dto.SchemaDto;
+import org.apache.rocketmq.schema.registry.common.dto.DeleteSchemeResponse;
+import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
+import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaRequest;
+import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaResponse;
import org.apache.rocketmq.schema.registry.common.dto.SchemaRecordDto;
+import org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaRequest;
+import org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaResponse;
public interface SchemaRegistryClient {
- SchemaDto registerSchema(String subject, String schemaName,
- SchemaDto schemaDto) throws RestClientException, IOException;
+ RegisterSchemaResponse registerSchema(String subject, String schemaName,
+ RegisterSchemaRequest request) throws RestClientException, IOException;
- SchemaDto registerSchema(String clusterName, String tenant, String
subjectName, String schemaName,
- SchemaDto schemaDto) throws IOException, RestClientException;
+ RegisterSchemaResponse registerSchema(String clusterName, String tenant,
String subjectName, String schemaName,
+ RegisterSchemaRequest request) throws IOException, RestClientException;
- SchemaDto deleteSchema(String cluster, String tenant, String subject)
throws IOException, RestClientException;
+ DeleteSchemeResponse deleteSchema(String cluster, String tenant, String
subject) throws IOException, RestClientException;
- SchemaDto deleteSchema(String cluster, String tenant, String subject,
+ DeleteSchemeResponse deleteSchema(String cluster, String tenant, String
subject,
long version) throws IOException, RestClientException;
- SchemaDto updateSchema(String subject, String schemaName,
- SchemaDto schemaDto) throws RestClientException, IOException;
+ UpdateSchemaResponse updateSchema(String subject, String schemaName,
+ UpdateSchemaRequest request) throws RestClientException, IOException;
- SchemaDto updateSchema(String cluster, String tenant, String subjectName,
String schemaName,
- SchemaDto schemaDto) throws IOException, RestClientException;
+ UpdateSchemaResponse updateSchema(String cluster, String tenant, String
subjectName, String schemaName,
+ UpdateSchemaRequest request) throws IOException, RestClientException;
- SchemaRecordDto getSchemaBySubject(String subject) throws
RestClientException, IOException;
+ GetSchemaResponse getSchemaBySubject(String subject) throws
RestClientException, IOException;
- SchemaRecordDto getSchemaBySubject(String cluster, String tenant,
+ GetSchemaResponse getSchemaBySubject(String cluster, String tenant,
String subject) throws IOException, RestClientException;
- SchemaRecordDto getSchemaBySubjectAndVersion(String cluster, String
tenant, String subject,
+ GetSchemaResponse getSchemaBySubjectAndVersion(String cluster, String
tenant, String subject,
long version) throws IOException, RestClientException;
List<SchemaRecordDto> getSchemaListBySubject(String cluster, String tenant,
diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClientFactory.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClientFactory.java
new file mode 100644
index 0000000..fbe6b7a
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClientFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client;
+
+import org.apache.rocketmq.schema.registry.client.rest.RestService;
+
+import java.util.Map;
+
+public class SchemaRegistryClientFactory {
+
+ public static SchemaRegistryClient newClient(String baseUrl, Map<String, String> map) {
+ RestService restService = null == map ? new RestService(baseUrl) : new RestService(baseUrl, map);
+ return new NormalSchemaRegistryClient(restService);
+ }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/exceptions/SerializationException.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/exceptions/SerializationException.java
new file mode 100644
index 0000000..d21a2ae
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/exceptions/SerializationException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client.exceptions;
+
+public class SerializationException extends RuntimeException {
+ public SerializationException(){}
+
+ public SerializationException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+
+ public SerializationException(String msg) {
+ super(msg);
+ }
+
+ public SerializationException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/RestService.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/RestService.java
index df49076..f0138f5 100644
--- a/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/RestService.java
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/RestService.java
@@ -25,19 +25,28 @@ import java.util.List;
import java.util.Map;
import
org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
import org.apache.rocketmq.schema.registry.client.util.HttpUtil;
-import org.apache.rocketmq.schema.registry.common.dto.SchemaDto;
+import org.apache.rocketmq.schema.registry.common.dto.DeleteSchemeResponse;
+import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
+import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaRequest;
+import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaResponse;
import org.apache.rocketmq.schema.registry.common.dto.SchemaRecordDto;
+import org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaRequest;
+import org.apache.rocketmq.schema.registry.common.dto.UpdateSchemaResponse;
public class RestService {
- private static final TypeReference<SchemaDto> SCHEMA_DTO_TYPE_REFERENCE =
- new TypeReference<SchemaDto>() {
- };
- private static final TypeReference<SchemaRecordDto>
SCHEMA_RECORD_DTO_TYPE_REFERENCE =
- new TypeReference<SchemaRecordDto>() {
- };
+ private static final TypeReference<RegisterSchemaResponse>
REGISTER_SCHEMA_DTO_TYPE_REFERENCE =
+ new TypeReference<RegisterSchemaResponse>() { };
+
+ private static final TypeReference<UpdateSchemaResponse>
UPDATE_SCHEMA_DTO_TYPE_REFERENCE =
+ new TypeReference<UpdateSchemaResponse>() { };
+
+ private static final TypeReference<DeleteSchemeResponse>
DELETE_SCHEMA_DTO_TYPE_REFERENCE =
+ new TypeReference<DeleteSchemeResponse>() { };
+
+ private static final TypeReference<GetSchemaResponse>
GET_SCHEMA_DTO_TYPE_REFERENCE =
+ new TypeReference<GetSchemaResponse>() { };
private static final TypeReference<List<SchemaRecordDto>>
SCHEMA_RECORD_DTO_TYPE_LIST_REFERENCE =
- new TypeReference<List<SchemaRecordDto>>() {
- };
+ new TypeReference<List<SchemaRecordDto>>() { };
public static ObjectMapper jsonParser = JacksonMapper.INSTANCE;
@@ -60,70 +69,70 @@ public class RestService {
this.httpHeaders = httpHeaders;
}
- public SchemaDto registerSchema(String subject, String schemaName,
- SchemaDto schemaDto) throws IOException, RestClientException {
+ public RegisterSchemaResponse registerSchema(String subject, String
schemaName,
+ RegisterSchemaRequest request) throws IOException, RestClientException
{
UrlBuilder urlBuilder =
UrlBuilder.fromPath("/subject/{subject-name}/schema/{schema-name}");
String path = HttpUtil.buildRequestUrl(baseUri,
urlBuilder.build(subject, schemaName).toString());
- String data = jsonParser.writeValueAsString(schemaDto);
- return HttpUtil.sendHttpRequest(path, HTTP_POST, data, httpHeaders,
SCHEMA_DTO_TYPE_REFERENCE);
+ String data = jsonParser.writeValueAsString(request);
+ return HttpUtil.sendHttpRequest(path, HTTP_POST, data, httpHeaders,
REGISTER_SCHEMA_DTO_TYPE_REFERENCE);
}
- public SchemaDto registerSchema(String clusterName, String tenant, String
subjectName,
- String schemaName, SchemaDto schemaDto) throws IOException,
RestClientException {
+ public RegisterSchemaResponse registerSchema(String clusterName, String
tenant, String subjectName,
+ String schemaName, RegisterSchemaRequest request) throws IOException,
RestClientException {
UrlBuilder urlBuilder =
UrlBuilder.fromPath("/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/{schema-name}");
String path = HttpUtil.buildRequestUrl(baseUri,
urlBuilder.build(clusterName, tenant, subjectName, schemaName).toString());
- String data = jsonParser.writeValueAsString(schemaDto);
- return HttpUtil.sendHttpRequest(path, HTTP_POST, data, httpHeaders,
SCHEMA_DTO_TYPE_REFERENCE);
+ String data = jsonParser.writeValueAsString(request);
+ return HttpUtil.sendHttpRequest(path, HTTP_POST, data, httpHeaders,
REGISTER_SCHEMA_DTO_TYPE_REFERENCE);
}
- public SchemaDto deleteSchema(String cluster, String tenant,
+ public DeleteSchemeResponse deleteSchema(String cluster, String tenant,
String subject) throws RestClientException, IOException {
UrlBuilder urlBuilder =
UrlBuilder.fromPath("/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema");
String path = HttpUtil.buildRequestUrl(baseUri,
urlBuilder.build(cluster, tenant, subject).toString());
- return HttpUtil.sendHttpRequest(path, HTTP_DELETE, null, httpHeaders,
SCHEMA_DTO_TYPE_REFERENCE);
+ return HttpUtil.sendHttpRequest(path, HTTP_DELETE, null, httpHeaders,
DELETE_SCHEMA_DTO_TYPE_REFERENCE);
}
- public SchemaDto deleteSchema(String cluster, String tenant, String
subject,
+ public DeleteSchemeResponse deleteSchema(String cluster, String tenant,
String subject,
long version) throws IOException, RestClientException {
UrlBuilder urlBuilder =
UrlBuilder.fromPath("/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/versions/{version}");
String path = HttpUtil.buildRequestUrl(baseUri,
urlBuilder.build(cluster, tenant, subject, version).toString());
- return HttpUtil.sendHttpRequest(path, HTTP_DELETE, null, httpHeaders,
SCHEMA_DTO_TYPE_REFERENCE);
+ return HttpUtil.sendHttpRequest(path, HTTP_DELETE, null, httpHeaders,
DELETE_SCHEMA_DTO_TYPE_REFERENCE);
}
- public SchemaDto updateSchema(String subject, String schemaName,
- SchemaDto schemaDto) throws IOException, RestClientException {
+ public UpdateSchemaResponse updateSchema(String subject, String schemaName,
+ UpdateSchemaRequest request) throws IOException, RestClientException {
UrlBuilder urlBuilder =
UrlBuilder.fromPath("/subject/{subject-name}/schema/{schema-name}");
String path = HttpUtil.buildRequestUrl(baseUri,
urlBuilder.build(subject, schemaName).toString());
- String data = jsonParser.writeValueAsString(schemaDto);
- return HttpUtil.sendHttpRequest(path, HTTP_PUT, data, httpHeaders,
SCHEMA_DTO_TYPE_REFERENCE);
+ String data = jsonParser.writeValueAsString(request);
+ return HttpUtil.sendHttpRequest(path, HTTP_PUT, data, httpHeaders,
UPDATE_SCHEMA_DTO_TYPE_REFERENCE);
}
- public SchemaDto updateSchema(String cluster, String tenant, String
subject,
- String schemaName, SchemaDto schemaDto) throws IOException,
RestClientException {
+ public UpdateSchemaResponse updateSchema(String cluster, String tenant,
String subject,
+ String schemaName, UpdateSchemaRequest schemaDto) throws IOException,
RestClientException {
UrlBuilder urlBuilder =
UrlBuilder.fromPath("/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/{schema-name}");
String path = HttpUtil.buildRequestUrl(baseUri,
urlBuilder.build(cluster, tenant, subject, schemaName).toString());
String data = jsonParser.writeValueAsString(schemaDto);
- return HttpUtil.sendHttpRequest(path, HTTP_PUT, data, httpHeaders,
SCHEMA_DTO_TYPE_REFERENCE);
+ return HttpUtil.sendHttpRequest(path, HTTP_PUT, data, httpHeaders,
UPDATE_SCHEMA_DTO_TYPE_REFERENCE);
}
- public SchemaRecordDto getSchemaBySubject(String subject) throws
RestClientException, IOException {
+ public GetSchemaResponse getSchemaBySubject(String subject) throws
RestClientException, IOException {
UrlBuilder urlBuilder =
UrlBuilder.fromPath("/subject/{subject-name}/schema");
String path = HttpUtil.buildRequestUrl(baseUri,
urlBuilder.build(subject).toString());
- return HttpUtil.sendHttpRequest(path, HTTP_GET, null, httpHeaders,
SCHEMA_RECORD_DTO_TYPE_REFERENCE);
+ return HttpUtil.sendHttpRequest(path, HTTP_GET, null, httpHeaders,
GET_SCHEMA_DTO_TYPE_REFERENCE);
}
- public SchemaRecordDto getSchemaBySubject(String cluster, String tenant,
+ public GetSchemaResponse getSchemaBySubject(String cluster, String tenant,
String subject) throws IOException, RestClientException {
UrlBuilder urlBuilder =
UrlBuilder.fromPath("/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema");
String path = HttpUtil.buildRequestUrl(baseUri,
urlBuilder.build(cluster, tenant, subject).toString());
- return HttpUtil.sendHttpRequest(path, HTTP_GET, null, httpHeaders,
SCHEMA_RECORD_DTO_TYPE_REFERENCE);
+ return HttpUtil.sendHttpRequest(path, HTTP_GET, null, httpHeaders,
GET_SCHEMA_DTO_TYPE_REFERENCE);
}
- public SchemaRecordDto getSchemaBySubject(String cluster, String tenant,
String subject,
+ public GetSchemaResponse getSchemaBySubject(String cluster, String tenant,
String subject,
long version) throws IOException, RestClientException {
UrlBuilder urlBuilder =
UrlBuilder.fromPath("/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/versions/{version}");
String path = HttpUtil.buildRequestUrl(baseUri,
urlBuilder.build(cluster, tenant, subject, version).toString());
- return HttpUtil.sendHttpRequest(path, HTTP_GET, null, httpHeaders,
SCHEMA_RECORD_DTO_TYPE_REFERENCE);
+ return HttpUtil.sendHttpRequest(path, HTTP_GET, null, httpHeaders,
GET_SCHEMA_DTO_TYPE_REFERENCE);
}
public List<SchemaRecordDto> getSchemaListBySubject(String cluster, String
tenant,
diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AbstractAvroDeserializer.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AbstractAvroDeserializer.java
new file mode 100644
index 0000000..c727706
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AbstractAvroDeserializer.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client.serializer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
+import org.apache.rocketmq.schema.registry.client.exceptions.SerializationException;
+import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class AbstractAvroDeserializer<T> {
+
+ Logger log = LoggerFactory.getLogger(AvroDeserializer.class);
+
+ private final DecoderFactory decoderFactory = DecoderFactory.get();
+ protected SchemaRegistryClient schemaRegistry;
+
+ protected T deserializeImpl(String subject, byte[] payload)
+ throws SerializationException {
+ if (schemaRegistry == null) {
+ throw new SerializationException("please initialize the schema registry client first");
+ }
+ if (payload == null) {
+ return null;
+ }
+
+ try {
+ GetSchemaResponse response = schemaRegistry.getSchemaBySubject(subject);
+ Schema schema = new Schema.Parser().parse(response.getIdl());
+ return avroDecode(payload, schema);
+ } catch (RestClientException | IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ public T avroDecode(byte[] input, Schema schema) throws IOException {
+ ByteArrayInputStream bais = new ByteArrayInputStream(input);
+ BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bais, null);
+
+ ByteBuffer buffer = ByteBuffer.allocate(16);
+ try {
+ decoder.readBytes(buffer);
+ } catch (Exception e) {
+ log.error("read bytes error: ", e);
+ }
+
+ long id = buffer.getLong();
+ long version = buffer.getLong();
+
+ DatumReader<T> datumReader = new SpecificDatumReader<T>(schema);
+ T originMessage = datumReader.read(null, decoder);
+ return originMessage;
+ }
+
+}
diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AbstractAvroSerializer.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AbstractAvroSerializer.java
new file mode 100644
index 0000000..09fe5f9
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AbstractAvroSerializer.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client.serializer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
+import org.apache.rocketmq.schema.registry.client.exceptions.SerializationException;
+import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutionException;
+
+public class AbstractAvroSerializer<T> {
+
+ private static final int SCHEMA_ID_LENGTH = 64;
+ private static final int SCHEMA_VERSION_LENGTH = 64;
+ protected SchemaRegistryClient schemaRegistry;
+ private final EncoderFactory encoderFactory = EncoderFactory.get();
+
+ protected byte[] serializeImpl(
+ String subject, T originMessage)
+ throws SerializationException {
+ if (schemaRegistry == null) {
+ throw new SerializationException("please initialize the schema registry client first");
+ }
+
+ if (originMessage == null) {
+ return null;
+ }
+
+ try {
+
+ } catch (Exception e) {
+ throw new SerializationException("get schema by subject failed", e);
+ }
+
+ try {
+ GetSchemaResponse response = getSchemaBySubject(subject);
+ long schemaId = response.getSchemaId();
+ long schemaVersion = response.getVersion();
+ String schemaIdl = response.getIdl();
+ Schema schema = new Schema.Parser().parse(schemaIdl);
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ out.write(ByteBuffer.allocate(SCHEMA_ID_LENGTH).putLong(schemaId).array());
+ out.write(ByteBuffer.allocate(SCHEMA_VERSION_LENGTH).putLong(schemaVersion).array());
+ writeDatum(out, originMessage, schema);
+
+ byte[] bytes = out.toByteArray();
+ out.close();
+ return bytes;
+ } catch (ExecutionException e) {
+ throw new SerializationException("serialize Avro message failed", e.getCause());
+ } catch (IOException | RuntimeException e) {
+ throw new SerializationException("serialize Avro message failed", e);
+ } catch (RestClientException e) {
+ throw new SerializationException("get schema by subject failed", e);
+ }
+ }
+
+ private void writeDatum(ByteArrayOutputStream out, Object originMessage, Schema schema)
+ throws ExecutionException, IOException {
+ BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out, null);
+
+ DatumWriter<Object> datumWriter = new SpecificDatumWriter<>(schema);
+ datumWriter.write(originMessage, encoder);
+ encoder.flush();
+ datumWriter.write(originMessage, encoder);
+ encoder.flush();
+ }
+
+ private GetSchemaResponse getSchemaBySubject(String subject) throws RestClientException, IOException {
+ return schemaRegistry.getSchemaBySubject(subject);
+ }
+
+}
diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroDeserializer.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroDeserializer.java
new file mode 100644
index 0000000..406eec7
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroDeserializer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client.serializer;
+
+import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+
+import java.util.Map;
+
+public class AvroDeserializer<T> extends AbstractAvroDeserializer implements Deserializer<T> {
+ public AvroDeserializer(){}
+
+ public AvroDeserializer(SchemaRegistryClient schemaRegistryClient) {
+ schemaRegistry = schemaRegistryClient;
+ }
+
+ @Override
+ public void configure(Map<String, ?> configs) {
+ }
+
+ @Override
+ public T deserialize(String subject, byte[] bytes) {
+ return (T) deserializeImpl(subject, bytes);
+ }
+
+ @Override
+ public void close() {
+ }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroSerializer.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroSerializer.java
new file mode 100644
index 0000000..9276e8b
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroSerializer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client.serializer;
+
+import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+
+import java.util.Map;
+
+/**
+ * Avro-backed {@link Serializer} that hands encoding to {@code serializeImpl},
+ * which this class inherits from {@link AbstractAvroSerializer}.
+ *
+ * @param <T> type of message accepted by {@link #serialize}
+ */
+public class AvroSerializer<T> extends AbstractAvroSerializer implements Serializer<T> {
+
+    /**
+     * Creates an instance without assigning a registry client in this constructor.
+     */
+    public AvroSerializer() {
+    }
+
+    /**
+     * Creates an instance that uses the given registry client.
+     *
+     * @param schemaRegistryClient client stored in the inherited {@code schemaRegistry} field
+     */
+    public AvroSerializer(SchemaRegistryClient schemaRegistryClient) {
+        this.schemaRegistry = schemaRegistryClient;
+    }
+
+    /**
+     * Forwards to the default {@code configure} declared on {@link Serializer}.
+     *
+     * @param configs passed through unchanged
+     */
+    @Override
+    public void configure(Map<String, ?> configs) {
+        Serializer.super.configure(configs);
+    }
+
+    /**
+     * Encodes {@code originMessage} through {@code serializeImpl}.
+     *
+     * @param subject subject passed through unchanged to {@code serializeImpl}
+     * @param originMessage message to encode; may be {@code null}
+     * @return {@code null} when {@code originMessage} is {@code null}, otherwise the
+     *         result of {@code serializeImpl}
+     */
+    @Override
+    public byte[] serialize(String subject, T originMessage) {
+        // A null message short-circuits before serializeImpl is ever invoked.
+        return originMessage == null ? null : serializeImpl(subject, originMessage);
+    }
+
+    /**
+     * No-op: this class releases nothing on close.
+     */
+    @Override
+    public void close() {
+    }
+}
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/Deserializer.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/Deserializer.java
new file mode 100644
index 0000000..81ee7a5
--- /dev/null
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/Deserializer.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client.serializer;
+
+import java.io.Closeable;
+import java.util.Map;
+
+/**
+ * Turns a byte payload for a given subject back into a value of type {@code T}.
+ *
+ * <p>{@link #deserialize} is the only abstract method; {@link #configure} and
+ * {@link #close} have empty default bodies.
+ *
+ * @param <T> type produced by {@link #deserialize}
+ */
+public interface Deserializer<T> extends Closeable {
+
+    /**
+     * Optional configuration hook; the default implementation does nothing.
+     *
+     * @param configs configuration entries offered to the implementation
+     */
+    default void configure(Map<String, ?> configs) {
+    }
+
+    /**
+     * Decodes {@code bytes} for the given {@code subject}.
+     *
+     * @param subject subject the payload belongs to
+     * @param bytes encoded payload
+     * @return the decoded value
+     */
+    T deserialize(String subject, byte[] bytes);
+
+    /**
+     * Default implementation does nothing. It overrides {@link Closeable#close()}
+     * without a {@code throws} clause, so callers holding a {@code Deserializer}
+     * reference need not handle {@code IOException}.
+     */
+    @Override
+    default void close() {
+    }
+}
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/Serializer.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/Serializer.java
new file mode 100644
index 0000000..0b01c75
--- /dev/null
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/Serializer.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client.serializer;
+
+import java.io.Closeable;
+import java.util.Map;
+
+/**
+ * Turns a message of type {@code T} into a byte payload for a given subject.
+ *
+ * <p>{@link #serialize} is the only abstract method; {@link #configure} and
+ * {@link #close} have empty default bodies.
+ *
+ * @param <T> type accepted by {@link #serialize}
+ */
+public interface Serializer<T> extends Closeable {
+
+    /**
+     * Optional configuration hook; the default implementation does nothing.
+     *
+     * @param configs configuration entries offered to the implementation
+     */
+    default void configure(Map<String, ?> configs) {
+    }
+
+    /**
+     * Encodes {@code originMessage} for the given {@code subject}.
+     *
+     * @param subject subject the message belongs to
+     * @param originMessage message to encode
+     * @return the encoded payload
+     */
+    byte[] serialize(String subject, T originMessage);
+
+    /**
+     * Default implementation does nothing. It overrides {@link Closeable#close()}
+     * without a {@code throws} clause, so callers holding a {@code Serializer}
+     * reference need not handle {@code IOException}.
+     */
+    @Override
+    default void close() {
+    }
+}