gaoran10 commented on a change in pull request #10963:
URL: https://github.com/apache/pulsar/pull/10963#discussion_r654266958
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
##########
@@ -1004,6 +1004,59 @@ public void testTopicReplicatedAndProducerCreate(String topicPrefix, String topi
nonPersistentProducer2.close();
}
+ @Test
+ public void createPartitionedTopicTest() throws Exception {
+ final String cluster1 = pulsar1.getConfig().getClusterName();
+ final String cluster2 = pulsar2.getConfig().getClusterName();
+ final String cluster3 = pulsar3.getConfig().getClusterName();
+ final String namespace = randomSuffixString("pulsar/ns", 5);
+
+ final String persistentPartitionedTopic =
+ randomSuffixString("persistent://" + namespace + "/partitioned", 5);
+ final String persistentNonPartitionedTopic =
+ randomSuffixString("persistent://" + namespace + "/non-partitioned", 5);
+ final String nonPersistentPartitionedTopic =
+ randomSuffixString("non-persistent://" + namespace + "/partitioned", 5);
+ final String nonPersistentNonPartitionedTopic =
+ randomSuffixString("non-persistent://" + namespace + "/non-partitioned", 5);
+ final int numPartitions = 3;
+
+ admin1.namespaces().createNamespace(namespace, Sets.newHashSet(cluster1, cluster2, cluster3));
+ admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3"));
+
+ admin1.topics().createPartitionedTopic(persistentPartitionedTopic, numPartitions);
+ admin1.topics().createPartitionedTopic(nonPersistentPartitionedTopic, numPartitions);
+ admin1.topics().createNonPartitionedTopic(persistentNonPartitionedTopic);
+ admin1.topics().createNonPartitionedTopic(nonPersistentNonPartitionedTopic);
+
+ List<String> partitionedTopicList = admin1.topics().getPartitionedTopicList(namespace);
+ Assert.assertTrue(partitionedTopicList.contains(persistentPartitionedTopic));
+ Assert.assertTrue(partitionedTopicList.contains(nonPersistentPartitionedTopic));
+
+ // wait non-partitioned topics replicators created finished
+ Thread.sleep(1000);
Review comment:
Ok, I'll fix this. Thanks.
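
A minimal sketch of one possible fix, assuming the concern is the fixed `Thread.sleep(1000)` (the original review remark is not quoted in this message). It polls a condition until a deadline instead of sleeping for a fixed time. `PollUtil` and `waitUntil` are hypothetical names for illustration, not existing Pulsar helpers, and the real test may use a different mechanism.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical helper, not part of Pulsar: polls a condition until it holds or a timeout expires.
final class PollUtil {
    private PollUtil() {
    }

    // Returns true as soon as the condition holds, false if the timeout expires
    // or the calling thread is interrupted while waiting.
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration interval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // Stand-in for "replicators have been created"; set by a background thread.
        AtomicBoolean replicatorsReady = new AtomicBoolean(false);
        Thread background = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            replicatorsReady.set(true);
        });
        background.start();

        boolean ready = waitUntil(replicatorsReady::get, Duration.ofSeconds(5), Duration.ofMillis(10));
        System.out.println("ready = " + ready);
    }
}
```

In the test, the condition would check whatever signals that the replicators exist, so the test proceeds as soon as they are ready and fails clearly when they never appear.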
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
##########
@@ -29,4 +31,8 @@ public static String newUniqueName(String prefix) {
return prefix + "-" + UUID.randomUUID();
}
+ public static String randomSuffixString(String content, int numSuffix) {
+ return content + "-" + RandomStringUtils.randomAlphabetic(numSuffix).toLowerCase();
Review comment:
The method `newUniqueName` would work, but its UUID suffix is hard to read
and check. Maybe a short random alphabetic string is enough. WDYT?
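
For comparison, a small sketch of the two naming styles. `newUniqueName` mirrors the existing helper shown in the diff. `randomSuffixString` here is a JDK-only stand-in for the proposed method (the PR uses `RandomStringUtils`), and the class name `NameSuffixDemo` is hypothetical.

```java
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;

// Illustration only: compares a UUID suffix with a short lowercase alphabetic suffix.
final class NameSuffixDemo {
    private NameSuffixDemo() {
    }

    // Same shape as the existing helper: prefix + "-" + 36-character UUID.
    static String newUniqueName(String prefix) {
        return prefix + "-" + UUID.randomUUID();
    }

    // JDK-only stand-in for the proposed helper: content + "-" + numSuffix random letters a-z.
    static String randomSuffixString(String content, int numSuffix) {
        StringBuilder sb = new StringBuilder(content).append('-');
        for (int i = 0; i < numSuffix; i++) {
            sb.append((char) ('a' + ThreadLocalRandom.current().nextInt(26)));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(newUniqueName("pulsar/ns"));
        System.out.println(randomSuffixString("pulsar/ns", 5));
    }
}
```

The trade-off is readability against collision risk: five lowercase letters give 26^5 = 11,881,376 possible suffixes, far fewer than a UUID, which is usually acceptable for names scoped to one test run.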
##########
File path:
pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
##########
@@ -361,6 +361,42 @@
*/
CompletableFuture<Void> createPartitionedTopicAsync(String topic, int numPartitions);
+ /**
+ * Create a partitioned topic.
+ * <p/>
+ * Create a partitioned topic. It needs to be called before creating a producer for a partitioned topic.
+ * <p/>
+ *
+ * @param topic
+ * Topic name
+ * @param numPartitions
+ * Number of partitions to create of the topic
+ * @param createLocalTopicOnly
+ * False indicate create topic in all replicate clusters,
+ * true indicate only create topic in local cluster.
+ * @throws PulsarAdminException
+ */
+ void createPartitionedTopic(
Review comment:
Yes, I think this should be broker-internal behavior; the
`createLocalTopicOnly` param shouldn't be specified by users.
@codelipenghui Please confirm.
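
A rough sketch of what keeping the flag internal could look like. All types here (`TopicAdmin`, `BrokerSideTopicAdmin`) are hypothetical and are not the Pulsar admin API. The idea that remote clusters are asked to create the topic locally only is an assumption about how the broker would use the flag.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical user-facing API: no createLocalTopicOnly parameter is exposed.
interface TopicAdmin {
    void createPartitionedTopic(String topic, int numPartitions);
}

// Hypothetical broker-side implementation that decides the flag itself.
final class BrokerSideTopicAdmin implements TopicAdmin {
    // Records which clusters were asked to create the topic, in order.
    final List<String> log = new ArrayList<>();
    private final List<String> remoteClusters;

    BrokerSideTopicAdmin(List<String> remoteClusters) {
        this.remoteClusters = remoteClusters;
    }

    @Override
    public void createPartitionedTopic(String topic, int numPartitions) {
        // A user request always means "create in all replication clusters".
        create(topic, numPartitions, false);
    }

    // Internal entry point: the broker, not the caller, chooses createLocalTopicOnly.
    private void create(String topic, int numPartitions, boolean createLocalTopicOnly) {
        log.add("local:" + topic + ":" + numPartitions);
        if (!createLocalTopicOnly) {
            for (String cluster : remoteClusters) {
                // Assumption: a real broker would send a local-only request to each remote cluster here.
                log.add(cluster + ":" + topic + ":" + numPartitions);
            }
        }
    }

    public static void main(String[] args) {
        BrokerSideTopicAdmin admin = new BrokerSideTopicAdmin(List.of("r2", "r3"));
        admin.createPartitionedTopic("persistent://pulsar/ns/partitioned", 3);
        admin.log.forEach(System.out::println);
    }
}
```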
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
##########
@@ -672,55 +673,57 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
"Number of partitions should be less than or equal to " + maxPartitions));
return;
}
+
+ List<CompletableFuture<Void>> createFutureList = new ArrayList<>();
+
+ CompletableFuture<Void> createLocalFuture = new CompletableFuture<>();
+ createFutureList.add(createLocalFuture);
checkTopicExistsAsync(topicName).thenAccept(exists -> {
if (exists) {
log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
asyncResponse.resume(new RestException(Status.CONFLICT, "This topic already exists"));
- } else {
-
- try {
- String path = ZkAdminPaths.partitionedTopicPath(topicName);
- namespaceResources().getPartitionedTopicResources()
- .createAsync(path, new PartitionedTopicMetadata(numPartitions)).thenAccept(r -> {
- log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
- tryCreatePartitionsAsync(numPartitions).thenAccept(v -> {
- log.info("[{}] Successfully created partitions for topic {}", clientAppId(),
- topicName);
- asyncResponse.resume(Response.noContent().build());
- }).exceptionally(e -> {
- log.error("[{}] Failed to create partitions for topic {}", clientAppId(),
- topicName);
- // The partitioned topic is created but there are some partitions create failed
- asyncResponse.resume(new RestException(e));
- return null;
- });
- }).exceptionally(ex -> {
- if (ex.getCause() instanceof AlreadyExistsException) {
- log.warn("[{}] Failed to create already existing partitioned topic {}",
- clientAppId(), topicName);
- asyncResponse.resume(
- new RestException(Status.CONFLICT, "Partitioned topic already exists"));
- } else if (ex.getCause() instanceof BadVersionException) {
- log.warn("[{}] Failed to create partitioned topic {}: concurrent modification",
- clientAppId(), topicName);
- asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
- } else {
- log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName,
- ex.getCause());
- asyncResponse.resume(new RestException(ex.getCause()));
- }
- return null;
- });
- } catch (Exception e) {
- log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
- resumeAsyncResponseExceptionally(asyncResponse, e);
- }
+ return;
}
+
+ provisionPartitionedTopicPath(asyncResponse, numPartitions, createLocalTopicOnly)
+ .thenCompose(ignored -> tryCreatePartitionsAsync(numPartitions))
+ .whenComplete((ignored, ex) -> {
+ if (ex != null) {
+ createLocalFuture.completeExceptionally(ex);
+ return;
+ }
+ createLocalFuture.complete(null);
Review comment:
I'm not sure about this comment.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
##########
@@ -672,55 +673,57 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
"Number of partitions should be less than or equal to " + maxPartitions));
return;
}
+
+ List<CompletableFuture<Void>> createFutureList = new ArrayList<>();
+
+ CompletableFuture<Void> createLocalFuture = new CompletableFuture<>();
+ createFutureList.add(createLocalFuture);
checkTopicExistsAsync(topicName).thenAccept(exists -> {
if (exists) {
log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
asyncResponse.resume(new RestException(Status.CONFLICT, "This topic already exists"));
Review comment:
This method creates a partitioned topic. If the topic already exists in the
local cluster, the request should be rejected.
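
A minimal sketch of that check-then-reject flow, using an in-memory set in place of the metadata store. `CreateTopicSketch` and its members are hypothetical. The status codes correspond to the `Status.CONFLICT` and `Response.noContent()` responses in the diff.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Illustration only: an in-memory stand-in for the local cluster's topic metadata.
final class CreateTopicSketch {
    static final int NO_CONTENT = 204;
    static final int CONFLICT = 409;

    private final Set<String> localTopics = ConcurrentHashMap.newKeySet();

    // Stand-in for the asynchronous existence check.
    CompletableFuture<Boolean> checkTopicExistsAsync(String topic) {
        return CompletableFuture.completedFuture(localTopics.contains(topic));
    }

    // Rejects the request with CONFLICT if the topic already exists locally, otherwise creates it.
    CompletableFuture<Integer> createPartitionedTopic(String topic) {
        return checkTopicExistsAsync(topic).thenApply(exists -> {
            if (exists) {
                return CONFLICT;
            }
            // add() returns false if a concurrent request created the topic after the check.
            return localTopics.add(topic) ? NO_CONTENT : CONFLICT;
        });
    }

    public static void main(String[] args) {
        CreateTopicSketch sketch = new CreateTopicSketch();
        System.out.println(sketch.createPartitionedTopic("persistent://pulsar/ns/partitioned").join());
        System.out.println(sketch.createPartitionedTopic("persistent://pulsar/ns/partitioned").join());
    }
}
```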