This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f5e10a9  [Schema] Support consume multiple schema types messages by 
AutoConsumeSchema (#10604)
f5e10a9 is described below

commit f5e10a99c3f667f05c3303098ee22e9cd9332bf4
Author: ran <[email protected]>
AuthorDate: Sun May 23 21:47:55 2021 +0800

    [Schema] Support consume multiple schema types messages by 
AutoConsumeSchema (#10604)
    
    Based on the PR https://github.com/apache/pulsar/pull/10573
    
    ### Motivation
    
    Support consuming multiple schema types messages by AutoConsumeSchema.
---
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 182 +++++++++++++++++++++
 .../org/apache/pulsar/client/impl/MessageImpl.java |  16 +-
 .../pulsar/client/impl/PulsarClientImpl.java       |   2 +-
 .../client/impl/TypedMessageBuilderImpl.java       |   5 +-
 .../client/impl/schema/AutoConsumeSchema.java      | 130 +++++++++------
 .../client/impl/schema/AutoProduceBytesSchema.java |  15 +-
 .../generic/MultiVersionSchemaInfoProvider.java    |   2 +-
 .../schema/generic/AbstractGenericSchemaTest.java  |   3 +-
 .../impl/schema/generic/GenericSchemaImplTest.java |  26 +--
 .../impl/schema/generic/GenericSchemaTest.java     |  26 +--
 .../pulsar/proxy/server/LookupProxyHandler.java    |  14 +-
 11 files changed, 331 insertions(+), 90 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index a2584a4..a50ab61 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -29,6 +29,7 @@ import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.collect.Sets;
 
 import java.nio.charset.StandardCharsets;
@@ -37,6 +38,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import lombok.Cleanup;
@@ -53,9 +56,11 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -781,4 +786,181 @@ public class SchemaTest extends 
MockedPulsarServiceBaseTest {
         assertEquals("foo", message.getValue());
     }
 
+    public void testConsumeMultipleSchemaMessages() throws Exception {
+        final String namespace = "test-namespace-" + randomName(16);
+        String ns = PUBLIC_TENANT + "/" + namespace;
+        admin.namespaces().createNamespace(ns, Sets.newHashSet(CLUSTER_NAME));
+        admin.namespaces().setSchemaCompatibilityStrategy(ns, 
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+
+        final String autoProducerTopic = getTopicName(ns, 
"auto_produce_topic");
+        Producer<byte[]> autoProducer = 
pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
+                .topic(autoProducerTopic)
+                .create();
+
+        AtomicInteger totalMsgCnt = new AtomicInteger(0);
+        generateDataByDifferentSchema(ns, "bytes_schema", Schema.BYTES, "bytes 
value".getBytes(),
+                autoProducer, totalMsgCnt);
+        generateDataByDifferentSchema(ns, "string_schema", Schema.STRING, 
"string value",
+                autoProducer, totalMsgCnt);
+        generateDataByDifferentSchema(ns, "bool_schema", Schema.BOOL, true,
+                autoProducer, totalMsgCnt);
+        generateDataByDifferentSchema(ns, "json_one_schema", 
Schema.JSON(Schemas.PersonOne.class),
+                new Schemas.PersonOne(1), autoProducer, totalMsgCnt);
+        generateDataByDifferentSchema(ns, "json_three_schema", 
Schema.JSON(Schemas.PersonThree.class),
+                new Schemas.PersonThree(3, "ran"), autoProducer, totalMsgCnt);
+        generateDataByDifferentSchema(ns, "json_four_schema", 
Schema.JSON(Schemas.PersonFour.class),
+                new Schemas.PersonFour(4, "tang", 18), autoProducer, 
totalMsgCnt);
+        generateDataByDifferentSchema(ns, "avro_one_schema", 
Schema.AVRO(Schemas.PersonOne.class),
+                new Schemas.PersonOne(10), autoProducer, totalMsgCnt);
+        generateDataByDifferentSchema(ns, "k_one_v_three_schema_separate",
+                Schema.KeyValue(Schema.JSON(Schemas.PersonOne.class),
+                        Schema.JSON(Schemas.PersonThree.class), 
KeyValueEncodingType.SEPARATED),
+                new KeyValue<>(new Schemas.PersonOne(1), new 
Schemas.PersonThree(3, "kv-separate")),
+                autoProducer, totalMsgCnt);
+        generateDataByDifferentSchema(ns, "k_one_v_four_schema_inline",
+                Schema.KeyValue(Schema.JSON(Schemas.PersonOne.class),
+                        Schema.JSON(Schemas.PersonFour.class), 
KeyValueEncodingType.INLINE),
+                new KeyValue<>(new Schemas.PersonOne(10), new 
Schemas.PersonFour(30, "kv-inline", 20)),
+                autoProducer, totalMsgCnt);
+        generateDataByDifferentSchema(ns, "k_int_v_three_schema_separate",
+                Schema.KeyValue(Schema.INT32, 
Schema.JSON(Schemas.PersonThree.class), KeyValueEncodingType.SEPARATED),
+                new KeyValue<>(100, new Schemas.PersonThree(40, 
"kv-separate")),
+                autoProducer, totalMsgCnt);
+
+        Consumer<GenericRecord> autoConsumer = 
pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                .topic(autoProducerTopic)
+                .subscriptionName("test")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Message<GenericRecord> message;
+        for (int i = 0; i < totalMsgCnt.get(); i++) {
+            message = autoConsumer.receive(5, TimeUnit.SECONDS);
+            if (message == null) {
+                Assert.fail("Failed to receive multiple schema message.");
+            }
+            log.info("auto consumer get native object class: {}, value: {}",
+                    message.getValue().getNativeObject().getClass(), 
message.getValue().getNativeObject());
+            checkSchemaForAutoSchema(message);
+        }
+    }
+
+    private String getTopicName(String ns, String baseTopic) {
+        return ns + "/" + baseTopic;
+    }
+
+    private void generateDataByDifferentSchema(String ns,
+                                               String baseTopic,
+                                               Schema schema,
+                                               Object data,
+                                               Producer<?> autoProducer,
+                                               AtomicInteger totalMsgCnt) 
throws PulsarClientException {
+        String topic = getTopicName(ns, baseTopic);
+        Producer<Object> producer = pulsarClient.newProducer(schema)
+                .topic(topic)
+                .create();
+        producer.newMessage().value(data).property("baseTopic", 
baseTopic).send();
+        totalMsgCnt.incrementAndGet();
+
+        Consumer<GenericRecord> consumer = 
pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                .topic(topic)
+                .subscriptionName("test")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Message<GenericRecord> message = consumer.receive(5, TimeUnit.SECONDS);
+        if (message == null) {
+            Assert.fail("Failed to receive message for topic " + topic);
+        }
+        if (!message.getReaderSchema().isPresent()) {
+            Assert.fail("Failed to get reader schema for topic " + topic);
+        }
+        message.getValue();
+
+        Schema<?> readerSchema = message.getReaderSchema().get();
+        if (readerSchema instanceof KeyValueSchema
+                && ((KeyValueSchema<?, ?>) readerSchema)
+                
.getKeyValueEncodingType().equals(KeyValueEncodingType.SEPARATED)) {
+            autoProducer.newMessage(
+                    
Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get())).keyBytes(message.getKeyBytes())
+                    .value(message.getData())
+                    .properties(message.getProperties())
+                    .send();
+        } else {
+            
autoProducer.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
+                    .properties(message.getProperties())
+                    .value(message.getData())
+                    .send();
+        }
+        producer.close();
+        consumer.close();
+    }
+
+    private void checkSchemaForAutoSchema(Message<GenericRecord> message) {
+        if (!message.getReaderSchema().isPresent()) {
+            Assert.fail("Failed to get reader schema for auto consume multiple 
schema topic.");
+        }
+        Object nativeObject = message.getValue().getNativeObject();
+        String baseTopic = message.getProperty("baseTopic");
+        JsonNode jsonNode;
+        KeyValue<?, ?> kv;
+        switch (baseTopic) {
+            case "bytes_schema":
+                Assert.assertEquals(new String((byte[]) nativeObject), "bytes 
value");
+                break;
+            case "string_schema":
+                Assert.assertEquals((String) nativeObject, "string value");
+                break;
+            case "bool_schema":
+                Assert.assertEquals(nativeObject, Boolean.TRUE);
+                break;
+            case "json_one_schema":
+                jsonNode = (JsonNode) nativeObject;
+                Assert.assertEquals(jsonNode.get("id").intValue(), 1);
+                break;
+            case "json_three_schema":
+                jsonNode = (JsonNode) nativeObject;
+                Assert.assertEquals(jsonNode.get("id").intValue(), 3);
+                Assert.assertEquals(jsonNode.get("name").textValue(), "ran");
+                break;
+            case "json_four_schema":
+                jsonNode = (JsonNode) nativeObject;
+                Assert.assertEquals(jsonNode.get("id").intValue(), 4);
+                Assert.assertEquals(jsonNode.get("name").textValue(), "tang");
+                Assert.assertEquals(jsonNode.get("age").intValue(), 18);
+                break;
+            case "avro_one_schema":
+                org.apache.avro.generic.GenericRecord genericRecord =
+                        (org.apache.avro.generic.GenericRecord) nativeObject;
+                Assert.assertEquals(genericRecord.get("id"), 10);
+                break;
+            case "k_one_v_three_schema_separate":
+                kv = (KeyValue<GenericRecord, GenericRecord>) nativeObject;
+                jsonNode = ((GenericJsonRecord) kv.getKey()).getJsonNode();
+                Assert.assertEquals(jsonNode.get("id").intValue(), 1);
+                jsonNode = ((GenericJsonRecord) kv.getValue()).getJsonNode();
+                Assert.assertEquals(jsonNode.get("id").intValue(), 3);
+                Assert.assertEquals(jsonNode.get("name").textValue(), 
"kv-separate");
+                break;
+            case "k_one_v_four_schema_inline":
+                kv = (KeyValue<GenericRecord, GenericRecord>) nativeObject;
+                jsonNode = ((GenericJsonRecord) kv.getKey()).getJsonNode();
+                Assert.assertEquals(jsonNode.get("id").intValue(), 10);
+                jsonNode = ((GenericJsonRecord) kv.getValue()).getJsonNode();
+                Assert.assertEquals(jsonNode.get("id").intValue(), 30);
+                Assert.assertEquals(jsonNode.get("name").textValue(), 
"kv-inline");
+                Assert.assertEquals(jsonNode.get("age").intValue(), 20);
+                break;
+            case "k_int_v_three_schema_separate":
+                kv = (KeyValue<Integer, GenericRecord>) nativeObject;
+                Assert.assertEquals(kv.getKey(), 100);
+                jsonNode = ((GenericJsonRecord) kv.getValue()).getJsonNode();
+                Assert.assertEquals(jsonNode.get("id").intValue(), 40);
+                Assert.assertEquals(jsonNode.get("name").textValue(), 
"kv-separate");
+                break;
+            default:
+                // nothing to do
+        }
+    }
+
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 15c1edd..a59bd91 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.client.impl;
 
-import static com.google.common.base.Preconditions.checkNotNull;
 import static java.nio.charset.StandardCharsets.UTF_8;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -52,6 +51,7 @@ import org.apache.pulsar.common.api.proto.KeyValue;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
 import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -401,7 +401,7 @@ public class MessageImpl<T> implements Message<T> {
                     .atSchemaVersion(schemaVersion));
         } else if (schema instanceof AbstractSchema) {
             byte[] schemaVersion = getSchemaVersion();
-            return Optional.of(((AbstractSchema) schema)
+            return Optional.of(((AbstractSchema<?>) schema)
                     .atSchemaVersion(schemaVersion));
         } else {
             return Optional.of(schema);
@@ -419,11 +419,15 @@ public class MessageImpl<T> implements Message<T> {
 
     private void ensureSchemaIsLoaded() {
         if (schema instanceof AutoConsumeSchema) {
-            ((AutoConsumeSchema) schema).fetchSchemaIfNeeded();
+            ((AutoConsumeSchema) 
schema).fetchSchemaIfNeeded(BytesSchemaVersion.of(getSchemaVersion()));
         }
     }
+
     private SchemaInfo getSchemaInfo() {
         ensureSchemaIsLoaded();
+        if (schema instanceof AutoConsumeSchema) {
+            return ((AutoConsumeSchema) 
schema).getSchemaInfo(getSchemaVersion());
+        }
         return schema.getSchemaInfo();
     }
 
@@ -449,7 +453,7 @@ public class MessageImpl<T> implements Message<T> {
 
     private KeyValueSchema getKeyValueSchema() {
         if (schema instanceof AutoConsumeSchema) {
-            return (KeyValueSchema) ((AutoConsumeSchema) 
schema).getInternalSchema();
+            return (KeyValueSchema) ((AutoConsumeSchema) 
schema).getInternalSchema(getSchemaVersion());
         } else {
             return (KeyValueSchema) schema;
         }
@@ -476,7 +480,7 @@ public class MessageImpl<T> implements Message<T> {
                     (org.apache.pulsar.common.schema.KeyValue) 
kvSchema.decode(getKeyBytes(), getData(), schemaVersion);
             if (schema instanceof AutoConsumeSchema) {
                 return (T) AutoConsumeSchema.wrapPrimitiveObject(keyValue,
-                        schema.getSchemaInfo().getType(), schemaVersion);
+                        ((AutoConsumeSchema) 
schema).getSchemaInfo(schemaVersion).getType(), schemaVersion);
             } else {
                 return (T) keyValue;
             }
@@ -492,7 +496,7 @@ public class MessageImpl<T> implements Message<T> {
                     (org.apache.pulsar.common.schema.KeyValue) 
kvSchema.decode(getKeyBytes(), getData(), null);
             if (schema instanceof AutoConsumeSchema) {
                 return (T) AutoConsumeSchema.wrapPrimitiveObject(keyValue,
-                        schema.getSchemaInfo().getType(), null);
+                        ((AutoConsumeSchema) 
schema).getSchemaInfo(getSchemaVersion()).getType(), null);
             } else {
                 return (T) keyValue;
             }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 294233b..f951a88 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -987,7 +987,7 @@ public class PulsarClientImpl implements PulsarClient {
         return new MultiVersionSchemaInfoProvider(TopicName.get(topicName), 
this);
     }
 
-    private LoadingCache<String, SchemaInfoProvider> 
getSchemaProviderLoadingCache() {
+    protected LoadingCache<String, SchemaInfoProvider> 
getSchemaProviderLoadingCache() {
         return schemaProviderLoadingCache;
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
index 59ccde3..565502c 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
@@ -123,7 +123,7 @@ public class TypedMessageBuilderImpl<T> implements 
TypedMessageBuilder<T> {
 
     @Override
     public TypedMessageBuilder<T> keyBytes(byte[] key) {
-        if (schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
+        if (schema instanceof KeyValueSchema && 
schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
             KeyValueSchema kvSchema = (KeyValueSchema) schema;
             checkArgument(!(kvSchema.getKeyValueEncodingType() == 
KeyValueEncodingType.SEPARATED),
                     "This method is not allowed to set keys when in encoding 
type is SEPARATED");
@@ -149,7 +149,8 @@ public class TypedMessageBuilderImpl<T> implements 
TypedMessageBuilder<T> {
             msgMetadata.setNullValue(true);
             return this;
         }
-        if (schema.getSchemaInfo() != null && schema.getSchemaInfo().getType() 
== SchemaType.KEY_VALUE) {
+        if (value instanceof org.apache.pulsar.common.schema.KeyValue
+                && schema.getSchemaInfo() != null && 
schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
             KeyValueSchema kvSchema = (KeyValueSchema) schema;
             org.apache.pulsar.common.schema.KeyValue kv = 
(org.apache.pulsar.common.schema.KeyValue) value;
             if (kvSchema.getKeyValueEncodingType() == 
KeyValueEncodingType.SEPARATED) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
index f075ecf..8693e3a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
+import com.google.common.collect.Maps;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
@@ -25,15 +26,16 @@ import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
 import 
org.apache.pulsar.client.impl.schema.generic.GenericProtobufNativeSchema;
 import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
+import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
+import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import static com.google.common.base.Preconditions.checkState;
 
@@ -43,7 +45,7 @@ import static com.google.common.base.Preconditions.checkState;
 @Slf4j
 public class AutoConsumeSchema implements Schema<GenericRecord> {
 
-    private Schema<?> schema;
+    private final ConcurrentMap<SchemaVersion, Schema<?>> schemaMap = 
initSchemaMap();
 
     private String topicName;
 
@@ -51,65 +53,89 @@ public class AutoConsumeSchema implements 
Schema<GenericRecord> {
 
     private SchemaInfoProvider schemaInfoProvider;
 
+    private ConcurrentMap<SchemaVersion, Schema<?>> initSchemaMap() {
+        ConcurrentMap<SchemaVersion, Schema<?>> schemaMap = 
Maps.newConcurrentMap();
+        // The Schema.BYTES will not be uploaded to the broker and store in 
the schema storage,
+        // if the schema version in the message metadata is empty byte[], it 
means its schema is Schema.BYTES.
+        schemaMap.put(BytesSchemaVersion.of(new byte[0]), Schema.BYTES);
+        return schemaMap;
+    }
+
+    public void setSchema(SchemaVersion schemaVersion, Schema<?> schema) {
+        schemaMap.put(schemaVersion, schema);
+    }
+
     public void setSchema(Schema<?> schema) {
-        this.schema = schema;
+        schemaMap.put(SchemaVersion.Latest, schema);
     }
 
-    private void ensureSchemaInitialized() {
-        checkState(null != schema, "Schema is not initialized before used");
+    private void ensureSchemaInitialized(SchemaVersion schemaVersion) {
+        checkState(schemaMap.containsKey(schemaVersion),
+                "Schema version " + schemaVersion + " is not initialized 
before used");
     }
 
     @Override
     public void validate(byte[] message) {
-        ensureSchemaInitialized();
+        ensureSchemaInitialized(SchemaVersion.Latest);
 
-        schema.validate(message);
+        schemaMap.get(SchemaVersion.Latest).validate(message);
     }
 
     @Override
     public byte[] encode(GenericRecord message) {
-        ensureSchemaInitialized();
-
         throw new UnsupportedOperationException("AutoConsumeSchema is not 
intended to be used for encoding");
     }
 
     @Override
     public boolean supportSchemaVersioning() {
-        return schema == null || schema.supportSchemaVersioning();
+        return true;
     }
 
     public Schema<?> atSchemaVersion(byte[] schemaVersion) {
-        fetchSchemaIfNeeded();
-        ensureSchemaInitialized();
-        if (schema.supportSchemaVersioning() && schema instanceof 
AbstractSchema) {
-            return ((AbstractSchema) schema).atSchemaVersion(schemaVersion);
+        SchemaVersion sv = BytesSchemaVersion.of(schemaVersion);
+        fetchSchemaIfNeeded(sv);
+        ensureSchemaInitialized(sv);
+        Schema<?> topicVersionedSchema = schemaMap.get(sv);
+        if (topicVersionedSchema.supportSchemaVersioning() && 
topicVersionedSchema instanceof AbstractSchema) {
+            return ((AbstractSchema<?>) 
topicVersionedSchema).atSchemaVersion(schemaVersion);
         } else {
-            return schema;
+            return topicVersionedSchema;
         }
     }
 
     @Override
     public GenericRecord decode(byte[] bytes, byte[] schemaVersion) {
-        fetchSchemaIfNeeded();
-        ensureSchemaInitialized();
-        return adapt(schema.decode(bytes, schemaVersion), schemaVersion);
+        SchemaVersion sv = BytesSchemaVersion.of(schemaVersion);
+        fetchSchemaIfNeeded(sv);
+        ensureSchemaInitialized(sv);
+        return adapt(schemaMap.get(sv).decode(bytes), schemaVersion);
     }
 
     @Override
     public void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
-        if (schema == null) {
-            this.schemaInfoProvider = schemaInfoProvider;
-        } else {
-            schema.setSchemaInfoProvider(schemaInfoProvider);
+        this.schemaInfoProvider = schemaInfoProvider;
+        if (schemaMap.containsKey(SchemaVersion.Latest)) {
+            
schemaMap.get(SchemaVersion.Latest).setSchemaInfoProvider(schemaInfoProvider);
         }
     }
 
     @Override
     public SchemaInfo getSchemaInfo() {
-        if (schema == null) {
+        if (!schemaMap.containsKey(SchemaVersion.Latest)) {
             return null;
         }
-        return schema.getSchemaInfo();
+        return schemaMap.get(SchemaVersion.Latest).getSchemaInfo();
+    }
+
+    public SchemaInfo getSchemaInfo(byte[] schemaVersion) {
+        if (schemaVersion == null) {
+            return Schema.BYTES.getSchemaInfo();
+        }
+        SchemaVersion sv = BytesSchemaVersion.of(schemaVersion);
+        if (schemaMap.containsKey(sv)) {
+            return schemaMap.get(sv).getSchemaInfo();
+        }
+        return null;
     }
 
     @Override
@@ -120,7 +146,7 @@ public class AutoConsumeSchema implements 
Schema<GenericRecord> {
         this.componentName = componentName;
         if (schemaInfo != null) {
             Schema<?> genericSchema = generateSchema(schemaInfo);
-            setSchema(genericSchema);
+            setSchema(SchemaVersion.Latest, genericSchema);
             log.info("Configure {} schema for topic {} : {}",
                     componentName, topicName, 
schemaInfo.getSchemaDefinition());
         }
@@ -128,11 +154,11 @@ public class AutoConsumeSchema implements 
Schema<GenericRecord> {
 
     @Override
     public Optional<Object> getNativeSchema() {
-        ensureSchemaInitialized();
-        if (schema == null) {
+        ensureSchemaInitialized(SchemaVersion.Latest);
+        if (schemaMap.get(SchemaVersion.Latest) == null) {
             return Optional.empty();
         } else {
-            return schema.getNativeSchema();
+            return schemaMap.get(SchemaVersion.Latest).getNativeSchema();
         }
     }
 
@@ -205,15 +231,14 @@ public class AutoConsumeSchema implements 
Schema<GenericRecord> {
     }
 
     public Schema<GenericRecord> clone() {
-        Schema<GenericRecord> schema = new AutoConsumeSchema();
-        if (this.schema != null) {
-            schema.configureSchemaInfo(topicName, componentName, 
this.schema.getSchemaInfo());
-        } else {
-            schema.configureSchemaInfo(topicName, componentName, null);
-        }
+        AutoConsumeSchema schema = new AutoConsumeSchema();
+        schema.configureSchemaInfo(topicName, componentName, null);
         if (schemaInfoProvider != null) {
             schema.setSchemaInfoProvider(schemaInfoProvider);
         }
+        for (Map.Entry<SchemaVersion, Schema<?>> entry : schemaMap.entrySet()) 
{
+            schema.setSchema(entry.getKey(), entry.getValue());
+        }
         return schema;
     }
 
@@ -226,19 +251,23 @@ public class AutoConsumeSchema implements 
Schema<GenericRecord> {
         if (value instanceof GenericRecord) {
             return (GenericRecord) value;
         }
-        if (this.schema == null) {
+        BytesSchemaVersion sv = BytesSchemaVersion.of(schemaVersion);
+        if (!schemaMap.containsKey(sv)) {
             throw new IllegalStateException("Cannot decode a message without 
schema");
         }
-        return wrapPrimitiveObject(value, schema.getSchemaInfo().getType(), 
schemaVersion);
+        return wrapPrimitiveObject(value, 
schemaMap.get(sv).getSchemaInfo().getType(), schemaVersion);
     }
 
     public static GenericRecord wrapPrimitiveObject(Object value, SchemaType 
type, byte[] schemaVersion) {
         return GenericObjectWrapper.of(value, type, schemaVersion);
     }
 
-
     public Schema<?> getInternalSchema() {
-        return schema;
+        return schemaMap.get(SchemaVersion.Latest);
+    }
+
+    public Schema<?> getInternalSchema(byte[] schemaVersion) {
+        return schemaMap.get(BytesSchemaVersion.of(schemaVersion));
     }
 
     /**
@@ -246,15 +275,18 @@ public class AutoConsumeSchema implements 
Schema<GenericRecord> {
      * We cannot call this method in getSchemaInfo, because getSchemaInfo is 
called in many
      * places and we will introduce lots of deadlocks.
      */
-    public void fetchSchemaIfNeeded() throws SchemaSerializationException {
-        if (schema == null) {
+    public void fetchSchemaIfNeeded(SchemaVersion schemaVersion) throws 
SchemaSerializationException {
+        if (schemaVersion == null) {
+            schemaVersion = BytesSchemaVersion.of(new byte[0]);
+        }
+        if (!schemaMap.containsKey(schemaVersion)) {
             if (schemaInfoProvider == null) {
                 throw new SchemaSerializationException("Can't get accurate 
schema information for topic " + topicName +
                                                 "using AutoConsumeSchema 
because SchemaInfoProvider is not set yet");
             } else {
                 SchemaInfo schemaInfo = null;
                 try {
-                    schemaInfo = schemaInfoProvider.getLatestSchema().get();
+                    schemaInfo = 
schemaInfoProvider.getSchemaByVersion(schemaVersion.bytes()).get();
                     if (schemaInfo == null) {
                         // schemaless topic
                         schemaInfo = BytesSchema.of().getSchemaInfo();
@@ -267,18 +299,20 @@ public class AutoConsumeSchema implements 
Schema<GenericRecord> {
                     throw new SchemaSerializationException(e.getCause());
                 }
                 // schemaInfo null means that there is no schema attached to 
the topic.
-                schema = generateSchema(schemaInfo);
+                Schema<?> schema = generateSchema(schemaInfo);
                 schema.setSchemaInfoProvider(schemaInfoProvider);
-                log.info("Configure {} schema for topic {} : {}",
-                        componentName, topicName, 
schemaInfo.getSchemaDefinition());
+                setSchema(schemaVersion, schema);
+                log.info("Configure {} schema {} for topic {} : {}",
+                        componentName, schemaVersion, topicName, 
schemaInfo.getSchemaDefinition());
             }
         }
     }
 
     @Override
     public String toString() {
-        if (schema != null && schema.getSchemaInfo() != null) {
-            return "AUTO_CONSUME(schematype=" + 
schema.getSchemaInfo().getType() + ")";
+        if (schemaMap.containsKey(SchemaVersion.Latest)
+                && schemaMap.get(SchemaVersion.Latest).getSchemaInfo() != 
null) {
+            return "AUTO_CONSUME(schematype=" + 
schemaMap.get(SchemaVersion.Latest).getSchemaInfo().getType() + ")";
         } else {
             return "AUTO_CONSUME(uninitialized)";
         }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
index 7578ffa..a85aceb 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
@@ -21,9 +21,12 @@ package org.apache.pulsar.client.impl.schema;
 import static com.google.common.base.Preconditions.checkState;
 
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
+import java.util.Optional;
+
 /**
  * Auto detect schema.
  */
@@ -71,7 +74,12 @@ public class AutoProduceBytesSchema<T> implements 
Schema<byte[]> {
 
         if (requireSchemaValidation) {
             // verify if the message can be decoded by the underlying schema
-            schema.validate(message);
+            if (schema instanceof KeyValueSchema
+                    && ((KeyValueSchema) 
schema).getKeyValueEncodingType().equals(KeyValueEncodingType.SEPARATED)) {
+                ((KeyValueSchema) schema).getValueSchema().validate(message);
+            } else {
+                schema.validate(message);
+            }
         }
 
         return message;
@@ -97,6 +105,11 @@ public class AutoProduceBytesSchema<T> implements 
Schema<byte[]> {
     }
 
     @Override
+    public Optional<Object> getNativeSchema() {
+        return Optional.ofNullable(schema);
+    }
+
+    @Override
     public Schema<byte[]> clone() {
         return new AutoProduceBytesSchema<>(schema.clone());
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProvider.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProvider.java
index 4c430be..666177d 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProvider.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProvider.java
@@ -93,7 +93,7 @@ public class MultiVersionSchemaInfoProvider implements 
SchemaInfoProvider {
     }
 
     private CompletableFuture<SchemaInfo> loadSchema(byte[] schemaVersion) {
-         return pulsarClient.getLookup()
+        return pulsarClient.getLookup()
                 .getSchema(topicName, schemaVersion)
                 .thenApply(o -> o.orElse(null));
     }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/AbstractGenericSchemaTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/AbstractGenericSchemaTest.java
index 0865579..ac32318 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/AbstractGenericSchemaTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/AbstractGenericSchemaTest.java
@@ -23,6 +23,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.GenericSchema;
 import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
+import org.apache.pulsar.common.schema.LongSchemaVersion;
 import org.testng.annotations.Test;
 
 import java.util.concurrent.CompletableFuture;
@@ -74,7 +75,7 @@ public class AbstractGenericSchemaTest {
 
             GenericRecord record;
             if (decodeSchema instanceof AutoConsumeSchema) {
-                record = decodeSchema.decode(data, new byte[0]);
+                record = decodeSchema.decode(data, new 
LongSchemaVersion(0L).bytes());
             } else {
                 record = decodeSchema.decode(data);
             }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java
index 477e3d8..e2fc5d3 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java
@@ -39,6 +39,7 @@ import 
org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.LongSchemaVersion;
 import org.testng.annotations.Test;
 
 /**
@@ -85,7 +86,7 @@ public class GenericSchemaImplTest {
     public void testAutoJsonSchema() {
         // configure the schema info provider
         MultiVersionSchemaInfoProvider multiVersionSchemaInfoProvider = 
mock(MultiVersionSchemaInfoProvider.class);
-        GenericSchema genericAvroSchema = 
GenericSchemaImpl.of(Schema.AVRO(Foo.class).getSchemaInfo());
+        GenericSchema genericAvroSchema = 
GenericSchemaImpl.of(Schema.JSON(Foo.class).getSchemaInfo());
         
when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
                 
.thenReturn(CompletableFuture.completedFuture(genericAvroSchema.getSchemaInfo()));
 
@@ -111,7 +112,7 @@ public class GenericSchemaImplTest {
 
             GenericRecord record;
             if (decodeSchema instanceof AutoConsumeSchema) {
-                record = decodeSchema.decode(data, new byte[0]);
+                record = decodeSchema.decode(data, new 
LongSchemaVersion(0L).bytes());
             } else {
                 record = decodeSchema.decode(data);
             }
@@ -123,15 +124,6 @@ public class GenericSchemaImplTest {
     public void testKeyValueSchema() {
         // configure the schema info provider
         MultiVersionSchemaInfoProvider multiVersionSchemaInfoProvider = 
mock(MultiVersionSchemaInfoProvider.class);
-        GenericSchema genericAvroSchema = 
GenericSchemaImpl.of(Schema.AVRO(Foo.class).getSchemaInfo());
-        
when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
-                .thenReturn(CompletableFuture.completedFuture(
-                    KeyValueSchemaInfo.encodeKeyValueSchemaInfo(
-                        genericAvroSchema,
-                        genericAvroSchema,
-                        KeyValueEncodingType.INLINE
-                    )
-                ));
 
         List<Schema<Foo>> encodeSchemas = Lists.newArrayList(
             Schema.JSON(Foo.class),
@@ -152,6 +144,16 @@ public class GenericSchemaImplTest {
                 decodeSchema.configureSchemaInfo(
                     "test-topic", "topic",kvSchema.getSchemaInfo()
                 );
+
+                GenericSchema genericAvroSchema = 
GenericSchemaImpl.of(Schema.AVRO(Foo.class).getSchemaInfo());
+                
when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
+                        .thenReturn(CompletableFuture.completedFuture(
+                                KeyValueSchemaInfo.encodeKeyValueSchemaInfo(
+                                        keySchema,
+                                        valueSchema,
+                                        KeyValueEncodingType.INLINE
+                                )
+                        ));
                 
decodeSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
 
                 testEncodeAndDecodeKeyValues(kvSchema, decodeSchema);
@@ -167,7 +169,7 @@ public class GenericSchemaImplTest {
             Foo foo = newFoo(i);
             byte[] data = encodeSchema.encode(new KeyValue<>(foo, foo));
 
-            KeyValue<GenericRecord, GenericRecord> kv = 
decodeSchema.decode(data, new byte[0]);
+            KeyValue<GenericRecord, GenericRecord> kv = 
decodeSchema.decode(data, new LongSchemaVersion(0L).bytes());
             verifyFooRecord(kv.getKey(), i);
             verifyFooRecord(kv.getValue(), i);
         }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java
index 0910e86..6e3c942 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java
@@ -30,6 +30,7 @@ import 
org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.LongSchemaVersion;
 import org.testng.annotations.Test;
 
 import java.util.List;
@@ -85,7 +86,7 @@ public class GenericSchemaTest {
     public void testAutoJsonSchema() {
         // configure the schema info provider
         MultiVersionSchemaInfoProvider multiVersionSchemaInfoProvider = 
mock(MultiVersionSchemaInfoProvider.class);
-        GenericSchema genericAvroSchema = 
GenericAvroSchema.of(Schema.AVRO(Foo.class).getSchemaInfo());
+        GenericSchema genericAvroSchema = 
GenericAvroSchema.of(Schema.JSON(Foo.class).getSchemaInfo());
         
when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
                 
.thenReturn(CompletableFuture.completedFuture(genericAvroSchema.getSchemaInfo()));
 
@@ -111,7 +112,7 @@ public class GenericSchemaTest {
 
             GenericRecord record;
             if (decodeSchema instanceof AutoConsumeSchema) {
-                record = decodeSchema.decode(data, new byte[0]);
+                record = decodeSchema.decode(data, new 
LongSchemaVersion(0L).bytes());
             } else {
                 record = decodeSchema.decode(data);
             }
@@ -123,15 +124,6 @@ public class GenericSchemaTest {
     public void testKeyValueSchema() {
         // configure the schema info provider
         MultiVersionSchemaInfoProvider multiVersionSchemaInfoProvider = 
mock(MultiVersionSchemaInfoProvider.class);
-        GenericSchema genericAvroSchema = 
GenericAvroSchema.of(Schema.AVRO(Foo.class).getSchemaInfo());
-        
when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
-                .thenReturn(CompletableFuture.completedFuture(
-                    KeyValueSchemaInfo.encodeKeyValueSchemaInfo(
-                        genericAvroSchema,
-                        genericAvroSchema,
-                        KeyValueEncodingType.INLINE
-                    )
-                ));
 
         List<Schema<Foo>> encodeSchemas = Lists.newArrayList(
             Schema.JSON(Foo.class),
@@ -152,6 +144,16 @@ public class GenericSchemaTest {
                 decodeSchema.configureSchemaInfo(
                     "test-topic", "topic",kvSchema.getSchemaInfo()
                 );
+
+                
when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
+                        .thenReturn(CompletableFuture.completedFuture(
+                                KeyValueSchemaInfo.encodeKeyValueSchemaInfo(
+                                        keySchema,
+                                        valueSchema,
+                                        KeyValueEncodingType.INLINE
+                                )
+                        ));
+
                 
decodeSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
 
                 testEncodeAndDecodeKeyValues(kvSchema, decodeSchema);
@@ -167,7 +169,7 @@ public class GenericSchemaTest {
             Foo foo = newFoo(i);
             byte[] data = encodeSchema.encode(new KeyValue<>(foo, foo));
 
-            KeyValue<GenericRecord, GenericRecord> kv = 
decodeSchema.decode(data, new byte[0]);
+            KeyValue<GenericRecord, GenericRecord> kv = 
decodeSchema.decode(data, new LongSchemaVersion(1L).bytes());
             verifyFooRecord(kv.getKey(), i);
             verifyFooRecord(kv.getValue(), i);
         }
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index aa3a6b3..b14bea5 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -37,6 +37,7 @@ import 
org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
+import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -357,6 +358,12 @@ public class LookupProxyHandler {
         final long clientRequestId = commandGetSchema.getRequestId();
         String serviceUrl = getBrokerServiceUrl(clientRequestId);
         String topic = commandGetSchema.getTopic();
+        Optional<SchemaVersion> schemaVersion;
+        if (commandGetSchema.hasSchemaVersion()) {
+            schemaVersion = 
Optional.of(commandGetSchema.getSchemaVersion()).map(BytesSchemaVersion::of);
+        } else {
+            schemaVersion = Optional.empty();
+        }
 
         if(!StringUtils.isNotBlank(serviceUrl)) {
             return;
@@ -375,12 +382,7 @@ public class LookupProxyHandler {
             // Connected to backend broker
             long requestId = proxyConnection.newRequestId();
             ByteBuf command;
-            byte[] schemaVersion = null;
-            if (commandGetSchema.hasSchemaVersion()) {
-                schemaVersion = commandGetSchema.getSchemaVersion();
-            }
-            command = Commands.newGetSchema(requestId, topic,
-                    
Optional.ofNullable(schemaVersion).map(BytesSchemaVersion::of));
+            command = Commands.newGetSchema(requestId, topic, schemaVersion);
             clientCnx.sendGetRawSchema(command, requestId).whenComplete((r, t) 
-> {
                 if (t != null) {
                     log.warn("[{}] Failed to get schema {}: {}", 
clientAddress, topic, t);

Reply via email to