gaoran10 commented on a change in pull request #10963:
URL: https://github.com/apache/pulsar/pull/10963#discussion_r654266958



##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
##########
@@ -1004,6 +1004,59 @@ public void testTopicReplicatedAndProducerCreate(String 
topicPrefix, String topi
         nonPersistentProducer2.close();
     }
 
+    @Test
+    public void createPartitionedTopicTest() throws Exception {
+        final String cluster1 = pulsar1.getConfig().getClusterName();
+        final String cluster2 = pulsar2.getConfig().getClusterName();
+        final String cluster3 = pulsar3.getConfig().getClusterName();
+        final String namespace = randomSuffixString("pulsar/ns", 5);
+
+        final String persistentPartitionedTopic =
+                randomSuffixString("persistent://" + namespace + 
"/partitioned", 5);
+        final String persistentNonPartitionedTopic =
+                randomSuffixString("persistent://" + namespace + 
"/non-partitioned", 5);
+        final String nonPersistentPartitionedTopic =
+                randomSuffixString("non-persistent://" + namespace + 
"/partitioned", 5);
+        final String nonPersistentNonPartitionedTopic =
+                randomSuffixString("non-persistent://" + namespace + 
"/non-partitioned", 5);
+        final int numPartitions = 3;
+
+        admin1.namespaces().createNamespace(namespace, 
Sets.newHashSet(cluster1, cluster2, cluster3));
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, 
Sets.newHashSet("r1", "r2", "r3"));
+
+        admin1.topics().createPartitionedTopic(persistentPartitionedTopic, 
numPartitions);
+        admin1.topics().createPartitionedTopic(nonPersistentPartitionedTopic, 
numPartitions);
+        
admin1.topics().createNonPartitionedTopic(persistentNonPartitionedTopic);
+        
admin1.topics().createNonPartitionedTopic(nonPersistentNonPartitionedTopic);
+
+        List<String> partitionedTopicList = 
admin1.topics().getPartitionedTopicList(namespace);
+        
Assert.assertTrue(partitionedTopicList.contains(persistentPartitionedTopic));
+        
Assert.assertTrue(partitionedTopicList.contains(nonPersistentPartitionedTopic));
+
+        // wait non-partitioned topics replicators created finished
+        Thread.sleep(1000);

Review comment:
       Ok, I'll fix this. Thanks.

##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
##########
@@ -29,4 +31,8 @@ public static String newUniqueName(String prefix) {
         return prefix + "-" + UUID.randomUUID();
     }
 
+    public static String randomSuffixString(String content, int numSuffix) {
+        return content + "-" + 
RandomStringUtils.randomAlphabetic(numSuffix).toLowerCase();

Review comment:
       The method `newUniqueName` would work, but the UUID suffix is hard to 
read and check. Maybe a short random alphabetic string is enough. WDYT?
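
To make the trade-off concrete, here is a minimal sketch that prints both kinds of name side by side. The class name `SuffixSketch` is hypothetical, and the helper is rewritten with the JDK only, so it stands in for the `RandomStringUtils`-based version in the diff rather than reproducing it.

```java
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;

class SuffixSketch {
    // Hypothetical JDK-only stand-in for the helper proposed in the diff:
    // appends "-" plus numSuffix random lowercase letters.
    static String randomSuffixString(String content, int numSuffix) {
        StringBuilder sb = new StringBuilder(content).append('-');
        for (int i = 0; i < numSuffix; i++) {
            sb.append((char) ('a' + ThreadLocalRandom.current().nextInt(26)));
        }
        return sb.toString();
    }

    // Same shape as the existing newUniqueName shown in the diff context.
    static String newUniqueName(String prefix) {
        return prefix + "-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        // The UUID form adds 36 characters; the alphabetic form adds numSuffix.
        System.out.println(newUniqueName("pulsar/ns"));
        System.out.println(randomSuffixString("pulsar/ns", 5));
    }
}
```

A five-letter lowercase suffix has 26^5 = 11,881,376 possible values, so collisions are far more likely than with a UUID, though that is probably acceptable for names that only need to be unique within one test run.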

##########
File path: 
pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
##########
@@ -361,6 +361,42 @@
      */
     CompletableFuture<Void> createPartitionedTopicAsync(String topic, int 
numPartitions);
 
+    /**
+     * Create a partitioned topic.
+     * <p/>
+     * Create a partitioned topic. It needs to be called before creating a 
producer for a partitioned topic.
+     * <p/>
+     *
+     * @param topic
+     *            Topic name
+     * @param numPartitions
+     *            Number of partitions to create of the topic
+     * @param createLocalTopicOnly
+     *            False indicate create topic in all replicate clusters,
+     *            true indicate only create topic in local cluster.
+     * @throws PulsarAdminException
+     */
+    void createPartitionedTopic(

Review comment:
       Yes, I think this should be broker-internal behavior; the param 
`createLocalTopicOnly` shouldn't be specified by users. @codelipenghui Please 
confirm.
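
One way to picture the suggestion is a toy model in which the user-facing method has no flag and only an internal overload carries it. Everything here (`ClusterSketch`, `peers`, the private `create` method) is hypothetical and is not Pulsar's API. It also assumes the flag exists so that a cluster receiving a forwarded request does not fan out again.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ClusterSketch {
    final String name;
    final List<ClusterSketch> peers = new ArrayList<>();
    // topic name -> number of partitions, standing in for topic metadata
    final Map<String, Integer> partitions = new HashMap<>();

    ClusterSketch(String name) {
        this.name = name;
    }

    // User-facing entry point: callers cannot choose local-only creation.
    void createPartitionedTopic(String topic, int numPartitions) {
        create(topic, numPartitions, false);
    }

    // Internal overload (assumption): peers are called with
    // createLocalTopicOnly = true so they do not forward the request again.
    private void create(String topic, int numPartitions, boolean createLocalTopicOnly) {
        partitions.put(topic, numPartitions);
        if (!createLocalTopicOnly) {
            for (ClusterSketch peer : peers) {
                peer.create(topic, numPartitions, true);
            }
        }
    }
}
```

In this sketch two clusters that list each other as peers each end up with the topic exactly once, because the forwarded call stops at the receiving cluster.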

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
##########
@@ -672,55 +673,57 @@ protected void 
internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
                     "Number of partitions should be less than or equal to " + 
maxPartitions));
             return;
         }
+
+        List<CompletableFuture<Void>> createFutureList = new ArrayList<>();
+
+        CompletableFuture<Void> createLocalFuture = new CompletableFuture<>();
+        createFutureList.add(createLocalFuture);
         checkTopicExistsAsync(topicName).thenAccept(exists -> {
             if (exists) {
                 log.warn("[{}] Failed to create already existing topic {}", 
clientAppId(), topicName);
                 asyncResponse.resume(new RestException(Status.CONFLICT, "This 
topic already exists"));
-            } else {
-
-                try {
-                    String path = ZkAdminPaths.partitionedTopicPath(topicName);
-                    namespaceResources().getPartitionedTopicResources()
-                            .createAsync(path, new 
PartitionedTopicMetadata(numPartitions)).thenAccept(r -> {
-                                log.info("[{}] Successfully created 
partitioned topic {}", clientAppId(), topicName);
-                                
tryCreatePartitionsAsync(numPartitions).thenAccept(v -> {
-                                    log.info("[{}] Successfully created 
partitions for topic {}", clientAppId(),
-                                            topicName);
-                                    
asyncResponse.resume(Response.noContent().build());
-                                }).exceptionally(e -> {
-                                    log.error("[{}] Failed to create 
partitions for topic {}", clientAppId(),
-                                            topicName);
-                                    // The partitioned topic is created but 
there are some partitions create failed
-                                    asyncResponse.resume(new RestException(e));
-                                    return null;
-                                });
-                            }).exceptionally(ex -> {
-                                if (ex.getCause() instanceof 
AlreadyExistsException) {
-                                    log.warn("[{}] Failed to create already 
existing partitioned topic {}",
-                                            clientAppId(), topicName);
-                                    asyncResponse.resume(
-                                            new RestException(Status.CONFLICT, 
"Partitioned topic already exists"));
-                                } else if (ex.getCause() instanceof 
BadVersionException) {
-                                    log.warn("[{}] Failed to create 
partitioned topic {}: concurrent modification",
-                                            clientAppId(), topicName);
-                                    asyncResponse.resume(new 
RestException(Status.CONFLICT, "Concurrent modification"));
-                                } else {
-                                    log.error("[{}] Failed to create 
partitioned topic {}", clientAppId(), topicName,
-                                            ex.getCause());
-                                    asyncResponse.resume(new 
RestException(ex.getCause()));
-                                }
-                                return null;
-                            });
-                } catch (Exception e) {
-                    log.error("[{}] Failed to create partitioned topic {}", 
clientAppId(), topicName, e);
-                    resumeAsyncResponseExceptionally(asyncResponse, e);
-                }
+                return;
             }
+
+            provisionPartitionedTopicPath(asyncResponse, numPartitions, 
createLocalTopicOnly)
+                    .thenCompose(ignored -> 
tryCreatePartitionsAsync(numPartitions))
+                    .whenComplete((ignored, ex) -> {
+                        if (ex != null) {
+                            createLocalFuture.completeExceptionally(ex);
+                            return;
+                        }
+                        createLocalFuture.complete(null);

Review comment:
       I'm not sure about this comment.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
##########
@@ -672,55 +673,57 @@ protected void 
internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
                     "Number of partitions should be less than or equal to " + 
maxPartitions));
             return;
         }
+
+        List<CompletableFuture<Void>> createFutureList = new ArrayList<>();
+
+        CompletableFuture<Void> createLocalFuture = new CompletableFuture<>();
+        createFutureList.add(createLocalFuture);
         checkTopicExistsAsync(topicName).thenAccept(exists -> {
             if (exists) {
                 log.warn("[{}] Failed to create already existing topic {}", 
clientAppId(), topicName);
                 asyncResponse.resume(new RestException(Status.CONFLICT, "This 
topic already exists"));

Review comment:
       This method creates a partitioned topic. If the topic already exists in 
the local cluster, the request should be rejected.
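
The reject-if-exists rule can be sketched with a plain `CompletableFuture` chain. This is a simplified illustration, not the broker code: `CreateGuardSketch`, `ConflictException`, and the in-memory map are hypothetical, and the existence check here is synchronous and not atomic with the write.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class CreateGuardSketch {
    // Hypothetical stand-in for a 409 CONFLICT response.
    static class ConflictException extends RuntimeException {
        ConflictException(String message) {
            super(message);
        }
    }

    // In-memory stand-in for the local cluster's topic metadata.
    private final Map<String, Integer> metadata = new ConcurrentHashMap<>();

    CompletableFuture<Boolean> checkTopicExistsAsync(String topic) {
        return CompletableFuture.completedFuture(metadata.containsKey(topic));
    }

    CompletableFuture<Void> createPartitionedTopicAsync(String topic, int numPartitions) {
        return checkTopicExistsAsync(topic).thenAccept(exists -> {
            if (exists) {
                // Reject early; the returned future completes exceptionally.
                throw new ConflictException("This topic already exists");
            }
            // Check-then-put is not atomic; a real implementation would
            // also need to handle a concurrent create.
            metadata.put(topic, numPartitions);
        });
    }
}
```

A second call for the same topic fails with a `CompletionException` whose cause is the `ConflictException`, which is the point where a REST layer would map the failure to a conflict response.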




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

