This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c019c2a113a555c25fd65a6a450649ba25d8a1b4
Author: ran <[email protected]>
AuthorDate: Tue Jun 22 07:31:19 2021 +0800

    [Broker] Fix create partitioned topic in replicated namespace (#10963)
    
    Fixes https://github.com/apache/pulsar/issues/10673 Bug-2
    
    ### Motivation
    
    Currently, create a partitioned topic in the replicated namespace will not 
create metadata path `/managed-ledgers` on replicated clusters.
    
    ### Modifications
    
    Add a new flag `createLocalTopicOnly` to indicate whether create the 
partitioned path in replicated clusters or not.
    If the flag is false, make remote calls to create partitioned topics on 
replicated clusters.
    
    
    (cherry picked from commit 2f8c175f52a1731f794c741f8fc3347b680191eb)
---
 .../apache/pulsar/broker/admin/AdminResource.java  | 120 ++++++++++++++-------
 .../broker/admin/v1/NonPersistentTopics.java       |  15 +--
 .../pulsar/broker/admin/v1/PersistentTopics.java   |  15 +--
 .../broker/admin/v2/NonPersistentTopics.java       |   6 +-
 .../pulsar/broker/admin/v2/PersistentTopics.java   |   5 +-
 .../org/apache/pulsar/broker/admin/AdminTest.java  |   2 +-
 .../pulsar/broker/admin/PersistentTopicsTest.java  |  27 ++---
 .../pulsar/broker/service/ReplicatorTest.java      |  62 ++++++++++-
 .../pulsar/client/admin/internal/TopicsImpl.java   |   8 +-
 9 files changed, 188 insertions(+), 72 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 8806497..530b350 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -42,6 +42,7 @@ import 
org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.internal.TopicsImpl;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.naming.Constants;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -619,7 +620,8 @@ public abstract class AdminResource extends 
PulsarWebResource {
         return topicPartitions;
     }
 
-    protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, 
int numPartitions) {
+    protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, 
int numPartitions,
+                                                  boolean 
createLocalTopicOnly) {
         Integer maxTopicsPerNamespace = null;
 
         try {
@@ -672,55 +674,57 @@ public abstract class AdminResource extends 
PulsarWebResource {
                     "Number of partitions should be less than or equal to " + 
maxPartitions));
             return;
         }
