This is an automated email from the ASF dual-hosted git repository.

karp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git


The following commit(s) were added to refs/heads/main by this push:
     new 854769e  Support ser/de skip schema registry service.[ISSUE-38] (#47)
854769e is described below

commit 854769e690fcc01af404943d8d0045327c00de98
Author: humkum <[email protected]>
AuthorDate: Tue Aug 30 14:32:42 2022 +0800

    Support ser/de skip schema registry service.[ISSUE-38] (#47)
    
    * support JSON serde
    
    * support JSON serde
    
    * support ser/de without schema registry server
---
 ...alizerConfig.java => AvroSerializerConfig.java} |  6 +-
 ...alizerConfig.java => JsonSerializerConfig.java} | 13 +----
 ...serializerConfig.java => SerializerConfig.java} | 21 +++----
 .../client/serde/avro/AvroDeserializer.java        |  4 +-
 .../registry/client/serde/avro/AvroSerializer.java |  7 +--
 .../serde/avro/ReflectionAvroDeserializer.java     | 68 ++++++++++++++++++++++
 .../ReflectionAvroSerde.java}                      | 36 ++++++------
 .../serde/avro/ReflectionAvroSerializer.java       | 62 ++++++++++++++++++++
 .../client/serde/json/JsonDeserializer.java        | 42 ++++++++-----
 .../registry/client/serde/json/JsonSerde.java      |  9 ++-
 .../registry/client/serde/json/JsonSerializer.java | 24 ++++++--
 .../schema/registry/client/serde/Person.java       | 60 +++++++++++++++++++
 .../client/serde/SkipSchemaRegistrySerdeTest.java  | 48 +++++++++++++++
 .../serde/{ => avro}/GenericAvroSerdeTest.java     |  9 +--
 .../client/serde/avro/ReflectionAvroSerdeTest.java | 38 ++++++------
 .../serde/{ => avro}/SpecificAvroSerdeTest.java    |  6 +-
 .../JsonSerdeTest.java}                            | 30 +++++-----
 .../example/serde/avro/GenericAvroSerdeDemo.java   |  4 +-
 ...SerdeDemo.java => ReflectionAvroSerdeDemo.java} | 29 +++++----
 .../example/serde/avro/SpecificAvroSerdeDemo.java  |  9 +++
 .../registry/example/serde/json/JsonSerdeDemo.java | 10 +++-
 .../serde/json/JsonSerdeWithoutServerDemo.java     | 48 +++++++++++++++
 22 files changed, 451 insertions(+), 132 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroDeserializerConfig.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroSerializerConfig.java
similarity index 89%
copy from 
client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroDeserializerConfig.java
copy to 
client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroSerializerConfig.java
index af897cc..15b32bf 100644
--- 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroDeserializerConfig.java
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroSerializerConfig.java
@@ -18,14 +18,12 @@ package org.apache.rocketmq.schema.registry.client.config;
 
 import java.util.Map;
 
