This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new d482c05 Fixed increment partitions operation (#1153) d482c05 is described below commit d482c0518f06204079024d2890a7fda3ac1b7050 Author: Matteo Merli <mme...@apache.org> AuthorDate: Sat Feb 3 08:29:33 2018 -0800 Fixed increment partitions operation (#1153) --- .../pulsar/broker/admin/PersistentTopics.java | 107 +++++++-------------- .../apache/pulsar/broker/admin/AdminApiTest2.java | 29 +++--- .../broker/admin/IncrementPartitionsTest.java | 95 ++++++++++++++++++ 3 files changed, 147 insertions(+), 84 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java index da8bb0a..a3b246e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java @@ -24,6 +24,7 @@ import static org.apache.pulsar.common.util.Codec.decode; import java.io.IOException; import java.io.OutputStream; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; @@ -77,6 +78,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException; import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException; import org.apache.pulsar.client.api.MessageId; @@ -1504,78 +1506,43 @@ public class PersistentTopics extends AdminResource { return; } - // get list of cursors name of partition-1 - final String ledgerName = dn.getPartition(1).getPersistenceNamingEncoding(); - final Set<Topic> topics = 
Sets.newConcurrentHashSet(); - ((ManagedLedgerFactoryImpl) pulsar().getManagedLedgerFactory()).getMetaStore().getCursors(ledgerName, - new MetaStoreCallback<List<String>>() { - - @Override - public void operationComplete(List<String> cursors, - org.apache.bookkeeper.mledger.impl.MetaStore.Stat stat) { - List<CompletableFuture<Void>> subscriptionCreationFuture = Lists.newArrayList(); - // create subscriptions for all new partition-topics - cursors.forEach(cursor -> { - String subName = Codec.decode(cursor); - for (int i = partitionMetadata.partitions; i < numPartitions; i++) { - final String topicName = dn.getPartition(i).toString(); - CompletableFuture<Void> future = new CompletableFuture<>(); - pulsar().getBrokerService().getTopic(topicName).handle((topic, ex) -> { - // cache topic to close all of them after creating all subscriptions - topics.add(topic); - if (ex != null) { - log.warn("[{}] Failed to create topic {}", clientAppId(), topicName); - future.completeExceptionally(ex); - return null; - } else { - topic.createSubscription(subName).handle((sub, e) -> { - if (e != null) { - log.warn("[{}] Failed to create subsciption {} {}", clientAppId(), - topicName, subName); - future.completeExceptionally(e); - return null; - } else { - log.info("[{}] Successfully created subsciption {} {}", - clientAppId(), topicName, subName); - future.complete(null); - return null; - } - }); - return null; - } - }); - subscriptionCreationFuture.add(future); - } - }); - // wait for all subscriptions to be created - FutureUtil.waitForAll(subscriptionCreationFuture).handle((res, subscriptionException) -> { - // close all topics and then complete result future - FutureUtil.waitForAll( - topics.stream().map(topic -> topic.close()).collect(Collectors.toList())) - .handle((closed, topicCloseException) -> { - if (topicCloseException != null) { - log.warn("Failed to close newly created partitioned topics for {} ", dn, - topicCloseException); - } - if (subscriptionException != null) { - 
result.completeExceptionally(subscriptionException); - } else { - log.info("[{}] Successfully created new partitions {}", clientAppId(), - dn.toString()); - result.complete(null); - } - return null; - }); - return null; - }); - } + PulsarAdmin admin; + try { + admin = pulsar().getAdminClient(); + } catch (PulsarServerException e1) { + result.completeExceptionally(e1); + return; + } - @Override - public void operationFailed(MetaStoreException ex) { - log.warn("[{}] Failed to get list of cursors of {}", clientAppId(), ledgerName); - result.completeExceptionally(ex); - } + admin.persistentTopics().getStatsAsync(dn.getPartition(0).toString()).thenAccept(stats -> { + stats.subscriptions.keySet().forEach(subscription -> { + List<CompletableFuture<Void>> subscriptionFutures = new ArrayList<>(); + for (int i = partitionMetadata.partitions; i < numPartitions; i++) { + final String topicName = dn.getPartition(i).toString(); + + subscriptionFutures.add(admin.persistentTopics().createSubscriptionAsync(topicName, + subscription, MessageId.latest)); + } + + FutureUtil.waitForAll(subscriptionFutures).thenRun(() -> { + log.info("[{}] Successfully created new partitions {}", clientAppId(), dn); + result.complete(null); + }).exceptionally(ex -> { + log.warn("[{}] Failed to create subscriptions on new partitions for {}", clientAppId(), dn, ex); + result.completeExceptionally(ex); + return null; }); + }); + }).exceptionally(ex -> { + if (ex.getCause() instanceof PulsarAdminException.NotFoundException) { + // The first partition doesn't exist, so there are currently no subscriptions to recreate + result.complete(null); + } else { + log.warn("[{}] Failed to get list of subscriptions of {}", clientAppId(), dn.getPartition(0), ex); + result.completeExceptionally(ex); + } + return null; + }); }).exceptionally(ex -> { log.warn("[{}] Failed to get partition metadata for {}", clientAppId(), dn.toString()); result.completeExceptionally(ex); diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java index f8affa5..bbbb42e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java @@ -126,13 +126,13 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest { * 1. create a partitioned-topic * 2. update partitions with larger number of partitions * 3. verify: getPartitionedMetadata and check number of partitions - * 4. verify: this api creates existing subscription to new partitioned-topics - * so, message will not be lost in new partitions + * 4. verify: this api creates existing subscription to new partitioned-topics + * so, message will not be lost in new partitions * a. start producer and produce messages * b. check existing subscription for new topics and it should have backlog msgs - * + * * </pre> - * + * * @param topicName * @throws Exception */ @@ -224,6 +224,7 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest { consumer2.close(); } + /** * verifies admin api command for non-persistent topic. * It verifies: partitioned-topic, stats @@ -280,10 +281,10 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest { producer.close(); } - + /** * verifies validation on persistent-policies - * + * * @throws Exception */ @Test @@ -321,7 +322,7 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest { /** * validates update of persistent-policies reflects on managed-ledger and managed-cursor - * + * * @throws Exception */ @Test @@ -360,7 +361,7 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest { /** * Verify unloading topic - * + * * @throws Exception */ @Test(dataProvider = "topicType") @@ -412,14 +413,14 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest { /** * Verifies reset-cursor at specific position using admin-api. 
- * + * * <pre> * 1. Publish 50 messages * 2. Consume 20 messages * 3. reset cursor position on 10th message * 4. consume 40 messages from reset position * </pre> - * + * * @param namespaceName * @throws Exception */ @@ -519,7 +520,7 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest { /** * It verifies that pulsar with different load-manager generates different load-report and returned by admin-api - * + * * @throws Exception */ @Test @@ -589,7 +590,7 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest { /** * It validates that peer-cluster can't coexist in replication-cluster list - * + * * @throws Exception */ @Test @@ -709,14 +710,14 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest { List<String> namespaces2 = admin.namespaces().getAntiAffinityNamespaces("dummy", "use", "invalid-group"); assertEquals(namespaces2.size(), 0); } - + @Test public void testNonPersistentTopics() throws Exception { final String namespace = "prop-xyz/use/ns2"; final String topicName = "non-persistent://" + namespace + "/topic"; admin.namespaces().createNamespace(namespace, 20); int totalTopics = 100; - + Set<String> topicNames = Sets.newHashSet(); for (int i = 0; i < totalTopics; i++) { topicNames.add(topicName + i); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java new file mode 100644 index 0000000..779466a --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin; + +import static org.testng.Assert.assertEquals; + +import org.apache.pulsar.broker.admin.AdminApiTest.MockedPulsarService; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.common.naming.DestinationName; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.PropertyAdmin; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +public class IncrementPartitionsTest extends MockedPulsarServiceBaseTest { + + private MockedPulsarService mockPulsarSetup; + + @BeforeMethod + @Override + public void setup() throws Exception { + conf.setLoadBalancerEnabled(true); + super.internalSetup(); + + // create otherbroker to test redirect on calls that need + // namespace ownership + mockPulsarSetup = new MockedPulsarService(this.conf); + mockPulsarSetup.setup(); + + // Setup namespaces + admin.clusters().createCluster("use", new ClusterData("http://127.0.0.1" + ":" + BROKER_WEBSERVICE_PORT)); + PropertyAdmin propertyAdmin = new PropertyAdmin(Lists.newArrayList("role1", "role2"), Sets.newHashSet("use")); + admin.properties().createProperty("prop-xyz", 
propertyAdmin); + admin.namespaces().createNamespace("prop-xyz/use/ns1"); + } + + @AfterMethod + @Override + public void cleanup() throws Exception { + super.internalCleanup(); + mockPulsarSetup.cleanup(); + } + + @Test + public void testIncrementPartitionsOfTopicOnUnusedTopic() throws Exception { + final String partitionedTopicName = "persistent://prop-xyz/use/ns1/test-topic"; + + admin.persistentTopics().createPartitionedTopic(partitionedTopicName, 10); + assertEquals(admin.persistentTopics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 10); + + admin.persistentTopics().updatePartitionedTopic(partitionedTopicName, 20); + assertEquals(admin.persistentTopics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 20); + } + + @Test + public void testIncrementPartitionsOfTopic() throws Exception { + final String partitionedTopicName = "persistent://prop-xyz/use/ns1/test-topic-2"; + + admin.persistentTopics().createPartitionedTopic(partitionedTopicName, 10); + assertEquals(admin.persistentTopics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 10); + + Consumer consumer = pulsarClient.subscribe(partitionedTopicName, "sub-1"); + + admin.persistentTopics().updatePartitionedTopic(partitionedTopicName, 20); + assertEquals(admin.persistentTopics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 20); + + assertEquals( + admin.persistentTopics() + .getSubscriptions(DestinationName.get(partitionedTopicName).getPartition(15).toString()), + Lists.newArrayList("sub-1")); + + consumer.close(); + } +} -- To stop receiving notification emails like this one, please contact mme...@apache.org.