eolivelli commented on a change in pull request #10604:
URL: https://github.com/apache/pulsar/pull/10604#discussion_r633238917



##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
##########
@@ -43,73 +44,82 @@
 @Slf4j
 public class AutoConsumeSchema implements Schema<GenericRecord> {
 
-    private Schema<?> schema;
+    private final Map<SchemaVersion, Schema<?>> schemaMap = new HashMap<>();
 
     private String topicName;
 
     private String componentName;
 
     private SchemaInfoProvider schemaInfoProvider;
 
+    public AutoConsumeSchema() {
+        schemaMap.put(BytesSchemaVersion.of(new byte[0]), Schema.BYTES);
+    }
+
+    public void setSchema(SchemaVersion schemaVersion, Schema<?> schema) {
+        schemaMap.put(schemaVersion, schema);
+    }
+
     public void setSchema(Schema<?> schema) {
-        this.schema = schema;
+        schemaMap.put(SchemaVersion.Latest, schema);
     }
 
-    private void ensureSchemaInitialized() {
-        checkState(null != schema, "Schema is not initialized before used");
+    private void ensureSchemaInitialized(SchemaVersion schemaVersion) {
+        checkState(schemaMap.containsKey(schemaVersion),
+                "Schema version " + schemaVersion + " is not initialized before used");
     }
 
     @Override
     public void validate(byte[] message) {
-        ensureSchemaInitialized();
+        ensureSchemaInitialized(SchemaVersion.Latest);
 
-        schema.validate(message);
+        schemaMap.get(SchemaVersion.Latest).validate(message);
     }
 
     @Override
     public byte[] encode(GenericRecord message) {
-        ensureSchemaInitialized();
-
         throw new UnsupportedOperationException("AutoConsumeSchema is not intended to be used for encoding");
     }
 
     @Override
     public boolean supportSchemaVersioning() {
-        return schema == null || schema.supportSchemaVersioning();
+        return true;

Review comment:
      I am not sure this is correct, because for primitive types it is better to return `false`.
   I see that if we want to ALWAYS support schema compatibility, we can have both primitive types and version-aware types.
   
   Have you thought about the impact of always returning `true` for a topic that has a fixed schema of primitive types? I would like to avoid adding significant overhead.
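A minimal sketch of the alternative this comment hints at, written against a simplified model: `VersionAwareSchema` and `AutoConsumeSchemaSketch` are hypothetical stand-ins for Pulsar's `Schema` and `AutoConsumeSchema`, and the string key `LATEST` stands in for `SchemaVersion.Latest`. It keeps the pre-PR delegation (`schema == null || schema.supportSchemaVersioning()`), so a topic with a fixed primitive schema reports `false`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for org.apache.pulsar.client.api.Schema: only the
// method relevant to this discussion is modeled.
interface VersionAwareSchema {
    boolean supportSchemaVersioning();
}

// Hypothetical sketch: delegate to the resolved schema instead of
// hard-coding `true`.
class AutoConsumeSchemaSketch {
    // Stand-in for SchemaVersion.Latest; a String key keeps this self-contained.
    static final String LATEST = "latest";

    private final Map<String, VersionAwareSchema> schemaMap = new ConcurrentHashMap<>();

    void setSchema(VersionAwareSchema schema) {
        schemaMap.put(LATEST, schema);
    }

    boolean supportSchemaVersioning() {
        VersionAwareSchema latest = schemaMap.get(LATEST);
        // Not initialized yet: report true, mirroring the old `schema == null` branch.
        // A fixed primitive schema reports false, so it skips per-version handling.
        return latest == null || latest.supportSchemaVersioning();
    }
}
```

The trade-off in this sketch: primitive-schema topics stay off the version-aware path, but the class only becomes version aware once `setSchema` is called with a schema that supports versioning.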

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
##########
@@ -43,73 +44,82 @@
 @Slf4j
 public class AutoConsumeSchema implements Schema<GenericRecord> {
 
-    private Schema<?> schema;
+    private final Map<SchemaVersion, Schema<?>> schemaMap = new HashMap<>();

Review comment:
      This should be a ConcurrentHashMap, because the Schema can be accessed by multiple threads concurrently:
   for instance, a Consumer that receives multiple messages and processes them in parallel, calling `Message.getReaderSchema()` or `Message.getValue()`.
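A minimal sketch of the suggested change, using a generic cache rather than the real class: `SchemaCache` and its `loader` function are hypothetical, with the loader standing in for whatever fetches a schema (for example, through `SchemaInfoProvider`). Backing the map with `ConcurrentHashMap` and resolving missing entries with `computeIfAbsent` lets several consumer threads look up schemas at once without corrupting the map.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch: a per-version schema cache that is safe to read and
// populate from several threads at once.
class SchemaCache<V, S> {
    private final Map<V, S> schemaMap = new ConcurrentHashMap<>();
    // Hypothetical loader, standing in for a lookup via SchemaInfoProvider.
    private final Function<V, S> loader;

    SchemaCache(Function<V, S> loader) {
        this.loader = loader;
    }

    void put(V version, S schema) {
        schemaMap.put(version, schema);
    }

    // On ConcurrentHashMap, computeIfAbsent runs atomically: concurrent callers
    // asking for the same missing version trigger at most one loader call.
    S get(V version) {
        return schemaMap.computeIfAbsent(version, loader);
    }
}
```

Because other updates to the map may block while `computeIfAbsent` runs, the loader should stay short; a slow remote fetch might be better done outside the map and then stored with `put`.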

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
##########
@@ -43,73 +44,82 @@
 @Slf4j
 public class AutoConsumeSchema implements Schema<GenericRecord> {
 
-    private Schema<?> schema;
+    private final Map<SchemaVersion, Schema<?>> schemaMap = new HashMap<>();
 
     private String topicName;
 
     private String componentName;
 
     private SchemaInfoProvider schemaInfoProvider;
 
+    public AutoConsumeSchema() {
+        schemaMap.put(BytesSchemaVersion.of(new byte[0]), Schema.BYTES);

Review comment:
      Why do we need to assume that an empty schema version is `BYTES`? Is this some kind of placeholder? If so, we should use a constant and also explain it in a comment.
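One way to act on this suggestion, sketched under assumptions: `VersionKey` is a hypothetical value type modeled loosely on `BytesSchemaVersion`, `NO_SCHEMA_VERSION` is a hypothetical constant name, and the string `"BYTES"` stands in for `Schema.BYTES`. The meaning given in the Javadoc (an empty version marks a message that carries no schema version) is an assumption the PR author would need to confirm.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical value-type key: equality compares byte contents, because a raw
// byte[] used as a map key would be compared by identity.
final class VersionKey {
    private final byte[] bytes;

    private VersionKey(byte[] bytes) {
        this.bytes = bytes.clone();
    }

    static VersionKey of(byte[] bytes) {
        return new VersionKey(bytes);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof VersionKey && Arrays.equals(bytes, ((VersionKey) o).bytes);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(bytes);
    }
}

class PlaceholderExample {
    /**
     * Hypothetical placeholder key. Assumption: an empty schema version marks
     * a message that carries no schema version, which is decoded as raw bytes.
     */
    static final VersionKey NO_SCHEMA_VERSION = VersionKey.of(new byte[0]);

    // String values stand in for Schema instances to keep the sketch self-contained.
    private final Map<VersionKey, String> schemaMap = new ConcurrentHashMap<>();

    PlaceholderExample() {
        schemaMap.put(NO_SCHEMA_VERSION, "BYTES");
    }

    // Returns the schema registered for the given raw version, or null if none.
    String schemaFor(byte[] version) {
        return schemaMap.get(VersionKey.of(version));
    }
}
```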




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
