This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c7436fe handle subscription-already-exist exception on
partitioned-topic for create-sub admin-api (#2269)
c7436fe is described below
commit c7436fecb7a567a253a84451244b6ea50cf286fe
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Tue Jul 31 11:33:02 2018 -0700
handle subscription-already-exist exception on partitioned-topic for
create-sub admin-api (#2269)
---
.../broker/admin/impl/PersistentTopicsBase.java | 35 +++++++++++++++++-----
.../broker/admin/CreateSubscriptionTest.java | 26 ++++++++++++++++
2 files changed, 54 insertions(+), 7 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index cf8355f..f85c617 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -38,9 +38,11 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.ws.rs.WebApplicationException;
@@ -829,15 +831,34 @@ public class PersistentTopicsBase extends AdminResource {
try {
if (partitionMetadata.partitions > 0) {
// Create the subscription on each partition
- List<CompletableFuture<Void>> futures = Lists.newArrayList();
PulsarAdmin admin = pulsar().getAdminClient();
+ CountDownLatch latch = new
CountDownLatch(partitionMetadata.partitions);
+ AtomicReference<Throwable> exception = new AtomicReference<>();
+ AtomicInteger failureCount = new AtomicInteger(0);
+
for (int i = 0; i < partitionMetadata.partitions; i++) {
-
futures.add(admin.topics().createSubscriptionAsync(topicName.getPartition(i).toString(),
- subscriptionName, messageId));
+ admin.persistentTopics()
+
.createSubscriptionAsync(topicName.getPartition(i).toString(),
subscriptionName, messageId)
+ .handle((result, ex) -> {
+ if (ex != null) {
+ int c = failureCount.incrementAndGet();
+ // fail the operation on unknown exception
or if all the partitioned failed due to
+ // subscription-already-exist
+ if (c == partitionMetadata.partitions
+ || !(ex instanceof
PulsarAdminException.ConflictException)) {
+ exception.set(ex);
+ }
+ }
+ latch.countDown();
+ return null;
+ });
}
- FutureUtil.waitForAll(futures).join();
+ latch.await();
+ if (exception.get() != null) {
+ throw exception.get();
+ }
} else {
validateAdminOperationOnTopic(authoritative);
@@ -850,10 +871,10 @@ public class PersistentTopicsBase extends AdminResource {
PersistentSubscription subscription = (PersistentSubscription)
topic
.createSubscription(subscriptionName,
InitialPosition.Latest).get();
subscription.resetCursor(PositionImpl.get(messageId.getLedgerId(),
messageId.getEntryId())).get();
- log.info("[{}][{}] Successfully created subscription {} at
message id {}", clientAppId(),
- topicName, subscriptionName, messageId);
+ log.info("[{}][{}] Successfully created subscription {} at
message id {}", clientAppId(), topicName,
+ subscriptionName, messageId);
}
- } catch (Exception e) {
+ } catch (Throwable e) {
Throwable t = e.getCause();
log.warn("[{}] [{}] Failed to create subscription {} at message id
{}", clientAppId(),
topicName, subscriptionName, messageId, e);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
index 0cf2c5c..f5d8bd0 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
@@ -103,4 +103,30 @@ public class CreateSubscriptionTest extends
ProducerConsumerBase {
Lists.newArrayList("sub-1"));
}
}
+
+ @Test
+ public void createSubscriptionOnPartitionedTopicWithPartialFailure()
throws Exception {
+ String topic = "persistent://my-property/my-ns/my-partitioned-topic";
+ admin.topics().createPartitionedTopic(topic, 10);
+
+ // create subscription for one partition
+ final String partitionedTopic0 = topic+"-partition-0";
+ admin.topics().createSubscription(partitionedTopic0, "sub-1",
MessageId.latest);
+
+ admin.topics().createSubscription(topic, "sub-1", MessageId.latest);
+
+ // Create should fail if the subscription already exists
+ try {
+ admin.topics().createSubscription(topic, "sub-1",
MessageId.latest);
+ fail("Should have failed");
+ } catch (Exception e) {
+ // Expected
+ }
+
+ for (int i = 0; i < 10; i++) {
+ assertEquals(
+
admin.topics().getSubscriptions(TopicName.get(topic).getPartition(i).toString()),
+ Lists.newArrayList("sub-1"));
+ }
+ }
}