This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git


The following commit(s) were added to refs/heads/main by this push:
     new 7043568  check avro schema compatibility by schema recordID during 
ser/de (#63)
7043568 is described below

commit 70435683fac84fac0335efa646b3e94c623951fa
Author: Humkum <[email protected]>
AuthorDate: Thu Oct 6 09:08:51 2022 +0800

    check avro schema compatibility by schema recordID during ser/de (#63)
---
 .../client/NormalSchemaRegistryClient.java         |  27 ++++
 .../registry/client/SchemaRegistryClient.java      |  13 ++
 ...oSerializerConfig.java => AvroSerdeConfig.java} |   9 +-
 ...nSerializerConfig.java => JsonSerdeConfig.java} |   4 +-
 .../{SerializerConfig.java => SerdeConfig.java}    |  24 ++-
 .../schema/registry/client/rest/RestService.java   |  32 ++++
 .../client/serde/avro/AvroDeserializer.java        |  90 +++++++----
 .../registry/client/serde/avro/AvroSerializer.java |  23 ++-
 .../client/serde/avro/GenericAvroDeserializer.java |  11 +-
 .../client/serde/avro/GenericAvroSerde.java        |  21 ++-
 .../client/serde/avro/GenericAvroSerializer.java   |  10 +-
 .../serde/avro/ReflectionAvroDeserializer.java     |   6 +-
 .../serde/avro/SpecificAvroDeserializer.java       |   6 +
 .../client/serde/json/JsonDeserializer.java        |   4 +-
 .../registry/client/serde/json/JsonSerializer.java |   6 +-
 .../client/serde/SkipSchemaRegistrySerdeTest.java  |   6 +-
 .../client/serde/avro/GenericAvroSerdeTest.java    |   7 +-
 .../client/serde/avro/ReflectionAvroSerdeTest.java |   4 +-
 .../client/serde/avro/SpecificAvroSerdeTest.java   |   6 +-
 .../registry/client/serde/json/JsonSerdeTest.java  |   6 +-
 .../registry/common/constant/SchemaConstants.java  |   1 +
 ...SchemaDefination.java => SchemaDefinition.java} |   2 +-
 .../registry/common/storage/StorageService.java    |   4 +
 .../common/storage/StorageServiceProxy.java        |   7 +
 .../registry/core/api/v1/SchemaController.java     | 176 +++++++++++++++++++++
 .../registry/core/service/SchemaService.java       |   4 +
 .../registry/core/service/SchemaServiceImpl.java   |  20 +++
 .../schema/registry/example/GetSchemaDemo.java     |   6 +
 .../example/serde/avro/GenericAvroSerdeDemo.java   |  12 +-
 .../serde/avro/ReflectionAvroSerdeDemo.java        |   4 +-
 .../example/serde/avro/SpecificAvroSerdeDemo.java  |   5 +-
 .../registry/example/serde/json/JsonSerdeDemo.java |   4 +-
 .../serde/json/JsonSerdeWithoutServerDemo.java     |   6 +-
 .../storage/rocketmq/RocketmqStorageClient.java    |   4 +
 .../rocketmq/RocketmqStorageClientImpl.java        |  30 ++++
 .../storage/rocketmq/RocketmqStorageService.java   |   5 +
 36 files changed, 503 insertions(+), 102 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/NormalSchemaRegistryClient.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/NormalSchemaRegistryClient.java
index 1256bb5..966441e 100644
--- 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/NormalSchemaRegistryClient.java
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/NormalSchemaRegistryClient.java
@@ -90,6 +90,23 @@ public class NormalSchemaRegistryClient implements 
SchemaRegistryClient {
         return restService.getSchemaBySubject(cluster, tenant, subject, 
version);
     }
 
+    public GetSchemaResponse getSchemaBySubjectAndVersion(String subject, long 
version)
+        throws IOException, RestClientException {
+        return restService.getSchemaBySubject(subject, version);
+    }
+
+    @Override
+    public GetSchemaResponse getTargetSchema(String cluster, String tenant, 
String subject, String schema)
+        throws RestClientException, IOException {
+        return restService.getTargetSchema(cluster, tenant, subject, schema);
+    }
+
+    @Override
+    public GetSchemaResponse getTargetSchema(String subject, String schema)
+        throws RestClientException, IOException {
+        return restService.getTargetSchema(subject, schema);
+    }
+
     @Override
     public List<SchemaRecordDto> getSchemaListBySubject(String cluster, String 
tenant,
         String subject) throws RestClientException, IOException {
@@ -107,4 +124,14 @@ public class NormalSchemaRegistryClient implements 
SchemaRegistryClient {
         return restService.getAllTenants(cluster);
     }
 
+    public GetSchemaResponse getSchemaByRecordId(String cluster, String 
tenant, String subject,
+        long recordId) throws RestClientException, IOException {
+        return restService.getSchemaByRecordId(cluster, tenant, subject, 
recordId);
+    }
+
+    @Override
+    public GetSchemaResponse getSchemaByRecordId(String subject, long recordId)
+            throws RestClientException, IOException {
+        return restService.getSchemaByRecordId(subject, recordId);
+    }
 }
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClient.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClient.java
index 7e02738..32c4ff2 100644
--- 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClient.java
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClient.java
@@ -55,10 +55,23 @@ public interface SchemaRegistryClient {
     GetSchemaResponse getSchemaBySubjectAndVersion(String cluster, String 
tenant, String subject,
         long version) throws IOException, RestClientException;
 
+    GetSchemaResponse getSchemaBySubjectAndVersion(String subject, long 
version)
+        throws IOException, RestClientException;
+
+    GetSchemaResponse getTargetSchema(String cluster, String tenant, String 
subject, String schema)
+        throws RestClientException, IOException;
+    GetSchemaResponse getTargetSchema(String subject, String schema) throws 
RestClientException, IOException;
+
     List<SchemaRecordDto> getSchemaListBySubject(String cluster, String tenant,
         String subject) throws RestClientException, IOException;
 
     List<String> getSubjectsByTenant(String cluster, String tenant) throws 
RestClientException, IOException;
 
     List<String> getAllTenants(String cluster) throws RestClientException, 
IOException;
+
+    GetSchemaResponse getSchemaByRecordId(String cluster, String tenant, 
String subject, long recordId)
+        throws RestClientException, IOException;
+
+    GetSchemaResponse getSchemaByRecordId(String subject, long recordId)
+            throws RestClientException, IOException;
 }
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroSerializerConfig.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroSerdeConfig.java
similarity index 84%
rename from 
client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroSerializerConfig.java
rename to 
client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroSerdeConfig.java
index 15b32bf..78e2b9c 100644
--- 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroSerializerConfig.java
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroSerdeConfig.java
@@ -18,12 +18,15 @@ package org.apache.rocketmq.schema.registry.client.config;
 
 import java.util.Map;
 
