This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git


The following commit(s) were added to refs/heads/main by this push:
     new b9a79f7  refine avro serde type to SpecificAvroSerde and 
GernericAvroSerde
     new 77cb8c6  Merge pull request #25 from humkum/main
b9a79f7 is described below

commit b9a79f78d40f2b4c042187302d0833caa437a1db
Author: Hankunming <[email protected]>
AuthorDate: Tue Aug 2 21:00:39 2022 +0800

    refine avro serde type to SpecificAvroSerde and GernericAvroSerde
---
 .../AvroDeserializerConfig.java}                   |  28 +--
 .../client/{serializer => serde}/Deserializer.java |   4 +-
 .../client/{serializer => serde}/Serializer.java   |   4 +-
 .../avro/AvroDeserializer.java}                    |  42 +++-
 .../avro/AvroSerializer.java}                      |  38 ++-
 .../avro/GenericAvroDeserializer.java}             |  25 +-
 .../client/serde/avro/GenericAvroSerde.java        |  63 +++++
 .../avro/GenericAvroSerializer.java}               |  25 +-
 .../avro/SpecificAvroDeserializer.java}            |  26 +-
 .../client/serde/avro/SpecificAvroSerde.java       |  64 +++++
 .../avro/SpecificAvroSerializer.java}              |  29 ++-
 .../schema/registry/client/serde/Charge.java       | 269 +++++++++++++++++++++
 .../client/serde/GenericAvroSerdeTest.java         |  82 +++++++
 .../client/serde/SpecificAvroSerdeTest.java        |  68 ++++++
 .../example/serde/GenericAvroSerdeDemo.java        |  60 +++++
 ...roSerdeDemo.java => SpecificAvroSerdeDemo.java} |  28 ++-
 .../storage/rocketmq/configs/ClientConfig.java     |   6 +-
 .../storage/rocketmq/configs/ServiceConfig.java    |   2 +-
 18 files changed, 771 insertions(+), 92 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroDeserializer.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroDeserializerConfig.java
similarity index 57%
copy from 
client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroDeserializer.java
copy to 
client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroDeserializerConfig.java
index 406eec7..af897cc 100644
--- 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroDeserializer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroDeserializerConfig.java
@@ -14,30 +14,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.rocketmq.schema.registry.client.serializer;
-
-import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+package org.apache.rocketmq.schema.registry.client.config;
 
 import java.util.Map;
 
