eolivelli commented on a change in pull request #10604:
URL: https://github.com/apache/pulsar/pull/10604#discussion_r633238917
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
##########
@@ -43,73 +44,82 @@
@Slf4j
public class AutoConsumeSchema implements Schema<GenericRecord> {
- private Schema<?> schema;
+ private final Map<SchemaVersion, Schema<?>> schemaMap = new HashMap<>();
private String topicName;
private String componentName;
private SchemaInfoProvider schemaInfoProvider;
+ public AutoConsumeSchema() {
+ schemaMap.put(BytesSchemaVersion.of(new byte[0]), Schema.BYTES);
+ }
+
+ public void setSchema(SchemaVersion schemaVersion, Schema<?> schema) {
+ schemaMap.put(schemaVersion, schema);
+ }
+
public void setSchema(Schema<?> schema) {
- this.schema = schema;
+ schemaMap.put(SchemaVersion.Latest, schema);
}
- private void ensureSchemaInitialized() {
- checkState(null != schema, "Schema is not initialized before used");
+ private void ensureSchemaInitialized(SchemaVersion schemaVersion) {
+ checkState(schemaMap.containsKey(schemaVersion),
+ "Schema version " + schemaVersion + " is not initialized before used");
}
@Override
public void validate(byte[] message) {
- ensureSchemaInitialized();
+ ensureSchemaInitialized(SchemaVersion.Latest);
- schema.validate(message);
+ schemaMap.get(SchemaVersion.Latest).validate(message);
}
@Override
public byte[] encode(GenericRecord message) {
- ensureSchemaInitialized();
-
throw new UnsupportedOperationException("AutoConsumeSchema is not intended to be used for encoding");
}
@Override
public boolean supportSchemaVersioning() {
- return schema == null || schema.supportSchemaVersioning();
+ return true;
Review comment:
I am not sure this is correct, because for primitive types it is better to
return `false`.
I see that if we want to ALWAYS support schema compatibility, we can have
both primitive types and version-aware types.
Have you thought about the impact of always returning `true` on a topic that
has a fixed schema of a primitive type? I would like to avoid adding
significant overhead.
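To illustrate the alternative, here is a rough sketch that keeps the old
"delegate to the underlying schema" behaviour on top of a map. It does not use
the real Pulsar classes: `SketchSchema` and `VersioningAwareAutoConsume` are
hypothetical stand-ins, and the map is keyed by a plain string instead of
`SchemaVersion` to keep the example self-contained.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the Pulsar Schema interface; only this method matters here.
interface SketchSchema {
    boolean supportSchemaVersioning();
}

class VersioningAwareAutoConsume {
    // Keyed by a string label for brevity; the PR keys the map by SchemaVersion.
    private final Map<String, SketchSchema> schemaMap = new ConcurrentHashMap<>();

    void setSchema(String version, SketchSchema schema) {
        schemaMap.put(version, schema);
    }

    // Mirrors the previous "schema == null || schema.supportSchemaVersioning()":
    // true while no schema is known yet, otherwise true only if at least one
    // registered schema is version-aware.
    boolean supportSchemaVersioning() {
        if (schemaMap.isEmpty()) {
            return true;
        }
        return schemaMap.values().stream().anyMatch(SketchSchema::supportSchemaVersioning);
    }
}
```

With something like this, a topic whose only schema is a primitive type would
still report `false`. Whether the placeholder entry added in the constructor
should count as a registered schema is left open in this sketch.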
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
##########
@@ -43,73 +44,82 @@
@Slf4j
public class AutoConsumeSchema implements Schema<GenericRecord> {
- private Schema<?> schema;
+ private final Map<SchemaVersion, Schema<?>> schemaMap = new HashMap<>();
Review comment:
This should be a ConcurrentHashMap, because the Schema can be accessed by
multiple threads concurrently: for instance, a Consumer that receives several
messages and processes them in parallel, calling `Message.getReaderSchema()`
or `Message.getValue()`.
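A minimal sketch of what I mean, using only the JDK. `ConcurrentSchemaCache`
and `getOrLoad` are hypothetical names, and the string values stand in for
real `Schema` instances:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class ConcurrentSchemaCache {
    // ConcurrentHashMap tolerates concurrent reads and writes without external locking.
    private final Map<String, String> schemaMap = new ConcurrentHashMap<>();
    // Counts how many times a schema was actually loaded (for illustration only).
    private final AtomicInteger loads = new AtomicInteger();

    // computeIfAbsent is atomic on ConcurrentHashMap: the loader runs at most once per key,
    // even when many threads ask for the same version at the same time.
    String getOrLoad(String version) {
        return schemaMap.computeIfAbsent(version, v -> {
            loads.incrementAndGet();
            return "schema-for-" + v; // placeholder for fetching the real schema
        });
    }

    int loadCount() {
        return loads.get();
    }
}
```

With a plain `HashMap`, concurrent `put` and `get` calls from message-processing
threads have no such guarantee and can corrupt the map.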
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
##########
@@ -43,73 +44,82 @@
@Slf4j
public class AutoConsumeSchema implements Schema<GenericRecord> {
- private Schema<?> schema;
+ private final Map<SchemaVersion, Schema<?>> schemaMap = new HashMap<>();
private String topicName;
private String componentName;
private SchemaInfoProvider schemaInfoProvider;
+ public AutoConsumeSchema() {
+ schemaMap.put(BytesSchemaVersion.of(new byte[0]), Schema.BYTES);
Review comment:
Why do we need to assume that an empty schema version is "BYTES"? Is
this some kind of placeholder? In that case we should use a constant and also
explain it in a comment.
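Something along these lines, as a sketch only. `VersionKey` is a hypothetical
stand-in for `BytesSchemaVersion`, the constant name `NO_SCHEMA_VERSION` is
made up, and the comment on the constant states my assumption about the
intent, which needs to be confirmed:

```java
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for BytesSchemaVersion: wraps raw version bytes with value equality.
final class VersionKey {
    private final byte[] bytes;

    private VersionKey(byte[] bytes) {
        this.bytes = bytes.clone();
    }

    static VersionKey of(byte[] bytes) {
        return new VersionKey(bytes);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof VersionKey && Arrays.equals(bytes, ((VersionKey) o).bytes);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(bytes);
    }
}

class PlaceholderSketch {
    /**
     * ASSUMPTION (to be confirmed by the author): an empty schema version marks a
     * message that carries no schema version, and such messages are read as raw bytes.
     */
    static final VersionKey NO_SCHEMA_VERSION = VersionKey.of(new byte[0]);

    // String values stand in for Schema instances.
    private final Map<VersionKey, String> schemaMap = new ConcurrentHashMap<>();

    PlaceholderSketch() {
        schemaMap.put(NO_SCHEMA_VERSION, "BYTES");
    }

    String lookup(byte[] versionBytes) {
        return schemaMap.get(VersionKey.of(versionBytes));
    }
}
```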