This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2adb666  admin-cli-support-terminate-partitioned-topic (#11893)
2adb666 is described below

commit 2adb6661d5b82c5705ee00ce3ebc9941c99635d5
Author: gaozhangmin <[email protected]>
AuthorDate: Tue Dec 21 16:56:40 2021 +0800

    admin-cli-support-terminate-partitioned-topic (#11893)
    
    Co-authored-by: gavingaozhangmin <[email protected]>
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 13 ++++--
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  2 +-
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 26 +++++++++++-
 .../org/apache/pulsar/client/admin/Topics.java     | 20 +++++++++
 .../pulsar/client/admin/internal/TopicsImpl.java   | 49 ++++++++++++++++++++++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  7 ++++
 .../org/apache/pulsar/admin/cli/CmdTopics.java     | 18 ++++++++
 site2/docs/reference-pulsar-admin.md               |  8 ++++
 8 files changed, 137 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 234fd5b..4397311 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -37,6 +37,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -3224,23 +3225,29 @@ public class PersistentTopicsBase extends AdminResource 
{
         validateTopicOwnership(topicName, authoritative);
         validateTopicOperation(topicName, TopicOperation.TERMINATE);
 
-      List<MessageId> messageIds = new ArrayList<>();
+      Map<Integer, MessageId> messageIds = new ConcurrentHashMap<>();
 
       PartitionedTopicMetadata partitionMetadata = 
getPartitionedTopicMetadata(topicName, authoritative, false);
+
+        if (partitionMetadata.partitions == 0) {
+            throw new RestException(Status.METHOD_NOT_ALLOWED, "Termination of 
a non-partitioned topic is "
+                    + "not allowed using partitioned-terminate, please use 
terminate commands.");
+        }
         if (partitionMetadata.partitions > 0) {
           final List<CompletableFuture<MessageId>> futures = 
Lists.newArrayList();
 
           for (int i = 0; i < partitionMetadata.partitions; i++) {
             TopicName topicNamePartition = topicName.getPartition(i);
             try {
-              futures.add(pulsar().getAdminClient().topics()
+                int finalI = i;
+                futures.add(pulsar().getAdminClient().topics()
                   
.terminateTopicAsync(topicNamePartition.toString()).whenComplete((messageId, 
throwable) -> {
                           if (throwable != null) {
                               log.error("[{}] Failed to terminate topic {}", 
clientAppId(), topicNamePartition,
                                       throwable);
                               asyncResponse.resume(new 
RestException(throwable));
                           }
-                          messageIds.add(messageId);
+                          messageIds.put(finalI, messageId);
                       }));
             } catch (Exception e) {
               log.error("[{}] Failed to terminate topic {}", clientAppId(), 
topicNamePartition, e);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 5f78294..edd496f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -2587,7 +2587,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                     + "subscriber is not authorized to access this operation"),
             @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Topic does not exist"),
-            @ApiResponse(code = 405, message = "Termination of a partitioned 
topic is not allowed"),
+            @ApiResponse(code = 405, message = "Termination of a 
non-partitioned topic is not allowed"),
             @ApiResponse(code = 412, message = "Topic name is not valid"),
             @ApiResponse(code = 500, message = "Internal server error"),
             @ApiResponse(code = 503, message = "Failed to validate global 
cluster configuration")})
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 6c7cb39..4e02338 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -33,7 +33,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -336,7 +336,9 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
         // 9) terminate partitioned topic
         response = mock(AsyncResponse.class);
         persistentTopics.terminatePartitionedTopic(response, testTenant, 
testNamespace, testLocalTopicName, true);
-        verify(response, timeout(5000).times(1)).resume(Arrays.asList(new 
MessageIdImpl(3, -1, -1)));
+        Map<Integer, MessageId> messageIds = new ConcurrentHashMap<>();
+        messageIds.put(0, new MessageIdImpl(3, -1, -1));
+        verify(response, timeout(5000).times(1)).resume(messageIds);
     }
 
     @Test
@@ -1126,4 +1128,24 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
             Assert.assertEquals(e.getResponse().getStatus(), 404);
         }
     }
+
+    public void testAdminTerminatePartitionedTopic() throws Exception{
+        TenantInfoImpl tenantInfo = new 
TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+        admin.tenants().createTenant("prop-xyz", tenantInfo);
+        admin.namespaces().createNamespace("prop-xyz/ns12", 
Sets.newHashSet("test"));
+        final String topicName = 
"persistent://prop-xyz/ns12/testTerminatePartitionedTopic";
+
+        admin.topics().createPartitionedTopic(topicName, 1);
+        Map<Integer, MessageId> results = new HashMap<>();
+        results.put(0, new MessageIdImpl(3, -1, -1));
+        
Assert.assertEquals(admin.topics().terminatePartitionedTopic(topicName),  
results);
+
+        // Check that partitioned-terminate is not allowed on a non-partitioned topic.
+        
admin.topics().createNonPartitionedTopic("persistent://prop-xyz/ns12/test");
+        try {
+            admin.topics().terminatePartitionedTopic(topicName);
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getMessage(), "Termination of a 
non-partitioned topic is not allowed using partitioned-terminate, please use 
terminate commands.");
+        }
+    }
 }
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index e5bc3b4..1c072c8 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -748,6 +748,26 @@ public interface Topics {
     CompletableFuture<MessageId> terminateTopicAsync(String topic);
 
     /**
+     * Terminate the partitioned topic and prevent any more messages being 
published on it.
+     * <p/>
+     *
+     * @param topic
+     *            topic name
+     * @return a map from partition index to the id of the last message 
published in that partition
+     */
+    Map<Integer, MessageId> terminatePartitionedTopic(String topic) throws 
PulsarAdminException;
+
+    /**
+     * Terminate the partitioned topic and prevent any more messages being 
published on it.
+     * <p/>
+     *
+     * @param topic
+     *            topic name
+     * @return a future holding a map from partition index to the id of the 
last message published in that partition
+     */
+    CompletableFuture<Map<Integer, MessageId>> 
terminatePartitionedTopicAsync(String topic);
+
+    /**
      * Get the list of subscriptions.
      * <p/>
      * Get the list of persistent subscriptions for a given topic.
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 4914ed2..21138a1 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -25,6 +25,7 @@ import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Base64;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -664,6 +665,54 @@ public class TopicsImpl extends BaseResource implements 
Topics {
     }
 
     @Override
+    public Map<Integer, MessageId> terminatePartitionedTopic(String topic) 
throws PulsarAdminException {
+        try {
+            return 
terminatePartitionedTopicAsync(topic).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Map<Integer, MessageId>> 
terminatePartitionedTopicAsync(String topic) {
+        TopicName tn = validateTopic(topic);
+
+        final CompletableFuture<Map<Integer, MessageId>> future = new 
CompletableFuture<>();
+        try {
+            final WebTarget path = topicPath(tn, "terminate", "partitions");
+
+            request(path).async().post(Entity.entity("", 
MediaType.APPLICATION_JSON),
+                    new InvocationCallback<Map<Integer, MessageIdImpl>>() {
+
+                        @Override
+                        public void completed(Map<Integer, MessageIdImpl> 
messageId) {
+                            Map<Integer, MessageId> messageIdImpl = new 
HashMap<>();
+                            for (Map.Entry<Integer, MessageIdImpl> entry: 
messageId.entrySet()) {
+                                messageIdImpl.put(entry.getKey(), 
entry.getValue());
+                            }
+                            future.complete(messageIdImpl);
+                        }
+
+                        @Override
+                        public void failed(Throwable throwable) {
+                            log.warn("[{}] Failed to perform http post 
request: {}", path.getUri(),
+                                    throwable.getMessage());
+                            
future.completeExceptionally(getApiException(throwable.getCause()));
+                        }
+                    });
+        } catch (PulsarAdminException cae) {
+            future.completeExceptionally(cae);
+        }
+
+        return future;
+    }
+
+    @Override
     public List<String> getSubscriptions(String topic) throws 
PulsarAdminException {
         try {
             return getSubscriptionsAsync(topic).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 9c4f11e..e1fe9ad 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -37,6 +37,7 @@ import java.lang.reflect.Field;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -1276,6 +1277,12 @@ public class PulsarAdminToolTest {
         cmdTopics.run(split("terminate persistent://myprop/clust/ns1/ds1"));
         
verify(mockTopics).terminateTopicAsync("persistent://myprop/clust/ns1/ds1");
 
+        Map<Integer, MessageId> results = new HashMap<>();
+        results.put(0, new MessageIdImpl(1, 1, 0));
+        
when(mockTopics.terminatePartitionedTopic("persistent://myprop/clust/ns1/ds1")).thenReturn(results);
+        cmdTopics.run(split("partitioned-terminate 
persistent://myprop/clust/ns1/ds1"));
+        
verify(mockTopics).terminatePartitionedTopic("persistent://myprop/clust/ns1/ds1");
+
         cmdTopics.run(split("compact persistent://myprop/clust/ns1/ds1"));
         
verify(mockTopics).triggerCompaction("persistent://myprop/clust/ns1/ds1");
 
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index f137dde..2f83c8c 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -43,6 +43,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
@@ -125,6 +126,7 @@ public class CmdTopics extends CmdBase {
         jcommander.addCommand("get-message-id", new GetMessageId());
         jcommander.addCommand("reset-cursor", new ResetCursor());
         jcommander.addCommand("terminate", new Terminate());
+        jcommander.addCommand("partitioned-terminate", new 
PartitionedTerminate());
         jcommander.addCommand("compact", new Compact());
         jcommander.addCommand("compaction-status", new CompactionStatusCmd());
         jcommander.addCommand("offload", new Offload());
@@ -874,6 +876,22 @@ public class CmdTopics extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Terminate a partitioned topic and don't 
allow any more messages to be published")
+    private class PartitionedTerminate extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException, TimeoutException {
+            String persistentTopic = validatePersistentTopic(params);
+            Map<Integer, MessageId> messageIds = 
getTopics().terminatePartitionedTopic(persistentTopic);
+            for (Map.Entry<Integer, MessageId> entry: messageIds.entrySet()) {
+                String topicName = persistentTopic + "-partition-" + 
entry.getKey();
+                System.out.println("Topic " + topicName +  " succesfully 
terminated at " + entry.getValue());
+            }
+        }
+    }
+
     @Parameters(commandDescription = "Peek some messages for the subscription")
     private class PeekMessages extends CliCommand {
         @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
diff --git a/site2/docs/reference-pulsar-admin.md 
b/site2/docs/reference-pulsar-admin.md
index 3079dc3..e9860d4 100644
--- a/site2/docs/reference-pulsar-admin.md
+++ b/site2/docs/reference-pulsar-admin.md
@@ -2161,6 +2161,14 @@ Usage
 $ pulsar-admin topics terminate persistent://tenant/namespace/topic
 ```
 
+### `partitioned-terminate`
+Terminate a partitioned topic (disallow further messages from being published 
on any partition of the topic)
+
+Usage
+```bash
+$ pulsar-admin topics partitioned-terminate persistent://tenant/namespace/topic
+```
+
 ### `permissions`
 Get the permissions on a topic. Retrieve the effective permissions for a 
destination. These permissions are defined by the permissions set at the 
namespace level combined (union) with any eventual specific permissions set on 
the topic.
 

Reply via email to