This is an automated email from the ASF dual-hosted git repository.
karp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git
The following commit(s) were added to refs/heads/main by this push:
new 854769e Support ser/de skip schema registry service.[ISSUE-38] (#47)
854769e is described below
commit 854769e690fcc01af404943d8d0045327c00de98
Author: humkum <[email protected]>
AuthorDate: Tue Aug 30 14:32:42 2022 +0800
Support ser/de skip schema registry service.[ISSUE-38] (#47)
* support JSON serde
* support JSON serde
* support ser/de without schema registry server
---
...alizerConfig.java => AvroSerializerConfig.java} | 6 +-
...alizerConfig.java => JsonSerializerConfig.java} | 13 +----
...serializerConfig.java => SerializerConfig.java} | 21 +++----
.../client/serde/avro/AvroDeserializer.java | 4 +-
.../registry/client/serde/avro/AvroSerializer.java | 7 +--
.../serde/avro/ReflectionAvroDeserializer.java | 68 ++++++++++++++++++++++
.../ReflectionAvroSerde.java} | 36 ++++++------
.../serde/avro/ReflectionAvroSerializer.java | 62 ++++++++++++++++++++
.../client/serde/json/JsonDeserializer.java | 42 ++++++++-----
.../registry/client/serde/json/JsonSerde.java | 9 ++-
.../registry/client/serde/json/JsonSerializer.java | 24 ++++++--
.../schema/registry/client/serde/Person.java | 60 +++++++++++++++++++
.../client/serde/SkipSchemaRegistrySerdeTest.java | 48 +++++++++++++++
.../serde/{ => avro}/GenericAvroSerdeTest.java | 9 +--
.../client/serde/avro/ReflectionAvroSerdeTest.java | 38 ++++++------
.../serde/{ => avro}/SpecificAvroSerdeTest.java | 6 +-
.../JsonSerdeTest.java} | 30 +++++-----
.../example/serde/avro/GenericAvroSerdeDemo.java | 4 +-
...SerdeDemo.java => ReflectionAvroSerdeDemo.java} | 29 +++++----
.../example/serde/avro/SpecificAvroSerdeDemo.java | 9 +++
.../registry/example/serde/json/JsonSerdeDemo.java | 10 +++-
.../serde/json/JsonSerdeWithoutServerDemo.java | 48 +++++++++++++++
22 files changed, 451 insertions(+), 132 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroDeserializerConfig.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroSerializerConfig.java
similarity index 89%
copy from
client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroDeserializerConfig.java
copy to
client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroSerializerConfig.java
index af897cc..15b32bf 100644
---
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroDeserializerConfig.java
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroSerializerConfig.java
@@ -18,14 +18,12 @@ package org.apache.rocketmq.schema.registry.client.config;
import java.util.Map;
-public class AvroDeserializerConfig {
+public class AvroSerializerConfig extends SerializerConfig {
public final static String USE_GENERIC_DATUM_READER =
"use.generic.datum.reader";
public final static boolean USE_GENERIC_DATUM_READER_DEFAULT = false;
- private final Map<String, Object> configs;
-
- public AvroDeserializerConfig(Map<String, Object> configs) {
+ public AvroSerializerConfig(Map<String, Object> configs) {
this.configs = configs;
}
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroDeserializerConfig.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/JsonSerializerConfig.java
similarity index 66%
copy from
client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroDeserializerConfig.java
copy to
client/src/main/java/org/apache/rocketmq/schema/registry/client/config/JsonSerializerConfig.java
index af897cc..56af0c7 100644
---
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroDeserializerConfig.java
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/JsonSerializerConfig.java
@@ -18,18 +18,9 @@ package org.apache.rocketmq.schema.registry.client.config;
import java.util.Map;
-public class AvroDeserializerConfig {
- public final static String USE_GENERIC_DATUM_READER =
- "use.generic.datum.reader";
- public final static boolean USE_GENERIC_DATUM_READER_DEFAULT = false;
-
- private final Map<String, Object> configs;
-
- public AvroDeserializerConfig(Map<String, Object> configs) {
+public class JsonSerializerConfig extends SerializerConfig {
+ public JsonSerializerConfig(Map<String, Object> configs) {
this.configs = configs;
}
- public boolean useGenericReader() {
- return (boolean) configs.getOrDefault(USE_GENERIC_DATUM_READER,
USE_GENERIC_DATUM_READER_DEFAULT);
- }
}
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroDeserializerConfig.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/SerializerConfig.java
similarity index 60%
rename from
client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroDeserializerConfig.java
rename to
client/src/main/java/org/apache/rocketmq/schema/registry/client/config/SerializerConfig.java
index af897cc..87b582b 100644
---
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroDeserializerConfig.java
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/SerializerConfig.java
@@ -18,18 +18,19 @@ package org.apache.rocketmq.schema.registry.client.config;
import java.util.Map;
-public class AvroDeserializerConfig {
- public final static String USE_GENERIC_DATUM_READER =
- "use.generic.datum.reader";
- public final static boolean USE_GENERIC_DATUM_READER_DEFAULT = false;
+public class SerializerConfig {
+ public final static String SKIP_SCHEMA_REGISTRY =
+ "skip.schema.registry";
+ public final static boolean SKIP_SCHEMA_REGISTRY_DEFAULT = false;
+ public final static String DESERIALIZE_TARGET_TYPE =
+ "deserialize.target.type";
+ protected Map<String, Object> configs;
- private final Map<String, Object> configs;
-
- public AvroDeserializerConfig(Map<String, Object> configs) {
- this.configs = configs;
+ public boolean skipSchemaRegistry() {
+ return (boolean) configs.getOrDefault(SKIP_SCHEMA_REGISTRY,
SKIP_SCHEMA_REGISTRY_DEFAULT);
}
- public boolean useGenericReader() {
- return (boolean) configs.getOrDefault(USE_GENERIC_DATUM_READER,
USE_GENERIC_DATUM_READER_DEFAULT);
+ public Class<?> deserializeTargetType() {
+ return (Class) configs.get(DESERIALIZE_TARGET_TYPE);
}
}
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroDeserializer.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroDeserializer.java
index 4daeb6e..87622c0 100644
---
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroDeserializer.java
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroDeserializer.java
@@ -24,7 +24,7 @@ import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
-import
org.apache.rocketmq.schema.registry.client.config.AvroDeserializerConfig;
+import org.apache.rocketmq.schema.registry.client.config.AvroSerializerConfig;
import
org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
import
org.apache.rocketmq.schema.registry.client.exceptions.SerializationException;
import org.apache.rocketmq.schema.registry.client.serde.Deserializer;
@@ -54,7 +54,7 @@ public class AvroDeserializer<T> implements Deserializer<T> {
@Override
public void configure(Map<String, Object> configs) {
- AvroDeserializerConfig config = new AvroDeserializerConfig(configs);
+ AvroSerializerConfig config = new AvroSerializerConfig(configs);
this.useGenericReader = config.useGenericReader();
}
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroSerializer.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroSerializer.java
index 41ae117..e240683 100644
---
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroSerializer.java
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroSerializer.java
@@ -49,7 +49,6 @@ public class AvroSerializer<T> implements Serializer<T> {
@Override
public void configure(Map<String, Object> configs) {
- Serializer.super.configure(configs);
}
@Override
@@ -64,13 +63,12 @@ public class AvroSerializer<T> implements Serializer<T> {
return null;
}
- try {
+ try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+ BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out,
null);
GetSchemaResponse response = getSchemaBySubject(subject);
long schemaRecordId = response.getRecordId();
String schemaIdl = response.getIdl();
Schema schema = new Schema.Parser().parse(schemaIdl);
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out,
null);
ByteBuffer buffer =
ByteBuffer.allocate(SchemaConstants.SCHEMA_RECORD_ID_LENGTH);
encoder.writeBytes(buffer.putLong(schemaRecordId).array());
@@ -83,7 +81,6 @@ public class AvroSerializer<T> implements Serializer<T> {
datumWriter.write(record, encoder);
encoder.flush();
byte[] bytes = out.toByteArray();
- out.close();
return bytes;
} catch (IOException | RuntimeException e) {
throw new SerializationException("serialize Avro message failed",
e);
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroDeserializer.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroDeserializer.java
new file mode 100644
index 0000000..768306e
--- /dev/null
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroDeserializer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.schema.registry.client.serde.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.rocketmq.schema.registry.client.config.AvroSerializerConfig;
+import
org.apache.rocketmq.schema.registry.client.exceptions.SerializationException;
+import org.apache.rocketmq.schema.registry.client.serde.Deserializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.Map;
+
+public class ReflectionAvroDeserializer<T> implements Deserializer<T> {
+ private Type type;
+
+ public ReflectionAvroDeserializer() {
+ }
+
+ @Override
+ public void configure(Map<String, Object> configs) {
+ AvroSerializerConfig avroSerializerConfig = new
AvroSerializerConfig(configs);
+ this.type = avroSerializerConfig.deserializeTargetType();
+ }
+
+ @Override
+ public T deserialize(String subject, byte[] bytes) {
+ if (null == bytes) return null;
+
+ if (null == type) {
+ throw new SerializationException("deserialize type can not be
null");
+ }
+ Schema schema = ReflectData.get().getSchema(type);
+ try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes)) {
+ BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bais,
null);
+ DatumReader<T> datumReader = new ReflectDatumReader<>(schema);
+ T record = datumReader.read(null, decoder);
+ return record;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() {
+
+ }
+}
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerde.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroSerde.java
similarity index 60%
copy from
client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerde.java
copy to
client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroSerde.java
index 52493b1..05bf48f 100644
---
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerde.java
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroSerde.java
@@ -14,39 +14,41 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.rocketmq.schema.registry.client.serde.avro;
-package org.apache.rocketmq.schema.registry.client.serde.json;
-
-import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.rocketmq.schema.registry.client.serde.Deserializer;
+import org.apache.rocketmq.schema.registry.client.serde.Serializer;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
-public class JsonSerde<T> implements Closeable {
- private final JsonSerializer<T> serializer;
- private final JsonDeserializer<T> deserializer;
-
- public JsonSerde(SchemaRegistryClient registryClient, Class<T> type) {
- this.serializer = new JsonSerializer<>(registryClient);
- this.deserializer = new JsonDeserializer<>(registryClient, type);
- }
+public class ReflectionAvroSerde implements Closeable {
+ private final ReflectionAvroSerializer<SpecificRecord> serializer;
+ private final ReflectionAvroDeserializer<SpecificRecord> deserializer;
- public void configure(final Map<String, Object> configs) {
- this.serializer.configure(configs);
- this.deserializer.configure(configs);
+ public ReflectionAvroSerde() {
+ this.serializer = new ReflectionAvroSerializer<>();
+ this.deserializer = new ReflectionAvroDeserializer<>();
}
- public JsonSerializer<T> serializer() {
+ public Serializer<SpecificRecord> serializer() {
return this.serializer;
}
- public JsonDeserializer<T> deserializer() {
+ public Deserializer<SpecificRecord> deserializer() {
return this.deserializer;
}
+ public void configure(final Map<String, Object> configs) {
+ this.serializer.configure(configs);
+ this.deserializer.configure(configs);
+ }
+
@Override
public void close() throws IOException {
-
+ this.serializer.close();
+ this.deserializer.close();
}
}
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroSerializer.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroSerializer.java
new file mode 100644
index 0000000..202821f
--- /dev/null
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroSerializer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.schema.registry.client.serde.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import
org.apache.rocketmq.schema.registry.client.exceptions.SerializationException;
+import org.apache.rocketmq.schema.registry.client.serde.Serializer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+public class ReflectionAvroSerializer<T> implements Serializer<T> {
+ private final EncoderFactory encoderFactory = EncoderFactory.get();
+
+ public ReflectionAvroSerializer() {
+ }
+
+ @Override
+ public void configure(Map<String, Object> configs) {
+
+ }
+
+ @Override
+ public byte[] serialize(String subject, T record) {
+ try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+ BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out,
null);
+ Schema schema = ReflectData.get().getSchema(record.getClass());
+
+ DatumWriter<T> datumWriter = new ReflectDatumWriter<>(schema);
+ datumWriter.write(record, encoder);
+ encoder.flush();
+ byte[] bytes = out.toByteArray();
+ return bytes;
+ } catch (IOException | RuntimeException e) {
+ throw new SerializationException("serialize Avro message failed",
e);
+ }
+ }
+
+ @Override
+ public void close() {
+ }
+}
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonDeserializer.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonDeserializer.java
index 0569c32..2f300c4 100644
---
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonDeserializer.java
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonDeserializer.java
@@ -20,6 +20,7 @@ package org.apache.rocketmq.schema.registry.client.serde.json;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import org.apache.rocketmq.schema.registry.client.config.JsonSerializerConfig;
import
org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
import
org.apache.rocketmq.schema.registry.client.exceptions.SerializationException;
import org.apache.rocketmq.schema.registry.client.rest.JacksonMapper;
@@ -35,31 +36,46 @@ import java.util.Map;
public class JsonDeserializer<T> implements Deserializer<T> {
Logger log = LoggerFactory.getLogger(JsonDeserializer.class);
- private final SchemaRegistryClient registryClient;
- private final ObjectMapper objectMapper;
- private final Class<T> type;
+ private SchemaRegistryClient registryClient;
+ private final ObjectMapper objectMapper = JacksonMapper.INSTANCE;
+ private boolean skipSchemaRegistry;
+ private Class<T> type;
- public JsonDeserializer(SchemaRegistryClient registryClient, Class<T>
type) {
+ public JsonDeserializer() {
+ }
+
+ public JsonDeserializer(SchemaRegistryClient registryClient) {
this.registryClient = registryClient;
- objectMapper = JacksonMapper.INSTANCE;
- this.type = type;
}
@Override
public void configure(Map<String, Object> configs) {
-
+ JsonSerializerConfig serializerConfig = new
JsonSerializerConfig(configs);
+ this.skipSchemaRegistry = serializerConfig.skipSchemaRegistry();
+ this.type = (Class<T>) serializerConfig.deserializeTargetType();
}
@Override
public T deserialize(String subject, byte[] payload) {
- if (null == registryClient) {
- throw new SerializationException("please initialize the schema
registry client first");
- }
-
if (null == payload || payload.length == 0) {
return null;
}
+ if (skipSchemaRegistry) {
+ if (null == type) {
+ throw new SerializationException("type cannot be null");
+ }
+ try {
+ return objectMapper.readValue(payload, type);
+ } catch (Exception e) {
+ throw new SerializationException("JSON serialize failed", e);
+ }
+ }
+
+ if (null == registryClient) {
+ throw new SerializationException("please initialize the schema
registry client first");
+ }
+
try {
GetSchemaResponse response =
registryClient.getSchemaBySubject(subject);
ByteBuffer buffer = ByteBuffer.wrap(payload);
@@ -73,9 +89,7 @@ public class JsonDeserializer<T> implements Deserializer<T> {
jsonNode = objectMapper.readValue(buffer.array(), start, length,
JsonNode.class);
return objectMapper.convertValue(jsonNode, type);
- } catch (RestClientException e) {
- throw new RuntimeException(e);
- } catch (IOException e) {
+ } catch (RestClientException | IOException e) {
throw new RuntimeException(e);
}
}
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerde.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerde.java
index 52493b1..1c33259 100644
---
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerde.java
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerde.java
@@ -27,9 +27,14 @@ public class JsonSerde<T> implements Closeable {
private final JsonSerializer<T> serializer;
private final JsonDeserializer<T> deserializer;
- public JsonSerde(SchemaRegistryClient registryClient, Class<T> type) {
+ public JsonSerde() {
+ this.serializer = new JsonSerializer<>();
+ this.deserializer = new JsonDeserializer<>();
+ }
+
+ public JsonSerde(SchemaRegistryClient registryClient) {
this.serializer = new JsonSerializer<>(registryClient);
- this.deserializer = new JsonDeserializer<>(registryClient, type);
+ this.deserializer = new JsonDeserializer<>(registryClient);
}
public void configure(final Map<String, Object> configs) {
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerializer.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerializer.java
index 5cae08e..85e0b4e 100644
---
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerializer.java
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerializer.java
@@ -17,9 +17,10 @@
package org.apache.rocketmq.schema.registry.client.serde.json;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.avro.io.EncoderFactory;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import org.apache.rocketmq.schema.registry.client.config.JsonSerializerConfig;
import
org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
import
org.apache.rocketmq.schema.registry.client.exceptions.SerializationException;
import org.apache.rocketmq.schema.registry.client.rest.JacksonMapper;
@@ -33,18 +34,21 @@ import java.nio.ByteBuffer;
import java.util.Map;
public class JsonSerializer<T> implements Serializer<T> {
- private final SchemaRegistryClient registryClient;
- private final ObjectMapper objectMapper;
- private final EncoderFactory encoderFactory = EncoderFactory.get();
+ private SchemaRegistryClient registryClient;
+ private final ObjectMapper objectMapper = JacksonMapper.INSTANCE;
+ private boolean skipSchemaRegistry;
+
+ public JsonSerializer() {
+ }
public JsonSerializer(SchemaRegistryClient registryClient) {
- this.objectMapper = JacksonMapper.INSTANCE;
this.registryClient = registryClient;
}
@Override
public void configure(Map<String, Object> configs) {
-
+ JsonSerializerConfig jsonSerializerConfig = new
JsonSerializerConfig(configs);
+ this.skipSchemaRegistry = jsonSerializerConfig.skipSchemaRegistry();
}
@Override
@@ -53,6 +57,14 @@ public class JsonSerializer<T> implements Serializer<T> {
return null;
}
+ if (skipSchemaRegistry) {
+ try {
+ return objectMapper.writeValueAsBytes(originMessage);
+ } catch (JsonProcessingException e) {
+ throw new SerializationException("JSON serialize failed", e);
+ }
+ }
+
if (null == registryClient) {
throw new SerializationException("please initialize the schema
registry client first");
}
diff --git
a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/Person.java
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/Person.java
new file mode 100644
index 0000000..e20c28b
--- /dev/null
+++
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/Person.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client.serde;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Objects;
+
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+@JsonPropertyOrder(alphabetic = true)
+public class Person {
+ @JsonIgnore
+ private Long id;
+ private String name;
+ private int age;
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Person person = (Person) o;
+ return age == person.age && Objects.equals(name, person.name);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, age);
+ }
+
+ @Override
+ public String toString() {
+ return "Person{" +
+ "name='" + name + '\'' +
+ ", age=" + age +
+ '}';
+ }
+}
diff --git
a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/SkipSchemaRegistrySerdeTest.java
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/SkipSchemaRegistrySerdeTest.java
new file mode 100644
index 0000000..4a7f22b
--- /dev/null
+++
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/SkipSchemaRegistrySerdeTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.schema.registry.client.serde;
+
+import org.apache.rocketmq.schema.registry.client.config.JsonSerializerConfig;
+import org.apache.rocketmq.schema.registry.client.serde.json.JsonSerde;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class SkipSchemaRegistrySerdeTest {
+
+ @Test
+ public void testJsonSerde() {
+ Person person = new Person(1L, "name", 18);
+ System.out.printf("person before serialize is %s\n", person);
+
+ try(JsonSerde<Person> jsonSerde = new JsonSerde<>()) {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(JsonSerializerConfig.SKIP_SCHEMA_REGISTRY, true);
+ configs.put(JsonSerializerConfig.DESERIALIZE_TARGET_TYPE,
Person.class);
+ jsonSerde.configure(configs);
+ byte[] bytes = jsonSerde.serializer().serialize("TopicTest",
person);
+
+ Person person1 = jsonSerde.deserializer().deserialize("TopicTest",
bytes);
+ System.out.printf("after deserialize new person is %s\n", person1);
+ System.out.printf("person == person1 : %b\n",
person1.equals(person));
+ } catch (IOException e) {
+ System.out.println("serde shutdown failed");
+ }
+ }
+}
diff --git
a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/GenericAvroSerdeTest.java
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerdeTest.java
similarity index 91%
rename from
client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/GenericAvroSerdeTest.java
rename to
client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerdeTest.java
index c8c73e6..4ed8f65 100644
---
a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/GenericAvroSerdeTest.java
+++
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerdeTest.java
@@ -14,15 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.schema.registry.client.serde;
+package org.apache.rocketmq.schema.registry.client.serde.avro;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
-import
org.apache.rocketmq.schema.registry.client.config.AvroDeserializerConfig;
+import org.apache.rocketmq.schema.registry.client.config.AvroSerializerConfig;
import
org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
-import org.apache.rocketmq.schema.registry.client.serde.avro.GenericAvroSerde;
import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
@@ -63,7 +62,7 @@ public class GenericAvroSerdeTest {
try (GenericAvroSerde serde = new GenericAvroSerde(registryClient)) {
//configure
Map<String, Object> configs = new HashMap<>();
- configs.put(AvroDeserializerConfig.USE_GENERIC_DATUM_READER, true);
+ configs.put(AvroSerializerConfig.USE_GENERIC_DATUM_READER, true);
serde.configure(configs);
//serialize
@@ -75,7 +74,5 @@ public class GenericAvroSerdeTest {
} catch (IOException e) {
System.out.println("serde shutdown failed");
}
-
-
}
}
diff --git
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/SpecificAvroSerdeDemo.java
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroSerdeTest.java
similarity index 58%
copy from
example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/SpecificAvroSerdeDemo.java
copy to
client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroSerdeTest.java
index ad9281a..4864cf5 100644
---
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/SpecificAvroSerdeDemo.java
+++
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroSerdeTest.java
@@ -14,34 +14,34 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.rocketmq.schema.registry.client.serde.avro;
-package org.apache.rocketmq.schema.registry.example.serde.avro;
-
-import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
-import org.apache.rocketmq.schema.registry.client.SchemaRegistryClientFactory;
-import org.apache.rocketmq.schema.registry.client.serde.avro.SpecificAvroSerde;
-import org.apache.rocketmq.schema.registry.example.serde.Charge;
+import org.apache.rocketmq.schema.registry.client.config.AvroSerializerConfig;
+import org.apache.rocketmq.schema.registry.client.serde.Charge;
+import org.junit.jupiter.api.Test;
import java.io.IOException;
-
-public class SpecificAvroSerdeDemo {
-
- public static void main(String[] args) {
-
- String baseUrl = "http://localhost:8080";
- SchemaRegistryClient schemaRegistryClient =
SchemaRegistryClientFactory.newClient(baseUrl, null);
-
- try (SpecificAvroSerde serde = new
SpecificAvroSerde(schemaRegistryClient)) {
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ReflectionAvroSerdeTest {
+ @Test
+ public void testReflectionAvroSerde() {
+ Charge charge = new Charge("specific", 100.0);
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(AvroSerializerConfig.DESERIALIZE_TARGET_TYPE,
charge.getClass());
+ try (ReflectionAvroSerde serde = new ReflectionAvroSerde()) {
//serialize
- Charge charge = new Charge("specific", 100.0);
- byte[] bytes = serde.serializer().serialize("TopicTest", charge);
+ serde.configure(configs);
+ byte[] bytes = serde.serializer().serialize("TopicTest", charge);
//deserialize
Charge charge1 = (Charge)
serde.deserializer().deserialize("TopicTest", bytes);
- System.out.println("the origin object after ser/de is " + charge1);
+ assertThat(charge1).isEqualTo(charge);
} catch (IOException e) {
System.out.println("serde shutdown failed");
}
-
}
}
diff --git
a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/SpecificAvroSerdeTest.java
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroSerdeTest.java
similarity index 95%
copy from
client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/SpecificAvroSerdeTest.java
copy to
client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroSerdeTest.java
index 4e34efa..3424d80 100644
---
a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/SpecificAvroSerdeTest.java
+++
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroSerdeTest.java
@@ -14,11 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.schema.registry.client.serde;
+package org.apache.rocketmq.schema.registry.client.serde.avro;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
import
org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
-import org.apache.rocketmq.schema.registry.client.serde.avro.SpecificAvroSerde;
+import org.apache.rocketmq.schema.registry.client.serde.Charge;
import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
@@ -61,7 +61,5 @@ public class SpecificAvroSerdeTest {
} catch (IOException e) {
System.out.println("serde shutdown failed");
}
-
-
}
}
diff --git
a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/SpecificAvroSerdeTest.java
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerdeTest.java
similarity index 69%
rename from
client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/SpecificAvroSerdeTest.java
rename to
client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerdeTest.java
index 4e34efa..78d4a0d 100644
---
a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/SpecificAvroSerdeTest.java
+++
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerdeTest.java
@@ -14,23 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.schema.registry.client.serde;
+package org.apache.rocketmq.schema.registry.client.serde.json;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import org.apache.rocketmq.schema.registry.client.config.JsonSerializerConfig;
import
org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
+import org.apache.rocketmq.schema.registry.client.serde.Charge;
+import org.apache.rocketmq.schema.registry.client.serde.Person;
import org.apache.rocketmq.schema.registry.client.serde.avro.SpecificAvroSerde;
import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-public class SpecificAvroSerdeTest {
-
+public class JsonSerdeTest {
@Mock
private SchemaRegistryClient registryClient;
@@ -39,29 +43,29 @@ public class SpecificAvroSerdeTest {
@Test
public void testSpecificSerde() throws RestClientException, IOException {
- String idl =
"{\"type\":\"record\",\"name\":\"Charge\",\"namespace\":\"org.apache.rocketmq.schema.registry.client.serde\","
- +
"\"fields\":[{\"name\":\"item\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}";
+ String idl =
"{\"type\":\"object\",\"properties\":{\"name\":{\"type\":\"string\"},\"age\":{\"type\":\"int\"}}";
getSchemaResponse = mock(GetSchemaResponse.class);
when(getSchemaResponse.getRecordId()).thenReturn(11111L);
-
when(getSchemaResponse.getSchemaFullName()).thenReturn("org.apache.rocketmq.schema.registry.example.serde.Charge");
+
when(getSchemaResponse.getSchemaFullName()).thenReturn("org.apache.rocketmq.schema.registry.example.serde.Person");
when(getSchemaResponse.getIdl()).thenReturn(idl);
registryClient = mock(SchemaRegistryClient.class);
when(registryClient.getSchemaBySubject("TopicTest")).thenReturn(getSchemaResponse);
- try (SpecificAvroSerde serde = new SpecificAvroSerde(registryClient)) {
+ try (JsonSerde<Person> serde = new JsonSerde<Person>(registryClient)) {
//serialize
- Charge charge = new Charge("specific", 100.0);
- byte[] bytes = serde.serializer().serialize("TopicTest", charge);
+ Person person = new Person(1L, "Tom", 18);
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(JsonSerializerConfig.DESERIALIZE_TARGET_TYPE,
person.getClass());
+ serde.configure(configs);
+ byte[] bytes = serde.serializer().serialize("TopicTest", person);
//deserialize
- Charge charge1 = (Charge)
serde.deserializer().deserialize("TopicTest", bytes);
- assertThat(charge1).isEqualTo(charge);
+ Person person1 = serde.deserializer().deserialize("TopicTest",
bytes);
+ assertThat(person).isEqualTo(person1);
} catch (IOException e) {
System.out.println("serde shutdown failed");
}
-
-
}
}
diff --git
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/GenericAvroSerdeDemo.java
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/GenericAvroSerdeDemo.java
index 8e02ec6..641f1c4 100644
---
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/GenericAvroSerdeDemo.java
+++
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/GenericAvroSerdeDemo.java
@@ -21,7 +21,7 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClientFactory;
-import
org.apache.rocketmq.schema.registry.client.config.AvroDeserializerConfig;
+import org.apache.rocketmq.schema.registry.client.config.AvroSerializerConfig;
import org.apache.rocketmq.schema.registry.client.serde.avro.GenericAvroSerde;
import java.io.IOException;
@@ -44,7 +44,7 @@ public class GenericAvroSerdeDemo {
try (GenericAvroSerde serde = new
GenericAvroSerde(schemaRegistryClient)) {
//configure
Map<String, Object> configs = new HashMap<>();
- configs.put(AvroDeserializerConfig.USE_GENERIC_DATUM_READER, true);
+ configs.put(AvroSerializerConfig.USE_GENERIC_DATUM_READER, true);
serde.configure(configs);
//serialize
diff --git
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/SpecificAvroSerdeDemo.java
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/ReflectionAvroSerdeDemo.java
similarity index 63%
copy from
example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/SpecificAvroSerdeDemo.java
copy to
example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/ReflectionAvroSerdeDemo.java
index ad9281a..62b5bd0 100644
---
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/SpecificAvroSerdeDemo.java
+++
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/ReflectionAvroSerdeDemo.java
@@ -14,34 +14,33 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.rocketmq.schema.registry.example.serde.avro;
-import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
-import org.apache.rocketmq.schema.registry.client.SchemaRegistryClientFactory;
-import org.apache.rocketmq.schema.registry.client.serde.avro.SpecificAvroSerde;
+import org.apache.rocketmq.schema.registry.client.config.AvroSerializerConfig;
+import
org.apache.rocketmq.schema.registry.client.serde.avro.ReflectionAvroSerde;
import org.apache.rocketmq.schema.registry.example.serde.Charge;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
-public class SpecificAvroSerdeDemo {
-
+public class ReflectionAvroSerdeDemo {
public static void main(String[] args) {
-
- String baseUrl = "http://localhost:8080";
- SchemaRegistryClient schemaRegistryClient =
SchemaRegistryClientFactory.newClient(baseUrl, null);
-
- try (SpecificAvroSerde serde = new
SpecificAvroSerde(schemaRegistryClient)) {
+ Charge charge = new Charge("specific", 100.0);
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(AvroSerializerConfig.DESERIALIZE_TARGET_TYPE,
charge.getClass());
+ try (ReflectionAvroSerde serde = new ReflectionAvroSerde()) {
//serialize
- Charge charge = new Charge("specific", 100.0);
- byte[] bytes = serde.serializer().serialize("TopicTest", charge);
+ serde.configure(configs);
+ byte[] bytes = serde.serializer().serialize("TopicTest", charge);
//deserialize
Charge charge1 = (Charge)
serde.deserializer().deserialize("TopicTest", bytes);
- System.out.println("the origin object after ser/de is " + charge1);
+ System.out.println("charge before ser/de is " + charge);
+ System.out.println("charge after ser/de is " + charge1);
+ System.out.printf("charge == charge1 : %b",
charge.equals(charge1));
} catch (IOException e) {
System.out.println("serde shutdown failed");
}
-
}
}
diff --git
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/SpecificAvroSerdeDemo.java
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/SpecificAvroSerdeDemo.java
index ad9281a..b74af60 100644
---
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/SpecificAvroSerdeDemo.java
+++
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/SpecificAvroSerdeDemo.java
@@ -19,10 +19,13 @@ package
org.apache.rocketmq.schema.registry.example.serde.avro;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClientFactory;
+import org.apache.rocketmq.schema.registry.client.config.AvroSerializerConfig;
import org.apache.rocketmq.schema.registry.client.serde.avro.SpecificAvroSerde;
import org.apache.rocketmq.schema.registry.example.serde.Charge;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
public class SpecificAvroSerdeDemo {
@@ -30,10 +33,16 @@ public class SpecificAvroSerdeDemo {
String baseUrl = "http://localhost:8080";
SchemaRegistryClient schemaRegistryClient =
SchemaRegistryClientFactory.newClient(baseUrl, null);
+ Map<String, Object> serializeConfigs = new HashMap<>();
+
try (SpecificAvroSerde serde = new
SpecificAvroSerde(schemaRegistryClient)) {
+
//serialize
Charge charge = new Charge("specific", 100.0);
+ serializeConfigs.put(AvroSerializerConfig.SKIP_SCHEMA_REGISTRY,
true);
+ serializeConfigs.put(AvroSerializerConfig.DESERIALIZE_TARGET_TYPE,
charge.getClass());
+ serde.configure(serializeConfigs);
byte[] bytes = serde.serializer().serialize("TopicTest", charge);
//deserialize
diff --git
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/json/JsonSerdeDemo.java
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/json/JsonSerdeDemo.java
index 7f09de1..bcb4e7a 100644
---
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/json/JsonSerdeDemo.java
+++
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/json/JsonSerdeDemo.java
@@ -19,6 +19,7 @@ package
org.apache.rocketmq.schema.registry.example.serde.json;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClientFactory;
+import org.apache.rocketmq.schema.registry.client.config.JsonSerializerConfig;
import
org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
import org.apache.rocketmq.schema.registry.client.serde.json.JsonSerde;
import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaRequest;
@@ -28,6 +29,8 @@ import
org.apache.rocketmq.schema.registry.common.model.SchemaType;
import org.apache.rocketmq.schema.registry.example.serde.Person;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
public class JsonSerdeDemo {
public static void main(String[] args) {
@@ -44,7 +47,7 @@ public class JsonSerdeDemo {
.owner("test").build();
try {
RegisterSchemaResponse response
- = schemaRegistryClient.registerSchema(topic, "Person",
request);
+ = schemaRegistryClient.registerSchema("default",
"tanant1", topic, "Person", request);
System.out.println("register schema success, schemaId: " +
response.getRecordId());
Thread.sleep(5000);
@@ -56,7 +59,10 @@ public class JsonSerdeDemo {
Person person = new Person(1L, "name", 18);
System.out.printf("person before serialize is %s\n", person);
- try(JsonSerde<Person> jsonSerde = new
JsonSerde<>(schemaRegistryClient, Person.class)) {
+ try(JsonSerde<Person> jsonSerde = new
JsonSerde<>(schemaRegistryClient)) {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(JsonSerializerConfig.DESERIALIZE_TARGET_TYPE,
Person.class);
+ jsonSerde.configure(configs);
byte[] bytes = jsonSerde.serializer().serialize("TopicTest",
person);
Person person1 = jsonSerde.deserializer().deserialize("TopicTest",
bytes);
diff --git
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/json/JsonSerdeWithoutServerDemo.java
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/json/JsonSerdeWithoutServerDemo.java
new file mode 100644
index 0000000..c7e3500
--- /dev/null
+++
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/json/JsonSerdeWithoutServerDemo.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.example.serde.json;
+
+import org.apache.rocketmq.schema.registry.client.config.JsonSerializerConfig;
+import org.apache.rocketmq.schema.registry.client.serde.json.JsonSerde;
+import org.apache.rocketmq.schema.registry.example.serde.Person;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class JsonSerdeWithoutServerDemo {
+
+ public static void main(String[] args) {
+ Person person = new Person(1L, "name", 18);
+ System.out.printf("person before serialize is %s\n", person);
+
+ try(JsonSerde<Person> jsonSerde = new JsonSerde<>()) {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(JsonSerializerConfig.DESERIALIZE_TARGET_TYPE,
Person.class);
+ configs.put(JsonSerializerConfig.SKIP_SCHEMA_REGISTRY, true);
+ jsonSerde.configure(configs);
+ byte[] bytes = jsonSerde.serializer().serialize("TopicTest",
person);
+
+ Person person1 = jsonSerde.deserializer().deserialize("TopicTest",
bytes);
+ System.out.printf("after deserialize new person is %s\n", person1);
+ System.out.printf("person == person1 : %b\n",
person1.equals(person));
+ } catch (IOException e) {
+ System.out.println("serde shutdown failed");
+ }
+ }
+}