This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a8ce990a72c [fix][broker] Replication stuck when partitions count between two clusters is not the same (#22983)
a8ce990a72c is described below

commit a8ce990a72c3024fafe689f3bc3c5127583021e6
Author: fengyubiao <[email protected]>
AuthorDate: Mon Jul 15 23:01:47 2024 +0800

    [fix][broker] Replication stuck when partitions count between two clusters is not the same (#22983)
---
 .../pulsar/broker/service/AbstractReplicator.java  |   5 +
 .../service/persistent/PersistentReplicator.java   |   6 ++
 .../broker/service/AbstractReplicatorTest.java     |   7 +-
 .../broker/service/OneWayReplicatorTest.java       |  91 ++++++++++++++++
 .../service/OneWayReplicatorUsingGlobalZKTest.java |   6 ++
 .../api/NonPartitionedTopicExpectedTest.java       | 118 +++++++++++++++++++++
 .../pulsar/client/impl/PulsarClientImpl.java       |  46 ++++++--
 .../impl/conf/ProducerConfigurationData.java       |   2 +
 8 files changed, 271 insertions(+), 10 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index 8552a9f09e9..424263720f0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ProducerBuilderImpl;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.naming.TopicName;
@@ -184,6 +185,10 @@ public abstract class AbstractReplicator implements 
Replicator {
         }
 
         log.info("[{}] Starting replicator", replicatorId);
+        // Force the replicator to produce only to a non-partitioned topic, so that a partitioned topic is
+        // never auto-created on the remote cluster.
+        ProducerBuilderImpl builderImpl = (ProducerBuilderImpl) 
producerBuilder;
+        builderImpl.getConf().setNonPartitionedTopicExpected(true);
         producerBuilder.createAsync().thenAccept(producer -> {
             setProducerAndTriggerReadEntries(producer);
         }).exceptionally(ex -> {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 54b8993784e..33e883ab940 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -154,6 +154,12 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
         Pair<Boolean, State> changeStateRes;
         changeStateRes = compareSetAndGetState(Starting, Started);
         if (changeStateRes.getLeft()) {
+            if (!(producer instanceof ProducerImpl)) {
+                log.error("[{}] The partitions count between two clusters is 
not the same, the replicator can not be"
+                        + " created successfully: {}", replicatorId, state);
+                doCloseProducerAsync(producer, () -> {});
+                throw new ClassCastException(producer.getClass().getName() + " 
can not be cast to ProducerImpl");
+            }
             this.producer = (ProducerImpl) producer;
             HAVE_PENDING_READ_UPDATER.set(this, FALSE);
             // Trigger a new read.
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
index 64d3088b206..7415a40ad55 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
@@ -39,10 +39,11 @@ import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.ConnectionPool;
+import org.apache.pulsar.client.impl.ProducerBuilderImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.awaitility.Awaitility;
@@ -71,7 +72,8 @@ public class AbstractReplicatorTest {
         when(localClient.getCnxPool()).thenReturn(connectionPool);
         final PulsarClientImpl remoteClient = mock(PulsarClientImpl.class);
         when(remoteClient.getCnxPool()).thenReturn(connectionPool);
-        final ProducerBuilder producerBuilder = mock(ProducerBuilder.class);
+        final ProducerConfigurationData producerConf = new 
ProducerConfigurationData();
+        final ProducerBuilderImpl producerBuilder = 
mock(ProducerBuilderImpl.class);
         final ConcurrentOpenHashMap<String, 
CompletableFuture<Optional<Topic>>> topics = new ConcurrentOpenHashMap<>();
         when(broker.executor()).thenReturn(eventLoopGroup);
         when(broker.getTopics()).thenReturn(topics);
@@ -87,6 +89,7 @@ public class AbstractReplicatorTest {
         when(producerBuilder.sendTimeout(anyInt(), 
any())).thenReturn(producerBuilder);
         
when(producerBuilder.maxPendingMessages(anyInt())).thenReturn(producerBuilder);
         
when(producerBuilder.producerName(anyString())).thenReturn(producerBuilder);
+        when(producerBuilder.getConf()).thenReturn(producerConf);
         // Mock create producer fail.
         when(producerBuilder.create()).thenThrow(new RuntimeException("mocked 
ex"));
         when(producerBuilder.createAsync())
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index 9aad26530df..1745d4dc90f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -40,6 +40,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
@@ -50,7 +51,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.SneakyThrows;
@@ -79,9 +82,11 @@ import 
org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import 
org.apache.pulsar.common.policies.data.impl.AutoTopicCreationOverrideImpl;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.awaitility.Awaitility;
@@ -1069,4 +1074,90 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
         admin1.topics().delete(topic3, false);
         admin2.topics().delete(topic3, false);
     }
+
+    /**
+     * Supplies one single-argument row per {@link ReplicationMode}: first {@code OneWay}, then {@code DoubleWay}.
+     *
+     * @return the parameter rows consumed by tests that declare {@code dataProvider = "replicationModes"}
+     */
+    @DataProvider(name = "replicationModes")
+    public Object[][] replicationModes() {
+        final Object[][] rows = new Object[2][];
+        rows[0] = new Object[]{ReplicationMode.OneWay};
+        rows[1] = new Object[]{ReplicationMode.DoubleWay};
+        return rows;
+    }
+
+    /**
+     * Direction of namespace replication exercised by {@code testDifferentTopicCreationRule}.
+     */
+    protected enum ReplicationMode {
+        /** Replication clusters are configured through cluster-1's admin only. */
+        OneWay,
+        /** Replication clusters are configured through both cluster-1's and cluster-2's admin. */
+        DoubleWay
+    }
+
+    /**
+     * Verifies that replication keeps the topic layout identical on both clusters even though the two clusters
+     * use different topic auto-creation rules.
+     *
+     * <p>Cluster-2's namespace is configured to auto-create partitioned topics with 2 partitions, while the source
+     * topic on cluster-1 is created as non-partitioned. Once the replicator has started, the loaded topics whose
+     * names start with the test topic name must be the same on both brokers.
+     *
+     * @param replicationMode whether replication clusters are set on cluster-1 only or on both clusters
+     * @throws Exception if any admin or client operation fails
+     */
+    @Test(dataProvider = "replicationModes")
+    public void testDifferentTopicCreationRule(ReplicationMode replicationMode) throws Exception {
+        String ns = defaultTenant + "/" + UUID.randomUUID().toString().replace("-", "");
+        admin1.namespaces().createNamespace(ns);
+        admin2.namespaces().createNamespace(ns);
+
+        // Set topic auto-creation rule.
+        // c1: non-partitioned topic
+        // c2: partitioned topic with 2 partitions.
+        AutoTopicCreationOverride autoTopicCreation =
+                AutoTopicCreationOverrideImpl.builder().allowAutoTopicCreation(true)
+                        .topicType("partitioned").defaultNumPartitions(2).build();
+        admin2.namespaces().setAutoTopicCreation(ns, autoTopicCreation);
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(admin2.namespaces().getAutoTopicCreationAsync(ns).join().getDefaultNumPartitions(), 2);
+            // Trigger system topic __change_event's initialize.
+            pulsar2.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get("persistent://" + ns + "/1"));
+        });
+
+        // Create non-partitioned topic.
+        // Enable replication.
+        final String tp = BrokerTestUtil.newUniqueName("persistent://" + ns + "/tp_");
+        admin1.topics().createNonPartitionedTopic(tp);
+        admin1.namespaces().setNamespaceReplicationClusters(ns, new HashSet<>(Arrays.asList(cluster1, cluster2)));
+        if (replicationMode.equals(ReplicationMode.DoubleWay)) {
+            admin2.namespaces().setNamespaceReplicationClusters(ns, new HashSet<>(Arrays.asList(cluster1, cluster2)));
+        }
+
+        // Trigger and wait for replicator starts.
+        Producer<String> p1 = client1.newProducer(Schema.STRING).topic(tp).create();
+        p1.send("msg-1");
+        p1.close();
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopic persistentTopic = (PersistentTopic) broker1.getTopic(tp, false).join().get();
+            assertFalse(persistentTopic.getReplicators().isEmpty());
+        });
+
+        // Verify: the topics are the same between two clusters.
+        Predicate<String> topicNameFilter = t -> {
+            TopicName topicName = TopicName.get(t);
+            if (!topicName.getNamespace().equals(ns)) {
+                return false;
+            }
+            return t.startsWith(tp);
+        };
+        Awaitility.await().untilAsserted(() -> {
+            List<String> topics1 = pulsar1.getBrokerService().getTopics().keys()
+                    .stream().filter(topicNameFilter).collect(Collectors.toList());
+            List<String> topics2 = pulsar2.getBrokerService().getTopics().keys()
+                    .stream().filter(topicNameFilter).collect(Collectors.toList());
+            Collections.sort(topics1);
+            Collections.sort(topics2);
+            assertEquals(topics1, topics2);
+        });
+
+        // cleanup.
+        admin1.namespaces().setNamespaceReplicationClusters(ns, new HashSet<>(Arrays.asList(cluster1)));
+        if (replicationMode.equals(ReplicationMode.DoubleWay)) {
+            admin2.namespaces().setNamespaceReplicationClusters(ns, new HashSet<>(Arrays.asList(cluster2)));
+        }
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopic persistentTopic = (PersistentTopic) broker1.getTopic(tp, false).join().get();
+            assertTrue(persistentTopic.getReplicators().isEmpty());
+            if (replicationMode.equals(ReplicationMode.DoubleWay)) {
+                // The reverse replicator belongs to cluster-2's copy of the topic, so that is the one to check;
+                // re-asserting on cluster-1's topic would verify nothing new.
+                PersistentTopic remoteTopic =
+                        (PersistentTopic) pulsar2.getBrokerService().getTopic(tp, false).join().get();
+                assertTrue(remoteTopic.getReplicators().isEmpty());
+            }
+        });
+        admin1.topics().delete(tp, false);
+        admin2.topics().delete(tp, false);
+        admin1.namespaces().deleteNamespace(ns);
+        admin2.namespaces().deleteNamespace(ns);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
index 31e94f435f0..34810bbe905 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -161,4 +161,10 @@ public class OneWayReplicatorUsingGlobalZKTest extends 
OneWayReplicatorTest {
             
pulsar1.getConfiguration().getReplicationStartAt().equalsIgnoreCase("latest");
         });
     }
+
+    /**
+     * Overrides the inherited test solely to switch it off for this subclass via {@code enabled = false};
+     * the body just delegates to the parent implementation.
+     */
+    @Override
+    @Test(enabled = false)
+    public void testDifferentTopicCreationRule(ReplicationMode replicationMode) throws Exception {
+        super.testDifferentTopicCreationRule(replicationMode);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPartitionedTopicExpectedTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPartitionedTopicExpectedTest.java
new file mode 100644
index 00000000000..7b0edd314d0
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPartitionedTopicExpectedTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.impl.ProducerBuilderImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
+import org.apache.pulsar.common.policies.data.TopicType;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for the producer configuration flag {@code nonPartitionedTopicExpected}, which is set directly on the
+ * configuration held by {@link ProducerBuilderImpl}.
+ *
+ * <p>Covered cases: the topic already exists as non-partitioned, the topic already exists as partitioned, and the
+ * topic does not exist yet while the namespace auto-creation rule is either partitioned or non-partitioned.
+ */
+@Slf4j
+public class NonPartitionedTopicExpectedTest extends ProducerConsumerBase {
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    /** A producer that expects a non-partitioned topic can be created on an existing non-partitioned topic. */
+    @Test
+    public void testWhenNonPartitionedTopicExists() throws Exception {
+        final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        admin.topics().createNonPartitionedTopic(topic);
+        ProducerBuilderImpl<String> producerBuilder =
+                (ProducerBuilderImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topic);
+        producerBuilder.getConf().setNonPartitionedTopicExpected(true);
+        // Verify: create successfully.
+        Producer<String> producer = producerBuilder.create();
+        // cleanup.
+        producer.close();
+        admin.topics().delete(topic, false);
+    }
+
+    /** A producer that expects a non-partitioned topic must fail to create on an existing partitioned topic. */
+    @Test
+    public void testWhenPartitionedTopicExists() throws Exception {
+        final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        admin.topics().createPartitionedTopic(topic, 2);
+        ProducerBuilderImpl<String> producerBuilder =
+                (ProducerBuilderImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topic);
+        producerBuilder.getConf().setNonPartitionedTopicExpected(true);
+        // Verify: failed to create.
+        try {
+            producerBuilder.create();
+            // Assert.fail raises an AssertionError, which the catch (Exception) below does not intercept.
+            Assert.fail("expected an error since producer expected a non-partitioned topic");
+        } catch (Exception ex) {
+            // expected an error.
+            log.error("expected error", ex);
+        }
+        // cleanup.
+        admin.topics().deletePartitionedTopic(topic, false);
+    }
+
+    /**
+     * Supplies the namespace auto-creation topic types exercised by {@link #testWhenTopicNotExists(TopicType)}.
+     */
+    @DataProvider(name = "topicTypes")
+    public Object[][] topicTypes() {
+        return new Object[][]{
+            {TopicType.PARTITIONED},
+            {TopicType.NON_PARTITIONED}
+        };
+    }
+
+    /**
+     * When the topic does not exist, a producer that expects a non-partitioned topic must end up with a
+     * non-partitioned topic regardless of the namespace's auto-creation topic type.
+     *
+     * @param topicType the auto-creation topic type configured on the namespace for this run
+     * @throws Exception if any admin or client operation fails
+     */
+    @Test(dataProvider = "topicTypes")
+    public void testWhenTopicNotExists(TopicType topicType) throws Exception {
+        final String namespace = "public/default";
+        final String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/tp");
+        final TopicName topicName = TopicName.get(topic);
+        AutoTopicCreationOverride.Builder policyBuilder = AutoTopicCreationOverride.builder()
+                .topicType(topicType.toString()).allowAutoTopicCreation(true);
+        if (topicType.equals(TopicType.PARTITIONED)) {
+            policyBuilder.defaultNumPartitions(2);
+        }
+        AutoTopicCreationOverride policy = policyBuilder.build();
+        admin.namespaces().setAutoTopicCreation(namespace, policy);
+
+        // The namespace is shared by every test in this class, so the override must be removed even when an
+        // assertion below fails.
+        try {
+            ProducerBuilderImpl<String> producerBuilder =
+                    (ProducerBuilderImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topic);
+            producerBuilder.getConf().setNonPartitionedTopicExpected(true);
+            // Verify: create successfully.
+            Producer<String> producer = producerBuilder.create();
+            try {
+                // Verify: only create non-partitioned topic.
+                Assert.assertFalse(pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+                        .partitionedTopicExists(topicName));
+                Assert.assertTrue(pulsar.getNamespaceService().checkNonPartitionedTopicExists(topicName).join());
+            } finally {
+                // cleanup.
+                producer.close();
+                admin.topics().delete(topic, false);
+            }
+        } finally {
+            admin.namespaces().removeAutoTopicCreation(namespace);
+        }
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 120bdeb569c..4585b532812 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -49,9 +49,11 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
 import lombok.Builder;
 import lombok.Getter;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -382,26 +384,55 @@ public class PulsarClientImpl implements PulsarClient {
 
     }
 
+    /**
+     * Looks up the partition count of {@code topic} and, when the caller requires a non-partitioned topic,
+     * rejects topics that turn out to be partitioned.
+     *
+     * <p>The metadata lookup is issued with {@code !forceNoPartitioned} as its second argument (presumably the
+     * "auto-create partitioned metadata" switch; confirm against {@code getPartitionedTopicMetadata}).
+     *
+     * @param topic the topic to look up
+     * @param forceNoPartitioned whether the producer must be attached to a non-partitioned topic
+     * @param producerNameForLog producer name used only in the error message; may be {@code null}
+     * @return a future completed with the partition count; with {@code 0} when {@code forceNoPartitioned} is set
+     *         and the lookup failed with a not-found error; exceptionally with
+     *         {@link PulsarClientException.NotConnectedException} when a partitioned topic is found although a
+     *         non-partitioned one is required; or exceptionally with the lookup failure in every other case
+     */
+    private CompletableFuture<Integer> checkPartitions(String topic, boolean forceNoPartitioned,
+                                                       @Nullable String producerNameForLog) {
+        CompletableFuture<Integer> result = new CompletableFuture<>();
+        getPartitionedTopicMetadata(topic, !forceNoPartitioned).thenAccept(metadata -> {
+            if (forceNoPartitioned && metadata.partitions > 0) {
+                String errorMsg = String.format("Can not create the producer[%s] for the topic[%s] that contains %s"
+                                + " partitions, but the producer does not support for a partitioned topic.",
+                        producerNameForLog, topic, metadata.partitions);
+                log.error(errorMsg);
+                result.completeExceptionally(new PulsarClientException.NotConnectedException(errorMsg));
+            } else {
+                result.complete(metadata.partitions);
+            }
+        }).exceptionally(ex -> {
+            Throwable actEx = FutureUtil.unwrapCompletionException(ex);
+            // Evaluate the not-found check as one unit: "&&" binds tighter than "||", so an unparenthesized
+            // "forceNoPartitioned && A || B || C" would treat B and C as "0 partitions" for every producer.
+            boolean topicNotFound = actEx instanceof PulsarClientException.NotFoundException
+                    || actEx instanceof PulsarClientException.TopicDoesNotExistException
+                    || actEx instanceof PulsarAdminException.NotFoundException;
+            if (forceNoPartitioned && topicNotFound) {
+                // A missing topic is acceptable here: the caller goes on to create a non-partitioned producer.
+                result.complete(0);
+            } else {
+                result.completeExceptionally(ex);
+            }
+            return null;
+        });
+        return result;
+    }
+
     private <T> CompletableFuture<Producer<T>> createProducerAsync(String 
topic,
                                                                    
ProducerConfigurationData conf,
                                                                    Schema<T> 
schema,
                                                                    
ProducerInterceptors interceptors) {
         CompletableFuture<Producer<T>> producerCreatedFuture = new 
CompletableFuture<>();
 
-        getPartitionedTopicMetadata(topic, true).thenAccept(metadata -> {
+
+
+        checkPartitions(topic, conf.isNonPartitionedTopicExpected(), 
conf.getProducerName()).thenAccept(partitions -> {
             if (log.isDebugEnabled()) {
-                log.debug("[{}] Received topic metadata. partitions: {}", 
topic, metadata.partitions);
+                log.debug("[{}] Received topic metadata. partitions: {}", 
topic, partitions);
             }
 
             ProducerBase<T> producer;
-            if (metadata.partitions > 0) {
+            if (partitions > 0) {
                 producer = newPartitionedProducerImpl(topic, conf, schema, 
interceptors, producerCreatedFuture,
-                        metadata);
+                        partitions);
             } else {
                 producer = newProducerImpl(topic, -1, conf, schema, 
interceptors, producerCreatedFuture,
                         Optional.empty());
             }
-
             producers.add(producer);
         }).exceptionally(ex -> {
             log.warn("[{}] Failed to get partitioned topic metadata: {}", 
topic, ex.getMessage());
@@ -422,7 +453,6 @@ public class PulsarClientImpl implements PulsarClient {
      * @param schema topic schema
      * @param interceptors producer interceptors
      * @param producerCreatedFuture future for signaling completion of async 
producer creation
-     * @param metadata partitioned topic metadata
      * @param <T> message type class
      * @return new PartitionedProducerImpl instance
      */
@@ -432,8 +462,8 @@ public class PulsarClientImpl implements PulsarClient {
                                                                         
ProducerInterceptors interceptors,
                                                                         
CompletableFuture<Producer<T>>
                                                                                
 producerCreatedFuture,
-                                                                        
PartitionedTopicMetadata metadata) {
-        return new PartitionedProducerImpl<>(PulsarClientImpl.this, topic, 
conf, metadata.partitions,
+                                                                        int 
partitions) {
+        return new PartitionedProducerImpl<>(PulsarClientImpl.this, topic, 
conf, partitions,
                 producerCreatedFuture, schema, interceptors);
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
index 581b3d8a163..6ec738bbf4c 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
@@ -204,6 +204,8 @@ public class ProducerConfigurationData implements 
Serializable, Cloneable {
 
     private SortedMap<String, String> properties = new TreeMap<>();
 
+    private boolean isNonPartitionedTopicExpected;
+
     @ApiModelProperty(
             name = "initialSubscriptionName",
             value = "Use this configuration to automatically create an initial 
subscription when creating a topic."

Reply via email to