This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new af1360fb167 [PIP-236][fix][broker]Fix using schema to create consumer
fails after using AUTO_CONSUME consumer to subscribe topic (#17449)
af1360fb167 is described below
commit af1360fb167c1f9484fda5771df3ea9b21d1440b
Author: sinan liu <[email protected]>
AuthorDate: Tue Mar 7 16:34:32 2023 +0800
[PIP-236][fix][broker]Fix using schema to create consumer fails after using
AUTO_CONSUME consumer to subscribe topic (#17449)
Fixes #17354
PIP #19113
### Motivation
*Fixes the failure to create a consumer with a schema after an AUTO_CONSUME
consumer has subscribed to an empty topic. In that case the Broker returned
the error `IncompatibleSchemaException("Topic does not have schema to
check")`.*
### Modifications
*In PersistentTopic::addSchemaIfIdleOrCheckCompatible, when there is an
active consumer that subscribed to the topic using the AUTO_CONSUME schema,
creating another consumer with a schema to subscribe to the same topic
fails.*
- When `numActiveConsumers != 0`, check whether the currently existing
consumers use the AUTO_CONSUME schema; consumers that do are not counted as
active consumers for this check.
---
.../org/apache/pulsar/broker/service/Consumer.java | 16 +++++
.../apache/pulsar/broker/service/ServerCnx.java | 3 +-
.../pulsar/broker/service/SubscriptionOption.java | 2 +
.../service/nonpersistent/NonPersistentTopic.java | 21 ++++---
.../broker/service/persistent/PersistentTopic.java | 42 +++++++------
.../apache/pulsar/client/api/SimpleSchemaTest.java | 73 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerImpl.java | 6 ++
.../client/impl/schema/AutoConsumeSchema.java | 6 ++
.../apache/pulsar/common/protocol/Commands.java | 12 +++-
pulsar-common/src/main/proto/PulsarApi.proto | 2 +
10 files changed, 153 insertions(+), 30 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 47da95b34ac..1ee3f513ef2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -58,6 +58,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.stats.Rate;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
@@ -142,12 +143,24 @@ public class Consumer {
private long negtiveUnackedMsgsTimestamp;
+ @Getter
+ private final SchemaType schemaType;
+
public Consumer(Subscription subscription, SubType subType, String
topicName, long consumerId,
int priorityLevel, String consumerName,
boolean isDurable, TransportCnx cnx, String appId,
Map<String, String> metadata, boolean readCompacted,
KeySharedMeta keySharedMeta, MessageId startMessageId,
long consumerEpoch) {
+ this(subscription, subType, topicName, consumerId, priorityLevel,
consumerName, isDurable, cnx, appId,
+ metadata, readCompacted, keySharedMeta, startMessageId,
consumerEpoch, null);
+ }
+ public Consumer(Subscription subscription, SubType subType, String
topicName, long consumerId,
+ int priorityLevel, String consumerName,
+ boolean isDurable, TransportCnx cnx, String appId,
+ Map<String, String> metadata, boolean readCompacted,
+ KeySharedMeta keySharedMeta, MessageId startMessageId,
+ long consumerEpoch, SchemaType schemaType) {
this.subscription = subscription;
this.subType = subType;
this.topicName = topicName;
@@ -204,6 +217,8 @@ public class Consumer {
this.consumerEpoch = consumerEpoch;
this.isAcknowledgmentAtBatchIndexLevelEnabled =
subscription.getTopic().getBrokerService()
.getPulsar().getConfiguration().isAcknowledgmentAtBatchIndexLevelEnabled();
+
+ this.schemaType = schemaType;
}
@VisibleForTesting
@@ -231,6 +246,7 @@ public class Consumer {
this.clientAddress = null;
this.startMessageId = null;
this.isAcknowledgmentAtBatchIndexLevelEnabled = false;
+ this.schemaType = null;
MESSAGE_PERMITS_UPDATER.set(this, availablePermits);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 071e8a46d56..4fc79a124ac 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1195,8 +1195,9 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
.replicatedSubscriptionStateArg(isReplicated).keySharedMeta(keySharedMeta)
.subscriptionProperties(subscriptionProperties)
.consumerEpoch(consumerEpoch)
+ .schemaType(schema == null ? null :
schema.getType())
.build();
- if (schema != null) {
+ if (schema != null && schema.getType() !=
SchemaType.AUTO_CONSUME) {
return
topic.addSchemaIfIdleOrCheckCompatible(schema)
.thenCompose(v ->
topic.subscribe(option));
} else {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java
index d375c539e55..af56d023616 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeyValue;
+import org.apache.pulsar.common.schema.SchemaType;
@Getter
@Builder
@@ -49,6 +50,7 @@ public class SubscriptionOption {
private KeySharedMeta keySharedMeta;
private Optional<Map<String, String>> subscriptionProperties;
private long consumerEpoch;
+ private SchemaType schemaType;
public static Optional<Map<String, String>>
getPropertiesMap(List<KeyValue> list) {
if (list == null) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 3b046570d73..a0a8462a227 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -87,6 +87,7 @@ import
org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl;
import org.apache.pulsar.common.policies.data.stats.PublisherStatsImpl;
import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
import org.apache.pulsar.common.protocol.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -255,7 +256,8 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
option.isDurable(), option.getStartMessageId(),
option.getMetadata(),
option.isReadCompacted(),
option.getStartMessageRollbackDurationSec(),
option.isReplicatedSubscriptionStateArg(),
- option.getKeySharedMeta(),
option.getSubscriptionProperties().orElse(null));
+ option.getKeySharedMeta(),
option.getSubscriptionProperties().orElse(null),
+ option.getSchemaType());
}
@Override
@@ -268,7 +270,7 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
KeySharedMeta keySharedMeta) {
return internalSubscribe(cnx, subscriptionName, consumerId, subType,
priorityLevel, consumerName,
isDurable, startMessageId, metadata, readCompacted,
resetStartMessageBackInSec,
- replicateSubscriptionState, keySharedMeta, null);
+ replicateSubscriptionState, keySharedMeta, null, null);
}
private CompletableFuture<Consumer> internalSubscribe(final TransportCnx
cnx, String subscriptionName,
@@ -279,7 +281,8 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
long
resetStartMessageBackInSec,
boolean
replicateSubscriptionState,
KeySharedMeta
keySharedMeta,
- Map<String, String>
subscriptionProperties) {
+ Map<String, String>
subscriptionProperties,
+ SchemaType
schemaType) {
return brokerService.checkTopicNsOwnership(getName()).thenCompose(__
-> {
final CompletableFuture<Consumer> future = new
CompletableFuture<>();
@@ -321,8 +324,8 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
name -> new NonPersistentSubscription(this,
subscriptionName, isDurable, subscriptionProperties));
Consumer consumer = new Consumer(subscription, subType, topic,
consumerId, priorityLevel, consumerName,
- false, cnx, cnx.getAuthRole(), metadata, readCompacted,
keySharedMeta,
- MessageId.latest, DEFAULT_CONSUMER_EPOCH);
+ false, cnx, cnx.getAuthRole(), metadata, readCompacted,
keySharedMeta, MessageId.latest,
+ DEFAULT_CONSUMER_EPOCH, schemaType);
if (isMigrated()) {
consumer.topicMigrated(getClusterMigrationUrl());
}
@@ -1162,12 +1165,14 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
@Override
public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData
schema) {
return hasSchema().thenCompose((hasSchema) -> {
- int numActiveConsumers = subscriptions.values().stream()
- .mapToInt(subscription ->
subscription.getConsumers().size())
+ int numActiveConsumersWithoutAutoSchema =
subscriptions.values().stream()
+ .mapToInt(subscription ->
subscription.getConsumers().stream()
+ .filter(consumer -> consumer.getSchemaType() !=
SchemaType.AUTO_CONSUME)
+ .toList().size())
.sum();
if (hasSchema
|| (!producers.isEmpty())
- || (numActiveConsumers != 0)
+ || (numActiveConsumersWithoutAutoSchema != 0)
|| ENTRIES_ADDED_COUNTER_UPDATER.get(this) != 0) {
return checkSchemaCompatibleForConsumer(schema);
} else {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 4277edb074c..fd0b0694212 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -155,6 +155,7 @@ import
org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.topics.TopicCompactionStrategy;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.DateFormatter;
@@ -727,7 +728,8 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
option.getStartMessageId(), option.getMetadata(),
option.isReadCompacted(),
option.getInitialPosition(),
option.getStartMessageRollbackDurationSec(),
option.isReplicatedSubscriptionStateArg(),
option.getKeySharedMeta(),
-
option.getSubscriptionProperties().orElse(Collections.emptyMap()),
option.getConsumerEpoch());
+
option.getSubscriptionProperties().orElse(Collections.emptyMap()),
+ option.getConsumerEpoch(), option.getSchemaType());
}
private CompletableFuture<Consumer> internalSubscribe(final TransportCnx
cnx, String subscriptionName,
@@ -740,7 +742,8 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
boolean
replicatedSubscriptionStateArg,
KeySharedMeta
keySharedMeta,
Map<String, String>
subscriptionProperties,
- long consumerEpoch) {
+ long consumerEpoch,
+ SchemaType
schemaType) {
if (readCompacted && !(subType == SubType.Failover || subType ==
SubType.Exclusive)) {
return FutureUtil.failedFuture(new NotAllowedException(
"readCompacted only allowed on failover or exclusive
subscriptions"));
@@ -828,7 +831,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
CompletableFuture<Consumer> future =
subscriptionFuture.thenCompose(subscription -> {
Consumer consumer = new Consumer(subscription, subType, topic,
consumerId, priorityLevel,
consumerName, isDurable, cnx, cnx.getAuthRole(),
metadata,
- readCompacted, keySharedMeta, startMessageId,
consumerEpoch);
+ readCompacted, keySharedMeta, startMessageId,
consumerEpoch, schemaType);
return addConsumerToSubscription(subscription,
consumer).thenCompose(v -> {
if (subscription instanceof PersistentSubscription
persistentSubscription) {
@@ -907,7 +910,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
KeySharedMeta keySharedMeta) {
return internalSubscribe(cnx, subscriptionName, consumerId, subType,
priorityLevel, consumerName,
isDurable, startMessageId, metadata, readCompacted,
initialPosition, startMessageRollbackDurationSec,
- replicatedSubscriptionStateArg, keySharedMeta, null,
DEFAULT_CONSUMER_EPOCH);
+ replicatedSubscriptionStateArg, keySharedMeta, null,
DEFAULT_CONSUMER_EPOCH, null);
}
private CompletableFuture<Subscription> getDurableSubscription(String
subscriptionName,
@@ -3107,21 +3110,22 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
@Override
public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData
schema) {
- return hasSchema()
- .thenCompose((hasSchema) -> {
- int numActiveConsumers = subscriptions.values().stream()
- .mapToInt(subscription ->
subscription.getConsumers().size())
- .sum();
- if (hasSchema
- || (!producers.isEmpty())
- || (numActiveConsumers != 0)
- || (ledger.getTotalSize() != 0)) {
- return checkSchemaCompatibleForConsumer(schema);
- } else {
- return addSchema(schema).thenCompose(schemaVersion ->
- CompletableFuture.completedFuture(null));
- }
- });
+ return hasSchema().thenCompose((hasSchema) -> {
+ int numActiveConsumersWithoutAutoSchema =
subscriptions.values().stream()
+ .mapToInt(subscription ->
subscription.getConsumers().stream()
+ .filter(consumer -> consumer.getSchemaType() !=
SchemaType.AUTO_CONSUME)
+ .toList().size())
+ .sum();
+ if (hasSchema
+ || (!producers.isEmpty())
+ || (numActiveConsumersWithoutAutoSchema != 0)
+ || (ledger.getTotalSize() != 0)) {
+ return checkSchemaCompatibleForConsumer(schema);
+ } else {
+ return addSchema(schema).thenCompose(schemaVersion ->
+ CompletableFuture.completedFuture(null));
+ }
+ });
}
public synchronized void checkReplicatedSubscriptionControllerState() {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
index 21d6a7ed89a..c8c7c3b2ccc 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
@@ -1239,6 +1239,79 @@ public class SimpleSchemaTest extends
ProducerConsumerBase {
Assert.assertEquals(admin.schemas().getSchemaInfo(topic2).getType(),
SchemaType.STRING);
}
+ @Test(dataProvider = "topicDomain")
+ public void testSubscribeWithSchemaAfterAutoConsumeNewTopic(String domain)
throws Exception {
+ final String topic = domain +
"my-property/my-ns/testSubscribeWithSchemaAfterAutoConsume-1";
+
+ @Cleanup
+ Consumer<GenericRecord> autoConsumer1 =
pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+ .topic(topic)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionName("sub0")
+ .consumerName("autoConsumer1")
+ .subscribe();
+ @Cleanup
+ Consumer<GenericRecord> autoConsumer2 =
pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+ .topic(topic)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionName("sub0")
+ .consumerName("autoConsumer2")
+ .subscribe();
+ @Cleanup
+ Consumer<GenericRecord> autoConsumer3 =
pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+ .topic(topic)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionName("sub1")
+ .consumerName("autoConsumer3")
+ .subscribe();
+ @Cleanup
+ Consumer<GenericRecord> autoConsumer4 =
pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+ .topic(topic)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionName("sub1")
+ .consumerName("autoConsumer4")
+ .subscribe();
+ try {
+ log.info("The autoConsumer1 isConnected: " +
autoConsumer1.isConnected());
+ log.info("The autoConsumer2 isConnected: " +
autoConsumer2.isConnected());
+ log.info("The autoConsumer3 isConnected: " +
autoConsumer3.isConnected());
+ log.info("The autoConsumer4 isConnected: " +
autoConsumer4.isConnected());
+ admin.schemas().getSchemaInfo(topic);
+ fail("The schema of topic should not exist");
+ } catch (PulsarAdminException e) {
+ assertEquals(e.getStatusCode(), 404);
+ }
+
+ @Cleanup
+ Consumer<V1Data> consumerWithSchema1 =
pulsarClient.newConsumer(Schema.AVRO(V1Data.class))
+ .topic(topic)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionName("sub0")
+ .consumerName("consumerWithSchema-1")
+ .subscribe();
+ @Cleanup
+ Consumer<V1Data> consumerWithSchema2 =
pulsarClient.newConsumer(Schema.AVRO(V1Data.class))
+ .topic(topic)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionName("sub0")
+ .consumerName("consumerWithSchema-2")
+ .subscribe();
+ @Cleanup
+ Consumer<V1Data> consumerWithSchema3 =
pulsarClient.newConsumer(Schema.AVRO(V1Data.class))
+ .topic(topic)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionName("sub1")
+ .consumerName("consumerWithSchema-3")
+ .subscribe();
+ @Cleanup
+ Consumer<V1Data> consumerWithSchema4 =
pulsarClient.newConsumer(Schema.AVRO(V1Data.class))
+ .topic(topic)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionName("sub1")
+ .consumerName("consumerWithSchema-4")
+ .subscribe();
+ }
+
@DataProvider(name = "keyEncodingType")
public static Object[] keyEncodingType() {
return new Object[] { KeyValueEncodingType.SEPARATED,
KeyValueEncodingType.INLINE };
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 18abb5a52c4..beaa34bf205 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -84,6 +84,7 @@ import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.client.util.RetryMessageUtil;
@@ -811,6 +812,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
if (si != null && (SchemaType.BYTES == si.getType() || SchemaType.NONE
== si.getType())) {
// don't set schema for Schema.BYTES
si = null;
+ } else {
+ if (schema instanceof AutoConsumeSchema
+ &&
Commands.peerSupportsCarryAutoConsumeSchemaToBroker(cnx.getRemoteEndpointProtocolVersion()))
{
+ si = AutoConsumeSchema.SCHEMA_INFO;
+ }
}
// startMessageRollbackDurationInSec should be consider only once when
consumer connects to first time
long startMessageRollbackDuration = (startMessageRollbackDurationInSec
> 0
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
index 33fcd18876b..82a3b69da20 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
@@ -57,6 +57,12 @@ public class AutoConsumeSchema implements
Schema<GenericRecord> {
private SchemaInfoProvider schemaInfoProvider;
+ public static final SchemaInfo SCHEMA_INFO = SchemaInfoImpl.builder()
+ .name("AutoConsume")
+ .type(SchemaType.AUTO_CONSUME)
+ .schema(new byte[0])
+ .build();
+
private ConcurrentMap<SchemaVersion, Schema<?>> initSchemaMap() {
ConcurrentMap<SchemaVersion, Schema<?>> schemaMap = new
ConcurrentHashMap<>();
// The Schema.BYTES will not be uploaded to the broker and store in
the schema storage,
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 081dfe4275b..8a5684cf676 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -771,7 +771,9 @@ public class Commands {
}
private static Schema.Type getSchemaType(SchemaType type) {
- if (type.getValue() < 0) {
+ if (type == SchemaType.AUTO_CONSUME) {
+ return Schema.Type.AutoConsume;
+ } else if (type.getValue() < 0) {
return Schema.Type.None;
} else {
return Schema.Type.valueOf(type.getValue());
@@ -779,7 +781,9 @@ public class Commands {
}
public static SchemaType getSchemaType(Schema.Type type) {
- if (type.getValue() < 0) {
+ if (type == Schema.Type.AutoConsume) {
+ return SchemaType.AUTO_CONSUME;
+ } else if (type.getValue() < 0) {
// this is unexpected
return SchemaType.NONE;
} else {
@@ -1965,6 +1969,10 @@ public class Commands {
return peerVersion >= ProtocolVersion.v17.getValue();
}
+ public static boolean peerSupportsCarryAutoConsumeSchemaToBroker(int
peerVersion) {
+ return peerVersion >= ProtocolVersion.v21.getValue();
+ }
+
private static org.apache.pulsar.common.api.proto.ProducerAccessMode
convertProducerAccessMode(
ProducerAccessMode accessMode) {
switch (accessMode) {
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto
b/pulsar-common/src/main/proto/PulsarApi.proto
index acf75eab858..d9c41eeec97 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -45,6 +45,7 @@ message Schema {
LocalTime = 18;
LocalDateTime = 19;
ProtobufNative = 20;
+ AutoConsume = 21;
}
required string name = 1;
@@ -263,6 +264,7 @@ enum ProtocolVersion {
v18 = 18; // Add client support for broker entry metadata
v19 = 19; // Add CommandTcClientConnectRequest and
CommandTcClientConnectResponse
v20 = 20; // Add client support for topic migration redirection
CommandTopicMigrated
+ v21 = 21; // Carry the AUTO_CONSUME schema to the Broker after this version
}
message CommandConnect {