This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 548c726 [Issue #3436][pulsar-broker] Creating REST Endpoint for
non-partitioned topic creation (#3625)
548c726 is described below
commit 548c726b8e7f0e163b1132c9ada6ba83d6bec572
Author: Richard Yu <[email protected]>
AuthorDate: Sun Mar 3 17:18:50 2019 -0800
[Issue #3436][pulsar-broker] Creating REST Endpoint for non-partitioned
topic creation (#3625)
We are adding a REST endpoint which allows the admin to create
non-partitioned topics
through PersistentTopics.
---
.../broker/admin/impl/PersistentTopicsBase.java | 12 ++++++++++
.../pulsar/broker/admin/v2/PersistentTopics.java | 16 +++++++++++++
.../pulsar/broker/admin/PersistentTopicsTest.java | 5 +++-
.../org/apache/pulsar/client/admin/Topics.java | 19 +++++++++++++++
.../pulsar/client/admin/internal/TopicsImpl.java | 19 +++++++++++++++
.../pulsar/admin/cli/PulsarAdminToolTest.java | 3 +++
.../org/apache/pulsar/admin/cli/CmdTopics.java | 14 +++++++++++
site2/docs/admin-api-partitioned-topics.md | 27 ++++++++++++++++++++++
site2/docs/reference-pulsar-admin.md | 10 +++++++-
9 files changed, 123 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index be5badb..2a117ec 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -398,6 +398,18 @@ public class PersistentTopicsBase extends AdminResource {
}
}
+ /**
+ * Creates the non-partitioned topic identified by the {@code topicName} field of this resource.
+ *
+ * Admin access for the topic's tenant is validated first (outside the try block, so a failure there
+ * propagates unchanged). The topic is then obtained through {@code getOrCreateTopic}; any exception
+ * raised by that call is logged and rethrown wrapped in a {@link RestException}.
+ *
+ * @param authoritative
+ *            accepted from the REST layer; not read by this method
+ */
+ protected void internalCreateNonPartitionedTopic(boolean authoritative) {
+ // Permission check happens before any attempt to touch the topic.
+ validateAdminAccessForTenant(topicName.getTenant());
+
+ try {
+ getOrCreateTopic(topicName);
+ log.info("[{}] Successfully created non-partitioned topic {}", clientAppId(), topicName);
+ } catch (Exception creationFailure) {
+ // Log with the exception as the trailing argument so the stack trace is kept, then surface it to REST.
+ log.error("[{}] Failed to create non-partitioned topic {}", clientAppId(), topicName, creationFailure);
+ throw new RestException(creationFailure);
+ }
+ }
+
/**
* It updates number of partitions of an existing non-global partitioned
topic. It requires partitioned-topic to
* already exist and number of new partitions must be greater than
existing number of partitions. Decrementing
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 30cb4b3..a9c9369 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -140,6 +140,22 @@ public class PersistentTopics extends PersistentTopicsBase
{
internalCreatePartitionedTopic(numPartitions, authoritative);
}
+ /**
+ * REST endpoint that creates a non-partitioned topic for the tenant/namespace/topic in the request path.
+ *
+ * @param tenant
+ *            tenant segment of the request path
+ * @param namespace
+ *            namespace segment of the request path
+ * @param encodedTopic
+ *            topic segment of the request path, still URL-encoded
+ * @param authoritative
+ *            value of the "authoritative" query parameter (false when absent); forwarded to
+ *            {@code internalCreateNonPartitionedTopic}
+ */
+ @PUT
+ @Path("/{tenant}/{namespace}/{topic}")
+ @ApiOperation(value="Create a non-partitioned topic.", notes = "This is the only REST endpoint from which non-partitioned topics could be created.")
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 409, message = "Partitioned topic already exist"),
+ @ApiResponse(code = 412, message = "Failed Reason : Name is invalid or Namespace does not have any clusters configured"),
+ @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
+ })
+ public void createNonPartitionedTopic(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+ validateGlobalNamespaceOwnership(tenant, namespace);
+ // internalCreateNonPartitionedTopic reads the topicName field, so the path's topic must be bound
+ // before delegating; previously encodedTopic was ignored and topicName was never set by this endpoint.
+ // NOTE(review): validateTopicName is assumed to be the inherited helper that decodes the topic and
+ // populates topicName, as the other endpoints of this resource use it - confirm against AdminResource.
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalCreateNonPartitionedTopic(authoritative);
+ }
+
/**
* It updates number of partitions of an existing non-global partitioned
topic. It requires partitioned-topic to be
* already exist and number of new partitions must be greater than
existing number of partitions. Decrementing
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 2d23f50..fdefd46 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -117,7 +117,7 @@ public class PersistentTopicsTest extends
MockedPulsarServiceBaseTest {
}
@Test
- public void testGetSubscriptionsWithAutoTopicCreationDisabled() {
+ public void testNonPartitionedTopics() {
pulsar.getConfiguration().setAllowAutoTopicCreation(false);
final String nonPartitionTopic = "non-partitioned-topic";
persistentTopics.createSubscription(testTenant, testNamespace,
nonPartitionTopic, "test", true, (MessageIdImpl) MessageId.latest);
@@ -126,5 +126,8 @@ public class PersistentTopicsTest extends
MockedPulsarServiceBaseTest {
} catch (RestException exc) {
Assert.assertTrue(exc.getMessage().contains("zero partitions"));
}
+ // Explicitly create a second topic through the new endpoint and check the metadata of that same
+ // topic. The assertion previously queried nonPartitionTopic, so the new endpoint was never verified.
+ final String nonPartitionTopic2 = "secondary-non-partitioned-topic";
+ persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, nonPartitionTopic2, true);
+ Assert.assertEquals(persistentTopics.getPartitionedMetadata(testTenant, testNamespace, nonPartitionTopic2, true).partitions, 0);
}
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 9c442c7..691d0ab 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -203,6 +203,18 @@ public interface Topics {
void createPartitionedTopic(String topic, int numPartitions) throws
PulsarAdminException;
/**
+ * Create a non-partitioned topic.
+ * <p>
+ * Synchronous variant of {@link #createNonPartitionedTopicAsync(String)}.
+ *
+ * @param topic
+ *            Topic name, e.g. {@code persistent://tenant/namespace/topic}
+ * @throws PulsarAdminException
+ *             if the create request fails
+ */
+ void createNonPartitionedTopic(String topic) throws PulsarAdminException;
+
+ /**
* Create a partitioned topic asynchronously.
* <p>
* Create a partitioned topic asynchronously. It needs to be called before
creating a producer for a partitioned
@@ -218,6 +230,13 @@ public interface Topics {
CompletableFuture<Void> createPartitionedTopicAsync(String topic, int
numPartitions);
/**
+ * Create a non-partitioned topic asynchronously.
+ *
+ * @param topic
+ *            Topic name, e.g. {@code persistent://tenant/namespace/topic}
+ * @return a future tracking the create request; it carries no result value
+ */
+ CompletableFuture<Void> createNonPartitionedTopicAsync(String topic);
+
+ /**
* Update number of partitions of a non-global partitioned topic.
* <p>
* It requires partitioned-topic to be already exist and number of new
partitions must be greater than existing
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 763d221..91991c6 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -207,6 +207,25 @@ public class TopicsImpl extends BaseResource implements
Topics, PersistentTopics
}
@Override
+ public void createNonPartitionedTopic(String topic) throws PulsarAdminException {
+ // Synchronous wrapper: block on the async variant and translate its failures.
+ try {
+ createNonPartitionedTopicAsync(topic).get();
+ } catch (ExecutionException e) {
+ // The async request's failure is carried as the cause; rethrow it to the caller.
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag so callers further up the stack can still observe it.
+ Thread.currentThread().interrupt();
+ // An InterruptedException normally has no cause, so wrap the exception itself; wrapping
+ // e.getCause() produced an exception with a null cause and no trace of the interruption.
+ throw new PulsarAdminException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> createNonPartitionedTopicAsync(String topic) {
+ // Validate the name first, then resolve the admin path of the resulting topic.
+ final WebTarget target = topicPath(validateTopic(topic));
+ // The request is a PUT with an empty string entity sent as JSON.
+ final Entity<String> emptyBody = Entity.entity("", MediaType.APPLICATION_JSON);
+ return asyncPutRequest(target, emptyBody);
+ }
+
+ @Override
public CompletableFuture<Void> createPartitionedTopicAsync(String topic,
int numPartitions) {
checkArgument(numPartitions > 1, "Number of partitions should be more
than 1");
TopicName tn = validateTopic(topic);
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index d4c2cd3..e852786 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -606,6 +606,9 @@ public class PulsarAdminToolTest {
cmdTopics.run(split("create-partitioned-topic
persistent://myprop/clust/ns1/ds1 --partitions 32"));
verify(mockTopics).createPartitionedTopic("persistent://myprop/clust/ns1/ds1",
32);
+ cmdTopics.run(split("create persistent://myprop/clust/ns1/ds1"));
+
verify(mockTopics).createNonPartitionedTopic("persistent://myprop/clust/ns1/ds1");
+
cmdTopics.run(split("list-partitioned-topics myprop/clust/ns1"));
verify(mockTopics).getPartitionedTopicList("myprop/clust/ns1");
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index ceaebf4..9fdce5e 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -82,6 +82,7 @@ public class CmdTopics extends CmdBase {
jcommander.addCommand("expire-messages-all-subscriptions", new
ExpireMessagesForAllSubscriptions());
jcommander.addCommand("create-partitioned-topic", new
CreatePartitionedCmd());
+ jcommander.addCommand("create", new CreateNonPartitionedCmd());
jcommander.addCommand("update-partitioned-topic", new
UpdatePartitionedCmd());
jcommander.addCommand("get-partitioned-topic-metadata", new
GetPartitionedTopicMetadataCmd());
@@ -213,6 +214,19 @@ public class CmdTopics extends CmdBase {
}
}
+ /**
+ * CLI sub-command that creates a non-partitioned topic; its single required argument is the topic name.
+ */
+ @Parameters(commandDescription = "Create a non-partitioned topic.")
+ private class CreateNonPartitionedCmd extends CliCommand {
+
+ // Main (unnamed) command-line parameter; handed to validateTopicName when the command runs.
+ @Parameter(description = "persistent://tenant/namespace/topic\n", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws Exception {
+ // Validate the argument and pass the resulting topic name straight to the admin client.
+ topics.createNonPartitionedTopic(validateTopicName(params));
+ }
+ }
+
@Parameters(commandDescription = "Update existing non-global partitioned
topic. \n"
+ "\t\tNew updating number of partitions must be greater than
existing number of partitions.")
private class UpdatePartitionedCmd extends CliCommand {
diff --git a/site2/docs/admin-api-partitioned-topics.md
b/site2/docs/admin-api-partitioned-topics.md
index 34670bb..d86a256 100644
--- a/site2/docs/admin-api-partitioned-topics.md
+++ b/site2/docs/admin-api-partitioned-topics.md
@@ -44,6 +44,33 @@ int numPartitions = 4;
admin.persistentTopics().createPartitionedTopic(topicName, numPartitions);
```
+## Non-partitioned topics resources
+
+### Create
+
+Non-partitioned topics in Pulsar must be explicitly created if `allowAutoTopicCreation` or `createIfMissing` is disabled.
+When creating a non-partitioned topic, you need to provide a topic name.
+
+#### pulsar-admin
+
+You can create non-partitioned topics using the
[`create`](reference-pulsar-admin.md#create)
+command and specifying the topic name as an argument. This is an example
command:
+
+```shell
+$ bin/pulsar-admin topics create persistent://my-tenant/my-namespace/my-topic
+```
+
+#### REST API
+
+{@inject:
endpoint|PUT|admin/v2/persistent/:tenant/:namespace/:topic|operation/createNonPartitionedTopic}
+
+#### Java
+
+```java
+String topicName = "persistent://my-tenant/my-namespace/my-topic";
+admin.topics().createNonPartitionedTopic(topicName);
+```
+
### Get metadata
Partitioned topics have metadata associated with them that you can fetch as a
JSON object.
diff --git a/site2/docs/reference-pulsar-admin.md
b/site2/docs/reference-pulsar-admin.md
index 65356ae..62a8d38 100644
--- a/site2/docs/reference-pulsar-admin.md
+++ b/site2/docs/reference-pulsar-admin.md
@@ -1684,6 +1684,7 @@ Subcommands
* `offload-status`
* `create-partitioned-topic`
* `delete-partitioned-topic`
+* `create`
* `get-partitioned-topic-metadata`
* `update-partitioned-topic`
* `list`
@@ -1773,7 +1774,6 @@ Options
|---|---|---|
|`-p`, `--partitions`|The number of partitions for the topic|0|
-
### `delete-partitioned-topic`
Delete a partitioned topic. This will also delete all the partitions of the
topic if they exist.
@@ -1782,6 +1782,14 @@ Usage
$ pulsar-admin topics delete-partitioned-topic {persistent|non-persistent}
```
+### `create`
+Creates a non-partitioned topic. A non-partitioned topic must be created explicitly by the user if `allowAutoTopicCreation` or `createIfMissing` is disabled.
+
+Usage
+```bash
+$ pulsar-admin topics create {persistent|non-persistent}://tenant/namespace/topic
+```
+
### `get-partitioned-topic-metadata`
Get the partitioned topic metadata. If the topic is not created or is a
non-partitioned topic, this will return an empty topic with zero partitions.