This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fca90d14fc4 [improve][broker] PIP-433: Ensure topic creation before starting GEO (#24652)
fca90d14fc4 is described below

commit fca90d14fc45dbe4b464be4ad66bac6703b8ded1
Author: Zixuan Liu <[email protected]>
AuthorDate: Mon Jan 19 11:57:40 2026 +0800

    [improve][broker] PIP-433: Ensure topic creation before starting GEO (#24652)
    
    Co-authored-by: fengyubiao <[email protected]>
---
 .../pulsar/broker/service/AbstractReplicator.java  |   8 +-
 .../nonpersistent/NonPersistentReplicator.java     |   6 +-
 .../service/nonpersistent/NonPersistentTopic.java  |  13 +-
 .../persistent/GeoPersistentReplicator.java        | 103 ++++++++---
 .../service/persistent/PersistentReplicator.java   |   8 +-
 .../broker/service/persistent/PersistentTopic.java |  23 +--
 .../service/persistent/ShadowReplicator.java       |   6 +-
 .../broker/service/AbstractReplicatorTest.java     |  21 ++-
 .../broker/service/OneWayReplicatorTest.java       | 193 +++++++++++++++++++++
 .../pulsar/broker/service/PersistentTopicTest.java |  47 ++++-
 10 files changed, 374 insertions(+), 54 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index f996d328090..c7a36ad1b21 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -33,6 +33,7 @@ import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
@@ -57,6 +58,8 @@ public abstract class AbstractReplicator implements 
Replicator {
     protected final String remoteCluster;
     protected final PulsarClientImpl replicationClient;
     protected final PulsarClientImpl client;
+    protected final PulsarAdmin replicationAdmin;
+    protected final PulsarAdmin admin;
     protected String replicatorId;
     @Getter
     protected final Topic localTopic;
@@ -107,7 +110,8 @@ public abstract class AbstractReplicator implements 
Replicator {
     }
 
     public AbstractReplicator(String localCluster, Topic localTopic, String 
remoteCluster, String remoteTopicName,
-                              String replicatorPrefix, BrokerService 
brokerService, PulsarClientImpl replicationClient)
+                              String replicatorPrefix, BrokerService 
brokerService, PulsarClientImpl replicationClient,
+                              PulsarAdmin replicationAdmin)
             throws PulsarServerException {
         this.brokerService = brokerService;
         this.localTopic = localTopic;
@@ -117,7 +121,9 @@ public abstract class AbstractReplicator implements 
Replicator {
         this.remoteTopicName = remoteTopicName;
         this.remoteCluster = StringInterner.intern(remoteCluster);
         this.replicationClient = replicationClient;
+        this.replicationAdmin = replicationAdmin;
         this.client = (PulsarClientImpl) brokerService.pulsar().getClient();
+        this.admin = brokerService.pulsar().getAdminClient();
         this.producer = null;
         this.producerQueueSize = 
brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();
         this.replicatorId = String.format("%s | %s",
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
index 38e1894c178..38320e5be70 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
@@ -30,6 +30,7 @@ import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.service.AbstractReplicator;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.Replicator;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -51,9 +52,10 @@ public class NonPersistentReplicator extends 
AbstractReplicator implements Repli
     private final NonPersistentReplicatorStatsImpl stats = new 
NonPersistentReplicatorStatsImpl();
 
     public NonPersistentReplicator(NonPersistentTopic topic, String 
localCluster, String remoteCluster,
-            BrokerService brokerService, PulsarClientImpl replicationClient) 
throws PulsarServerException {
+                                   BrokerService brokerService, 
PulsarClientImpl replicationClient,
+                                   PulsarAdmin replicationAdmin) throws 
PulsarServerException {
         super(localCluster, topic, remoteCluster, topic.getName(), 
topic.getReplicatorPrefix(), brokerService,
-                replicationClient);
+                replicationClient, replicationAdmin);
         // NonPersistentReplicator does not support limitation so far, so 
reset pending queue size to the default value.
         producerBuilder.maxPendingMessages(1000);
         producerBuilder.blockIfQueueFull(false);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 6021c41142a..96a9f97d70f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -71,7 +71,9 @@ import 
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaExce
 import 
org.apache.pulsar.broker.service.schema.exceptions.NotExistSchemaException;
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.NamespaceStats;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
@@ -628,14 +630,15 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
             String localCluster) {
         return 
AbstractReplicator.validatePartitionedTopicAsync(nonPersistentTopic.getName(), 
brokerService)
                 .thenCompose(__ -> 
brokerService.pulsar().getPulsarResources().getClusterResources()
-                        .getClusterAsync(remoteCluster)
-                        .thenApply(clusterData ->
-                                
brokerService.getReplicationClient(remoteCluster, clusterData)))
-                .thenAccept(replicationClient -> {
+                        .getClusterAsync(remoteCluster))
+                .thenAccept((clusterData) -> {
+                    PulsarClient replicationClient = 
brokerService.getReplicationClient(remoteCluster, clusterData);
+                    PulsarAdmin replicationAdmin = 
brokerService.getClusterPulsarAdmin(remoteCluster, clusterData);
                     replicators.computeIfAbsent(remoteCluster, r -> {
                         try {
                             return new 
NonPersistentReplicator(NonPersistentTopic.this, localCluster,
-                                    remoteCluster, brokerService, 
(PulsarClientImpl) replicationClient);
+                                    remoteCluster, brokerService, 
(PulsarClientImpl) replicationClient,
+                                    replicationAdmin);
                         } catch (PulsarServerException e) {
                             log.error("[{}] Replicator startup failed {}", 
topic, remoteCluster, e);
                         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
index 46f8a27d580..922a0e42c3d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java
@@ -27,10 +27,12 @@ import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.Markers;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -40,9 +42,10 @@ public class GeoPersistentReplicator extends 
PersistentReplicator {
 
     public GeoPersistentReplicator(PersistentTopic topic, ManagedCursor 
cursor, String localCluster,
                                    String remoteCluster, BrokerService 
brokerService,
-                                   PulsarClientImpl replicationClient)
+                                   PulsarClientImpl replicationClient, 
PulsarAdmin replicationAdmin)
             throws PulsarServerException {
-        super(localCluster, topic, cursor, remoteCluster, topic.getName(), 
brokerService, replicationClient);
+        super(localCluster, topic, cursor, remoteCluster, topic.getName(), 
brokerService, replicationClient,
+                replicationAdmin);
     }
 
     /**
@@ -55,29 +58,85 @@ public class GeoPersistentReplicator extends 
PersistentReplicator {
 
     @Override
     protected CompletableFuture<Void> prepareCreateProducer() {
-        if 
(brokerService.getPulsar().getConfig().isCreateTopicToRemoteClusterForReplication())
 {
-            return CompletableFuture.completedFuture(null);
-        } else {
-            CompletableFuture<Void> topicCheckFuture = new 
CompletableFuture<>();
-            
replicationClient.getPartitionedTopicMetadata(localTopic.getName(), false, 
false)
-                    .whenComplete((metadata, ex) -> {
-                if (ex == null) {
-                    if (metadata.partitions == 0) {
-                        topicCheckFuture.complete(null);
-                    } else {
-                        String errorMsg = String.format("%s Can not create the 
replicator due to the partitions in the"
-                                        + " remote cluster is not 0, but is 
%s",
-                                replicatorId, metadata.partitions);
-                        log.error(errorMsg);
-                        topicCheckFuture.completeExceptionally(
-                                new 
PulsarClientException.NotAllowedException(errorMsg));
+        return 
createRemoteTopicIfDoesNotExist(TopicName.get(localTopicName).getPartitionedTopicName());
+    }
+
+    private CompletableFuture<Integer> getLocalPartitionMetadata(String topic) 
{
+        return admin.topics().getPartitionedTopicMetadataAsync(topic)
+                .thenApply(metadata -> metadata.partitions)
+                .exceptionallyCompose(t -> {
+                    Throwable actEx = FutureUtil.unwrapCompletionException(t);
+                    if (actEx instanceof 
PulsarAdminException.NotFoundException) {
+                        // Legacy edge case: Local topic is non-partitioned 
but name ends with "-partition-{num}".
+                        // This should never happen in practice because 
PIP-414 disables this naming pattern.
+                        return createRemoteTopicIfDoesNotExist(localTopicName)
+                                .thenApply(__ -> -1); // Special marker
                     }
-                } else {
-                    
topicCheckFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
+                    return CompletableFuture.failedFuture(actEx);
+                });
+    }
+
+    private CompletableFuture<Integer> getRemotePartitionMetadata(String 
topic) {
+        return 
replicationAdmin.topics().getPartitionedTopicMetadataAsync(topic)
+                .thenApply(metadata -> metadata.partitions)
+                .exceptionallyCompose(t -> {
+                    Throwable actEx = FutureUtil.unwrapCompletionException(t);
+                    if (actEx instanceof 
PulsarAdminException.NotFoundException) {
+                        return CompletableFuture.completedFuture(-1); // Topic 
doesn't exist
+                    }
+                    return CompletableFuture.failedFuture(actEx);
+                });
+    }
+
+    private CompletableFuture<Void> handlePartitionComparison(String topic, 
int localPartitions, int remotePartitions) {
+        // Skip if already handled by recursion
+        if (localPartitions == -1) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        // Remote topic doesn't exist - create it
+        if (remotePartitions == -1) {
+            if 
(!brokerService.getPulsar().getConfig().isCreateTopicToRemoteClusterForReplication())
 {
+                String errorMsg = String.format("[%s] Can not start replicator 
because there is no topic on"
+                                + " the remote cluster. Please create a %s on 
the remote cluster",
+                        replicatorId, localPartitions == 0 ? "non-partitioned 
topic"
+                                : "partitioned topic with " + localPartitions 
+ " partitions");
+                log.error(errorMsg);
+                return CompletableFuture.failedFuture(new 
PulsarServerException(errorMsg));
+            }
+
+            CompletableFuture<Void> createFuture = localPartitions == 0
+                    ? 
replicationAdmin.topics().createNonPartitionedTopicAsync(topic)
+                    : 
replicationAdmin.topics().createPartitionedTopicAsync(topic, localPartitions);
+
+            return createFuture.whenComplete((__, t) -> {
+                if (t != null) {
+                    log.error("[{}] Failed to create topic on remote cluster. 
Local has {} partitions",
+                            replicatorId, localPartitions, t);
                 }
             });
-            return topicCheckFuture;
         }
+
+        // Both exist - verify compatibility
+        if (localPartitions == remotePartitions) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        // Incompatible partitions
+        String errorMsg = String.format("[%s] Can not start replicator because 
the partitions between"
+                        + " local and remote cluster are different. local: %s, 
remote: %s",
+                replicatorId, localPartitions, remotePartitions);
+        log.error(errorMsg);
+        return CompletableFuture.failedFuture(new 
PulsarServerException(errorMsg));
+    }
+
+    private CompletableFuture<Void> createRemoteTopicIfDoesNotExist(String 
partitionedTopic) {
+        return getLocalPartitionMetadata(partitionedTopic)
+                .thenCompose(localPartitions ->
+                        
getRemotePartitionMetadata(partitionedTopic).thenCompose(remotePartitions ->
+                                handlePartitionComparison(partitionedTopic, 
localPartitions, remotePartitions)
+                        )
+                );
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index e0a31476fc9..c1d73cd3891 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -58,6 +58,7 @@ import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.MessageExpirer;
 import org.apache.pulsar.broker.service.Replicator;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -119,11 +120,12 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
     protected final LinkedList<InFlightTask> inFlightTasks = new 
LinkedList<>();
 
     public PersistentReplicator(String localCluster, PersistentTopic 
localTopic, ManagedCursor cursor,
-                                   String remoteCluster, String remoteTopic,
-                                   BrokerService brokerService, 
PulsarClientImpl replicationClient)
+                                String remoteCluster, String remoteTopic,
+                                BrokerService brokerService, PulsarClientImpl 
replicationClient,
+                                PulsarAdmin replicationAdmin)
             throws PulsarServerException {
         super(localCluster, localTopic, remoteCluster, remoteTopic, 
localTopic.getReplicatorPrefix(),
-                brokerService, replicationClient);
+                brokerService, replicationClient, replicationAdmin);
         this.topic = localTopic;
         this.localSchemaTopicName = 
TopicName.getPartitionedTopicName(localTopicName).toString();
         this.cursor = Objects.requireNonNull(cursor);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 451471e215e..5a1e0e940a8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -148,6 +148,7 @@ import org.apache.pulsar.client.admin.OffloadProcessStatus;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -2343,11 +2344,11 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             String localCluster) {
         return 
AbstractReplicator.validatePartitionedTopicAsync(PersistentTopic.this.getName(),
 brokerService)
                 .thenCompose(__ -> 
brokerService.pulsar().getPulsarResources().getClusterResources()
-                        .getClusterAsync(remoteCluster)
-                        .thenApply(clusterData ->
-                                
brokerService.getReplicationClient(remoteCluster, clusterData)))
-                .thenAccept(replicationClient -> {
-                    if (replicationClient == null) {
+                        .getClusterAsync(remoteCluster))
+                .thenAccept((clusterData) -> {
+                    PulsarClient replicationClient = 
brokerService.getReplicationClient(remoteCluster, clusterData);
+                    PulsarAdmin replicationAdmin = 
brokerService.getClusterPulsarAdmin(remoteCluster, clusterData);
+                    if (replicationClient == null || replicationAdmin == null) 
{
                         log.error("[{}] Can not create replicator because the 
remote client can not be created."
                                         + " remote cluster: {}. State of 
transferring : {}",
                                 topic, remoteCluster, transferring);
@@ -2365,7 +2366,8 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                         Replicator replicator = 
replicators.computeIfAbsent(remoteCluster, r -> {
                             try {
                                 return new 
GeoPersistentReplicator(PersistentTopic.this, cursor, localCluster,
-                                        remoteCluster, brokerService, 
(PulsarClientImpl) replicationClient);
+                                        remoteCluster, brokerService, 
(PulsarClientImpl) replicationClient,
+                                        replicationAdmin);
                             } catch (PulsarServerException e) {
                                 log.error("[{}] Replicator startup failed {}", 
topic, remoteCluster, e);
                             }
@@ -2431,9 +2433,10 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         String localCluster = 
brokerService.pulsar().getConfiguration().getClusterName();
         return 
AbstractReplicator.validatePartitionedTopicAsync(PersistentTopic.this.getName(),
 brokerService)
                 .thenCompose(__ -> 
brokerService.pulsar().getPulsarResources().getClusterResources()
-                        .getClusterAsync(localCluster)
-                        .thenApply(clusterData -> 
brokerService.getReplicationClient(localCluster, clusterData)))
-                .thenAccept(replicationClient -> {
+                        .getClusterAsync(localCluster))
+                .thenAccept((clusterData) -> {
+                    PulsarClient replicationClient = 
brokerService.getReplicationClient(localCluster, clusterData);
+                    PulsarAdmin replicationAdmin = 
brokerService.getClusterPulsarAdmin(localCluster, clusterData);
                     Replicator replicator = 
shadowReplicators.computeIfAbsent(shadowTopic, r -> {
                         try {
                             TopicName sourceTopicName = 
TopicName.get(getName());
@@ -2442,7 +2445,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                                 shadowPartitionTopic += "-partition-" + 
sourceTopicName.getPartitionIndex();
                             }
                             return new ShadowReplicator(shadowPartitionTopic, 
PersistentTopic.this, cursor,
-                                    brokerService, (PulsarClientImpl) 
replicationClient);
+                                    brokerService, (PulsarClientImpl) 
replicationClient, replicationAdmin);
                         } catch (PulsarServerException e) {
                             log.error("[{}] ShadowReplicator startup failed 
{}", topic, shadowTopic, e);
                         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
index a334fd86dd0..2e5e91fb9a6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java
@@ -27,6 +27,7 @@ import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -39,11 +40,12 @@ import org.apache.pulsar.common.util.Codec;
 public class ShadowReplicator extends PersistentReplicator {
 
     public ShadowReplicator(String shadowTopic, PersistentTopic sourceTopic, 
ManagedCursor cursor,
-                            BrokerService brokerService, PulsarClientImpl 
replicationClient)
+                            BrokerService brokerService, PulsarClientImpl 
replicationClient,
+                            PulsarAdmin replicationAdmin)
             throws PulsarServerException {
         super(brokerService.pulsar().getConfiguration().getClusterName(), 
sourceTopic, cursor,
                 brokerService.pulsar().getConfiguration().getClusterName(), 
shadowTopic, brokerService,
-                replicationClient);
+                replicationClient, replicationAdmin);
     }
 
     /**
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
index 39252ac0a94..5f1d3a8a6c5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
@@ -22,6 +22,7 @@ import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyBoolean;
 import static org.mockito.Mockito.anyInt;
 import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import io.netty.channel.DefaultEventLoop;
@@ -39,12 +40,15 @@ import org.apache.bookkeeper.mledger.PositionFactory;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.Topics;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.ProducerBuilderImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
 import org.awaitility.Awaitility;
 import org.awaitility.reflect.WhiteboxImpl;
@@ -94,9 +98,19 @@ public class AbstractReplicatorTest {
         when(producerBuilder.create()).thenThrow(new RuntimeException("mocked 
ex"));
         when(producerBuilder.createAsync())
                 .thenReturn(CompletableFuture.failedFuture(new 
RuntimeException("mocked ex")));
+
+        @Cleanup
+        PulsarAdmin admin = mock(PulsarAdmin.class);
+        Topics adminTopics = mock(Topics.class);
+        doReturn(adminTopics).when(admin).topics();
+        doReturn(CompletableFuture.completedFuture(new 
PartitionedTopicMetadata(0))).when(adminTopics)
+                .getPartitionedTopicMetadataAsync(anyString());
+        doReturn(CompletableFuture.completedFuture(null)).when(adminTopics)
+                .createNonPartitionedTopicAsync(anyString());
+
         // Make race condition: "retry start producer" and "close replicator".
         final ReplicatorInTest replicator = new ReplicatorInTest(localCluster, 
localTopic, remoteCluster, topicName,
-                replicatorPrefix, broker, remoteClient);
+                replicatorPrefix, broker, remoteClient, admin);
         replicator.startProducer();
         replicator.terminate();
 
@@ -122,9 +136,10 @@ public class AbstractReplicatorTest {
 
         public ReplicatorInTest(String localCluster, Topic localTopic, String 
remoteCluster, String remoteTopicName,
                                 String replicatorPrefix, BrokerService 
brokerService,
-                                PulsarClientImpl replicationClient) throws 
PulsarServerException {
+                                PulsarClientImpl replicationClient, 
PulsarAdmin replicationAdmin)
+                throws PulsarServerException {
             super(localCluster, localTopic, remoteCluster, remoteTopicName, 
replicatorPrefix, brokerService,
-                    replicationClient);
+                    replicationClient, replicationAdmin);
         }
 
         @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index 8a80f57f5b2..8971e42f1d1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service;
 
 import static 
org.apache.pulsar.broker.service.persistent.BrokerServicePersistInternalMethodInvoker.ensureNoBacklogByInflightTask;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
@@ -63,6 +64,7 @@ import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import lombok.AllArgsConstructor;
+import lombok.Cleanup;
 import lombok.Data;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -90,6 +92,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.ClientCnx;
@@ -108,6 +111,7 @@ import 
org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.policies.data.TopicType;
 import 
org.apache.pulsar.common.policies.data.impl.AutoTopicCreationOverrideImpl;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -1889,4 +1893,193 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
         // Verify: all inflight tasks are done.
         ensureNoBacklogByInflightTask(getReplicator(topicName));
     }
+
+    /**
+     * Supplies the {@code isPartitioned} flag to tests that name this provider: {@code true} for the
+     * partitioned-topic run and {@code false} for the non-partitioned run.
+     */
+    @DataProvider
+    public Object[] isPartitioned() {
+        return new Object[]{
+                true,
+                false
+        };
+    }
+
+    /**
+     * Verifies that a topic created only through {@code admin1} becomes visible through {@code admin2},
+     * with the same type, after the namespace is configured to replicate to both clusters and one message
+     * has been published through {@code client1}.
+     *
+     * @param isPartitioned {@code true} to create a 4-partition topic, {@code false} to create a
+     *                      non-partitioned topic
+     * @throws Exception if any admin or client call fails
+     */
+    @Test(dataProvider = "isPartitioned")
+    public void testReplicatorCreateTopic(boolean isPartitioned) throws 
Exception {
+        String ns = defaultTenant + "/" + 
UUID.randomUUID().toString().replace("-", "");
+        admin1.namespaces().createNamespace(ns);
+        // The namespace is created through admin2 only when the clusters do not use a global ZK
+        // (presumably it is already visible on cluster2 otherwise - confirm against the test base).
+        if (!usingGlobalZK){
+            admin2.namespaces().createNamespace(ns);
+        }
+
+        int numPartitions = 4;
+        List<String> partitions = new ArrayList<>();
+        final String tp = BrokerTestUtil.newUniqueName("persistent://" + ns + 
"/tp_");
+        // The topic is created through admin1 only; this test never creates it through admin2.
+        if (isPartitioned) {
+            admin1.topics().createPartitionedTopic(tp, numPartitions);
+            for (int i = 0; i < numPartitions; i++) {
+                partitions.add(TopicName.getTopicPartitionNameString(tp, i));
+            }
+        } else {
+            admin1.topics().createNonPartitionedTopic(tp);
+        }
+
+        admin1.namespaces().setNamespaceReplicationClusters(ns, new 
HashSet<>(Arrays.asList(cluster1, cluster2)));
+
+        // Wait until the topic on broker1 (partition 0 when partitioned) has at least one replicator.
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopic persistentTopic =
+                    (PersistentTopic) broker1.getTopic(
+                                    isPartitioned ? 
TopicName.get(tp).getPartition(0).toString() : tp, false).join()
+                            .get();
+            assertFalse(persistentTopic.getReplicators().isEmpty());
+        });
+
+        @Cleanup
+        Producer<String> p1 = 
client1.newProducer(Schema.STRING).topic(tp).create();
+        p1.send("msg-1");
+
+        // admin2 must eventually list the topic with the same type it has on cluster1.
+        Awaitility.await().untilAsserted(() -> {
+            List<String> partitionedTopicList = 
admin2.topics().getPartitionedTopicList(ns);
+            if (isPartitioned) {
+                assertThat(partitionedTopicList).contains(tp);
+                
assertThat(admin2.topics().getList(ns)).containsAll(partitions);
+            } else {
+                assertThat(partitionedTopicList).doesNotContain(tp);
+                assertThat(admin2.topics().getList(ns)).contains(tp);
+            }
+        });
+    }
+
+    /**
+     * Verifies the behaviour when the same topic name already exists on both clusters with different
+     * types: a 4-partition topic created through {@code admin1} and a non-partitioned topic created through
+     * {@code admin2}. After both namespaces are configured to replicate to both clusters and one message is
+     * published on each side, the test expects the replicators of partition 0 on broker1 and of the topic on
+     * broker2 to be disconnected, {@code admin2} not to list the name as a partitioned topic, and
+     * {@code admin1}'s topic list not to contain the bare topic name.
+     *
+     * <p>Returns without checking anything when {@code usingGlobalZK} is set.
+     *
+     * @throws Exception if any admin or client call fails
+     */
+    @Test
+    public void 
testReplicatorCreateTopicWhenTopicExistsWithDifferentTypeAcrossClusters() 
throws Exception {
+        if (usingGlobalZK) {
+            // This test case is not applicable when using global ZK, because 
the namespace policies
+            // are shared among clusters.
+            return;
+        }
+
+        String ns = defaultTenant + "/" + 
UUID.randomUUID().toString().replace("-", "");
+        admin1.namespaces().createNamespace(ns);
+        admin2.namespaces().createNamespace(ns);
+
+        // Same topic name on both sides, but partitioned on cluster1 and non-partitioned on cluster2.
+        final String tp = BrokerTestUtil.newUniqueName("persistent://" + ns + 
"/tp_");
+        admin1.topics().createPartitionedTopic(tp, 4);
+        admin2.topics().createNonPartitionedTopic(tp);
+
+        admin1.namespaces().setNamespaceReplicationClusters(ns, new 
HashSet<>(Arrays.asList(cluster1, cluster2)));
+        admin2.namespaces().setNamespaceReplicationClusters(ns, new 
HashSet<>(Arrays.asList(cluster1, cluster2)));
+
+        // Wait until each side has registered at least one replicator for its local topic.
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopic persistentTopic =
+                    (PersistentTopic) 
broker1.getTopic(TopicName.get(tp).getPartition(0).toString(), false).join()
+                            .get();
+            assertFalse(persistentTopic.getReplicators().isEmpty());
+        });
+
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopic persistentTopic = (PersistentTopic) 
broker2.getTopic(tp, false).join().get();
+            assertFalse(persistentTopic.getReplicators().isEmpty());
+        });
+
+        // Publish one message on each side so that both replicators have something to replicate.
+        @Cleanup
+        Producer<String> p1 = 
client1.newProducer(Schema.STRING).topic(tp).create();
+        p1.send("msg-p1-1");
+        @Cleanup
+        Producer<String> p2 = 
client2.newProducer(Schema.STRING).topic(tp).create();
+        p2.send("msg-p2-1");
+
+        // The topic exists, but its type differs between the local and remote 
clusters. The replicator should not
+        // recreate the topic.
+        // NOTE(review): untilAsserted returns on the first poll that sees every replicator disconnected,
+        // so this does not show that the replicators stay disconnected - confirm this is sufficient.
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopic persistentTopic =
+                    (PersistentTopic) 
broker1.getTopic(TopicName.get(tp).getPartition(0).toString(), false).join()
+                            .get();
+            persistentTopic.getReplicators().forEach((key, value) -> {
+                assertFalse(value.isConnected());
+            });
+        });
+        
assertThat(admin2.topics().getPartitionedTopicList(ns)).doesNotContain(tp);
+
+        // Same check in the opposite direction: cluster2's replicators, then cluster1's topic list.
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopic persistentTopic = (PersistentTopic) 
broker2.getTopic(tp, false).join().get();
+            persistentTopic.getReplicators().forEach((key, value) -> {
+                assertFalse(value.isConnected());
+            });
+        });
+        assertThat(admin1.topics().getList(ns)).doesNotContain(tp);
+    }
+
+    /**
+     * Verifies the behaviour when the topic is partitioned on both clusters but with different partition
+     * counts (4 through {@code admin1}, 8 through {@code admin2}). After both namespaces are configured to
+     * replicate to both clusters and one message is published on each side, the test expects the replicators
+     * of partition 0 on broker1 to be disconnected and a consumer on cluster2 to receive nothing other than
+     * the message that was published through {@code client2}.
+     *
+     * <p>Returns without checking anything when {@code usingGlobalZK} is set.
+     *
+     * @throws Exception if any admin or client call fails
+     */
+    @Test
+    public void testReplicatorWhenPartitionCountsDiffer() throws Exception {
+        if (usingGlobalZK) {
+            // This test case is not applicable when using global ZK, because 
the namespace policies
+            // are shared among clusters.
+            return;
+        }
+
+        String ns = defaultTenant + "/" + 
UUID.randomUUID().toString().replace("-", "");
+
+        // The two namespaces get different auto-topic-creation overrides (partitioned with 12 default
+        // partitions on cluster1, non-partitioned on cluster2); the topic itself is created explicitly below.
+        admin1.namespaces().createNamespace(ns);
+        admin1.namespaces().setAutoTopicCreation(ns, 
AutoTopicCreationOverride.builder()
+                .allowAutoTopicCreation(true)
+                .topicType(TopicType.PARTITIONED.toString())
+                .defaultNumPartitions(12)
+                .build());
+
+        admin2.namespaces().createNamespace(ns);
+        admin2.namespaces().setAutoTopicCreation(ns, 
AutoTopicCreationOverride.builder()
+                .allowAutoTopicCreation(true)
+                .topicType(TopicType.NON_PARTITIONED.toString())
+                .build());
+
+        // Same topic name on both sides, both partitioned, but with 4 versus 8 partitions.
+        final String tp = BrokerTestUtil.newUniqueName("persistent://" + ns + 
"/tp_");
+        admin1.topics().createPartitionedTopic(tp, 4);
+        admin2.topics().createPartitionedTopic(tp, 8);
+
+        admin1.namespaces().setNamespaceReplicationClusters(ns, new 
HashSet<>(Arrays.asList(cluster1, cluster2)));
+        admin2.namespaces().setNamespaceReplicationClusters(ns, new 
HashSet<>(Arrays.asList(cluster1, cluster2)));
+
+        // Wait until partition 0 on each broker has registered at least one replicator.
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopic persistentTopic =
+                    (PersistentTopic) 
broker1.getTopic(TopicName.get(tp).getPartition(0).toString(), false).join()
+                            .get();
+            assertFalse(persistentTopic.getReplicators().isEmpty());
+        });
+
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopic persistentTopic =
+                    (PersistentTopic) 
broker2.getTopic(TopicName.get(tp).getPartition(0).toString(), false).join()
+                            .get();
+            assertFalse(persistentTopic.getReplicators().isEmpty());
+        });
+
+        // Trigger the replicator.
+        @Cleanup
+        Producer<String> p1 = 
client1.newProducer(Schema.STRING).topic(tp).create();
+        p1.send("msg-p1-1");
+        @Cleanup
+        Producer<String> p2 = 
client2.newProducer(Schema.STRING).topic(tp).create();
+        p2.send("msg-p2-1");
+
+        // Topic partition counts differ between the local and remote clusters.
+        // The replicator should not replicate the messages.
+        // NOTE(review): only broker1's partition-0 replicators are checked here, and untilAsserted returns
+        // on the first poll that sees them disconnected - confirm this is sufficient.
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopic persistentTopic =
+                    (PersistentTopic) 
broker1.getTopic(TopicName.get(tp).getPartition(0).toString(), false).join()
+                            .get();
+            persistentTopic.getReplicators().forEach((key, value) -> {
+                assertFalse(value.isConnected());
+            });
+        });
+
+        @Cleanup
+        Consumer<String> c2 = 
client2.newConsumer(Schema.STRING).topic(tp).subscriptionName("test-sub")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
+
+        // Drain cluster2's topic until a receive call returns null after its 3-second timeout; every
+        // message seen must be the one sent by p2, i.e. "msg-p1-1" must not have been replicated.
+        // NOTE(review): the loop also passes when no message is received at all - confirm that is acceptable.
+        while (true) {
+            Message<String> receive = c2.receive(3, TimeUnit.SECONDS);
+            if (receive == null) {
+                break;
+            }
+            assertEquals(receive.getValue(), "msg-p2-1");
+        }
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 3b64b2ecc2c..c83f3750ff0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -111,11 +111,14 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.persistent.PulsarCompactorSubscription;
 import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.Topics;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.ConnectionPool;
