gaoran10 commented on a change in pull request #10604:
URL: https://github.com/apache/pulsar/pull/10604#discussion_r637499517
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
##########
@@ -43,73 +45,97 @@
@Slf4j
public class AutoConsumeSchema implements Schema<GenericRecord> {
- private Schema<?> schema;
+ private final ConcurrentMap<SchemaVersion, Schema<?>> schemaMap = initSchemaMap();
private String topicName;
private String componentName;
private SchemaInfoProvider schemaInfoProvider;
+ private ConcurrentMap<SchemaVersion, Schema<?>> initSchemaMap() {
+ ConcurrentMap<SchemaVersion, Schema<?>> schemaMap = Maps.newConcurrentMap();
+ // The Schema.BYTES will not be uploaded to the broker and store in the schema storage,
+ // if the schema version in the message metadata is empty byte[], it means its schema is Schema.BYTES.
+ schemaMap.put(BytesSchemaVersion.of(new byte[0]), Schema.BYTES);
+ return schemaMap;
+ }
+
+ public void setSchema(SchemaVersion schemaVersion, Schema<?> schema) {
+ schemaMap.put(schemaVersion, schema);
+ }
+
public void setSchema(Schema<?> schema) {
- this.schema = schema;
+ schemaMap.put(SchemaVersion.Latest, schema);
}
- private void ensureSchemaInitialized() {
- checkState(null != schema, "Schema is not initialized before used");
+ private void ensureSchemaInitialized(SchemaVersion schemaVersion) {
+ checkState(schemaMap.containsKey(schemaVersion),
+ "Schema version " + schemaVersion + " is not initialized before used");
}
@Override
public void validate(byte[] message) {
- ensureSchemaInitialized();
+ ensureSchemaInitialized(SchemaVersion.Latest);
- schema.validate(message);
+ schemaMap.get(SchemaVersion.Latest).validate(message);
}
@Override
public byte[] encode(GenericRecord message) {
- ensureSchemaInitialized();
-
throw new UnsupportedOperationException("AutoConsumeSchema is not intended to be used for encoding");
}
@Override
public boolean supportSchemaVersioning() {
- return schema == null || schema.supportSchemaVersioning();
+ return true;
}
public Schema<?> atSchemaVersion(byte[] schemaVersion) {
- fetchSchemaIfNeeded();
- ensureSchemaInitialized();
- if (schema.supportSchemaVersioning() && schema instanceof AbstractSchema) {
- return ((AbstractSchema) schema).atSchemaVersion(schemaVersion);
+ SchemaVersion sv = BytesSchemaVersion.of(schemaVersion);
+ fetchSchemaIfNeeded(sv);
+ ensureSchemaInitialized(sv);
+ Schema<?> topicVersionedSchema = schemaMap.get(sv);
+ if (topicVersionedSchema.supportSchemaVersioning() && topicVersionedSchema instanceof AbstractSchema) {
+ return ((AbstractSchema<?>) topicVersionedSchema).atSchemaVersion(schemaVersion);
} else {
- return schema;
+ return topicVersionedSchema;
}
}
@Override
public GenericRecord decode(byte[] bytes, byte[] schemaVersion) {
- fetchSchemaIfNeeded();
- ensureSchemaInitialized();
- return adapt(schema.decode(bytes, schemaVersion), schemaVersion);
+ SchemaVersion sv = BytesSchemaVersion.of(schemaVersion);
+ fetchSchemaIfNeeded(sv);
+ ensureSchemaInitialized(sv);
+ return adapt(schemaMap.get(sv).decode(bytes, schemaVersion), schemaVersion);
Review comment:
Yes, good idea!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]