This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 98355ca  Added REST handler to create a subscription on a topic (#1151)
98355ca is described below

commit 98355ca745b53f7864a9aac0ae709879d5bde48e
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Wed Jan 31 09:11:48 2018 -0800

    Added REST handler to create a subscription on a topic (#1151)
---
 .../pulsar/broker/admin/PersistentTopics.java      |  87 +++++++++++++++--
 .../broker/admin/CreateSubscriptionTest.java       | 106 +++++++++++++++++++++
 .../pulsar/client/admin/PersistentTopics.java      |  46 ++++++++-
 .../admin/internal/PersistentTopicsImpl.java       |  34 +++++--
 .../pulsar/admin/cli/CmdPersistentTopics.java      |  34 ++++++-
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  19 ++--
 6 files changed, 297 insertions(+), 29 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
index f241cf5..da8bb0a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
@@ -24,14 +24,12 @@ import static org.apache.pulsar.common.util.Codec.decode;
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.time.Instant;
-import java.time.ZoneId;
-import java.time.format.DateTimeFormatter;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
@@ -66,6 +64,7 @@ import 
org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog;
 import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
@@ -77,6 +76,7 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
 import 
org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
 import org.apache.pulsar.client.api.MessageId;
@@ -106,6 +106,7 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.github.zafarkhaja.semver.Version;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -116,7 +117,6 @@ import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
-import com.github.zafarkhaja.semver.Version;
 
 /**
  */
@@ -615,7 +615,7 @@ public class PersistentTopics extends AdminResource {
         DestinationName dn = DestinationName.get(domain(), property, cluster, 
namespace, destination);
         if (cluster.equals(Namespaces.GLOBAL_CLUSTER)) {
             validateGlobalNamespaceOwnership(NamespaceName.get(property, 
cluster, namespace));
-        } 
+        }
         List<String> subscriptions = Lists.newArrayList();
         PartitionedTopicMetadata partitionMetadata = 
getPartitionedTopicMetadata(property, cluster, namespace,
                 destination, authoritative);
@@ -656,7 +656,7 @@ public class PersistentTopics extends AdminResource {
         validateAdminAndClientPermission(dn);
         if (cluster.equals(Namespaces.GLOBAL_CLUSTER)) {
             validateGlobalNamespaceOwnership(NamespaceName.get(property, 
cluster, namespace));
-        } 
+        }
         validateDestinationOwnership(dn, authoritative);
         Topic topic = getTopicReference(dn);
         return topic.getStats();
@@ -676,7 +676,7 @@ public class PersistentTopics extends AdminResource {
         validateAdminAndClientPermission(dn);
         if (cluster.equals(Namespaces.GLOBAL_CLUSTER)) {
             validateGlobalNamespaceOwnership(NamespaceName.get(property, 
cluster, namespace));
-        } 
+        }
         validateDestinationOwnership(dn, authoritative);
         Topic topic = getTopicReference(dn);
         return topic.getInternalStats();
@@ -1024,6 +1024,67 @@ public class PersistentTopics extends AdminResource {
         }
     }
 
+    @PUT
+    
@Path("/{property}/{cluster}/{namespace}/{destination}/subscription/{subscriptionName}")
+    @ApiOperation(value = "Reset subscription to message position closest to 
given position.", notes = "Creates a subscription on the topic at the specified 
message id")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Topic/Subscription does not 
exist"),
+            @ApiResponse(code = 405, message = "Not supported for partitioned 
topics") })
+    public void createSubscription(@PathParam("property") String property, 
@PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace, 
@PathParam("destination") @Encoded String destination,
+            @PathParam("subscriptionName") String subscriptionName,
+            @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative, MessageIdImpl messageId) throws PulsarServerException {
+        destination = decode(destination);
+        DestinationName dn = DestinationName.get(domain(), property, cluster, 
namespace, destination);
+        if (cluster.equals(Namespaces.GLOBAL_CLUSTER)) {
+            validateGlobalNamespaceOwnership(NamespaceName.get(property, 
cluster, namespace));
+        }
+        log.info("[{}][{}] Creating subscription {} at message id {}", 
clientAppId(), destination,
+                subscriptionName, messageId);
+
+        PartitionedTopicMetadata partitionMetadata = 
getPartitionedTopicMetadata(property, cluster, namespace,
+                destination, authoritative);
+
+        try {
+            if (partitionMetadata.partitions > 0) {
+                // Create the subscription on each partition
+                List<CompletableFuture<Void>> futures = Lists.newArrayList();
+                PulsarAdmin admin = pulsar().getAdminClient();
+
+                for (int i = 0; i < partitionMetadata.partitions; i++) {
+                    
futures.add(admin.persistentTopics().createSubscriptionAsync(dn.getPartition(i).toString(),
+                            subscriptionName, messageId));
+                }
+
+                FutureUtil.waitForAll(futures).join();
+            } else {
+                validateAdminOperationOnDestination(dn, authoritative);
+
+                PersistentTopic topic = (PersistentTopic) getOrCreateTopic(dn);
+
+                if (topic.getSubscriptions().containsKey(subscriptionName)) {
+                    throw new RestException(Status.CONFLICT, "Subscription 
already exists for topic");
+                }
+
+                PersistentSubscription subscription = (PersistentSubscription) 
topic
+                        .createSubscription(subscriptionName).get();
+                
subscription.resetCursor(PositionImpl.get(messageId.getLedgerId(), 
messageId.getEntryId())).get();
+                log.info("[{}][{}] Successfully created subscription {} at 
message id {}", clientAppId(), dn,
+                        subscriptionName, messageId);
+            }
+        } catch (Exception e) {
+            Throwable t = e.getCause();
+            log.warn("[{}] [{}] Failed to create subscription {} at message id 
{}", clientAppId(), dn, subscriptionName,
+                    messageId, e);
+            if (t instanceof SubscriptionInvalidCursorPosition) {
+                throw new RestException(Status.PRECONDITION_FAILED,
+                        "Unable to find position for position specified: " + 
t.getMessage());
+            } else {
+                throw new RestException(e);
+            }
+        }
+    }
+
     @POST
     
@Path("/{property}/{cluster}/{namespace}/{destination}/subscription/{subName}/resetcursor")
     @ApiOperation(value = "Reset subscription to message position closest to 
given position.", notes = "It fence cursor and disconnects all active consumers 
before reseting cursor.")
@@ -1324,10 +1385,10 @@ public class PersistentTopics extends AdminResource {
                         dn.toString(), ex.getMessage(), ex);
                 throw ex;
             }
-            
+
             String path = path(PARTITIONED_TOPIC_PATH_ZNODE, dn.getProperty(), 
dn.getCluster(),
                     dn.getNamespacePortion(), "persistent", 
dn.getEncodedLocalName());
-            
+
             // validates global-namespace contains local/peer cluster: if 
peer/local cluster present then lookup can
             // serve/redirect request else fail partitioned-metadata-request 
so, client fails while creating
             // producer/consumer
@@ -1361,6 +1422,14 @@ public class PersistentTopics extends AdminResource {
         }
     }
 
+    private Topic getOrCreateTopic(DestinationName dn) {
+        try {
+            return pulsar().getBrokerService().getTopic(dn.toString()).get();
+        } catch (InterruptedException | ExecutionException e) {
+           throw new RestException(e);
+        }
+    }
+
     /**
      * Get the Subscription object reference from the Topic reference
      */
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
new file mode 100644
index 0000000..6f759d8
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+import javax.ws.rs.ClientErrorException;
+import javax.ws.rs.core.Response.Status;
+
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+
+public class CreateSubscriptionTest extends MockedPulsarServiceBaseTest {
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        super.internalSetup();
+    }
+
+    @AfterMethod
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void createSubscriptionSingleTopic() throws Exception {
+        String topic = "persistent://prop-xyz/use/ns1/my-topic";
+        admin.persistentTopics().createSubscription(topic, "sub-1", 
MessageId.latest);
+
+        // Create should fail if the subscription already exists
+        try {
+            admin.persistentTopics().createSubscription(topic, "sub-1", 
MessageId.latest);
+            fail("Should have failed");
+        } catch (ConflictException e) {
+            assertEquals(((ClientErrorException) 
e.getCause()).getResponse().getStatus(),
+                    Status.CONFLICT.getStatusCode());
+        }
+
+        assertEquals(admin.persistentTopics().getSubscriptions(topic), 
Lists.newArrayList("sub-1"));
+
+        Producer p1 = pulsarClient.createProducer(topic);
+        p1.send("test-1".getBytes());
+        p1.send("test-2".getBytes());
+        MessageId m3 = p1.send("test-3".getBytes());
+
+        
assertEquals(admin.persistentTopics().getStats(topic).subscriptions.get("sub-1").msgBacklog,
 3);
+
+        admin.persistentTopics().createSubscription(topic, "sub-2", 
MessageId.latest);
+        
assertEquals(admin.persistentTopics().getStats(topic).subscriptions.get("sub-2").msgBacklog,
 0);
+
+        admin.persistentTopics().createSubscription(topic, "sub-3", 
MessageId.earliest);
+        
assertEquals(admin.persistentTopics().getStats(topic).subscriptions.get("sub-3").msgBacklog,
 3);
+
+        admin.persistentTopics().createSubscription(topic, "sub-5", m3);
+        
assertEquals(admin.persistentTopics().getStats(topic).subscriptions.get("sub-5").msgBacklog,
 1);
+    }
+
+    @Test
+    public void createSubscriptionOnPartitionedTopic() throws Exception {
+        String topic = "persistent://prop-xyz/use/ns1/my-partitioned-topic";
+        admin.persistentTopics().createPartitionedTopic(topic, 10);
+
+        admin.persistentTopics().createSubscription(topic, "sub-1", 
MessageId.latest);
+
+        // Create should fail if the subscription already exists
+        try {
+            admin.persistentTopics().createSubscription(topic, "sub-1", 
MessageId.latest);
+            fail("Should have failed");
+        } catch (Exception e) {
+            // Expected
+        }
+
+        for (int i = 0; i < 10; i++) {
+            assertEquals(
+                    
admin.persistentTopics().getSubscriptions(DestinationName.get(topic).getPartition(i).toString()),
+                    Lists.newArrayList("sub-1"));
+        }
+    }
+}
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PersistentTopics.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PersistentTopics.java
index 093ae3e..70642fe 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PersistentTopics.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PersistentTopics.java
@@ -196,7 +196,7 @@ public interface PersistentTopics {
      * @return a future that can be used to track when the partitioned topic 
is created
      */
     CompletableFuture<Void> createPartitionedTopicAsync(String destination, 
int numPartitions);
-    
+
     /**
      * Update number of partitions of a non-global partitioned topic.
      * <p>
@@ -208,7 +208,7 @@ public interface PersistentTopics {
      *            Destination name
      * @param numPartitions
      *            Number of new partitions of already exist partitioned-topic
-     * 
+     *
      * @return a future that can be used to track when the partitioned topic 
is updated
      */
     void updatePartitionedTopic(String destination, int numPartitions) throws 
PulsarAdminException;
@@ -224,7 +224,7 @@ public interface PersistentTopics {
      *            Destination name
      * @param numPartitions
      *            Number of new partitions of already exist partitioned-topic
-     * 
+     *
      * @return a future that can be used to track when the partitioned topic 
is updated
      */
     CompletableFuture<Void> updatePartitionedTopicAsync(String destination, 
int numPartitions);
@@ -311,7 +311,7 @@ public interface PersistentTopics {
      * @return a future that can be used to track when the topic is deleted
      */
     CompletableFuture<Void> deleteAsync(String destination);
-    
+
     /**
      * Unload a topic.
      * <p>
@@ -798,6 +798,42 @@ public interface PersistentTopics {
     CompletableFuture<List<Message>> peekMessagesAsync(String destination, 
String subName, int numMessages);
 
     /**
+     * Create a new subscription on a topic
+     *
+     * @param destination
+     *            Destination name
+     * @param subscriptionName
+     *            Subscription name
+     * @param messageId
+     *            The {@link MessageId} on where to initialize the 
subscription. It could be {@link MessageId#latest},
+     *            {@link MessageId#earliest} or a specific message id.
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws ConflictException
+     *             Subscription already exists
+     * @throws NotAllowedException
+     *             Command disallowed for requested resource
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void createSubscription(String destination, String subscriptionName, 
MessageId messageId)
+            throws PulsarAdminException;
+
+    /**
+     * Create a new subscription on a topic
+     *
+     * @param destination
+     *            Destination name
+     * @param subscriptionName
+     *            Subscription name
+     * @param messageId
+     *            The {@link MessageId} on where to initialize the 
subscription. It could be {@link MessageId#latest},
+     *            {@link MessageId#earliest} or a specific message id.
+     */
+    CompletableFuture<Void> createSubscriptionAsync(String destination, String 
subscriptionName, MessageId messageId);
+
+    /**
      * Reset cursor position on a topic subscription
      *
      * @param destination
@@ -829,7 +865,7 @@ public interface PersistentTopics {
      *            reset subscription to position closest to time in ms since 
epoch
      */
     CompletableFuture<Void> resetCursorAsync(String destination, String 
subName, long timestamp);
-    
+
     /**
      * Reset cursor position on a topic subscription
      *
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java
index cc1d96c..01ede00 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java
@@ -181,7 +181,7 @@ public class PersistentTopicsImpl extends BaseResource 
implements PersistentTopi
                 
persistentTopics.path(ds.getNamespace()).path(ds.getEncodedLocalName()).path("partitions"),
                 Entity.entity(numPartitions, MediaType.APPLICATION_JSON));
     }
-    
+
     @Override
     public PartitionedTopicMetadata getPartitionedTopicMetadata(String 
destination) throws PulsarAdminException {
         try {
@@ -586,8 +586,8 @@ public class PersistentTopicsImpl extends BaseResource 
implements PersistentTopi
         peekMessagesAsync(destination, subName, numMessages, 
Lists.newArrayList(), future, 1);
         return future;
     }
-    
-    
+
+
     private void peekMessagesAsync(String destination, String subName, int 
numMessages,
             List<Message> messages, CompletableFuture<List<Message>> future, 
int nthMessage) {
         if (numMessages <= 0) {
@@ -598,8 +598,8 @@ public class PersistentTopicsImpl extends BaseResource 
implements PersistentTopi
         // if peeking first message succeeds, we know that the topic and 
subscription exists
         peekNthMessage(destination, subName, nthMessage).handle((r, ex) -> {
             if (ex != null) {
-                // if we get a not found exception, it means that the position 
for the message we are trying to get        
-                // does not exist. At this point, we can return the already 
found messages.       
+                // if we get a not found exception, it means that the position 
for the message we are trying to get
+                // does not exist. At this point, we can return the already 
found messages.
                 if (ex instanceof NotFoundException) {
                     log.warn("Exception '{}' occured while trying to peek 
Messages.", ex.getMessage());
                     future.complete(messages);
@@ -617,6 +617,28 @@ public class PersistentTopicsImpl extends BaseResource 
implements PersistentTopi
     }
 
     @Override
+    public void createSubscription(String destination, String 
subscriptionName, MessageId messageId)
+            throws PulsarAdminException {
+        try {
+            DestinationName ds = validateTopic(destination);
+            String encodedSubName = Codec.encode(subscriptionName);
+            
request(persistentTopics.path(ds.getNamespace()).path(ds.getEncodedLocalName()).path("subscription")
+                    .path(encodedSubName)).put(Entity.entity(messageId, 
MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> createSubscriptionAsync(String destination, 
String subscriptionName,
+            MessageId messageId) {
+        DestinationName ds = validateTopic(destination);
+        String encodedSubName = Codec.encode(subscriptionName);
+        return 
asyncPutRequest(persistentTopics.path(ds.getNamespace()).path(ds.getEncodedLocalName())
+                .path("subscription").path(encodedSubName), 
Entity.entity(messageId, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public void resetCursor(String destination, String subName, long 
timestamp) throws PulsarAdminException {
         try {
             DestinationName ds = validateTopic(destination);
@@ -769,7 +791,7 @@ public class PersistentTopicsImpl extends BaseResource 
implements PersistentTopi
                 log.error("Exception occured while trying to get BatchMsgId: 
{}", batchMsgId, ex);
             }
             buf.release();
-            singleMessageMetadataBuilder.recycle();          
+            singleMessageMetadataBuilder.recycle();
         }
         return ret;
     }
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
index d7f7d51..5027e69 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
@@ -62,6 +62,7 @@ public class CmdPersistentTopics extends CmdBase {
         jcommander.addCommand("unload", new UnloadCmd());
         jcommander.addCommand("subscriptions", new ListSubscriptions());
         jcommander.addCommand("unsubscribe", new DeleteSubscription());
+        jcommander.addCommand("create-subscription", new CreateSubscription());
         jcommander.addCommand("stats", new GetStats());
         jcommander.addCommand("stats-internal", new GetInternalStats());
         jcommander.addCommand("info-internal", new GetInternalInfo());
@@ -418,6 +419,35 @@ public class CmdPersistentTopics extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Create a new subscription on a topic")
+    private class CreateSubscription extends CliCommand {
+        @Parameter(description = 
"persistent://property/cluster/namespace/destination", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "-s",
+                "--subscription" }, description = "Subscription to reset 
position on", required = true)
+        private String subscriptionName;
+
+        @Parameter(names = { "--messageId",
+                "-m" }, description = "messageId where to create the 
subscription. It can be either 'latest', 'earliest' or (ledgerId:entryId)", 
required = false)
+        private String messageIdStr = "latest";
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            MessageId messageId;
+            if (messageIdStr.equals("latest")) {
+                messageId = MessageId.latest;
+            } else if (messageIdStr.equals("earliest")) {
+                messageId = MessageId.earliest;
+            } else {
+                messageId = validateMessageIdString(messageIdStr);
+            }
+
+            persistentTopics.createSubscription(persistentTopic, 
subscriptionName, messageId);
+        }
+    }
+
     @Parameters(commandDescription = "Reset position for subscription to 
position closest to timestamp or messageId")
     private class ResetCursor extends CliCommand {
         @Parameter(description = 
"persistent://property/cluster/namespace/destination", required = true)
@@ -430,7 +460,7 @@ public class CmdPersistentTopics extends CmdBase {
         @Parameter(names = { "--time",
                 "-t" }, description = "time in minutes to reset back to (or 
minutes, hours,days,weeks eg: 100m, 3h, 2d, 5w)", required = false)
         private String resetTimeStr;
-        
+
         @Parameter(names = { "--messageId",
                 "-m" }, description = "messageId to reset back to 
(ledgerId:entryId)", required = false)
         private String resetMessageIdStr;
@@ -534,7 +564,7 @@ public class CmdPersistentTopics extends CmdBase {
             return Integer.parseInt(s);
         }
     }
-    
+
     private MessageId validateMessageIdString(String resetMessageIdStr) throws 
PulsarAdminException {
         String[] messageId = resetMessageIdStr.split(":");
         try {
diff --git 
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 411ef7e..00f18a9 100644
--- 
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -36,6 +36,8 @@ import org.apache.pulsar.client.admin.PersistentTopics;
 import org.apache.pulsar.client.admin.Properties;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.ResourceQuotas;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
@@ -96,7 +98,7 @@ public class PulsarAdminToolTest {
         brokerStats.run(split("monitoring-metrics"));
         verify(mockBrokerStats).getMetrics();
     }
-    
+
     @Test
     void getOwnedNamespaces() throws Exception {
         PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
@@ -132,7 +134,7 @@ public class PulsarAdminToolTest {
 
         clusters.run(split("delete use"));
         verify(mockClusters).deleteCluster("use");
-        
+
         clusters.run(split("list-failure-domains use"));
         verify(mockClusters).getFailureDomains("use");
 
@@ -303,19 +305,19 @@ public class PulsarAdminToolTest {
 
         namespaces.run(split("get-message-ttl myprop/clust/ns1"));
         verify(mockNamespaces).getNamespaceMessageTTL("myprop/clust/ns1");
-        
+
         namespaces.run(split("set-anti-affinity-group myprop/clust/ns1 -g 
group"));
         
verify(mockNamespaces).setNamespaceAntiAffinityGroup("myprop/clust/ns1", 
"group");
 
         namespaces.run(split("get-anti-affinity-group myprop/clust/ns1"));
         
verify(mockNamespaces).getNamespaceAntiAffinityGroup("myprop/clust/ns1");
-        
+
         namespaces.run(split("get-anti-affinity-namespaces -p dummy -c cluster 
-g group"));
         verify(mockNamespaces).getAntiAffinityNamespaces("dummy", "cluster", 
"group");
 
         namespaces.run(split("delete-anti-affinity-group myprop/clust/ns1 "));
         
verify(mockNamespaces).deleteNamespaceAntiAffinityGroup("myprop/clust/ns1");
-        
+
 
         namespaces.run(split("set-retention myprop/clust/ns1 -t 1h -s 1M"));
         verify(mockNamespaces).setRetention("myprop/clust/ns1", new 
RetentionPolicies(60, 1));
@@ -444,6 +446,9 @@ public class PulsarAdminToolTest {
         topics.run(split("expire-messages-all-subscriptions 
persistent://myprop/clust/ns1/ds1 -t 100"));
         
verify(mockTopics).expireMessagesForAllSubscriptions("persistent://myprop/clust/ns1/ds1",
 100);
 
+        topics.run(split("create-subscription 
persistent://myprop/clust/ns1/ds1 -s sub1 --messageId earliest"));
+        
verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", 
"sub1", MessageId.earliest);
+
         topics.run(split("create-partitioned-topic 
persistent://myprop/clust/ns1/ds1 --partitions 32"));
         
verify(mockTopics).createPartitionedTopic("persistent://myprop/clust/ns1/ds1", 
32);
 
@@ -494,10 +499,10 @@ public class PulsarAdminToolTest {
 
         topics.run(split("create-partitioned-topic 
non-persistent://myprop/clust/ns1/ds1 --partitions 32"));
         
verify(mockTopics).createPartitionedTopic("non-persistent://myprop/clust/ns1/ds1",
 32);
-        
+
         topics.run(split("list myprop/clust/ns1"));
         verify(mockTopics).getList("myprop/clust/ns1");
-        
+
         topics.run(split("list-in-bundle myprop/clust/ns1 --bundle 
0x23d70a30_0x26666658"));
         verify(mockTopics).getListInBundle("myprop/clust/ns1", 
"0x23d70a30_0x26666658");
 

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to