This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git
The following commit(s) were added to refs/heads/main by this push:
new 7043568 check avro schema compatibility by schema recordID during
ser/de (#63)
7043568 is described below
commit 70435683fac84fac0335efa646b3e94c623951fa
Author: Humkum <[email protected]>
AuthorDate: Thu Oct 6 09:08:51 2022 +0800
check avro schema compatibility by schema recordID during ser/de (#63)
---
.../client/NormalSchemaRegistryClient.java | 27 ++++
.../registry/client/SchemaRegistryClient.java | 13 ++
...oSerializerConfig.java => AvroSerdeConfig.java} | 9 +-
...nSerializerConfig.java => JsonSerdeConfig.java} | 4 +-
.../{SerializerConfig.java => SerdeConfig.java} | 24 ++-
.../schema/registry/client/rest/RestService.java | 32 ++++
.../client/serde/avro/AvroDeserializer.java | 90 +++++++----
.../registry/client/serde/avro/AvroSerializer.java | 23 ++-
.../client/serde/avro/GenericAvroDeserializer.java | 11 +-
.../client/serde/avro/GenericAvroSerde.java | 21 ++-
.../client/serde/avro/GenericAvroSerializer.java | 10 +-
.../serde/avro/ReflectionAvroDeserializer.java | 6 +-
.../serde/avro/SpecificAvroDeserializer.java | 6 +
.../client/serde/json/JsonDeserializer.java | 4 +-
.../registry/client/serde/json/JsonSerializer.java | 6 +-
.../client/serde/SkipSchemaRegistrySerdeTest.java | 6 +-
.../client/serde/avro/GenericAvroSerdeTest.java | 7 +-
.../client/serde/avro/ReflectionAvroSerdeTest.java | 4 +-
.../client/serde/avro/SpecificAvroSerdeTest.java | 6 +-
.../registry/client/serde/json/JsonSerdeTest.java | 6 +-
.../registry/common/constant/SchemaConstants.java | 1 +
...SchemaDefination.java => SchemaDefinition.java} | 2 +-
.../registry/common/storage/StorageService.java | 4 +
.../common/storage/StorageServiceProxy.java | 7 +
.../registry/core/api/v1/SchemaController.java | 176 +++++++++++++++++++++
.../registry/core/service/SchemaService.java | 4 +
.../registry/core/service/SchemaServiceImpl.java | 20 +++
.../schema/registry/example/GetSchemaDemo.java | 6 +
.../example/serde/avro/GenericAvroSerdeDemo.java | 12 +-
.../serde/avro/ReflectionAvroSerdeDemo.java | 4 +-
.../example/serde/avro/SpecificAvroSerdeDemo.java | 5 +-
.../registry/example/serde/json/JsonSerdeDemo.java | 4 +-
.../serde/json/JsonSerdeWithoutServerDemo.java | 6 +-
.../storage/rocketmq/RocketmqStorageClient.java | 4 +
.../rocketmq/RocketmqStorageClientImpl.java | 30 ++++
.../storage/rocketmq/RocketmqStorageService.java | 5 +
36 files changed, 503 insertions(+), 102 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/NormalSchemaRegistryClient.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/NormalSchemaRegistryClient.java
index 1256bb5..966441e 100644
---
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/NormalSchemaRegistryClient.java
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/NormalSchemaRegistryClient.java
@@ -90,6 +90,23 @@ public class NormalSchemaRegistryClient implements
SchemaRegistryClient {
return restService.getSchemaBySubject(cluster, tenant, subject,
version);
}
+ public GetSchemaResponse getSchemaBySubjectAndVersion(String subject, long
version)
+ throws IOException, RestClientException {
+ return restService.getSchemaBySubject(subject, version);
+ }
+
+ @Override
+ public GetSchemaResponse getTargetSchema(String cluster, String tenant,
String subject, String schema)
+ throws RestClientException, IOException {
+ return restService.getTargetSchema(cluster, tenant, subject, schema);
+ }
+
+ @Override
+ public GetSchemaResponse getTargetSchema(String subject, String schema)
+ throws RestClientException, IOException {
+ return restService.getTargetSchema(subject, schema);
+ }
+
@Override
public List<SchemaRecordDto> getSchemaListBySubject(String cluster, String
tenant,
String subject) throws RestClientException, IOException {
@@ -107,4 +124,14 @@ public class NormalSchemaRegistryClient implements
SchemaRegistryClient {
return restService.getAllTenants(cluster);
}
+ public GetSchemaResponse getSchemaByRecordId(String cluster, String
tenant, String subject,
+ long recordId) throws RestClientException, IOException {
+ return restService.getSchemaByRecordId(cluster, tenant, subject,
recordId);
+ }
+
+ @Override
+ public GetSchemaResponse getSchemaByRecordId(String subject, long recordId)
+ throws RestClientException, IOException {
+ return restService.getSchemaByRecordId(subject, recordId);
+ }
}
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClient.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClient.java
index 7e02738..32c4ff2 100644
---
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClient.java
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClient.java
@@ -55,10 +55,23 @@ public interface SchemaRegistryClient {
GetSchemaResponse getSchemaBySubjectAndVersion(String cluster, String
tenant, String subject,
long version) throws IOException, RestClientException;
+ GetSchemaResponse getSchemaBySubjectAndVersion(String subject, long
version)
+ throws IOException, RestClientException;
+
+ GetSchemaResponse getTargetSchema(String cluster, String tenant, String
subject, String schema)
+ throws RestClientException, IOException;
+ GetSchemaResponse getTargetSchema(String subject, String schema) throws
RestClientException, IOException;
+
List<SchemaRecordDto> getSchemaListBySubject(String cluster, String tenant,
String subject) throws RestClientException, IOException;
List<String> getSubjectsByTenant(String cluster, String tenant) throws
RestClientException, IOException;
List<String> getAllTenants(String cluster) throws RestClientException,
IOException;
+
+ GetSchemaResponse getSchemaByRecordId(String cluster, String tenant,
String subject, long recordId)
+ throws RestClientException, IOException;
+
+ GetSchemaResponse getSchemaByRecordId(String subject, long recordId)
+ throws RestClientException, IOException;
}
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroSerializerConfig.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroSerdeConfig.java
similarity index 84%
rename from
client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroSerializerConfig.java
rename to
client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroSerdeConfig.java
index 15b32bf..78e2b9c 100644
---
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroSerializerConfig.java
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/AvroSerdeConfig.java
@@ -18,12 +18,15 @@ package org.apache.rocketmq.schema.registry.client.config;
import java.util.Map;
-public class AvroSerializerConfig extends SerializerConfig {
+public class AvroSerdeConfig extends SerdeConfig {
+ /**
+ * use generic datum reader to deserialize genericRecord.
+ */
public final static String USE_GENERIC_DATUM_READER =
- "use.generic.datum.reader";
+ "use.generic.datum.reader";
public final static boolean USE_GENERIC_DATUM_READER_DEFAULT = false;
- public AvroSerializerConfig(Map<String, Object> configs) {
+ public AvroSerdeConfig(Map<String, Object> configs) {
this.configs = configs;
}
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/JsonSerializerConfig.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/JsonSerdeConfig.java
similarity index 88%
rename from
client/src/main/java/org/apache/rocketmq/schema/registry/client/config/JsonSerializerConfig.java
rename to
client/src/main/java/org/apache/rocketmq/schema/registry/client/config/JsonSerdeConfig.java
index 56af0c7..8545d9c 100644
---
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/JsonSerializerConfig.java
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/JsonSerdeConfig.java
@@ -18,8 +18,8 @@ package org.apache.rocketmq.schema.registry.client.config;
import java.util.Map;
-public class JsonSerializerConfig extends SerializerConfig {
- public JsonSerializerConfig(Map<String, Object> configs) {
+public class JsonSerdeConfig extends SerdeConfig {
+ public JsonSerdeConfig(Map<String, Object> configs) {
this.configs = configs;
}
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/SerializerConfig.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/SerdeConfig.java
similarity index 63%
rename from
client/src/main/java/org/apache/rocketmq/schema/registry/client/config/SerializerConfig.java
rename to
client/src/main/java/org/apache/rocketmq/schema/registry/client/config/SerdeConfig.java
index 87b582b..9fc3c50 100644
---
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/SerializerConfig.java
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/config/SerdeConfig.java
@@ -18,12 +18,26 @@ package org.apache.rocketmq.schema.registry.client.config;
import java.util.Map;
-public class SerializerConfig {
+public class SerdeConfig {
public final static String SKIP_SCHEMA_REGISTRY =
"skip.schema.registry";
public final static boolean SKIP_SCHEMA_REGISTRY_DEFAULT = false;
public final static String DESERIALIZE_TARGET_TYPE =
"deserialize.target.type";
+ /**
+ * use target version schema to serialize/deserialize without recordId.
+ * version default null, will use the latest version.
+ */
+ public final static String USE_TARGET_VERSION_SCHEMA =
+ "use.target.version.schema";
+
+ public final static boolean USE_TARGET_VERSION_SCHEMA_DEFAULT = false;
+
+ public final static String SCHEMA_TARGET_VERSION =
+ "schema.target.version";
+
+ public final static long SCHEMA_TARGET_VERSION_DEFAULT = -1;
+
protected Map<String, Object> configs;
public boolean skipSchemaRegistry() {
@@ -33,4 +47,12 @@ public class SerializerConfig {
public Class<?> deserializeTargetType() {
return (Class) configs.get(DESERIALIZE_TARGET_TYPE);
}
+
+ public boolean useTargetVersionSchema() {
+ return (boolean) configs.getOrDefault(USE_TARGET_VERSION_SCHEMA,
USE_TARGET_VERSION_SCHEMA_DEFAULT);
+ }
+
+ public long schemaTargetVersion() {
+ return (long) configs.getOrDefault(SCHEMA_TARGET_VERSION,
SCHEMA_TARGET_VERSION_DEFAULT);
+ }
}
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/RestService.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/RestService.java
index 704ac77..a1207eb 100644
---
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/RestService.java
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/RestService.java
@@ -140,6 +140,12 @@ public class RestService {
return HttpUtil.sendHttpRequest(path, HTTP_GET, null, httpHeaders,
GET_SCHEMA_DTO_TYPE_REFERENCE);
}
+ public GetSchemaResponse getSchemaBySubject(String subject, long version)
+ throws IOException, RestClientException {
+ UrlBuilder urlBuilder =
UrlBuilder.fromPath("/subject/{subject-name}/schema/versions/{version}");
+ String path = HttpUtil.buildRequestUrl(baseUri,
urlBuilder.build(subject, version).toString());
+ return HttpUtil.sendHttpRequest(path, HTTP_GET, null, httpHeaders,
GET_SCHEMA_DTO_TYPE_REFERENCE);
+ }
public List<SchemaRecordDto> getSchemaListBySubject(String cluster, String
tenant,
String subject) throws RestClientException, IOException {
UrlBuilder urlBuilder =
UrlBuilder.fromPath("/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/versions");
@@ -158,4 +164,30 @@ public class RestService {
String path = HttpUtil.buildRequestUrl(baseUri,
urlBuilder.build(cluster).toString());
return HttpUtil.sendHttpRequest(path, HTTP_GET, null, httpHeaders,
LIST_STRING_REFERENCE);
}
+
+ public GetSchemaResponse getTargetSchema(String subject, String schema)
throws RestClientException, IOException {
+ UrlBuilder urlBuilder =
UrlBuilder.fromPath("/subject/{subject-name}/schema/schema");
+ String path = HttpUtil.buildRequestUrl(baseUri,
urlBuilder.build(subject).toString());
+ return HttpUtil.sendHttpRequest(path, HTTP_POST, schema, httpHeaders,
GET_SCHEMA_DTO_TYPE_REFERENCE);
+ }
+
+ public GetSchemaResponse getTargetSchema(String cluster, String tenant,
String subject, String schema)
+ throws RestClientException, IOException {
+ UrlBuilder urlBuilder =
UrlBuilder.fromPath("/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/schema");
+ String path = HttpUtil.buildRequestUrl(baseUri,
urlBuilder.build(cluster, tenant, subject).toString());
+ return HttpUtil.sendHttpRequest(path, HTTP_POST, schema, httpHeaders,
GET_SCHEMA_DTO_TYPE_REFERENCE);
+ }
+
+ public GetSchemaResponse getSchemaByRecordId(String cluster, String
tenant, String subject, long recordId)
+ throws RestClientException, IOException {
+ UrlBuilder urlBuilder =
UrlBuilder.fromPath("/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/recordId/{record-id}/schema");
+ String path = HttpUtil.buildRequestUrl(baseUri,
urlBuilder.build(cluster, tenant, subject, recordId).toString());
+ return HttpUtil.sendHttpRequest(path, HTTP_GET, null, httpHeaders,
GET_SCHEMA_DTO_TYPE_REFERENCE);
+ }
+
+ public GetSchemaResponse getSchemaByRecordId(String subject, long
recordId) throws RestClientException, IOException {
+ UrlBuilder urlBuilder =
UrlBuilder.fromPath("/subject/{subject-name}/recordId/{record-id}/schema");
+ String path = HttpUtil.buildRequestUrl(baseUri,
urlBuilder.build(subject, recordId).toString());
+ return HttpUtil.sendHttpRequest(path, HTTP_GET, null, httpHeaders,
GET_SCHEMA_DTO_TYPE_REFERENCE);
+ }
}
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroDeserializer.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroDeserializer.java
index 87622c0..2e4391e 100644
---
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroDeserializer.java
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroDeserializer.java
@@ -22,9 +22,11 @@ import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
-import org.apache.rocketmq.schema.registry.client.config.AvroSerializerConfig;
+import org.apache.rocketmq.schema.registry.client.config.AvroSerdeConfig;
import
org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
import
org.apache.rocketmq.schema.registry.client.exceptions.SerializationException;
import org.apache.rocketmq.schema.registry.client.serde.Deserializer;
@@ -40,11 +42,10 @@ import java.util.Map;
public class AvroDeserializer<T> implements Deserializer<T> {
Logger log = LoggerFactory.getLogger(AvroDeserializer.class);
-
- private final DecoderFactory decoderFactory = DecoderFactory.get();
protected SchemaRegistryClient schemaRegistry;
-
private boolean useGenericReader;
+ private boolean useTargetVersionSchema;
+ private long schemaTargetVersion;
public AvroDeserializer(){}
@@ -54,12 +55,19 @@ public class AvroDeserializer<T> implements Deserializer<T>
{
@Override
public void configure(Map<String, Object> configs) {
- AvroSerializerConfig config = new AvroSerializerConfig(configs);
+ AvroSerdeConfig config = new AvroSerdeConfig(configs);
this.useGenericReader = config.useGenericReader();
+ this.useTargetVersionSchema = config.useTargetVersionSchema();
+ //get schema by version, if didn't configure this, would use the
latest version
+ this.schemaTargetVersion = config.schemaTargetVersion();
}
@Override
- public T deserialize(String subject, byte[] payload)
+ public T deserialize(String subject, byte[] payload) {
+ return this.deserialize(subject, payload, null);
+ }
+
+ public T deserialize(String subject, byte[] payload, Schema readerSchema)
throws SerializationException {
if (schemaRegistry == null) {
throw new SerializationException("please initialize the schema
registry client first");
@@ -69,38 +77,60 @@ public class AvroDeserializer<T> implements Deserializer<T>
{
}
try {
- GetSchemaResponse response =
schemaRegistry.getSchemaBySubject(subject);
- Schema schema = new Schema.Parser().parse(response.getIdl());
- return avroDecode(payload, schema);
- } catch (RestClientException | IOException e) {
- throw new RuntimeException(e);
+ ByteArrayInputStream bais = new ByteArrayInputStream(payload);
+ BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bais,
null);
+
+ GetSchemaResponse response;
+ if (useTargetVersionSchema) {
+ response = AvroSerdeConfig.SCHEMA_TARGET_VERSION_DEFAULT ==
schemaTargetVersion
+ ? schemaRegistry.getSchemaBySubject(subject)
+ : schemaRegistry.getSchemaBySubjectAndVersion(subject,
schemaTargetVersion);
+ } else {
+ ByteBuffer buffer = ByteBuffer.allocate(16);
+ decoder.readBytes(buffer);
+ long schemaRecordId = buffer.getLong();
+ response = schemaRegistry.getSchemaByRecordId(subject,
schemaRecordId);
+ }
+ Schema writerSchema = new Schema.Parser().parse(response.getIdl());
+ if (readerSchema == null) {
+ readerSchema = getReaderSchema(writerSchema);
+ }
+
+ DatumReader<T> datumReader = getDatumReader(writerSchema,
readerSchema);
+ return datumReader.read(null, decoder);
+ } catch (RestClientException e) {
+ log.warn("get schema by record id failed, maybe the schema storage
service not available now", e);
+ throw new SerializationException("get schema by record id failed,
maybe the schema storage service not available now", e);
+ } catch (IOException e) {
+ log.warn("deserialize failed", e);
+ throw new SerializationException("deserialize error", e);
}
-
}
- public T avroDecode(byte[] input, Schema schema) throws IOException {
- ByteArrayInputStream bais = new ByteArrayInputStream(input);
- BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bais, null);
-
- ByteBuffer buffer = ByteBuffer.allocate(16);
- try {
- decoder.readBytes(buffer);
- } catch (Exception e) {
- log.error("read bytes error: ", e);
+ @SuppressWarnings("unchecked")
+ private Schema getReaderSchema(Schema writerSchema) {
+ if (useGenericReader) {
+ return writerSchema;
+ } else {
+ Class<SpecificRecord> readerClass =
SpecificData.get().getClass(writerSchema);
+ if (readerClass == null) {
+ throw new SerializationException("cannot get a schema for a
SpecificRecord");
+ }
+ try {
+ return readerClass.newInstance().getSchema();
+ } catch (InstantiationException e) {
+ throw new SerializationException("cannot initialize reader
schema by writerSchema class", e);
+ } catch (IllegalAccessException e) {
+ throw new SerializationException("not allowed initialize
reader schema by writerSchema class", e);
+ }
}
-
- long schemaRecordId = buffer.getLong();
-
- DatumReader<T> datumReader = getDatumReader(schema);
- T record = datumReader.read(null, decoder);
- return record;
}
- private DatumReader<T> getDatumReader(Schema schema) {
+ private DatumReader<T> getDatumReader(Schema schema, Schema readerSchema) {
if (useGenericReader) {
- return new GenericDatumReader<>(schema);
+ return new GenericDatumReader<>(schema, readerSchema);
} else {
- return new SpecificDatumReader<>(schema);
+ return new SpecificDatumReader<>(schema, readerSchema);
}
}
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroSerializer.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroSerializer.java
index e240683..b6a608f 100644
---
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroSerializer.java
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroSerializer.java
@@ -18,10 +18,13 @@
package org.apache.rocketmq.schema.registry.client.serde.avro;
import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericContainer;
import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
@@ -62,10 +65,19 @@ public class AvroSerializer<T> implements Serializer<T> {
if (record == null) {
return null;
}
+ String purposeSchema;
+ if (record instanceof GenericRecord) {
+ purposeSchema = ((GenericContainer) record).getSchema().toString();
+ } else {
+ purposeSchema =
SpecificData.get().getSchema(record.getClass()).toString();
+ }
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out,
null);
- GetSchemaResponse response = getSchemaBySubject(subject);
+ GetSchemaResponse response =
schemaRegistry.getTargetSchema(subject, purposeSchema);
+ if (response == null) {
+ throw new SerializationException("there's no corresponding
schema version equals to given schema : " + purposeSchema);
+ }
long schemaRecordId = response.getRecordId();
String schemaIdl = response.getIdl();
Schema schema = new Schema.Parser().parse(schemaIdl);
@@ -80,19 +92,14 @@ public class AvroSerializer<T> implements Serializer<T> {
}
datumWriter.write(record, encoder);
encoder.flush();
- byte[] bytes = out.toByteArray();
- return bytes;
+ return out.toByteArray();
} catch (IOException | RuntimeException e) {
throw new SerializationException("serialize Avro message failed",
e);
} catch (RestClientException e) {
- throw new SerializationException("get schema by subject failed",
e);
+ throw new SerializationException("get target schema failed", e);
}
}
- private GetSchemaResponse getSchemaBySubject(String subject) throws
RestClientException, IOException {
- return schemaRegistry.getSchemaBySubject(subject);
- }
-
@Override
public void close() {
}
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroDeserializer.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroDeserializer.java
index 0028ace..1a03abb 100644
---
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroDeserializer.java
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroDeserializer.java
@@ -16,14 +16,15 @@
*/
package org.apache.rocketmq.schema.registry.client.serde.avro;
+import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
import org.apache.rocketmq.schema.registry.client.serde.Deserializer;
import java.util.Map;
-public class GenericAvroDeserializer<T extends GenericRecord> implements
Deserializer<T> {
- private final AvroDeserializer<T> inner;
+public class GenericAvroDeserializer implements Deserializer<GenericRecord> {
+ private final AvroDeserializer<GenericRecord> inner;
public GenericAvroDeserializer() {
this.inner = new AvroDeserializer<>();
@@ -39,9 +40,13 @@ public class GenericAvroDeserializer<T extends
GenericRecord> implements Deseria
}
@Override
- public T deserialize(String subject, byte[] bytes) {
+ public GenericRecord deserialize(String subject, byte[] bytes) {
return this.inner.deserialize(subject, bytes);
}
+
+ public GenericRecord deserialize(String subject, byte[] bytes, Schema
readerSchema) {
+ return this.inner.deserialize(subject, bytes, readerSchema);
+ }
@Override
public void close() {
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerde.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerde.java
index 240c10d..29c60c1 100644
---
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerde.java
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerde.java
@@ -16,30 +16,27 @@
*/
package org.apache.rocketmq.schema.registry.client.serde.avro;
-import org.apache.avro.generic.GenericRecord;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
-import org.apache.rocketmq.schema.registry.client.serde.Deserializer;
-import org.apache.rocketmq.schema.registry.client.serde.Serializer;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
-public class GenericAvroSerde<T extends GenericRecord> implements Closeable {
- private final Serializer<T> serializer;
- private final Deserializer<T> deserializer;
+public class GenericAvroSerde implements Closeable {
+ private final GenericAvroSerializer serializer;
+ private final GenericAvroDeserializer deserializer;
public GenericAvroSerde() {
- this.serializer = new GenericAvroSerializer<T>();
- this.deserializer = new GenericAvroDeserializer<T>();
+ this.serializer = new GenericAvroSerializer();
+ this.deserializer = new GenericAvroDeserializer();
}
public GenericAvroSerde(final SchemaRegistryClient client) {
if (null == client) {
throw new IllegalArgumentException("please initialize the schema
registry client first");
}
- this.serializer = new AvroSerializer<>(client);
- this.deserializer = new AvroDeserializer<>(client);
+ this.serializer = new GenericAvroSerializer(client);
+ this.deserializer = new GenericAvroDeserializer(client);
}
public void configure(final Map<String, Object> configs) {
@@ -47,11 +44,11 @@ public class GenericAvroSerde<T extends GenericRecord>
implements Closeable {
this.deserializer.configure(configs);
}
- public Serializer<T> serializer() {
+ public GenericAvroSerializer serializer() {
return this.serializer;
}
- public Deserializer<T> deserializer() {
+ public GenericAvroDeserializer deserializer() {
return this.deserializer;
}
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerializer.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerializer.java
index d178502..6d7da33 100644
---
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerializer.java
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerializer.java
@@ -22,16 +22,16 @@ import
org.apache.rocketmq.schema.registry.client.serde.Serializer;
import java.util.Map;
-public class GenericAvroSerializer<T extends GenericRecord> implements
Serializer<T> {
+public class GenericAvroSerializer implements Serializer<GenericRecord> {
- private final AvroSerializer<T> inner;
+ private final AvroSerializer<GenericRecord> inner;
public GenericAvroSerializer() {
- this.inner = new AvroSerializer<T>();
+ this.inner = new AvroSerializer<>();
}
public GenericAvroSerializer(final SchemaRegistryClient client) {
- this.inner = new AvroSerializer<T>(client);
+ this.inner = new AvroSerializer<>(client);
}
@Override
public void configure(final Map<String, Object> configs) {
@@ -39,7 +39,7 @@ public class GenericAvroSerializer<T extends GenericRecord>
implements Serialize
}
@Override
- public byte[] serialize(final String subject, final T record) {
+ public byte[] serialize(final String subject, final GenericRecord record) {
return this.inner.serialize(subject, record);
}
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroDeserializer.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroDeserializer.java
index 768306e..236ed32 100644
---
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroDeserializer.java
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroDeserializer.java
@@ -22,7 +22,7 @@ import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.rocketmq.schema.registry.client.config.AvroSerializerConfig;
+import org.apache.rocketmq.schema.registry.client.config.AvroSerdeConfig;
import
org.apache.rocketmq.schema.registry.client.exceptions.SerializationException;
import org.apache.rocketmq.schema.registry.client.serde.Deserializer;
@@ -39,8 +39,8 @@ public class ReflectionAvroDeserializer<T> implements
Deserializer<T> {
@Override
public void configure(Map<String, Object> configs) {
- AvroSerializerConfig avroSerializerConfig = new
AvroSerializerConfig(configs);
- this.type = avroSerializerConfig.deserializeTargetType();
+ AvroSerdeConfig avroSerdeConfig = new AvroSerdeConfig(configs);
+ this.type = avroSerdeConfig.deserializeTargetType();
}
@Override
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroDeserializer.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroDeserializer.java
index 192a626..c9ad493 100644
---
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroDeserializer.java
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroDeserializer.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.schema.registry.client.serde.avro;
+import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecord;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
import org.apache.rocketmq.schema.registry.client.serde.Deserializer;
@@ -45,6 +46,11 @@ public class SpecificAvroDeserializer<T extends
SpecificRecord> implements Deser
return (T) this.inner.deserialize(subject, bytes);
}
+ @SuppressWarnings("unchecked")
+ public T deserialize(String subject, byte[] bytes, Schema readerSchema) {
+ return (T) this.inner.deserialize(subject, bytes, readerSchema);
+ }
+
@Override
public void close() {
this.inner.close();
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonDeserializer.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonDeserializer.java
index 2f300c4..797ddd9 100644
---
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonDeserializer.java
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonDeserializer.java
@@ -20,7 +20,7 @@ package org.apache.rocketmq.schema.registry.client.serde.json;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
-import org.apache.rocketmq.schema.registry.client.config.JsonSerializerConfig;
+import org.apache.rocketmq.schema.registry.client.config.JsonSerdeConfig;
import
org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
import
org.apache.rocketmq.schema.registry.client.exceptions.SerializationException;
import org.apache.rocketmq.schema.registry.client.rest.JacksonMapper;
@@ -50,7 +50,7 @@ public class JsonDeserializer<T> implements Deserializer<T> {
@Override
public void configure(Map<String, Object> configs) {
- JsonSerializerConfig serializerConfig = new
JsonSerializerConfig(configs);
+ JsonSerdeConfig serializerConfig = new JsonSerdeConfig(configs);
this.skipSchemaRegistry = serializerConfig.skipSchemaRegistry();
this.type = (Class<T>) serializerConfig.deserializeTargetType();
}
diff --git
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerializer.java
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerializer.java
index 85e0b4e..930538b 100644
---
a/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerializer.java
+++
b/client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerializer.java
@@ -20,7 +20,7 @@ package org.apache.rocketmq.schema.registry.client.serde.json;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
-import org.apache.rocketmq.schema.registry.client.config.JsonSerializerConfig;
+import org.apache.rocketmq.schema.registry.client.config.JsonSerdeConfig;
import
org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
import
org.apache.rocketmq.schema.registry.client.exceptions.SerializationException;
import org.apache.rocketmq.schema.registry.client.rest.JacksonMapper;
@@ -47,8 +47,8 @@ public class JsonSerializer<T> implements Serializer<T> {
@Override
public void configure(Map<String, Object> configs) {
- JsonSerializerConfig jsonSerializerConfig = new
JsonSerializerConfig(configs);
- this.skipSchemaRegistry = jsonSerializerConfig.skipSchemaRegistry();
+ JsonSerdeConfig jsonSerdeConfig = new JsonSerdeConfig(configs);
+ this.skipSchemaRegistry = jsonSerdeConfig.skipSchemaRegistry();
}
@Override
diff --git
a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/SkipSchemaRegistrySerdeTest.java
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/SkipSchemaRegistrySerdeTest.java
index 4a7f22b..bc1cd96 100644
---
a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/SkipSchemaRegistrySerdeTest.java
+++
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/SkipSchemaRegistrySerdeTest.java
@@ -16,7 +16,7 @@
*/
package org.apache.rocketmq.schema.registry.client.serde;
-import org.apache.rocketmq.schema.registry.client.config.JsonSerializerConfig;
+import org.apache.rocketmq.schema.registry.client.config.JsonSerdeConfig;
import org.apache.rocketmq.schema.registry.client.serde.json.JsonSerde;
import org.junit.jupiter.api.Test;
@@ -33,8 +33,8 @@ public class SkipSchemaRegistrySerdeTest {
try(JsonSerde<Person> jsonSerde = new JsonSerde<>()) {
Map<String, Object> configs = new HashMap<>();
- configs.put(JsonSerializerConfig.SKIP_SCHEMA_REGISTRY, true);
- configs.put(JsonSerializerConfig.DESERIALIZE_TARGET_TYPE,
Person.class);
+ configs.put(JsonSerdeConfig.SKIP_SCHEMA_REGISTRY, true);
+ configs.put(JsonSerdeConfig.DESERIALIZE_TARGET_TYPE, Person.class);
jsonSerde.configure(configs);
byte[] bytes = jsonSerde.serializer().serialize("TopicTest",
person);
diff --git
a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerdeTest.java
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerdeTest.java
index b25485f..41bca4e 100644
---
a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerdeTest.java
+++
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/GenericAvroSerdeTest.java
@@ -20,7 +20,7 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
-import org.apache.rocketmq.schema.registry.client.config.AvroSerializerConfig;
+import org.apache.rocketmq.schema.registry.client.config.AvroSerdeConfig;
import
org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
import org.junit.jupiter.api.Test;
@@ -52,7 +52,8 @@ public class GenericAvroSerdeTest {
when(getSchemaResponse.getIdl()).thenReturn(idl);
registryClient = mock(SchemaRegistryClient.class);
-
when(registryClient.getSchemaBySubject("TopicTest")).thenReturn(getSchemaResponse);
+ when(registryClient.getTargetSchema("TopicTest",
schema.toString())).thenReturn(getSchemaResponse);
+ when(registryClient.getSchemaByRecordId("TopicTest",
1111L)).thenReturn(getSchemaResponse);
GenericRecord record = new GenericRecordBuilder(schema)
.set("item", "generic")
@@ -62,7 +63,7 @@ public class GenericAvroSerdeTest {
try (GenericAvroSerde serde = new GenericAvroSerde(registryClient)) {
//configure
Map<String, Object> configs = new HashMap<>();
- configs.put(AvroSerializerConfig.USE_GENERIC_DATUM_READER, true);
+ configs.put(AvroSerdeConfig.USE_GENERIC_DATUM_READER, true);
serde.configure(configs);
//serialize
diff --git
a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroSerdeTest.java
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroSerdeTest.java
index 4864cf5..2716ef5 100644
---
a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroSerdeTest.java
+++
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/ReflectionAvroSerdeTest.java
@@ -16,7 +16,7 @@
*/
package org.apache.rocketmq.schema.registry.client.serde.avro;
-import org.apache.rocketmq.schema.registry.client.config.AvroSerializerConfig;
+import org.apache.rocketmq.schema.registry.client.config.AvroSerdeConfig;
import org.apache.rocketmq.schema.registry.client.serde.Charge;
import org.junit.jupiter.api.Test;
@@ -31,7 +31,7 @@ public class ReflectionAvroSerdeTest {
public void testReflectionAvroSerde() {
Charge charge = new Charge("specific", 100.0);
Map<String, Object> configs = new HashMap<>();
- configs.put(AvroSerializerConfig.DESERIALIZE_TARGET_TYPE,
charge.getClass());
+ configs.put(AvroSerdeConfig.DESERIALIZE_TARGET_TYPE,
charge.getClass());
try (ReflectionAvroSerde serde = new ReflectionAvroSerde()) {
//serialize
serde.configure(configs);
diff --git
a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroSerdeTest.java
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroSerdeTest.java
index 3424d80..c58ee51 100644
---
a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroSerdeTest.java
+++
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/avro/SpecificAvroSerdeTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.schema.registry.client.serde.avro;
+import org.apache.avro.Schema;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
import
org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
import org.apache.rocketmq.schema.registry.client.serde.Charge;
@@ -26,6 +27,7 @@ import org.mockito.Mock;
import java.io.IOException;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -46,9 +48,11 @@ public class SpecificAvroSerdeTest {
when(getSchemaResponse.getRecordId()).thenReturn(11111L);
when(getSchemaResponse.getSchemaFullName()).thenReturn("org.apache.rocketmq.schema.registry.example.serde.Charge");
when(getSchemaResponse.getIdl()).thenReturn(idl);
+ Schema schema = new Schema.Parser().parse(idl);
registryClient = mock(SchemaRegistryClient.class);
-
when(registryClient.getSchemaBySubject("TopicTest")).thenReturn(getSchemaResponse);
+ when(registryClient.getTargetSchema("TopicTest",
schema.toString())).thenReturn(getSchemaResponse);
+ when(registryClient.getSchemaByRecordId("TopicTest",
11111L)).thenReturn(getSchemaResponse);
try (SpecificAvroSerde serde = new SpecificAvroSerde(registryClient)) {
//serialize
diff --git
a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerdeTest.java
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerdeTest.java
index 78d4a0d..12c50c6 100644
---
a/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerdeTest.java
+++
b/client/src/test/java/org/apache/rocketmq/schema/registry/client/serde/json/JsonSerdeTest.java
@@ -17,11 +17,9 @@
package org.apache.rocketmq.schema.registry.client.serde.json;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
-import org.apache.rocketmq.schema.registry.client.config.JsonSerializerConfig;
+import org.apache.rocketmq.schema.registry.client.config.JsonSerdeConfig;
import
org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
-import org.apache.rocketmq.schema.registry.client.serde.Charge;
import org.apache.rocketmq.schema.registry.client.serde.Person;
-import org.apache.rocketmq.schema.registry.client.serde.avro.SpecificAvroSerde;
import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
@@ -57,7 +55,7 @@ public class JsonSerdeTest {
//serialize
Person person = new Person(1L, "Tom", 18);
Map<String, Object> configs = new HashMap<>();
- configs.put(JsonSerializerConfig.DESERIALIZE_TARGET_TYPE,
person.getClass());
+ configs.put(JsonSerdeConfig.DESERIALIZE_TARGET_TYPE,
person.getClass());
serde.configure(configs);
byte[] bytes = serde.serializer().serialize("TopicTest", person);
diff --git
a/common/src/main/java/org/apache/rocketmq/schema/registry/common/constant/SchemaConstants.java
b/common/src/main/java/org/apache/rocketmq/schema/registry/common/constant/SchemaConstants.java
index f3af9be..c2525e0 100644
---
a/common/src/main/java/org/apache/rocketmq/schema/registry/common/constant/SchemaConstants.java
+++
b/common/src/main/java/org/apache/rocketmq/schema/registry/common/constant/SchemaConstants.java
@@ -20,4 +20,5 @@ package org.apache.rocketmq.schema.registry.common.constant;
public class SchemaConstants {
public static final int SCHEMA_RECORD_ID_LENGTH = 8;
public static final char SUBJECT_SEPARATOR = '/';
+ public static final int SCHEMA_VERSION_BITS = 14;
}
diff --git
a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaDefination.java
b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaDefinition.java
similarity index 96%
rename from
common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaDefination.java
rename to
common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaDefinition.java
index 2c3e873..88967cc 100644
---
a/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaDefination.java
+++
b/common/src/main/java/org/apache/rocketmq/schema/registry/common/model/SchemaDefinition.java
@@ -17,5 +17,5 @@
package org.apache.rocketmq.schema.registry.common.model;
-public class SchemaDefination {
+public class SchemaDefinition {
}
diff --git
a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java
b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java
index 8dc85a5..45fbcea 100644
---
a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java
+++
b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageService.java
@@ -80,6 +80,10 @@ public interface StorageService<T extends BaseInfo> {
throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
}
+ default SchemaRecordInfo getTargetSchema(StorageServiceContext context,
QualifiedName name) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
+ }
+
default List<SchemaRecordInfo> listBySubject(final StorageServiceContext
context, final QualifiedName name) {
throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
}
diff --git
a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java
b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java
index 8f0457f..8305085 100644
---
a/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java
+++
b/common/src/main/java/org/apache/rocketmq/schema/registry/common/storage/StorageServiceProxy.java
@@ -106,6 +106,13 @@ public class StorageServiceProxy {
return storageService.getBySubject(storageServiceContext, name);
}
+ public SchemaRecordInfo getTargetSchema(final QualifiedName name) {
+ final RequestContext requestContext =
RequestContextManager.getContext();
+ final StorageServiceContext storageServiceContext =
storageUtil.convertToStorageServiceContext(requestContext);
+ final StorageService<SchemaInfo> storageService =
storageManager.getStorageService();
+ return storageService.getTargetSchema(storageServiceContext, name);
+ }
+
public List<SchemaRecordInfo> listBySubject(final QualifiedName name) {
final RequestContext requestContext =
RequestContextManager.getContext();
final StorageServiceContext storageServiceContext =
storageUtil.convertToStorageServiceContext(requestContext);
diff --git
a/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
b/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
index d69686d..4466aed 100644
---
a/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
+++
b/core/src/main/java/org/apache/rocketmq/schema/registry/core/api/v1/SchemaController.java
@@ -26,6 +26,7 @@ import java.net.HttpURLConnection;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.schema.registry.common.QualifiedName;
+import org.apache.rocketmq.schema.registry.common.constant.SchemaConstants;
import org.apache.rocketmq.schema.registry.common.dto.DeleteSchemeResponse;
import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaRequest;
@@ -411,6 +412,181 @@ public class SchemaController {
);
}
+ @RequestMapping(
+ method = RequestMethod.GET,
+ path = "/subject/{subject-name}/schema/versions/{version}"
+ )
+ @ApiOperation(
+ value = "Schema information",
+ notes = "Schema information with the given version under the subject")
+ @ApiResponses(
+ {
+ @ApiResponse(
+ code = HttpURLConnection.HTTP_OK,
+ message = "The schema is returned"
+ ),
+ @ApiResponse(
+ code = HttpURLConnection.HTTP_NOT_FOUND,
+ message = "The requested tenant or schema cannot be found"
+ )
+ }
+ )
+ public GetSchemaResponse getSchemaBySubject(
+ @ApiParam(value = "The name of the subject", required = true)
+ @PathVariable(value = "subject-name") String subject,
+ @ApiParam(value = "The version of the schema", required = true)
+ @PathVariable(value = "version") String version
+ ) {
+ return getSchemaBySubject(DEFAULT_CLUSTER, DEFAULT_TENANT, subject,
version);
+ }
+
+ @RequestMapping(
+ method = RequestMethod.POST,
+ path =
"/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/schema"
+ )
+ @ApiOperation(
+ value = "Schema information",
+ notes = "Schema information from target schema of subject"
+ )
+ @ApiResponses(
+ {
+ @ApiResponse(
+ code = HttpURLConnection.HTTP_OK,
+ message = "The schema is returned"
+ ),
+ @ApiResponse(
+ code = HttpURLConnection.HTTP_NOT_FOUND,
+ message = "The requested tenant or schema cannot be found"
+ )
+ }
+ )
+ public GetSchemaResponse getTargetSchema(
+ @ApiParam(value = "The cluster of the subject", required = true)
+ @PathVariable(value = "cluster-name") final String cluster,
+ @ApiParam(value = "The tenant of the schema", required = true)
+ @PathVariable(value = "tenant-name") final String tenant,
+ @ApiParam(value = "The name of the subject", required = true)
+ @PathVariable(value = "subject-name") String subject,
+ @ApiParam(value = "The schema idl", required = true)
+ @RequestBody final String schema
+ ) {
+ QualifiedName name = new QualifiedName(cluster, tenant, subject,
schema);
+
+ return this.requestProcessor.processRequest(
+ "getTargetSchema",
+ () -> schemaService.getTargetSchema(name)
+ );
+ }
+
+ @RequestMapping(
+ method = RequestMethod.POST,
+ path = "/subject/{subject-name}/schema/schema"
+ )
+ @ApiOperation(
+ value = "Schema information",
+ notes = "Schema information from target schema of subject"
+ )
+ @ApiResponses(
+ {
+ @ApiResponse(
+ code = HttpURLConnection.HTTP_OK,
+ message = "The schema is returned"
+ ),
+ @ApiResponse(
+ code = HttpURLConnection.HTTP_NOT_FOUND,
+ message = "The requested tenant or schema cannot be found"
+ )
+ }
+ )
+ public GetSchemaResponse getTargetSchema(
+ @ApiParam(value = "The name of the subject", required = true)
+ @PathVariable(value = "subject-name") String subject,
+ @ApiParam(value = "The schema idl", required = true)
+ @RequestBody final String schema
+ ) {
+ QualifiedName name = new QualifiedName("default", "default", subject,
schema);
+
+ return this.requestProcessor.processRequest(
+ "getTargetSchema",
+ () -> schemaService.getTargetSchema(name)
+ );
+ }
+
+ @RequestMapping(
+ method = RequestMethod.GET,
+ path =
"/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/recordId/{record-id}/schema"
+ )
+ @ApiOperation(
+ value = "Schema information",
+ notes = "Schema information with target recordId"
+ )
+ @ApiResponses(
+ {
+ @ApiResponse(
+ code = HttpURLConnection.HTTP_OK,
+ message = "The schema is returned"
+ ),
+ @ApiResponse(
+ code = HttpURLConnection.HTTP_NOT_FOUND,
+ message = "The requested tenant or schema cannot be found"
+ )
+ }
+ )
+ public GetSchemaResponse getSchemaByRecordId(
+ @ApiParam(value = "The cluster of the subject", required = true)
+ @PathVariable(value = "cluster-name") final String cluster,
+ @ApiParam(value = "The tenant of the schema", required = true)
+ @PathVariable(value = "tenant-name") final String tenant,
+ @ApiParam(value = "The name of the subject", required = true)
+ @PathVariable(value = "subject-name") String subject,
+ @ApiParam(value = "The recordId of the schema", required = true)
+ @PathVariable(value = "record-id") final String recordId
+ ) {
+ long versionMask = ~(-1L << SchemaConstants.SCHEMA_VERSION_BITS);
+ long recordIdResource = Long.parseLong(recordId);
+ Long version = recordIdResource & versionMask;
+ QualifiedName qualifiedName = new QualifiedName(cluster, tenant,
subject, null, version);
+
+ return this.requestProcessor.processRequest(
+ "getSchemaByRecordId",
+ () -> schemaService.getBySubject(qualifiedName)
+ );
+ }
+
+ @RequestMapping(
+ method = RequestMethod.GET,
+ path = "/subject/{subject-name}/recordId/{record-id}/schema"
+ )
+ @ApiOperation(
+ value = "Schema information",
+ notes = "Schema information with target recordId"
+ )
+ @ApiResponses(
+ {
+ @ApiResponse(
+ code = HttpURLConnection.HTTP_OK,
+ message = "The schema is returned"
+ ),
+ @ApiResponse(
+ code = HttpURLConnection.HTTP_NOT_FOUND,
+ message = "The requested tenant or schema cannot be found"
+ )
+ }
+ )
+ public GetSchemaResponse getSchemaByRecordId(
+ @ApiParam(value = "The name of the subject", required = true)
+ @PathVariable(value = "subject-name") String subject,
+ @ApiParam(value = "The recordId of the schema", required = true)
+ @PathVariable(value = "record-id") final String recordId
+ ) {
+ QualifiedName qualifiedName = new QualifiedName("default", "default",
subject, null, null);
+
+ return this.requestProcessor.processRequest(
+ "getSchemaByRecordId",
+ () -> schemaService.getByRecordId(qualifiedName,
Long.parseLong(recordId))
+ );
+ }
+
@RequestMapping(
method = RequestMethod.GET,
path =
"/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/versions"
diff --git
a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaService.java
b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaService.java
index d7af10b..2557ae6 100644
---
a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaService.java
+++
b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaService.java
@@ -83,4 +83,8 @@ public interface SchemaService<T extends BaseDto> {
List<String> listSubjectsByTenant(QualifiedName qualifiedName);
List<String> listTenants(QualifiedName name);
+
+ GetSchemaResponse getTargetSchema(QualifiedName qualifiedName);
+
+ GetSchemaResponse getByRecordId(QualifiedName qualifiedName, long
recordId);
}
diff --git
a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
index 63f74a3..89b7e65 100644
---
a/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
+++
b/core/src/main/java/org/apache/rocketmq/schema/registry/core/service/SchemaServiceImpl.java
@@ -23,6 +23,7 @@ import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.schema.registry.common.QualifiedName;
import org.apache.rocketmq.schema.registry.common.auth.AccessControlService;
+import org.apache.rocketmq.schema.registry.common.constant.SchemaConstants;
import org.apache.rocketmq.schema.registry.common.context.RequestContext;
import
org.apache.rocketmq.schema.registry.common.context.RequestContextManager;
import org.apache.rocketmq.schema.registry.common.dto.DeleteSchemeResponse;
@@ -299,6 +300,25 @@ public class SchemaServiceImpl implements
SchemaService<SchemaDto> {
return tenants;
}
+ public GetSchemaResponse getTargetSchema(QualifiedName qualifiedName) {
+ final RequestContext requestContext =
RequestContextManager.getContext();
+ log.info("get request context: " + requestContext);
+ this.accessController.checkPermission("", qualifiedName.getTenant(),
SchemaOperation.GET);
+ SchemaRecordInfo schemaRecordInfo =
storageServiceProxy.getTargetSchema(qualifiedName);
+ if (schemaRecordInfo == null) {
+ throw new SchemaException("Schema: " + qualifiedName + " not
exist");
+ }
+ return new GetSchemaResponse(qualifiedName, schemaRecordInfo);
+ }
+
+ @Override
+ public GetSchemaResponse getByRecordId(QualifiedName qualifiedName, long
recordId) {
+ long versionMask = ~(-1L << SchemaConstants.SCHEMA_VERSION_BITS);
+ Long version = recordId & versionMask;
+ qualifiedName.setVersion(version);
+ return getBySubject(qualifiedName);
+ }
+
private void checkSchemaExist(final QualifiedName qualifiedName) {
if (storageServiceProxy.get(qualifiedName) != null) {
throw new SchemaExistException(qualifiedName);
diff --git
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/GetSchemaDemo.java
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/GetSchemaDemo.java
index 0348885..0eb06e0 100644
---
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/GetSchemaDemo.java
+++
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/GetSchemaDemo.java
@@ -47,5 +47,11 @@ public class GetSchemaDemo {
} catch (RestClientException | IOException e) {
e.printStackTrace();
}
+ try {
+ List<SchemaRecordDto> schemas =
schemaRegistryClient.getSchemaListBySubject("default", "default", topic);
+ System.out.println(schemas);
+ } catch (RestClientException | IOException e) {
+ e.printStackTrace();
+ }
}
}
diff --git
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/GenericAvroSerdeDemo.java
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/GenericAvroSerdeDemo.java
index 69dd52c..a2c9d2e 100644
---
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/GenericAvroSerdeDemo.java
+++
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/GenericAvroSerdeDemo.java
@@ -21,7 +21,7 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClientFactory;
-import org.apache.rocketmq.schema.registry.client.config.AvroSerializerConfig;
+import org.apache.rocketmq.schema.registry.client.config.AvroSerdeConfig;
import org.apache.rocketmq.schema.registry.client.serde.avro.GenericAvroSerde;
import java.io.IOException;
@@ -37,14 +37,14 @@ public class GenericAvroSerdeDemo {
Schema schema = new
Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Charge\",\"namespace\":\"org.apache.rocketmq.schema.registry.example.serde\",\"fields\":[{\"name\":\"item\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}");
GenericRecord record = new GenericRecordBuilder(schema)
- .set("item", "generic")
- .set("amount", 100.0)
- .build();
+ .set("item", "generic")
+ .set("amount", 100.0)
+ .build();
- try (GenericAvroSerde<GenericRecord> serde = new
GenericAvroSerde<>(schemaRegistryClient)) {
+ try (GenericAvroSerde serde = new
GenericAvroSerde(schemaRegistryClient)) {
//configure
Map<String, Object> configs = new HashMap<>();
- configs.put(AvroSerializerConfig.USE_GENERIC_DATUM_READER, true);
+ configs.put(AvroSerdeConfig.USE_GENERIC_DATUM_READER, true);
serde.configure(configs);
//serialize
diff --git
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/ReflectionAvroSerdeDemo.java
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/ReflectionAvroSerdeDemo.java
index 62b5bd0..816f32f 100644
---
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/ReflectionAvroSerdeDemo.java
+++
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/ReflectionAvroSerdeDemo.java
@@ -16,7 +16,7 @@
*/
package org.apache.rocketmq.schema.registry.example.serde.avro;
-import org.apache.rocketmq.schema.registry.client.config.AvroSerializerConfig;
+import org.apache.rocketmq.schema.registry.client.config.AvroSerdeConfig;
import
org.apache.rocketmq.schema.registry.client.serde.avro.ReflectionAvroSerde;
import org.apache.rocketmq.schema.registry.example.serde.Charge;
@@ -28,7 +28,7 @@ public class ReflectionAvroSerdeDemo {
public static void main(String[] args) {
Charge charge = new Charge("specific", 100.0);
Map<String, Object> configs = new HashMap<>();
- configs.put(AvroSerializerConfig.DESERIALIZE_TARGET_TYPE,
charge.getClass());
+ configs.put(AvroSerdeConfig.DESERIALIZE_TARGET_TYPE,
charge.getClass());
try (ReflectionAvroSerde serde = new ReflectionAvroSerde()) {
//serialize
serde.configure(configs);
diff --git
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/SpecificAvroSerdeDemo.java
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/SpecificAvroSerdeDemo.java
index aaec0ec..c5a29aa 100644
---
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/SpecificAvroSerdeDemo.java
+++
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/avro/SpecificAvroSerdeDemo.java
@@ -19,7 +19,7 @@ package
org.apache.rocketmq.schema.registry.example.serde.avro;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClientFactory;
-import org.apache.rocketmq.schema.registry.client.config.AvroSerializerConfig;
+import org.apache.rocketmq.schema.registry.client.config.AvroSerdeConfig;
import org.apache.rocketmq.schema.registry.client.serde.avro.SpecificAvroSerde;
import org.apache.rocketmq.schema.registry.example.serde.Charge;
@@ -40,8 +40,7 @@ public class SpecificAvroSerdeDemo {
//serialize
Charge charge = new Charge("specific", 100.0);
- serializeConfigs.put(AvroSerializerConfig.SKIP_SCHEMA_REGISTRY,
true);
- serializeConfigs.put(AvroSerializerConfig.DESERIALIZE_TARGET_TYPE,
charge.getClass());
+ serializeConfigs.put(AvroSerdeConfig.DESERIALIZE_TARGET_TYPE,
charge.getClass());
serde.configure(serializeConfigs);
byte[] bytes = serde.serializer().serialize("TopicTest", charge);
diff --git
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/json/JsonSerdeDemo.java
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/json/JsonSerdeDemo.java
index bcb4e7a..50364a3 100644
---
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/json/JsonSerdeDemo.java
+++
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/json/JsonSerdeDemo.java
@@ -19,7 +19,7 @@ package
org.apache.rocketmq.schema.registry.example.serde.json;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClientFactory;
-import org.apache.rocketmq.schema.registry.client.config.JsonSerializerConfig;
+import org.apache.rocketmq.schema.registry.client.config.JsonSerdeConfig;
import
org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
import org.apache.rocketmq.schema.registry.client.serde.json.JsonSerde;
import org.apache.rocketmq.schema.registry.common.dto.RegisterSchemaRequest;
@@ -61,7 +61,7 @@ public class JsonSerdeDemo {
try(JsonSerde<Person> jsonSerde = new
JsonSerde<>(schemaRegistryClient)) {
Map<String, Object> configs = new HashMap<>();
- configs.put(JsonSerializerConfig.DESERIALIZE_TARGET_TYPE,
Person.class);
+ configs.put(JsonSerdeConfig.DESERIALIZE_TARGET_TYPE, Person.class);
jsonSerde.configure(configs);
byte[] bytes = jsonSerde.serializer().serialize("TopicTest",
person);
diff --git
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/json/JsonSerdeWithoutServerDemo.java
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/json/JsonSerdeWithoutServerDemo.java
index c7e3500..3bb400b 100644
---
a/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/json/JsonSerdeWithoutServerDemo.java
+++
b/example/src/main/java/org/apache/rocketmq/schema/registry/example/serde/json/JsonSerdeWithoutServerDemo.java
@@ -17,7 +17,7 @@
package org.apache.rocketmq.schema.registry.example.serde.json;
-import org.apache.rocketmq.schema.registry.client.config.JsonSerializerConfig;
+import org.apache.rocketmq.schema.registry.client.config.JsonSerdeConfig;
import org.apache.rocketmq.schema.registry.client.serde.json.JsonSerde;
import org.apache.rocketmq.schema.registry.example.serde.Person;
@@ -33,8 +33,8 @@ public class JsonSerdeWithoutServerDemo {
try(JsonSerde<Person> jsonSerde = new JsonSerde<>()) {
Map<String, Object> configs = new HashMap<>();
- configs.put(JsonSerializerConfig.DESERIALIZE_TARGET_TYPE,
Person.class);
- configs.put(JsonSerializerConfig.SKIP_SCHEMA_REGISTRY, true);
+ configs.put(JsonSerdeConfig.DESERIALIZE_TARGET_TYPE, Person.class);
+ configs.put(JsonSerdeConfig.SKIP_SCHEMA_REGISTRY, true);
jsonSerde.configure(configs);
byte[] bytes = jsonSerde.serializer().serialize("TopicTest",
person);
diff --git
a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClient.java
b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClient.java
index 0099025..b71f476 100644
---
a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClient.java
+++
b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClient.java
@@ -75,6 +75,10 @@ public interface RocketmqStorageClient {
throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
}
+ default SchemaRecordInfo getTargetSchema(QualifiedName qualifiedName) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE_DEFAULT);
+ }
+
/**
* list all versions of rocketmq schema entity by subject.
*
diff --git
a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java
b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java
index 0ffee01..e728445 100644
---
a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java
+++
b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageClientImpl.java
@@ -20,17 +20,21 @@ package
org.apache.rocketmq.schema.registry.storage.rocketmq;
import java.io.File;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Properties;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
import org.apache.rocketmq.schema.registry.common.QualifiedName;
import org.apache.rocketmq.schema.registry.common.context.StoragePluginContext;
import
org.apache.rocketmq.schema.registry.common.context.StorageServiceContext;
import org.apache.rocketmq.schema.registry.common.json.JsonConverter;
import org.apache.rocketmq.schema.registry.common.json.JsonConverterImpl;
import org.apache.rocketmq.schema.registry.common.model.SchemaInfo;
+import org.apache.rocketmq.schema.registry.common.model.SchemaMetaInfo;
import org.apache.rocketmq.schema.registry.common.model.SchemaRecordInfo;
+import org.apache.rocketmq.schema.registry.common.model.SchemaType;
import org.apache.rocketmq.schema.registry.common.utils.CommonUtil;
@Slf4j
@@ -113,6 +117,32 @@ public class RocketmqStorageClientImpl implements
RocketmqStorageClient {
return versionSchemaMap.get(qualifiedName.getVersion());
}
+ @Override
+ public SchemaRecordInfo getTargetSchema(QualifiedName qualifiedName) {
+ // schema version is given
+ SchemaInfo schemaInfo =
rocketmqClient.getSchemaInfoBySubject(qualifiedName.subjectFullName());
+ if (schemaInfo == null || schemaInfo.getDetails() == null ||
schemaInfo.getDetails().getSchemaRecords() == null) {
+ return null;
+ }
+ SchemaMetaInfo schemaMetaInfo = schemaInfo.getMeta();
+ if (schemaMetaInfo == null) {
+ return null;
+ }
+ if (schemaMetaInfo.getType() == SchemaType.AVRO) {
+ for (SchemaRecordInfo schemaRecordInfo :
schemaInfo.getDetails().getSchemaRecords()) {
+ Schema store = new
Schema.Parser().parse(schemaRecordInfo.getIdl());
+ Schema target = new
Schema.Parser().parse(qualifiedName.getSchema());
+ if (Objects.equals(store, target)) {
+ return schemaRecordInfo;
+ }
+ }
+ } else {
+ //todo support other type
+ return null;
+ }
+ return null;
+ }
+
@Override
public List<SchemaRecordInfo> listBySubject(QualifiedName qualifiedName) {
SchemaInfo schemaInfo =
rocketmqClient.getSchemaInfoBySubject(qualifiedName.subjectFullName());
diff --git
a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageService.java
b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageService.java
index 1c91b89..27ebd06 100644
---
a/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageService.java
+++
b/storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqStorageService.java
@@ -79,6 +79,11 @@ public class RocketmqStorageService implements
StorageService<SchemaInfo> {
return storageClient.getBySubject(name);
}
+ @Override
+ public SchemaRecordInfo getTargetSchema(StorageServiceContext context,
QualifiedName name) {
+ return storageClient.getTargetSchema(name);
+ }
+
@Override
public List<SchemaRecordInfo> listBySubject(StorageServiceContext context,
QualifiedName name) {
return storageClient.listBySubject(name);