congbobo184 commented on a change in pull request #10604:
URL: https://github.com/apache/pulsar/pull/10604#discussion_r637485530



##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
##########
@@ -43,73 +45,97 @@
 @Slf4j
 public class AutoConsumeSchema implements Schema<GenericRecord> {
 
-    private Schema<?> schema;
+    private final ConcurrentMap<SchemaVersion, Schema<?>> schemaMap = 
initSchemaMap();
 
     private String topicName;
 
     private String componentName;
 
     private SchemaInfoProvider schemaInfoProvider;
 
+    private ConcurrentMap<SchemaVersion, Schema<?>> initSchemaMap() {
+        ConcurrentMap<SchemaVersion, Schema<?>> schemaMap = 
Maps.newConcurrentMap();
+        // The Schema.BYTES will not be uploaded to the broker and store in 
the schema storage,
+        // if the schema version in the message metadata is empty byte[], it 
means its schema is Schema.BYTES.
+        schemaMap.put(BytesSchemaVersion.of(new byte[0]), Schema.BYTES);
+        return schemaMap;
+    }
+
+    public void setSchema(SchemaVersion schemaVersion, Schema<?> schema) {
+        schemaMap.put(schemaVersion, schema);
+    }
+
     public void setSchema(Schema<?> schema) {
-        this.schema = schema;
+        schemaMap.put(SchemaVersion.Latest, schema);
     }
 
-    private void ensureSchemaInitialized() {
-        checkState(null != schema, "Schema is not initialized before used");
+    private void ensureSchemaInitialized(SchemaVersion schemaVersion) {
+        checkState(schemaMap.containsKey(schemaVersion),
+                "Schema version " + schemaVersion + " is not initialized 
before used");
     }
 
     @Override
     public void validate(byte[] message) {
-        ensureSchemaInitialized();
+        ensureSchemaInitialized(SchemaVersion.Latest);
 
-        schema.validate(message);
+        schemaMap.get(SchemaVersion.Latest).validate(message);
     }
 
     @Override
     public byte[] encode(GenericRecord message) {
-        ensureSchemaInitialized();
-
         throw new UnsupportedOperationException("AutoConsumeSchema is not 
intended to be used for encoding");
     }
 
     @Override
     public boolean supportSchemaVersioning() {
-        return schema == null || schema.supportSchemaVersioning();
+        return true;
     }
 
     public Schema<?> atSchemaVersion(byte[] schemaVersion) {
-        fetchSchemaIfNeeded();
-        ensureSchemaInitialized();
-        if (schema.supportSchemaVersioning() && schema instanceof 
AbstractSchema) {
-            return ((AbstractSchema) schema).atSchemaVersion(schemaVersion);
+        SchemaVersion sv = BytesSchemaVersion.of(schemaVersion);
+        fetchSchemaIfNeeded(sv);
+        ensureSchemaInitialized(sv);
+        Schema<?> topicVersionedSchema = schemaMap.get(sv);
+        if (topicVersionedSchema.supportSchemaVersioning() && 
topicVersionedSchema instanceof AbstractSchema) {
+            return ((AbstractSchema<?>) 
topicVersionedSchema).atSchemaVersion(schemaVersion);
         } else {
-            return schema;
+            return topicVersionedSchema;
         }
     }
 
     @Override
     public GenericRecord decode(byte[] bytes, byte[] schemaVersion) {
-        fetchSchemaIfNeeded();
-        ensureSchemaInitialized();
-        return adapt(schema.decode(bytes, schemaVersion), schemaVersion);
+        SchemaVersion sv = BytesSchemaVersion.of(schemaVersion);
+        fetchSchemaIfNeeded(sv);
+        ensureSchemaInitialized(sv);
+        return adapt(schemaMap.get(sv).decode(bytes, schemaVersion), 
schemaVersion);

Review comment:
       may don't need decode(bytes, schemaVersion), only use decode(bytes) best?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to