This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a8ce990a72c [fix][broker] Replication stuck when partitions count
between two clusters is not the same (#22983)
a8ce990a72c is described below
commit a8ce990a72c3024fafe689f3bc3c5127583021e6
Author: fengyubiao <[email protected]>
AuthorDate: Mon Jul 15 23:01:47 2024 +0800
[fix][broker] Replication stuck when partitions count between two clusters
is not the same (#22983)
---
.../pulsar/broker/service/AbstractReplicator.java | 5 +
.../service/persistent/PersistentReplicator.java | 6 ++
.../broker/service/AbstractReplicatorTest.java | 7 +-
.../broker/service/OneWayReplicatorTest.java | 91 ++++++++++++++++
.../service/OneWayReplicatorUsingGlobalZKTest.java | 6 ++
.../api/NonPartitionedTopicExpectedTest.java | 118 +++++++++++++++++++++
.../pulsar/client/impl/PulsarClientImpl.java | 46 ++++++--
.../impl/conf/ProducerConfigurationData.java | 2 +
8 files changed, 271 insertions(+), 10 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index 8552a9f09e9..424263720f0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicName;
@@ -184,6 +185,10 @@ public abstract class AbstractReplicator implements
Replicator {
}
log.info("[{}] Starting replicator", replicatorId);
+ // Force the replicator to produce only to a non-partitioned topic, so that a partitioned topic is never
+ // auto-created on the remote cluster.
+ ProducerBuilderImpl builderImpl = (ProducerBuilderImpl)
producerBuilder;
+ builderImpl.getConf().setNonPartitionedTopicExpected(true);
producerBuilder.createAsync().thenAccept(producer -> {
setProducerAndTriggerReadEntries(producer);
}).exceptionally(ex -> {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 54b8993784e..33e883ab940 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -154,6 +154,12 @@ public abstract class PersistentReplicator extends
AbstractReplicator
Pair<Boolean, State> changeStateRes;
changeStateRes = compareSetAndGetState(Starting, Started);
if (changeStateRes.getLeft()) {
+ if (!(producer instanceof ProducerImpl)) {
+ log.error("[{}] The partitions count between two clusters is
not the same, the replicator can not be"
+ + " created successfully: {}", replicatorId, state);
+ doCloseProducerAsync(producer, () -> {});
+ throw new ClassCastException(producer.getClass().getName() + "
can not be cast to ProducerImpl");
+ }
this.producer = (ProducerImpl) producer;
HAVE_PENDING_READ_UPDATER.set(this, FALSE);
// Trigger a new read.
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
index 64d3088b206..7415a40ad55 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
@@ -39,10 +39,11 @@ import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ConnectionPool;
+import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
@@ -71,7 +72,8 @@ public class AbstractReplicatorTest {
when(localClient.getCnxPool()).thenReturn(connectionPool);
final PulsarClientImpl remoteClient = mock(PulsarClientImpl.class);
when(remoteClient.getCnxPool()).thenReturn(connectionPool);
- final ProducerBuilder producerBuilder = mock(ProducerBuilder.class);
+ final ProducerConfigurationData producerConf = new
ProducerConfigurationData();
+ final ProducerBuilderImpl producerBuilder =
mock(ProducerBuilderImpl.class);
final ConcurrentOpenHashMap<String,
CompletableFuture<Optional<Topic>>> topics = new ConcurrentOpenHashMap<>();
when(broker.executor()).thenReturn(eventLoopGroup);
when(broker.getTopics()).thenReturn(topics);
@@ -87,6 +89,7 @@ public class AbstractReplicatorTest {
when(producerBuilder.sendTimeout(anyInt(),
any())).thenReturn(producerBuilder);
when(producerBuilder.maxPendingMessages(anyInt())).thenReturn(producerBuilder);
when(producerBuilder.producerName(anyString())).thenReturn(producerBuilder);
+ when(producerBuilder.getConf()).thenReturn(producerConf);
// Mock create producer fail.
when(producerBuilder.create()).thenThrow(new RuntimeException("mocked
ex"));
when(producerBuilder.createAsync())
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index 9aad26530df..1745d4dc90f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -40,6 +40,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
@@ -50,7 +51,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
+import java.util.function.Predicate;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.SneakyThrows;
@@ -79,9 +82,11 @@ import
org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
+import
org.apache.pulsar.common.policies.data.impl.AutoTopicCreationOverrideImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
@@ -1069,4 +1074,90 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
admin1.topics().delete(topic3, false);
admin2.topics().delete(topic3, false);
}
+
+ /** TestNG data provider yielding one single-argument row per replication mode (OneWay, then DoubleWay). */
+ @DataProvider(name = "replicationModes")
+ public Object[][] replicationModes() {
+     final Object[] oneWayRow = {ReplicationMode.OneWay};
+     final Object[] doubleWayRow = {ReplicationMode.DoubleWay};
+     return new Object[][]{oneWayRow, doubleWayRow};
+ }
+
+ /**
+  * Selects whether a parameterized test enables replication on the first cluster only ({@code OneWay})
+  * or on both clusters ({@code DoubleWay}).
+  */
+ protected enum ReplicationMode {
+     OneWay, DoubleWay
+ }
+
+ /**
+  * Verifies that replication works when the two clusters use different topic auto-creation rules:
+  * cluster1 holds a non-partitioned topic while cluster2 would auto-create a partitioned topic with
+  * two partitions. After the replicator starts, both brokers must hold the same set of topics under
+  * the test topic's name prefix.
+  *
+  * @param replicationMode whether replication is enabled on cluster1 only or on both clusters
+  * @throws Exception if any admin or client call fails
+  */
+ @Test(dataProvider = "replicationModes")
+ public void testDifferentTopicCreationRule(ReplicationMode replicationMode) throws Exception {
+     String ns = defaultTenant + "/" + UUID.randomUUID().toString().replace("-", "");
+     admin1.namespaces().createNamespace(ns);
+     admin2.namespaces().createNamespace(ns);
+
+     // Set topic auto-creation rule.
+     // c1: no-partitioned topic
+     // c2: partitioned topic with 2 partitions.
+     AutoTopicCreationOverride autoTopicCreation =
+             AutoTopicCreationOverrideImpl.builder().allowAutoTopicCreation(true)
+                     .topicType("partitioned").defaultNumPartitions(2).build();
+     admin2.namespaces().setAutoTopicCreation(ns, autoTopicCreation);
+     Awaitility.await().untilAsserted(() -> {
+         assertEquals(admin2.namespaces().getAutoTopicCreationAsync(ns).join().getDefaultNumPartitions(), 2);
+         // Trigger system topic __change_event's initialize.
+         pulsar2.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get("persistent://" + ns + "/1"));
+     });
+
+     // Create non-partitioned topic.
+     // Enable replication.
+     final String tp = BrokerTestUtil.newUniqueName("persistent://" + ns + "/tp_");
+     admin1.topics().createNonPartitionedTopic(tp);
+     admin1.namespaces().setNamespaceReplicationClusters(ns, new HashSet<>(Arrays.asList(cluster1, cluster2)));
+     if (replicationMode.equals(ReplicationMode.DoubleWay)) {
+         admin2.namespaces().setNamespaceReplicationClusters(ns, new HashSet<>(Arrays.asList(cluster1, cluster2)));
+     }
+
+     // Trigger and wait for replicator starts.
+     Producer<String> p1 = client1.newProducer(Schema.STRING).topic(tp).create();
+     p1.send("msg-1");
+     p1.close();
+     Awaitility.await().untilAsserted(() -> {
+         PersistentTopic persistentTopic = (PersistentTopic) broker1.getTopic(tp, false).join().get();
+         assertFalse(persistentTopic.getReplicators().isEmpty());
+     });
+
+     // Verify: the topics are the same between two clusters.
+     Predicate<String> topicNameFilter = t -> {
+         TopicName topicName = TopicName.get(t);
+         if (!topicName.getNamespace().equals(ns)) {
+             return false;
+         }
+         return t.startsWith(tp);
+     };
+     Awaitility.await().untilAsserted(() -> {
+         List<String> topics1 = pulsar1.getBrokerService().getTopics().keys()
+                 .stream().filter(topicNameFilter).collect(Collectors.toList());
+         List<String> topics2 = pulsar2.getBrokerService().getTopics().keys()
+                 .stream().filter(topicNameFilter).collect(Collectors.toList());
+         Collections.sort(topics1);
+         Collections.sort(topics2);
+         assertEquals(topics1, topics2);
+     });
+
+     // cleanup.
+     admin1.namespaces().setNamespaceReplicationClusters(ns, new HashSet<>(Arrays.asList(cluster1)));
+     if (replicationMode.equals(ReplicationMode.DoubleWay)) {
+         admin2.namespaces().setNamespaceReplicationClusters(ns, new HashSet<>(Arrays.asList(cluster2)));
+     }
+     Awaitility.await().untilAsserted(() -> {
+         PersistentTopic localTopic = (PersistentTopic) broker1.getTopic(tp, false).join().get();
+         assertTrue(localTopic.getReplicators().isEmpty());
+         if (replicationMode.equals(ReplicationMode.DoubleWay)) {
+             // The reverse replicator belongs to cluster2's copy of the topic, so it has to be checked
+             // there; re-checking cluster1's topic would not wait for it to be removed.
+             PersistentTopic remoteTopic =
+                     (PersistentTopic) pulsar2.getBrokerService().getTopic(tp, false).join().get();
+             assertTrue(remoteTopic.getReplicators().isEmpty());
+         }
+     });
+     admin1.topics().delete(tp, false);
+     admin2.topics().delete(tp, false);
+     admin1.namespaces().deleteNamespace(ns);
+     admin2.namespaces().deleteNamespace(ns);
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
index 31e94f435f0..34810bbe905 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -161,4 +161,10 @@ public class OneWayReplicatorUsingGlobalZKTest extends
OneWayReplicatorTest {
pulsar1.getConfiguration().getReplicationStartAt().equalsIgnoreCase("latest");
});
}
+
+ // NOTE(review): this override only delegates to the parent test and is switched off with
+ // enabled = false for this subclass. The reason is not recorded here — confirm whether the scenario
+ // does not apply to this setup or is pending a fix.
+ @Test(enabled = false)
+ @Override
+ public void testDifferentTopicCreationRule(ReplicationMode
replicationMode) throws Exception {
+ super.testDifferentTopicCreationRule(replicationMode);
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPartitionedTopicExpectedTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPartitionedTopicExpectedTest.java
new file mode 100644
index 00000000000..7b0edd314d0
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPartitionedTopicExpectedTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.impl.ProducerBuilderImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
+import org.apache.pulsar.common.policies.data.TopicType;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Tests producer creation when {@code nonPartitionedTopicExpected} is set on the producer configuration:
+ * creation succeeds against a non-partitioned topic, fails against an existing partitioned topic, and,
+ * when the topic does not exist, only a non-partitioned topic is created regardless of the namespace's
+ * auto-creation topic type.
+ */
+@Slf4j
+public class NonPartitionedTopicExpectedTest extends ProducerConsumerBase {
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    /** A producer expecting a non-partitioned topic can be created on an existing non-partitioned topic. */
+    @Test
+    public void testWhenNonPartitionedTopicExists() throws Exception {
+        final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        admin.topics().createNonPartitionedTopic(topic);
+        ProducerBuilderImpl<String> producerBuilder =
+                (ProducerBuilderImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topic);
+        producerBuilder.getConf().setNonPartitionedTopicExpected(true);
+        // Verify: create successfully.
+        Producer<String> producer = producerBuilder.create();
+        // cleanup.
+        producer.close();
+        admin.topics().delete(topic, false);
+    }
+
+    /** A producer expecting a non-partitioned topic must fail on an existing partitioned topic. */
+    @Test
+    public void testWhenPartitionedTopicExists() throws Exception {
+        final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        admin.topics().createPartitionedTopic(topic, 2);
+        try {
+            ProducerBuilderImpl<String> producerBuilder =
+                    (ProducerBuilderImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topic);
+            producerBuilder.getConf().setNonPartitionedTopicExpected(true);
+            // Verify: failed to create.
+            try {
+                producerBuilder.create();
+                Assert.fail("expected an error since producer expected a non-partitioned topic");
+            } catch (PulsarClientException ex) {
+                // expected an error.
+                log.error("expected error", ex);
+            }
+        } finally {
+            // cleanup, even when the verification above fails.
+            admin.topics().deletePartitionedTopic(topic, false);
+        }
+    }
+
+    @DataProvider(name = "topicTypes")
+    public Object[][] topicTypes() {
+        return new Object[][]{
+                {TopicType.PARTITIONED},
+                {TopicType.NON_PARTITIONED}
+        };
+    }
+
+    /**
+     * When the topic does not exist, the producer must end up on a non-partitioned topic whichever
+     * auto-creation topic type the namespace is configured with.
+     */
+    @Test(dataProvider = "topicTypes")
+    public void testWhenTopicNotExists(TopicType topicType) throws Exception {
+        final String namespace = "public/default";
+        final String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/tp");
+        final TopicName topicName = TopicName.get(topic);
+        AutoTopicCreationOverride.Builder policyBuilder = AutoTopicCreationOverride.builder()
+                .topicType(topicType.toString()).allowAutoTopicCreation(true);
+        if (topicType.equals(TopicType.PARTITIONED)) {
+            policyBuilder.defaultNumPartitions(2);
+        }
+        AutoTopicCreationOverride policy = policyBuilder.build();
+        admin.namespaces().setAutoTopicCreation(namespace, policy);
+        try {
+            ProducerBuilderImpl<String> producerBuilder =
+                    (ProducerBuilderImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topic);
+            producerBuilder.getConf().setNonPartitionedTopicExpected(true);
+            // Verify: create successfully.
+            Producer<String> producer = producerBuilder.create();
+            // Verify: only create non-partitioned topic.
+            Assert.assertFalse(pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+                    .partitionedTopicExists(topicName));
+            Assert.assertTrue(pulsar.getNamespaceService().checkNonPartitionedTopicExists(topicName).join());
+
+            // cleanup.
+            producer.close();
+            admin.topics().delete(topic, false);
+        } finally {
+            // The override is set on the namespace shared by every test in this class, so it has to be
+            // removed even when an assertion above fails.
+            admin.namespaces().removeAutoTopicCreation(namespace);
+        }
+    }
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 120bdeb569c..4585b532812 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -49,9 +49,11 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
import lombok.Builder;
import lombok.Getter;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -382,26 +384,55 @@ public class PulsarClientImpl implements PulsarClient {
}
+ /**
+  * Resolves the number of partitions a new producer should be created for.
+  *
+  * <p>The metadata lookup is issued with {@code !forceNoPartitioned} as its second argument
+  * (NOTE(review): presumably the flag that allows metadata auto-creation — confirm against
+  * {@code getPartitionedTopicMetadata}).
+  *
+  * @param topic topic the producer is being created for
+  * @param forceNoPartitioned when {@code true}, the producer only supports a non-partitioned topic: a
+  *                           topic with partitions fails the returned future, and a "not found" lookup
+  *                           failure is treated as zero partitions
+  * @param producerNameForLog producer name used only in the error message; may be {@code null}
+  * @return a future completed with the partition count ({@code 0} for a non-partitioned topic), or
+  *         completed exceptionally when the lookup fails or the topic is unexpectedly partitioned
+  */
+ private CompletableFuture<Integer> checkPartitions(String topic, boolean forceNoPartitioned,
+                                                    @Nullable String producerNameForLog) {
+     CompletableFuture<Integer> checkPartitions = new CompletableFuture<>();
+     getPartitionedTopicMetadata(topic, !forceNoPartitioned).thenAccept(metadata -> {
+         if (forceNoPartitioned && metadata.partitions > 0) {
+             String errorMsg = String.format("Can not create the producer[%s] for the topic[%s] that contains %s"
+                     + " partitions, but the producer does not support for a partitioned topic.",
+                     producerNameForLog, topic, metadata.partitions);
+             log.error(errorMsg);
+             checkPartitions.completeExceptionally(
+                     new PulsarClientException.NotConnectedException(errorMsg));
+         } else {
+             checkPartitions.complete(metadata.partitions);
+         }
+     }).exceptionally(ex -> {
+         Throwable actEx = FutureUtil.unwrapCompletionException(ex);
+         // "&&" binds tighter than "||", so the not-found checks are grouped explicitly: a missing
+         // topic is mapped to "0 partitions" only for producers that force a non-partitioned topic.
+         // Any other producer keeps receiving the original lookup failure.
+         boolean topicNotFound = actEx instanceof PulsarClientException.NotFoundException
+                 || actEx instanceof PulsarClientException.TopicDoesNotExistException
+                 || actEx instanceof PulsarAdminException.NotFoundException;
+         if (forceNoPartitioned && topicNotFound) {
+             checkPartitions.complete(0);
+         } else {
+             checkPartitions.completeExceptionally(ex);
+         }
+         return null;
+     });
+     return checkPartitions;
+ }
+
private <T> CompletableFuture<Producer<T>> createProducerAsync(String
topic,
ProducerConfigurationData conf,
Schema<T>
schema,
ProducerInterceptors interceptors) {
CompletableFuture<Producer<T>> producerCreatedFuture = new
CompletableFuture<>();
- getPartitionedTopicMetadata(topic, true).thenAccept(metadata -> {
+
+
+ checkPartitions(topic, conf.isNonPartitionedTopicExpected(),
conf.getProducerName()).thenAccept(partitions -> {
if (log.isDebugEnabled()) {
- log.debug("[{}] Received topic metadata. partitions: {}",
topic, metadata.partitions);
+ log.debug("[{}] Received topic metadata. partitions: {}",
topic, partitions);
}
ProducerBase<T> producer;
- if (metadata.partitions > 0) {
+ if (partitions > 0) {
producer = newPartitionedProducerImpl(topic, conf, schema,
interceptors, producerCreatedFuture,
- metadata);
+ partitions);
} else {
producer = newProducerImpl(topic, -1, conf, schema,
interceptors, producerCreatedFuture,
Optional.empty());
}
-
producers.add(producer);
}).exceptionally(ex -> {
log.warn("[{}] Failed to get partitioned topic metadata: {}",
topic, ex.getMessage());
@@ -422,7 +453,6 @@ public class PulsarClientImpl implements PulsarClient {
* @param schema topic schema
* @param interceptors producer interceptors
* @param producerCreatedFuture future for signaling completion of async
producer creation
- * @param metadata partitioned topic metadata
* @param <T> message type class
* @return new PartitionedProducerImpl instance
*/
@@ -432,8 +462,8 @@ public class PulsarClientImpl implements PulsarClient {
ProducerInterceptors interceptors,
CompletableFuture<Producer<T>>
producerCreatedFuture,
-
PartitionedTopicMetadata metadata) {
- return new PartitionedProducerImpl<>(PulsarClientImpl.this, topic,
conf, metadata.partitions,
+ int
partitions) {
+ return new PartitionedProducerImpl<>(PulsarClientImpl.this, topic,
conf, partitions,
producerCreatedFuture, schema, interceptors);
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
index 581b3d8a163..6ec738bbf4c 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
@@ -204,6 +204,8 @@ public class ProducerConfigurationData implements
Serializable, Cloneable {
private SortedMap<String, String> properties = new TreeMap<>();
+ private boolean isNonPartitionedTopicExpected;
+
@ApiModelProperty(
name = "initialSubscriptionName",
value = "Use this configuration to automatically create an initial
subscription when creating a topic."