+
+        List<CompletableFuture<Void>> createFutureList = new ArrayList<>();
+
+        CompletableFuture<Void> createLocalFuture = new CompletableFuture<>();
+        createFutureList.add(createLocalFuture);
         checkTopicExistsAsync(topicName).thenAccept(exists -> {
             if (exists) {
                 log.warn("[{}] Failed to create already existing topic {}", 
clientAppId(), topicName);
                 asyncResponse.resume(new RestException(Status.CONFLICT, "This 
topic already exists"));
-            } else {
-
-                try {
-                    String path = ZkAdminPaths.partitionedTopicPath(topicName);
-                    namespaceResources().getPartitionedTopicResources()
-                            .createAsync(path, new 
PartitionedTopicMetadata(numPartitions)).thenAccept(r -> {
-                                log.info("[{}] Successfully created 
partitioned topic {}", clientAppId(), topicName);
-                                
tryCreatePartitionsAsync(numPartitions).thenAccept(v -> {
-                                    log.info("[{}] Successfully created 
partitions for topic {}", clientAppId(),
-                                            topicName);
-                                    
asyncResponse.resume(Response.noContent().build());
-                                }).exceptionally(e -> {
-                                    log.error("[{}] Failed to create 
partitions for topic {}", clientAppId(),
-                                            topicName);
-                                    // The partitioned topic is created but 
there are some partitions create failed
-                                    asyncResponse.resume(new RestException(e));
-                                    return null;
-                                });
-                            }).exceptionally(ex -> {
-                                if (ex.getCause() instanceof 
AlreadyExistsException) {
-                                    log.warn("[{}] Failed to create already 
existing partitioned topic {}",
-                                            clientAppId(), topicName);
-                                    asyncResponse.resume(
-                                            new RestException(Status.CONFLICT, 
"Partitioned topic already exists"));
-                                } else if (ex.getCause() instanceof 
BadVersionException) {
-                                    log.warn("[{}] Failed to create 
partitioned topic {}: concurrent modification",
-                                            clientAppId(), topicName);
-                                    asyncResponse.resume(new 
RestException(Status.CONFLICT, "Concurrent modification"));
-                                } else {
-                                    log.error("[{}] Failed to create 
partitioned topic {}", clientAppId(), topicName,
-                                            ex.getCause());
-                                    asyncResponse.resume(new 
RestException(ex.getCause()));
-                                }
-                                return null;
-                            });
-                } catch (Exception e) {
-                    log.error("[{}] Failed to create partitioned topic {}", 
clientAppId(), topicName, e);
-                    resumeAsyncResponseExceptionally(asyncResponse, e);
-                }
+                return;
             }
+
+            provisionPartitionedTopicPath(asyncResponse, numPartitions, 
createLocalTopicOnly)
+                    .thenCompose(ignored -> 
tryCreatePartitionsAsync(numPartitions))
+                    .whenComplete((ignored, ex) -> {
+                        if (ex != null) {
+                            createLocalFuture.completeExceptionally(ex);
+                            return;
+                        }
+                        createLocalFuture.complete(null);
+                    });
         }).exceptionally(ex -> {
             log.error("[{}] Failed to create partitioned topic {}", 
clientAppId(), topicName, ex);
             resumeAsyncResponseExceptionally(asyncResponse, ex);
             return null;
         });
+
+        if (!createLocalTopicOnly && topicName.isGlobal() && 
isNamespaceReplicated(namespaceName)) {
+            getNamespaceReplicatedClusters(namespaceName)
+                    .stream()
+                    .filter(cluster -> 
!cluster.equals(pulsar().getConfiguration().getClusterName()))
+                    .forEach(cluster -> createFutureList.add(
+                            ((TopicsImpl) 
pulsar().getBrokerService().getClusterPulsarAdmin(cluster).topics())
+                                    .createPartitionedTopicAsync(
+                                            
topicName.getPartitionedTopicName(), numPartitions, true)));
+        }
+
+        FutureUtil.waitForAll(createFutureList).whenComplete((ignored, ex) -> {
+            if (ex != null) {
+                log.error("[{}] Failed to create partitions for topic {}", 
clientAppId(), topicName, ex.getCause());
+                if (ex.getCause() instanceof RestException) {
+                    asyncResponse.resume(ex.getCause());
+                } else {
+                    resumeAsyncResponseExceptionally(asyncResponse, 
ex.getCause());
+                }
+                return;
+            }
+            log.info("[{}] Successfully created partitions for topic {} in 
cluster {}",
+                    clientAppId(), topicName, 
pulsar().getConfiguration().getClusterName());
+            asyncResponse.resume(Response.noContent().build());
+        });
     }
 
     /**
@@ -747,6 +751,42 @@ public abstract class AdminResource extends 
PulsarWebResource {
                 });
     }
 
+    private CompletableFuture<Void> 
provisionPartitionedTopicPath(AsyncResponse asyncResponse,
+                                                                  int 
numPartitions,
+                                                                  boolean 
createLocalTopicOnly) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        String partitionedTopicPath = 
ZkAdminPaths.partitionedTopicPath(topicName);
+        namespaceResources()
+                .getPartitionedTopicResources()
+                .createAsync(partitionedTopicPath, new 
PartitionedTopicMetadata(numPartitions))
+                .whenComplete((ignored, ex) -> {
+                    if (ex != null) {
+                        if (ex instanceof AlreadyExistsException) {
+                            if (createLocalTopicOnly) {
+                                future.complete(null);
+                                return;
+                            }
+                            log.warn("[{}] Failed to create already existing 
partitioned topic {}",
+                                    clientAppId(), topicName);
+                            future.completeExceptionally(
+                                    new RestException(Status.CONFLICT, 
"Partitioned topic already exists"));
+                        } else if (ex instanceof BadVersionException) {
+                            log.warn("[{}] Failed to create partitioned topic 
{}: concurrent modification",
+                                    clientAppId(), topicName);
+                            future.completeExceptionally(
+                                    new RestException(Status.CONFLICT, 
"Concurrent modification"));
+                        } else {
+                            log.error("[{}] Failed to create partitioned topic 
{}", clientAppId(), topicName, ex);
+                            future.completeExceptionally(new 
RestException(ex.getCause()));
+                        }
+                        return;
+                    }
+                    log.info("[{}] Successfully created partitioned topic {}", 
clientAppId(), topicName);
+                    future.complete(null);
+                });
+        return future;
+    }
+
     protected void resumeAsyncResponseExceptionally(AsyncResponse 
asyncResponse, Throwable throwable) {
         if (throwable instanceof WebApplicationException) {
             asyncResponse.resume((WebApplicationException) throwable);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index 20fd24d..daf6ea0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -142,14 +142,17 @@ public class NonPersistentTopics extends PersistentTopics 
{
             @ApiResponse(code = 406, message = "The number of partitions 
should be more than 0 and less than or equal"
                     + " to maxNumPartitionsPerPartitionedTopic"),
             @ApiResponse(code = 409, message = "Partitioned topic already 
exist")})
-    public void createPartitionedTopic(@Suspended final AsyncResponse 
asyncResponse,
-                                       @PathParam("property") String property, 
@PathParam("cluster") String cluster,
-                                       @PathParam("namespace") String 
namespace, @PathParam("topic") @Encoded
-                                               String encodedTopic,
-                                       int numPartitions) {
+    public void createPartitionedTopic(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("property") String property,
+            @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            int numPartitions,
+            @QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean 
createLocalTopicOnly) {
         try {
             validateTopicName(property, cluster, namespace, encodedTopic);
-            internalCreatePartitionedTopic(asyncResponse, numPartitions);
+            internalCreatePartitionedTopic(asyncResponse, numPartitions, 
createLocalTopicOnly);
         } catch (Exception e) {
             log.error("[{}] Failed to create partitioned topic {}", 
clientAppId(), topicName, e);
             resumeAsyncResponseExceptionally(asyncResponse, e);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 46708dd..babff0d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -154,14 +154,17 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiResponse(code = 406, message = "The number of partitions 
should be "
                     + "more than 0 and less than or equal to 
maxNumPartitionsPerPartitionedTopic"),
             @ApiResponse(code = 409, message = "Partitioned topic already 
exist")})
-    public void createPartitionedTopic(@Suspended final AsyncResponse 
asyncResponse,
-                                       @PathParam("property") String property, 
@PathParam("cluster") String cluster,
-                                       @PathParam("namespace") String 
namespace, @PathParam("topic") @Encoded
-                                                   String encodedTopic,
-                                       int numPartitions) {
+    public void createPartitionedTopic(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("property") String property,
+            @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            int numPartitions,
+            @QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean 
createLocalTopicOnly) {
         try {
             validateTopicName(property, cluster, namespace, encodedTopic);
-            internalCreatePartitionedTopic(asyncResponse, numPartitions);
+            internalCreatePartitionedTopic(asyncResponse, numPartitions, 
createLocalTopicOnly);
         } catch (Exception e) {
             log.error("[{}] Failed to create partitioned topic {}", 
clientAppId(), topicName, e);
             resumeAsyncResponseExceptionally(asyncResponse, e);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index cd6b31c..9d29967 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -188,12 +188,12 @@ public class NonPersistentTopics extends PersistentTopics 
{
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "The number of partitions for the topic",
                     required = true, type = "int", defaultValue = "0")
-                    int numPartitions) {
-
+                    int numPartitions,
+            @QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean 
createLocalTopicOnly) {
         try {
             validateGlobalNamespaceOwnership(tenant, namespace);
             validateTopicName(tenant, namespace, encodedTopic);
-            internalCreatePartitionedTopic(asyncResponse, numPartitions);
+            internalCreatePartitionedTopic(asyncResponse, numPartitions, 
createLocalTopicOnly);
         } catch (Exception e) {
             log.error("[{}] Failed to create partitioned topic {}", 
clientAppId(), topicName, e);
             resumeAsyncResponseExceptionally(asyncResponse, e);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 9772757..56e5ce1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -231,12 +231,13 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "The number of partitions for the topic",
                     required = true, type = "int", defaultValue = "0")
-                    int numPartitions) {
+                    int numPartitions,
+            @QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean 
createLocalTopicOnly) {
         try {
             validateGlobalNamespaceOwnership(tenant, namespace);
             validatePartitionedTopicName(tenant, namespace, encodedTopic);
             validateTopicPolicyOperation(topicName, PolicyName.PARTITION, 
PolicyOperation.WRITE);
-            internalCreatePartitionedTopic(asyncResponse, numPartitions);
+            internalCreatePartitionedTopic(asyncResponse, numPartitions, 
createLocalTopicOnly);
         } catch (Exception e) {
             log.error("[{}] Failed to create partitioned topic {}", 
clientAppId(), topicName, e);
             resumeAsyncResponseExceptionally(asyncResponse, e);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index db6caca..81ec0d5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -785,7 +785,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
         assertEquals(persistentTopics.getPartitionedTopicList(property, 
cluster, namespace), Lists.newArrayList());
         response = mock(AsyncResponse.class);
         ArgumentCaptor<Response> responseCaptor = 
ArgumentCaptor.forClass(Response.class);
-        persistentTopics.createPartitionedTopic(response, property, cluster, 
namespace, topic, 5);
+        persistentTopics.createPartitionedTopic(response, property, cluster, 
namespace, topic, 5, false);
         verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
         assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
         assertEquals(persistentTopics.getPartitionedTopicList(property, 
cluster, namespace), Lists
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 39a99d2..a1c7276 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -53,6 +53,7 @@ import 
org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.internal.TopicsImpl;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
@@ -169,7 +170,7 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
         // 3) Create the partitioned topic
         response = mock(AsyncResponse.class);
         ArgumentCaptor<Response> responseCaptor = 
ArgumentCaptor.forClass(Response.class);
-        persistentTopics.createPartitionedTopic(response, testTenant, 
testNamespace, testLocalTopicName, 3);
+        persistentTopics.createPartitionedTopic(response, testTenant, 
testNamespace, testLocalTopicName, 3, true);
         verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
 
@@ -289,7 +290,7 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
 
         // 3) Create the partitioned topic
         AsyncResponse response  = mock(AsyncResponse.class);
-        persistentTopics.createPartitionedTopic(response, testTenant, 
testNamespace, testLocalTopicName, 1);
+        persistentTopics.createPartitionedTopic(response, testTenant, 
testNamespace, testLocalTopicName, 1, true);
         ArgumentCaptor<Response> responseCaptor = 
ArgumentCaptor.forClass(Response.class);
         verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
@@ -375,7 +376,7 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
         doReturn(new 
Policies()).when(persistentTopics).getNamespacePolicies(any());
         AsyncResponse response = mock(AsyncResponse.class);
         ArgumentCaptor<RestException> errCaptor = 
ArgumentCaptor.forClass(RestException.class);
-        persistentTopics.createPartitionedTopic(response, testTenant, 
testNamespace, partitionedTopicName, 5);
+        persistentTopics.createPartitionedTopic(response, testTenant, 
testNamespace, partitionedTopicName, 5, true);
         verify(response, timeout(5000).times(1)).resume(errCaptor.capture());
         Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), 
Response.Status.CONFLICT.getStatusCode());
     }
@@ -399,7 +400,7 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
         
doNothing().when(persistentTopics).validateAdminAccessForTenant(anyString());
         AsyncResponse response = mock(AsyncResponse.class);
         ArgumentCaptor<Response> responseCaptor = 
ArgumentCaptor.forClass(Response.class);
-        persistentTopics.createPartitionedTopic(response, testTenant, 
testNamespace, partitionedTopicName, 5);
+        persistentTopics.createPartitionedTopic(response, testTenant, 
testNamespace, partitionedTopicName, 5, true);
         verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
         persistentTopics.updatePartitionedTopic(testTenant, testNamespace, 
partitionedTopicName, true, false, 10);
@@ -428,7 +429,7 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
         // 3) create partitioned topic and unload
         response = mock(AsyncResponse.class);
         responseCaptor = ArgumentCaptor.forClass(Response.class);
-        persistentTopics.createPartitionedTopic(response, testTenant, 
testNamespace, partitionTopicName, 6);
+        persistentTopics.createPartitionedTopic(response, testTenant, 
testNamespace, partitionTopicName, 6, true);
         verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
         response = mock(AsyncResponse.class);
@@ -458,13 +459,13 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
     public void testGetPartitionedTopicsList() throws KeeperException, 
InterruptedException, PulsarAdminException {
         AsyncResponse response = mock(AsyncResponse.class);
         ArgumentCaptor<Response> responseCaptor = 
ArgumentCaptor.forClass(Response.class);
-        persistentTopics.createPartitionedTopic(response, testTenant, 
testNamespace, "test-topic1", 3);
+        persistentTopics.createPartitionedTopic(response, testTenant, 
testNamespace, "test-topic1", 3, true);
         verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
 
         response = mock(AsyncResponse.class);
         responseCaptor = ArgumentCaptor.forClass(Response.class);
-        nonPersistentTopic.createPartitionedTopic(response, testTenant, 
testNamespace, "test-topic2", 3);
+        nonPersistentTopic.createPartitionedTopic(response, testTenant, 
testNamespace, "test-topic2", 3, true);
         verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
 
@@ -494,7 +495,7 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
     public void testCreateExistedPartition() {
         final AsyncResponse response = mock(AsyncResponse.class);
         final String topicName = "test-create-existed-partition";
-        persistentTopics.createPartitionedTopic(response, testTenant, 
testNamespace, topicName, 3);
+        persistentTopics.createPartitionedTopic(response, testTenant, 
testNamespace, topicName, 3, true);
 
         final String partitionName = 
TopicName.get(topicName).getPartition(0).getLocalName();
         try {
@@ -513,7 +514,8 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
         final int numPartitions = 5;
         AsyncResponse response = mock(AsyncResponse.class);
         ArgumentCaptor<Response> responseCaptor = 
ArgumentCaptor.forClass(Response.class);
-        persistentTopics.createPartitionedTopic(response, testTenant, 
testNamespace, partitionedTopicName, numPartitions);
+        persistentTopics.createPartitionedTopic(
+                response, testTenant, testNamespace, partitionedTopicName, 
numPartitions, true);
         verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
 
@@ -553,7 +555,8 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
         final int numPartitions = 5;
         AsyncResponse response = mock(AsyncResponse.class);
         ArgumentCaptor<Response> responseCaptor = 
ArgumentCaptor.forClass(Response.class);
-        persistentTopics.createPartitionedTopic(response, testTenant, 
testNamespace, partitionedTopicName, numPartitions);
+        persistentTopics.createPartitionedTopic(
+                response, testTenant, testNamespace, partitionedTopicName, 
numPartitions, true);
         verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
         String role = "role";
@@ -596,7 +599,7 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
 
         // create partitioned topic and compaction on it
         response = mock(AsyncResponse.class);
-        persistentTopics.createPartitionedTopic(response, testTenant, 
testNamespace, partitionTopicName, 2);
+        persistentTopics.createPartitionedTopic(response, testTenant, 
testNamespace, partitionTopicName, 2, true);
         persistentTopics.compact(response, testTenant, testNamespace, 
partitionTopicName, true);
         responseCaptor = ArgumentCaptor.forClass(Response.class);
         verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
@@ -613,7 +616,7 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
                 topicName).toString();
         final String subscriptionName = "sub";
 
-        admin.topics().createPartitionedTopic(topic, 3);
+        ((TopicsImpl) admin.topics()).createPartitionedTopicAsync(topic, 3, 
true).get();
 
         final String partitionedTopic = topic + "-partition-0";
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index d261e85..0ea39aa 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service;
 
 import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
+import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
@@ -35,6 +36,7 @@ import io.netty.buffer.ByteBuf;
 
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.SortedSet;
@@ -62,6 +64,7 @@ import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageRoutingMode;
@@ -79,7 +82,6 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.ReplicatorStats;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
@@ -1121,6 +1123,64 @@ public class ReplicatorTest extends ReplicatorTestBase {
 
         consumer.close();
     }
+
+
+    @Test
+    public void createPartitionedTopicTest() throws Exception {
+        final String cluster1 = pulsar1.getConfig().getClusterName();
+        final String cluster2 = pulsar2.getConfig().getClusterName();
+        final String cluster3 = pulsar3.getConfig().getClusterName();
+        final String namespace = newUniqueName("pulsar/ns");
+
+        final String persistentPartitionedTopic =
+                newUniqueName("persistent://" + namespace + "/partitioned");
+        final String persistentNonPartitionedTopic =
+                newUniqueName("persistent://" + namespace + 
"/non-partitioned");
+        final String nonPersistentPartitionedTopic =
+                newUniqueName("non-persistent://" + namespace + 
"/partitioned");
+        final String nonPersistentNonPartitionedTopic =
+                newUniqueName("non-persistent://" + namespace + 
"/non-partitioned");
+        final int numPartitions = 3;
+
+        admin1.namespaces().createNamespace(namespace, 
Sets.newHashSet(cluster1, cluster2, cluster3));
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, 
Sets.newHashSet("r1", "r2", "r3"));
+
+        admin1.topics().createPartitionedTopic(persistentPartitionedTopic, 
numPartitions);
+        admin1.topics().createPartitionedTopic(nonPersistentPartitionedTopic, 
numPartitions);
+        
admin1.topics().createNonPartitionedTopic(persistentNonPartitionedTopic);
+        
admin1.topics().createNonPartitionedTopic(nonPersistentNonPartitionedTopic);
+
+        List<String> partitionedTopicList = 
admin1.topics().getPartitionedTopicList(namespace);
+        
Assert.assertTrue(partitionedTopicList.contains(persistentPartitionedTopic));
+        
Assert.assertTrue(partitionedTopicList.contains(nonPersistentPartitionedTopic));
+
+        // expected topic list didn't contain non-persistent-non-partitioned 
topic,
+        // because this model topic didn't create path in local metadata store.
+        List<String> expectedTopicList = Lists.newArrayList(
+                persistentNonPartitionedTopic, 
nonPersistentNonPartitionedTopic);
+        TopicName pt = TopicName.get(persistentPartitionedTopic);
+        for (int i = 0; i < numPartitions; i++) {
+            expectedTopicList.add(pt.getPartition(i).toString());
+        }
+
+        checkListContainExpectedTopic(admin1, namespace, expectedTopicList);
+        checkListContainExpectedTopic(admin2, namespace, expectedTopicList);
+        checkListContainExpectedTopic(admin3, namespace, expectedTopicList);
+    }
+
+    private void checkListContainExpectedTopic(PulsarAdmin admin, String 
namespace, List<String> expectedTopicList) {
+        // wait non-partitioned topics replicators created finished
+        final List<String> list = new ArrayList<>();
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> {
+            list.clear();
+            list.addAll(admin.topics().getList(namespace));
+            return list.size() == expectedTopicList.size();
+        });
+        for (String expectTopic : expectedTopicList) {
+            Assert.assertTrue(list.contains(expectTopic));
+        }
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(ReplicatorTest.class);
 
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 267a2a0..0dcf7fa 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -384,9 +384,15 @@ public class TopicsImpl extends BaseResource implements 
Topics {
 
     @Override
     public CompletableFuture<Void> createPartitionedTopicAsync(String topic, 
int numPartitions) {
+        return createPartitionedTopicAsync(topic, numPartitions, false);
+    }
+
+    public CompletableFuture<Void> createPartitionedTopicAsync(
+            String topic, int numPartitions, boolean createLocalTopicOnly) {
         checkArgument(numPartitions > 0, "Number of partitions should be more 
than 0");
         TopicName tn = validateTopic(topic);
-        WebTarget path = topicPath(tn, "partitions");
+        WebTarget path = topicPath(tn, "partitions")
+                .queryParam("createLocalTopicOnly", 
Boolean.toString(createLocalTopicOnly));
         return asyncPutRequest(path, Entity.entity(numPartitions, 
MediaType.APPLICATION_JSON));
     }
 

Reply via email to