-public class AvroDeserializer<T> extends AbstractAvroDeserializer implements 
Deserializer<T> {
-    public AvroDeserializer(){}
+public class AvroDeserializerConfig {
+    public final static String USE_GENERIC_DATUM_READER =
+            "use.generic.datum.reader";
+    public final static boolean USE_GENERIC_DATUM_READER_DEFAULT = false;
 
-    public AvroDeserializer(SchemaRegistryClient schemaRegistryClient) {
-        schemaRegistry = schemaRegistryClient;
-    }
-
-    @Override
-    public void configure(Map<String, ?> configs) {
-    }
+    private final Map<String, Object> configs;
 
-    @Override
-    public T deserialize(String subject, byte[] bytes) {
-        return (T) deserializeImpl(subject, bytes);
+    public AvroDeserializerConfig(Map<String, Object> configs) {
+        this.configs = configs;
     }
 
-    @Override
-    public void close() {
+    public boolean useGenericReader() {
+        return (boolean) configs.getOrDefault(USE_GENERIC_DATUM_READER, 
USE_GENERIC_DATUM_READER_DEFAULT);
     }
 }
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/Deserializer.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/Deserializer.java
similarity index 89%
rename from 
client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/Deserializer.java
rename to 
client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/Deserializer.java
index 81ee7a5..254a299 100644
--- 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/Deserializer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/Deserializer.java
@@ -15,13 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.schema.registry.client.serializer;
+package org.apache.rocketmq.schema.registry.client.serde;
 
 import java.io.Closeable;
 import java.util.Map;
 
 public interface Deserializer<T> extends Closeable {
-    default void configure(Map<String, ?> configs) {}
+    default void configure(Map<String, Object> configs) {}
 
     T deserialize(String subject, byte[] bytes);
 
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/Serializer.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/Serializer.java
similarity index 89%
rename from 
client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/Serializer.java
rename to 
client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/Serializer.java
index 0b01c75..b02ca2d 100644
--- 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/Serializer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/Serializer.java
@@ -15,14 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.schema.registry.client.serializer;
+package org.apache.rocketmq.schema.registry.client.serde;
 
 import java.io.Closeable;
 import java.util.Map;
 
 public interface Serializer<T> extends Closeable {
 
-    default void configure(Map<String, ?> configs) {}
+    default void configure(Map<String, Object> configs) {}
 
     byte[] serialize(String subject, T originMessage);
 
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AbstractAvroDeserializer.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroDeserializer.java
similarity index 69%
rename from 
client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AbstractAvroDeserializer.java
rename to 
client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroDeserializer.java
index cf24dcb..33287b0 100644
--- 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AbstractAvroDeserializer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroDeserializer.java
@@ -15,16 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.schema.registry.client.serializer;
+package org.apache.rocketmq.schema.registry.client.serde.avro;
 
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import 
org.apache.rocketmq.schema.registry.client.config.AvroDeserializerConfig;
 import 
org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
 import 
org.apache.rocketmq.schema.registry.client.exceptions.SerializationException;
+import org.apache.rocketmq.schema.registry.client.serde.Deserializer;
 import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,15 +35,31 @@ import org.slf4j.LoggerFactory;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Map;
 
-public class AbstractAvroDeserializer<T> {
+public class AvroDeserializer<T> implements Deserializer<T> {
 
     Logger log = LoggerFactory.getLogger(AvroDeserializer.class);
 
     private final DecoderFactory decoderFactory = DecoderFactory.get();
     protected SchemaRegistryClient schemaRegistry;
 
-    protected T deserializeImpl(String subject, byte[] payload)
+    private boolean useGenericReader;
+
+    public AvroDeserializer(){}
+
+    public AvroDeserializer(SchemaRegistryClient schemaRegistryClient) {
+        schemaRegistry = schemaRegistryClient;
+    }
+
+    @Override
+    public void configure(Map<String, Object> configs) {
+        AvroDeserializerConfig config = new AvroDeserializerConfig(configs);
+        this.useGenericReader = config.useGenericReader();
+    }
+
+    @Override
+    public T deserialize(String subject, byte[] payload)
             throws SerializationException {
         if (schemaRegistry == null) {
             throw new SerializationException("please initialize the schema 
registry client first");
@@ -73,9 +92,20 @@ public class AbstractAvroDeserializer<T> {
         long schemaId = buffer.getLong();
         long version = buffer.getLong();
 
-        DatumReader<T> datumReader = new SpecificDatumReader<T>(schema);
-        T originMessage = datumReader.read(null, decoder);
-        return originMessage;
+        DatumReader<T> datumReader = getDatumReader(schema);
+        T record = datumReader.read(null, decoder);
+        return record;
     }
 
+    private DatumReader<T> getDatumReader(Schema schema) {
+        if (useGenericReader) {
+            return new GenericDatumReader<>(schema);
+        } else {
+            return new SpecificDatumReader<>(schema);
+        }
+    }
+
+    @Override
+    public void close() {
+    }
 }
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AbstractAvroSerializer.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroSerializer.java
similarity index 74%
rename from 
client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AbstractAvroSerializer.java
rename to 
client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroSerializer.java
index 953a76b..3dffe1e 100644
--- 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AbstractAvroSerializer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroSerializer.java
@@ -15,37 +15,53 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.schema.registry.client.serializer;
+package org.apache.rocketmq.schema.registry.client.serde.avro;
 
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRecord;
 import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
 import 
org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
 import 
org.apache.rocketmq.schema.registry.client.exceptions.SerializationException;
+import org.apache.rocketmq.schema.registry.client.serde.Serializer;
 import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Map;
 
-public class AbstractAvroSerializer<T> {
+public class AvroSerializer<T> implements Serializer<T> {
 
     private static final int SCHEMA_ID_LENGTH = 8;
     private static final int SCHEMA_VERSION_LENGTH = 8;
     protected SchemaRegistryClient schemaRegistry;
     private final EncoderFactory encoderFactory = EncoderFactory.get();
 
-    protected byte[] serializeImpl(
-            String subject, T originMessage)
+    public AvroSerializer() {}
+
+    public AvroSerializer(SchemaRegistryClient schemaRegistryClient) {
+        schemaRegistry = schemaRegistryClient;
+    }
+
+    @Override
+    public void configure(Map<String, Object> configs) {
+        Serializer.super.configure(configs);
+    }
+
+    @Override
+    public byte[] serialize(
+            String subject, T record)
             throws SerializationException {
         if (schemaRegistry == null) {
             throw new SerializationException("please initialize the schema 
registry client first");
         }
 
-        if (originMessage == null) {
+        if (record == null) {
             return null;
         }
 
@@ -60,8 +76,13 @@ public class AbstractAvroSerializer<T> {
             ByteBuffer buffer = ByteBuffer.allocate(SCHEMA_ID_LENGTH + 
SCHEMA_VERSION_LENGTH);
             
encoder.writeBytes(buffer.putLong(schemaId).putLong(schemaVersion).array());
 
-            DatumWriter<T> datumWriter = new SpecificDatumWriter<>(schema);
-            datumWriter.write(originMessage, encoder);
+            DatumWriter<T> datumWriter;
+            if (record instanceof SpecificRecord) {
+                datumWriter = new SpecificDatumWriter<>(schema);
+            } else {
+                datumWriter = new GenericDatumWriter<>(schema);
+            }
+            datumWriter.write(record, encoder);
             encoder.flush();
             byte[] bytes = out.toByteArray();
             out.close();
@@ -77,4 +98,7 @@ public class AbstractAvroSerializer<T> {
         return schemaRegistry.getSchemaBySubject(subject);
     }
 
+    @Override
+    public void close() {
+    }
 }
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroDeserializer.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroDeserializer.java
similarity index 55%
copy from 
client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroDeserializer.java
copy to 
client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroDeserializer.java
index 406eec7..0c58699 100644
--- 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroDeserializer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroDeserializer.java
@@ -14,30 +14,37 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.rocketmq.schema.registry.client.serde.avro;
 
-package org.apache.rocketmq.schema.registry.client.serializer;
-
+import org.apache.avro.generic.GenericRecord;
 import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import org.apache.rocketmq.schema.registry.client.serde.Deserializer;
 
 import java.util.Map;
 
-public class AvroDeserializer<T> extends AbstractAvroDeserializer implements 
Deserializer<T> {
-    public AvroDeserializer(){}
+public class GenericAvroDeserializer implements Deserializer<GenericRecord> {
+    private final AvroDeserializer<GenericRecord> inner;
+
+    public GenericAvroDeserializer() {
+        this.inner = new AvroDeserializer<>();
+    }
 
-    public AvroDeserializer(SchemaRegistryClient schemaRegistryClient) {
-        schemaRegistry = schemaRegistryClient;
+    public GenericAvroDeserializer(final SchemaRegistryClient client) {
+        this.inner = new AvroDeserializer<>(client);
     }
 
     @Override
-    public void configure(Map<String, ?> configs) {
+    public void configure(final Map<String, Object> configs) {
+        this.inner.configure(configs);
     }
 
     @Override
-    public T deserialize(String subject, byte[] bytes) {
-        return (T) deserializeImpl(subject, bytes);
+    public GenericRecord deserialize(String subject, byte[] bytes) {
+        return this.inner.deserialize(subject, bytes);
     }
 
     @Override
     public void close() {
+        this.inner.close();
     }
 }
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerde.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerde.java
new file mode 100644
index 0000000..3abe215
--- /dev/null
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerde.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.schema.registry.client.serde.avro;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import org.apache.rocketmq.schema.registry.client.serde.Deserializer;
+import org.apache.rocketmq.schema.registry.client.serde.Serializer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+
+public class GenericAvroSerde implements Closeable {
+    private final Serializer<GenericRecord> serializer;
+    private final Deserializer<GenericRecord> deserializer;
+
+    public GenericAvroSerde() {
+        this.serializer = new GenericAvroSerializer();
+        this.deserializer = new GenericAvroDeserializer();
+    }
+
+    public GenericAvroSerde(final SchemaRegistryClient client) {
+        if (null == client) {
+            throw  new IllegalArgumentException("please initialize the schema 
registry client first");
+        }
+        this.serializer = new AvroSerializer<>(client);
+        this.deserializer = new AvroDeserializer<>(client);
+    }
+
+    public void configure(final Map<String, Object> configs) {
+        this.serializer.configure(configs);
+        this.deserializer.configure(configs);
+    }
+
+    public Serializer<GenericRecord> serializer() {
+        return this.serializer;
+    }
+
+    public Deserializer<GenericRecord> deserializer() {
+        return this.deserializer;
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.serializer.close();
+        this.deserializer.close();
+    }
+}
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroDeserializer.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerializer.java
similarity index 55%
copy from 
client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroDeserializer.java
copy to 
client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerializer.java
index 406eec7..8eb1dbf 100644
--- 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroDeserializer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerializer.java
@@ -14,30 +14,37 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.rocketmq.schema.registry.client.serde.avro;
 
-package org.apache.rocketmq.schema.registry.client.serializer;
-
+import org.apache.avro.generic.GenericRecord;
 import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import org.apache.rocketmq.schema.registry.client.serde.Serializer;
 
 import java.util.Map;
 
-public class AvroDeserializer<T> extends AbstractAvroDeserializer implements 
Deserializer<T> {
-    public AvroDeserializer(){}
+public class GenericAvroSerializer implements Serializer<GenericRecord> {
+
+    private final AvroSerializer<GenericRecord> inner;
 
-    public AvroDeserializer(SchemaRegistryClient schemaRegistryClient) {
-        schemaRegistry = schemaRegistryClient;
+    public GenericAvroSerializer() {
+        this.inner = new AvroSerializer<GenericRecord>();
     }
 
+    public GenericAvroSerializer(final SchemaRegistryClient client) {
+        this.inner = new AvroSerializer<GenericRecord>(client);
+    }
     @Override
-    public void configure(Map<String, ?> configs) {
+    public void configure(final Map<String, Object> configs) {
+        this.inner.configure(configs);
     }
 
     @Override
-    public T deserialize(String subject, byte[] bytes) {
-        return (T) deserializeImpl(subject, bytes);
+    public byte[] serialize(final String subject, final GenericRecord record) {
+        return this.inner.serialize(subject, record);
     }
 
     @Override
     public void close() {
+        this.inner.close();
     }
 }
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroDeserializer.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroDeserializer.java
similarity index 56%
rename from 
client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroDeserializer.java
rename to 
client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroDeserializer.java
index 406eec7..f6f90e5 100644
--- 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroDeserializer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroDeserializer.java
@@ -14,30 +14,38 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.rocketmq.schema.registry.client.serde.avro;
 
-package org.apache.rocketmq.schema.registry.client.serializer;
-
+import org.apache.avro.specific.SpecificRecord;
 import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import org.apache.rocketmq.schema.registry.client.serde.Deserializer;
 
 import java.util.Map;
 
-public class AvroDeserializer<T> extends AbstractAvroDeserializer implements 
Deserializer<T> {
-    public AvroDeserializer(){}
+public class SpecificAvroDeserializer implements Deserializer<SpecificRecord> {
+
+    private final AvroDeserializer<SpecificRecord> inner;
+
+    public SpecificAvroDeserializer() {
+        this.inner = new AvroDeserializer<>();
+    }
 
-    public AvroDeserializer(SchemaRegistryClient schemaRegistryClient) {
-        schemaRegistry = schemaRegistryClient;
+    public SpecificAvroDeserializer(final SchemaRegistryClient client) {
+        this.inner = new AvroDeserializer<>(client);
     }
 
     @Override
-    public void configure(Map<String, ?> configs) {
+    public void configure(final Map<String, Object> configs) {
+        this.inner.configure(configs);
     }
 
     @Override
-    public T deserialize(String subject, byte[] bytes) {
-        return (T) deserializeImpl(subject, bytes);
+    public SpecificRecord deserialize(String subject, byte[] bytes) {
+        return this.inner.deserialize(subject, bytes);
     }
 
     @Override
     public void close() {
+        this.inner.close();
     }
 }
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroSerde.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroSerde.java
new file mode 100644
index 0000000..afaf508
--- /dev/null
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroSerde.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.schema.registry.client.serde.avro;
+
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import org.apache.rocketmq.schema.registry.client.serde.Deserializer;
+import org.apache.rocketmq.schema.registry.client.serde.Serializer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+
+public class SpecificAvroSerde implements Closeable {
+    private final AvroSerializer<SpecificRecord> serializer;
+    private final AvroDeserializer<SpecificRecord> deserializer;
+
+    public SpecificAvroSerde() {
+        this.serializer = new AvroSerializer<>();
+        this.deserializer = new AvroDeserializer<>();
+    }
+
+    public SpecificAvroSerde(final SchemaRegistryClient client) {
+        if (null == client) {
+            throw new IllegalArgumentException("please initialize schema 
registry client first");
+        }
+
+        this.serializer = new AvroSerializer<>(client);
+        this.deserializer = new AvroDeserializer<>(client);
+    }
+
+    public Serializer<SpecificRecord> serializer() {
+        return this.serializer;
+    }
+
+    public Deserializer<SpecificRecord> deserializer() {
+        return this.deserializer;
+    }
+
+    public void configure(final Map<String, Object> configs) {
+        this.serializer.configure(configs);
+        this.deserializer.configure(configs);
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.serializer.close();
+        this.deserializer.close();
+    }
+}
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroSerializer.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroSerializer.java
similarity index 56%
rename from 
client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroSerializer.java
rename to 
client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroSerializer.java
index 9276e8b..12ed260 100644
--- 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroSerializer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroSerializer.java
@@ -14,35 +14,38 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.rocketmq.schema.registry.client.serde.avro;
 
-package org.apache.rocketmq.schema.registry.client.serializer;
-
+import org.apache.avro.specific.SpecificRecord;
 import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import org.apache.rocketmq.schema.registry.client.serde.Serializer;
 
 import java.util.Map;
 
-public class AvroSerializer<T> extends AbstractAvroSerializer implements 
Serializer<T> {
+public class SpecificAvroSerializer implements Serializer<SpecificRecord> {
+
+    private final AvroSerializer<SpecificRecord> inner;
 
-    public AvroSerializer() {}
+    public SpecificAvroSerializer() {
+        this.inner = new AvroSerializer<>();
+    }
 
-    public AvroSerializer(SchemaRegistryClient schemaRegistryClient) {
-        schemaRegistry = schemaRegistryClient;
+    public SpecificAvroSerializer(final SchemaRegistryClient client) {
+        this.inner = new AvroSerializer<>(client);
     }
 
     @Override
-    public void configure(Map<String, ?> configs) {
-        Serializer.super.configure(configs);
+    public void configure(final Map<String, Object> configs) {
+        this.inner.configure(configs);
     }
 
     @Override
-    public byte[] serialize(String subject, T originMessage) {
-        if (originMessage == null) {
-            return null;
-        }
-        return serializeImpl(subject, originMessage);
+    public byte[] serialize(String subject, SpecificRecord record) {
+        return this.inner.serialize(subject, record);
     }
 
     @Override
     public void close() {
+        this.inner.close();
     }
 }
diff --git 
a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/Charge.java
 
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/Charge.java
new file mode 100644
index 0000000..aa48960
--- /dev/null
+++ 
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/Charge.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.schema.registry.client.serde;
+@SuppressWarnings("all")
+@org.apache.avro.specific.AvroGenerated
+public class Charge extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+  private static final long serialVersionUID = -3449629867777645843L;
+  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Charge\",\"namespace\":\"org.apache.rocketmq.schema.registry.client.serde\",\"fields\":[{\"name\":\"item\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}");
+  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+  @Deprecated public CharSequence item;
+  @Deprecated public double amount;
+
+  /**
+   * Default constructor.  Note that this does not initialize fields
+   * to their default values from the schema.  If that is desired then
+   * one should use <code>newBuilder()</code>. 
+   */
+  public Charge() {}
+
+  /**
+   * All-args constructor.
+   */
+  public Charge(CharSequence item, Double amount) {
+    this.item = item;
+    this.amount = amount;
+  }
+
+  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+  // Used by DatumWriter.  Applications should not call. 
+  public Object get(int field$) {
+    switch (field$) {
+    case 0: return item;
+    case 1: return amount;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+  // Used by DatumReader.  Applications should not call. 
+  @SuppressWarnings(value="unchecked")
+  public void put(int field$, Object value$) {
+    switch (field$) {
+    case 0: item = (CharSequence)value$; break;
+    case 1: amount = (Double)value$; break;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+
+  /**
+   * Gets the value of the 'item' field.
+   */
+  public CharSequence getItem() {
+    return item;
+  }
+
+  /**
+   * Sets the value of the 'item' field.
+   * @param value the value to set.
+   */
+  public void setItem(CharSequence value) {
+    this.item = value;
+  }
+
+  /**
+   * Gets the value of the 'amount' field.
+   */
+  public Double getAmount() {
+    return amount;
+  }
+
+  /**
+   * Sets the value of the 'amount' field.
+   * @param value the value to set.
+   */
+  public void setAmount(Double value) {
+    this.amount = value;
+  }
+
+  /**
+   * Creates a new Charge RecordBuilder.
+   * @return A new Charge RecordBuilder
+   */
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+  
+  /**
+   * Creates a new Charge RecordBuilder by copying an existing Builder.
+   * @param other The existing builder to copy.
+   * @return A new Charge RecordBuilder
+   */
+  public static Builder newBuilder(Builder other) {
+    return new Builder(other);
+  }
+  
+  /**
+   * Creates a new Charge RecordBuilder by copying an existing Charge instance.
+   * @param other The existing instance to copy.
+   * @return A new Charge RecordBuilder
+   */
+  public static Builder newBuilder(Charge other) {
+    return new Builder(other);
+  }
+  
+  /**
+   * RecordBuilder for Charge instances.
+   */
+  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Charge>
+    implements org.apache.avro.data.RecordBuilder<Charge> {
+
+    private CharSequence item;
+    private double amount;
+
+    /** Creates a new Builder */
+    private Builder() {
+      super(Charge.SCHEMA$);
+    }
+    
+    /**
+     * Creates a Builder by copying an existing Builder.
+     * @param other The existing Builder to copy.
+     */
+    private Builder(Builder other) {
+      super(other);
+      if (isValidValue(fields()[0], other.item)) {
+        this.item = data().deepCopy(fields()[0].schema(), other.item);
+        fieldSetFlags()[0] = true;
+      }
+      if (isValidValue(fields()[1], other.amount)) {
+        this.amount = data().deepCopy(fields()[1].schema(), other.amount);
+        fieldSetFlags()[1] = true;
+      }
+    }
+    
+    /**
+     * Creates a Builder by copying an existing Charge instance
+     * @param other The existing instance to copy.
+     */
+    private Builder(Charge other) {
+            super(Charge.SCHEMA$);
+      if (isValidValue(fields()[0], other.item)) {
+        this.item = data().deepCopy(fields()[0].schema(), other.item);
+        fieldSetFlags()[0] = true;
+      }
+      if (isValidValue(fields()[1], other.amount)) {
+        this.amount = data().deepCopy(fields()[1].schema(), other.amount);
+        fieldSetFlags()[1] = true;
+      }
+    }
+
+    /**
+      * Gets the value of the 'item' field.
+      * @return The value.
+      */
+    public CharSequence getItem() {
+      return item;
+    }
+
+    /**
+      * Sets the value of the 'item' field.
+      * @param value The value of 'item'.
+      * @return This builder.
+      */
+    public Builder setItem(CharSequence value) {
+      validate(fields()[0], value);
+      this.item = value;
+      fieldSetFlags()[0] = true;
+      return this; 
+    }
+
+    /**
+      * Checks whether the 'item' field has been set.
+      * @return True if the 'item' field has been set, false otherwise.
+      */
+    public boolean hasItem() {
+      return fieldSetFlags()[0];
+    }
+
+
+    /**
+      * Clears the value of the 'item' field.
+      * @return This builder.
+      */
+    public Builder clearItem() {
+      item = null;
+      fieldSetFlags()[0] = false;
+      return this;
+    }
+
+    /**
+      * Gets the value of the 'amount' field.
+      * @return The value.
+      */
+    public Double getAmount() {
+      return amount;
+    }
+
+    /**
+      * Sets the value of the 'amount' field.
+      * @param value The value of 'amount'.
+      * @return This builder.
+      */
+    public Builder setAmount(double value) {
+      validate(fields()[1], value);
+      this.amount = value;
+      fieldSetFlags()[1] = true;
+      return this; 
+    }
+
+    /**
+      * Checks whether the 'amount' field has been set.
+      * @return True if the 'amount' field has been set, false otherwise.
+      */
+    public boolean hasAmount() {
+      return fieldSetFlags()[1];
+    }
+
+
+    /**
+      * Clears the value of the 'amount' field.
+      * @return This builder.
+      */
+    public Builder clearAmount() {
+      fieldSetFlags()[1] = false;
+      return this;
+    }
+
+    @Override
+    public Charge build() {
+      try {
+        Charge record = new Charge();
+        record.item = fieldSetFlags()[0] ? this.item : (CharSequence) defaultValue(fields()[0]);
+        record.amount = fieldSetFlags()[1] ? this.amount : (Double) defaultValue(fields()[1]);
+        return record;
+      } catch (Exception e) {
+        throw new org.apache.avro.AvroRuntimeException(e);
+      }
+    }
+  }
+
+  private static final org.apache.avro.io.DatumWriter
+    WRITER$ = new org.apache.avro.specific.SpecificDatumWriter(SCHEMA$);  
+
+  @Override public void writeExternal(java.io.ObjectOutput out)
+    throws java.io.IOException {
+    WRITER$.write(this, org.apache.avro.specific.SpecificData.getEncoder(out));
+  }
+
+  private static final org.apache.avro.io.DatumReader
+    READER$ = new org.apache.avro.specific.SpecificDatumReader(SCHEMA$);  
+
+  @Override public void readExternal(java.io.ObjectInput in)
+    throws java.io.IOException {
+    READER$.read(this, org.apache.avro.specific.SpecificData.getDecoder(in));
+  }
+
+}
diff --git a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/GenericAvroSerdeTest.java b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/GenericAvroSerdeTest.java
new file mode 100644
index 0000000..c841283
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/GenericAvroSerdeTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.schema.registry.client.serde;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import org.apache.rocketmq.schema.registry.client.config.AvroDeserializerConfig;
+import org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
+import org.apache.rocketmq.schema.registry.client.serde.avro.GenericAvroSerde;
+import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class GenericAvroSerdeTest {
+    @Mock
+    private SchemaRegistryClient registryClient;
+
+    @Mock
+    private GetSchemaResponse getSchemaResponse;
+
+    @Test
+    public void testGenericSerde() throws RestClientException, IOException {
+        String idl = "{\"type\":\"record\",\"name\":\"Charge\",\"namespace\":\"org.apache.rocketmq.schema.registry.example.serde\","
+                + "\"fields\":[{\"name\":\"item\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}";
+        Schema schema = new Schema.Parser().parse(idl);
+
+        getSchemaResponse = mock(GetSchemaResponse.class);
+        when(getSchemaResponse.getSchemaId()).thenReturn(1111L);
+        when(getSchemaResponse.getVersion()).thenReturn(11111L);
+        when(getSchemaResponse.getIdl()).thenReturn(idl);
+
+        registryClient = mock(SchemaRegistryClient.class);
+        when(registryClient.getSchemaBySubject("TopicTest")).thenReturn(getSchemaResponse);
+
+        GenericRecord record = new GenericRecordBuilder(schema)
+                .set("item", "generic")
+                .set("amount", 100.0)
+                .build();
+
+        try (GenericAvroSerde serde = new GenericAvroSerde(registryClient)) {
+            //configure
+            Map<String, Object> configs = new HashMap<>();
+            configs.put(AvroDeserializerConfig.USE_GENERIC_DATUM_READER, true);
+            serde.configure(configs);
+
+            //serialize
+            byte[] bytes = serde.serializer().serialize("TopicTest", record);
+
+            //deserialize
+            GenericRecord record1 = serde.deserializer().deserialize("TopicTest", bytes);
+            assertThat(record1).isEqualTo(record);
+        } catch (IOException e) {
+            System.out.println("serde shutdown failed");
+        }
+
+
+    }
+}
diff --git a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/SpecificAvroSerdeTest.java b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/SpecificAvroSerdeTest.java
new file mode 100644
index 0000000..41c1536
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/SpecificAvroSerdeTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.schema.registry.client.serde;
+
+import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
+import org.apache.rocketmq.schema.registry.client.serde.avro.SpecificAvroSerde;
+import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class SpecificAvroSerdeTest {
+
+    @Mock
+    private SchemaRegistryClient registryClient;
+
+    @Mock
+    private GetSchemaResponse getSchemaResponse;
+
+    @Test
+    public void testSpecificSerde() throws RestClientException, IOException {
+        String idl = "{\"type\":\"record\",\"name\":\"Charge\",\"namespace\":\"org.apache.rocketmq.schema.registry.client.serde\","
+                + "\"fields\":[{\"name\":\"item\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}";
+
+        getSchemaResponse = mock(GetSchemaResponse.class);
+        when(getSchemaResponse.getSchemaId()).thenReturn(11111L);
+        when(getSchemaResponse.getVersion()).thenReturn(11111L);
+        when(getSchemaResponse.getSchemaFullName()).thenReturn("org.apache.rocketmq.schema.registry.example.serde.Charge");
+        when(getSchemaResponse.getIdl()).thenReturn(idl);
+
+        registryClient = mock(SchemaRegistryClient.class);
+        when(registryClient.getSchemaBySubject("TopicTest")).thenReturn(getSchemaResponse);
+
+        try (SpecificAvroSerde serde = new SpecificAvroSerde(registryClient)) {
+            //serialize
+            Charge charge = new Charge("specific", 100.0);
+            byte[] bytes = serde.serializer().serialize("TopicTest", charge);
+
+            //deserialize
+            Charge charge1 = (Charge) serde.deserializer().deserialize("TopicTest", bytes);
+            assertThat(charge1).isEqualTo(charge);
+        } catch (IOException e) {
+            System.out.println("serde shutdown failed");
+        }
+
+
+    }
+}
diff --git a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/GenericAvroSerdeDemo.java b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/GenericAvroSerdeDemo.java
new file mode 100644
index 0000000..b8359a7
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/GenericAvroSerdeDemo.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.schema.registry.example.serde;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import org.apache.rocketmq.schema.registry.client.SchemaRegistryClientFactory;
+import org.apache.rocketmq.schema.registry.client.config.AvroDeserializerConfig;
+import org.apache.rocketmq.schema.registry.client.serde.avro.GenericAvroSerde;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class GenericAvroSerdeDemo {
+
+    public static void main(String[] args) {
+
+        String baseUrl = "http://localhost:8080/schema-registry/v1";
+        SchemaRegistryClient schemaRegistryClient = SchemaRegistryClientFactory.newClient(baseUrl, null);
+
+        Schema schema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Charge\",\"namespace\":\"org.apache.rocketmq.schema.registry.example.serde\",\"fields\":[{\"name\":\"item\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}");
+        GenericRecord record = new GenericRecordBuilder(schema)
+                .set("item", "generic")
+                .set("amount", 100.0)
+                .build();
+
+        try (GenericAvroSerde serde = new GenericAvroSerde(schemaRegistryClient)) {
+            //configure
+            Map<String, Object> configs = new HashMap<>();
+            configs.put(AvroDeserializerConfig.USE_GENERIC_DATUM_READER, true);
+            serde.configure(configs);
+
+            //serialize
+            byte[] bytes = serde.serializer().serialize("TopicTest", record);
+
+            //deserialize
+            GenericRecord record1 = serde.deserializer().deserialize("TopicTest", bytes);
+            System.out.println("the origin object after ser/de is " + record1);
+        } catch (IOException e) {
+            System.out.println("serde shutdown failed");
+        }
+    }
+}
\ No newline at end of file
diff --git a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/AvroSerdeDemo.java b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/SpecificAvroSerdeDemo.java
similarity index 60%
rename from example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/AvroSerdeDemo.java
rename to example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/SpecificAvroSerdeDemo.java
index d765a93..281d383 100644
--- a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/AvroSerdeDemo.java
+++ b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/SpecificAvroSerdeDemo.java
@@ -19,26 +19,28 @@ package org.apache.rocketmq.schema.registry.example.serde;
 
 import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
 import org.apache.rocketmq.schema.registry.client.SchemaRegistryClientFactory;
-import org.apache.rocketmq.schema.registry.client.serializer.AvroDeserializer;
-import org.apache.rocketmq.schema.registry.client.serializer.AvroSerializer;
-import org.apache.rocketmq.schema.registry.client.serializer.Deserializer;
-import org.apache.rocketmq.schema.registry.client.serializer.Serializer;
+import org.apache.rocketmq.schema.registry.client.serde.avro.SpecificAvroSerde;
 
-public class AvroSerdeDemo {
+import java.io.IOException;
+
+public class SpecificAvroSerdeDemo {
 
     public static void main(String[] args) {
 
         String baseUrl = "http://localhost:8080/schema-registry/v1";
         SchemaRegistryClient schemaRegistryClient = SchemaRegistryClientFactory.newClient(baseUrl, null);
 
-        //serialize
-        Charge charge = new Charge("fee1", 100.0);
-        Serializer<Charge> serializer = new AvroSerializer<>(schemaRegistryClient);
-        byte[] bytes = serializer.serialize("TopicTest", charge);
+        try (SpecificAvroSerde serde = new SpecificAvroSerde(schemaRegistryClient)) {
+            //serialize
+            Charge charge = new Charge("specific", 100.0);
+            byte[] bytes = serde.serializer().serialize("TopicTest", charge);
+
+            //deserialize
+            Charge charge1 = (Charge) serde.deserializer().deserialize("TopicTest", bytes);
+            System.out.println("the origin object after ser/de is " + charge1);
+        } catch (IOException e) {
+            System.out.println("serde shutdown failed");
+        }
 
-        //deserialize
-        Deserializer<Charge> deserializer = new AvroDeserializer<>(schemaRegistryClient);
-        Charge charge1 = deserializer.deserialize("TopicTest", bytes);
-        System.out.println("the origin object after ser/de is " + charge1);
     }
 }
diff --git a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/ClientConfig.java b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/ClientConfig.java
index 9ca2489..57e3ca0 100644
--- a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/ClientConfig.java
+++ b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/ClientConfig.java
@@ -27,10 +27,10 @@ import org.springframework.context.annotation.Configuration;
 public class ClientConfig {
 
     /**
-     * Talos client instance.
+     * RocketMQ client instance.
      *
-     * @param context connector context
-     * @return MetacatTalosClient
+     * @param context storage plugin context
+     * @return RocketmqStorageClient
      */
     @Bean
     public RocketmqStorageClient rocketmqStorageClient(StoragePluginContext context) {
diff --git a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/ServiceConfig.java b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/ServiceConfig.java
index de0aa90..3aaea49 100644
--- a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/ServiceConfig.java
+++ b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/ServiceConfig.java
@@ -29,7 +29,7 @@ public class ServiceConfig {
      * create rocketmq storage service.
      *
      * @param storageClient rocketmq storage Client
-     * @return talos connector table Service
+     * @return rocketmq storage Service
      */
     @Bean
     public RocketmqStorageService rocketmqStorageService(RocketmqStorageClient storageClient) {

Reply via email to