+import org.apache.pulsar.client.impl.LookupService;
 import org.apache.pulsar.client.impl.ProducerBuilderImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
@@ -130,6 +133,8 @@ import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.ProducerAccessMode;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
@@ -233,6 +238,13 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         
doReturn(CompletableFuture.completedFuture(TopicExistsInfo.newTopicNotExists())).when(nsSvc)
                 .checkTopicExistsAsync(any());
 
+        PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+        LookupService lookupService = mock(LookupService.class);
+        doReturn(CompletableFuture.completedFuture(new 
PartitionedTopicMetadata(0))).when(lookupService)
+                .getPartitionedTopicMetadata(any(), anyBoolean(), 
anyBoolean());
+        doReturn(lookupService).when(pulsarClient).getLookup();
+        
doReturn(pulsarClient).when(pulsarTestContext.getPulsarService()).getClient();
+
         setupMLAsyncCallbackMocks();
     }
 
@@ -1653,6 +1665,16 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         assertNull(topic2.getSubscription(successSubName));
     }
 
+    /**
+     * Builds a Mockito mock of {@link PulsarAdmin} for tests that construct a replicator.
+     *
+     * <p>The mock's {@code topics()} returns a mocked {@link Topics} on which
+     * {@code getPartitionedTopicMetadataAsync} completes immediately with {@code new PartitionedTopicMetadata(0)}
+     * and {@code createNonPartitionedTopicAsync} completes immediately with {@code null}, for any topic name.
+     * Every other call keeps Mockito's default answer.
+     *
+     * @return the mocked admin client
+     */
+    private PulsarAdmin mockReplicationAdmin() {
+        PulsarAdmin admin = mock(PulsarAdmin.class);
+        Topics topics = mock(Topics.class);
+        doReturn(topics).when(admin).topics();
+        // Presumably a partition count of 0 stands for a non-partitioned topic - confirm against
+        // PartitionedTopicMetadata.
+        doReturn(CompletableFuture.completedFuture(new 
PartitionedTopicMetadata(0))).when(topics)
+                .getPartitionedTopicMetadataAsync(anyString());
+        
doReturn(CompletableFuture.completedFuture(null)).when(topics).createNonPartitionedTopicAsync(anyString());
+        return admin;
+    }
+
     /**
      * NonPersistentReplicator.removeReplicator doesn't remove replicator in 
atomic way and does in multiple step:
      * 1. disconnect replicator producer
@@ -1699,11 +1721,18 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
                     return producerBuilder;
                 });
         brokerService.getReplicationClients().put(remoteCluster, 
pulsarClientMock);
+
+        @Cleanup
+        PulsarAdmin admin = mockReplicationAdmin();
+        PulsarService pulsar = brokerService.getPulsar();
+        doReturn(admin).when(pulsar).getAdminClient();
+        brokerService.getClusterAdmins().put(remoteCluster, admin);
+        Optional<ClusterData> clusterData = 
brokerService.pulsar().getPulsarResources().getClusterResources()
+                .getCluster(remoteCluster);
         PersistentReplicator replicator = spy(
                 new GeoPersistentReplicator(topic, cursor, localCluster, 
remoteCluster, brokerService,
-                        (PulsarClientImpl) 
brokerService.getReplicationClient(remoteCluster,
-                                
brokerService.pulsar().getPulsarResources().getClusterResources()
-                                        .getCluster(remoteCluster))));
+                        (PulsarClientImpl) 
brokerService.getReplicationClient(remoteCluster, clusterData),
+                        brokerService.getClusterPulsarAdmin(remoteCluster, 
clusterData)));
         replicatorMap.put(remoteReplicatorName, replicator);
 
         // step-1 remove replicator : it will disconnect the producer but it 
will wait for callback to be completed
@@ -1751,10 +1780,16 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         ManagedCursor cursor = mock(ManagedCursorImpl.class);
         doReturn(remoteCluster).when(cursor).getName();
         brokerService.getReplicationClients().put(remoteCluster, client);
+
+        @Cleanup
+        PulsarAdmin admin = mockReplicationAdmin();
+        doReturn(admin).when(pulsar).getAdminClient();
+        brokerService.getClusterAdmins().put(remoteCluster, admin);
+        Optional<ClusterData> clusterData = 
brokerService.pulsar().getPulsarResources().getClusterResources()
+                .getCluster(remoteCluster);
         PersistentReplicator replicator = new GeoPersistentReplicator(topic, 
cursor, localCluster, remoteCluster,
-                brokerService, (PulsarClientImpl) 
brokerService.getReplicationClient(remoteCluster,
-                
brokerService.pulsar().getPulsarResources().getClusterResources()
-                        .getCluster(remoteCluster)));
+                brokerService, (PulsarClientImpl) 
brokerService.getReplicationClient(remoteCluster, clusterData),
+                brokerService.getClusterPulsarAdmin(remoteCluster, 
clusterData));
 
         // PersistentReplicator constructor calls startProducer()
         verify(clientImpl)


Reply via email to