gaoran10 commented on a change in pull request #10604:
URL: https://github.com/apache/pulsar/pull/10604#discussion_r633484599



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
##########
@@ -43,73 +44,82 @@
 @Slf4j
 public class AutoConsumeSchema implements Schema<GenericRecord> {
 
-    private Schema<?> schema;
+    private final Map<SchemaVersion, Schema<?>> schemaMap = new HashMap<>();
 
     private String topicName;
 
     private String componentName;
 
     private SchemaInfoProvider schemaInfoProvider;
 
+    public AutoConsumeSchema() {
+        schemaMap.put(BytesSchemaVersion.of(new byte[0]), Schema.BYTES);
+    }
+
+    public void setSchema(SchemaVersion schemaVersion, Schema<?> schema) {
+        schemaMap.put(schemaVersion, schema);
+    }
+
     public void setSchema(Schema<?> schema) {
-        this.schema = schema;
+        schemaMap.put(SchemaVersion.Latest, schema);
     }
 
-    private void ensureSchemaInitialized() {
-        checkState(null != schema, "Schema is not initialized before used");
+    private void ensureSchemaInitialized(SchemaVersion schemaVersion) {
+        checkState(schemaMap.containsKey(schemaVersion),
+                "Schema version " + schemaVersion + " is not initialized before used");
     }
 
     @Override
     public void validate(byte[] message) {
-        ensureSchemaInitialized();
+        ensureSchemaInitialized(SchemaVersion.Latest);
 
-        schema.validate(message);
+        schemaMap.get(SchemaVersion.Latest).validate(message);
     }
 
     @Override
     public byte[] encode(GenericRecord message) {
-        ensureSchemaInitialized();
-
         throw new UnsupportedOperationException("AutoConsumeSchema is not intended to be used for encoding");
     }
 
     @Override
     public boolean supportSchemaVersioning() {
-        return schema == null || schema.supportSchemaVersioning();
+        return true;

Review comment:
       Yes, I agree this point is confusing. I ran some checks: 
`supportSchemaVersioning` is consulted when fetching the latest schema version 
of the topic and when deciding whether to decode message data in versioned 
mode. Both paths work fine with this change.
   
   With the `SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE` strategy, a topic 
may have multiple schema versions, and a primitive schema may be one of them. 
With the other strategies, I think only struct schemas can have multiple 
versions, and the schema type cannot change between versions.
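
To illustrate the mixed-version case, here is a minimal, self-contained sketch of version-keyed decoding. It does not use the real Pulsar classes: `VersionedDecoderSketch`, `register`, and `decode` are hypothetical names, a plain `long` stands in for `SchemaVersion`, and a `Function<byte[], Object>` stands in for `Schema<?>`. It only shows why a per-version lookup is needed once a primitive schema and a struct schema can coexist on one topic.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for a version-keyed schema map; not Pulsar's API.
final class VersionedDecoderSketch {

    // Maps a schema version to the decoder for payloads written with it.
    private final Map<Long, Function<byte[], Object>> decoders = new HashMap<>();

    void register(long version, Function<byte[], Object> decoder) {
        decoders.put(version, decoder);
    }

    Object decode(long version, byte[] payload) {
        Function<byte[], Object> decoder = decoders.get(version);
        if (decoder == null) {
            // Mirrors the checkState in the diff: unknown versions fail fast.
            throw new IllegalStateException(
                    "Schema version " + version + " is not initialized");
        }
        return decoder.apply(payload);
    }

    public static void main(String[] args) {
        VersionedDecoderSketch topic = new VersionedDecoderSketch();

        // Version 0: a primitive (string) schema.
        topic.register(0L, bytes -> new String(bytes, StandardCharsets.UTF_8));

        // Version 1: a toy "struct" schema encoded as key=value.
        topic.register(1L, bytes -> {
            String[] kv = new String(bytes, StandardCharsets.UTF_8).split("=", 2);
            return Collections.singletonMap(kv[0], kv[1]);
        });

        // Each message is decoded with the schema version it was written with.
        System.out.println(topic.decode(0L, "hello".getBytes(StandardCharsets.UTF_8)));
        System.out.println(topic.decode(1L, "name=pulsar".getBytes(StandardCharsets.UTF_8)));
    }
}
```

With a single `schema` field, the second message would be decoded with whichever schema was set last. Keying by version avoids that, which is why returning `true` from `supportSchemaVersioning` looks safe to me here.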