-public class AvroDeserializerConfig {
+public class AvroSerializerConfig extends SerializerConfig {
     public final static String USE_GENERIC_DATUM_READER =
             "use.generic.datum.reader";
     public final static boolean USE_GENERIC_DATUM_READER_DEFAULT = false;
 
-    private final Map<String, Object> configs;
-
-    public AvroDeserializerConfig(Map<String, Object> configs) {
+    public AvroSerializerConfig(Map<String, Object> configs) {
         this.configs = configs;
     }
 
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroDeserializerConfig.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/JsonSerializerConfig.java
similarity index 66%
copy from 
client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroDeserializerConfig.java
copy to 
client/src/main/java/org/apache/rocketmq/schema/registry/client/config/JsonSerializerConfig.java
index af897cc..56af0c7 100644
--- 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroDeserializerConfig.java
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/JsonSerializerConfig.java
@@ -18,18 +18,9 @@ package org.apache.rocketmq.schema.registry.client.config;
 
 import java.util.Map;
 
-public class AvroDeserializerConfig {
-    public final static String USE_GENERIC_DATUM_READER =
-            "use.generic.datum.reader";
-    public final static boolean USE_GENERIC_DATUM_READER_DEFAULT = false;
-
-    private final Map<String, Object> configs;
-
-    public AvroDeserializerConfig(Map<String, Object> configs) {
+public class JsonSerializerConfig extends SerializerConfig {
+    public JsonSerializerConfig(Map<String, Object> configs) {
         this.configs = configs;
     }
 
-    public boolean useGenericReader() {
-        return (boolean) configs.getOrDefault(USE_GENERIC_DATUM_READER, 
USE_GENERIC_DATUM_READER_DEFAULT);
-    }
 }
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroDeserializerConfig.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/SerializerConfig.java
similarity index 60%
rename from 
client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroDeserializerConfig.java
rename to 
client/src/main/java/org/apache/rocketmq/schema/registry/client/config/SerializerConfig.java
index af897cc..87b582b 100644
--- 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroDeserializerConfig.java
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/SerializerConfig.java
@@ -18,18 +18,19 @@ package org.apache.rocketmq.schema.registry.client.config;
 
 import java.util.Map;
 
-public class AvroDeserializerConfig {
-    public final static String USE_GENERIC_DATUM_READER =
-            "use.generic.datum.reader";
-    public final static boolean USE_GENERIC_DATUM_READER_DEFAULT = false;
+public class SerializerConfig {
+    public final static String SKIP_SCHEMA_REGISTRY =
+            "skip.schema.registry";
+    public final static boolean SKIP_SCHEMA_REGISTRY_DEFAULT = false;
+    public final static String DESERIALIZE_TARGET_TYPE =
+            "deserialize.target.type";
+    protected Map<String, Object> configs;
 
-    private final Map<String, Object> configs;
-
-    public AvroDeserializerConfig(Map<String, Object> configs) {
-        this.configs = configs;
+    public boolean skipSchemaRegistry() {
+        return (boolean) configs.getOrDefault(SKIP_SCHEMA_REGISTRY, 
SKIP_SCHEMA_REGISTRY_DEFAULT);
     }
 
-    public boolean useGenericReader() {
-        return (boolean) configs.getOrDefault(USE_GENERIC_DATUM_READER, 
USE_GENERIC_DATUM_READER_DEFAULT);
+    public Class<?> deserializeTargetType() {
+        return (Class) configs.get(DESERIALIZE_TARGET_TYPE);
     }
 }
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroDeserializer.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroDeserializer.java
index 4daeb6e..87622c0 100644
--- 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroDeserializer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroDeserializer.java
@@ -24,7 +24,7 @@ import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
-import 
org.apache.rocketmq.schema.registry.client.config.AvroDeserializerConfig;
+import org.apache.rocketmq.schema.registry.client.config.AvroSerializerConfig;
 import 
org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
 import 
org.apache.rocketmq.schema.registry.client.exceptions.SerializationException;
 import org.apache.rocketmq.schema.registry.client.serde.Deserializer;
@@ -54,7 +54,7 @@ public class AvroDeserializer<T> implements Deserializer<T> {
 
     @Override
     public void configure(Map<String, Object> configs) {
-        AvroDeserializerConfig config = new AvroDeserializerConfig(configs);
+        AvroSerializerConfig config = new AvroSerializerConfig(configs);
         this.useGenericReader = config.useGenericReader();
     }
 
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroSerializer.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroSerializer.java
index 41ae117..e240683 100644
--- 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroSerializer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroSerializer.java
@@ -49,7 +49,6 @@ public class AvroSerializer<T> implements Serializer<T> {
 
     @Override
     public void configure(Map<String, Object> configs) {
-        Serializer.super.configure(configs);
     }
 
     @Override
@@ -64,13 +63,12 @@ public class AvroSerializer<T> implements Serializer<T> {
             return null;
         }
 
-        try {
+        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+            BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out, 
null);
             GetSchemaResponse response = getSchemaBySubject(subject);
             long schemaRecordId = response.getRecordId();
             String schemaIdl = response.getIdl();
             Schema schema = new Schema.Parser().parse(schemaIdl);
-            ByteArrayOutputStream out = new ByteArrayOutputStream();
-            BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out, 
null);
             ByteBuffer buffer = 
ByteBuffer.allocate(SchemaConstants.SCHEMA_RECORD_ID_LENGTH);
             encoder.writeBytes(buffer.putLong(schemaRecordId).array());
 
@@ -83,7 +81,6 @@ public class AvroSerializer<T> implements Serializer<T> {
             datumWriter.write(record, encoder);
             encoder.flush();
             byte[] bytes = out.toByteArray();
-            out.close();
             return bytes;
         } catch (IOException | RuntimeException e) {
             throw new SerializationException("serialize Avro message failed", 
e);
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroDeserializer.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroDeserializer.java
new file mode 100644
index 0000000..768306e
--- /dev/null
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroDeserializer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.schema.registry.client.serde.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.rocketmq.schema.registry.client.config.AvroSerializerConfig;
+import 
org.apache.rocketmq.schema.registry.client.exceptions.SerializationException;
+import org.apache.rocketmq.schema.registry.client.serde.Deserializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.Map;
+
+public class ReflectionAvroDeserializer<T> implements Deserializer<T> {
+    private Type type;
+
+    public ReflectionAvroDeserializer() {
+    }
+
+    @Override
+    public void configure(Map<String, Object> configs) {
+        AvroSerializerConfig avroSerializerConfig = new 
AvroSerializerConfig(configs);
+        this.type = avroSerializerConfig.deserializeTargetType();
+    }
+
+    @Override
+    public T deserialize(String subject, byte[] bytes) {
+        if (null == bytes) return null;
+
+        if (null == type) {
+            throw new SerializationException("deserialize type can not be 
null");
+        }
+        Schema schema = ReflectData.get().getSchema(type);
+        try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes)) {
+            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bais, 
null);
+            DatumReader<T> datumReader = new ReflectDatumReader<>(schema);
+            T record = datumReader.read(null, decoder);
+            return record;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void close() {
+
+    }
+}
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerde.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroSerde.java
similarity index 60%
copy from 
client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerde.java
copy to 
client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroSerde.java
index 52493b1..05bf48f 100644
--- 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerde.java
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroSerde.java
@@ -14,39 +14,41 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.rocketmq.schema.registry.client.serde.avro;
 
-package org.apache.rocketmq.schema.registry.client.serde.json;
-
-import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.rocketmq.schema.registry.client.serde.Deserializer;
+import org.apache.rocketmq.schema.registry.client.serde.Serializer;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Map;
 
-public class JsonSerde<T> implements Closeable {
-    private final JsonSerializer<T> serializer;
-    private final JsonDeserializer<T> deserializer;
-
-    public JsonSerde(SchemaRegistryClient registryClient, Class<T> type) {
-        this.serializer = new JsonSerializer<>(registryClient);
-        this.deserializer = new JsonDeserializer<>(registryClient, type);
-    }
+public class ReflectionAvroSerde implements Closeable {
+    private final ReflectionAvroSerializer<SpecificRecord> serializer;
+    private final ReflectionAvroDeserializer<SpecificRecord> deserializer;
 
-    public void configure(final Map<String, Object> configs) {
-        this.serializer.configure(configs);
-        this.deserializer.configure(configs);
+    public ReflectionAvroSerde() {
+        this.serializer = new ReflectionAvroSerializer<>();
+        this.deserializer = new ReflectionAvroDeserializer<>();
     }
 
-    public JsonSerializer<T> serializer() {
+    public Serializer<SpecificRecord> serializer() {
         return this.serializer;
     }
 
-    public JsonDeserializer<T> deserializer() {
+    public Deserializer<SpecificRecord> deserializer() {
         return this.deserializer;
     }
 
+    public void configure(final Map<String, Object> configs) {
+        this.serializer.configure(configs);
+        this.deserializer.configure(configs);
+    }
+
     @Override
     public void close() throws IOException {
-
+        this.serializer.close();
+        this.deserializer.close();
     }
 }
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroSerializer.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroSerializer.java
new file mode 100644
index 0000000..202821f
--- /dev/null
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroSerializer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.schema.registry.client.serde.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import 
org.apache.rocketmq.schema.registry.client.exceptions.SerializationException;
+import org.apache.rocketmq.schema.registry.client.serde.Serializer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+public class ReflectionAvroSerializer<T> implements Serializer<T> {
+    private final EncoderFactory encoderFactory = EncoderFactory.get();
+
+    public ReflectionAvroSerializer() {
+    }
+
+    @Override
+    public void configure(Map<String, Object> configs) {
+
+    }
+
+    @Override
+    public byte[] serialize(String subject, T record) {
+        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+            BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out, 
null);
+            Schema schema = ReflectData.get().getSchema(record.getClass());
+
+            DatumWriter<T> datumWriter = new ReflectDatumWriter<>(schema);
+            datumWriter.write(record, encoder);
+            encoder.flush();
+            byte[] bytes = out.toByteArray();
+            return bytes;
+        } catch (IOException | RuntimeException e) {
+            throw new SerializationException("serialize Avro message failed", 
e);
+        }
+    }
+
+    @Override
+    public void close() {
+    }
+}
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonDeserializer.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonDeserializer.java
index 0569c32..2f300c4 100644
--- 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonDeserializer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonDeserializer.java
@@ -20,6 +20,7 @@ package org.apache.rocketmq.schema.registry.client.serde.json;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import org.apache.rocketmq.schema.registry.client.config.JsonSerializerConfig;
 import 
org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
 import 
org.apache.rocketmq.schema.registry.client.exceptions.SerializationException;
 import org.apache.rocketmq.schema.registry.client.rest.JacksonMapper;
@@ -35,31 +36,46 @@ import java.util.Map;
 
 public class JsonDeserializer<T> implements Deserializer<T> {
     Logger log = LoggerFactory.getLogger(JsonDeserializer.class);
-    private final SchemaRegistryClient registryClient;
-    private final ObjectMapper objectMapper;
-    private final Class<T> type;
+    private SchemaRegistryClient registryClient;
+    private final ObjectMapper objectMapper = JacksonMapper.INSTANCE;
+    private boolean skipSchemaRegistry;
+    private Class<T> type;
 
-    public JsonDeserializer(SchemaRegistryClient registryClient, Class<T> 
type) {
+    public JsonDeserializer() {
+    }
+
+    public JsonDeserializer(SchemaRegistryClient registryClient) {
         this.registryClient = registryClient;
-        objectMapper = JacksonMapper.INSTANCE;
-        this.type = type;
     }
 
     @Override
     public void configure(Map<String, Object> configs) {
-
+        JsonSerializerConfig serializerConfig = new 
JsonSerializerConfig(configs);
+        this.skipSchemaRegistry = serializerConfig.skipSchemaRegistry();
+        this.type = (Class<T>) serializerConfig.deserializeTargetType();
     }
 
     @Override
     public T deserialize(String subject, byte[] payload) {
-        if (null == registryClient) {
-            throw new SerializationException("please initialize the schema 
registry client first");
-        }
-
         if (null == payload || payload.length == 0) {
             return null;
         }
 
+        if (skipSchemaRegistry) {
+            if (null == type) {
+                throw new SerializationException("type cannot be null");
+            }
+            try {
+                return objectMapper.readValue(payload, type);
+            } catch (Exception e) {
+                throw new SerializationException("JSON serialize failed", e);
+            }
+        }
+
+        if (null == registryClient) {
+            throw new SerializationException("please initialize the schema 
registry client first");
+        }
+
         try {
             GetSchemaResponse response = 
registryClient.getSchemaBySubject(subject);
             ByteBuffer buffer = ByteBuffer.wrap(payload);
@@ -73,9 +89,7 @@ public class JsonDeserializer<T> implements Deserializer<T> {
             jsonNode = objectMapper.readValue(buffer.array(), start, length, 
JsonNode.class);
 
             return objectMapper.convertValue(jsonNode, type);
-        } catch (RestClientException e) {
-            throw new RuntimeException(e);
-        } catch (IOException e) {
+        } catch (RestClientException | IOException e) {
             throw new RuntimeException(e);
         }
     }
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerde.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerde.java
index 52493b1..1c33259 100644
--- 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerde.java
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerde.java
@@ -27,9 +27,14 @@ public class JsonSerde<T> implements Closeable {
     private final JsonSerializer<T> serializer;
     private final JsonDeserializer<T> deserializer;
 
-    public JsonSerde(SchemaRegistryClient registryClient, Class<T> type) {
+    public JsonSerde() {
+        this.serializer = new JsonSerializer<>();
+        this.deserializer = new JsonDeserializer<>();
+    }
+
+    public JsonSerde(SchemaRegistryClient registryClient) {
         this.serializer = new JsonSerializer<>(registryClient);
-        this.deserializer = new JsonDeserializer<>(registryClient, type);
+        this.deserializer = new JsonDeserializer<>(registryClient);
     }
 
     public void configure(final Map<String, Object> configs) {
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerializer.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerializer.java
index 5cae08e..85e0b4e 100644
--- 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerializer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerializer.java
@@ -17,9 +17,10 @@
 
 package org.apache.rocketmq.schema.registry.client.serde.json;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.avro.io.EncoderFactory;
 import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import org.apache.rocketmq.schema.registry.client.config.JsonSerializerConfig;
 import 
org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
 import 
org.apache.rocketmq.schema.registry.client.exceptions.SerializationException;
 import org.apache.rocketmq.schema.registry.client.rest.JacksonMapper;
@@ -33,18 +34,21 @@ import java.nio.ByteBuffer;
 import java.util.Map;
 
 public class JsonSerializer<T> implements Serializer<T> {
-    private final SchemaRegistryClient registryClient;
-    private final ObjectMapper objectMapper;
-    private final EncoderFactory encoderFactory = EncoderFactory.get();
+    private SchemaRegistryClient registryClient;
+    private final ObjectMapper objectMapper = JacksonMapper.INSTANCE;
+    private boolean skipSchemaRegistry;
+
+    public JsonSerializer() {
+    }
 
     public JsonSerializer(SchemaRegistryClient registryClient) {
-        this.objectMapper = JacksonMapper.INSTANCE;
         this.registryClient = registryClient;
     }
 
     @Override
     public void configure(Map<String, Object> configs) {
-
+        JsonSerializerConfig jsonSerializerConfig = new 
JsonSerializerConfig(configs);
+        this.skipSchemaRegistry = jsonSerializerConfig.skipSchemaRegistry();
     }
 
     @Override
@@ -53,6 +57,14 @@ public class JsonSerializer<T> implements Serializer<T> {
             return null;
         }
 
+        if (skipSchemaRegistry) {
+            try {
+                return objectMapper.writeValueAsBytes(originMessage);
+            } catch (JsonProcessingException e) {
+                throw new SerializationException("JSON serialize failed", e);
+            }
+        }
+
         if (null == registryClient) {
             throw new SerializationException("please initialize the schema 
registry client first");
         }
diff --git 
a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/Person.java
 
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/Person.java
new file mode 100644
index 0000000..e20c28b
--- /dev/null
+++ 
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/Person.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client.serde;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Objects;
+
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+@JsonPropertyOrder(alphabetic = true)
+public class Person {
+    @JsonIgnore
+    private Long id;
+    private String name;
+    private int age;
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        Person person = (Person) o;
+        return age == person.age && Objects.equals(name, person.name);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, age);
+    }
+
+    @Override
+    public String toString() {
+        return "Person{" +
+                "name='" + name + '\'' +
+                ", age=" + age +
+                '}';
+    }
+}
diff --git 
a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/SkipSchemaRegistrySerdeTest.java
 
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/SkipSchemaRegistrySerdeTest.java
new file mode 100644
index 0000000..4a7f22b
--- /dev/null
+++ 
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/SkipSchemaRegistrySerdeTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.schema.registry.client.serde;
+
+import org.apache.rocketmq.schema.registry.client.config.JsonSerializerConfig;
+import org.apache.rocketmq.schema.registry.client.serde.json.JsonSerde;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class SkipSchemaRegistrySerdeTest {
+
+    @Test
+    public void testJsonSerde() {
+        Person person = new Person(1L, "name", 18);
+        System.out.printf("person before serialize is %s\n", person);
+
+        try(JsonSerde<Person> jsonSerde = new JsonSerde<>()) {
+            Map<String, Object> configs = new HashMap<>();
+            configs.put(JsonSerializerConfig.SKIP_SCHEMA_REGISTRY, true);
+            configs.put(JsonSerializerConfig.DESERIALIZE_TARGET_TYPE, 
Person.class);
+            jsonSerde.configure(configs);
+            byte[] bytes = jsonSerde.serializer().serialize("TopicTest", 
person);
+
+            Person person1 = jsonSerde.deserializer().deserialize("TopicTest", 
bytes);
+            System.out.printf("after deserialize new person is %s\n", person1);
+            System.out.printf("person == person1 : %b\n", 
person1.equals(person));
+        } catch (IOException e) {
+            System.out.println("serde shutdown failed");
+        }
+    }
+}
diff --git 
a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/GenericAvroSerdeTest.java
 
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerdeTest.java
similarity index 91%
rename from 
client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/GenericAvroSerdeTest.java
rename to 
client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerdeTest.java
index c8c73e6..4ed8f65 100644
--- 
a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/GenericAvroSerdeTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerdeTest.java
@@ -14,15 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.schema.registry.client.serde;
+package org.apache.rocketmq.schema.registry.client.serde.avro;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
-import org.apache.rocketmq.schema.registry.client.config.AvroDeserializerConfig;
+import org.apache.rocketmq.schema.registry.client.config.AvroSerializerConfig;
 import org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
-import org.apache.rocketmq.schema.registry.client.serde.avro.GenericAvroSerde;
 import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mock;
@@ -63,7 +62,7 @@ public class GenericAvroSerdeTest {
         try (GenericAvroSerde serde = new GenericAvroSerde(registryClient)) {
             //configure
             Map<String, Object> configs = new HashMap<>();
-            configs.put(AvroDeserializerConfig.USE_GENERIC_DATUM_READER, true);
+            configs.put(AvroSerializerConfig.USE_GENERIC_DATUM_READER, true);
             serde.configure(configs);
 
             //serialize
@@ -75,7 +74,5 @@ public class GenericAvroSerdeTest {
         } catch (IOException e) {
             System.out.println("serde shutdown failed");
         }
-
-
     }
 }
diff --git a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/SpecificAvroSerdeDemo.java b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroSerdeTest.java
similarity index 58%
copy from example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/SpecificAvroSerdeDemo.java
copy to client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroSerdeTest.java
index ad9281a..4864cf5 100644
--- a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/SpecificAvroSerdeDemo.java
+++ b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroSerdeTest.java
@@ -14,34 +14,34 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.rocketmq.schema.registry.client.serde.avro;
 
-package org.apache.rocketmq.schema.registry.example.serde.avro;
-
-import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
-import org.apache.rocketmq.schema.registry.client.SchemaRegistryClientFactory;
-import org.apache.rocketmq.schema.registry.client.serde.avro.SpecificAvroSerde;
-import org.apache.rocketmq.schema.registry.example.serde.Charge;
+import org.apache.rocketmq.schema.registry.client.config.AvroSerializerConfig;
+import org.apache.rocketmq.schema.registry.client.serde.Charge;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
-
-public class SpecificAvroSerdeDemo {
-
-    public static void main(String[] args) {
-
-        String baseUrl = "http://localhost:8080";;
-        SchemaRegistryClient schemaRegistryClient = SchemaRegistryClientFactory.newClient(baseUrl, null);
-
-        try (SpecificAvroSerde serde = new SpecificAvroSerde(schemaRegistryClient)) {
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ReflectionAvroSerdeTest {
+    @Test
+    public void testReflectionAvroSerde() {
+        Charge charge = new Charge("specific", 100.0);
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(AvroSerializerConfig.DESERIALIZE_TARGET_TYPE, charge.getClass());
+        try (ReflectionAvroSerde serde = new ReflectionAvroSerde()) {
             //serialize
-            Charge charge = new Charge("specific", 100.0);
-            byte[] bytes = serde.serializer().serialize("TopicTest", charge);
+            serde.configure(configs);
 
+            byte[] bytes = serde.serializer().serialize("TopicTest", charge);
             //deserialize
             Charge charge1 = (Charge) serde.deserializer().deserialize("TopicTest", bytes);
-            System.out.println("the origin object after ser/de is " + charge1);
+            assertThat(charge1).isEqualTo(charge);
         } catch (IOException e) {
             System.out.println("serde shutdown failed");
         }
-
     }
 }
diff --git a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/SpecificAvroSerdeTest.java b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroSerdeTest.java
similarity index 95%
copy from client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/SpecificAvroSerdeTest.java
copy to client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroSerdeTest.java
index 4e34efa..3424d80 100644
--- a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/SpecificAvroSerdeTest.java
+++ b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroSerdeTest.java
@@ -14,11 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.schema.registry.client.serde;
+package org.apache.rocketmq.schema.registry.client.serde.avro;
 
 import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
 import org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
-import org.apache.rocketmq.schema.registry.client.serde.avro.SpecificAvroSerde;
+import org.apache.rocketmq.schema.registry.client.serde.Charge;
 import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mock;
@@ -61,7 +61,5 @@ public class SpecificAvroSerdeTest {
         } catch (IOException e) {
             System.out.println("serde shutdown failed");
         }
-
-
     }
 }
diff --git a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/SpecificAvroSerdeTest.java b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerdeTest.java
similarity index 69%
rename from client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/SpecificAvroSerdeTest.java
rename to client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerdeTest.java
index 4e34efa..78d4a0d 100644
--- a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/SpecificAvroSerdeTest.java
+++ b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerdeTest.java
@@ -14,23 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.schema.registry.client.serde;
+package org.apache.rocketmq.schema.registry.client.serde.json;
 
 import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import org.apache.rocketmq.schema.registry.client.config.JsonSerializerConfig;
 import org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
+import org.apache.rocketmq.schema.registry.client.serde.Charge;
+import org.apache.rocketmq.schema.registry.client.serde.Person;
 import org.apache.rocketmq.schema.registry.client.serde.avro.SpecificAvroSerde;
 import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mock;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-public class SpecificAvroSerdeTest {
-
+public class JsonSerdeTest {
     @Mock
     private SchemaRegistryClient registryClient;
 
@@ -39,29 +43,29 @@ public class SpecificAvroSerdeTest {
 
     @Test
     public void testSpecificSerde() throws RestClientException, IOException {
-        String idl = "{\"type\":\"record\",\"name\":\"Charge\",\"namespace\":\"org.apache.rocketmq.schema.registry.client.serde\","
-                + "\"fields\":[{\"name\":\"item\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}";
+        String idl = "{\"type\":\"object\",\"properties\":{\"name\":{\"type\":\"string\"},\"age\":{\"type\":\"int\"}}";
 
         getSchemaResponse = mock(GetSchemaResponse.class);
         when(getSchemaResponse.getRecordId()).thenReturn(11111L);
-        when(getSchemaResponse.getSchemaFullName()).thenReturn("org.apache.rocketmq.schema.registry.example.serde.Charge");
+        when(getSchemaResponse.getSchemaFullName()).thenReturn("org.apache.rocketmq.schema.registry.example.serde.Person");
         when(getSchemaResponse.getIdl()).thenReturn(idl);
 
         registryClient = mock(SchemaRegistryClient.class);
         when(registryClient.getSchemaBySubject("TopicTest")).thenReturn(getSchemaResponse);
 
-        try (SpecificAvroSerde serde = new SpecificAvroSerde(registryClient)) {
+        try (JsonSerde<Person> serde = new JsonSerde<Person>(registryClient)) {
             //serialize
-            Charge charge = new Charge("specific", 100.0);
-            byte[] bytes = serde.serializer().serialize("TopicTest", charge);
+            Person person = new Person(1L, "Tom", 18);
+            Map<String, Object> configs = new HashMap<>();
+            configs.put(JsonSerializerConfig.DESERIALIZE_TARGET_TYPE, person.getClass());
+            serde.configure(configs);
+            byte[] bytes = serde.serializer().serialize("TopicTest", person);
 
             //deserialize
-            Charge charge1 = (Charge) serde.deserializer().deserialize("TopicTest", bytes);
-            assertThat(charge1).isEqualTo(charge);
+            Person person1 = serde.deserializer().deserialize("TopicTest", bytes);
+            assertThat(person).isEqualTo(person1);
         } catch (IOException e) {
             System.out.println("serde shutdown failed");
         }
-
-
     }
 }
diff --git a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/GenericAvroSerdeDemo.java b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/GenericAvroSerdeDemo.java
index 8e02ec6..641f1c4 100644
--- a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/GenericAvroSerdeDemo.java
+++ b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/GenericAvroSerdeDemo.java
@@ -21,7 +21,7 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
 import org.apache.rocketmq.schema.registry.client.SchemaRegistryClientFactory;
-import org.apache.rocketmq.schema.registry.client.config.AvroDeserializerConfig;
+import org.apache.rocketmq.schema.registry.client.config.AvroSerializerConfig;
 import org.apache.rocketmq.schema.registry.client.serde.avro.GenericAvroSerde;
 
 import java.io.IOException;
@@ -44,7 +44,7 @@ public class GenericAvroSerdeDemo {
         try (GenericAvroSerde serde = new GenericAvroSerde(schemaRegistryClient)) {
             //configure
             Map<String, Object> configs = new HashMap<>();
-            configs.put(AvroDeserializerConfig.USE_GENERIC_DATUM_READER, true);
+            configs.put(AvroSerializerConfig.USE_GENERIC_DATUM_READER, true);
             serde.configure(configs);
 
             //serialize
diff --git a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/SpecificAvroSerdeDemo.java b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/ReflectionAvroSerdeDemo.java
similarity index 63%
copy from example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/SpecificAvroSerdeDemo.java
copy to example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/ReflectionAvroSerdeDemo.java
index ad9281a..62b5bd0 100644
--- a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/SpecificAvroSerdeDemo.java
+++ b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/ReflectionAvroSerdeDemo.java
@@ -14,34 +14,33 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.rocketmq.schema.registry.example.serde.avro;
 
-import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
-import org.apache.rocketmq.schema.registry.client.SchemaRegistryClientFactory;
-import org.apache.rocketmq.schema.registry.client.serde.avro.SpecificAvroSerde;
+import org.apache.rocketmq.schema.registry.client.config.AvroSerializerConfig;
+import org.apache.rocketmq.schema.registry.client.serde.avro.ReflectionAvroSerde;
 import org.apache.rocketmq.schema.registry.example.serde.Charge;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
-public class SpecificAvroSerdeDemo {
-
+public class ReflectionAvroSerdeDemo {
     public static void main(String[] args) {
-
-        String baseUrl = "http://localhost:8080";;
-        SchemaRegistryClient schemaRegistryClient = SchemaRegistryClientFactory.newClient(baseUrl, null);
-
-        try (SpecificAvroSerde serde = new SpecificAvroSerde(schemaRegistryClient)) {
+        Charge charge = new Charge("specific", 100.0);
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(AvroSerializerConfig.DESERIALIZE_TARGET_TYPE, charge.getClass());
+        try (ReflectionAvroSerde serde = new ReflectionAvroSerde()) {
             //serialize
-            Charge charge = new Charge("specific", 100.0);
-            byte[] bytes = serde.serializer().serialize("TopicTest", charge);
+            serde.configure(configs);
 
+            byte[] bytes = serde.serializer().serialize("TopicTest", charge);
             //deserialize
             Charge charge1 = (Charge) serde.deserializer().deserialize("TopicTest", bytes);
-            System.out.println("the origin object after ser/de is " + charge1);
+            System.out.println("charge before ser/de is " + charge);
+            System.out.println("charge after ser/de is " + charge1);
+            System.out.printf("charge == charge1 : %b", charge.equals(charge1));
         } catch (IOException e) {
             System.out.println("serde shutdown failed");
         }
-
     }
 }
diff --git a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/SpecificAvroSerdeDemo.java b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/SpecificAvroSerdeDemo.java
index ad9281a..b74af60 100644
--- a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/SpecificAvroSerdeDemo.java
+++ b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/SpecificAvroSerdeDemo.java
@@ -19,10 +19,13 @@ package org.apache.rocketmq.schema.registry.example.serde.avro;
 
 import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
 import org.apache.rocketmq.schema.registry.client.SchemaRegistryClientFactory;
+import org.apache.rocketmq.schema.registry.client.config.AvroSerializerConfig;
 import org.apache.rocketmq.schema.registry.client.serde.avro.SpecificAvroSerde;
 import org.apache.rocketmq.schema.registry.example.serde.Charge;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
 public class SpecificAvroSerdeDemo {
 
@@ -30,10 +33,16 @@ public class SpecificAvroSerdeDemo {
 
         String baseUrl = "http://localhost:8080";;
         SchemaRegistryClient schemaRegistryClient = SchemaRegistryClientFactory.newClient(baseUrl, null);
+        Map<String, Object> serializeConfigs = new HashMap<>();
+
 
         try (SpecificAvroSerde serde = new SpecificAvroSerde(schemaRegistryClient)) {
+
             //serialize
             Charge charge = new Charge("specific", 100.0);
+            serializeConfigs.put(AvroSerializerConfig.SKIP_SCHEMA_REGISTRY, true);
+            serializeConfigs.put(AvroSerializerConfig.DESERIALIZE_TARGET_TYPE, charge.getClass());
+            serde.configure(serializeConfigs);
             byte[] bytes = serde.serializer().serialize("TopicTest", charge);
 
             //deserialize
diff --git a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/json/JsonSerdeDemo.java b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/json/JsonSerdeDemo.java
index 7f09de1..bcb4e7a 100644
--- a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/json/JsonSerdeDemo.java
+++ b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/json/JsonSerdeDemo.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.schema.registry.example.serde.json;
 
 import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
 import org.apache.rocketmq.schema.registry.client.SchemaRegistryClientFactory;
+import org.apache.rocketmq.schema.registry.client.config.JsonSerializerConfig;
 import org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
 import org.apache.rocketmq.schema.registry.client.serde.json.JsonSerde;
 import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaRequest;
@@ -28,6 +29,8 @@ import org.apache.rocketmq.schema.registry.common.model.SchemaType;
 import org.apache.rocketmq.schema.registry.example.serde.Person;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
 public class JsonSerdeDemo {
     public static void main(String[] args) {
@@ -44,7 +47,7 @@ public class JsonSerdeDemo {
                 .owner("test").build();
         try {
             RegisterSchemaResponse response
-                    = schemaRegistryClient.registerSchema(topic, "Person", request);
+                    = schemaRegistryClient.registerSchema("default", "tanant1", topic, "Person", request);
             System.out.println("register schema success, schemaId: " + response.getRecordId());
 
             Thread.sleep(5000);
@@ -56,7 +59,10 @@ public class JsonSerdeDemo {
         Person person = new Person(1L, "name", 18);
         System.out.printf("person before serialize is %s\n", person);
 
-        try(JsonSerde<Person> jsonSerde = new JsonSerde<>(schemaRegistryClient, Person.class)) {
+        try(JsonSerde<Person> jsonSerde = new JsonSerde<>(schemaRegistryClient)) {
+            Map<String, Object> configs = new HashMap<>();
+            configs.put(JsonSerializerConfig.DESERIALIZE_TARGET_TYPE, Person.class);
+            jsonSerde.configure(configs);
             byte[] bytes = jsonSerde.serializer().serialize("TopicTest", person);
 
             Person person1 = jsonSerde.deserializer().deserialize("TopicTest", bytes);
diff --git a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/json/JsonSerdeWithoutServerDemo.java b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/json/JsonSerdeWithoutServerDemo.java
new file mode 100644
index 0000000..c7e3500
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/json/JsonSerdeWithoutServerDemo.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.example.serde.json;
+
+import org.apache.rocketmq.schema.registry.client.config.JsonSerializerConfig;
+import org.apache.rocketmq.schema.registry.client.serde.json.JsonSerde;
+import org.apache.rocketmq.schema.registry.example.serde.Person;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class JsonSerdeWithoutServerDemo {
+
+    public static void main(String[] args) {
+        Person person = new Person(1L, "name", 18);
+        System.out.printf("person before serialize is %s\n", person);
+
+        try(JsonSerde<Person> jsonSerde = new JsonSerde<>()) {
+            Map<String, Object> configs = new HashMap<>();
+            configs.put(JsonSerializerConfig.DESERIALIZE_TARGET_TYPE, Person.class);
+            configs.put(JsonSerializerConfig.SKIP_SCHEMA_REGISTRY, true);
+            jsonSerde.configure(configs);
+            byte[] bytes = jsonSerde.serializer().serialize("TopicTest", person);
+
+            Person person1 = jsonSerde.deserializer().deserialize("TopicTest", bytes);
+            System.out.printf("after deserialize new person is %s\n", person1);
+            System.out.printf("person == person1 : %b\n", person1.equals(person));
+        } catch (IOException e) {
+            System.out.println("serde shutdown failed");
+        }
+    }
+}

Reply via email to