This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b45adde  [Issue 9602] Add schema type validation (#9797)
b45adde is described below

commit b45added9d27adf5218ca005a82f4faa18b7ec44
Author: limingnihao <[email protected]>
AuthorDate: Fri Mar 12 15:37:05 2021 +0800

    [Issue 9602] Add schema type validation (#9797)
    
    Fixes #9602
    
    ### Motivation
    
    The schema should not be able to modify the type after it is first created. 
Unless he is Always compatible.
    
    ### Modifications
    
    When uploading a schema, it is verified that the type of the newly uploaded 
schema is consistent with the schema in the metadata.
    
    When checkCompatible, change the Primitive type from checking only to 
checking all.
    
    ### Verifying this change
    
    * When a producer is created, the schema is defined, and when a producer is 
created again, the schema type is not always the same, otherwise an exception 
will be thrown.
    
    * When a producer is created, the schema is defined, and when a consumer is 
created again, the schema type is not always the same, otherwise an exception 
will be thrown.
    
    * When a consumer is created, the schema is defined, and when a producer is 
created again, the schema type is not always the same, otherwise an exception 
will be thrown.
    
    * When a consumer is created, the schema is defined, and when a consumer is 
created again, the schema type is not always the same, otherwise an exception 
will be thrown.
    
    * When compatibility is set to always compatible. No exceptions are thrown.
    
    ### Does this pull request potentially affect one of the following parts:
    
    *If `yes` was chosen, please highlight the changes*
      - The schema: (yes)
---
 .../service/schema/SchemaRegistryServiceImpl.java  |  28 +-
 .../SchemaTypeCompatibilityCheckTest.java          | 468 +++++++++++++++++++++
 2 files changed, 485 insertions(+), 11 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index d9dd91f..b06531a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -140,6 +140,15 @@ public class SchemaRegistryServiceImpl implements 
SchemaRegistryService {
                                                               
SchemaCompatibilityStrategy strategy) {
         return 
trimDeletedSchemaAndGetList(schemaId).thenCompose(schemaAndMetadataList ->
                 getSchemaVersionBySchemaData(schemaAndMetadataList, 
schema).thenCompose(schemaVersion -> {
+            if (strategy != SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE && 
schemaAndMetadataList.size() > 0) {
+                for (SchemaAndMetadata metadata : schemaAndMetadataList) {
+                    if (schema.getType() != metadata.schema.getType()) {
+                        return FutureUtil.failedFuture(new 
IncompatibleSchemaException(
+                                String.format("Incompatible schema: exists 
schema type %s, new schema type %s",
+                                metadata.schema.getType(), schema.getType())));
+                    }
+                }
+            }
             if (schemaVersion != null) {
                 return CompletableFuture.completedFuture(schemaVersion);
             }
@@ -242,17 +251,14 @@ public class SchemaRegistryServiceImpl implements 
SchemaRegistryService {
         SchemaHash existingHash = SchemaHash.of(existingSchema.schema);
         SchemaHash newHash = SchemaHash.of(newSchema);
         SchemaData existingSchemaData = existingSchema.schema;
-        if (existingSchemaData.getType().isPrimitive()) {
-            if (newSchema.getType() != existingSchemaData.getType()) {
-                throw new 
IncompatibleSchemaException(String.format("Incompatible primitive schema: "
-                                + "exists schema type %s, new schema type %s",
-                        existingSchemaData.getType(), newSchema.getType()));
-            }
-        } else {
-            if (!newHash.equals(existingHash)) {
-                compatibilityChecks.getOrDefault(newSchema.getType(), 
SchemaCompatibilityCheck.DEFAULT)
-                        .checkCompatible(existingSchemaData, newSchema, 
strategy);
-            }
+        if (newSchema.getType() != existingSchemaData.getType()) {
+            throw new IncompatibleSchemaException(String.format("Incompatible 
schema: "
+                            + "exists schema type %s, new schema type %s",
+                    existingSchemaData.getType(), newSchema.getType()));
+        }
+        if (!newHash.equals(existingHash)) {
+            compatibilityChecks.getOrDefault(newSchema.getType(), 
SchemaCompatibilityCheck.DEFAULT)
+                    .checkCompatible(existingSchemaData, newSchema, strategy);
         }
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
new file mode 100644
index 0000000..c24822f
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java
@@ -0,0 +1,468 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.schema.compatibility;
+
+import com.google.common.collect.Sets;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.schema.Schemas;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
+
+
+public class SchemaTypeCompatibilityCheckTest extends 
MockedPulsarServiceBaseTest {
+    private final static String CLUSTER_NAME = "test";
+    private final static String PUBLIC_TENANT = "public";
+    private final static String namespace = "test-namespace";
+    private final static String namespaceName = PUBLIC_TENANT + "/" + 
namespace;
+
+    @BeforeClass
+    @Override
+    public void setup() throws Exception {
+        super.internalSetup();
+
+        // Setup namespaces
+        admin.clusters().createCluster(CLUSTER_NAME, new 
ClusterData(pulsar.getBrokerServiceUrl()));
+
+        TenantInfo tenantInfo = new TenantInfo();
+        tenantInfo.setAllowedClusters(Collections.singleton(CLUSTER_NAME));
+        admin.tenants().createTenant(PUBLIC_TENANT, tenantInfo);
+        admin.namespaces().createNamespace(namespaceName, 
Sets.newHashSet(CLUSTER_NAME));
+
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void structTypeProducerProducerUndefinedCompatible() throws 
Exception {
+        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, 
SchemaCompatibilityStrategy.UNDEFINED);
+
+        String topicName = TopicName.get(
+                TopicDomain.persistent.value(),
+                PUBLIC_TENANT,
+                namespace,
+                "structTypeProducerProducerUndefinedCompatible"
+        ).toString();
+
+        pulsarClient.newProducer(Schema.JSON(Schemas.PersonOne.class))
+                .topic(topicName)
+                .create();
+
+        ProducerBuilder producerBuilder = 
pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class))
+                .topic(topicName);
+
+        Throwable t = 
expectThrows(PulsarClientException.IncompatibleSchemaException.class, 
producerBuilder::create);
+        assertTrue(t.getMessage().endsWith("Incompatible schema: exists schema 
type JSON, new schema type AVRO"));
+    }
+
+    @Test
+    public void structTypeProducerConsumerUndefinedCompatible() throws 
Exception {
+        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, 
SchemaCompatibilityStrategy.UNDEFINED);
+
+        final String subName = "my-sub";
+        String topicName = TopicName.get(
+                TopicDomain.persistent.value(),
+                PUBLIC_TENANT,
+                namespace,
+                "structTypeProducerConsumerUndefinedCompatible"
+        ).toString();
+
+        pulsarClient.newProducer(Schema.JSON(Schemas.PersonOne.class))
+                .topic(topicName)
+                .create();
+
+        ConsumerBuilder consumerBuilder = 
pulsarClient.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
+                .topic(topicName)
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .subscriptionName(subName);
+
+        Throwable t = 
expectThrows(PulsarClientException.IncompatibleSchemaException.class, 
consumerBuilder::subscribe);
+        assertTrue(t.getMessage().endsWith("Incompatible schema: exists schema 
type JSON, new schema type AVRO"));
+    }
+
+    @Test
+    public void structTypeConsumerProducerUndefinedCompatible() throws 
Exception {
+        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, 
SchemaCompatibilityStrategy.UNDEFINED);
+
+        final String subName = "my-sub";
+        String topicName = TopicName.get(
+                TopicDomain.persistent.value(),
+                PUBLIC_TENANT,
+                namespace,
+                "structTypeConsumerProducerUndefinedCompatible"
+        ).toString();
+
+        pulsarClient.newConsumer(Schema.JSON(Schemas.PersonOne.class))
+                .topic(topicName)
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .subscriptionName(subName)
+                .subscribe();
+
+        ProducerBuilder producerBuilder = 
pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class))
+                    .topic(topicName);
+
+        Throwable t = 
expectThrows(PulsarClientException.IncompatibleSchemaException.class, 
producerBuilder::create);
+        assertTrue(t.getMessage().endsWith("Incompatible schema: exists schema 
type JSON, new schema type AVRO"));
+    }
+
+    @Test
+    public void structTypeConsumerConsumerUndefinedCompatible() throws 
Exception {
+        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, 
SchemaCompatibilityStrategy.UNDEFINED);
+
+        final String subName = "my-sub";
+        String topicName = TopicName.get(
+                TopicDomain.persistent.value(),
+                PUBLIC_TENANT,
+                namespace,
+                "structTypeConsumerConsumerUndefinedCompatible"
+        ).toString();
+
+        pulsarClient.newConsumer(Schema.JSON(Schemas.PersonOne.class))
+                .topic(topicName)
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .subscriptionName(subName + "1")
+                .subscribe();
+
+        ConsumerBuilder consumerBuilder = 
pulsarClient.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
+                .topic(topicName)
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .subscriptionName(subName + "2");
+
+        Throwable t = 
expectThrows(PulsarClientException.IncompatibleSchemaException.class, 
consumerBuilder::subscribe);
+        assertTrue(t.getMessage().endsWith("Incompatible schema: exists schema 
type JSON, new schema type AVRO"));
+    }
+
+    @Test
+    public void structTypeProducerProducerAlwaysCompatible() throws Exception {
+        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, 
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+
+        String topicName = TopicName.get(
+                TopicDomain.persistent.value(),
+                PUBLIC_TENANT,
+                namespace,
+                "structTypeProducerProducerAlwaysCompatible"
+        ).toString();
+
+        pulsarClient.newProducer(Schema.JSON(Schemas.PersonOne.class))
+                .topic(topicName)
+                .create();
+
+        pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class))
+                .topic(topicName)
+                .create();
+    }
+
+    @Test
+    public void structTypeProducerConsumerAlwaysCompatible() throws Exception {
+        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, 
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+
+        final String subName = "my-sub";
+        String topicName = TopicName.get(
+                TopicDomain.persistent.value(),
+                PUBLIC_TENANT,
+                namespace,
+                "structTypeProducerConsumerAlwaysCompatible"
+        ).toString();
+
+        pulsarClient.newProducer(Schema.JSON(Schemas.PersonOne.class))
+                .topic(topicName)
+                .create();
+
+        pulsarClient.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
+                .topic(topicName)
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .subscriptionName(subName)
+                .subscribe();
+    }
+
+    @Test
+    public void structTypeConsumerProducerAlwaysCompatible() throws Exception {
+        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, 
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+
+        final String subName = "my-sub";
+        String topicName = TopicName.get(
+                TopicDomain.persistent.value(),
+                PUBLIC_TENANT,
+                namespace,
+                "structTypeConsumerProducerAlwaysCompatible"
+        ).toString();
+
+        pulsarClient.newConsumer(Schema.JSON(Schemas.PersonOne.class))
+                .topic(topicName)
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .subscriptionName(subName)
+                .subscribe();
+
+        pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class))
+                .topic(topicName)
+                .create();
+    }
+
+    @Test
+    public void structTypeConsumerConsumerAlwaysCompatible() throws Exception {
+        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, 
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+
+        final String subName = "my-sub";
+        String topicName = TopicName.get(
+                TopicDomain.persistent.value(),
+                PUBLIC_TENANT,
+                namespace,
+                "structTypeConsumerConsumerAlwaysCompatible"
+        ).toString();
+
+        pulsarClient.newConsumer(Schema.JSON(Schemas.PersonOne.class))
+                .topic(topicName)
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .subscriptionName(subName + "1")
+                .subscribe();
+
+        pulsarClient.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
+                .topic(topicName)
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .subscriptionName(subName + "2")
+                .subscribe();
+    }
+
+    @Test
+    public void primitiveTypeProducerProducerUndefinedCompatible() throws 
Exception {
+        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, 
SchemaCompatibilityStrategy.UNDEFINED);
+
+        String topicName = TopicName.get(
+                TopicDomain.persistent.value(),
+                PUBLIC_TENANT,
+                namespace,
+                "primitiveTypeProducerProducerUndefinedCompatible"
+        ).toString();
+
+        pulsarClient.newProducer(Schema.INT32)
+                .topic(topicName)
+                .create();
+
+        ProducerBuilder producerBuilder = 
pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName);
+
+        Throwable t = 
expectThrows(PulsarClientException.IncompatibleSchemaException.class, 
producerBuilder::create);
+        assertTrue(t.getMessage().endsWith("Incompatible schema: exists schema 
type INT32, new schema type STRING"));
+    }
+
+    @Test
+    public void primitiveTypeProducerConsumerUndefinedCompatible() throws 
Exception {
+        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, 
SchemaCompatibilityStrategy.UNDEFINED);
+
+        final String subName = "my-sub";
+        String topicName = TopicName.get(
+                TopicDomain.persistent.value(),
+                PUBLIC_TENANT,
+                namespace,
+                "primitiveTypeProducerConsumerUndefinedCompatible"
+        ).toString();
+
+        pulsarClient.newProducer(Schema.INT32)
+                .topic(topicName)
+                .create();
+
+        ConsumerBuilder consumerBuilder = 
pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .subscriptionName(subName);
+
+        Throwable t = 
expectThrows(PulsarClientException.IncompatibleSchemaException.class, 
consumerBuilder::subscribe);
+        assertTrue(t.getMessage().endsWith("Incompatible schema: exists schema 
type INT32, new schema type STRING"));
+    }
+
+    @Test
+    public void primitiveTypeConsumerProducerUndefinedCompatible() throws 
Exception {
+        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, 
SchemaCompatibilityStrategy.UNDEFINED);
+
+        final String subName = "my-sub";
+        String topicName = TopicName.get(
+                TopicDomain.persistent.value(),
+                PUBLIC_TENANT,
+                namespace,
+                "primitiveTypeConsumerProducerUndefinedCompatible"
+        ).toString();
+
+        pulsarClient.newConsumer(Schema.INT32)
+                .topic(topicName)
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .subscriptionName(subName)
+                .subscribe();
+
+        ProducerBuilder producerBuilder = 
pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName);
+
+        Throwable t = 
expectThrows(PulsarClientException.IncompatibleSchemaException.class, 
producerBuilder::create);
+        assertTrue(t.getMessage().endsWith("Incompatible schema: exists schema 
type INT32, new schema type STRING"));
+    }
+
+    @Test
+    public void primitiveTypeConsumerConsumerUndefinedCompatible() throws 
Exception {
+        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, 
SchemaCompatibilityStrategy.UNDEFINED);
+
+        final String subName = "my-sub";
+        String topicName = TopicName.get(
+                TopicDomain.persistent.value(),
+                PUBLIC_TENANT,
+                namespace,
+                "primitiveTypeConsumerConsumerUndefinedCompatible"
+        ).toString();
+
+        pulsarClient.newConsumer(Schema.INT32)
+                .topic(topicName)
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .subscriptionName(subName + "1")
+                .subscribe();
+
+        ConsumerBuilder consumerBuilder = 
pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .subscriptionName(subName + "2");
+
+        Throwable t = 
expectThrows(PulsarClientException.IncompatibleSchemaException.class, 
consumerBuilder::subscribe);
+        assertTrue(t.getMessage().endsWith("Incompatible schema: exists schema 
type INT32, new schema type STRING"));
+    }
+
+    @Test
+    public void primitiveTypeProducerProducerAlwaysCompatible() throws 
Exception {
+        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, 
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+
+        String topicName = TopicName.get(
+                TopicDomain.persistent.value(),
+                PUBLIC_TENANT,
+                namespace,
+                "primitiveTypeProducerProducerAlwaysCompatible"
+        ).toString();
+
+        pulsarClient.newProducer(Schema.INT32)
+                .topic(topicName)
+                .create();
+
+        pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .create();
+    }
+
+    @Test
+    public void primitiveTypeProducerConsumerAlwaysCompatible() throws 
Exception {
+        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, 
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+
+        final String subName = "my-sub";
+        String topicName = TopicName.get(
+                TopicDomain.persistent.value(),
+                PUBLIC_TENANT,
+                namespace,
+                "primitiveTypeProducerConsumerAlwaysCompatible"
+        ).toString();
+
+        pulsarClient.newProducer(Schema.INT32)
+                .topic(topicName)
+                .create();
+
+        pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .subscriptionName(subName + "2")
+                .subscribe();
+    }
+
+    @Test
+    public void primitiveTypeConsumerProducerAlwaysCompatible() throws 
Exception {
+        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, 
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+
+        final String subName = "my-sub";
+        String topicName = TopicName.get(
+                TopicDomain.persistent.value(),
+                PUBLIC_TENANT,
+                namespace,
+                "primitiveTypeConsumerProducerAlwaysCompatible"
+        ).toString();
+
+        pulsarClient.newConsumer(Schema.INT32)
+                .topic(topicName)
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .subscriptionName(subName)
+                .subscribe();
+
+        pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .create();
+    }
+
+    @Test
+    public void primitiveTypeConsumerConsumerAlwaysCompatible() throws 
Exception {
+        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, 
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+
+        final String subName = "my-sub";
+        String topicName = TopicName.get(
+                TopicDomain.persistent.value(),
+                PUBLIC_TENANT,
+                namespace,
+                "primitiveTypeConsumerConsumerAlwaysCompatible"
+        ).toString();
+
+        pulsarClient.newConsumer(Schema.INT32)
+                .topic(topicName)
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .subscriptionName(subName + "1")
+                .subscribe();
+
+        pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .subscriptionName(subName + "2")
+                .subscribe();
+    }
+
+}

Reply via email to