This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git
The following commit(s) were added to refs/heads/main by this push:
new b9a79f7 refine avro serde type to SpecificAvroSerde and
GenericAvroSerde
new 77cb8c6 Merge pull request #25 from humkum/main
b9a79f7 is described below
commit b9a79f78d40f2b4c042187302d0833caa437a1db
Author: Hankunming <[email protected]>
AuthorDate: Tue Aug 2 21:00:39 2022 +0800
refine avro serde type to SpecificAvroSerde and GenericAvroSerde
---
.../AvroDeserializerConfig.java} | 28 +--
.../client/{serializer => serde}/Deserializer.java | 4 +-
.../client/{serializer => serde}/Serializer.java | 4 +-
.../avro/AvroDeserializer.java} | 42 +++-
.../avro/AvroSerializer.java} | 38 ++-
.../avro/GenericAvroDeserializer.java} | 25 +-
.../client/serde/avro/GenericAvroSerde.java | 63 +++++
.../avro/GenericAvroSerializer.java} | 25 +-
.../avro/SpecificAvroDeserializer.java} | 26 +-
.../client/serde/avro/SpecificAvroSerde.java | 64 +++++
.../avro/SpecificAvroSerializer.java} | 29 ++-
.../schema/registry/client/serde/Charge.java | 269 +++++++++++++++++++++
.../client/serde/GenericAvroSerdeTest.java | 82 +++++++
.../client/serde/SpecificAvroSerdeTest.java | 68 ++++++
.../example/serde/GenericAvroSerdeDemo.java | 60 +++++
...roSerdeDemo.java => SpecificAvroSerdeDemo.java} | 28 ++-
.../storage/rocketmq/configs/ClientConfig.java | 6 +-
.../storage/rocketmq/configs/ServiceConfig.java | 2 +-
18 files changed, 771 insertions(+), 92 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroDeserializer.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroDeserializerConfig.java
similarity index 57%
copy from
client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroDeserializer.java
copy to
client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroDeserializerConfig.java
index 406eec7..af897cc 100644
---
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroDeserializer.java
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroDeserializerConfig.java
@@ -14,30 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package org.apache.rocketmq.schema.registry.client.serializer;
-
-import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+package org.apache.rocketmq.schema.registry.client.config;
import java.util.Map;
-public class AvroDeserializer<T> extends AbstractAvroDeserializer implements
Deserializer<T> {
- public AvroDeserializer(){}
+public class AvroDeserializerConfig {
+ public final static String USE_GENERIC_DATUM_READER =
+ "use.generic.datum.reader";
+ public final static boolean USE_GENERIC_DATUM_READER_DEFAULT = false;
- public AvroDeserializer(SchemaRegistryClient schemaRegistryClient) {
- schemaRegistry = schemaRegistryClient;
- }
-
- @Override
- public void configure(Map<String, ?> configs) {
- }
+ private final Map<String, Object> configs;
- @Override
- public T deserialize(String subject, byte[] bytes) {
- return (T) deserializeImpl(subject, bytes);
+ public AvroDeserializerConfig(Map<String, Object> configs) {
+ this.configs = configs;
}
- @Override
- public void close() {
+ public boolean useGenericReader() {
+ return (boolean) configs.getOrDefault(USE_GENERIC_DATUM_READER,
USE_GENERIC_DATUM_READER_DEFAULT);
}
}
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/Deserializer.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/Deserializer.java
similarity index 89%
rename from
client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/Deserializer.java
rename to
client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/Deserializer.java
index 81ee7a5..254a299 100644
---
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/Deserializer.java
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/Deserializer.java
@@ -15,13 +15,13 @@
* limitations under the License.
*/
-package org.apache.rocketmq.schema.registry.client.serializer;
+package org.apache.rocketmq.schema.registry.client.serde;
import java.io.Closeable;
import java.util.Map;
public interface Deserializer<T> extends Closeable {
- default void configure(Map<String, ?> configs) {}
+ default void configure(Map<String, Object> configs) {}
T deserialize(String subject, byte[] bytes);
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/Serializer.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/Serializer.java
similarity index 89%
rename from
client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/Serializer.java
rename to
client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/Serializer.java
index 0b01c75..b02ca2d 100644
---
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/Serializer.java
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/Serializer.java
@@ -15,14 +15,14 @@
* limitations under the License.
*/
-package org.apache.rocketmq.schema.registry.client.serializer;
+package org.apache.rocketmq.schema.registry.client.serde;
import java.io.Closeable;
import java.util.Map;
public interface Serializer<T> extends Closeable {
- default void configure(Map<String, ?> configs) {}
+ default void configure(Map<String, Object> configs) {}
byte[] serialize(String subject, T originMessage);
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AbstractAvroDeserializer.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroDeserializer.java
similarity index 69%
rename from
client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AbstractAvroDeserializer.java
rename to
client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroDeserializer.java
index cf24dcb..33287b0 100644
---
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AbstractAvroDeserializer.java
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroDeserializer.java
@@ -15,16 +15,19 @@
* limitations under the License.
*/
-package org.apache.rocketmq.schema.registry.client.serializer;
+package org.apache.rocketmq.schema.registry.client.serde.avro;
import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import
org.apache.rocketmq.schema.registry.client.config.AvroDeserializerConfig;
import
org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
import
org.apache.rocketmq.schema.registry.client.exceptions.SerializationException;
+import org.apache.rocketmq.schema.registry.client.serde.Deserializer;
import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,15 +35,31 @@ import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Map;
-public class AbstractAvroDeserializer<T> {
+public class AvroDeserializer<T> implements Deserializer<T> {
Logger log = LoggerFactory.getLogger(AvroDeserializer.class);
private final DecoderFactory decoderFactory = DecoderFactory.get();
protected SchemaRegistryClient schemaRegistry;
- protected T deserializeImpl(String subject, byte[] payload)
+ private boolean useGenericReader;
+
+ public AvroDeserializer(){}
+
+ public AvroDeserializer(SchemaRegistryClient schemaRegistryClient) {
+ schemaRegistry = schemaRegistryClient;
+ }
+
+ @Override
+ public void configure(Map<String, Object> configs) {
+ AvroDeserializerConfig config = new AvroDeserializerConfig(configs);
+ this.useGenericReader = config.useGenericReader();
+ }
+
+ @Override
+ public T deserialize(String subject, byte[] payload)
throws SerializationException {
if (schemaRegistry == null) {
throw new SerializationException("please initialize the schema registry client first");
@@ -73,9 +92,20 @@ public class AbstractAvroDeserializer<T> {
long schemaId = buffer.getLong();
long version = buffer.getLong();
- DatumReader<T> datumReader = new SpecificDatumReader<T>(schema);
- T originMessage = datumReader.read(null, decoder);
- return originMessage;
+ DatumReader<T> datumReader = getDatumReader(schema);
+ T record = datumReader.read(null, decoder);
+ return record;
}
+ private DatumReader<T> getDatumReader(Schema schema) {
+ if (useGenericReader) {
+ return new GenericDatumReader<>(schema);
+ } else {
+ return new SpecificDatumReader<>(schema);
+ }
+ }
+
+ @Override
+ public void close() {
+ }
}
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AbstractAvroSerializer.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroSerializer.java
similarity index 74%
rename from
client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AbstractAvroSerializer.java
rename to
client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroSerializer.java
index 953a76b..3dffe1e 100644
---
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AbstractAvroSerializer.java
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroSerializer.java
@@ -15,37 +15,53 @@
* limitations under the License.
*/
-package org.apache.rocketmq.schema.registry.client.serializer;
+package org.apache.rocketmq.schema.registry.client.serde.avro;
import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRecord;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
import
org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
import
org.apache.rocketmq.schema.registry.client.exceptions.SerializationException;
+import org.apache.rocketmq.schema.registry.client.serde.Serializer;
import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Map;
-public class AbstractAvroSerializer<T> {
+public class AvroSerializer<T> implements Serializer<T> {
private static final int SCHEMA_ID_LENGTH = 8;
private static final int SCHEMA_VERSION_LENGTH = 8;
protected SchemaRegistryClient schemaRegistry;
private final EncoderFactory encoderFactory = EncoderFactory.get();
- protected byte[] serializeImpl(
- String subject, T originMessage)
+ public AvroSerializer() {}
+
+ public AvroSerializer(SchemaRegistryClient schemaRegistryClient) {
+ schemaRegistry = schemaRegistryClient;
+ }
+
+ @Override
+ public void configure(Map<String, Object> configs) {
+ Serializer.super.configure(configs);
+ }
+
+ @Override
+ public byte[] serialize(
+ String subject, T record)
throws SerializationException {
if (schemaRegistry == null) {
throw new SerializationException("please initialize the schema registry client first");
}
- if (originMessage == null) {
+ if (record == null) {
return null;
}
@@ -60,8 +76,13 @@ public class AbstractAvroSerializer<T> {
ByteBuffer buffer = ByteBuffer.allocate(SCHEMA_ID_LENGTH +
SCHEMA_VERSION_LENGTH);
encoder.writeBytes(buffer.putLong(schemaId).putLong(schemaVersion).array());
- DatumWriter<T> datumWriter = new SpecificDatumWriter<>(schema);
- datumWriter.write(originMessage, encoder);
+ DatumWriter<T> datumWriter;
+ if (record instanceof SpecificRecord) {
+ datumWriter = new SpecificDatumWriter<>(schema);
+ } else {
+ datumWriter = new GenericDatumWriter<>(schema);
+ }
+ datumWriter.write(record, encoder);
encoder.flush();
byte[] bytes = out.toByteArray();
out.close();
@@ -77,4 +98,7 @@ public class AbstractAvroSerializer<T> {
return schemaRegistry.getSchemaBySubject(subject);
}
+ @Override
+ public void close() {
+ }
}
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroDeserializer.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroDeserializer.java
similarity index 55%
copy from
client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroDeserializer.java
copy to
client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroDeserializer.java
index 406eec7..0c58699 100644
---
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroDeserializer.java
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroDeserializer.java
@@ -14,30 +14,37 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.rocketmq.schema.registry.client.serde.avro;
-package org.apache.rocketmq.schema.registry.client.serializer;
-
+import org.apache.avro.generic.GenericRecord;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import org.apache.rocketmq.schema.registry.client.serde.Deserializer;
import java.util.Map;
-public class AvroDeserializer<T> extends AbstractAvroDeserializer implements
Deserializer<T> {
- public AvroDeserializer(){}
+public class GenericAvroDeserializer implements Deserializer<GenericRecord> {
+ private final AvroDeserializer<GenericRecord> inner;
+
+ public GenericAvroDeserializer() {
+ this.inner = new AvroDeserializer<>();
+ }
- public AvroDeserializer(SchemaRegistryClient schemaRegistryClient) {
- schemaRegistry = schemaRegistryClient;
+ public GenericAvroDeserializer(final SchemaRegistryClient client) {
+ this.inner = new AvroDeserializer<>(client);
}
@Override
- public void configure(Map<String, ?> configs) {
+ public void configure(final Map<String, Object> configs) {
+ this.inner.configure(configs);
}
@Override
- public T deserialize(String subject, byte[] bytes) {
- return (T) deserializeImpl(subject, bytes);
+ public GenericRecord deserialize(String subject, byte[] bytes) {
+ return this.inner.deserialize(subject, bytes);
}
@Override
public void close() {
+ this.inner.close();
}
}
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerde.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerde.java
new file mode 100644
index 0000000..3abe215
--- /dev/null
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerde.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.schema.registry.client.serde.avro;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import org.apache.rocketmq.schema.registry.client.serde.Deserializer;
+import org.apache.rocketmq.schema.registry.client.serde.Serializer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+
+public class GenericAvroSerde implements Closeable {
+ private final Serializer<GenericRecord> serializer;
+ private final Deserializer<GenericRecord> deserializer;
+
+ public GenericAvroSerde() {
+ this.serializer = new GenericAvroSerializer();
+ this.deserializer = new GenericAvroDeserializer();
+ }
+
+ public GenericAvroSerde(final SchemaRegistryClient client) {
+ if (null == client) {
+ throw new IllegalArgumentException("please initialize the schema registry client first");
+ }
+ this.serializer = new AvroSerializer<>(client);
+ this.deserializer = new AvroDeserializer<>(client);
+ }
+
+ public void configure(final Map<String, Object> configs) {
+ this.serializer.configure(configs);
+ this.deserializer.configure(configs);
+ }
+
+ public Serializer<GenericRecord> serializer() {
+ return this.serializer;
+ }
+
+ public Deserializer<GenericRecord> deserializer() {
+ return this.deserializer;
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.serializer.close();
+ this.deserializer.close();
+ }
+}
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroDeserializer.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerializer.java
similarity index 55%
copy from
client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroDeserializer.java
copy to
client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerializer.java
index 406eec7..8eb1dbf 100644
---
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroDeserializer.java
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerializer.java
@@ -14,30 +14,37 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.rocketmq.schema.registry.client.serde.avro;
-package org.apache.rocketmq.schema.registry.client.serializer;
-
+import org.apache.avro.generic.GenericRecord;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import org.apache.rocketmq.schema.registry.client.serde.Serializer;
import java.util.Map;
-public class AvroDeserializer<T> extends AbstractAvroDeserializer implements
Deserializer<T> {
- public AvroDeserializer(){}
+public class GenericAvroSerializer implements Serializer<GenericRecord> {
+
+ private final AvroSerializer<GenericRecord> inner;
- public AvroDeserializer(SchemaRegistryClient schemaRegistryClient) {
- schemaRegistry = schemaRegistryClient;
+ public GenericAvroSerializer() {
+ this.inner = new AvroSerializer<GenericRecord>();
}
+ public GenericAvroSerializer(final SchemaRegistryClient client) {
+ this.inner = new AvroSerializer<GenericRecord>(client);
+ }
@Override
- public void configure(Map<String, ?> configs) {
+ public void configure(final Map<String, Object> configs) {
+ this.inner.configure(configs);
}
@Override
- public T deserialize(String subject, byte[] bytes) {
- return (T) deserializeImpl(subject, bytes);
+ public byte[] serialize(final String subject, final GenericRecord record) {
+ return this.inner.serialize(subject, record);
}
@Override
public void close() {
+ this.inner.close();
}
}
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroDeserializer.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroDeserializer.java
similarity index 56%
rename from
client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroDeserializer.java
rename to
client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroDeserializer.java
index 406eec7..f6f90e5 100644
---
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroDeserializer.java
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroDeserializer.java
@@ -14,30 +14,38 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.rocketmq.schema.registry.client.serde.avro;
-package org.apache.rocketmq.schema.registry.client.serializer;
-
+import org.apache.avro.specific.SpecificRecord;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import org.apache.rocketmq.schema.registry.client.serde.Deserializer;
import java.util.Map;
-public class AvroDeserializer<T> extends AbstractAvroDeserializer implements
Deserializer<T> {
- public AvroDeserializer(){}
+public class SpecificAvroDeserializer implements Deserializer<SpecificRecord> {
+
+ private final AvroDeserializer<SpecificRecord> inner;
+
+ public SpecificAvroDeserializer() {
+ this.inner = new AvroDeserializer<>();
+ }
- public AvroDeserializer(SchemaRegistryClient schemaRegistryClient) {
- schemaRegistry = schemaRegistryClient;
+ public SpecificAvroDeserializer(final SchemaRegistryClient client) {
+ this.inner = new AvroDeserializer<>(client);
}
@Override
- public void configure(Map<String, ?> configs) {
+ public void configure(final Map<String, Object> configs) {
+ this.inner.configure(configs);
}
@Override
- public T deserialize(String subject, byte[] bytes) {
- return (T) deserializeImpl(subject, bytes);
+ public SpecificRecord deserialize(String subject, byte[] bytes) {
+ return this.inner.deserialize(subject, bytes);
}
@Override
public void close() {
+ this.inner.close();
}
}
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroSerde.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroSerde.java
new file mode 100644
index 0000000..afaf508
--- /dev/null
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroSerde.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.schema.registry.client.serde.avro;
+
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import org.apache.rocketmq.schema.registry.client.serde.Deserializer;
+import org.apache.rocketmq.schema.registry.client.serde.Serializer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+
+public class SpecificAvroSerde implements Closeable {
+ private final AvroSerializer<SpecificRecord> serializer;
+ private final AvroDeserializer<SpecificRecord> deserializer;
+
+ public SpecificAvroSerde() {
+ this.serializer = new AvroSerializer<>();
+ this.deserializer = new AvroDeserializer<>();
+ }
+
+ public SpecificAvroSerde(final SchemaRegistryClient client) {
+ if (null == client) {
+ throw new IllegalArgumentException("please initialize schema registry client first");
+ }
+
+ this.serializer = new AvroSerializer<>(client);
+ this.deserializer = new AvroDeserializer<>(client);
+ }
+
+ public Serializer<SpecificRecord> serializer() {
+ return this.serializer;
+ }
+
+ public Deserializer<SpecificRecord> deserializer() {
+ return this.deserializer;
+ }
+
+ public void configure(final Map<String, Object> configs) {
+ this.serializer.configure(configs);
+ this.deserializer.configure(configs);
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.serializer.close();
+ this.deserializer.close();
+ }
+}
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroSerializer.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroSerializer.java
similarity index 56%
rename from
client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroSerializer.java
rename to
client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroSerializer.java
index 9276e8b..12ed260 100644
---
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serializer/AvroSerializer.java
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroSerializer.java
@@ -14,35 +14,38 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.rocketmq.schema.registry.client.serde.avro;
-package org.apache.rocketmq.schema.registry.client.serializer;
-
+import org.apache.avro.specific.SpecificRecord;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import org.apache.rocketmq.schema.registry.client.serde.Serializer;
import java.util.Map;
-public class AvroSerializer<T> extends AbstractAvroSerializer implements
Serializer<T> {
+public class SpecificAvroSerializer implements Serializer<SpecificRecord> {
+
+ private final AvroSerializer<SpecificRecord> inner;
- public AvroSerializer() {}
+ public SpecificAvroSerializer() {
+ this.inner = new AvroSerializer<>();
+ }
- public AvroSerializer(SchemaRegistryClient schemaRegistryClient) {
- schemaRegistry = schemaRegistryClient;
+ public SpecificAvroSerializer(final SchemaRegistryClient client) {
+ this.inner = new AvroSerializer<>(client);
}
@Override
- public void configure(Map<String, ?> configs) {
- Serializer.super.configure(configs);
+ public void configure(final Map<String, Object> configs) {
+ this.inner.configure(configs);
}
@Override
- public byte[] serialize(String subject, T originMessage) {
- if (originMessage == null) {
- return null;
- }
- return serializeImpl(subject, originMessage);
+ public byte[] serialize(String subject, SpecificRecord record) {
+ return this.inner.serialize(subject, record);
}
@Override
public void close() {
+ this.inner.close();
}
}
diff --git
a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/Charge.java
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/Charge.java
new file mode 100644
index 0000000..aa48960
--- /dev/null
+++
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/Charge.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.schema.registry.client.serde;
+@SuppressWarnings("all")
+@org.apache.avro.specific.AvroGenerated
+public class Charge extends org.apache.avro.specific.SpecificRecordBase
implements org.apache.avro.specific.SpecificRecord {
+ private static final long serialVersionUID = -3449629867777645843L;
+ public static final org.apache.avro.Schema SCHEMA$ = new
org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Charge\",\"namespace\":\"org.apache.rocketmq.schema.registry.client.serde\",\"fields\":[{\"name\":\"item\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}");
+ public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+ @Deprecated public CharSequence item;
+ @Deprecated public double amount;
+
+ /**
+ * Default constructor. Note that this does not initialize fields
+ * to their default values from the schema. If that is desired then
+ * one should use <code>newBuilder()</code>.
+ */
+ public Charge() {}
+
+ /**
+ * All-args constructor.
+ */
+ public Charge(CharSequence item, Double amount) {
+ this.item = item;
+ this.amount = amount;
+ }
+
+ public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+ // Used by DatumWriter. Applications should not call.
+ public Object get(int field$) {
+ switch (field$) {
+ case 0: return item;
+ case 1: return amount;
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+ // Used by DatumReader. Applications should not call.
+ @SuppressWarnings(value="unchecked")
+ public void put(int field$, Object value$) {
+ switch (field$) {
+ case 0: item = (CharSequence)value$; break;
+ case 1: amount = (Double)value$; break;
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+
+ /**
+ * Gets the value of the 'item' field.
+ */
+ public CharSequence getItem() {
+ return item;
+ }
+
+ /**
+ * Sets the value of the 'item' field.
+ * @param value the value to set.
+ */
+ public void setItem(CharSequence value) {
+ this.item = value;
+ }
+
+ /**
+ * Gets the value of the 'amount' field.
+ */
+ public Double getAmount() {
+ return amount;
+ }
+
+ /**
+ * Sets the value of the 'amount' field.
+ * @param value the value to set.
+ */
+ public void setAmount(Double value) {
+ this.amount = value;
+ }
+
+ /**
+ * Creates a new Charge RecordBuilder.
+ * @return A new Charge RecordBuilder
+ */
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ /**
+ * Creates a new Charge RecordBuilder by copying an existing Builder.
+ * @param other The existing builder to copy.
+ * @return A new Charge RecordBuilder
+ */
+ public static Builder newBuilder(Builder other) {
+ return new Builder(other);
+ }
+
+ /**
+ * Creates a new Charge RecordBuilder by copying an existing Charge instance.
+ * @param other The existing instance to copy.
+ * @return A new Charge RecordBuilder
+ */
+ public static Builder newBuilder(Charge other) {
+ return new Builder(other);
+ }
+
+ /**
+ * RecordBuilder for Charge instances.
+ */
+ public static class Builder extends
org.apache.avro.specific.SpecificRecordBuilderBase<Charge>
+ implements org.apache.avro.data.RecordBuilder<Charge> {
+
+ private CharSequence item;
+ private double amount;
+
+ /** Creates a new Builder */
+ private Builder() {
+ super(Charge.SCHEMA$);
+ }
+
+ /**
+ * Creates a Builder by copying an existing Builder.
+ * @param other The existing Builder to copy.
+ */
+ private Builder(Builder other) {
+ super(other);
+ if (isValidValue(fields()[0], other.item)) {
+ this.item = data().deepCopy(fields()[0].schema(), other.item);
+ fieldSetFlags()[0] = true;
+ }
+ if (isValidValue(fields()[1], other.amount)) {
+ this.amount = data().deepCopy(fields()[1].schema(), other.amount);
+ fieldSetFlags()[1] = true;
+ }
+ }
+
+ /**
+ * Creates a Builder by copying an existing Charge instance
+ * @param other The existing instance to copy.
+ */
+ private Builder(Charge other) {
+ super(Charge.SCHEMA$);
+ if (isValidValue(fields()[0], other.item)) {
+ this.item = data().deepCopy(fields()[0].schema(), other.item);
+ fieldSetFlags()[0] = true;
+ }
+ if (isValidValue(fields()[1], other.amount)) {
+ this.amount = data().deepCopy(fields()[1].schema(), other.amount);
+ fieldSetFlags()[1] = true;
+ }
+ }
+
+ /**
+ * Gets the value of the 'item' field.
+ * @return The value.
+ */
+ public CharSequence getItem() {
+ return item;
+ }
+
+ /**
+ * Sets the value of the 'item' field.
+ * @param value The value of 'item'.
+ * @return This builder.
+ */
+ public Builder setItem(CharSequence value) {
+ validate(fields()[0], value);
+ this.item = value;
+ fieldSetFlags()[0] = true;
+ return this;
+ }
+
+ /**
+ * Checks whether the 'item' field has been set.
+ * @return True if the 'item' field has been set, false otherwise.
+ */
+ public boolean hasItem() {
+ return fieldSetFlags()[0];
+ }
+
+
+ /**
+ * Clears the value of the 'item' field.
+ * @return This builder.
+ */
+ public Builder clearItem() {
+ item = null;
+ fieldSetFlags()[0] = false;
+ return this;
+ }
+
+ /**
+ * Gets the value of the 'amount' field.
+ * @return The value.
+ */
+ public Double getAmount() {
+ return amount;
+ }
+
+ /**
+ * Sets the value of the 'amount' field.
+ * @param value The value of 'amount'.
+ * @return This builder.
+ */
+ public Builder setAmount(double value) {
+ validate(fields()[1], value);
+ this.amount = value;
+ fieldSetFlags()[1] = true;
+ return this;
+ }
+
+ /**
+ * Checks whether the 'amount' field has been set.
+ * @return True if the 'amount' field has been set, false otherwise.
+ */
+ public boolean hasAmount() {
+ return fieldSetFlags()[1];
+ }
+
+
+ /**
+ * Clears the value of the 'amount' field.
+ * @return This builder.
+ */
+ public Builder clearAmount() {
+ fieldSetFlags()[1] = false;
+ return this;
+ }
+
+ @Override
+ public Charge build() {
+ try {
+ Charge record = new Charge();
+ record.item = fieldSetFlags()[0] ? this.item : (CharSequence)
defaultValue(fields()[0]);
+ record.amount = fieldSetFlags()[1] ? this.amount : (Double)
defaultValue(fields()[1]);
+ return record;
+ } catch (Exception e) {
+ throw new org.apache.avro.AvroRuntimeException(e);
+ }
+ }
+ }
+
+ private static final org.apache.avro.io.DatumWriter
+ WRITER$ = new org.apache.avro.specific.SpecificDatumWriter(SCHEMA$);
+
+ @Override public void writeExternal(java.io.ObjectOutput out)
+ throws java.io.IOException {
+ WRITER$.write(this, org.apache.avro.specific.SpecificData.getEncoder(out));
+ }
+
+ private static final org.apache.avro.io.DatumReader
+ READER$ = new org.apache.avro.specific.SpecificDatumReader(SCHEMA$);
+
+ @Override public void readExternal(java.io.ObjectInput in)
+ throws java.io.IOException {
+ READER$.read(this, org.apache.avro.specific.SpecificData.getDecoder(in));
+ }
+
+}
diff --git
a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/GenericAvroSerdeTest.java
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/GenericAvroSerdeTest.java
new file mode 100644
index 0000000..c841283
--- /dev/null
+++
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/GenericAvroSerdeTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.schema.registry.client.serde;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import
org.apache.rocketmq.schema.registry.client.config.AvroDeserializerConfig;
+import
org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
+import org.apache.rocketmq.schema.registry.client.serde.avro.GenericAvroSerde;
+import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Round-trip test for {@link GenericAvroSerde} against a mocked schema registry.
+ */
+public class GenericAvroSerdeTest {
+    // No Mockito extension or runner is declared on this class, so these @Mock
+    // annotations are not processed here; both fields are assigned explicitly
+    // with mock(...) inside the test method.
+    @Mock
+    private SchemaRegistryClient registryClient;
+
+    @Mock
+    private GetSchemaResponse getSchemaResponse;
+
+    /**
+     * Serializes a generic record for subject "TopicTest", deserializes the bytes
+     * again and asserts that the result equals the original record.
+     *
+     * @throws RestClientException declared by the stubbed registry lookup
+     * @throws IOException propagated from the serde (for example on close), so
+     *         that such a failure fails the test instead of being printed and ignored
+     */
+    @Test
+    public void testGenericSerde() throws RestClientException, IOException {
+        String idl =
+            "{\"type\":\"record\",\"name\":\"Charge\",\"namespace\":\"org.apache.rocketmq.schema.registry.example.serde\","
+                + "\"fields\":[{\"name\":\"item\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}";
+        Schema schema = new Schema.Parser().parse(idl);
+
+        // Stub the registry response that the serde looks up by subject.
+        getSchemaResponse = mock(GetSchemaResponse.class);
+        when(getSchemaResponse.getSchemaId()).thenReturn(1111L);
+        when(getSchemaResponse.getVersion()).thenReturn(11111L);
+        when(getSchemaResponse.getIdl()).thenReturn(idl);
+
+        registryClient = mock(SchemaRegistryClient.class);
+        when(registryClient.getSchemaBySubject("TopicTest")).thenReturn(getSchemaResponse);
+
+        GenericRecord record = new GenericRecordBuilder(schema)
+            .set("item", "generic")
+            .set("amount", 100.0)
+            .build();
+
+        // Deliberately no catch block: the method already declares IOException,
+        // and swallowing it here would let a broken serde pass the test silently.
+        try (GenericAvroSerde serde = new GenericAvroSerde(registryClient)) {
+            //configure
+            Map<String, Object> configs = new HashMap<>();
+            configs.put(AvroDeserializerConfig.USE_GENERIC_DATUM_READER, true);
+            serde.configure(configs);
+
+            //serialize
+            byte[] bytes = serde.serializer().serialize("TopicTest", record);
+
+            //deserialize
+            GenericRecord record1 = serde.deserializer().deserialize("TopicTest", bytes);
+            assertThat(record1).isEqualTo(record);
+        }
+    }
+}
diff --git
a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/SpecificAvroSerdeTest.java
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/SpecificAvroSerdeTest.java
new file mode 100644
index 0000000..41c1536
--- /dev/null
+++
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/SpecificAvroSerdeTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.schema.registry.client.serde;
+
+import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import
org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
+import org.apache.rocketmq.schema.registry.client.serde.avro.SpecificAvroSerde;
+import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Round-trip test for {@link SpecificAvroSerde} against a mocked schema registry.
+ */
+public class SpecificAvroSerdeTest {
+
+    // No Mockito extension or runner is declared on this class, so these @Mock
+    // annotations are not processed here; both fields are assigned explicitly
+    // with mock(...) inside the test method.
+    @Mock
+    private SchemaRegistryClient registryClient;
+
+    @Mock
+    private GetSchemaResponse getSchemaResponse;
+
+    /**
+     * Serializes a {@code Charge} for subject "TopicTest", deserializes the bytes
+     * again and asserts that the result equals the original object.
+     *
+     * @throws RestClientException declared by the stubbed registry lookup
+     * @throws IOException propagated from the serde (for example on close), so
+     *         that such a failure fails the test instead of being printed and ignored
+     */
+    @Test
+    public void testSpecificSerde() throws RestClientException, IOException {
+        String idl =
+            "{\"type\":\"record\",\"name\":\"Charge\",\"namespace\":\"org.apache.rocketmq.schema.registry.client.serde\","
+                + "\"fields\":[{\"name\":\"item\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}";
+
+        // Stub the registry response that the serde looks up by subject.
+        getSchemaResponse = mock(GetSchemaResponse.class);
+        when(getSchemaResponse.getSchemaId()).thenReturn(11111L);
+        when(getSchemaResponse.getVersion()).thenReturn(11111L);
+        // NOTE(review): the IDL above uses namespace "...client.serde" while this
+        // full name uses "...example.serde" — confirm whether the mismatch is intended.
+        when(getSchemaResponse.getSchemaFullName()).thenReturn("org.apache.rocketmq.schema.registry.example.serde.Charge");
+        when(getSchemaResponse.getIdl()).thenReturn(idl);
+
+        registryClient = mock(SchemaRegistryClient.class);
+        when(registryClient.getSchemaBySubject("TopicTest")).thenReturn(getSchemaResponse);
+
+        // Deliberately no catch block: the method already declares IOException,
+        // and swallowing it here would let a broken serde pass the test silently.
+        try (SpecificAvroSerde serde = new SpecificAvroSerde(registryClient)) {
+            //serialize
+            Charge charge = new Charge("specific", 100.0);
+            byte[] bytes = serde.serializer().serialize("TopicTest", charge);
+
+            //deserialize
+            Charge charge1 = (Charge) serde.deserializer().deserialize("TopicTest", bytes);
+            assertThat(charge1).isEqualTo(charge);
+        }
+    }
+}
diff --git
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/GenericAvroSerdeDemo.java
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/GenericAvroSerdeDemo.java
new file mode 100644
index 0000000..b8359a7
--- /dev/null
+++
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/GenericAvroSerdeDemo.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.schema.registry.example.serde;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import org.apache.rocketmq.schema.registry.client.SchemaRegistryClientFactory;
+import
org.apache.rocketmq.schema.registry.client.config.AvroDeserializerConfig;
+import org.apache.rocketmq.schema.registry.client.serde.avro.GenericAvroSerde;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Demo: round-trips a generic Avro record through {@link GenericAvroSerde}
+ * using a schema registry client created for a local base URL.
+ */
+public class GenericAvroSerdeDemo {
+
+    /**
+     * Creates the registry client, builds a sample record, serializes it for
+     * subject "TopicTest", deserializes it again and prints the result.
+     *
+     * @param args unused
+     */
+    public static void main(String[] args) {
+        SchemaRegistryClient client =
+            SchemaRegistryClientFactory.newClient("http://localhost:8080/schema-registry/v1", null);
+        GenericRecord original = sampleRecord();
+
+        try (GenericAvroSerde serde = new GenericAvroSerde(client)) {
+            // Switch the deserializer to the generic datum reader.
+            Map<String, Object> serdeConfigs = new HashMap<>();
+            serdeConfigs.put(AvroDeserializerConfig.USE_GENERIC_DATUM_READER, true);
+            serde.configure(serdeConfigs);
+
+            // Serialize, then read the same bytes back.
+            byte[] payload = serde.serializer().serialize("TopicTest", original);
+            GenericRecord roundTripped = serde.deserializer().deserialize("TopicTest", payload);
+
+            System.out.println("the origin object after ser/de is " + roundTripped);
+        } catch (IOException e) {
+            System.out.println("serde shutdown failed");
+        }
+    }
+
+    /** Parses the demo's Charge schema and fills in the two sample field values. */
+    private static GenericRecord sampleRecord() {
+        Schema chargeSchema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Charge\",\"namespace\":\"org.apache.rocketmq.schema.registry.example.serde\",\"fields\":[{\"name\":\"item\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}");
+        GenericRecordBuilder builder = new GenericRecordBuilder(chargeSchema);
+        builder.set("item", "generic");
+        builder.set("amount", 100.0);
+        return builder.build();
+    }
+}
\ No newline at end of file
diff --git
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/AvroSerdeDemo.java
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/SpecificAvroSerdeDemo.java
similarity index 60%
rename from
example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/AvroSerdeDemo.java
rename to
example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/SpecificAvroSerdeDemo.java
index d765a93..281d383 100644
---
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/AvroSerdeDemo.java
+++
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/SpecificAvroSerdeDemo.java
@@ -19,26 +19,28 @@ package org.apache.rocketmq.schema.registry.example.serde;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClientFactory;
-import org.apache.rocketmq.schema.registry.client.serializer.AvroDeserializer;
-import org.apache.rocketmq.schema.registry.client.serializer.AvroSerializer;
-import org.apache.rocketmq.schema.registry.client.serializer.Deserializer;
-import org.apache.rocketmq.schema.registry.client.serializer.Serializer;
+import org.apache.rocketmq.schema.registry.client.serde.avro.SpecificAvroSerde;
-public class AvroSerdeDemo {
+import java.io.IOException;
+
+public class SpecificAvroSerdeDemo {
public static void main(String[] args) {
String baseUrl = "http://localhost:8080/schema-registry/v1";
SchemaRegistryClient schemaRegistryClient =
SchemaRegistryClientFactory.newClient(baseUrl, null);
- //serialize
- Charge charge = new Charge("fee1", 100.0);
- Serializer<Charge> serializer = new
AvroSerializer<>(schemaRegistryClient);
- byte[] bytes = serializer.serialize("TopicTest", charge);
+ try (SpecificAvroSerde serde = new
SpecificAvroSerde(schemaRegistryClient)) {
+ //serialize
+ Charge charge = new Charge("specific", 100.0);
+ byte[] bytes = serde.serializer().serialize("TopicTest", charge);
+
+ //deserialize
+ Charge charge1 = (Charge)
serde.deserializer().deserialize("TopicTest", bytes);
+ System.out.println("the origin object after ser/de is " + charge1);
+ } catch (IOException e) {
+ System.out.println("serde shutdown failed");
+ }
- //deserialize
- Deserializer<Charge> deserializer = new
AvroDeserializer<>(schemaRegistryClient);
- Charge charge1 = deserializer.deserialize("TopicTest", bytes);
- System.out.println("the origin object after ser/de is " + charge1);
}
}
diff --git
a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/ClientConfig.java
b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/ClientConfig.java
index 9ca2489..57e3ca0 100644
---
a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/ClientConfig.java
+++
b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/ClientConfig.java
@@ -27,10 +27,10 @@ import org.springframework.context.annotation.Configuration;
public class ClientConfig {
/**
- * Talos client instance.
+ * RocketMQ client instance.
*
- * @param context connector context
- * @return MetacatTalosClient
+ * @param context storage plugin context
+ * @return RocketmqStorageClient
*/
@Bean
public RocketmqStorageClient rocketmqStorageClient(StoragePluginContext
context) {
diff --git
a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/ServiceConfig.java
b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/ServiceConfig.java
index de0aa90..3aaea49 100644
---
a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/ServiceConfig.java
+++
b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/configs/ServiceConfig.java
@@ -29,7 +29,7 @@ public class ServiceConfig {
* create rocketmq storage service.
*
* @param storageClient rocketmq storage Client
- * @return talos connector table Service
+ * @return rocketmq storage Service
*/
@Bean
public RocketmqStorageService rocketmqStorageService(RocketmqStorageClient
storageClient) {