This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d482c05  Fixed increment partitions operation (#1153)
d482c05 is described below

commit d482c0518f06204079024d2890a7fda3ac1b7050
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Sat Feb 3 08:29:33 2018 -0800

    Fixed increment partitions operation (#1153)
---
 .../pulsar/broker/admin/PersistentTopics.java      | 107 +++++++--------------
 .../apache/pulsar/broker/admin/AdminApiTest2.java  |  29 +++---
 .../broker/admin/IncrementPartitionsTest.java      |  95 ++++++++++++++++++
 3 files changed, 147 insertions(+), 84 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
index da8bb0a..a3b246e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
@@ -24,6 +24,7 @@ import static org.apache.pulsar.common.util.Codec.decode;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -77,6 +78,7 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
 import 
org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
 import org.apache.pulsar.client.api.MessageId;
@@ -1504,78 +1506,43 @@ public class PersistentTopics extends AdminResource {
                 return;
             }
 
-            // get list of cursors name of partition-1
-            final String ledgerName = 
dn.getPartition(1).getPersistenceNamingEncoding();
-            final Set<Topic> topics = Sets.newConcurrentHashSet();
-            ((ManagedLedgerFactoryImpl) 
pulsar().getManagedLedgerFactory()).getMetaStore().getCursors(ledgerName,
-                    new MetaStoreCallback<List<String>>() {
-
-                        @Override
-                        public void operationComplete(List<String> cursors,
-                                
org.apache.bookkeeper.mledger.impl.MetaStore.Stat stat) {
-                            List<CompletableFuture<Void>> 
subscriptionCreationFuture = Lists.newArrayList();
-                            // create subscriptions for all new 
partition-topics
-                            cursors.forEach(cursor -> {
-                                String subName = Codec.decode(cursor);
-                                for (int i = partitionMetadata.partitions; i < 
numPartitions; i++) {
-                                    final String topicName = 
dn.getPartition(i).toString();
-                                    CompletableFuture<Void> future = new 
CompletableFuture<>();
-                                    
pulsar().getBrokerService().getTopic(topicName).handle((topic, ex) -> {
-                                        // cache topic to close all of them 
after creating all subscriptions
-                                        topics.add(topic);
-                                        if (ex != null) {
-                                            log.warn("[{}] Failed to create 
topic {}", clientAppId(), topicName);
-                                            future.completeExceptionally(ex);
-                                            return null;
-                                        } else {
-                                            
topic.createSubscription(subName).handle((sub, e) -> {
-                                                if (e != null) {
-                                                    log.warn("[{}] Failed to 
create subsciption {} {}", clientAppId(),
-                                                            topicName, 
subName);
-                                                    
future.completeExceptionally(e);
-                                                    return null;
-                                                } else {
-                                                    log.info("[{}] 
Successfully created subsciption {} {}",
-                                                            clientAppId(), 
topicName, subName);
-                                                    future.complete(null);
-                                                    return null;
-                                                }
-                                            });
-                                            return null;
-                                        }
-                                    });
-                                    subscriptionCreationFuture.add(future);
-                                }
-                            });
-                            // wait for all subscriptions to be created
-                            
FutureUtil.waitForAll(subscriptionCreationFuture).handle((res, 
subscriptionException) -> {
-                                // close all topics and then complete result 
future
-                                FutureUtil.waitForAll(
-                                        topics.stream().map(topic -> 
topic.close()).collect(Collectors.toList()))
-                                        .handle((closed, topicCloseException) 
-> {
-                                            if (topicCloseException != null) {
-                                                log.warn("Failed to close 
newly created partitioned topics for {} ", dn,
-                                                        topicCloseException);
-                                            }
-                                            if (subscriptionException != null) 
{
-                                                
result.completeExceptionally(subscriptionException);
-                                            } else {
-                                                log.info("[{}] Successfully 
created new partitions {}", clientAppId(),
-                                                        dn.toString());
-                                                result.complete(null);
-                                            }
-                                            return null;
-                                        });
-                                return null;
-                            });
-                        }
+            PulsarAdmin admin;
+            try {
+                admin = pulsar().getAdminClient();
+            } catch (PulsarServerException e1) {
+                result.completeExceptionally(e1);
+                return;
+            }
 
-                        @Override
-                        public void operationFailed(MetaStoreException ex) {
-                            log.warn("[{}] Failed to get list of cursors of 
{}", clientAppId(), ledgerName);
-                            result.completeExceptionally(ex);
-                        }
+            
admin.persistentTopics().getStatsAsync(dn.getPartition(0).toString()).thenAccept(stats
 -> {
+                stats.subscriptions.keySet().forEach(subscription -> {
+                    List<CompletableFuture<Void>> subscriptionFutures = new 
ArrayList<>();
+                    for (int i = partitionMetadata.partitions; i < 
numPartitions; i++) {
+                        final String topicName = dn.getPartition(i).toString();
+
+                        
subscriptionFutures.add(admin.persistentTopics().createSubscriptionAsync(topicName,
+                                subscription, MessageId.latest));
+                    }
+
+                    FutureUtil.waitForAll(subscriptionFutures).thenRun(() -> {
+                        log.info("[{}] Successfully created new partitions 
{}", clientAppId(), dn);
+                        result.complete(null);
+                    }).exceptionally(ex -> {
+                        log.warn("[{}] Failed to create subscriptions on new 
partitions for {}", clientAppId(), dn, ex);
+                        result.completeExceptionally(ex);
+                        return null;
                     });
+                });
+            }).exceptionally(ex -> {
+                if (ex.getCause() instanceof 
PulsarAdminException.NotFoundException) {
+                    // The first partition doesn't exist, so there are 
currently no subscriptions to recreate
+                    result.complete(null);
+                } else {
+                    log.warn("[{}] Failed to get list of subscriptions of {}", 
clientAppId(), dn.getPartition(0), ex);
+                    result.completeExceptionally(ex);
+                }
+                return null;
+            });
         }).exceptionally(ex -> {
             log.warn("[{}] Failed to get partition metadata for {}", 
clientAppId(), dn.toString());
             result.completeExceptionally(ex);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index f8affa5..bbbb42e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -126,13 +126,13 @@ public class AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
      * 1. create a partitioned-topic
      * 2. update partitions with larger number of partitions
      * 3. verify: getPartitionedMetadata and check number of partitions
-     * 4. verify: this api creates existing subscription to new 
partitioned-topics 
-     *            so, message will not be lost in new partitions 
+     * 4. verify: this api creates existing subscription to new 
partitioned-topics
+     *            so, message will not be lost in new partitions
      *  a. start producer and produce messages
      *  b. check existing subscription for new topics and it should have 
backlog msgs
-     * 
+     *
      * </pre>
-     * 
+     *
      * @param topicName
      * @throws Exception
      */
@@ -224,6 +224,7 @@ public class AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
         consumer2.close();
     }
 
+
     /**
      * verifies admin api command for non-persistent topic.
      * It verifies: partitioned-topic, stats
@@ -280,10 +281,10 @@ public class AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
 
         producer.close();
     }
-    
+
     /**
      * verifies validation on persistent-policies
-     * 
+     *
      * @throws Exception
      */
     @Test
@@ -321,7 +322,7 @@ public class AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
 
     /**
      * validates update of persistent-policies reflects on managed-ledger and 
managed-cursor
-     * 
+     *
      * @throws Exception
      */
     @Test
@@ -360,7 +361,7 @@ public class AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
 
     /**
      * Verify unloading topic
-     * 
+     *
      * @throws Exception
      */
     @Test(dataProvider = "topicType")
@@ -412,14 +413,14 @@ public class AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
 
     /**
      * Verifies reset-cursor at specific position using admin-api.
-     * 
+     *
      * <pre>
      * 1. Publish 50 messages
      * 2. Consume 20 messages
      * 3. reset cursor position on 10th message
      * 4. consume 40 messages from reset position
      * </pre>
-     * 
+     *
      * @param namespaceName
      * @throws Exception
      */
@@ -519,7 +520,7 @@ public class AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
 
     /**
      * It verifies that pulsar with different load-manager generates different 
load-report and returned by admin-api
-     * 
+     *
      * @throws Exception
      */
     @Test
@@ -589,7 +590,7 @@ public class AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
 
     /**
      * It validates that peer-cluster can't coexist in replication-cluster list
-     * 
+     *
      * @throws Exception
      */
     @Test
@@ -709,14 +710,14 @@ public class AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
         List<String> namespaces2 = 
admin.namespaces().getAntiAffinityNamespaces("dummy", "use", "invalid-group");
         assertEquals(namespaces2.size(), 0);
     }
-    
+
     @Test
     public void testNonPersistentTopics() throws Exception {
         final String namespace = "prop-xyz/use/ns2";
         final String topicName = "non-persistent://" + namespace + "/topic";
         admin.namespaces().createNamespace(namespace, 20);
         int totalTopics = 100;
-        
+
         Set<String> topicNames = Sets.newHashSet();
         for (int i = 0; i < totalTopics; i++) {
             topicNames.add(topicName + i);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java
new file mode 100644
index 0000000..779466a
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import static org.testng.Assert.assertEquals;
+
+import org.apache.pulsar.broker.admin.AdminApiTest.MockedPulsarService;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PropertyAdmin;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class IncrementPartitionsTest extends MockedPulsarServiceBaseTest {
+
+    private MockedPulsarService mockPulsarSetup;
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        conf.setLoadBalancerEnabled(true);
+        super.internalSetup();
+
+        // create other broker to test redirect on calls that need
+        // namespace ownership
+        mockPulsarSetup = new MockedPulsarService(this.conf);
+        mockPulsarSetup.setup();
+
+        // Setup namespaces
+        admin.clusters().createCluster("use", new 
ClusterData("http://127.0.0.1" + ":" + BROKER_WEBSERVICE_PORT));
+        PropertyAdmin propertyAdmin = new 
PropertyAdmin(Lists.newArrayList("role1", "role2"), Sets.newHashSet("use"));
+        admin.properties().createProperty("prop-xyz", propertyAdmin);
+        admin.namespaces().createNamespace("prop-xyz/use/ns1");
+    }
+
+    @AfterMethod
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+        mockPulsarSetup.cleanup();
+    }
+
+    @Test
+    public void testIncrementPartitionsOfTopicOnUnusedTopic() throws Exception 
{
+        final String partitionedTopicName = 
"persistent://prop-xyz/use/ns1/test-topic";
+
+        admin.persistentTopics().createPartitionedTopic(partitionedTopicName, 
10);
+        
assertEquals(admin.persistentTopics().getPartitionedTopicMetadata(partitionedTopicName).partitions,
 10);
+
+        admin.persistentTopics().updatePartitionedTopic(partitionedTopicName, 
20);
+        
assertEquals(admin.persistentTopics().getPartitionedTopicMetadata(partitionedTopicName).partitions,
 20);
+    }
+
+    @Test
+    public void testIncrementPartitionsOfTopic() throws Exception {
+        final String partitionedTopicName = 
"persistent://prop-xyz/use/ns1/test-topic-2";
+
+        admin.persistentTopics().createPartitionedTopic(partitionedTopicName, 
10);
+        
assertEquals(admin.persistentTopics().getPartitionedTopicMetadata(partitionedTopicName).partitions,
 10);
+
+        Consumer consumer = pulsarClient.subscribe(partitionedTopicName, 
"sub-1");
+
+        admin.persistentTopics().updatePartitionedTopic(partitionedTopicName, 
20);
+        
assertEquals(admin.persistentTopics().getPartitionedTopicMetadata(partitionedTopicName).partitions,
 20);
+
+        assertEquals(
+                admin.persistentTopics()
+                        
.getSubscriptions(DestinationName.get(partitionedTopicName).getPartition(15).toString()),
+                Lists.newArrayList("sub-1"));
+
+        consumer.close();
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to