rdhabalia closed pull request #2269: handle subscription-already-exist
exception on partitioned-topic for create-sub admin-api
URL: https://github.com/apache/incubator-pulsar/pull/2269
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it is merged, the diff is reproduced below for
the sake of provenance:
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index cf8355f4c9..f85c6175a7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -38,9 +38,11 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.ws.rs.WebApplicationException;
@@ -829,15 +831,34 @@ protected void internalCreateSubscription(String
subscriptionName, MessageIdImpl
try {
if (partitionMetadata.partitions > 0) {
// Create the subscription on each partition
- List<CompletableFuture<Void>> futures = Lists.newArrayList();
PulsarAdmin admin = pulsar().getAdminClient();
+ CountDownLatch latch = new
CountDownLatch(partitionMetadata.partitions);
+ AtomicReference<Throwable> exception = new AtomicReference<>();
+ AtomicInteger failureCount = new AtomicInteger(0);
+
for (int i = 0; i < partitionMetadata.partitions; i++) {
-
futures.add(admin.topics().createSubscriptionAsync(topicName.getPartition(i).toString(),
- subscriptionName, messageId));
+ admin.persistentTopics()
+
.createSubscriptionAsync(topicName.getPartition(i).toString(),
subscriptionName, messageId)
+ .handle((result, ex) -> {
+ if (ex != null) {
+ int c = failureCount.incrementAndGet();
+ // fail the operation on unknown exception
or if all the partitioned failed due to
+ // subscription-already-exist
+ if (c == partitionMetadata.partitions
+ || !(ex instanceof
PulsarAdminException.ConflictException)) {
+ exception.set(ex);
+ }
+ }
+ latch.countDown();
+ return null;
+ });
}
- FutureUtil.waitForAll(futures).join();
+ latch.await();
+ if (exception.get() != null) {
+ throw exception.get();
+ }
} else {
validateAdminOperationOnTopic(authoritative);
@@ -850,10 +871,10 @@ protected void internalCreateSubscription(String
subscriptionName, MessageIdImpl
PersistentSubscription subscription = (PersistentSubscription)
topic
.createSubscription(subscriptionName,
InitialPosition.Latest).get();
subscription.resetCursor(PositionImpl.get(messageId.getLedgerId(),
messageId.getEntryId())).get();
- log.info("[{}][{}] Successfully created subscription {} at
message id {}", clientAppId(),
- topicName, subscriptionName, messageId);
+ log.info("[{}][{}] Successfully created subscription {} at
message id {}", clientAppId(), topicName,
+ subscriptionName, messageId);
}
- } catch (Exception e) {
+ } catch (Throwable e) {
Throwable t = e.getCause();
log.warn("[{}] [{}] Failed to create subscription {} at message id
{}", clientAppId(),
topicName, subscriptionName, messageId, e);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
index 0cf2c5ca91..f5d8bd0db9 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
@@ -103,4 +103,30 @@ public void createSubscriptionOnPartitionedTopic() throws
Exception {
Lists.newArrayList("sub-1"));
}
}
+
+ @Test
+ public void createSubscriptionOnPartitionedTopicWithPartialFailure()
throws Exception {
+ String topic = "persistent://my-property/my-ns/my-partitioned-topic";
+ admin.topics().createPartitionedTopic(topic, 10);
+
+ // create subscription for one partition
+ final String partitionedTopic0 = topic+"-partition-0";
+ admin.topics().createSubscription(partitionedTopic0, "sub-1",
MessageId.latest);
+
+ admin.topics().createSubscription(topic, "sub-1", MessageId.latest);
+
+ // Create should fail if the subscription already exists
+ try {
+ admin.topics().createSubscription(topic, "sub-1",
MessageId.latest);
+ fail("Should have failed");
+ } catch (Exception e) {
+ // Expected
+ }
+
+ for (int i = 0; i < 10; i++) {
+ assertEquals(
+
admin.topics().getSubscriptions(TopicName.get(topic).getPartition(i).toString()),
+ Lists.newArrayList("sub-1"));
+ }
+ }
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services