This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9293adf25b1f0f58d4d5479c3f6aa51301a4dd6a Author: Rajan Dhabalia <[email protected]> AuthorDate: Wed Oct 6 14:17:44 2021 -0700 [pulsar-broker] Fix: handle failed partitions topic creation (#10374) * [pulsar-broker] Fix: handle failed partitions topic creation * fix test --- .../broker/admin/impl/PersistentTopicsBase.java | 70 ++++++++++++---------- .../pulsar/broker/admin/v1/PersistentTopics.java | 3 +- .../pulsar/broker/admin/v2/PersistentTopics.java | 3 +- .../apache/pulsar/broker/admin/AdminApiTest2.java | 52 +++++++++++++--- .../org/apache/pulsar/broker/admin/AdminTest.java | 3 +- .../pulsar/broker/admin/PersistentTopicsTest.java | 3 +- .../org/apache/pulsar/client/admin/Topics.java | 40 ++++++++++++- .../pulsar/client/admin/internal/TopicsImpl.java | 19 +++++- .../pulsar/admin/cli/PulsarAdminToolTest.java | 2 +- .../org/apache/pulsar/admin/cli/CmdTopics.java | 6 +- 10 files changed, 151 insertions(+), 50 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 7dff58d..aa40644 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -389,12 +389,12 @@ public class PersistentTopicsBase extends AdminResource { * @param numPartitions */ protected void internalUpdatePartitionedTopic(int numPartitions, - boolean updateLocalTopicOnly, boolean authoritative) { + boolean updateLocalTopicOnly, boolean authoritative, + boolean force) { validateTopicOwnership(topicName, authoritative); validateTopicPolicyOperation(topicName, PolicyName.PARTITION, PolicyOperation.WRITE); - // Only do the validation if it's the first hop. - if (!updateLocalTopicOnly) { + if (!updateLocalTopicOnly && !force) { validatePartitionTopicUpdate(topicName.getLocalName(), numPartitions); } final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic(); @@ -459,7 +459,8 @@ public class PersistentTopicsBase extends AdminResource { } try { tryCreatePartitionsAsync(numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS); - updatePartitionedTopic(topicName, numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS); + updatePartitionedTopic(topicName, numPartitions, force).get(DEFAULT_OPERATION_TIMEOUT_SEC, + TimeUnit.SECONDS); } catch (Exception e) { if (e.getCause() instanceof RestException) { throw (RestException) e.getCause(); @@ -507,7 +508,7 @@ public class PersistentTopicsBase extends AdminResource { } results.add(pulsar().getBrokerService().getClusterPulsarAdmin(cluster).topics() .updatePartitionedTopicAsync(topicName.toString(), - numPartitions, true)); + numPartitions, true, false)); }); return FutureUtil.waitForAll(results); } @@ -3651,34 +3652,39 @@ public class PersistentTopicsBase extends AdminResource { } } - private CompletableFuture<Void> updatePartitionedTopic(TopicName topicName, int numPartitions) { - return createSubscriptions(topicName, numPartitions) - .thenCompose(__ -> { - CompletableFuture<Void> future = namespaceResources().getPartitionedTopicResources() - .updatePartitionedTopicAsync(topicName, - p -> new PartitionedTopicMetadata(numPartitions)); - future.exceptionally(ex -> { - // If the update operation fails, clean up the partitions that were created - getPartitionedTopicMetadataAsync(topicName, false, false) - .thenAccept(metadata -> { - int oldPartition = metadata.partitions; - for (int i = oldPartition; i < numPartitions; i++) { - topicResources().deletePersistentTopicAsync(topicName.getPartition(i)) - .exceptionally(ex1 -> { - log.warn("[{}] Failed to clean up managedLedger {}", - clientAppId(), - topicName, ex1.getCause()); - return null; - }); - } - }).exceptionally(e -> { - log.warn("[{}] Failed to clean up managedLedger", topicName, e); - return null; - }); - return null; - }); - return future; + + private CompletableFuture<Void> updatePartitionedTopic(TopicName topicName, int numPartitions, boolean force) { + CompletableFuture<Void> result = new CompletableFuture<>(); + createSubscriptions(topicName, numPartitions).thenCompose(__ -> { + CompletableFuture<Void> future = namespaceResources().getPartitionedTopicResources() + .updatePartitionedTopicAsync(topicName, p -> new PartitionedTopicMetadata(numPartitions)); + future.exceptionally(ex -> { + // If the update operation fails, clean up the partitions that were created + getPartitionedTopicMetadataAsync(topicName, false, false).thenAccept(metadata -> { + int oldPartition = metadata.partitions; + for (int i = oldPartition; i < numPartitions; i++) { + topicResources().deletePersistentTopicAsync(topicName.getPartition(i)).exceptionally(ex1 -> { + log.warn("[{}] Failed to clean up managedLedger {}", clientAppId(), topicName, + ex1.getCause()); + return null; + }); + } + }).exceptionally(e -> { + log.warn("[{}] Failed to clean up managedLedger", topicName, e); + return null; }); + return null; + }); + return future; + }).thenAccept(__ -> result.complete(null)).exceptionally(ex -> { + if (force && ex.getCause() instanceof PulsarAdminException.ConflictException) { + result.complete(null); + return null; + } + result.completeExceptionally(ex); + return null; + }); + return result; } /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java index 2e29380..2917482 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java @@ -231,9 +231,10 @@ public class PersistentTopics extends PersistentTopicsBase { @QueryParam("updateLocalTopicOnly") @DefaultValue("false") boolean updateLocalTopicOnly, @ApiParam(value = "Is authentication required to perform this operation") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @QueryParam("force") @DefaultValue("false") boolean force, int numPartitions) { validateTopicName(property, cluster, namespace, encodedTopic); - internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly, authoritative); + internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly, authoritative, force); } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index e0a5674..09b694c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -725,12 +725,13 @@ public class PersistentTopics extends PersistentTopicsBase { @QueryParam("updateLocalTopicOnly") @DefaultValue("false") boolean updateLocalTopicOnly, @ApiParam(value = "Is authentication required to perform this operation") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @QueryParam("force") @DefaultValue("false") boolean force, @ApiParam(value = "The number of partitions for the topic", required = true, type = "int", defaultValue = "0") int numPartitions) { validatePartitionedTopicName(tenant, namespace, encodedTopic); validatePartitionedTopicMetadata(tenant, namespace, encodedTopic); - internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly, authoritative); + internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly, authoritative, force); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java index 779eb45..33efa2b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java @@ -2144,15 +2144,13 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest { public void testGetTopicsWithDifferentMode() throws Exception { final String namespace = "prop-xyz/ns1"; - final String persistentTopicName = TopicName.get( - "persistent", - NamespaceName.get(namespace), - "get_topics_mode_" + UUID.randomUUID().toString()).toString(); + final String persistentTopicName = TopicName + .get("persistent", NamespaceName.get(namespace), "get_topics_mode_" + UUID.randomUUID().toString()) + .toString(); - final String nonPersistentTopicName = TopicName.get( - "non-persistent", - NamespaceName.get(namespace), - "get_topics_mode_" + UUID.randomUUID().toString()).toString(); + final String nonPersistentTopicName = TopicName + .get("non-persistent", NamespaceName.get(namespace), "get_topics_mode_" + UUID.randomUUID().toString()) + .toString(); Producer<byte[]> producer1 = pulsarClient.newProducer().topic(persistentTopicName).create(); Producer<byte[]> producer2 = pulsarClient.newProducer().topic(nonPersistentTopicName).create(); @@ -2185,6 +2183,7 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest { producer2.close(); } + @Test(dataProvider = "isV1") public void testNonPartitionedTopic(boolean isV1) throws Exception { String tenant = "prop-xyz"; @@ -2195,4 +2194,41 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest { admin.topics().createNonPartitionedTopic(topic); assertTrue(admin.topics().getList(namespace).contains(topic)); } + + /** + * Validate retring failed partitioned topic should succeed. + * @throws Exception + */ + @Test + public void testFailedUpdatePartitionedTopic() throws Exception { + final String topicName = "failed-topic"; + final String subName1 = topicName + "-my-sub-1"; + final int startPartitions = 4; + final int newPartitions = 8; + final String partitionedTopicName = "persistent://prop-xyz/ns1/" + topicName; + + URL pulsarUrl = new URL(pulsar.getWebServiceAddress()); + + admin.topics().createPartitionedTopic(partitionedTopicName, startPartitions); + PulsarClient client = PulsarClient.builder().serviceUrl(pulsarUrl.toString()).build(); + Consumer<byte[]> consumer1 = client.newConsumer().topic(partitionedTopicName).subscriptionName(subName1) + .subscriptionType(SubscriptionType.Shared).subscribe(); + consumer1.close(); + + // validate partition topic is created + assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, startPartitions); + + // create a subscription for few new partition which can fail + admin.topics().createSubscription(partitionedTopicName + "-partition-" + startPartitions, subName1, + MessageId.earliest); + + try { + admin.topics().updatePartitionedTopic(partitionedTopicName, newPartitions, false, false); + } catch (PulsarAdminException.PreconditionFailedException e) { + // Ok + } + admin.topics().updatePartitionedTopic(partitionedTopicName, newPartitions, false, true); + // validate subscription is created for new partition. + assertNotNull(admin.topics().getStats(partitionedTopicName + "-partition-" + 6).getSubscriptions().get(subName1)); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java index fef58e5..63ade23 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java @@ -826,7 +826,8 @@ public class AdminTest extends MockedPulsarServiceBaseTest { verify(response2, timeout(5000).times(1)).resume(responseCaptor.capture()); Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); - persistentTopics.updatePartitionedTopic(property, cluster, namespace, partitionedTopicName2, false, false, 10); + persistentTopics.updatePartitionedTopic(property, cluster, namespace, partitionedTopicName2, false, false, + false, 10); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index d754f25..4a27dbb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -460,7 +460,8 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, 5, true); verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); - persistentTopics.updatePartitionedTopic(testTenant, testNamespace, partitionedTopicName, false, false, 10); + persistentTopics.updatePartitionedTopic(testTenant, testNamespace, partitionedTopicName, false, false, false, + 10); } @Test(timeOut = 10_000) diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index 06f1df0..f6fd641 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -444,7 +444,46 @@ public interface Topics { * Number of new partitions of already exist partitioned-topic * @param updateLocalTopicOnly * Used by broker for global topic with multiple replicated clusters + * @param force + * Update forcefully without validating existing partitioned topic + * @returns a future that can be used to track when the partitioned topic is updated + */ + void updatePartitionedTopic(String topic, int numPartitions, boolean updateLocalTopicOnly, boolean force) + throws PulsarAdminException; + + /** + * Update number of partitions of a non-global partitioned topic asynchronously. + * <p/> + * It requires partitioned-topic to be already exist and number of new partitions must be greater than existing + * number of partitions. Decrementing number of partitions requires deletion of topic which is not supported. + * <p/> * + * @param topic + * Topic name + * @param numPartitions + * Number of new partitions of already exist partitioned-topic + * @param updateLocalTopicOnly + * Used by broker for global topic with multiple replicated clusters + * @param force + * Update forcefully without validating existing partitioned topic + * @return a future that can be used to track when the partitioned topic is updated + */ + CompletableFuture<Void> updatePartitionedTopicAsync(String topic, int numPartitions, boolean updateLocalTopicOnly, + boolean force); + + /** + * Update number of partitions of a non-global partitioned topic. + * <p/> + * It requires partitioned-topic to be already exist and number of new partitions must be greater than existing + * number of partitions. Decrementing number of partitions requires deletion of topic which is not supported. + * <p/> + * + * @param topic + * Topic name + * @param numPartitions + * Number of new partitions of already exist partitioned-topic + * @param updateLocalTopicOnly + * Used by broker for global topic with multiple replicated clusters * @returns a future that can be used to track when the partitioned topic is updated */ void updatePartitionedTopic(String topic, int numPartitions, boolean updateLocalTopicOnly) @@ -463,7 +502,6 @@ public interface Topics { * Number of new partitions of already exist partitioned-topic * @param updateLocalTopicOnly * Used by broker for global topic with multiple replicated clusters - * * @return a future that can be used to track when the partitioned topic is updated */ CompletableFuture<Void> updatePartitionedTopicAsync(String topic, int numPartitions, boolean updateLocalTopicOnly); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index fa62f55..caf32e4 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -455,14 +455,20 @@ public class TopicsImpl extends BaseResource implements Topics { @Override public CompletableFuture<Void> updatePartitionedTopicAsync(String topic, int numPartitions) { - return updatePartitionedTopicAsync(topic, numPartitions, false); + return updatePartitionedTopicAsync(topic, numPartitions, false, false); } @Override public void updatePartitionedTopic(String topic, int numPartitions, boolean updateLocalTopicOnly) throws PulsarAdminException { + updatePartitionedTopic(topic, numPartitions, updateLocalTopicOnly, false); + } + + @Override + public void updatePartitionedTopic(String topic, int numPartitions, boolean updateLocalTopicOnly, boolean force) + throws PulsarAdminException { try { - updatePartitionedTopicAsync(topic, numPartitions, updateLocalTopicOnly) + updatePartitionedTopicAsync(topic, numPartitions, updateLocalTopicOnly, force) .get(this.readTimeoutMs, TimeUnit.MILLISECONDS); } catch (ExecutionException e) { throw (PulsarAdminException) e.getCause(); @@ -477,10 +483,17 @@ public class TopicsImpl extends BaseResource implements Topics { @Override public CompletableFuture<Void> updatePartitionedTopicAsync(String topic, int numPartitions, boolean updateLocalTopicOnly) { + return updatePartitionedTopicAsync(topic, numPartitions, updateLocalTopicOnly, false); + } + + @Override + public CompletableFuture<Void> updatePartitionedTopicAsync(String topic, int numPartitions, + boolean updateLocalTopicOnly, boolean force) { checkArgument(numPartitions > 0, "Number of partitions must be more than 0"); TopicName tn = validateTopic(topic); WebTarget path = topicPath(tn, "partitions"); - path = path.queryParam("updateLocalTopicOnly", Boolean.toString(updateLocalTopicOnly)); + path = path.queryParam("updateLocalTopicOnly", Boolean.toString(updateLocalTopicOnly)).queryParam("force", + force); return asyncPostRequest(path, Entity.entity(numPartitions, MediaType.APPLICATION_JSON)); } diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index ed7fa0d..32befc8 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -988,7 +988,7 @@ public class PulsarAdminToolTest { verify(mockTopics).getPartitionedTopicList("myprop/clust/ns1"); cmdTopics.run(split("update-partitioned-topic persistent://myprop/clust/ns1/ds1 -p 6")); - verify(mockTopics).updatePartitionedTopic("persistent://myprop/clust/ns1/ds1", 6); + verify(mockTopics).updatePartitionedTopic("persistent://myprop/clust/ns1/ds1", 6, false, false); cmdTopics.run(split("get-partitioned-topic-metadata persistent://myprop/clust/ns1/ds1")); verify(mockTopics).getPartitionedTopicMetadata("persistent://myprop/clust/ns1/ds1"); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index a4ca284..fbd037a 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -423,10 +423,14 @@ public class CmdTopics extends CmdBase { "--partitions" }, description = "Number of partitions for the topic", required = true) private int numPartitions; + @Parameter(names = { "-f", + "--force" }, description = "Update forcefully without validating existing partitioned topic ", required = false) + private boolean force; + @Override void run() throws Exception { String topic = validateTopicName(params); - getTopics().updatePartitionedTopic(topic, numPartitions); + getTopics().updatePartitionedTopic(topic, numPartitions, false, force); } }
