This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new af1360fb167 [PIP-236][fix][broker]Fix using schema to create consumer 
fails after using AUTO_CONSUME consumer to subscribe topic (#17449)
af1360fb167 is described below

commit af1360fb167c1f9484fda5771df3ea9b21d1440b
Author: sinan liu <[email protected]>
AuthorDate: Tue Mar 7 16:34:32 2023 +0800

    [PIP-236][fix][broker]Fix using schema to create consumer fails after using 
AUTO_CONSUME consumer to subscribe topic (#17449)
    
     Fixes #17354
     PIP #19113
    
    ### Motivation
    
    *Fixed the failure to use schema to create consumer after using 
AUTO-CONSUME consumer to subscribe an empty topic, and Broker returned the 
error message as  `IncompatibleSchemaException("Topic does not have schema to 
check")`.*
    
    ### Modifications
    
    *In PersistentTopic::addSchemaIfIdleOrCheckCompatible, when there is an 
active consumer, but the consumer is using the AUTO_CONSUME schema to subscribe 
to the topic. Continuing to create a schema consumer to subscribe to the topic 
will fail.*
    
    -  When `numActiveConsumers != 0`, and check the schema of the currently 
existing consumers is AUTO_CONSUME schema.
---
 .../org/apache/pulsar/broker/service/Consumer.java | 16 +++++
 .../apache/pulsar/broker/service/ServerCnx.java    |  3 +-
 .../pulsar/broker/service/SubscriptionOption.java  |  2 +
 .../service/nonpersistent/NonPersistentTopic.java  | 21 ++++---
 .../broker/service/persistent/PersistentTopic.java | 42 +++++++------
 .../apache/pulsar/client/api/SimpleSchemaTest.java | 73 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  6 ++
 .../client/impl/schema/AutoConsumeSchema.java      |  6 ++
 .../apache/pulsar/common/protocol/Commands.java    | 12 +++-
 pulsar-common/src/main/proto/PulsarApi.proto       |  2 +
 10 files changed, 153 insertions(+), 30 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 47da95b34ac..1ee3f513ef2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -58,6 +58,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
 import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
 import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.stats.Rate;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -142,12 +143,24 @@ public class Consumer {
 
     private long negtiveUnackedMsgsTimestamp;
 
+    @Getter
+    private final SchemaType schemaType;
+
     public Consumer(Subscription subscription, SubType subType, String 
topicName, long consumerId,
                     int priorityLevel, String consumerName,
                     boolean isDurable, TransportCnx cnx, String appId,
                     Map<String, String> metadata, boolean readCompacted,
                     KeySharedMeta keySharedMeta, MessageId startMessageId, 
long consumerEpoch) {
+        this(subscription, subType, topicName, consumerId, priorityLevel, 
consumerName, isDurable, cnx, appId,
+                metadata, readCompacted, keySharedMeta, startMessageId, 
consumerEpoch, null);
+    }
 
+    public Consumer(Subscription subscription, SubType subType, String 
topicName, long consumerId,
+                    int priorityLevel, String consumerName,
+                    boolean isDurable, TransportCnx cnx, String appId,
+                    Map<String, String> metadata, boolean readCompacted,
+                    KeySharedMeta keySharedMeta, MessageId startMessageId,
+                    long consumerEpoch, SchemaType schemaType) {
         this.subscription = subscription;
         this.subType = subType;
         this.topicName = topicName;
@@ -204,6 +217,8 @@ public class Consumer {
         this.consumerEpoch = consumerEpoch;
         this.isAcknowledgmentAtBatchIndexLevelEnabled = 
subscription.getTopic().getBrokerService()
                 
.getPulsar().getConfiguration().isAcknowledgmentAtBatchIndexLevelEnabled();
+
+        this.schemaType = schemaType;
     }
 
     @VisibleForTesting
@@ -231,6 +246,7 @@ public class Consumer {
         this.clientAddress = null;
         this.startMessageId = null;
         this.isAcknowledgmentAtBatchIndexLevelEnabled = false;
+        this.schemaType = null;
         MESSAGE_PERMITS_UPDATER.set(this, availablePermits);
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 071e8a46d56..4fc79a124ac 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1195,8 +1195,9 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                                     
.replicatedSubscriptionStateArg(isReplicated).keySharedMeta(keySharedMeta)
                                     
.subscriptionProperties(subscriptionProperties)
                                     .consumerEpoch(consumerEpoch)
+                                    .schemaType(schema == null ? null : 
schema.getType())
                                     .build();
-                            if (schema != null) {
+                            if (schema != null && schema.getType() != 
SchemaType.AUTO_CONSUME) {
                                 return 
topic.addSchemaIfIdleOrCheckCompatible(schema)
                                         .thenCompose(v -> 
topic.subscribe(option));
                             } else {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java
index d375c539e55..af56d023616 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
 import org.apache.pulsar.common.api.proto.KeyValue;
+import org.apache.pulsar.common.schema.SchemaType;
 
 @Getter
 @Builder
@@ -49,6 +50,7 @@ public class SubscriptionOption {
     private KeySharedMeta keySharedMeta;
     private Optional<Map<String, String>> subscriptionProperties;
     private long consumerEpoch;
+    private SchemaType schemaType;
 
     public static Optional<Map<String, String>> 
getPropertiesMap(List<KeyValue> list) {
         if (list == null) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 3b046570d73..a0a8462a227 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -87,6 +87,7 @@ import 
org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.PublisherStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -255,7 +256,8 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
                 option.isDurable(), option.getStartMessageId(), 
option.getMetadata(),
                 option.isReadCompacted(),
                 option.getStartMessageRollbackDurationSec(), 
option.isReplicatedSubscriptionStateArg(),
-                option.getKeySharedMeta(), 
option.getSubscriptionProperties().orElse(null));
+                option.getKeySharedMeta(), 
option.getSubscriptionProperties().orElse(null),
+                option.getSchemaType());
     }
 
     @Override
@@ -268,7 +270,7 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
                                                  KeySharedMeta keySharedMeta) {
         return internalSubscribe(cnx, subscriptionName, consumerId, subType, 
priorityLevel, consumerName,
                 isDurable, startMessageId, metadata, readCompacted, 
resetStartMessageBackInSec,
-                replicateSubscriptionState, keySharedMeta, null);
+                replicateSubscriptionState, keySharedMeta, null, null);
     }
 
     private CompletableFuture<Consumer> internalSubscribe(final TransportCnx 
cnx, String subscriptionName,
@@ -279,7 +281,8 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
                                                           long 
resetStartMessageBackInSec,
                                                           boolean 
replicateSubscriptionState,
                                                           KeySharedMeta 
keySharedMeta,
-                                                          Map<String, String> 
subscriptionProperties) {
+                                                          Map<String, String> 
subscriptionProperties,
+                                                          SchemaType 
schemaType) {
 
         return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ 
-> {
             final CompletableFuture<Consumer> future = new 
CompletableFuture<>();
@@ -321,8 +324,8 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
                     name -> new NonPersistentSubscription(this, 
subscriptionName, isDurable, subscriptionProperties));
 
             Consumer consumer = new Consumer(subscription, subType, topic, 
consumerId, priorityLevel, consumerName,
-                    false, cnx, cnx.getAuthRole(), metadata, readCompacted, 
keySharedMeta,
-                    MessageId.latest, DEFAULT_CONSUMER_EPOCH);
+                    false, cnx, cnx.getAuthRole(), metadata, readCompacted, 
keySharedMeta, MessageId.latest,
+                    DEFAULT_CONSUMER_EPOCH, schemaType);
             if (isMigrated()) {
                 consumer.topicMigrated(getClusterMigrationUrl());
             }
@@ -1162,12 +1165,14 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
     @Override
     public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData 
schema) {
         return hasSchema().thenCompose((hasSchema) -> {
-            int numActiveConsumers = subscriptions.values().stream()
-                    .mapToInt(subscription -> 
subscription.getConsumers().size())
+            int numActiveConsumersWithoutAutoSchema = 
subscriptions.values().stream()
+                    .mapToInt(subscription -> 
subscription.getConsumers().stream()
+                            .filter(consumer -> consumer.getSchemaType() != 
SchemaType.AUTO_CONSUME)
+                            .toList().size())
                     .sum();
             if (hasSchema
                     || (!producers.isEmpty())
-                    || (numActiveConsumers != 0)
+                    || (numActiveConsumersWithoutAutoSchema != 0)
                     || ENTRIES_ADDED_COUNTER_UPDATER.get(this) != 0) {
                 return checkSchemaCompatibleForConsumer(schema);
             } else {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 4277edb074c..fd0b0694212 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -155,6 +155,7 @@ import 
org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.topics.TopicCompactionStrategy;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.DateFormatter;
@@ -727,7 +728,8 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 option.getStartMessageId(), option.getMetadata(), 
option.isReadCompacted(),
                 option.getInitialPosition(), 
option.getStartMessageRollbackDurationSec(),
                 option.isReplicatedSubscriptionStateArg(), 
option.getKeySharedMeta(),
-                
option.getSubscriptionProperties().orElse(Collections.emptyMap()), 
option.getConsumerEpoch());
+                
option.getSubscriptionProperties().orElse(Collections.emptyMap()),
+                option.getConsumerEpoch(), option.getSchemaType());
     }
 
     private CompletableFuture<Consumer> internalSubscribe(final TransportCnx 
cnx, String subscriptionName,
@@ -740,7 +742,8 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                                                           boolean 
replicatedSubscriptionStateArg,
                                                           KeySharedMeta 
keySharedMeta,
                                                           Map<String, String> 
subscriptionProperties,
-                                                          long consumerEpoch) {
+                                                          long consumerEpoch,
+                                                          SchemaType 
schemaType) {
         if (readCompacted && !(subType == SubType.Failover || subType == 
SubType.Exclusive)) {
             return FutureUtil.failedFuture(new NotAllowedException(
                     "readCompacted only allowed on failover or exclusive 
subscriptions"));
@@ -828,7 +831,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             CompletableFuture<Consumer> future = 
subscriptionFuture.thenCompose(subscription -> {
                 Consumer consumer = new Consumer(subscription, subType, topic, 
consumerId, priorityLevel,
                         consumerName, isDurable, cnx, cnx.getAuthRole(), 
metadata,
-                        readCompacted, keySharedMeta, startMessageId, 
consumerEpoch);
+                        readCompacted, keySharedMeta, startMessageId, 
consumerEpoch, schemaType);
 
                 return addConsumerToSubscription(subscription, 
consumer).thenCompose(v -> {
                     if (subscription instanceof PersistentSubscription 
persistentSubscription) {
@@ -907,7 +910,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                                                  KeySharedMeta keySharedMeta) {
         return internalSubscribe(cnx, subscriptionName, consumerId, subType, 
priorityLevel, consumerName,
                 isDurable, startMessageId, metadata, readCompacted, 
initialPosition, startMessageRollbackDurationSec,
-                replicatedSubscriptionStateArg, keySharedMeta, null, 
DEFAULT_CONSUMER_EPOCH);
+                replicatedSubscriptionStateArg, keySharedMeta, null, 
DEFAULT_CONSUMER_EPOCH, null);
     }
 
     private CompletableFuture<Subscription> getDurableSubscription(String 
subscriptionName,
@@ -3107,21 +3110,22 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
     @Override
     public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData 
schema) {
-        return hasSchema()
-            .thenCompose((hasSchema) -> {
-                int numActiveConsumers = subscriptions.values().stream()
-                        .mapToInt(subscription -> 
subscription.getConsumers().size())
-                        .sum();
-                if (hasSchema
-                        || (!producers.isEmpty())
-                        || (numActiveConsumers != 0)
-                        || (ledger.getTotalSize() != 0)) {
-                    return checkSchemaCompatibleForConsumer(schema);
-                } else {
-                    return addSchema(schema).thenCompose(schemaVersion ->
-                            CompletableFuture.completedFuture(null));
-                }
-            });
+        return hasSchema().thenCompose((hasSchema) -> {
+            int numActiveConsumersWithoutAutoSchema = 
subscriptions.values().stream()
+                    .mapToInt(subscription -> 
subscription.getConsumers().stream()
+                            .filter(consumer -> consumer.getSchemaType() != 
SchemaType.AUTO_CONSUME)
+                            .toList().size())
+                    .sum();
+            if (hasSchema
+                    || (!producers.isEmpty())
+                    || (numActiveConsumersWithoutAutoSchema != 0)
+                    || (ledger.getTotalSize() != 0)) {
+                return checkSchemaCompatibleForConsumer(schema);
+            } else {
+                return addSchema(schema).thenCompose(schemaVersion ->
+                        CompletableFuture.completedFuture(null));
+            }
+        });
     }
 
     public synchronized void checkReplicatedSubscriptionControllerState() {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
index 21d6a7ed89a..c8c7c3b2ccc 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
@@ -1239,6 +1239,79 @@ public class SimpleSchemaTest extends 
ProducerConsumerBase {
         Assert.assertEquals(admin.schemas().getSchemaInfo(topic2).getType(), 
SchemaType.STRING);
     }
 
+    @Test(dataProvider = "topicDomain")
+    public void testSubscribeWithSchemaAfterAutoConsumeNewTopic(String domain) 
throws Exception {
+        final String topic = domain + 
"my-property/my-ns/testSubscribeWithSchemaAfterAutoConsume-1";
+
+        @Cleanup
+        Consumer<GenericRecord> autoConsumer1 = 
pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub0")
+                .consumerName("autoConsumer1")
+                .subscribe();
+        @Cleanup
+        Consumer<GenericRecord> autoConsumer2 = 
pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub0")
+                .consumerName("autoConsumer2")
+                .subscribe();
+        @Cleanup
+        Consumer<GenericRecord> autoConsumer3 = 
pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub1")
+                .consumerName("autoConsumer3")
+                .subscribe();
+        @Cleanup
+        Consumer<GenericRecord> autoConsumer4 = 
pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub1")
+                .consumerName("autoConsumer4")
+                .subscribe();
+        try {
+            log.info("The autoConsumer1 isConnected: " + 
autoConsumer1.isConnected());
+            log.info("The autoConsumer2 isConnected: " + 
autoConsumer2.isConnected());
+            log.info("The autoConsumer3 isConnected: " + 
autoConsumer3.isConnected());
+            log.info("The autoConsumer4 isConnected: " + 
autoConsumer4.isConnected());
+            admin.schemas().getSchemaInfo(topic);
+            fail("The schema of topic should not exist");
+        } catch (PulsarAdminException e) {
+            assertEquals(e.getStatusCode(), 404);
+        }
+
+        @Cleanup
+        Consumer<V1Data> consumerWithSchema1 = 
pulsarClient.newConsumer(Schema.AVRO(V1Data.class))
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub0")
+                .consumerName("consumerWithSchema-1")
+                .subscribe();
+        @Cleanup
+        Consumer<V1Data> consumerWithSchema2 = 
pulsarClient.newConsumer(Schema.AVRO(V1Data.class))
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub0")
+                .consumerName("consumerWithSchema-2")
+                .subscribe();
+        @Cleanup
+        Consumer<V1Data> consumerWithSchema3 = 
pulsarClient.newConsumer(Schema.AVRO(V1Data.class))
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub1")
+                .consumerName("consumerWithSchema-3")
+                .subscribe();
+        @Cleanup
+        Consumer<V1Data> consumerWithSchema4 = 
pulsarClient.newConsumer(Schema.AVRO(V1Data.class))
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("sub1")
+                .consumerName("consumerWithSchema-4")
+                .subscribe();
+    }
+
     @DataProvider(name = "keyEncodingType")
     public static Object[] keyEncodingType() {
         return new Object[] { KeyValueEncodingType.SEPARATED, 
KeyValueEncodingType.INLINE };
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 18abb5a52c4..beaa34bf205 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -84,6 +84,7 @@ import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.client.util.RetryMessageUtil;
@@ -811,6 +812,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         if (si != null && (SchemaType.BYTES == si.getType() || SchemaType.NONE 
== si.getType())) {
             // don't set schema for Schema.BYTES
             si = null;
+        } else {
+            if (schema instanceof AutoConsumeSchema
+                    && 
Commands.peerSupportsCarryAutoConsumeSchemaToBroker(cnx.getRemoteEndpointProtocolVersion()))
 {
+                si = AutoConsumeSchema.SCHEMA_INFO;
+            }
         }
         // startMessageRollbackDurationInSec should be consider only once when 
consumer connects to first time
         long startMessageRollbackDuration = (startMessageRollbackDurationInSec 
> 0
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
index 33fcd18876b..82a3b69da20 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
@@ -57,6 +57,12 @@ public class AutoConsumeSchema implements 
Schema<GenericRecord> {
 
     private SchemaInfoProvider schemaInfoProvider;
 
+    public static final SchemaInfo SCHEMA_INFO = SchemaInfoImpl.builder()
+            .name("AutoConsume")
+            .type(SchemaType.AUTO_CONSUME)
+            .schema(new byte[0])
+            .build();
+
     private ConcurrentMap<SchemaVersion, Schema<?>> initSchemaMap() {
         ConcurrentMap<SchemaVersion, Schema<?>> schemaMap = new 
ConcurrentHashMap<>();
         // The Schema.BYTES will not be uploaded to the broker and store in 
the schema storage,
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 081dfe4275b..8a5684cf676 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -771,7 +771,9 @@ public class Commands {
     }
 
     private static Schema.Type getSchemaType(SchemaType type) {
-        if (type.getValue() < 0) {
+        if (type == SchemaType.AUTO_CONSUME) {
+            return Schema.Type.AutoConsume;
+        } else if (type.getValue() < 0) {
             return Schema.Type.None;
         } else {
             return Schema.Type.valueOf(type.getValue());
@@ -779,7 +781,9 @@ public class Commands {
     }
 
     public static SchemaType getSchemaType(Schema.Type type) {
-        if (type.getValue() < 0) {
+        if (type == Schema.Type.AutoConsume) {
+            return SchemaType.AUTO_CONSUME;
+        } else if (type.getValue() < 0) {
             // this is unexpected
             return SchemaType.NONE;
         } else {
@@ -1965,6 +1969,10 @@ public class Commands {
         return peerVersion >= ProtocolVersion.v17.getValue();
     }
 
+    public static boolean peerSupportsCarryAutoConsumeSchemaToBroker(int 
peerVersion) {
+        return peerVersion >= ProtocolVersion.v21.getValue();
+    }
+
     private static org.apache.pulsar.common.api.proto.ProducerAccessMode 
convertProducerAccessMode(
             ProducerAccessMode accessMode) {
         switch (accessMode) {
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto 
b/pulsar-common/src/main/proto/PulsarApi.proto
index acf75eab858..d9c41eeec97 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -45,6 +45,7 @@ message Schema {
         LocalTime = 18;
         LocalDateTime = 19;
         ProtobufNative = 20;
+        AutoConsume = 21;
     }
 
     required string name = 1;
@@ -263,6 +264,7 @@ enum ProtocolVersion {
     v18 = 18; // Add client support for broker entry metadata
     v19 = 19; // Add CommandTcClientConnectRequest and 
CommandTcClientConnectResponse
     v20 = 20; // Add client support for topic migration redirection 
CommandTopicMigrated
+    v21 = 21; // Carry the AUTO_CONSUME schema to the Broker after this version
 }
 
 message CommandConnect {

Reply via email to