This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.5 by this push:
new 80ad05f Fix create consumer on partitioned topic while disable topic
auto creation. (#5572)
80ad05f is described below
commit 80ad05f795f0457b7213caad23c89cbdae50627d
Author: lipenghui <[email protected]>
AuthorDate: Thu Jan 9 10:40:15 2020 +0800
Fix create consumer on partitioned topic while disable topic auto creation.
(#5572)
### Motivation
Currently, disabling topic auto creation causes consumer creation to fail
on a partitioned topic. Since the partitioned topic itself has already been
created, we should handle the creation of its partitions when topic auto
creation is disabled.
### Modifications
By default, creating a partitioned topic now also tries to create all of its
partitions; if creating the partitions fails, users can run
`create-missed-partitions` to repair the topic.
Users who already have a partitioned topic whose partitions were never
created can also run `create-missed-partitions` to repair it.
(cherry picked from commit 602f1c207c220eb53defcedbd03e8732797e1e26)
---
.../java/org/apache/zookeeper/MockZooKeeper.java | 22 ++--
.../apache/pulsar/broker/admin/AdminResource.java | 55 +++++++-
.../apache/pulsar/broker/admin/ZkAdminPaths.java | 4 +
.../broker/admin/impl/PersistentTopicsBase.java | 51 +++++---
.../broker/admin/v1/NonPersistentTopics.java | 4 +-
.../broker/admin/v2/NonPersistentTopics.java | 4 +-
.../pulsar/broker/admin/v2/PersistentTopics.java | 26 +++-
.../pulsar/broker/service/BrokerService.java | 15 ++-
.../apache/pulsar/broker/admin/AdminApiTest.java | 14 +-
.../pulsar/broker/admin/PersistentTopicsTest.java | 20 +--
.../pulsar/broker/admin/v1/V1_AdminApiTest.java | 6 +-
.../pulsar/client/api/PartitionCreationTest.java | 144 +++++++++++++++++++++
.../client/impl/PatternTopicsConsumerImplTest.java | 8 +-
.../org/apache/pulsar/client/admin/Topics.java | 20 +++
.../pulsar/client/admin/internal/TopicsImpl.java | 21 +++
.../pulsar/client/api/PulsarClientException.java | 2 +
.../org/apache/pulsar/admin/cli/CmdTopics.java | 16 +++
site2/docs/admin-api-partitioned-topics.md | 28 ++++
site2/docs/reference-pulsar-admin.md | 10 ++
19 files changed, 398 insertions(+), 72 deletions(-)
diff --git
a/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java
b/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java
index c417604..f4160ed 100644
--- a/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java
+++ b/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java
@@ -425,10 +425,13 @@ public class MockZooKeeper extends ZooKeeper {
if (path.length() >= item.length()) {
continue;
}
-
- String child = item.substring(path.length() + 1);
- if (!child.contains("/")) {
- children.add(child);
+ String child = item.substring(path.length());
+ if (child.indexOf("/") == 0) {
+ child = child.substring(1);
+ log.debug("child: '{}'", child);
+ if (!child.contains("/")) {
+ children.add(child);
+ }
}
}
}
@@ -465,10 +468,13 @@ public class MockZooKeeper extends ZooKeeper {
} else if (item.equals(path)) {
continue;
} else {
- String child = item.substring(path.length() + 1);
- log.debug("child: '{}'", child);
- if (!child.contains("/")) {
- children.add(child);
+ String child = item.substring(path.length());
+ if (child.indexOf("/") == 0) {
+ child = child.substring(1);
+ log.debug("child: '{}'", child);
+ if (!child.contains("/")) {
+ children.add(child);
+ }
}
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 02ebd06..e9d559e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -30,6 +30,8 @@ import java.net.URI;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.servlet.ServletContext;
@@ -66,9 +68,9 @@ import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
-import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer;
import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
+import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
@@ -111,6 +113,11 @@ public abstract class AdminResource extends
PulsarWebResource {
ZkUtils.createFullPathOptimistic(globalZk(), path, content,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
+ protected void zkCreateOptimisticAsync(String path, byte[] content,
AsyncCallback.StringCallback callback) {
+ ZkUtils.asyncCreateFullPathOptimistic(globalZk(), path, content,
ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT, callback, null);
+ }
+
protected boolean zkPathExists(String path) throws KeeperException,
InterruptedException {
Stat stat = globalZk().exists(path, false);
if (null != stat) {
@@ -119,6 +126,21 @@ public abstract class AdminResource extends
PulsarWebResource {
return false;
}
+ protected void zkSync(String path) throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicInteger rc = new
AtomicInteger(KeeperException.Code.OK.intValue());
+ globalZk().sync(path, (rc2, s, ctx) -> {
+ if (KeeperException.Code.OK.intValue() != rc2) {
+ rc.set(rc2);
+ }
+ latch.countDown();
+ }, null);
+ latch.await();
+ if (KeeperException.Code.OK.intValue() != rc.get()) {
+ throw KeeperException.create(KeeperException.Code.get(rc.get()));
+ }
+ }
+
/**
* Get the domain of the topic (whether it's persistent or non-persistent)
*/
@@ -233,6 +255,37 @@ public abstract class AdminResource extends
PulsarWebResource {
return namespaces;
}
+ protected void tryCreatePartitionsAsync(int numPartitions) {
+ if (!topicName.isPersistent()) {
+ return;
+ }
+ for (int i = 0; i < numPartitions; i++) {
+ tryCreatePartitionAsync(i);
+ }
+ }
+
+ private void tryCreatePartitionAsync(final int partition) {
+
zkCreateOptimisticAsync(ZkAdminPaths.managedLedgerPath(topicName.getPartition(partition)),
new byte[0],
+ (rc, s, o, s1) -> {
+ if (KeeperException.Code.OK.intValue() == rc) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Topic partition {} created.",
clientAppId(),
+ topicName.getPartition(partition));
+ }
+ } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
+ log.info("[{}] Topic partition {} is exists, doing
nothing.", clientAppId(),
+ topicName.getPartition(partition));
+ } else if (KeeperException.Code.BADVERSION.intValue() == rc) {
+ log.warn("[{}] Fail to create topic partition {} with
concurrent modification, retry now.",
+ clientAppId(), topicName.getPartition(partition));
+ tryCreatePartitionAsync(partition);
+ } else {
+ log.error("[{}] Fail to create topic partition {}",
clientAppId(),
+ topicName.getPartition(partition),
KeeperException.create(KeeperException.Code.get(rc)));
+ }
+ });
+ }
+
protected NamespaceName namespaceName;
protected void validateNamespaceName(String property, String namespace) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/ZkAdminPaths.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/ZkAdminPaths.java
index c60422d..95954f6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/ZkAdminPaths.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/ZkAdminPaths.java
@@ -31,6 +31,10 @@ public class ZkAdminPaths {
name.getNamespace(), name.getDomain().value(),
name.getEncodedLocalName());
}
+ public static String managedLedgerPath(TopicName name) {
+ return "/managed-ledgers/" + name.getPersistenceNamingEncoding();
+ }
+
public static String namespacePoliciesPath(NamespaceName name) {
return adminPath(POLICIES, name.toString());
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 972e1c3..7d4c8eb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -116,7 +116,6 @@ import org.slf4j.LoggerFactory;
public class PersistentTopicsBase extends AdminResource {
private static final Logger log =
LoggerFactory.getLogger(PersistentTopicsBase.class);
- public static final int PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS = 1000;
private static final int OFFLINE_TOPIC_STAT_TTL_MINS = 10;
private static final String DEPRECATED_CLIENT_VERSION_PREFIX =
"Pulsar-CPP-v";
private static final Version LEAST_SUPPORTED_CLIENT_VERSION_PREFIX =
Version.forIntegers(1, 21);
@@ -414,8 +413,9 @@ public class PersistentTopicsBase extends AdminResource {
String path = ZkAdminPaths.partitionedTopicPath(topicName);
byte[] data = jsonMapper().writeValueAsBytes(new
PartitionedTopicMetadata(numPartitions));
zkCreateOptimistic(path, data);
- // we wait for the data to be synced in all quorums and the
observers
- Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS);
+ tryCreatePartitionsAsync(numPartitions);
+ // Sync data to all quorums and the observers
+ zkSync(path);
log.info("[{}] Successfully created partitioned topic {}",
clientAppId(), topicName);
} catch (KeeperException.NodeExistsException e) {
log.warn("[{}] Failed to create already existing partitioned topic
{}", clientAppId(), topicName);
@@ -533,6 +533,13 @@ public class PersistentTopicsBase extends AdminResource {
}
}
+ protected void internalCreateMissedPartitions() {
+ PartitionedTopicMetadata metadata =
getPartitionedTopicMetadata(topicName, false, false);
+ if (metadata != null) {
+ tryCreatePartitionsAsync(metadata.partitions);
+ }
+ }
+
private CompletableFuture<Void> updatePartitionInOtherCluster(int
numPartitions, Set<String> clusters) {
List<CompletableFuture<Void>> results = new
ArrayList<>(clusters.size() -1);
clusters.forEach(cluster -> {
@@ -620,8 +627,8 @@ public class PersistentTopicsBase extends AdminResource {
try {
globalZk().delete(path, -1);
globalZkCache().invalidate(path);
- // we wait for the data to be synced in all quorums and the
observers
- Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS);
+ // Sync data to all quorums and the observers
+ zkSync(path);
log.info("[{}] Deleted partitioned topic {}", clientAppId(),
topicName);
asyncResponse.resume(Response.noContent().build());
return;
@@ -1839,24 +1846,28 @@ public class PersistentTopicsBase extends AdminResource
{
}
admin.topics().getStatsAsync(topicName.getPartition(0).toString()).thenAccept(stats
-> {
- stats.subscriptions.keySet().forEach(subscription -> {
- List<CompletableFuture<Void>> subscriptionFutures = new
ArrayList<>();
- for (int i = partitionMetadata.partitions; i <
numPartitions; i++) {
- final String topicNamePartition =
topicName.getPartition(i).toString();
+ if (stats.subscriptions.size() == 0) {
+ result.complete(null);
+ } else {
+ stats.subscriptions.keySet().forEach(subscription -> {
+ List<CompletableFuture<Void>> subscriptionFutures =
new ArrayList<>();
+ for (int i = partitionMetadata.partitions; i <
numPartitions; i++) {
+ final String topicNamePartition =
topicName.getPartition(i).toString();
-
subscriptionFutures.add(admin.topics().createSubscriptionAsync(topicNamePartition,
- subscription, MessageId.latest));
- }
+
subscriptionFutures.add(admin.topics().createSubscriptionAsync(topicNamePartition,
+ subscription, MessageId.latest));
+ }
- FutureUtil.waitForAll(subscriptionFutures).thenRun(() -> {
- log.info("[{}] Successfully created new partitions
{}", clientAppId(), topicName);
- result.complete(null);
- }).exceptionally(ex -> {
- log.warn("[{}] Failed to create subscriptions on new
partitions for {}", clientAppId(), topicName, ex);
- result.completeExceptionally(ex);
- return null;
+ FutureUtil.waitForAll(subscriptionFutures).thenRun(()
-> {
+ log.info("[{}] Successfully created new partitions
{}", clientAppId(), topicName);
+ result.complete(null);
+ }).exceptionally(ex -> {
+ log.warn("[{}] Failed to create subscriptions on
new partitions for {}", clientAppId(), topicName, ex);
+ result.completeExceptionally(ex);
+ return null;
+ });
});
- });
+ }
}).exceptionally(ex -> {
if (ex.getCause() instanceof
PulsarAdminException.NotFoundException) {
// The first partition doesn't exist, so there are
currently to subscriptions to recreate
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index 7667167..0179847 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -144,8 +144,8 @@ public class NonPersistentTopics extends PersistentTopics {
topicName.getEncodedLocalName());
byte[] data = jsonMapper().writeValueAsBytes(new
PartitionedTopicMetadata(numPartitions));
zkCreateOptimistic(path, data);
- // we wait for the data to be synced in all quorums and the
observers
- Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS);
+ // Sync data to all quorums and the observers
+ zkSync(path);
log.info("[{}] Successfully created partitioned topic {}",
clientAppId(), topicName);
} catch (KeeperException.NodeExistsException e) {
log.warn("[{}] Failed to create already existing partitioned topic
{}", clientAppId(), topicName);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index eeaeb96..a41db33 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -190,8 +190,8 @@ public class NonPersistentTopics extends PersistentTopics {
topicName.getEncodedLocalName());
byte[] data = jsonMapper().writeValueAsBytes(new
PartitionedTopicMetadata(numPartitions));
zkCreateOptimistic(path, data);
- // we wait for the data to be synced in all quorums and the
observers
- Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS);
+ // Sync data to all quorums and the observers
+ zkSync(path);
log.info("[{}] Successfully created partitioned topic {}",
clientAppId(), topicName);
} catch (KeeperException.NodeExistsException e) {
log.warn("[{}] Failed to create already existing partitioned topic
{}", clientAppId(), topicName);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 9eec8ae..08411dd 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -246,7 +246,7 @@ public class PersistentTopics extends PersistentTopicsBase {
*/
@POST
@Path("/{tenant}/{namespace}/{topic}/partitions")
- @ApiOperation(value = "Increment partitons of an existing partitioned
topic.", notes = "It only increments partitions of existing non-global
partitioned-topic")
+ @ApiOperation(value = "Increment partitions of an existing partitioned
topic.", notes = "It only increments partitions of existing non-global
partitioned-topic")
@ApiResponses(value = {
@ApiResponse(code = 401, message = "Don't have permission to
adminisActions to be grantedtrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@@ -270,6 +270,30 @@ public class PersistentTopics extends PersistentTopicsBase
{
internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly);
}
+
+ @POST
+ @Path("/{tenant}/{namespace}/{topic}/createMissedPartitions")
+ @ApiOperation(value = "Create missed partitions of an existing partitioned
topic.", notes = "This is a best-effort operation for create missed partitions
of existing non-global partitioned-topic and does't throw any exceptions when
create failed")
+ @ApiResponses(value = {
+ @ApiResponse(code = 401, message = "Don't have permission to
adminisActions to be grantedtrate resources on this tenant"),
+ @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Tenant does not exist"),
+ @ApiResponse(code = 409, message = "Partitioned topic does not
exist"),
+ @ApiResponse(code = 412, message = "Partitioned topic name is
invalid"),
+ @ApiResponse(code = 500, message = "Internal server error")
+ })
+ public void createMissedPartitions(
+ @ApiParam(value = "Specify the tenant", required = true)
+ @PathParam("tenant") String tenant,
+ @ApiParam(value = "Specify the namespace", required = true)
+ @PathParam("namespace") String namespace,
+ @ApiParam(value = "Specify topic name", required = true)
+ @PathParam("topic") @Encoded String encodedTopic) {
+
+ validatePartitionedTopicName(tenant, namespace, encodedTopic);
+ internalCreateMissedPartitions();
+ }
+
@GET
@Path("/{tenant}/{namespace}/{topic}/partitions")
@ApiOperation(value = "Get partitioned topic metadata.")
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index aa0e198..b9fd0a7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -75,7 +75,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
-import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
@@ -83,7 +82,6 @@ import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
-import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
@@ -1821,10 +1819,15 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
partitionedTopicPath(topicName), content,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, (rc,
path1, ctx, name) -> {
if (rc == KeeperException.Code.OK.intValue()) {
- // we wait for the data to be synced in all
quorums and the observers
- executor().schedule(
- SafeRunnable.safeRun(() ->
partitionedTopicFuture.complete(configMetadata)),
-
PersistentTopicsBase.PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS,
TimeUnit.MILLISECONDS);
+ // Sync data to all quorums and the observers
+
pulsar.getGlobalZkCache().getZooKeeper().sync(partitionedTopicPath(topicName),
+ (rc2, path2, ctx2) -> {
+ if (rc2 ==
KeeperException.Code.OK.intValue()) {
+
partitionedTopicFuture.complete(configMetadata);
+ } else {
+
partitionedTopicFuture.completeExceptionally(KeeperException.create(rc2));
+ }
+ }, null);
} else {
partitionedTopicFuture.completeExceptionally(KeeperException.create(rc));
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 855a20d..a0b48cf 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -815,9 +815,8 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions,
4);
- // check if the virtual topic doesn't get created
List<String> topics = admin.topics().getList("prop-xyz/ns1");
- assertEquals(topics.size(), 0);
+ assertEquals(topics.size(), 4);
assertEquals(admin.topics().getPartitionedTopicMetadata("persistent://prop-xyz/ns1/ds2").partitions,
0);
@@ -829,15 +828,8 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
assertEquals(admin.topics().getPartitionedStats(partitionedTopicName,
false).partitions.size(),
0);
- try {
- admin.topics().getSubscriptions(partitionedTopicName);
- fail("should have failed");
- } catch (PulsarAdminException e) {
- // ok
- assertEquals(e.getStatusCode(), Status.NOT_FOUND.getStatusCode());
- } catch (Exception e) {
- fail(e.getMessage());
- }
+ List<String> subscriptions =
admin.topics().getSubscriptions(partitionedTopicName);
+ assertEquals(subscriptions.size(), 0);
// create consumer and subscription
PulsarClient client = PulsarClient.builder()
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index cdeb418..a4cd325 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -159,17 +159,7 @@ public class PersistentTopicsTest extends
MockedPulsarServiceBaseTest {
// 3) Create the partitioned topic
persistentTopics.createPartitionedTopic(testTenant, testNamespace,
testLocalTopicName, 3);
- // 4) Confirm that the topic partitions has not been created yet
- response = mock(AsyncResponse.class);
- persistentTopics.getSubscriptions(response, testTenant, testNamespace,
testLocalTopicName + "-partition-0",
- true);
- errorCaptor = ArgumentCaptor.forClass(RestException.class);
- verify(response, timeout(5000).times(1)).resume(errorCaptor.capture());
- Assert.assertEquals(errorCaptor.getValue().getResponse().getStatus(),
- Response.Status.NOT_FOUND.getStatusCode());
- Assert.assertEquals(errorCaptor.getValue().getMessage(), "Topic
partitions were not yet created");
-
- // 5) Create a subscription
+ // 4) Create a subscription
response = mock(AsyncResponse.class);
persistentTopics.createSubscription(response, testTenant,
testNamespace, testLocalTopicName, "test", true,
(MessageIdImpl) MessageId.earliest, false);
@@ -177,26 +167,26 @@ public class PersistentTopicsTest extends
MockedPulsarServiceBaseTest {
verify(response,
timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(),
Response.Status.NO_CONTENT.getStatusCode());
- // 6) Confirm that the subscription exists
+ // 5) Confirm that the subscription exists
response = mock(AsyncResponse.class);
persistentTopics.getSubscriptions(response, testTenant, testNamespace,
testLocalTopicName + "-partition-0",
true);
verify(response,
timeout(5000).times(1)).resume(Lists.newArrayList("test"));
- // 7) Delete the subscription
+ // 6) Delete the subscription
response = mock(AsyncResponse.class);
persistentTopics.deleteSubscription(response, testTenant,
testNamespace, testLocalTopicName, "test", true);
responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response,
timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(),
Response.Status.NO_CONTENT.getStatusCode());
- // 8) Confirm that the subscription does not exist
+ // 7) Confirm that the subscription does not exist
response = mock(AsyncResponse.class);
persistentTopics.getSubscriptions(response, testTenant, testNamespace,
testLocalTopicName + "-partition-0",
true);
verify(response, timeout(5000).times(1)).resume(Lists.newArrayList());
- // 9) Delete the partitioned topic
+ // 8) Delete the partitioned topic
response = mock(AsyncResponse.class);
persistentTopics.deletePartitionedTopic(response, testTenant,
testNamespace, testLocalTopicName, true, true);
responseCaptor = ArgumentCaptor.forClass(Response.class);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
index e3965bd..cf5db2b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
@@ -782,9 +782,8 @@ public class V1_AdminApiTest extends
MockedPulsarServiceBaseTest {
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions,
4);
- // check if the virtual topic doesn't get created
List<String> topics = admin.topics().getList("prop-xyz/use/ns1");
- assertEquals(topics.size(), 0);
+ assertEquals(topics.size(), 4);
assertEquals(
admin.topics().getPartitionedTopicMetadata("persistent://prop-xyz/use/ns1/ds2").partitions,
@@ -809,6 +808,9 @@ public class V1_AdminApiTest extends
MockedPulsarServiceBaseTest {
fail(e.getMessage());
}
+ List<String> subscriptions =
admin.topics().getSubscriptions(partitionedTopicName);
+ assertEquals(subscriptions.size(), 1);
+
Consumer<byte[]> consumer1 =
client.newConsumer().topic(partitionedTopicName).subscriptionName("my-sub-1")
.subscribe();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java
new file mode 100644
index 0000000..2647bbb
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.pulsar.broker.admin.ZkAdminPaths;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.pulsar.broker.admin.AdminResource.jsonMapper;
+
+public class PartitionCreationTest extends ProducerConsumerBase {
+
+ @DataProvider(name = "topicDomainProvider")
+ public Object[][] topicDomainProvider() {
+ return new Object[][] {
+ { TopicDomain.persistent },
+ { TopicDomain.non_persistent }
+ };
+ }
+
+ @BeforeClass
+ @Override
+ protected void setup() throws Exception {
+ conf.setManagedLedgerCacheEvictionFrequency(0.1);
+ super.internalSetup();
+ super.producerBaseSetup();
+ }
+
+ @AfterClass
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test(dataProvider = "topicDomainProvider")
+ public void
testCreateConsumerForPartitionedTopicWhenDisableTopicAutoCreation(TopicDomain
domain) throws PulsarAdminException, PulsarClientException {
+ conf.setAllowAutoTopicCreation(false);
+ final String topic = domain.value() +
"://public/default/testCreateConsumerWhenDisableTopicAutoCreation";
+ admin.topics().createPartitionedTopic(topic, 3);
+
Assert.assertNotNull(pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe());
+ }
+
+ @Test(dataProvider = "topicDomainProvider")
+ public void
testCreateConsumerForNonPartitionedTopicWhenDisableTopicAutoCreation(TopicDomain
domain) throws PulsarClientException {
+ conf.setAllowAutoTopicCreation(false);
+ final String topic = domain.value() +
"://public/default/testCreateConsumerForNonPartitionedTopicWhenDisableTopicAutoCreation";
+ try {
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe();
+ if (domain == TopicDomain.persistent) {
+ Assert.fail("should be failed");
+ } else {
+ // passed non persistent topic here since we can not avoid
auto creation on non persistent topic now.
+ Assert.assertNotNull(consumer);
+ }
+ } catch (PulsarClientException.TopicDoesNotExistException e) {
+ //ok
+ }
+ }
+
+ @Test(dataProvider = "topicDomainProvider")
+ public void
testCreateConsumerForPartitionedTopicWhenEnableTopicAutoCreation(TopicDomain
domain) throws PulsarAdminException, PulsarClientException {
+ conf.setAllowAutoTopicCreation(true);
+ final String topic = domain.value() +
"://public/default/testCreateConsumerForPartitionedTopicWhenEnableTopicAutoCreation";
+ admin.topics().createPartitionedTopic(topic, 3);
+
Assert.assertNotNull(pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe());
+ }
+
+ @Test(dataProvider = "topicDomainProvider")
+ public void
testCreateConsumerForNonPartitionedTopicWhenEnableTopicAutoCreation(TopicDomain
domain) throws PulsarClientException {
+ conf.setAllowAutoTopicCreation(true);
+ final String topic = domain.value() +
"://public/default/testCreateConsumerForNonPartitionedTopicWhenEnableTopicAutoCreation";
+
Assert.assertNotNull(pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe());
+ }
+
+ @Test
+ public void
testCreateConsumerForPartitionedTopicUpdateWhenDisableTopicAutoCreation()
throws PulsarClientException, PulsarAdminException {
+ conf.setAllowAutoTopicCreation(false);
+ final String topic =
"testCreateConsumerForPartitionedTopicUpdateWhenDisableTopicAutoCreation";
+ admin.topics().createPartitionedTopic(topic, 3);
+ MultiTopicsConsumerImpl<byte[]> consumer =
(MultiTopicsConsumerImpl<byte[]>)
pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe();
+ Assert.assertNotNull(consumer);
+ Assert.assertEquals(consumer.getConsumers().size(), 3);
+ consumer.close();
+ admin.topics().updatePartitionedTopic(topic, 5);
+ consumer = (MultiTopicsConsumerImpl<byte[]>)
pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe();
+ Assert.assertNotNull(consumer);
+ Assert.assertEquals(consumer.getConsumers().size(), 5);
+ }
+
+ @Test
+ public void testCreateMissedPartitions() throws JsonProcessingException,
KeeperException, InterruptedException, PulsarAdminException,
PulsarClientException {
+ conf.setAllowAutoTopicCreation(false);
+ final String topic = "testCreateMissedPartitions";
+ String path = ZkAdminPaths.partitionedTopicPath(TopicName.get(topic));
+ int numPartitions = 3;
+ byte[] data = jsonMapper().writeValueAsBytes(new
PartitionedTopicMetadata(numPartitions));
+ // simulate partitioned topic without partitions
+
ZkUtils.createFullPathOptimistic(pulsar.getGlobalZkCache().getZooKeeper(),
path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ Consumer<byte[]> consumer = null;
+ try {
+ consumer =
pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribeAsync().get(3,
TimeUnit.SECONDS);
+ } catch (Exception e) {
+ //ok here, consumer will create failed with 'Topic does not exist'
+ }
+ Assert.assertNull(consumer);
+ admin.topics().createMissedPartitions(topic);
+ consumer =
pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe();
+ Assert.assertNotNull(consumer);
+ Assert.assertTrue(consumer instanceof MultiTopicsConsumerImpl);
+
Assert.assertEquals(((MultiTopicsConsumerImpl)consumer).getConsumers().size(),
3);
+ }
+
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index 7413825..a5b20f3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -487,11 +487,11 @@ public class PatternTopicsConsumerImplTest extends
ProducerConsumerBase {
.receiverQueueSize(4)
.subscribe();
- // 3. verify consumer get methods, to get 0 number of partitions and
topics.
+ // 3. verify consumer get methods, to get 5 number of partitions and
topics.
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>)
consumer).getPattern());
- assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().size(), 0);
- assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getConsumers().size(), 0);
- assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getTopics().size(), 0);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().size(), 5);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getConsumers().size(), 5);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getTopics().size(), 2);
// 4. create producer
String messagePredicate = "my-message-" + key + "-";
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 39c007c..c5ec041 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -212,6 +212,16 @@ public interface Topics {
void createNonPartitionedTopic(String topic) throws PulsarAdminException;
/**
+ * Create the missing partitions of a partitioned topic.
+ * <p>
+ * When topic auto-creation is disabled, use this method to create the
partitions that are missing, either
+ * because partition creation failed or because the partitioned topic already
exists without its partitions.
+ *
+ * @param topic partitioned topic name
+ */
+ void createMissedPartitions(String topic) throws PulsarAdminException;
+
+ /**
* Create a partitioned topic asynchronously.
* <p>
* Create a partitioned topic asynchronously. It needs to be called before
creating a producer for a partitioned
@@ -234,6 +244,16 @@ public interface Topics {
CompletableFuture<Void> createNonPartitionedTopicAsync(String topic);
/**
+ * Create the missing partitions of a partitioned topic asynchronously.
+ * <p>
+ * When topic auto-creation is disabled, use this method to create the
partitions that are missing, either
+ * because partition creation failed or because the partitioned topic already
exists without its partitions.
+ *
+ * @param topic partitioned topic name
+ */
+ CompletableFuture<Void> createMissedPartitionsAsync(String topic);
+
+ /**
* Update number of partitions of a non-global partitioned topic.
* <p>
* It requires partitioned-topic to be already exist and number of new
partitions must be greater than existing
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 3c31845..75c3d59 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -225,6 +225,20 @@ public class TopicsImpl extends BaseResource implements
Topics {
}
@Override
+ public void createMissedPartitions(String topic) throws
PulsarAdminException {
+ try {
+ createMissedPartitionsAsync(topic).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
public CompletableFuture<Void> createNonPartitionedTopicAsync(String
topic){
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn);
@@ -240,6 +254,13 @@ public class TopicsImpl extends BaseResource implements
Topics {
}
@Override
+ public CompletableFuture<Void> createMissedPartitionsAsync(String topic) {
+ TopicName tn = validateTopic(topic);
+ WebTarget path = topicPath(tn, "createMissedPartitions");
+ return asyncPostRequest(path, Entity.entity("",
MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public void updatePartitionedTopic(String topic, int numPartitions)
throws PulsarAdminException {
try {
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index 06775e1..f751cc2 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -713,6 +713,8 @@ public class PulsarClientException extends IOException {
return new ChecksumException(msg);
} else if (cause instanceof CryptoException) {
return new CryptoException(msg);
+ } else if (cause instanceof TopicDoesNotExistException) {
+ return new TopicDoesNotExistException(msg);
} else {
return new PulsarClientException(t);
}
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 9a2ae1b..af5aa02 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -82,6 +82,7 @@ public class CmdTopics extends CmdBase {
jcommander.addCommand("expire-messages-all-subscriptions", new
ExpireMessagesForAllSubscriptions());
jcommander.addCommand("create-partitioned-topic", new
CreatePartitionedCmd());
+ jcommander.addCommand("create-missed-partitions", new
CreateMissedPartitionsCmd());
jcommander.addCommand("create", new CreateNonPartitionedCmd());
jcommander.addCommand("update-partitioned-topic", new
UpdatePartitionedCmd());
jcommander.addCommand("get-partitioned-topic-metadata", new
GetPartitionedTopicMetadataCmd());
@@ -214,6 +215,21 @@ public class CmdTopics extends CmdBase {
}
}
+ @Parameters(commandDescription = "Try to create partitions for partitioned
topic. \n"
+ + "\t\t The partitions of partition topic has to be created,
can be used by repair partitions when \n"
+ + "\t\t topic auto creation is disabled")
+ private class CreateMissedPartitionsCmd extends CliCommand {
+
+ @Parameter(description = "persistent://tenant/namespace/topic\n",
required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws Exception {
+ String topic = validateTopicName(params);
+ topics.createMissedPartitions(topic);
+ }
+ }
+
@Parameters(commandDescription = "Create a non-partitioned topic.")
private class CreateNonPartitionedCmd extends CliCommand {
diff --git a/site2/docs/admin-api-partitioned-topics.md
b/site2/docs/admin-api-partitioned-topics.md
index fe0ab98..a6507e0 100644
--- a/site2/docs/admin-api-partitioned-topics.md
+++ b/site2/docs/admin-api-partitioned-topics.md
@@ -62,6 +62,34 @@ int numPartitions = 4;
admin.persistentTopics().createPartitionedTopic(topicName, numPartitions);
```
+### Create missed partitions
+
+Try to create the missing partitions of a partitioned topic. Use this to
repair a partitioned topic
+whose partitions have not been created, for example when topic auto-creation is disabled.
+
+#### pulsar-admin
+
+You can create missed partitions using the
[`create-missed-partitions`](reference-pulsar-admin.md#create-missed-partitions)
+command and specifying the topic name as an argument.
+
+Here's an example:
+
+```shell
+$ bin/pulsar-admin topics create-missed-partitions \
+ persistent://my-tenant/my-namespace/my-topic
+```
+
+#### REST API
+
+{@inject:
endpoint|POST|/admin/v2/persistent/:tenant/:namespace/:topic/createMissedPartitions|operation/createMissedPartitions}
+
+#### Java
+
+```java
+String topicName = "persistent://my-tenant/my-namespace/my-topic";
+admin.persistentTopics().createMissedPartitions(topicName);
+```
+
### Get metadata
Partitioned topics have metadata associated with them that you can fetch as a
JSON object.
diff --git a/site2/docs/reference-pulsar-admin.md
b/site2/docs/reference-pulsar-admin.md
index 6e9f7a3..af8c88b 100644
--- a/site2/docs/reference-pulsar-admin.md
+++ b/site2/docs/reference-pulsar-admin.md
@@ -1603,6 +1603,7 @@ Subcommands
* `offload`
* `offload-status`
* `create-partitioned-topic`
+* `create-missed-partitions`
* `delete-partitioned-topic`
* `create`
* `get-partitioned-topic-metadata`
@@ -1704,6 +1705,15 @@ Options
|---|---|---|
|`-p`, `--partitions`|The number of partitions for the topic|0|
+### `create-missed-partitions`
+Try to create the missing partitions of a partitioned topic. Use this to
repair a partitioned topic
+whose partitions have not been created, for example when topic auto-creation is disabled.
+
+Usage
+```bash
+$ pulsar-admin topics create-missed-partitions
persistent://tenant/namespace/topic
+```
+
### `delete-partitioned-topic`
Delete a partitioned topic. This will also delete all the partitions of the
topic if they exist.