rdhabalia closed pull request #2269: handle subscription-already-exist 
exception on partitioned-topic for create-sub admin-api
URL: https://github.com/apache/incubator-pulsar/pull/2269
 
 
   

This pull request was merged from a forked repository. Because GitHub
hides the original diff of a foreign (forked) pull request once it is
merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index cf8355f4c9..f85c6175a7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -38,9 +38,11 @@
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import javax.ws.rs.WebApplicationException;
@@ -829,15 +831,34 @@ protected void internalCreateSubscription(String 
subscriptionName, MessageIdImpl
         try {
             if (partitionMetadata.partitions > 0) {
                 // Create the subscription on each partition
-                List<CompletableFuture<Void>> futures = Lists.newArrayList();
                 PulsarAdmin admin = pulsar().getAdminClient();
 
+                CountDownLatch latch = new 
CountDownLatch(partitionMetadata.partitions);
+                AtomicReference<Throwable> exception = new AtomicReference<>();
+                AtomicInteger failureCount = new AtomicInteger(0);
+
                 for (int i = 0; i < partitionMetadata.partitions; i++) {
-                    
futures.add(admin.topics().createSubscriptionAsync(topicName.getPartition(i).toString(),
-                            subscriptionName, messageId));
+                    admin.persistentTopics()
+                            
.createSubscriptionAsync(topicName.getPartition(i).toString(), 
subscriptionName, messageId)
+                            .handle((result, ex) -> {
+                                if (ex != null) {
+                                    int c = failureCount.incrementAndGet();
+                                    // fail the operation on unknown exception 
or if all the partitions failed due to
+                                    // subscription-already-exist
+                                    if (c == partitionMetadata.partitions
+                                            || !(ex instanceof 
PulsarAdminException.ConflictException)) {
+                                        exception.set(ex);
+                                    }
+                                }
+                                latch.countDown();
+                                return null;
+                            });
                 }
 
-                FutureUtil.waitForAll(futures).join();
+                latch.await();
+                if (exception.get() != null) {
+                    throw exception.get();
+                }
             } else {
                 validateAdminOperationOnTopic(authoritative);
 
@@ -850,10 +871,10 @@ protected void internalCreateSubscription(String 
subscriptionName, MessageIdImpl
                 PersistentSubscription subscription = (PersistentSubscription) 
topic
                         .createSubscription(subscriptionName, 
InitialPosition.Latest).get();
                 
subscription.resetCursor(PositionImpl.get(messageId.getLedgerId(), 
messageId.getEntryId())).get();
-                log.info("[{}][{}] Successfully created subscription {} at 
message id {}", clientAppId(),
-                        topicName, subscriptionName, messageId);
+                log.info("[{}][{}] Successfully created subscription {} at 
message id {}", clientAppId(), topicName,
+                        subscriptionName, messageId);
             }
-        } catch (Exception e) {
+        } catch (Throwable e) {
             Throwable t = e.getCause();
             log.warn("[{}] [{}] Failed to create subscription {} at message id 
{}", clientAppId(),
                     topicName, subscriptionName, messageId, e);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
index 0cf2c5ca91..f5d8bd0db9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
@@ -103,4 +103,30 @@ public void createSubscriptionOnPartitionedTopic() throws 
Exception {
                     Lists.newArrayList("sub-1"));
         }
     }
+    
+    @Test
+    public void createSubscriptionOnPartitionedTopicWithPartialFailure() 
throws Exception {
+        String topic = "persistent://my-property/my-ns/my-partitioned-topic";
+        admin.topics().createPartitionedTopic(topic, 10);
+        
+        // create subscription for one partition
+        final String partitionedTopic0 = topic+"-partition-0";
+        admin.topics().createSubscription(partitionedTopic0, "sub-1", 
MessageId.latest);
+
+        admin.topics().createSubscription(topic, "sub-1", MessageId.latest);
+
+        // Create should fail if the subscription already exists
+        try {
+            admin.topics().createSubscription(topic, "sub-1", 
MessageId.latest);
+            fail("Should have failed");
+        } catch (Exception e) {
+            // Expected
+        }
+
+        for (int i = 0; i < 10; i++) {
+            assertEquals(
+                    
admin.topics().getSubscriptions(TopicName.get(topic).getPartition(i).toString()),
+                    Lists.newArrayList("sub-1"));
+        }
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to