This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9293adf25b1f0f58d4d5479c3f6aa51301a4dd6a
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Wed Oct 6 14:17:44 2021 -0700

    [pulsar-broker] Fix: handle failed partitions topic creation (#10374)
    
    * [pulsar-broker] Fix: handle failed partitions topic creation
    
    * fix test
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 70 ++++++++++++----------
 .../pulsar/broker/admin/v1/PersistentTopics.java   |  3 +-
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  3 +-
 .../apache/pulsar/broker/admin/AdminApiTest2.java  | 52 +++++++++++++---
 .../org/apache/pulsar/broker/admin/AdminTest.java  |  3 +-
 .../pulsar/broker/admin/PersistentTopicsTest.java  |  3 +-
 .../org/apache/pulsar/client/admin/Topics.java     | 40 ++++++++++++-
 .../pulsar/client/admin/internal/TopicsImpl.java   | 19 +++++-
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  2 +-
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |  6 +-
 10 files changed, 151 insertions(+), 50 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 7dff58d..aa40644 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -389,12 +389,12 @@ public class PersistentTopicsBase extends AdminResource {
      * @param numPartitions
      */
     protected void internalUpdatePartitionedTopic(int numPartitions,
-                                                  boolean 
updateLocalTopicOnly, boolean authoritative) {
+                                                  boolean 
updateLocalTopicOnly, boolean authoritative,
+                                                  boolean force) {
         validateTopicOwnership(topicName, authoritative);
         validateTopicPolicyOperation(topicName, PolicyName.PARTITION, 
PolicyOperation.WRITE);
-
         // Only do the validation if it's the first hop.
-        if (!updateLocalTopicOnly) {
+        if (!updateLocalTopicOnly && !force) {
             validatePartitionTopicUpdate(topicName.getLocalName(), 
numPartitions);
         }
         final int maxPartitions = 
pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
@@ -459,7 +459,8 @@ public class PersistentTopicsBase extends AdminResource {
         }
         try {
             
tryCreatePartitionsAsync(numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, 
TimeUnit.SECONDS);
-            updatePartitionedTopic(topicName, 
numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
+            updatePartitionedTopic(topicName, numPartitions, 
force).get(DEFAULT_OPERATION_TIMEOUT_SEC,
+                    TimeUnit.SECONDS);
         } catch (Exception e) {
             if (e.getCause() instanceof RestException) {
                 throw (RestException) e.getCause();
@@ -507,7 +508,7 @@ public class PersistentTopicsBase extends AdminResource {
             }
             
results.add(pulsar().getBrokerService().getClusterPulsarAdmin(cluster).topics()
                     .updatePartitionedTopicAsync(topicName.toString(),
-                            numPartitions, true));
+                            numPartitions, true, false));
         });
         return FutureUtil.waitForAll(results);
     }
@@ -3651,34 +3652,39 @@ public class PersistentTopicsBase extends AdminResource 
{
         }
     }
 
-    private CompletableFuture<Void> updatePartitionedTopic(TopicName 
topicName, int numPartitions) {
-        return createSubscriptions(topicName, numPartitions)
-                .thenCompose(__ -> {
-                    CompletableFuture<Void> future = 
namespaceResources().getPartitionedTopicResources()
-                            .updatePartitionedTopicAsync(topicName,
-                                    p -> new 
PartitionedTopicMetadata(numPartitions));
-                    future.exceptionally(ex -> {
-                        // If the update operation fails, clean up the 
partitions that were created
-                        getPartitionedTopicMetadataAsync(topicName, false, 
false)
-                                .thenAccept(metadata -> {
-                                    int oldPartition = metadata.partitions;
-                                    for (int i = oldPartition; i < 
numPartitions; i++) {
-                                        
topicResources().deletePersistentTopicAsync(topicName.getPartition(i))
-                                                .exceptionally(ex1 -> {
-                                                    log.warn("[{}] Failed to 
clean up managedLedger {}",
-                                                            clientAppId(),
-                                                            topicName, 
ex1.getCause());
-                                                    return null;
-                                                });
-                                    }
-                                }).exceptionally(e -> {
-                                    log.warn("[{}] Failed to clean up 
managedLedger", topicName, e);
-                                    return null;
-                                });
-                        return null;
-                    });
-                    return future;
+
+    private CompletableFuture<Void> updatePartitionedTopic(TopicName 
topicName, int numPartitions, boolean force) {
+        CompletableFuture<Void> result = new CompletableFuture<>();
+        createSubscriptions(topicName, numPartitions).thenCompose(__ -> {
+            CompletableFuture<Void> future = 
namespaceResources().getPartitionedTopicResources()
+                    .updatePartitionedTopicAsync(topicName, p -> new 
PartitionedTopicMetadata(numPartitions));
+            future.exceptionally(ex -> {
+                // If the update operation fails, clean up the partitions that 
were created
+                getPartitionedTopicMetadataAsync(topicName, false, 
false).thenAccept(metadata -> {
+                    int oldPartition = metadata.partitions;
+                    for (int i = oldPartition; i < numPartitions; i++) {
+                        
topicResources().deletePersistentTopicAsync(topicName.getPartition(i)).exceptionally(ex1
 -> {
+                            log.warn("[{}] Failed to clean up managedLedger 
{}", clientAppId(), topicName,
+                                    ex1.getCause());
+                            return null;
+                        });
+                    }
+                }).exceptionally(e -> {
+                    log.warn("[{}] Failed to clean up managedLedger", 
topicName, e);
+                    return null;
                 });
+                return null;
+            });
+            return future;
+        }).thenAccept(__ -> result.complete(null)).exceptionally(ex -> {
+            if (force && ex.getCause() instanceof 
PulsarAdminException.ConflictException) {
+                result.complete(null);
+                return null;
+            }
+            result.completeExceptionally(ex);
+            return null;
+        });
+        return result;
     }
 
     /**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 2e29380..2917482 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -231,9 +231,10 @@ public class PersistentTopics extends PersistentTopicsBase 
{
             @QueryParam("updateLocalTopicOnly") @DefaultValue("false") boolean 
updateLocalTopicOnly,
             @ApiParam(value = "Is authentication required to perform this 
operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative,
+            @QueryParam("force") @DefaultValue("false") boolean force,
             int numPartitions) {
         validateTopicName(property, cluster, namespace, encodedTopic);
-        internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly, 
authoritative);
+        internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly, 
authoritative, force);
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index e0a5674..09b694c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -725,12 +725,13 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @QueryParam("updateLocalTopicOnly") @DefaultValue("false") boolean 
updateLocalTopicOnly,
             @ApiParam(value = "Is authentication required to perform this 
operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative,
+            @QueryParam("force") @DefaultValue("false") boolean force,
             @ApiParam(value = "The number of partitions for the topic",
                     required = true, type = "int", defaultValue = "0")
                     int numPartitions) {
         validatePartitionedTopicName(tenant, namespace, encodedTopic);
         validatePartitionedTopicMetadata(tenant, namespace, encodedTopic);
-        internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly, 
authoritative);
+        internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly, 
authoritative, force);
     }
 
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index 779eb45..33efa2b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -2144,15 +2144,13 @@ public class AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
     public void testGetTopicsWithDifferentMode() throws Exception {
         final String namespace = "prop-xyz/ns1";
 
-        final String persistentTopicName = TopicName.get(
-                "persistent",
-                NamespaceName.get(namespace),
-                "get_topics_mode_" + UUID.randomUUID().toString()).toString();
+        final String persistentTopicName = TopicName
+                .get("persistent", NamespaceName.get(namespace), 
"get_topics_mode_" + UUID.randomUUID().toString())
+                .toString();
 
-        final String nonPersistentTopicName = TopicName.get(
-                "non-persistent",
-                NamespaceName.get(namespace),
-                "get_topics_mode_" + UUID.randomUUID().toString()).toString();
+        final String nonPersistentTopicName = TopicName
+                .get("non-persistent", NamespaceName.get(namespace), 
"get_topics_mode_" + UUID.randomUUID().toString())
+                .toString();
 
         Producer<byte[]> producer1 = 
pulsarClient.newProducer().topic(persistentTopicName).create();
         Producer<byte[]> producer2 = 
pulsarClient.newProducer().topic(nonPersistentTopicName).create();
@@ -2185,6 +2183,7 @@ public class AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
         producer2.close();
     }
 
+
     @Test(dataProvider = "isV1")
     public void testNonPartitionedTopic(boolean isV1) throws Exception {
         String tenant = "prop-xyz";
@@ -2195,4 +2194,41 @@ public class AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
         admin.topics().createNonPartitionedTopic(topic);
         assertTrue(admin.topics().getList(namespace).contains(topic));
     }
+
+    /**
+     * Validate retrying failed partitioned topic should succeed.
+     * @throws Exception
+     */
+    @Test
+    public void testFailedUpdatePartitionedTopic() throws Exception {
+        final String topicName = "failed-topic";
+        final String subName1 = topicName + "-my-sub-1";
+        final int startPartitions = 4;
+        final int newPartitions = 8;
+        final String partitionedTopicName = "persistent://prop-xyz/ns1/" + 
topicName;
+
+        URL pulsarUrl = new URL(pulsar.getWebServiceAddress());
+
+        admin.topics().createPartitionedTopic(partitionedTopicName, 
startPartitions);
+        PulsarClient client = 
PulsarClient.builder().serviceUrl(pulsarUrl.toString()).build();
+        Consumer<byte[]> consumer1 = 
client.newConsumer().topic(partitionedTopicName).subscriptionName(subName1)
+                .subscriptionType(SubscriptionType.Shared).subscribe();
+        consumer1.close();
+
+        // validate partition topic is created
+        
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions,
 startPartitions);
+
+        // create a subscription for few new partition which can fail
+        admin.topics().createSubscription(partitionedTopicName + "-partition-" 
+ startPartitions, subName1,
+                MessageId.earliest);
+
+        try {
+            admin.topics().updatePartitionedTopic(partitionedTopicName, 
newPartitions, false, false);
+        } catch (PulsarAdminException.PreconditionFailedException e) {
+            // Ok
+        }
+        admin.topics().updatePartitionedTopic(partitionedTopicName, 
newPartitions, false, true);
+        // validate subscription is created for new partition.
+        assertNotNull(admin.topics().getStats(partitionedTopicName + 
"-partition-" + 6).getSubscriptions().get(subName1));
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index fef58e5..63ade23 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -826,7 +826,8 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
         verify(response2, 
timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
 
-        persistentTopics.updatePartitionedTopic(property, cluster, namespace, 
partitionedTopicName2, false, false, 10);
+        persistentTopics.updatePartitionedTopic(property, cluster, namespace, 
partitionedTopicName2, false, false,
+                false, 10);
     }
 
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index d754f25..4a27dbb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -460,7 +460,8 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
         persistentTopics.createPartitionedTopic(response, testTenant, 
testNamespace, partitionedTopicName, 5, true);
         verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
-        persistentTopics.updatePartitionedTopic(testTenant, testNamespace, 
partitionedTopicName, false, false, 10);
+        persistentTopics.updatePartitionedTopic(testTenant, testNamespace, 
partitionedTopicName, false, false, false,
+                10);
     }
 
     @Test(timeOut = 10_000)
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 06f1df0..f6fd641 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -444,7 +444,46 @@ public interface Topics {
      *            Number of new partitions of already exist partitioned-topic
      * @param updateLocalTopicOnly
      *            Used by broker for global topic with multiple replicated 
clusters
+     * @param force
+     *            Update forcefully without validating existing partitioned 
topic
+     * @returns a future that can be used to track when the partitioned topic 
is updated
+     */
+    void updatePartitionedTopic(String topic, int numPartitions, boolean 
updateLocalTopicOnly, boolean force)
+            throws PulsarAdminException;
+
+    /**
+     * Update number of partitions of a non-global partitioned topic 
asynchronously.
+     * <p/>
+     * It requires partitioned-topic to be already exist and number of new 
partitions must be greater than existing
+     * number of partitions. Decrementing number of partitions requires 
deletion of topic which is not supported.
+     * <p/>
      *
+     * @param topic
+     *            Topic name
+     * @param numPartitions
+     *            Number of new partitions of already exist partitioned-topic
+     * @param updateLocalTopicOnly
+     *            Used by broker for global topic with multiple replicated 
clusters
+     * @param force
+     *            Update forcefully without validating existing partitioned 
topic
+     * @return a future that can be used to track when the partitioned topic 
is updated
+     */
+    CompletableFuture<Void> updatePartitionedTopicAsync(String topic, int 
numPartitions, boolean updateLocalTopicOnly,
+            boolean force);
+
+    /**
+     * Update number of partitions of a non-global partitioned topic.
+     * <p/>
+     * It requires partitioned-topic to be already exist and number of new 
partitions must be greater than existing
+     * number of partitions. Decrementing number of partitions requires 
deletion of topic which is not supported.
+     * <p/>
+     *
+     * @param topic
+     *            Topic name
+     * @param numPartitions
+     *            Number of new partitions of already exist partitioned-topic
+     * @param updateLocalTopicOnly
+     *            Used by broker for global topic with multiple replicated 
clusters
      * @returns a future that can be used to track when the partitioned topic 
is updated
      */
     void updatePartitionedTopic(String topic, int numPartitions, boolean 
updateLocalTopicOnly)
@@ -463,7 +502,6 @@ public interface Topics {
      *            Number of new partitions of already exist partitioned-topic
      * @param updateLocalTopicOnly
      *            Used by broker for global topic with multiple replicated 
clusters
-     *
      * @return a future that can be used to track when the partitioned topic 
is updated
      */
     CompletableFuture<Void> updatePartitionedTopicAsync(String topic, int 
numPartitions, boolean updateLocalTopicOnly);
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index fa62f55..caf32e4 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -455,14 +455,20 @@ public class TopicsImpl extends BaseResource implements 
Topics {
 
     @Override
     public CompletableFuture<Void> updatePartitionedTopicAsync(String topic, 
int numPartitions) {
-        return updatePartitionedTopicAsync(topic, numPartitions, false);
+        return updatePartitionedTopicAsync(topic, numPartitions, false, false);
     }
 
     @Override
     public void updatePartitionedTopic(String topic, int numPartitions, 
boolean updateLocalTopicOnly)
             throws PulsarAdminException {
+        updatePartitionedTopic(topic, numPartitions, updateLocalTopicOnly, 
false);
+    }
+
+    @Override
+    public void updatePartitionedTopic(String topic, int numPartitions, 
boolean updateLocalTopicOnly, boolean force)
+            throws PulsarAdminException {
         try {
-            updatePartitionedTopicAsync(topic, numPartitions, 
updateLocalTopicOnly)
+            updatePartitionedTopicAsync(topic, numPartitions, 
updateLocalTopicOnly, force)
                     .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
@@ -477,10 +483,17 @@ public class TopicsImpl extends BaseResource implements 
Topics {
     @Override
     public CompletableFuture<Void> updatePartitionedTopicAsync(String topic, 
int numPartitions,
             boolean updateLocalTopicOnly) {
+        return updatePartitionedTopicAsync(topic, numPartitions, 
updateLocalTopicOnly, false);
+    }
+
+    @Override
+    public CompletableFuture<Void> updatePartitionedTopicAsync(String topic, 
int numPartitions,
+            boolean updateLocalTopicOnly, boolean force) {
         checkArgument(numPartitions > 0, "Number of partitions must be more 
than 0");
         TopicName tn = validateTopic(topic);
         WebTarget path = topicPath(tn, "partitions");
-        path = path.queryParam("updateLocalTopicOnly", 
Boolean.toString(updateLocalTopicOnly));
+        path = path.queryParam("updateLocalTopicOnly", 
Boolean.toString(updateLocalTopicOnly)).queryParam("force",
+                force);
         return asyncPostRequest(path, Entity.entity(numPartitions, 
MediaType.APPLICATION_JSON));
     }
 
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index ed7fa0d..32befc8 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -988,7 +988,7 @@ public class PulsarAdminToolTest {
         verify(mockTopics).getPartitionedTopicList("myprop/clust/ns1");
 
         cmdTopics.run(split("update-partitioned-topic 
persistent://myprop/clust/ns1/ds1 -p 6"));
-        
verify(mockTopics).updatePartitionedTopic("persistent://myprop/clust/ns1/ds1", 
6);
+        
verify(mockTopics).updatePartitionedTopic("persistent://myprop/clust/ns1/ds1", 
6, false, false);
 
         cmdTopics.run(split("get-partitioned-topic-metadata 
persistent://myprop/clust/ns1/ds1"));
         
verify(mockTopics).getPartitionedTopicMetadata("persistent://myprop/clust/ns1/ds1");
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index a4ca284..fbd037a 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -423,10 +423,14 @@ public class CmdTopics extends CmdBase {
                 "--partitions" }, description = "Number of partitions for the 
topic", required = true)
         private int numPartitions;
 
+        @Parameter(names = { "-f",
+                "--force" }, description = "Update forcefully without 
validating existing partitioned topic ", required = false)
+        private boolean force;
+
         @Override
         void run() throws Exception {
             String topic = validateTopicName(params);
-            getTopics().updatePartitionedTopic(topic, numPartitions);
+            getTopics().updatePartitionedTopic(topic, numPartitions, false, 
force);
         }
     }
 

Reply via email to