This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2adb666 admin-cli-support-terminate-partitioned-topic (#11893)
2adb666 is described below
commit 2adb6661d5b82c5705ee00ce3ebc9941c99635d5
Author: gaozhangmin <[email protected]>
AuthorDate: Tue Dec 21 16:56:40 2021 +0800
admin-cli-support-terminate-partitioned-topic (#11893)
Co-authored-by: gavingaozhangmin <[email protected]>
---
.../broker/admin/impl/PersistentTopicsBase.java | 13 ++++--
.../pulsar/broker/admin/v2/PersistentTopics.java | 2 +-
.../pulsar/broker/admin/PersistentTopicsTest.java | 26 +++++++++++-
.../org/apache/pulsar/client/admin/Topics.java | 20 +++++++++
.../pulsar/client/admin/internal/TopicsImpl.java | 49 ++++++++++++++++++++++
.../pulsar/admin/cli/PulsarAdminToolTest.java | 7 ++++
.../org/apache/pulsar/admin/cli/CmdTopics.java | 18 ++++++++
site2/docs/reference-pulsar-admin.md | 8 ++++
8 files changed, 137 insertions(+), 6 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 234fd5b..4397311 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -37,6 +37,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -3224,23 +3225,29 @@ public class PersistentTopicsBase extends AdminResource
{
validateTopicOwnership(topicName, authoritative);
validateTopicOperation(topicName, TopicOperation.TERMINATE);
- List<MessageId> messageIds = new ArrayList<>();
+ Map<Integer, MessageId> messageIds = new ConcurrentHashMap<>();
PartitionedTopicMetadata partitionMetadata =
getPartitionedTopicMetadata(topicName, authoritative, false);
+
+ if (partitionMetadata.partitions == 0) {
+ throw new RestException(Status.METHOD_NOT_ALLOWED, "Termination of
a non-partitioned topic is "
+ + "not allowed using partitioned-terminate, please use
terminate commands.");
+ }
if (partitionMetadata.partitions > 0) {
final List<CompletableFuture<MessageId>> futures =
Lists.newArrayList();
for (int i = 0; i < partitionMetadata.partitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
- futures.add(pulsar().getAdminClient().topics()
+ int finalI = i;
+ futures.add(pulsar().getAdminClient().topics()
.terminateTopicAsync(topicNamePartition.toString()).whenComplete((messageId,
throwable) -> {
if (throwable != null) {
log.error("[{}] Failed to terminate topic {}",
clientAppId(), topicNamePartition,
throwable);
asyncResponse.resume(new
RestException(throwable));
}
- messageIds.add(messageId);
+ messageIds.put(finalI, messageId);
}));
} catch (Exception e) {
log.error("[{}] Failed to terminate topic {}", clientAppId(),
topicNamePartition, e);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 5f78294..edd496f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -2587,7 +2587,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
+ "subscriber is not authorized to access this operation"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
- @ApiResponse(code = 405, message = "Termination of a partitioned
topic is not allowed"),
+ @ApiResponse(code = 405, message = "Termination of a
non-partitioned topic is not allowed"),
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global
cluster configuration")})
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 6c7cb39..4e02338 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -33,7 +33,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -336,7 +336,9 @@ public class PersistentTopicsTest extends
MockedPulsarServiceBaseTest {
// 9) terminate partitioned topic
response = mock(AsyncResponse.class);
persistentTopics.terminatePartitionedTopic(response, testTenant,
testNamespace, testLocalTopicName, true);
- verify(response, timeout(5000).times(1)).resume(Arrays.asList(new
MessageIdImpl(3, -1, -1)));
+ Map<Integer, MessageId> messageIds = new ConcurrentHashMap<>();
+ messageIds.put(0, new MessageIdImpl(3, -1, -1));
+ verify(response, timeout(5000).times(1)).resume(messageIds);
}
@Test
@@ -1126,4 +1128,24 @@ public class PersistentTopicsTest extends
MockedPulsarServiceBaseTest {
Assert.assertEquals(e.getResponse().getStatus(), 404);
}
}
+
+ public void testAdminTerminatePartitionedTopic() throws Exception{
+ TenantInfoImpl tenantInfo = new
TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+ admin.tenants().createTenant("prop-xyz", tenantInfo);
+ admin.namespaces().createNamespace("prop-xyz/ns12",
Sets.newHashSet("test"));
+ final String topicName =
"persistent://prop-xyz/ns12/testTerminatePartitionedTopic";
+
+ admin.topics().createPartitionedTopic(topicName, 1);
+ Map<Integer, MessageId> results = new HashMap<>();
+ results.put(0, new MessageIdImpl(3, -1, -1));
+
Assert.assertEquals(admin.topics().terminatePartitionedTopic(topicName),
results);
+
+ // Check that partitioned-terminate is not allowed on a non-partitioned topic.
+
admin.topics().createNonPartitionedTopic("persistent://prop-xyz/ns12/test");
+ try {
+ admin.topics().terminatePartitionedTopic(topicName);
+ } catch (PulsarAdminException e) {
+ Assert.assertEquals(e.getMessage(), "Termination of a
non-partitioned topic is not allowed using partitioned-terminate, please use
terminate commands.");
+ }
+ }
}
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index e5bc3b4..1c072c8 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -748,6 +748,26 @@ public interface Topics {
CompletableFuture<MessageId> terminateTopicAsync(String topic);
/**
+ * Terminate the partitioned topic and prevent any more messages being
published on it.
+ * <p/>
+ *
+ * @param topic
+ * topic name
+ * @return the message id of the last message that was published in
each partition of the topic
+ */
+ Map<Integer, MessageId> terminatePartitionedTopic(String topic) throws
PulsarAdminException;
+
+ /**
+ * Terminate the partitioned topic and prevent any more messages being
published on it.
+ * <p/>
+ *
+ * @param topic
+ * topic name
+ * @return the message id of the last message that was published in
each partition of the topic
+ */
+ CompletableFuture<Map<Integer, MessageId>>
terminatePartitionedTopicAsync(String topic);
+
+ /**
* Get the list of subscriptions.
* <p/>
* Get the list of persistent subscriptions for a given topic.
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 4914ed2..21138a1 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -25,6 +25,7 @@ import java.io.InputStream;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -664,6 +665,54 @@ public class TopicsImpl extends BaseResource implements
Topics {
}
@Override
+ public Map<Integer, MessageId> terminatePartitionedTopic(String topic)
throws PulsarAdminException {
+ try {
+ return
terminatePartitionedTopicAsync(topic).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Map<Integer, MessageId>>
terminatePartitionedTopicAsync(String topic) {
+ TopicName tn = validateTopic(topic);
+
+ final CompletableFuture<Map<Integer, MessageId>> future = new
CompletableFuture<>();
+ try {
+ final WebTarget path = topicPath(tn, "terminate", "partitions");
+
+ request(path).async().post(Entity.entity("",
MediaType.APPLICATION_JSON),
+ new InvocationCallback<Map<Integer, MessageIdImpl>>() {
+
+ @Override
+ public void completed(Map<Integer, MessageIdImpl>
messageId) {
+ Map<Integer, MessageId> messageIdImpl = new
HashMap<>();
+ for (Map.Entry<Integer, MessageIdImpl> entry:
messageId.entrySet()) {
+ messageIdImpl.put(entry.getKey(),
entry.getValue());
+ }
+ future.complete(messageIdImpl);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ log.warn("[{}] Failed to perform http post
request: {}", path.getUri(),
+ throwable.getMessage());
+
future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ } catch (PulsarAdminException cae) {
+ future.completeExceptionally(cae);
+ }
+
+ return future;
+ }
+
+ @Override
public List<String> getSubscriptions(String topic) throws
PulsarAdminException {
try {
return getSubscriptionsAsync(topic).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 9c4f11e..e1fe9ad 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -37,6 +37,7 @@ import java.lang.reflect.Field;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
@@ -1276,6 +1277,12 @@ public class PulsarAdminToolTest {
cmdTopics.run(split("terminate persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).terminateTopicAsync("persistent://myprop/clust/ns1/ds1");
+ Map<Integer, MessageId> results = new HashMap<>();
+ results.put(0, new MessageIdImpl(1, 1, 0));
+
when(mockTopics.terminatePartitionedTopic("persistent://myprop/clust/ns1/ds1")).thenReturn(results);
+ cmdTopics.run(split("partitioned-terminate
persistent://myprop/clust/ns1/ds1"));
+
verify(mockTopics).terminatePartitionedTopic("persistent://myprop/clust/ns1/ds1");
+
cmdTopics.run(split("compact persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).triggerCompaction("persistent://myprop/clust/ns1/ds1");
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index f137dde..2f83c8c 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -43,6 +43,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -125,6 +126,7 @@ public class CmdTopics extends CmdBase {
jcommander.addCommand("get-message-id", new GetMessageId());
jcommander.addCommand("reset-cursor", new ResetCursor());
jcommander.addCommand("terminate", new Terminate());
+ jcommander.addCommand("partitioned-terminate", new
PartitionedTerminate());
jcommander.addCommand("compact", new Compact());
jcommander.addCommand("compaction-status", new CompactionStatusCmd());
jcommander.addCommand("offload", new Offload());
@@ -874,6 +876,22 @@ public class CmdTopics extends CmdBase {
}
}
+ @Parameters(commandDescription = "Terminate a partitioned topic and don't
allow any more messages to be published")
+ private class PartitionedTerminate extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic",
required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException, TimeoutException {
+ String persistentTopic = validatePersistentTopic(params);
+ Map<Integer, MessageId> messageIds =
getTopics().terminatePartitionedTopic(persistentTopic);
+ for (Map.Entry<Integer, MessageId> entry: messageIds.entrySet()) {
+ String topicName = persistentTopic + "-partition-" +
entry.getKey();
+ System.out.println("Topic " + topicName + " succesfully
terminated at " + entry.getValue());
+ }
+ }
+ }
+
@Parameters(commandDescription = "Peek some messages for the subscription")
private class PeekMessages extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic",
required = true)
diff --git a/site2/docs/reference-pulsar-admin.md
b/site2/docs/reference-pulsar-admin.md
index 3079dc3..e9860d4 100644
--- a/site2/docs/reference-pulsar-admin.md
+++ b/site2/docs/reference-pulsar-admin.md
@@ -2161,6 +2161,14 @@ Usage
$ pulsar-admin topics terminate persistent://tenant/namespace/topic
```
+### `partitioned-terminate`
+Terminate a partitioned topic (disallow further messages from being published
on the topic)
+
+Usage
+```bash
+$ pulsar-admin topics partitioned-terminate persistent://tenant/namespace/topic
+```
+
### `permissions`
Get the permissions on a topic. Retrieve the effective permissions for a
destination. These permissions are defined by the permissions set at the
namespace level combined (union) with any eventual specific permissions set on
the topic.