This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9553c33481952c7354f1ab44bcfd3592aa0c7455
Author: fengyubiao <[email protected]>
AuthorDate: Fri Aug 23 08:31:49 2024 +0800

    [improve] [broker] Phase 1 of PIP-370 support disable create topics on 
remote cluster through replication  (#23169)
    
    (cherry picked from commit 44f986014e4d314a4a52484856c7dfb2d89ea3c1)
---
 conf/broker.conf                                   |  10 +
 conf/standalone.conf                               |  10 +
 .../apache/pulsar/broker/ServiceConfiguration.java |   5 +
 .../apache/pulsar/broker/admin/AdminResource.java  |  10 +-
 .../broker/admin/impl/PersistentTopicsBase.java    |   6 +
 .../pulsar/broker/service/AbstractReplicator.java  |  16 +-
 .../persistent/GeoPersistentReplicator.java        |  29 +++
 ...eateTopicToRemoteClusterForReplicationTest.java | 208 +++++++++++++++++++++
 .../pulsar/broker/service/StandaloneTest.java      |   1 +
 .../common/naming/ServiceConfigurationTest.java    |   5 +
 .../configurations/pulsar_broker_test.conf         |   1 +
 .../pulsar_broker_test_standalone.conf             |   3 +-
 .../pulsar/client/impl/PulsarClientImpl.java       |   2 +-
 13 files changed, 297 insertions(+), 9 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index bc026fea9c9..3689721e267 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1534,6 +1534,16 @@ replicatorPrefix=pulsar.repl
 # due to missing ZooKeeper watch (disable with value 0)
 replicationPolicyCheckDurationSeconds=600
 
+# Whether the internal replication of the local cluster will trigger topic 
auto-creation on the remote cluster.
+# 1. After enabling namespace-level Geo-Replication: whether the local broker 
will create topics on the remote
+#   cluster automatically when calling `pulsar-admin topics 
create-partitioned-topic`.
+# 2. When enabling topic-level Geo-Replication on a partitioned topic: whether 
the local broker will create topics on
+#   the remote cluster.
+# 3. Whether the internal Geo-Replicator in the local cluster will trigger 
non-persistent topic auto-creation for
+#   remote clusters.
+# This is not a dynamic config. The default value is "true" to preserve backward-compatible behavior.
+createTopicToRemoteClusterForReplication=true
+
 # Default message retention time.
 # 0 means retention is disabled. -1 means data is not removed by time quota.
 defaultRetentionTimeInMinutes=0
diff --git a/conf/standalone.conf b/conf/standalone.conf
index ab0f2cebf66..da8c9759960 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -946,6 +946,16 @@ replicationProducerQueueSize=1000
 # due to missing ZooKeeper watch (disable with value 0)
 replicationPolicyCheckDurationSeconds=600
 
+# Whether the internal replication of the local cluster will trigger topic 
auto-creation on the remote cluster.
+# 1. After enabling namespace-level Geo-Replication: whether the local broker 
will create topics on the remote
+#   cluster automatically when calling `pulsar-admin topics 
create-partitioned-topic`.
+# 2. When enabling topic-level Geo-Replication on a partitioned topic: whether 
the local broker will create topics on
+#   the remote cluster.
+# 3. Whether the internal Geo-Replicator in the local cluster will trigger 
non-persistent topic auto-creation for
+#   remote clusters.
+# This is not a dynamic config. The default value is "true" to preserve backward-compatible behavior.
+createTopicToRemoteClusterForReplication=true
+
 # Default message retention time. 0 means retention is disabled. -1 means data 
is not removed by time quota
 defaultRetentionTimeInMinutes=0
 
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index a992170f130..ff401fdb2b7 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2746,6 +2746,11 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
                     + "inconsistency due to missing ZooKeeper watch (disable 
with value 0)"
         )
     private int replicationPolicyCheckDurationSeconds = 600;
+    @FieldContext(
+            category = CATEGORY_REPLICATION,
+            doc = "Whether the internal replicator will trigger topic 
auto-creation on the remote cluster."
+        )
+    private boolean createTopicToRemoteClusterForReplication = true;
     @Deprecated
     @FieldContext(
         category = CATEGORY_REPLICATION,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 7121a627d83..0521054a28b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -601,11 +601,15 @@ public abstract class AdminResource extends 
PulsarWebResource {
                 .thenCompose(__ -> 
provisionPartitionedTopicPath(numPartitions, createLocalTopicOnly, properties))
                 .thenCompose(__ -> tryCreatePartitionsAsync(numPartitions))
                 .thenRun(() -> {
-                    if (!createLocalTopicOnly && topicName.isGlobal()) {
+                    if (!createLocalTopicOnly && topicName.isGlobal()
+                            && 
pulsar().getConfig().isCreateTopicToRemoteClusterForReplication()) {
                         
internalCreatePartitionedTopicToReplicatedClustersInBackground(numPartitions);
+                        log.info("[{}] Successfully created partitioned for topic {} for the remote clusters",
+                                clientAppId(), topicName);
+                    } else {
+                        log.info("[{}] Skip creating partitioned for topic {} 
for the remote clusters",
+                                clientAppId(), topicName);
                     }
-                    log.info("[{}] Successfully created partitions for topic 
{} in cluster {}",
-                            clientAppId(), topicName, 
pulsar().getConfiguration().getClusterName());
                     asyncResponse.resume(Response.noContent().build());
                 })
                 .exceptionally(ex -> {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 1259ca690ec..abd9a37aaf7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3513,6 +3513,12 @@ public class PersistentTopicsBase extends AdminResource {
                     }
                     return FutureUtil.waitForAll(futures);
                 }).thenCompose(__ -> {
+                    if 
(!pulsar().getConfig().isCreateTopicToRemoteClusterForReplication()) {
+                        log.info("[{}] Skip creating partitioned for topic {} 
for the remote clusters {}",
+                                clientAppId(), topicName, 
replicationClusters.stream().filter(v ->
+                                        
!pulsar().getConfig().getClusterName().equals(v)).collect(Collectors.toList()));
+                        return CompletableFuture.completedFuture(null);
+                    }
                     // Sync to create partitioned topic on the remote cluster 
if needed.
                     TopicName topicNameWithoutPartition = 
TopicName.get(topicName.getPartitionedTopicName());
                     return 
pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index 077717aff9c..86d19817da7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -140,6 +140,10 @@ public abstract class AbstractReplicator implements 
Replicator {
         return remoteCluster;
     }
 
+    protected CompletableFuture<Void> prepareCreateProducer() {
+        return CompletableFuture.completedFuture(null);
+    }
+
     public void startProducer() {
         // Guarantee only one task call "producerBuilder.createAsync()".
         Pair<Boolean, State> setStartingRes = 
compareSetAndGetState(State.Disconnected, State.Starting);
@@ -166,12 +170,15 @@ public abstract class AbstractReplicator implements 
Replicator {
         }
 
         log.info("[{}] Starting replicator", replicatorId);
+
         // Force only replicate messages to a non-partitioned topic, to avoid 
auto-create a partitioned topic on
         // the remote cluster.
-        ProducerBuilderImpl builderImpl = (ProducerBuilderImpl) 
producerBuilder;
-        builderImpl.getConf().setNonPartitionedTopicExpected(true);
-        producerBuilder.createAsync().thenAccept(producer -> {
-            setProducerAndTriggerReadEntries(producer);
+        prepareCreateProducer().thenCompose(ignore -> {
+            ProducerBuilderImpl builderImpl = (ProducerBuilderImpl) 
producerBuilder;
+            builderImpl.getConf().setNonPartitionedTopicExpected(true);
+            return producerBuilder.createAsync().thenAccept(producer -> {
+                setProducerAndTriggerReadEntries(producer);
+            });
         }).exceptionally(ex -> {
             Pair<Boolean, State> setDisconnectedRes = 
compareSetAndGetState(State.Starting, State.Disconnected);
             if (setDisconnectedRes.getLeft()) {
@@ -196,6 +203,7 @@ public abstract class AbstractReplicator implements 
Replicator {
             }
             return null;
         });
+
     }
 
     /***
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
index 082dfed10c6..3390c3a2885 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
@@ -27,11 +27,13 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.protocol.Markers;
 import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.util.FutureUtil;
 
 @Slf4j
 public class GeoPersistentReplicator extends PersistentReplicator {
@@ -51,6 +53,33 @@ public class GeoPersistentReplicator extends 
PersistentReplicator {
         return getReplicatorName(replicatorPrefix, localCluster) + 
REPL_PRODUCER_NAME_DELIMITER + remoteCluster;
     }
 
+    /**
+     * Pre-check run before the replication producer is created. Completes immediately when remote topic
+     * creation is enabled; otherwise fails unless the remote metadata lookup reports 0 partitions.
+     */
+    @Override
+    protected CompletableFuture<Void> prepareCreateProducer() {
+        if (brokerService.getPulsar().getConfig().isCreateTopicToRemoteClusterForReplication()) {
+            return CompletableFuture.completedFuture(null);
+        }
+        CompletableFuture<Void> topicCheckFuture = new CompletableFuture<>();
+        replicationClient.getPartitionedTopicMetadata(localTopic.getName(), false, false)
+                .whenComplete((metadata, ex) -> {
+            if (ex != null) {
+                topicCheckFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
+            } else if (metadata.partitions == 0) {
+                topicCheckFuture.complete(null);
+            } else {
+                // Use %s (not SLF4J's {}): String.format leaves "{}" as literal text.
+                String errorMsg = String.format("[%s] Can not create the replicator due to the partitions in"
+                        + " the remote cluster is not 0, but is %s", replicatorId, metadata.partitions);
+                log.error(errorMsg);
+                topicCheckFuture.completeExceptionally(new PulsarClientException.NotAllowedException(errorMsg));
+            }
+        });
+        return topicCheckFuture;
+    }
+
     @Override
     protected boolean replicateEntries(List<Entry> entries) {
         boolean atLeastOneMessageSentForReplication = false;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DisabledCreateTopicToRemoteClusterForReplicationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DisabledCreateTopicToRemoteClusterForReplicationTest.java
new file mode 100644
index 00000000000..0f8db4aaa73
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DisabledCreateTopicToRemoteClusterForReplicationTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.apache.pulsar.zookeeper.ZookeeperServerTest;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class DisabledCreateTopicToRemoteClusterForReplicationTest extends 
OneWayReplicatorTestBase {
+
+    @Override
+    @BeforeClass(alwaysRun = true, timeOut = 300000)
+    public void setup() throws Exception {
+        super.setup();
+        admin1.namespaces().setRetention(replicatedNamespace, new 
RetentionPolicies(300, 1024));
+        admin2.namespaces().setRetention(replicatedNamespace, new 
RetentionPolicies(300, 1024));
+        admin1.namespaces().setRetention(nonReplicatedNamespace, new 
RetentionPolicies(300, 1024));
+        admin2.namespaces().setRetention(nonReplicatedNamespace, new 
RetentionPolicies(300, 1024));
+    }
+
+    @Override
+    @AfterClass(alwaysRun = true, timeOut = 300000)
+    public void cleanup() throws Exception {
+        super.cleanup();
+    }
+
+    @Override
+    protected void setConfigDefaults(ServiceConfiguration config, String 
clusterName,
+                                     LocalBookkeeperEnsemble 
bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) {
+        super.setConfigDefaults(config, clusterName, bookkeeperEnsemble, 
brokerConfigZk);
+        config.setCreateTopicToRemoteClusterForReplication(false);
+        config.setReplicationStartAt("earliest");
+    }
+
+    @Test
+    public void testCreatePartitionedTopicWithNsReplication() throws Exception 
{
+        String ns = defaultTenant + "/" + 
UUID.randomUUID().toString().replace("-", "");
+        admin1.namespaces().createNamespace(ns);
+        admin2.namespaces().createNamespace(ns);
+        admin1.namespaces().setRetention(ns, new RetentionPolicies(3600, -1));
+        admin2.namespaces().setRetention(ns, new RetentionPolicies(3600, -1));
+
+        // Create a partitioned topic.
+        // Enable replication.
+        final String tp = BrokerTestUtil.newUniqueName("persistent://" + ns + 
"/tp_");
+        final String part1 = TopicName.get(tp).getPartition(0).toString();
+        admin1.topics().createPartitionedTopic(tp, 1);
+        admin1.namespaces().setNamespaceReplicationClusters(ns, new 
HashSet<>(Arrays.asList(cluster1, cluster2)));
+
+        // Trigger and wait for replicator starts.
+        String msgValue = "msg-1";
+        Producer<String> producer1 = 
client1.newProducer(Schema.STRING).topic(tp).create();
+        producer1.send(msgValue);
+        producer1.close();
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopic topicPart1 = (PersistentTopic) 
broker1.getTopic(part1, false).join().get();
+            assertFalse(topicPart1.getReplicators().isEmpty());
+        });
+
+        // Verify: there is no topic with the same name on the remote cluster.
+        try {
+            admin2.topics().getPartitionedTopicMetadata(tp);
+            fail("Expected a not found ex");
+        } catch (PulsarAdminException.NotFoundException ex) {
+            // expected.
+        }
+
+        // Verify: after creating the topic on the remote cluster, all things 
are fine.
+        admin2.topics().createPartitionedTopic(tp, 1);
+        Consumer<String> consumer2 = 
client2.newConsumer(Schema.STRING).topic(tp).isAckReceiptEnabled(true)
+                .subscriptionName("s1").subscribe();
+        assertEquals(consumer2.receive(10, TimeUnit.SECONDS).getValue(), 
msgValue);
+        consumer2.close();
+
+        // cleanup.
+        admin1.namespaces().setNamespaceReplicationClusters(ns, new 
HashSet<>(Arrays.asList(cluster1)));
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopic topicPart1 = (PersistentTopic) 
broker1.getTopic(part1, false).join().get();
+            assertTrue(topicPart1.getReplicators().isEmpty());
+        });
+        admin1.topics().deletePartitionedTopic(tp, false);
+        admin2.topics().deletePartitionedTopic(tp, false);
+        admin1.namespaces().deleteNamespace(ns);
+        admin2.namespaces().deleteNamespace(ns);
+    }
+
+    @Test
+    public void testEnableTopicReplication() throws Exception {
+        String ns = nonReplicatedNamespace;
+
+        // Create a partitioned topic.
+        // Enable replication.
+        final String tp = BrokerTestUtil.newUniqueName("persistent://" + ns + 
"/tp_");
+        final String part1 = TopicName.get(tp).getPartition(0).toString();
+        admin1.topics().createPartitionedTopic(tp, 1);
+        admin1.topics().setReplicationClusters(tp, Arrays.asList(cluster1, 
cluster2));
+
+        // Trigger and wait for replicator starts.
+        Producer<String> p1 = 
client1.newProducer(Schema.STRING).topic(tp).create();
+        p1.send("msg-1");
+        p1.close();
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopic topicPart1 = (PersistentTopic) 
broker1.getTopic(part1, false).join().get();
+            assertFalse(topicPart1.getReplicators().isEmpty());
+        });
+
+        // Verify: there is no topic with the same name on the remote cluster.
+        try {
+            admin2.topics().getPartitionedTopicMetadata(tp);
+            fail("Expected a not found ex");
+        } catch (PulsarAdminException.NotFoundException ex) {
+            // expected.
+        }
+
+        // Verify: after creating the topic on the remote cluster, all things 
are fine.
+        admin2.topics().createPartitionedTopic(tp, 1);
+        waitReplicatorStarted(part1);
+
+        // cleanup.
+        admin1.topics().setReplicationClusters(tp, Arrays.asList(cluster1));
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopic topicPart1 = (PersistentTopic) 
broker1.getTopic(part1, false).join().get();
+            assertTrue(topicPart1.getReplicators().isEmpty());
+        });
+        admin1.topics().deletePartitionedTopic(tp, false);
+        admin2.topics().deletePartitionedTopic(tp, false);
+    }
+
+    @Test
+    public void testNonPartitionedTopic() throws Exception {
+        String ns = nonReplicatedNamespace;
+
+        // Create non-partitioned topic.
+        // Enable replication.
+        final String tp = BrokerTestUtil.newUniqueName("persistent://" + ns + 
"/tp_");
+        admin1.topics().createNonPartitionedTopic(tp);
+        admin1.topics().setReplicationClusters(tp, Arrays.asList(cluster1, 
cluster2));
+
+        // Trigger and wait for replicator starts.
+        Producer<String> p1 = 
client1.newProducer(Schema.STRING).topic(tp).create();
+        p1.send("msg-1");
+        p1.close();
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopic topicPart1 = (PersistentTopic) 
broker1.getTopic(tp, false).join().get();
+            assertFalse(topicPart1.getReplicators().isEmpty());
+        });
+
+        // Verify: there is no topic with the same name on the remote cluster.
+        try {
+            admin2.topics().getPartitionedTopicMetadata(tp);
+            fail("Expected a not found ex");
+        } catch (PulsarAdminException.NotFoundException ex) {
+            // expected.
+        }
+
+        // Verify: after creating the topic on the remote cluster, all things 
are fine.
+        admin2.topics().createNonPartitionedTopic(tp);
+        waitReplicatorStarted(tp);
+
+        // cleanup.
+        admin1.topics().setReplicationClusters(tp, Arrays.asList(cluster1));
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopic topicPart1 = (PersistentTopic) 
broker1.getTopic(tp, false).join().get();
+            assertTrue(topicPart1.getReplicators().isEmpty());
+        });
+        admin1.topics().delete(tp, false);
+        admin2.topics().delete(tp, false);
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java
index 67d188efd25..fecf103ddbe 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java
@@ -58,5 +58,6 @@ public class StandaloneTest extends 
MockedPulsarServiceBaseTest {
                 
"internal:pulsar://192.168.1.11:6660,internal:pulsar+ssl://192.168.1.11:6651");
         
assertEquals(standalone.getConfig().getMaxSecondsToClearTopicNameCache(), 1);
         assertEquals(standalone.getConfig().getTopicNameCacheMaxCapacity(), 
200);
+        
assertEquals(standalone.getConfig().isCreateTopicToRemoteClusterForReplication(),
 true);
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
index ae13afb1934..ed108c7d162 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
@@ -75,6 +75,7 @@ public class ServiceConfigurationTest {
         assertEquals(config.getHttpMaxRequestHeaderSize(), 1234);
         assertEquals(config.getMaxSecondsToClearTopicNameCache(), 1);
         assertEquals(config.getTopicNameCacheMaxCapacity(), 200);
+        assertEquals(config.isCreateTopicToRemoteClusterForReplication(), 
false);
         OffloadPoliciesImpl offloadPolicies = 
OffloadPoliciesImpl.create(config.getProperties());
         
assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority().getValue(),
 "bookkeeper-first");
     }
@@ -291,6 +292,7 @@ public class ServiceConfigurationTest {
             
assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxRecords(), 
512);
             
assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxSize(), 1024 
* 1024 * 4);
             
assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxDelayInMillis(),
 1);
+            
assertEquals(configuration.isCreateTopicToRemoteClusterForReplication(), true);
         }
         // pulsar_broker_test.conf.
         try (InputStream inputStream = 
this.getClass().getClassLoader().getResourceAsStream(fileName)) {
@@ -303,6 +305,7 @@ public class ServiceConfigurationTest {
             
assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxRecords(), 
44);
             
assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxSize(), 55);
             
assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxDelayInMillis(),
 66);
+            
assertEquals(configuration.isCreateTopicToRemoteClusterForReplication(), false);
         }
         // string input stream.
         StringBuilder stringBuilder = new StringBuilder();
@@ -314,6 +317,7 @@ public class ServiceConfigurationTest {
         
stringBuilder.append("transactionPendingAckBatchedWriteMaxRecords=521").append(System.lineSeparator());
         
stringBuilder.append("transactionPendingAckBatchedWriteMaxSize=1025").append(System.lineSeparator());
         
stringBuilder.append("transactionPendingAckBatchedWriteMaxDelayInMillis=20").append(System.lineSeparator());
+        
stringBuilder.append("createTopicToRemoteClusterForReplication=false").append(System.lineSeparator());
         try(ByteArrayInputStream inputStream =
                     new 
ByteArrayInputStream(stringBuilder.toString().getBytes(StandardCharsets.UTF_8))){
             configuration = PulsarConfigurationLoader.create(inputStream, 
ServiceConfiguration.class);
@@ -325,6 +329,7 @@ public class ServiceConfigurationTest {
             
assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxRecords(), 
521);
             
assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxSize(), 1025);
             
assertEquals(configuration.getTransactionPendingAckBatchedWriteMaxDelayInMillis(),
 20);
+            
assertEquals(configuration.isCreateTopicToRemoteClusterForReplication(), false);
         }
     }
 
diff --git 
a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf 
b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
index 551a9c88757..0d249693285 100644
--- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
+++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
@@ -105,3 +105,4 @@ transactionPendingAckBatchedWriteMaxSize=55
 transactionPendingAckBatchedWriteMaxDelayInMillis=66
 topicNameCacheMaxCapacity=200
 maxSecondsToClearTopicNameCache=1
+createTopicToRemoteClusterForReplication=false
diff --git 
a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
 
b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
index e9aeed1a34d..062ead20a7c 100644
--- 
a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
+++ 
b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
@@ -95,4 +95,5 @@ supportedNamespaceBundleSplitAlgorithms=[range_equally_divide]
 defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide
 maxMessagePublishBufferSizeInMB=-1
 topicNameCacheMaxCapacity=200
-maxSecondsToClearTopicNameCache=1
\ No newline at end of file
+maxSecondsToClearTopicNameCache=1
+createTopicToRemoteClusterForReplication=true
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index f28b81e8e55..3bf1a57649b 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -382,7 +382,7 @@ public class PulsarClientImpl implements PulsarClient {
         getPartitionedTopicMetadata(topic, !forceNoPartitioned, 
true).thenAccept(metadata -> {
             if (forceNoPartitioned && metadata.partitions > 0) {
                 String errorMsg = String.format("Can not create the 
producer[%s] for the topic[%s] that contains %s"
-                                + " partitions, but the producer does not 
support for a partitioned topic.",
+                                + " partitions, but the producer does not support for a partitioned topic.",
                         producerNameForLog, topic, metadata.partitions);
                 log.error(errorMsg);
                 checkPartitions.completeExceptionally(

Reply via email to