-public class AvroSerializerConfig extends SerializerConfig {
+public class AvroSerdeConfig extends SerdeConfig {
+    /**
+     * use generic datum reader to deserialize genericRecord.
+     */
     public final static String USE_GENERIC_DATUM_READER =
-            "use.generic.datum.reader";
+        "use.generic.datum.reader";
     public final static boolean USE_GENERIC_DATUM_READER_DEFAULT = false;
 
-    public AvroSerializerConfig(Map<String, Object> configs) {
+    public AvroSerdeConfig(Map<String, Object> configs) {
         this.configs = configs;
     }
 
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/JsonSerializerConfig.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/JsonSerdeConfig.java
similarity index 88%
rename from 
client/src/main/java/org/apache/rocketmq/schema/registry/client/config/JsonSerializerConfig.java
rename to 
client/src/main/java/org/apache/rocketmq/schema/registry/client/config/JsonSerdeConfig.java
index 56af0c7..8545d9c 100644
--- 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/JsonSerializerConfig.java
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/JsonSerdeConfig.java
@@ -18,8 +18,8 @@ package org.apache.rocketmq.schema.registry.client.config;
 
 import java.util.Map;
 
-public class JsonSerializerConfig extends SerializerConfig {
-    public JsonSerializerConfig(Map<String, Object> configs) {
+public class JsonSerdeConfig extends SerdeConfig {
+    public JsonSerdeConfig(Map<String, Object> configs) {
         this.configs = configs;
     }
 
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/SerializerConfig.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/SerdeConfig.java
similarity index 63%
rename from 
client/src/main/java/org/apache/rocketmq/schema/registry/client/config/SerializerConfig.java
rename to 
client/src/main/java/org/apache/rocketmq/schema/registry/client/config/SerdeConfig.java
index 87b582b..9fc3c50 100644
--- 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/SerializerConfig.java
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/SerdeConfig.java
@@ -18,12 +18,26 @@ package org.apache.rocketmq.schema.registry.client.config;
 
 import java.util.Map;
 
-public class SerializerConfig {
+public class SerdeConfig {
     public final static String SKIP_SCHEMA_REGISTRY =
             "skip.schema.registry";
     public final static boolean SKIP_SCHEMA_REGISTRY_DEFAULT = false;
     public final static String DESERIALIZE_TARGET_TYPE =
             "deserialize.target.type";
+    /**
+     * use target version schema to serialize/deserialize without recordId.
+     * version default null, will use the latest version.
+     */
+    public final static String USE_TARGET_VERSION_SCHEMA =
+            "use.target.version.schema";
+
+    public final static boolean USE_TARGET_VERSION_SCHEMA_DEFAULT = false;
+
+    public final static String SCHEMA_TARGET_VERSION =
+            "schema.target.version";
+
+    public final static long SCHEMA_TARGET_VERSION_DEFAULT = -1;
+
     protected Map<String, Object> configs;
 
     public boolean skipSchemaRegistry() {
@@ -33,4 +47,12 @@ public class SerializerConfig {
     public Class<?> deserializeTargetType() {
         return (Class) configs.get(DESERIALIZE_TARGET_TYPE);
     }
+
+    public boolean useTargetVersionSchema() {
+        return (boolean) configs.getOrDefault(USE_TARGET_VERSION_SCHEMA, 
USE_TARGET_VERSION_SCHEMA_DEFAULT);
+    }
+
+    public long schemaTargetVersion() {
+        return (long) configs.getOrDefault(SCHEMA_TARGET_VERSION, 
SCHEMA_TARGET_VERSION_DEFAULT);
+    }
 }
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/RestService.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/RestService.java
index 704ac77..a1207eb 100644
--- 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/RestService.java
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/RestService.java
@@ -140,6 +140,12 @@ public class RestService {
         return HttpUtil.sendHttpRequest(path, HTTP_GET, null, httpHeaders, 
GET_SCHEMA_DTO_TYPE_REFERENCE);
     }
 
+    public GetSchemaResponse getSchemaBySubject(String subject, long version)
+        throws IOException, RestClientException {
+        UrlBuilder urlBuilder = 
UrlBuilder.fromPath("/subject/{subject-name}/schema/versions/{version}");
+        String path = HttpUtil.buildRequestUrl(baseUri, 
urlBuilder.build(subject, version).toString());
+        return HttpUtil.sendHttpRequest(path, HTTP_GET, null, httpHeaders, 
GET_SCHEMA_DTO_TYPE_REFERENCE);
+    }
     public List<SchemaRecordDto> getSchemaListBySubject(String cluster, String 
tenant,
         String subject) throws RestClientException, IOException {
         UrlBuilder urlBuilder = 
UrlBuilder.fromPath("/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/versions");
@@ -158,4 +164,30 @@ public class RestService {
         String path = HttpUtil.buildRequestUrl(baseUri, 
urlBuilder.build(cluster).toString());
         return HttpUtil.sendHttpRequest(path, HTTP_GET, null, httpHeaders, 
LIST_STRING_REFERENCE);
     }
+
+    public GetSchemaResponse getTargetSchema(String subject, String schema) 
throws RestClientException, IOException {
+        UrlBuilder urlBuilder = 
UrlBuilder.fromPath("/subject/{subject-name}/schema/schema");
+        String path = HttpUtil.buildRequestUrl(baseUri, 
urlBuilder.build(subject).toString());
+        return HttpUtil.sendHttpRequest(path, HTTP_POST, schema, httpHeaders, 
GET_SCHEMA_DTO_TYPE_REFERENCE);
+    }
+
+    public GetSchemaResponse getTargetSchema(String cluster, String tenant, 
String subject, String schema)
+        throws RestClientException, IOException {
+        UrlBuilder urlBuilder = 
UrlBuilder.fromPath("/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/schema");
+        String path = HttpUtil.buildRequestUrl(baseUri, 
urlBuilder.build(cluster, tenant, subject).toString());
+        return HttpUtil.sendHttpRequest(path, HTTP_POST, schema, httpHeaders, 
GET_SCHEMA_DTO_TYPE_REFERENCE);
+    }
+
+    public GetSchemaResponse getSchemaByRecordId(String cluster, String 
tenant, String subject, long recordId)
+        throws RestClientException, IOException {
+        UrlBuilder urlBuilder = 
UrlBuilder.fromPath("/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/recordId/{record-id}/schema");
+        String path = HttpUtil.buildRequestUrl(baseUri, 
urlBuilder.build(cluster, tenant, subject, recordId).toString());
+        return HttpUtil.sendHttpRequest(path, HTTP_GET, null, httpHeaders, 
GET_SCHEMA_DTO_TYPE_REFERENCE);
+    }
+
+    public GetSchemaResponse getSchemaByRecordId(String subject, long 
recordId) throws RestClientException, IOException {
+        UrlBuilder urlBuilder = 
UrlBuilder.fromPath("/subject/{subject-name}/recordId/{record-id}/schema");
+        String path = HttpUtil.buildRequestUrl(baseUri, 
urlBuilder.build(subject, recordId).toString());
+        return HttpUtil.sendHttpRequest(path, HTTP_GET, null, httpHeaders, 
GET_SCHEMA_DTO_TYPE_REFERENCE);
+    }
 }
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroDeserializer.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroDeserializer.java
index 87622c0..2e4391e 100644
--- 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroDeserializer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroDeserializer.java
@@ -22,9 +22,11 @@ import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
 import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
-import org.apache.rocketmq.schema.registry.client.config.AvroSerializerConfig;
+import org.apache.rocketmq.schema.registry.client.config.AvroSerdeConfig;
 import 
org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
 import 
org.apache.rocketmq.schema.registry.client.exceptions.SerializationException;
 import org.apache.rocketmq.schema.registry.client.serde.Deserializer;
@@ -40,11 +42,10 @@ import java.util.Map;
 public class AvroDeserializer<T> implements Deserializer<T> {
 
     Logger log = LoggerFactory.getLogger(AvroDeserializer.class);
-
-    private final DecoderFactory decoderFactory = DecoderFactory.get();
     protected SchemaRegistryClient schemaRegistry;
-
     private boolean useGenericReader;
+    private boolean useTargetVersionSchema;
+    private long schemaTargetVersion;
 
     public AvroDeserializer(){}
 
@@ -54,12 +55,19 @@ public class AvroDeserializer<T> implements Deserializer<T> 
{
 
     @Override
     public void configure(Map<String, Object> configs) {
-        AvroSerializerConfig config = new AvroSerializerConfig(configs);
+        AvroSerdeConfig config = new AvroSerdeConfig(configs);
         this.useGenericReader = config.useGenericReader();
+        this.useTargetVersionSchema = config.useTargetVersionSchema();
+        //get schema by version, if didn't configure this, would use the 
latest version
+        this.schemaTargetVersion = config.schemaTargetVersion();
     }
 
     @Override
-    public T deserialize(String subject, byte[] payload)
+    public T deserialize(String subject, byte[] payload) {
+        return this.deserialize(subject, payload, null);
+    }
+
+    public T deserialize(String subject, byte[] payload, Schema readerSchema)
             throws SerializationException {
         if (schemaRegistry == null) {
             throw new SerializationException("please initialize the schema 
registry client first");
@@ -69,38 +77,60 @@ public class AvroDeserializer<T> implements Deserializer<T> 
{
         }
 
         try {
-            GetSchemaResponse response = 
schemaRegistry.getSchemaBySubject(subject);
-            Schema schema = new Schema.Parser().parse(response.getIdl());
-            return avroDecode(payload, schema);
-        } catch (RestClientException | IOException e) {
-            throw new RuntimeException(e);
+            ByteArrayInputStream bais = new ByteArrayInputStream(payload);
+            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bais, 
null);
+
+            GetSchemaResponse response;
+            if (useTargetVersionSchema) {
+                response = AvroSerdeConfig.SCHEMA_TARGET_VERSION_DEFAULT == 
schemaTargetVersion
+                    ? schemaRegistry.getSchemaBySubject(subject)
+                    : schemaRegistry.getSchemaBySubjectAndVersion(subject, 
schemaTargetVersion);
+            } else {
+                ByteBuffer buffer = ByteBuffer.allocate(16);
+                decoder.readBytes(buffer);
+                long schemaRecordId = buffer.getLong();
+                response = schemaRegistry.getSchemaByRecordId(subject, 
schemaRecordId);
+            }
+            Schema writerSchema = new Schema.Parser().parse(response.getIdl());
+            if (readerSchema == null) {
+                readerSchema = getReaderSchema(writerSchema);
+            }
+
+            DatumReader<T> datumReader = getDatumReader(writerSchema, 
readerSchema);
+            return datumReader.read(null, decoder);
+        } catch (RestClientException e) {
+            log.warn("get schema by record id failed, maybe the schema storage 
service not available now", e);
+            throw new SerializationException("get schema by record id failed, 
maybe the schema storage service not available now", e);
+        } catch (IOException e) {
+            log.warn("deserialize failed", e);
+            throw new SerializationException("deserialize error", e);
         }
-
     }
 
-    public T avroDecode(byte[] input, Schema schema) throws IOException {
-        ByteArrayInputStream bais = new ByteArrayInputStream(input);
-        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bais, null);
-
-        ByteBuffer buffer = ByteBuffer.allocate(16);
-        try {
-            decoder.readBytes(buffer);
-        } catch (Exception e) {
-            log.error("read bytes error: ", e);
+    @SuppressWarnings("unchecked")
+    private Schema getReaderSchema(Schema writerSchema) {
+        if (useGenericReader) {
+            return writerSchema;
+        } else {
+            Class<SpecificRecord> readerClass = 
SpecificData.get().getClass(writerSchema);
+            if (readerClass == null) {
+                throw new SerializationException("cannot get a schema for a 
SpecificRecord");
+            }
+            try {
+                return readerClass.newInstance().getSchema();
+            } catch (InstantiationException e) {
+                throw new SerializationException("cannot initialize reader 
schema by writerSchema class", e);
+            } catch (IllegalAccessException e) {
+                throw new SerializationException("not allowed initialize 
reader schema by writerSchema class", e);
+            }
         }
-
-        long schemaRecordId = buffer.getLong();
-
-        DatumReader<T> datumReader = getDatumReader(schema);
-        T record = datumReader.read(null, decoder);
-        return record;
     }
 
-    private DatumReader<T> getDatumReader(Schema schema) {
+    private DatumReader<T> getDatumReader(Schema schema, Schema readerSchema) {
         if (useGenericReader) {
-            return new GenericDatumReader<>(schema);
+            return new GenericDatumReader<>(schema, readerSchema);
         } else {
-            return new SpecificDatumReader<>(schema);
+            return new SpecificDatumReader<>(schema, readerSchema);
         }
     }
 
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroSerializer.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroSerializer.java
index e240683..b6a608f 100644
--- 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroSerializer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroSerializer.java
@@ -18,10 +18,13 @@
 package org.apache.rocketmq.schema.registry.client.serde.avro;
 
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericContainer;
 import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
@@ -62,10 +65,19 @@ public class AvroSerializer<T> implements Serializer<T> {
         if (record == null) {
             return null;
         }
+        String purposeSchema;
+        if (record instanceof GenericRecord) {
+            purposeSchema = ((GenericContainer) record).getSchema().toString();
+        } else {
+            purposeSchema = 
SpecificData.get().getSchema(record.getClass()).toString();
+        }
 
         try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
             BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out, 
null);
-            GetSchemaResponse response = getSchemaBySubject(subject);
+            GetSchemaResponse response = 
schemaRegistry.getTargetSchema(subject, purposeSchema);
+            if (response == null) {
+                throw new SerializationException("there's no corresponding 
schema version equals to given schema : " + purposeSchema);
+            }
             long schemaRecordId = response.getRecordId();
             String schemaIdl = response.getIdl();
             Schema schema = new Schema.Parser().parse(schemaIdl);
@@ -80,19 +92,14 @@ public class AvroSerializer<T> implements Serializer<T> {
             }
             datumWriter.write(record, encoder);
             encoder.flush();
-            byte[] bytes = out.toByteArray();
-            return bytes;
+            return out.toByteArray();
         } catch (IOException | RuntimeException e) {
             throw new SerializationException("serialize Avro message failed", 
e);
         } catch (RestClientException e) {
-            throw new SerializationException("get schema by subject failed", 
e);
+            throw new SerializationException("get target schema failed", e);
         }
     }
 
-    private GetSchemaResponse getSchemaBySubject(String subject) throws 
RestClientException, IOException {
-        return schemaRegistry.getSchemaBySubject(subject);
-    }
-
     @Override
     public void close() {
     }
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroDeserializer.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroDeserializer.java
index 0028ace..1a03abb 100644
--- 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroDeserializer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroDeserializer.java
@@ -16,14 +16,15 @@
  */
 package org.apache.rocketmq.schema.registry.client.serde.avro;
 
+import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
 import org.apache.rocketmq.schema.registry.client.serde.Deserializer;
 
 import java.util.Map;
 
-public class GenericAvroDeserializer<T extends GenericRecord> implements 
Deserializer<T> {
-    private final AvroDeserializer<T> inner;
+public class GenericAvroDeserializer implements Deserializer<GenericRecord> {
+    private final AvroDeserializer<GenericRecord> inner;
 
     public GenericAvroDeserializer() {
         this.inner = new AvroDeserializer<>();
@@ -39,9 +40,13 @@ public class GenericAvroDeserializer<T extends 
GenericRecord> implements Deseria
     }
 
     @Override
-    public T deserialize(String subject, byte[] bytes) {
+    public GenericRecord deserialize(String subject, byte[] bytes) {
         return this.inner.deserialize(subject, bytes);
     }
+    
+    public GenericRecord deserialize(String subject, byte[] bytes, Schema 
readerSchema) {
+        return this.inner.deserialize(subject, bytes, readerSchema);
+    }
 
     @Override
     public void close() {
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerde.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerde.java
index 240c10d..29c60c1 100644
--- 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerde.java
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerde.java
@@ -16,30 +16,27 @@
  */
 package org.apache.rocketmq.schema.registry.client.serde.avro;
 
-import org.apache.avro.generic.GenericRecord;
 import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
-import org.apache.rocketmq.schema.registry.client.serde.Deserializer;
-import org.apache.rocketmq.schema.registry.client.serde.Serializer;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Map;
 
-public class GenericAvroSerde<T extends GenericRecord> implements Closeable {
-    private final Serializer<T> serializer;
-    private final Deserializer<T> deserializer;
+public class GenericAvroSerde implements Closeable {
+    private final GenericAvroSerializer serializer;
+    private final GenericAvroDeserializer deserializer;
 
     public GenericAvroSerde() {
-        this.serializer = new GenericAvroSerializer<T>();
-        this.deserializer = new GenericAvroDeserializer<T>();
+        this.serializer = new GenericAvroSerializer();
+        this.deserializer = new GenericAvroDeserializer();
     }
 
     public GenericAvroSerde(final SchemaRegistryClient client) {
         if (null == client) {
             throw  new IllegalArgumentException("please initialize the schema 
registry client first");
         }
-        this.serializer = new AvroSerializer<>(client);
-        this.deserializer = new AvroDeserializer<>(client);
+        this.serializer = new GenericAvroSerializer(client);
+        this.deserializer = new GenericAvroDeserializer(client);
     }
 
     public void configure(final Map<String, Object> configs) {
@@ -47,11 +44,11 @@ public class GenericAvroSerde<T extends GenericRecord> 
implements Closeable {
         this.deserializer.configure(configs);
     }
 
-    public Serializer<T> serializer() {
+    public GenericAvroSerializer serializer() {
         return this.serializer;
     }
 
-    public Deserializer<T> deserializer() {
+    public GenericAvroDeserializer deserializer() {
         return this.deserializer;
     }
 
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerializer.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerializer.java
index d178502..6d7da33 100644
--- 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerializer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerializer.java
@@ -22,16 +22,16 @@ import 
org.apache.rocketmq.schema.registry.client.serde.Serializer;
 
 import java.util.Map;
 
-public class GenericAvroSerializer<T extends GenericRecord> implements 
Serializer<T> {
+public class GenericAvroSerializer implements Serializer<GenericRecord> {
 
-    private final AvroSerializer<T> inner;
+    private final AvroSerializer<GenericRecord> inner;
 
     public GenericAvroSerializer() {
-        this.inner = new AvroSerializer<T>();
+        this.inner = new AvroSerializer<>();
     }
 
     public GenericAvroSerializer(final SchemaRegistryClient client) {
-        this.inner = new AvroSerializer<T>(client);
+        this.inner = new AvroSerializer<>(client);
     }
     @Override
     public void configure(final Map<String, Object> configs) {
@@ -39,7 +39,7 @@ public class GenericAvroSerializer<T extends GenericRecord> 
implements Serialize
     }
 
     @Override
-    public byte[] serialize(final String subject, final T record) {
+    public byte[] serialize(final String subject, final GenericRecord record) {
         return this.inner.serialize(subject, record);
     }
 
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroDeserializer.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroDeserializer.java
index 768306e..236ed32 100644
--- 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroDeserializer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroDeserializer.java
@@ -22,7 +22,7 @@ import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.rocketmq.schema.registry.client.config.AvroSerializerConfig;
+import org.apache.rocketmq.schema.registry.client.config.AvroSerdeConfig;
 import 
org.apache.rocketmq.schema.registry.client.exceptions.SerializationException;
 import org.apache.rocketmq.schema.registry.client.serde.Deserializer;
 
@@ -39,8 +39,8 @@ public class ReflectionAvroDeserializer<T> implements 
Deserializer<T> {
 
     @Override
     public void configure(Map<String, Object> configs) {
-        AvroSerializerConfig avroSerializerConfig = new 
AvroSerializerConfig(configs);
-        this.type = avroSerializerConfig.deserializeTargetType();
+        AvroSerdeConfig avroSerdeConfig = new AvroSerdeConfig(configs);
+        this.type = avroSerdeConfig.deserializeTargetType();
     }
 
     @Override
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroDeserializer.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroDeserializer.java
index 192a626..c9ad493 100644
--- 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroDeserializer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroDeserializer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.schema.registry.client.serde.avro;
 
+import org.apache.avro.Schema;
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
 import org.apache.rocketmq.schema.registry.client.serde.Deserializer;
@@ -45,6 +46,11 @@ public class SpecificAvroDeserializer<T extends 
SpecificRecord> implements Deser
         return (T) this.inner.deserialize(subject, bytes);
     }
 
+    @SuppressWarnings("unchecked")
+    public T deserialize(String subject, byte[] bytes, Schema readerSchema) {
+        return (T) this.inner.deserialize(subject, bytes, readerSchema);
+    }
+
     @Override
     public void close() {
         this.inner.close();
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonDeserializer.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonDeserializer.java
index 2f300c4..797ddd9 100644
--- 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonDeserializer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonDeserializer.java
@@ -20,7 +20,7 @@ package org.apache.rocketmq.schema.registry.client.serde.json;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
-import org.apache.rocketmq.schema.registry.client.config.JsonSerializerConfig;
+import org.apache.rocketmq.schema.registry.client.config.JsonSerdeConfig;
 import 
org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
 import 
org.apache.rocketmq.schema.registry.client.exceptions.SerializationException;
 import org.apache.rocketmq.schema.registry.client.rest.JacksonMapper;
@@ -50,7 +50,7 @@ public class JsonDeserializer<T> implements Deserializer<T> {
 
     @Override
     public void configure(Map<String, Object> configs) {
-        JsonSerializerConfig serializerConfig = new 
JsonSerializerConfig(configs);
+        JsonSerdeConfig serializerConfig = new JsonSerdeConfig(configs);
         this.skipSchemaRegistry = serializerConfig.skipSchemaRegistry();
         this.type = (Class<T>) serializerConfig.deserializeTargetType();
     }
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerializer.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerializer.java
index 85e0b4e..930538b 100644
--- 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerializer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerializer.java
@@ -20,7 +20,7 @@ package org.apache.rocketmq.schema.registry.client.serde.json;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
-import org.apache.rocketmq.schema.registry.client.config.JsonSerializerConfig;
+import org.apache.rocketmq.schema.registry.client.config.JsonSerdeConfig;
 import 
org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
 import 
org.apache.rocketmq.schema.registry.client.exceptions.SerializationException;
 import org.apache.rocketmq.schema.registry.client.rest.JacksonMapper;
@@ -47,8 +47,8 @@ public class JsonSerializer<T> implements Serializer<T> {
 
     @Override
     public void configure(Map<String, Object> configs) {
-        JsonSerializerConfig jsonSerializerConfig = new 
JsonSerializerConfig(configs);
-        this.skipSchemaRegistry = jsonSerializerConfig.skipSchemaRegistry();
+        JsonSerdeConfig jsonSerdeConfig = new JsonSerdeConfig(configs);
+        this.skipSchemaRegistry = jsonSerdeConfig.skipSchemaRegistry();
     }
 
     @Override
diff --git 
a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/SkipSchemaRegistrySerdeTest.java
 
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/SkipSchemaRegistrySerdeTest.java
index 4a7f22b..bc1cd96 100644
--- 
a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/SkipSchemaRegistrySerdeTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/SkipSchemaRegistrySerdeTest.java
@@ -16,7 +16,7 @@
  */
 package org.apache.rocketmq.schema.registry.client.serde;
 
-import org.apache.rocketmq.schema.registry.client.config.JsonSerializerConfig;
+import org.apache.rocketmq.schema.registry.client.config.JsonSerdeConfig;
 import org.apache.rocketmq.schema.registry.client.serde.json.JsonSerde;
 import org.junit.jupiter.api.Test;
 
@@ -33,8 +33,8 @@ public class SkipSchemaRegistrySerdeTest {
 
         try(JsonSerde<Person> jsonSerde = new JsonSerde<>()) {
             Map<String, Object> configs = new HashMap<>();
-            configs.put(JsonSerializerConfig.SKIP_SCHEMA_REGISTRY, true);
-            configs.put(JsonSerializerConfig.DESERIALIZE_TARGET_TYPE, 
Person.class);
+            configs.put(JsonSerdeConfig.SKIP_SCHEMA_REGISTRY, true);
+            configs.put(JsonSerdeConfig.DESERIALIZE_TARGET_TYPE, Person.class);
             jsonSerde.configure(configs);
             byte[] bytes = jsonSerde.serializer().serialize("TopicTest", 
person);
 
diff --git 
a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerdeTest.java
 
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerdeTest.java
index b25485f..41bca4e 100644
--- 
a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerdeTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerdeTest.java
@@ -20,7 +20,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
-import org.apache.rocketmq.schema.registry.client.config.AvroSerializerConfig;
+import org.apache.rocketmq.schema.registry.client.config.AvroSerdeConfig;
 import 
org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
 import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
 import org.junit.jupiter.api.Test;
@@ -52,7 +52,8 @@ public class GenericAvroSerdeTest {
         when(getSchemaResponse.getIdl()).thenReturn(idl);
 
         registryClient = mock(SchemaRegistryClient.class);
-        
when(registryClient.getSchemaBySubject("TopicTest")).thenReturn(getSchemaResponse);
+        when(registryClient.getTargetSchema("TopicTest", 
schema.toString())).thenReturn(getSchemaResponse);
+        when(registryClient.getSchemaByRecordId("TopicTest", 
1111L)).thenReturn(getSchemaResponse);
 
         GenericRecord record = new GenericRecordBuilder(schema)
                 .set("item", "generic")
@@ -62,7 +63,7 @@ public class GenericAvroSerdeTest {
         try (GenericAvroSerde serde = new GenericAvroSerde(registryClient)) {
             //configure
             Map<String, Object> configs = new HashMap<>();
-            configs.put(AvroSerializerConfig.USE_GENERIC_DATUM_READER, true);
+            configs.put(AvroSerdeConfig.USE_GENERIC_DATUM_READER, true);
             serde.configure(configs);
 
             //serialize
diff --git 
a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroSerdeTest.java
 
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroSerdeTest.java
index 4864cf5..2716ef5 100644
--- 
a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroSerdeTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroSerdeTest.java
@@ -16,7 +16,7 @@
  */
 package org.apache.rocketmq.schema.registry.client.serde.avro;
 
-import org.apache.rocketmq.schema.registry.client.config.AvroSerializerConfig;
+import org.apache.rocketmq.schema.registry.client.config.AvroSerdeConfig;
 import org.apache.rocketmq.schema.registry.client.serde.Charge;
 import org.junit.jupiter.api.Test;
 
@@ -31,7 +31,7 @@ public class ReflectionAvroSerdeTest {
     public void testReflectionAvroSerde() {
         Charge charge = new Charge("specific", 100.0);
         Map<String, Object> configs = new HashMap<>();
-        configs.put(AvroSerializerConfig.DESERIALIZE_TARGET_TYPE, 
charge.getClass());
+        configs.put(AvroSerdeConfig.DESERIALIZE_TARGET_TYPE, 
charge.getClass());
         try (ReflectionAvroSerde serde = new ReflectionAvroSerde()) {
             //serialize
             serde.configure(configs);
diff --git 
a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroSerdeTest.java
 
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroSerdeTest.java
index 3424d80..c58ee51 100644
--- 
a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroSerdeTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroSerdeTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.schema.registry.client.serde.avro;
 
+import org.apache.avro.Schema;
 import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
 import 
org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
 import org.apache.rocketmq.schema.registry.client.serde.Charge;
@@ -26,6 +27,7 @@ import org.mockito.Mock;
 import java.io.IOException;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.*;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -46,9 +48,11 @@ public class SpecificAvroSerdeTest {
         when(getSchemaResponse.getRecordId()).thenReturn(11111L);
         
when(getSchemaResponse.getSchemaFullName()).thenReturn("org.apache.rocketmq.schema.registry.example.serde.Charge");
         when(getSchemaResponse.getIdl()).thenReturn(idl);
+        Schema schema = new Schema.Parser().parse(idl);
 
         registryClient = mock(SchemaRegistryClient.class);
-        
when(registryClient.getSchemaBySubject("TopicTest")).thenReturn(getSchemaResponse);
+        when(registryClient.getTargetSchema("TopicTest", 
schema.toString())).thenReturn(getSchemaResponse);
+        when(registryClient.getSchemaByRecordId("TopicTest", 
11111L)).thenReturn(getSchemaResponse);
 
         try (SpecificAvroSerde serde = new SpecificAvroSerde(registryClient)) {
             //serialize
diff --git 
a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerdeTest.java
 
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerdeTest.java
index 78d4a0d..12c50c6 100644
--- 
a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerdeTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerdeTest.java
@@ -17,11 +17,9 @@
 package org.apache.rocketmq.schema.registry.client.serde.json;
 
 import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
-import org.apache.rocketmq.schema.registry.client.config.JsonSerializerConfig;
+import org.apache.rocketmq.schema.registry.client.config.JsonSerdeConfig;
 import 
org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
-import org.apache.rocketmq.schema.registry.client.serde.Charge;
 import org.apache.rocketmq.schema.registry.client.serde.Person;
-import org.apache.rocketmq.schema.registry.client.serde.avro.SpecificAvroSerde;
 import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mock;
@@ -57,7 +55,7 @@ public class JsonSerdeTest {
             //serialize
             Person person = new Person(1L, "Tom", 18);
             Map<String, Object> configs = new HashMap<>();
-            configs.put(JsonSerializerConfig.DESERIALIZE_TARGET_TYPE, 
person.getClass());
+            configs.put(JsonSerdeConfig.DESERIALIZE_TARGET_TYPE, 
person.getClass());
             serde.configure(configs);
             byte[] bytes = serde.serializer().serialize("TopicTest", person);
 
diff --git 
a/common/src/main/java/org/apache/rocketmq/schema/registry/common/constant/SchemaConstants.java
 
b/common/src/main/java/org/apache/rocketmq/schema/registry/common/constant/SchemaConstants.java
index f3af9be..c2525e0 100644
--- 
a/common/src/main/java/org/apache/rocketmq/schema/registry/common/constant/SchemaConstants.java
+++ 
b/common/src/main/java/org/apache/rocketmq/schema/registry/common/constant/SchemaConstants.java
@@ -20,4 +20,5 @@ package org.apache.rocketmq.schema.registry.common.constant;
 public class SchemaConstants {
     public static final int SCHEMA_RECORD_ID_LENGTH = 8;
     public static final char SUBJECT_SEPARATOR = '/';
+    public static final int SCHEMA_VERSION_BITS = 14;
 }
diff --git 
a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaDefination.java
 
b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaDefinition.java
similarity index 96%
rename from 
common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaDefination.java
rename to 
common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaDefinition.java
index 2c3e873..88967cc 100644
--- 
a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaDefination.java
+++ 
b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaDefinition.java
@@ -17,5 +17,5 @@
 
 package org.apache.rocketmq.schema.registry.common.model;
 
-public class SchemaDefination {
+public class SchemaDefinition {
 }
diff --git 
a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java
 
b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java
index 8dc85a5..45fbcea 100644
--- 
a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java
+++ 
b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java
@@ -80,6 +80,10 @@ public interface StorageService<T extends BaseInfo> {
         throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
     }
 
+    default SchemaRecordInfo getTargetSchema(StorageServiceContext context, 
QualifiedName name) {
+        throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
+    }
+
     default List<SchemaRecordInfo> listBySubject(final StorageServiceContext 
context, final QualifiedName name) {
         throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
     }
diff --git 
a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java
 
b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java
index 8f0457f..8305085 100644
--- 
a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java
+++ 
b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java
@@ -106,6 +106,13 @@ public class StorageServiceProxy {
         return storageService.getBySubject(storageServiceContext, name);
     }
 
+    public SchemaRecordInfo getTargetSchema(final QualifiedName name) {
+        final RequestContext requestContext = 
RequestContextManager.getContext();
+        final StorageServiceContext storageServiceContext = 
storageUtil.convertToStorageServiceContext(requestContext);
+        final StorageService<SchemaInfo> storageService = 
storageManager.getStorageService();
+        return storageService.getTargetSchema(storageServiceContext, name);
+    }
+
     public List<SchemaRecordInfo> listBySubject(final QualifiedName name) {
         final RequestContext requestContext = 
RequestContextManager.getContext();
         final StorageServiceContext storageServiceContext = 
storageUtil.convertToStorageServiceContext(requestContext);
diff --git 
a/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
 
b/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
index d69686d..4466aed 100644
--- 
a/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
+++ 
b/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
@@ -26,6 +26,7 @@ import java.net.HttpURLConnection;
 import java.util.List;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.rocketmq.schema.registry.common.QualifiedName;
+import org.apache.rocketmq.schema.registry.common.constant.SchemaConstants;
 import org.apache.rocketmq.schema.registry.common.dto.DeleteSchemeResponse;
 import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
 import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaRequest;
@@ -411,6 +412,181 @@ public class SchemaController {
         );
     }
 
+    @RequestMapping(
+        method = RequestMethod.GET,
+        path = "/subject/{subject-name}/schema/versions/{version}"
+        )
+    @ApiOperation(
+        value = "Schema information",
+        notes = "Schema information with the given version under the subject")
+    @ApiResponses(
+        {
+            @ApiResponse(
+                code = HttpURLConnection.HTTP_OK,
+                message = "The schema is returned"
+            ),
+            @ApiResponse(
+                code = HttpURLConnection.HTTP_NOT_FOUND,
+                message = "The requested tenant or schema cannot be found"
+            )
+        }
+        )
+    public GetSchemaResponse getSchemaBySubject(
+            @ApiParam(value = "The name of the subject", required = true)
+            @PathVariable(value = "subject-name") String subject,
+            @ApiParam(value = "The version of the schema", required = true)
+            @PathVariable(value = "version") String version
+    ) {
+        return getSchemaBySubject(DEFAULT_CLUSTER, DEFAULT_TENANT, subject, 
version);
+    }
+
+    @RequestMapping(
+        method = RequestMethod.POST,
+        path = 
"/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/schema"
+        )
+    @ApiOperation(
+        value = "Schema information",
+        notes = "Schema information from target schema of subject"
+        )
+    @ApiResponses(
+        {
+            @ApiResponse(
+                code = HttpURLConnection.HTTP_OK,
+                message = "The schema is returned"
+            ),
+            @ApiResponse(
+                code = HttpURLConnection.HTTP_NOT_FOUND,
+                message = "The requested tenant or schema cannot be found"
+            )
+        }
+        )
+    public GetSchemaResponse getTargetSchema(
+        @ApiParam(value = "The cluster of the subject", required = true)
+        @PathVariable(value = "cluster-name") final String cluster,
+        @ApiParam(value = "The tenant of the schema", required = true)
+        @PathVariable(value = "tenant-name") final String tenant,
+        @ApiParam(value = "The name of the subject", required = true)
+        @PathVariable(value = "subject-name") String subject,
+        @ApiParam(value = "The schema idl", required = true)
+        @RequestBody final String schema
+    ) {
+        QualifiedName name = new QualifiedName(cluster, tenant, subject, 
schema);
+
+        return this.requestProcessor.processRequest(
+            "getTargetSchema",
+            () -> schemaService.getTargetSchema(name)
+        );
+    }
+
+    @RequestMapping(
+        method = RequestMethod.POST,
+        path = "/subject/{subject-name}/schema/schema"
+        )
+    @ApiOperation(
+        value = "Schema information",
+        notes = "Schema information from target schema of subject"
+        )
+    @ApiResponses(
+        {
+            @ApiResponse(
+                code = HttpURLConnection.HTTP_OK,
+                message = "The schema is returned"
+            ),
+            @ApiResponse(
+                code = HttpURLConnection.HTTP_NOT_FOUND,
+                message = "The requested tenant or schema cannot be found"
+            )
+        }
+        )
+    public GetSchemaResponse getTargetSchema(
+        @ApiParam(value = "The name of the subject", required = true)
+        @PathVariable(value = "subject-name") String subject,
+        @ApiParam(value = "The schema idl", required = true)
+        @RequestBody final String schema
+    ) {
+        QualifiedName name = new QualifiedName("default", "default", subject, 
schema);
+
+        return this.requestProcessor.processRequest(
+            "getTargetSchema",
+            () -> schemaService.getTargetSchema(name)
+        );
+    }
+
+    @RequestMapping(
+        method = RequestMethod.GET,
+        path = 
"/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/recordId/{record-id}/schema"
+        )
+    @ApiOperation(
+        value = "Schema information",
+        notes = "Schema information with target recordId"
+        )
+    @ApiResponses(
+        {
+            @ApiResponse(
+                code = HttpURLConnection.HTTP_OK,
+                message = "The schema is returned"
+            ),
+            @ApiResponse(
+                code = HttpURLConnection.HTTP_NOT_FOUND,
+                message = "The requested tenant or schema cannot be found"
+            )
+        }
+        )
+    public GetSchemaResponse getSchemaByRecordId(
+        @ApiParam(value = "The cluster of the subject", required = true)
+        @PathVariable(value = "cluster-name") final String cluster,
+        @ApiParam(value = "The tenant of the schema", required = true)
+        @PathVariable(value = "tenant-name") final String tenant,
+        @ApiParam(value = "The name of the subject", required = true)
+        @PathVariable(value = "subject-name") String subject,
+        @ApiParam(value = "The recordId of the schema", required = true)
+        @PathVariable(value = "record-id") final String recordId
+    ) {
+        long versionMask = ~(-1L << SchemaConstants.SCHEMA_VERSION_BITS);
+        long recordIdResource = Long.parseLong(recordId);
+        Long version = recordIdResource & versionMask;
+        QualifiedName qualifiedName = new QualifiedName(cluster, tenant, 
subject, null, version);
+
+        return this.requestProcessor.processRequest(
+            "getSchemaByRecordId",
+            () -> schemaService.getBySubject(qualifiedName)
+        );
+    }
+
+    @RequestMapping(
+        method = RequestMethod.GET,
+        path = "/subject/{subject-name}/recordId/{record-id}/schema"
+        )
+    @ApiOperation(
+        value = "Schema information",
+        notes = "Schema information with target recordId"
+        )
+    @ApiResponses(
+        {
+            @ApiResponse(
+                code = HttpURLConnection.HTTP_OK,
+                message = "The schema is returned"
+            ),
+            @ApiResponse(
+                code = HttpURLConnection.HTTP_NOT_FOUND,
+                message = "The requested tenant or schema cannot be found"
+            )
+        }
+        )
+    public GetSchemaResponse getSchemaByRecordId(
+        @ApiParam(value = "The name of the subject", required = true)
+        @PathVariable(value = "subject-name") String subject,
+        @ApiParam(value = "The recordId of the schema", required = true)
+        @PathVariable(value = "record-id") final String recordId
+    ) {
+        QualifiedName qualifiedName = new QualifiedName("default", "default", 
subject, null, null);
+
+        return this.requestProcessor.processRequest(
+            "getSchemaByRecordId",
+            () -> schemaService.getByRecordId(qualifiedName, 
Long.parseLong(recordId))
+        );
+    }
+
     @RequestMapping(
         method = RequestMethod.GET,
         path = 
"/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/versions"
diff --git 
a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaService.java
 
b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaService.java
index d7af10b..2557ae6 100644
--- 
a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaService.java
+++ 
b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaService.java
@@ -83,4 +83,8 @@ public interface SchemaService<T extends BaseDto> {
     List<String> listSubjectsByTenant(QualifiedName qualifiedName);
 
     List<String> listTenants(QualifiedName name);
+
+    GetSchemaResponse getTargetSchema(QualifiedName qualifiedName);
+
+    GetSchemaResponse getByRecordId(QualifiedName qualifiedName, long 
recordId);
 }
diff --git 
a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
 
b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
index 63f74a3..89b7e65 100644
--- 
a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
+++ 
b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
@@ -23,6 +23,7 @@ import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.rocketmq.schema.registry.common.QualifiedName;
 import org.apache.rocketmq.schema.registry.common.auth.AccessControlService;
+import org.apache.rocketmq.schema.registry.common.constant.SchemaConstants;
 import org.apache.rocketmq.schema.registry.common.context.RequestContext;
 import 
org.apache.rocketmq.schema.registry.common.context.RequestContextManager;
 import org.apache.rocketmq.schema.registry.common.dto.DeleteSchemeResponse;
@@ -299,6 +300,25 @@ public class SchemaServiceImpl implements 
SchemaService<SchemaDto> {
         return tenants;
     }
 
+    public GetSchemaResponse getTargetSchema(QualifiedName qualifiedName) {
+        final RequestContext requestContext = 
RequestContextManager.getContext();
+        log.info("get request context: " + requestContext);
+        this.accessController.checkPermission("", qualifiedName.getTenant(), 
SchemaOperation.GET);
+        SchemaRecordInfo schemaRecordInfo = 
storageServiceProxy.getTargetSchema(qualifiedName);
+        if (schemaRecordInfo == null) {
+            throw new SchemaException("Schema: " + qualifiedName + " not 
exist");
+        }
+        return new GetSchemaResponse(qualifiedName, schemaRecordInfo);
+    }
+
+    @Override
+    public GetSchemaResponse getByRecordId(QualifiedName qualifiedName, long 
recordId) {
+        long versionMask = ~(-1L << SchemaConstants.SCHEMA_VERSION_BITS);
+        Long version = recordId & versionMask;
+        qualifiedName.setVersion(version);
+        return getBySubject(qualifiedName);
+    }
+
     private void checkSchemaExist(final QualifiedName qualifiedName) {
         if (storageServiceProxy.get(qualifiedName) != null) {
             throw new SchemaExistException(qualifiedName);
diff --git 
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/GetSchemaDemo.java
 
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/GetSchemaDemo.java
index 0348885..0eb06e0 100644
--- 
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/GetSchemaDemo.java
+++ 
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/GetSchemaDemo.java
@@ -47,5 +47,11 @@ public class GetSchemaDemo {
         } catch (RestClientException | IOException e) {
             e.printStackTrace();
         }
+        try {
+            List<SchemaRecordDto> schemas = 
schemaRegistryClient.getSchemaListBySubject("default", "default", topic);
+            System.out.println(schemas);
+        } catch (RestClientException | IOException e) {
+            e.printStackTrace();
+        }
     }
 }
diff --git 
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/GenericAvroSerdeDemo.java
 
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/GenericAvroSerdeDemo.java
index 69dd52c..a2c9d2e 100644
--- 
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/GenericAvroSerdeDemo.java
+++ 
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/GenericAvroSerdeDemo.java
@@ -21,7 +21,7 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
 import org.apache.rocketmq.schema.registry.client.SchemaRegistryClientFactory;
-import org.apache.rocketmq.schema.registry.client.config.AvroSerializerConfig;
+import org.apache.rocketmq.schema.registry.client.config.AvroSerdeConfig;
 import org.apache.rocketmq.schema.registry.client.serde.avro.GenericAvroSerde;
 
 import java.io.IOException;
@@ -37,14 +37,14 @@ public class GenericAvroSerdeDemo {
 
         Schema schema = new 
Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Charge\",\"namespace\":\"org.apache.rocketmq.schema.registry.example.serde\",\"fields\":[{\"name\":\"item\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}");
         GenericRecord record = new GenericRecordBuilder(schema)
-                .set("item", "generic")
-                .set("amount", 100.0)
-                .build();
+            .set("item", "generic")
+            .set("amount", 100.0)
+            .build();
 
-        try (GenericAvroSerde<GenericRecord> serde = new 
GenericAvroSerde<>(schemaRegistryClient)) {
+        try (GenericAvroSerde serde = new 
GenericAvroSerde(schemaRegistryClient)) {
             //configure
             Map<String, Object> configs = new HashMap<>();
-            configs.put(AvroSerializerConfig.USE_GENERIC_DATUM_READER, true);
+            configs.put(AvroSerdeConfig.USE_GENERIC_DATUM_READER, true);
             serde.configure(configs);
 
             //serialize
diff --git 
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/ReflectionAvroSerdeDemo.java
 
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/ReflectionAvroSerdeDemo.java
index 62b5bd0..816f32f 100644
--- 
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/ReflectionAvroSerdeDemo.java
+++ 
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/ReflectionAvroSerdeDemo.java
@@ -16,7 +16,7 @@
  */
 package org.apache.rocketmq.schema.registry.example.serde.avro;
 
-import org.apache.rocketmq.schema.registry.client.config.AvroSerializerConfig;
+import org.apache.rocketmq.schema.registry.client.config.AvroSerdeConfig;
 import 
org.apache.rocketmq.schema.registry.client.serde.avro.ReflectionAvroSerde;
 import org.apache.rocketmq.schema.registry.example.serde.Charge;
 
@@ -28,7 +28,7 @@ public class ReflectionAvroSerdeDemo {
     public static void main(String[] args) {
         Charge charge = new Charge("specific", 100.0);
         Map<String, Object> configs = new HashMap<>();
-        configs.put(AvroSerializerConfig.DESERIALIZE_TARGET_TYPE, 
charge.getClass());
+        configs.put(AvroSerdeConfig.DESERIALIZE_TARGET_TYPE, 
charge.getClass());
         try (ReflectionAvroSerde serde = new ReflectionAvroSerde()) {
             //serialize
             serde.configure(configs);
diff --git 
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/SpecificAvroSerdeDemo.java
 
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/SpecificAvroSerdeDemo.java
index aaec0ec..c5a29aa 100644
--- 
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/SpecificAvroSerdeDemo.java
+++ 
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/SpecificAvroSerdeDemo.java
@@ -19,7 +19,7 @@ package 
org.apache.rocketmq.schema.registry.example.serde.avro;
 
 import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
 import org.apache.rocketmq.schema.registry.client.SchemaRegistryClientFactory;
-import org.apache.rocketmq.schema.registry.client.config.AvroSerializerConfig;
+import org.apache.rocketmq.schema.registry.client.config.AvroSerdeConfig;
 import org.apache.rocketmq.schema.registry.client.serde.avro.SpecificAvroSerde;
 import org.apache.rocketmq.schema.registry.example.serde.Charge;
 
@@ -40,8 +40,7 @@ public class SpecificAvroSerdeDemo {
 
             //serialize
             Charge charge = new Charge("specific", 100.0);
-            serializeConfigs.put(AvroSerializerConfig.SKIP_SCHEMA_REGISTRY, 
true);
-            serializeConfigs.put(AvroSerializerConfig.DESERIALIZE_TARGET_TYPE, 
charge.getClass());
+            serializeConfigs.put(AvroSerdeConfig.DESERIALIZE_TARGET_TYPE, 
charge.getClass());
             serde.configure(serializeConfigs);
             byte[] bytes = serde.serializer().serialize("TopicTest", charge);
 
diff --git 
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/json/JsonSerdeDemo.java
 
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/json/JsonSerdeDemo.java
index bcb4e7a..50364a3 100644
--- 
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/json/JsonSerdeDemo.java
+++ 
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/json/JsonSerdeDemo.java
@@ -19,7 +19,7 @@ package 
org.apache.rocketmq.schema.registry.example.serde.json;
 
 import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
 import org.apache.rocketmq.schema.registry.client.SchemaRegistryClientFactory;
-import org.apache.rocketmq.schema.registry.client.config.JsonSerializerConfig;
+import org.apache.rocketmq.schema.registry.client.config.JsonSerdeConfig;
 import 
org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
 import org.apache.rocketmq.schema.registry.client.serde.json.JsonSerde;
 import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaRequest;
@@ -61,7 +61,7 @@ public class JsonSerdeDemo {
 
         try(JsonSerde<Person> jsonSerde = new 
JsonSerde<>(schemaRegistryClient)) {
             Map<String, Object> configs = new HashMap<>();
-            configs.put(JsonSerializerConfig.DESERIALIZE_TARGET_TYPE, 
Person.class);
+            configs.put(JsonSerdeConfig.DESERIALIZE_TARGET_TYPE, Person.class);
             jsonSerde.configure(configs);
             byte[] bytes = jsonSerde.serializer().serialize("TopicTest", 
person);
 
diff --git 
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/json/JsonSerdeWithoutServerDemo.java
 
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/json/JsonSerdeWithoutServerDemo.java
index c7e3500..3bb400b 100644
--- 
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/json/JsonSerdeWithoutServerDemo.java
+++ 
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/json/JsonSerdeWithoutServerDemo.java
@@ -17,7 +17,7 @@
 
 package org.apache.rocketmq.schema.registry.example.serde.json;
 
-import org.apache.rocketmq.schema.registry.client.config.JsonSerializerConfig;
+import org.apache.rocketmq.schema.registry.client.config.JsonSerdeConfig;
 import org.apache.rocketmq.schema.registry.client.serde.json.JsonSerde;
 import org.apache.rocketmq.schema.registry.example.serde.Person;
 
@@ -33,8 +33,8 @@ public class JsonSerdeWithoutServerDemo {
 
         try(JsonSerde<Person> jsonSerde = new JsonSerde<>()) {
             Map<String, Object> configs = new HashMap<>();
-            configs.put(JsonSerializerConfig.DESERIALIZE_TARGET_TYPE, 
Person.class);
-            configs.put(JsonSerializerConfig.SKIP_SCHEMA_REGISTRY, true);
+            configs.put(JsonSerdeConfig.DESERIALIZE_TARGET_TYPE, Person.class);
+            configs.put(JsonSerdeConfig.SKIP_SCHEMA_REGISTRY, true);
             jsonSerde.configure(configs);
             byte[] bytes = jsonSerde.serializer().serialize("TopicTest", 
person);
 
diff --git 
a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClient.java
 
b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClient.java
index 0099025..b71f476 100644
--- 
a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClient.java
+++ 
b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClient.java
@@ -75,6 +75,10 @@ public interface RocketmqStorageClient {
         throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
     }
 
+    default SchemaRecordInfo getTargetSchema(QualifiedName qualifiedName) {
+        throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
+    }
+
     /**
      * list all versions of rocketmq schema entity by subject.
      *
diff --git 
a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java
 
b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java
index 0ffee01..e728445 100644
--- 
a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java
+++ 
b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java
@@ -20,17 +20,21 @@ package 
org.apache.rocketmq.schema.registry.storage.rocketmq;
 import java.io.File;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
 import org.apache.rocketmq.schema.registry.common.QualifiedName;
 import org.apache.rocketmq.schema.registry.common.context.StoragePluginContext;
 import 
org.apache.rocketmq.schema.registry.common.context.StorageServiceContext;
 import org.apache.rocketmq.schema.registry.common.json.JsonConverter;
 import org.apache.rocketmq.schema.registry.common.json.JsonConverterImpl;
 import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
+import org.apache.rocketmq.schema.registry.common.model.SchemaMetaInfo;
 import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo;
+import org.apache.rocketmq.schema.registry.common.model.SchemaType;
 import org.apache.rocketmq.schema.registry.common.utils.CommonUtil;
 
 @Slf4j
@@ -113,6 +117,32 @@ public class RocketmqStorageClientImpl implements 
RocketmqStorageClient {
         return versionSchemaMap.get(qualifiedName.getVersion());
     }
 
+    @Override
+    public SchemaRecordInfo getTargetSchema(QualifiedName qualifiedName) {
+        // schema version is given
+        SchemaInfo schemaInfo = 
rocketmqClient.getSchemaInfoBySubject(qualifiedName.subjectFullName());
+        if (schemaInfo == null || schemaInfo.getDetails() == null || 
schemaInfo.getDetails().getSchemaRecords() == null) {
+            return null;
+        }
+        SchemaMetaInfo schemaMetaInfo = schemaInfo.getMeta();
+        if (schemaMetaInfo == null) {
+            return null;
+        }
+        if (schemaMetaInfo.getType() == SchemaType.AVRO) {
+            for (SchemaRecordInfo schemaRecordInfo : 
schemaInfo.getDetails().getSchemaRecords()) {
+                Schema store = new 
Schema.Parser().parse(schemaRecordInfo.getIdl());
+                Schema target = new 
Schema.Parser().parse(qualifiedName.getSchema());
+                if (Objects.equals(store, target)) {
+                    return schemaRecordInfo;
+                }
+            }
+        } else {
+            //todo support other type
+            return null;
+        }
+        return null;
+    }
+
     @Override
     public List<SchemaRecordInfo> listBySubject(QualifiedName qualifiedName) {
         SchemaInfo schemaInfo = 
rocketmqClient.getSchemaInfoBySubject(qualifiedName.subjectFullName());
diff --git 
a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageService.java
 
b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageService.java
index 1c91b89..27ebd06 100644
--- 
a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageService.java
+++ 
b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageService.java
@@ -79,6 +79,11 @@ public class RocketmqStorageService implements 
StorageService<SchemaInfo> {
         return storageClient.getBySubject(name);
     }
 
+    @Override
+    public SchemaRecordInfo getTargetSchema(StorageServiceContext context, 
QualifiedName name) {
+        return storageClient.getTargetSchema(name);
+    }
+
     @Override
     public List<SchemaRecordInfo> listBySubject(StorageServiceContext context, 
QualifiedName name) {
         return storageClient.listBySubject(name);

Reply via email to