This is an automated email from the ASF dual-hosted git repository.

karp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git


The following commit(s) were added to refs/heads/main by this push:
     new d719428  support JSON serde (#41)
d719428 is described below

commit d71942859c6ede31f929ed60d6c6df218c6996f7
Author: humkum <[email protected]>
AuthorDate: Mon Aug 22 19:15:13 2022 +0800

    support JSON serde (#41)
    
    * support JSON serde
    
    * support JSON serde
---
 .../registry/client/serde/avro/AvroSerializer.java |  5 +-
 .../client/serde/json/JsonDeserializer.java        | 87 ++++++++++++++++++++++
 .../registry/client/serde/json/JsonSerde.java      | 52 +++++++++++++
 .../registry/client/serde/json/JsonSerializer.java | 81 ++++++++++++++++++++
 .../schema/registry/common/QualifiedName.java      | 10 ++-
 .../registry/common/constant/SchemaConstants.java  | 23 ++++++
 .../schema/registry/common/model/SubjectInfo.java  |  4 +-
 .../schema/registry/example/serde/Person.java      | 45 ++++++-----
 .../serde/{ => avro}/GenericAvroSerdeDemo.java     |  4 +-
 .../serde/{ => avro}/SpecificAvroSerdeDemo.java    |  5 +-
 .../registry/example/serde/json/JsonSerdeDemo.java | 69 +++++++++++++++++
 11 files changed, 354 insertions(+), 31 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroSerializer.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroSerializer.java
index e883e09..41ae117 100644
--- 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroSerializer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroSerializer.java
@@ -28,6 +28,7 @@ import 
org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
 import 
org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
 import 
org.apache.rocketmq.schema.registry.client.exceptions.SerializationException;
 import org.apache.rocketmq.schema.registry.client.serde.Serializer;
+import org.apache.rocketmq.schema.registry.common.constant.SchemaConstants;
 import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
 
 import java.io.ByteArrayOutputStream;
@@ -37,8 +38,6 @@ import java.util.Map;
 
 public class AvroSerializer<T> implements Serializer<T> {
 
-    private static final int SCHEMA_ID_LENGTH = 8;
-    private static final int SCHEMA_VERSION_LENGTH = 8;
     protected SchemaRegistryClient schemaRegistry;
     private final EncoderFactory encoderFactory = EncoderFactory.get();
 
@@ -72,7 +71,7 @@ public class AvroSerializer<T> implements Serializer<T> {
             Schema schema = new Schema.Parser().parse(schemaIdl);
             ByteArrayOutputStream out = new ByteArrayOutputStream();
             BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out, 
null);
-            ByteBuffer buffer = ByteBuffer.allocate(SCHEMA_ID_LENGTH + 
SCHEMA_VERSION_LENGTH);
+            ByteBuffer buffer = 
ByteBuffer.allocate(SchemaConstants.SCHEMA_RECORD_ID_LENGTH);
             encoder.writeBytes(buffer.putLong(schemaRecordId).array());
 
             DatumWriter<T> datumWriter;
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonDeserializer.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonDeserializer.java
new file mode 100644
index 0000000..0569c32
--- /dev/null
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonDeserializer.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client.serde.json;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import 
org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
+import 
org.apache.rocketmq.schema.registry.client.exceptions.SerializationException;
+import org.apache.rocketmq.schema.registry.client.rest.JacksonMapper;
+import org.apache.rocketmq.schema.registry.client.serde.Deserializer;
+import org.apache.rocketmq.schema.registry.common.constant.SchemaConstants;
+import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+public class JsonDeserializer<T> implements Deserializer<T> {
+    Logger log = LoggerFactory.getLogger(JsonDeserializer.class);
+    private final SchemaRegistryClient registryClient;
+    private final ObjectMapper objectMapper;
+    private final Class<T> type;
+
+    public JsonDeserializer(SchemaRegistryClient registryClient, Class<T> 
type) {
+        this.registryClient = registryClient;
+        objectMapper = JacksonMapper.INSTANCE;
+        this.type = type;
+    }
+
+    @Override
+    public void configure(Map<String, Object> configs) {
+
+    }
+
+    @Override
+    public T deserialize(String subject, byte[] payload) {
+        if (null == registryClient) {
+            throw new SerializationException("please initialize the schema 
registry client first");
+        }
+
+        if (null == payload || payload.length == 0) {
+            return null;
+        }
+
+        try {
+            GetSchemaResponse response = 
registryClient.getSchemaBySubject(subject);
+            ByteBuffer buffer = ByteBuffer.wrap(payload);
+
+            long schemaRecordId = buffer.getLong();
+
+            int length = buffer.limit() - 
SchemaConstants.SCHEMA_RECORD_ID_LENGTH;
+            int start = buffer.position() + buffer.arrayOffset();
+
+            JsonNode jsonNode = null;
+            jsonNode = objectMapper.readValue(buffer.array(), start, length, 
JsonNode.class);
+
+            return objectMapper.convertValue(jsonNode, type);
+        } catch (RestClientException e) {
+            throw new RuntimeException(e);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void close() {
+
+    }
+}
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerde.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerde.java
new file mode 100644
index 0000000..52493b1
--- /dev/null
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerde.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client.serde.json;
+
+import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+
+public class JsonSerde<T> implements Closeable {
+    private final JsonSerializer<T> serializer;
+    private final JsonDeserializer<T> deserializer;
+
+    public JsonSerde(SchemaRegistryClient registryClient, Class<T> type) {
+        this.serializer = new JsonSerializer<>(registryClient);
+        this.deserializer = new JsonDeserializer<>(registryClient, type);
+    }
+
+    public void configure(final Map<String, Object> configs) {
+        this.serializer.configure(configs);
+        this.deserializer.configure(configs);
+    }
+
+    public JsonSerializer<T> serializer() {
+        return this.serializer;
+    }
+
+    public JsonDeserializer<T> deserializer() {
+        return this.deserializer;
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+}
diff --git 
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerializer.java
 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerializer.java
new file mode 100644
index 0000000..5cae08e
--- /dev/null
+++ 
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerializer.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client.serde.json;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import 
org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
+import 
org.apache.rocketmq.schema.registry.client.exceptions.SerializationException;
+import org.apache.rocketmq.schema.registry.client.rest.JacksonMapper;
+import org.apache.rocketmq.schema.registry.client.serde.Serializer;
+import org.apache.rocketmq.schema.registry.common.constant.SchemaConstants;
+import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+public class JsonSerializer<T> implements Serializer<T> {
+    private final SchemaRegistryClient registryClient;
+    private final ObjectMapper objectMapper;
+    private final EncoderFactory encoderFactory = EncoderFactory.get();
+
+    public JsonSerializer(SchemaRegistryClient registryClient) {
+        this.objectMapper = JacksonMapper.INSTANCE;
+        this.registryClient = registryClient;
+    }
+
+    @Override
+    public void configure(Map<String, Object> configs) {
+
+    }
+
+    @Override
+    public byte[] serialize(String subject, T originMessage) {
+        if (null == originMessage) {
+            return null;
+        }
+
+        if (null == registryClient) {
+            throw new SerializationException("please initialize the schema 
registry client first");
+        }
+
+        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+            GetSchemaResponse response = 
registryClient.getSchemaBySubject(subject);
+            long schemaRecordId = response.getRecordId();
+            ByteBuffer buffer = 
ByteBuffer.allocate(SchemaConstants.SCHEMA_RECORD_ID_LENGTH);
+            out.write(buffer.putLong(schemaRecordId).array());
+            out.write(objectMapper.writeValueAsBytes(originMessage));
+
+            byte[] bytes = out.toByteArray();
+            return bytes;
+        } catch (IOException | RuntimeException e) {
+            throw new SerializationException("JSON serialize failed", e);
+        } catch (RestClientException e) {
+            throw new SerializationException("get schema by subject failed", 
e);
+        }
+
+    }
+
+    @Override
+    public void close() {
+
+    }
+}
diff --git 
a/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java
 
b/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java
index 87cacee..bf7ece0 100644
--- 
a/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java
+++ 
b/common/src/main/java/org/apache/rocketmq/schema/registry/common/QualifiedName.java
@@ -22,6 +22,7 @@ import javax.annotation.Nullable;
 import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
+import org.apache.rocketmq.schema.registry.common.constant.SchemaConstants;
 import org.apache.rocketmq.schema.registry.common.model.SubjectInfo;
 
 @Data
@@ -67,15 +68,18 @@ public class QualifiedName implements Serializable {
     }
 
     public String fullName() {
-        return cluster + '/' + tenant + '/' + subject + '/' + schema;
+        return cluster + SchemaConstants.SUBJECT_SEPARATOR + tenant
+                + SchemaConstants.SUBJECT_SEPARATOR + subject
+                + SchemaConstants.SUBJECT_SEPARATOR + schema;
     }
 
     public String schemaFullName() {
-        return tenant + '/' + schema;
+        return tenant + SchemaConstants.SUBJECT_SEPARATOR + schema;
     }
 
     public String subjectFullName() {
-        return cluster + '/' + tenant + '/' + subject;
+        return cluster + SchemaConstants.SUBJECT_SEPARATOR + tenant
+                + SchemaConstants.SUBJECT_SEPARATOR + subject;
     }
 
     @Override
diff --git 
a/common/src/main/java/org/apache/rocketmq/schema/registry/common/constant/SchemaConstants.java
 
b/common/src/main/java/org/apache/rocketmq/schema/registry/common/constant/SchemaConstants.java
new file mode 100644
index 0000000..f3af9be
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/schema/registry/common/constant/SchemaConstants.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.common.constant;
+
+public class SchemaConstants {
+    public static final int SCHEMA_RECORD_ID_LENGTH = 8;
+    public static final char SUBJECT_SEPARATOR = '/';
+}
diff --git 
a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SubjectInfo.java
 
b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SubjectInfo.java
index cc61275..baf65dd 100644
--- 
a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SubjectInfo.java
+++ 
b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SubjectInfo.java
@@ -23,6 +23,7 @@ import lombok.Builder;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
+import org.apache.rocketmq.schema.registry.common.constant.SchemaConstants;
 
 @Data
 @EqualsAndHashCode(callSuper = false)
@@ -37,7 +38,8 @@ public class SubjectInfo implements Serializable {
     private String subject;
 
     public String fullName() {
-        return cluster + '/' + tenant + '/' + subject;
+        return cluster + SchemaConstants.SUBJECT_SEPARATOR + tenant
+                + SchemaConstants.SUBJECT_SEPARATOR + subject;
     }
 
     @Override
diff --git 
a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SubjectInfo.java
 
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/Person.java
similarity index 55%
copy from 
common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SubjectInfo.java
copy to 
example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/Person.java
index cc61275..b2d1fd8 100644
--- 
a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SubjectInfo.java
+++ 
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/Person.java
@@ -15,41 +15,46 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.schema.registry.common.model;
+package org.apache.rocketmq.schema.registry.example.serde;
 
-import java.io.Serializable;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
-import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
 
+import java.util.Objects;
+
 @Data
-@EqualsAndHashCode(callSuper = false)
 @Builder
 @AllArgsConstructor
 @NoArgsConstructor
-public class SubjectInfo implements Serializable {
-    private static final long serialVersionUID = -92808722007777844L;
+@JsonPropertyOrder(alphabetic = true)
+public class Person {
+    @JsonIgnore
+    private Long id;
+    private String name;
+    private int age;
 
-    private String cluster;
-    private String tenant;
-    private String subject;
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        Person person = (Person) o;
+        return age == person.age && Objects.equals(name, person.name);
+    }
 
-    public String fullName() {
-        return cluster + '/' + tenant + '/' + subject;
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, age);
     }
 
     @Override
     public String toString() {
-        final StringBuilder sb = new StringBuilder("{");
-        sb.append("\"cluster\":\"")
-            .append(cluster).append('\"');
-        sb.append("\"tenant\":\"")
-            .append(tenant).append('\"');
-        sb.append(",\"subject\":\"")
-            .append(subject).append('\"');
-        sb.append('}');
-        return sb.toString();
+        return "Person{" +
+                "name='" + name + '\'' +
+                ", age=" + age +
+                '}';
     }
 }
diff --git 
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/GenericAvroSerdeDemo.java
 
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/GenericAvroSerdeDemo.java
similarity index 95%
rename from 
example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/GenericAvroSerdeDemo.java
rename to 
example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/GenericAvroSerdeDemo.java
index b8359a7..8e02ec6 100644
--- 
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/GenericAvroSerdeDemo.java
+++ 
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/GenericAvroSerdeDemo.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.schema.registry.example.serde;
+package org.apache.rocketmq.schema.registry.example.serde.avro;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
@@ -32,7 +32,7 @@ public class GenericAvroSerdeDemo {
 
     public static void main(String[] args) {
 
-        String baseUrl = "http://localhost:8080/schema-registry/v1";;
+        String baseUrl = "http://localhost:8080";;
         SchemaRegistryClient schemaRegistryClient = 
SchemaRegistryClientFactory.newClient(baseUrl, null);
 
         Schema schema = new 
Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Charge\",\"namespace\":\"org.apache.rocketmq.schema.registry.example.serde\",\"fields\":[{\"name\":\"item\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}");
diff --git 
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/SpecificAvroSerdeDemo.java
 
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/SpecificAvroSerdeDemo.java
similarity index 91%
rename from 
example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/SpecificAvroSerdeDemo.java
rename to 
example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/SpecificAvroSerdeDemo.java
index 281d383..ad9281a 100644
--- 
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/SpecificAvroSerdeDemo.java
+++ 
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/SpecificAvroSerdeDemo.java
@@ -15,11 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.schema.registry.example.serde;
+package org.apache.rocketmq.schema.registry.example.serde.avro;
 
 import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
 import org.apache.rocketmq.schema.registry.client.SchemaRegistryClientFactory;
 import org.apache.rocketmq.schema.registry.client.serde.avro.SpecificAvroSerde;
+import org.apache.rocketmq.schema.registry.example.serde.Charge;
 
 import java.io.IOException;
 
@@ -27,7 +28,7 @@ public class SpecificAvroSerdeDemo {
 
     public static void main(String[] args) {
 
-        String baseUrl = "http://localhost:8080/schema-registry/v1";;
+        String baseUrl = "http://localhost:8080";;
         SchemaRegistryClient schemaRegistryClient = 
SchemaRegistryClientFactory.newClient(baseUrl, null);
 
         try (SpecificAvroSerde serde = new 
SpecificAvroSerde(schemaRegistryClient)) {
diff --git 
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/json/JsonSerdeDemo.java
 
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/json/JsonSerdeDemo.java
new file mode 100644
index 0000000..7f09de1
--- /dev/null
+++ 
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/json/JsonSerdeDemo.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.example.serde.json;
+
+import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import org.apache.rocketmq.schema.registry.client.SchemaRegistryClientFactory;
+import 
org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
+import org.apache.rocketmq.schema.registry.client.serde.json.JsonSerde;
+import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaRequest;
+import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaResponse;
+import org.apache.rocketmq.schema.registry.common.model.Compatibility;
+import org.apache.rocketmq.schema.registry.common.model.SchemaType;
+import org.apache.rocketmq.schema.registry.example.serde.Person;
+
+import java.io.IOException;
+
+public class JsonSerdeDemo {
+    public static void main(String[] args) {
+
+        String baseUrl = "http://localhost:8080";;
+        SchemaRegistryClient schemaRegistryClient = 
SchemaRegistryClientFactory.newClient(baseUrl, null);
+
+        // register schema first, if have registered before ignore
+        String topic = "TopicTest";
+        RegisterSchemaRequest request = RegisterSchemaRequest.builder()
+                
.schemaIdl("{\"type\":\"object\",\"properties\":{\"name\":{\"type\":\"string\"},\"age\":{\"type\":\"int\"}}")
+                .schemaType(SchemaType.JSON)
+                .compatibility(Compatibility.BACKWARD)
+                .owner("test").build();
+        try {
+            RegisterSchemaResponse response
+                    = schemaRegistryClient.registerSchema(topic, "Person", 
request);
+            System.out.println("register schema success, schemaId: " + 
response.getRecordId());
+
+            Thread.sleep(5000);
+            System.out.println("current schema: " + 
schemaRegistryClient.getSchemaBySubject(topic));
+        } catch (RestClientException | IOException | InterruptedException e) {
+            e.printStackTrace();
+        }
+
+        Person person = new Person(1L, "name", 18);
+        System.out.printf("person before serialize is %s\n", person);
+
+        try(JsonSerde<Person> jsonSerde = new 
JsonSerde<>(schemaRegistryClient, Person.class)) {
+            byte[] bytes = jsonSerde.serializer().serialize("TopicTest", 
person);
+
+            Person person1 = jsonSerde.deserializer().deserialize("TopicTest", 
bytes);
+            System.out.printf("after deserialize new person is %s\n", person1);
+            System.out.printf("person == person1 : %b\n", 
person1.equals(person));
+        } catch (IOException e) {
+            System.out.println("serde shutdown failed");
+        }
+    }
+}

Reply via email to