This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 00459dbc02479dcc42c17a27862884b6ee158c62
Author: Marvin Cai <[email protected]>
AuthorDate: Tue Feb 9 16:16:29 2021 -0800

    Expire message by position. (#9514)
    
    Fixes #2736
    
    Add the ability to expire messages for a subscription by position to the 
admin client and admin REST API.
    
    Update PersistentMessageExpiryMonitor to be able to expire messages by 
position, and expose this to the admin client and admin REST API.
    
    This change added tests and can be verified as follows:
     - Added unit test for expire message by position in 
PersistentMessageFinderTest.
     - Added test for expire message by position in admin client/admin rest api.
    
    (cherry picked from commit 301d42ce04ceb20797cc2a60267e6ec39ff05e95)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 253 ++++++++++++++-------
 .../pulsar/broker/admin/v1/PersistentTopics.java   |  41 +++-
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  42 +++-
 .../apache/pulsar/broker/service/Subscription.java |   2 +
 .../nonpersistent/NonPersistentSubscription.java   |   6 +
 .../persistent/PersistentMessageExpiryMonitor.java |  27 +++
 .../service/persistent/PersistentReplicator.java   |  12 +
 .../service/persistent/PersistentSubscription.java |   5 +
 .../apache/pulsar/broker/admin/AdminApiTest.java   |  95 ++++++--
 .../pulsar/broker/service/BrokerServiceTest.java   |   1 +
 .../service/PersistentMessageFinderTest.java       |  86 ++++++-
 .../api/AuthorizationProducerConsumerTest.java     |   2 +
 .../org/apache/pulsar/client/admin/Topics.java     |  34 +++
 .../pulsar/client/admin/internal/TopicsImpl.java   |  27 +++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |   5 +
 .../org/apache/pulsar/admin/cli/CliCommand.java    |   6 +-
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |  31 ++-
 .../apache/pulsar/client/impl/ResetCursorData.java |   2 +
 18 files changed, 568 insertions(+), 109 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index d957797..4c4db2b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1717,7 +1717,7 @@ public class PersistentTopicsBase extends AdminResource {
 
         topic.getReplicators().forEach((subName, replicator) -> {
             try {
-                internalExpireMessagesForSinglePartition(subName, 
expireTimeInSeconds, authoritative);
+                internalExpireMessagesByTimestampForSinglePartition(subName, 
expireTimeInSeconds, authoritative);
             } catch (Throwable t) {
                 exception.set(t);
             }
@@ -1725,7 +1725,7 @@ public class PersistentTopicsBase extends AdminResource {
 
         topic.getSubscriptions().forEach((subName, subscriber) -> {
             try {
-                internalExpireMessagesForSinglePartition(subName, 
expireTimeInSeconds, authoritative);
+                internalExpireMessagesByTimestampForSinglePartition(subName, 
expireTimeInSeconds, authoritative);
             } catch (Throwable t) {
                 exception.set(t);
             }
@@ -2057,77 +2057,9 @@ public class PersistentTopicsBase extends AdminResource {
                     return;
                 }
                 CompletableFuture<Integer> batchSizeFuture = new 
CompletableFuture<>();
-                if (batchIndex >= 0) {
-                    try {
-                        ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
topic.getManagedLedger();
-                        ledger.asyncReadEntry(new 
PositionImpl(messageId.getLedgerId(), messageId.getEntryId()), new 
AsyncCallbacks.ReadEntryCallback() {
-                            @Override
-                            public void readEntryFailed(ManagedLedgerException 
exception, Object ctx) {
-                                // Since we can't read the message from the 
storage layer, it might be an already delete message ID or an invalid message ID
-                                // We should fall back to non batch index seek.
-                                batchSizeFuture.complete(0);
-                            }
-
-                            @Override
-                            public void readEntryComplete(Entry entry, Object 
ctx) {
-                                try {
-                                    try {
-                                        if (entry == null) {
-                                            batchSizeFuture.complete(0);
-                                        } else {
-                                            MessageMetadata metadata = 
Commands.parseMessageMetadata(entry.getDataBuffer());
-                                            
batchSizeFuture.complete(metadata.getNumMessagesInBatch());
-                                        }
-                                    } catch (Exception e) {
-                                        
batchSizeFuture.completeExceptionally(new RestException(e));
-                                    }
-                                } finally {
-                                    if (entry != null) {
-                                        entry.release();
-                                    }
-                                }
-                            }
-                        }, null);
-                    } catch (NullPointerException npe) {
-                        batchSizeFuture.completeExceptionally(new 
RestException(Status.NOT_FOUND, "Message not found"));
-                    } catch (Exception exception) {
-                        log.error("[{}] Failed to get message with ledgerId {} 
entryId {} from {}",
-                                clientAppId(), messageId.getLedgerId(), 
messageId.getEntryId(), topicName, exception);
-                        batchSizeFuture.completeExceptionally(new 
RestException(exception));
-                    }
-                } else {
-                    batchSizeFuture.complete(0);
-                }
+                getEntryBatchSize(batchSizeFuture, topic, messageId, 
batchIndex);
                 batchSizeFuture.thenAccept(bi -> {
-                    PositionImpl seekPosition;
-                    if (bi > 0) {
-                        long[] ackSet;
-                        BitSetRecyclable bitSet = BitSetRecyclable.create();
-                        bitSet.set(0, bi);
-                        if (isExcluded) {
-                            bitSet.clear(0, Math.max(batchIndex + 1, 0));
-                            if (bitSet.length() > 0) {
-                                ackSet = bitSet.toLongArray();
-                                seekPosition = 
PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId(), ackSet);
-                            } else {
-                                seekPosition = 
PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
-                                seekPosition = seekPosition.getNext();
-                            }
-                        } else {
-                            if (batchIndex - 1 >= 0) {
-                                bitSet.clear(0, batchIndex);
-                                ackSet = bitSet.toLongArray();
-                                seekPosition = 
PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId(), ackSet);
-                            } else {
-                                seekPosition = 
PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
-                            }
-                        }
-                        bitSet.recycle();
-                    } else {
-                        seekPosition = 
PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId());
-                        seekPosition = isExcluded ? seekPosition.getNext() : 
seekPosition;
-                    }
-
+                    PositionImpl seekPosition = 
calculatePositionAckSet(isExcluded, bi, batchIndex, messageId);
                     sub.resetCursor(seekPosition).thenRun(() -> {
                         log.info("[{}][{}] successfully reset cursor on 
subscription {} to position {}", clientAppId(),
                                 topicName, subName, messageId);
@@ -2159,6 +2091,89 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
+    private void getEntryBatchSize(CompletableFuture<Integer> batchSizeFuture, 
PersistentTopic topic,
+                                   MessageIdImpl messageId, int batchIndex) {
+        if (batchIndex >= 0) {
+            try {
+                ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
topic.getManagedLedger();
+                ledger.asyncReadEntry(new PositionImpl(messageId.getLedgerId(),
+                        messageId.getEntryId()), new 
AsyncCallbacks.ReadEntryCallback() {
+                    @Override
+                    public void readEntryFailed(ManagedLedgerException 
exception, Object ctx) {
+                        // Since we can't read the message from the storage 
layer,
+                        // it might be an already delete message ID or an 
invalid message ID
+                        // We should fall back to non batch index seek.
+                        batchSizeFuture.complete(0);
+                    }
+
+                    @Override
+                    public void readEntryComplete(Entry entry, Object ctx) {
+                        try {
+                            try {
+                                if (entry == null) {
+                                    batchSizeFuture.complete(0);
+                                } else {
+                                    MessageMetadata metadata =
+                                            
Commands.parseMessageMetadata(entry.getDataBuffer());
+                                    
batchSizeFuture.complete(metadata.getNumMessagesInBatch());
+                                }
+                            } catch (Exception e) {
+                                batchSizeFuture.completeExceptionally(new 
RestException(e));
+                            }
+                        } finally {
+                            if (entry != null) {
+                                entry.release();
+                            }
+                        }
+                    }
+                }, null);
+            } catch (NullPointerException npe) {
+                batchSizeFuture.completeExceptionally(new 
RestException(Status.NOT_FOUND, "Message not found"));
+            } catch (Exception exception) {
+                log.error("[{}] Failed to get message with ledgerId {} entryId 
{} from {}",
+                        clientAppId(), messageId.getLedgerId(), 
messageId.getEntryId(), topicName, exception);
+                batchSizeFuture.completeExceptionally(new 
RestException(exception));
+            }
+        } else {
+            batchSizeFuture.complete(0);
+        }
+    }
+
+    private PositionImpl calculatePositionAckSet(boolean isExcluded, int 
batchSize,
+                                                 int batchIndex, MessageIdImpl 
messageId) {
+        PositionImpl seekPosition;
+        if (batchSize > 0) {
+            long[] ackSet;
+            BitSetRecyclable bitSet = BitSetRecyclable.create();
+            bitSet.set(0, batchSize);
+            if (isExcluded) {
+                bitSet.clear(0, Math.max(batchIndex + 1, 0));
+                if (bitSet.length() > 0) {
+                    ackSet = bitSet.toLongArray();
+                    seekPosition = PositionImpl.get(messageId.getLedgerId(),
+                            messageId.getEntryId(), ackSet);
+                } else {
+                    seekPosition = PositionImpl.get(messageId.getLedgerId(), 
messageId.getEntryId());
+                    seekPosition = seekPosition.getNext();
+                }
+            } else {
+                if (batchIndex - 1 >= 0) {
+                    bitSet.clear(0, batchIndex);
+                    ackSet = bitSet.toLongArray();
+                    seekPosition = PositionImpl.get(messageId.getLedgerId(),
+                            messageId.getEntryId(), ackSet);
+                } else {
+                    seekPosition = PositionImpl.get(messageId.getLedgerId(), 
messageId.getEntryId());
+                }
+            }
+            bitSet.recycle();
+        } else {
+            seekPosition = PositionImpl.get(messageId.getLedgerId(), 
messageId.getEntryId());
+            seekPosition = isExcluded ? seekPosition.getNext() : seekPosition;
+        }
+        return seekPosition;
+    }
+
     protected void internalGetMessageById(AsyncResponse asyncResponse, long 
ledgerId, long entryId,
                                               boolean authoritative) {
         try {
@@ -2743,16 +2758,15 @@ public class PersistentTopicsBase extends AdminResource 
{
         }
     }
 
-
-    protected void internalExpireMessages(AsyncResponse asyncResponse, String 
subName, int expireTimeInSeconds,
-            boolean authoritative) {
+    protected void internalExpireMessagesByTimestamp(AsyncResponse 
asyncResponse, String subName,
+                                                     int expireTimeInSeconds, 
boolean authoritative) {
         if (topicName.isGlobal()) {
             validateGlobalNamespaceOwnership(namespaceName);
         }
         // If the topic name is a partition name, no need to get partition 
topic metadata again
         if (topicName.isPartitioned()) {
             try {
-                internalExpireMessagesForSinglePartition(subName, 
expireTimeInSeconds, authoritative);
+                internalExpireMessagesByTimestampForSinglePartition(subName, 
expireTimeInSeconds, authoritative);
             } catch (WebApplicationException wae) {
                 asyncResponse.resume(wae);
                 return;
@@ -2799,7 +2813,7 @@ public class PersistentTopicsBase extends AdminResource {
                 });
             } else {
                 try {
-                    internalExpireMessagesForSinglePartition(subName, 
expireTimeInSeconds, authoritative);
+                    
internalExpireMessagesByTimestampForSinglePartition(subName, 
expireTimeInSeconds, authoritative);
                 } catch (WebApplicationException wae) {
                     asyncResponse.resume(wae);
                     return;
@@ -2812,7 +2826,7 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
-    private void internalExpireMessagesForSinglePartition(String subName, int 
expireTimeInSeconds,
+    private void internalExpireMessagesByTimestampForSinglePartition(String 
subName, int expireTimeInSeconds,
             boolean authoritative) {
         if (topicName.isGlobal()) {
             validateGlobalNamespaceOwnership(namespaceName);
@@ -2856,6 +2870,89 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
+    protected void internalExpireMessagesByPosition(AsyncResponse 
asyncResponse, String subName, boolean authoritative,
+                                                 MessageIdImpl messageId, 
boolean isExcluded, int batchIndex) {
+        if (topicName.isGlobal()) {
+            try {
+                validateGlobalNamespaceOwnership(namespaceName);
+            } catch (Exception e) {
+                log.warn("[{}][{}] Failed to expire messages on subscription 
{} to position {}: {}", clientAppId(),
+                        topicName, subName, messageId, e.getMessage());
+                resumeAsyncResponseExceptionally(asyncResponse, e);
+                return;
+            }
+        }
+
+        log.info("[{}][{}] received expire messages on subscription {} to 
position {}", clientAppId(), topicName,
+                subName, messageId);
+
+        // If the topic name is a partition name, no need to get partition 
topic metadata again
+        if (!topicName.isPartitioned() && 
getPartitionedTopicMetadata(topicName, authoritative, false).partitions > 0) {
+            log.warn("[{}] Not supported operation expire message up to {} on 
partitioned-topic {} {}",
+                    clientAppId(), messageId, topicName, subName);
+            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Expire message at position is not supported for 
partitioned-topic"));
+            return;
+        } else if (messageId.getPartitionIndex() != 
topicName.getPartitionIndex()) {
+            log.warn("[{}] Invalid parameter for expire message by position, 
partition index of passed in message"
+                            + " position {} doesn't match partition index of 
topic requested {}.",
+                    clientAppId(), messageId, topicName);
+            asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
+                    "Invalid parameter for expire message by position, 
partition index of message position "
+                            + "passed in doesn't match partition index for the 
topic."));
+        } else {
+            validateAdminAccessForSubscriber(subName, authoritative);
+            validateReadOperationOnTopic(authoritative);
+            PersistentTopic topic = (PersistentTopic) 
getTopicReference(topicName);
+            if (topic == null) {
+                asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Topic not found"));
+                return;
+            }
+            try {
+                PersistentSubscription sub = topic.getSubscription(subName);
+                if (sub == null) {
+                    asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Subscription not found"));
+                    return;
+                }
+                CompletableFuture<Integer> batchSizeFuture = new 
CompletableFuture<>();
+                getEntryBatchSize(batchSizeFuture, topic, messageId, 
batchIndex);
+                batchSizeFuture.thenAccept(bi -> {
+                    PositionImpl position = 
calculatePositionAckSet(isExcluded, bi, batchIndex, messageId);
+                    try {
+                        if (subName.startsWith(topic.getReplicatorPrefix())) {
+                            String remoteCluster = 
PersistentReplicator.getRemoteCluster(subName);
+                            PersistentReplicator repl = (PersistentReplicator)
+                                    
topic.getPersistentReplicator(remoteCluster);
+                            checkNotNull(repl);
+                            repl.expireMessages(position);
+                        } else {
+                            checkNotNull(sub);
+                            sub.expireMessages(position);
+                        }
+                        log.info("[{}] Message expire issued up to {} on {} 
{}", clientAppId(), position, topicName,
+                                subName);
+                    } catch (NullPointerException npe) {
+                        throw new RestException(Status.NOT_FOUND, 
"Subscription not found");
+                    } catch (Exception exception) {
+                        log.error("[{}] Failed to expire messages up to {} on 
{} with subscription {} {}",
+                                clientAppId(), position, topicName, subName, 
exception);
+                        throw new RestException(exception);
+                    }
+                }).exceptionally(e -> {
+                    log.error("[{}] Failed to expire messages up to {} on {} 
with subscription {} {}", clientAppId(),
+                            messageId, topicName, subName, e);
+                    asyncResponse.resume(e);
+                    return null;
+                });
+            } catch (Exception e) {
+                log.warn("[{}][{}] Failed to expire messages up to {} on 
subscription {} to position {}",
+                        clientAppId(), topicName, messageId, subName, 
messageId, e);
+                resumeAsyncResponseExceptionally(asyncResponse, e);
+            }
+        }
+        asyncResponse.resume(Response.noContent().build());
+    }
+
     protected void internalTriggerCompaction(AsyncResponse asyncResponse, 
boolean authoritative) {
         log.info("[{}] Trigger compaction on topic {}", clientAppId(), 
topicName);
         try {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index d33cac0..1ba1467 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -446,7 +446,46 @@ public class PersistentTopics extends PersistentTopicsBase 
{
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         try {
             validateTopicName(property, cluster, namespace, encodedTopic);
-            internalExpireMessages(asyncResponse, decode(encodedSubName), 
expireTimeInSeconds, authoritative);
+            internalExpireMessagesByTimestamp(asyncResponse, 
decode(encodedSubName),
+                    expireTimeInSeconds, authoritative);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
+    }
+
+    @POST
+    
@Path("/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}/expireMessages")
+    @ApiOperation(value = "Expiry messages on a topic subscription.")
+    @ApiResponses(value = {
+            @ApiResponse(code = 307, message = "Current broker doesn't serve 
the namespace of this topic"),
+            @ApiResponse(code = 401, message = "Don't have permission to 
administrate resources on this tenant or"
+                    + "subscriber is not authorized to access this operation"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic or subscription does not 
exist"),
+            @ApiResponse(code = 405, message = "Expiry messages on a 
non-persistent topic is not allowed"),
+            @ApiResponse(code = 500, message = "Internal server error"),
+            @ApiResponse(code = 503, message = "Failed to validate global 
cluster configuration")})
+    public void expireTopicMessages(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("property") String property, @PathParam("cluster") 
String cluster,
+            @ApiParam(value = "Specify the namespace", required = true)
+            @PathParam("namespace") String namespace,
+            @ApiParam(value = "Specify topic name", required = true)
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Subscription to be Expiry messages on")
+            @PathParam("subName") String encodedSubName,
+            @ApiParam(value = "Is authentication required to perform this 
operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative,
+            @ApiParam(name = "messageId", value = "messageId to reset back to 
(ledgerId:entryId)")
+                    ResetCursorData resetCursorData) {
+        try {
+            validateTopicName(property, cluster, namespace, encodedTopic);
+            internalExpireMessagesByPosition(asyncResponse, 
decode(encodedSubName), authoritative,
+                    new MessageIdImpl(resetCursorData.getLedgerId(),
+                            resetCursorData.getEntryId(), 
resetCursorData.getPartitionIndex())
+                    , resetCursorData.isExcluded(), 
resetCursorData.getBatchIndex());
         } catch (WebApplicationException wae) {
             asyncResponse.resume(wae);
         } catch (Exception e) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index ae3d734..592cfa8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1148,7 +1148,47 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         try {
             validateTopicName(tenant, namespace, encodedTopic);
-            internalExpireMessages(asyncResponse, decode(encodedSubName), 
expireTimeInSeconds, authoritative);
+            internalExpireMessagesByTimestamp(asyncResponse, 
decode(encodedSubName),
+                    expireTimeInSeconds, authoritative);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
+    }
+
+    @POST
+    
@Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/expireMessages")
+    @ApiOperation(value = "Expiry messages on a topic subscription.")
+    @ApiResponses(value = {
+            @ApiResponse(code = 307, message = "Current broker doesn't serve 
the namespace of this topic"),
+            @ApiResponse(code = 401, message = "Don't have permission to 
administrate resources on this tenant or"
+                    + "subscriber is not authorized to access this operation"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic or subscription does not 
exist"),
+            @ApiResponse(code = 405, message = "Expiry messages on a 
non-persistent topic is not allowed"),
+            @ApiResponse(code = 500, message = "Internal server error"),
+            @ApiResponse(code = 503, message = "Failed to validate global 
cluster configuration")})
+    public void expireTopicMessages(
+            @Suspended final AsyncResponse asyncResponse,
+            @ApiParam(value = "Specify the tenant", required = true)
+            @PathParam("tenant") String tenant,
+            @ApiParam(value = "Specify the namespace", required = true)
+            @PathParam("namespace") String namespace,
+            @ApiParam(value = "Specify topic name", required = true)
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Subscription to be Expiry messages on")
+            @PathParam("subName") String encodedSubName,
+            @ApiParam(value = "Is authentication required to perform this 
operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative,
+            @ApiParam(name = "messageId", value = "messageId to reset back to 
(ledgerId:entryId)")
+            ResetCursorData resetCursorData) {
+        try {
+            validateTopicName(tenant, namespace, encodedTopic);
+            internalExpireMessagesByPosition(asyncResponse, 
decode(encodedSubName), authoritative,
+            new MessageIdImpl(resetCursorData.getLedgerId(),
+                    resetCursorData.getEntryId(), 
resetCursorData.getPartitionIndex())
+            , resetCursorData.isExcluded(), resetCursorData.getBatchIndex());
         } catch (WebApplicationException wae) {
             asyncResponse.resume(wae);
         } catch (Exception e) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
index 1c05399..0cd5585 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
@@ -86,6 +86,8 @@ public interface Subscription {
 
     void expireMessages(int messageTTLInSeconds);
 
+    void expireMessages(Position position);
+
     void redeliverUnacknowledgedMessages(Consumer consumer);
 
     void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> 
positions);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index 635ce3e..bb1cb27 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -434,6 +434,12 @@ public class NonPersistentSubscription implements 
Subscription {
         // No-op
     }
 
+    @Override
+    public void expireMessages(Position position) {
+        throw new UnsupportedOperationException("Expire message by position is 
not supported for"
+                + " non-persistent topic.");
+    }
+
     public NonPersistentSubscriptionStats getStats() {
         NonPersistentSubscriptionStats subStats = new 
NonPersistentSubscriptionStats();
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index 502dd2f..df0b84c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -28,6 +28,7 @@ import 
org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedger
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.stats.Rate;
@@ -92,6 +93,32 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback {
         }
     }
 
+    public void expireMessages(Position messagePosition) {
+        // If it's beyond last position of this topic, do nothing.
+        if (((PositionImpl) 
subscription.getTopic().getLastPosition()).compareTo((PositionImpl) 
messagePosition) < 0) {
+            return;
+        }
+        if (expirationCheckInProgressUpdater.compareAndSet(this, FALSE, TRUE)) 
{
+            log.info("[{}][{}] Starting message expiry check, position= {} 
seconds", topicName, subName,
+                    messagePosition);
+
+            
cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries,
 entry -> {
+                try {
+                    // If given position larger than entry position.
+                    return ((PositionImpl) 
entry.getPosition()).compareTo((PositionImpl) messagePosition) <= 0;
+                } finally {
+                    entry.release();
+                }
+            }, this, null);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}][{}] Ignore expire-message scheduled task, last 
check is still running", topicName,
+                        subName);
+            }
+        }
+    }
+
+
     public void updateRates() {
         msgExpired.calculateRate();
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 35f8199..ec3397d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -712,6 +712,18 @@ public class PersistentReplicator extends 
AbstractReplicator implements Replicat
         }
     }
 
+    /**
+     * Expires messages on this replicator's cursor up to the given position.
+     *
+     * <p>The request is skipped when the backlog is empty, or when the backlog is smaller than
+     * {@code MINIMUM_BACKLOG_FOR_EXPIRY_CHECK} and the oldest message has not yet exceeded
+     * {@code messageTTLInSeconds}.
+     *
+     * @param position position handed to the expiry monitor as the expiry bound
+     */
+    public void expireMessages(Position position) {
+        if ((cursor.getNumberOfEntriesInBacklog(false) == 0)
+                || (cursor.getNumberOfEntriesInBacklog(false) < MINIMUM_BACKLOG_FOR_EXPIRY_CHECK
+                && !topic.isOldestMessageExpired(cursor, messageTTLInSeconds))) {
+            // don't do anything for almost caught-up connected subscriptions
+            return;
+        }
+        if (expiryMonitor != null) {
+            // Pass the position through; passing messageTTLInSeconds would run the TTL-based
+            // expiry and silently ignore the caller's position.
+            expiryMonitor.expireMessages(position);
+        }
+    }
+
     @Override
     public Optional<DispatchRateLimiter> getRateLimiter() {
         return dispatchRateLimiter;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 1aeced3..4a8186a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -894,6 +894,11 @@ public class PersistentSubscription implements 
Subscription {
         expiryMonitor.expireMessages(messageTTLInSeconds);
     }
 
+    /**
+     * Position-based expiry for this subscription; delegates directly to the expiry monitor.
+     *
+     * @param position position passed unchanged to {@code expiryMonitor.expireMessages(Position)}
+     */
+    @Override
+    public void expireMessages(Position position) {
+        expiryMonitor.expireMessages(position);
+    }
+
     public double getExpiredMessageRate() {
         return expiryMonitor.getMessageExpiryRate();
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 3f03da3..c2f1b11 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -128,6 +128,7 @@ import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.compaction.Compactor;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -1574,16 +1575,17 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
     long messageTimestamp = System.currentTimeMillis();
     long secondTimestamp = System.currentTimeMillis();
 
-    private void publishMessagesOnPersistentTopic(String topicName, int 
messages) throws Exception {
-        publishMessagesOnPersistentTopic(topicName, messages, 0, false);
+    private List<MessageId> publishMessagesOnPersistentTopic(String topicName, 
int messages) throws Exception {
+        return publishMessagesOnPersistentTopic(topicName, messages, 0, false);
     }
 
-    private void publishNullValueMessageOnPersistentTopic(String topicName, 
int messages) throws Exception {
-        publishMessagesOnPersistentTopic(topicName, messages, 0, true);
+    private List<MessageId> publishNullValueMessageOnPersistentTopic(String 
topicName, int messages) throws Exception {
+        return publishMessagesOnPersistentTopic(topicName, messages, 0, true);
     }
 
-    private void publishMessagesOnPersistentTopic(String topicName, int 
messages, int startIdx,
+    private List<MessageId> publishMessagesOnPersistentTopic(String topicName, 
int messages, int startIdx,
                                                   boolean nullValue) throws 
Exception {
+        List<MessageId> messageIds = new ArrayList<>();
         Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
             .topic(topicName)
             .enableBatching(false)
@@ -1592,14 +1594,15 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
 
         for (int i = startIdx; i < (messages + startIdx); i++) {
             if (nullValue) {
-                producer.send(null);
+                messageIds.add(producer.send(null));
             } else {
                 String message = "message-" + i;
-                producer.send(message.getBytes());
+                messageIds.add(producer.send(message.getBytes()));
             }
         }
 
         producer.close();
+        return messageIds;
     }
 
     @Test
@@ -2040,7 +2043,6 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
      */
     @Test
     public void testPersistentTopicsExpireMessages() throws Exception {
-
         // Force to create a topic
         publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2", 0);
         assertEquals(admin.topics().getList("prop-xyz/ns1"),
@@ -2060,21 +2062,21 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
 
         
assertEquals(admin.topics().getSubscriptions("persistent://prop-xyz/ns1/ds2").size(),
 3);
 
-        publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2", 10);
+        List<MessageId> messageIds = 
publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2", 10);
 
         TopicStats topicStats = 
admin.topics().getStats("persistent://prop-xyz/ns1/ds2");
         assertEquals(topicStats.subscriptions.get("my-sub1").msgBacklog, 10);
         assertEquals(topicStats.subscriptions.get("my-sub2").msgBacklog, 10);
         assertEquals(topicStats.subscriptions.get("my-sub3").msgBacklog, 10);
 
-        Thread.sleep(1000); // wait for 1 seconds to expire message
+        Thread.sleep(1000);
         admin.topics().expireMessages("persistent://prop-xyz/ns1/ds2", 
"my-sub1", 1);
-        Thread.sleep(1000); // wait for 1 seconds to execute expire message as 
it is async
-
+        // Wait at most 2 seconds for sub1's message to expire.
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> 
assertTrue(
+                
admin.topics().getStats("persistent://prop-xyz/ns1/ds2").subscriptions.get("my-sub1").lastMarkDeleteAdvancedTimestamp
 > 0L));
         topicStats = admin.topics().getStats("persistent://prop-xyz/ns1/ds2");
         SubscriptionStats subStats1 = topicStats.subscriptions.get("my-sub1");
         assertEquals(subStats1.msgBacklog, 0);
-        assertTrue(subStats1.lastMarkDeleteAdvancedTimestamp > 0L);
         SubscriptionStats subStats2 = topicStats.subscriptions.get("my-sub2");
         assertEquals(subStats2.msgBacklog, 10);
         assertEquals(subStats2.lastMarkDeleteAdvancedTimestamp, 0L);
@@ -2082,24 +2084,69 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(subStats3.msgBacklog, 10);
         assertEquals(subStats3.lastMarkDeleteAdvancedTimestamp, 0L);
 
-        
admin.topics().expireMessagesForAllSubscriptions("persistent://prop-xyz/ns1/ds2",
 1);
-        Thread.sleep(1000); // wait for 1 seconds to execute expire message as 
it is async
+        admin.topics().expireMessages("persistent://prop-xyz/ns1/ds2", 
"my-sub2",
+                messageIds.get(4), false);
+        // Wait at most 2 seconds for sub2's message to expire.
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> 
assertTrue(
+                
admin.topics().getStats("persistent://prop-xyz/ns1/ds2").subscriptions.get("my-sub2").lastMarkDeleteAdvancedTimestamp
 > 0L));
+        topicStats = admin.topics().getStats("persistent://prop-xyz/ns1/ds2");
+        subStats1 = topicStats.subscriptions.get("my-sub1");
+        assertEquals(subStats1.msgBacklog, 0);
+        assertTrue(subStats1.lastMarkDeleteAdvancedTimestamp > 0L);
+        Long sub2lastMarkDeleteAdvancedTimestamp = 
subStats1.lastMarkDeleteAdvancedTimestamp;
+        subStats2 = topicStats.subscriptions.get("my-sub2");
+        assertEquals(subStats2.msgBacklog, 5);
+        subStats3 = topicStats.subscriptions.get("my-sub3");
+        assertEquals(subStats3.msgBacklog, 10);
+        assertEquals(subStats3.lastMarkDeleteAdvancedTimestamp, 0L);
 
+        
admin.topics().expireMessagesForAllSubscriptions("persistent://prop-xyz/ns1/ds2",
 1);
+        // Wait at most 2 seconds for sub3's message to expire.
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> 
assertTrue(
+                
admin.topics().getStats("persistent://prop-xyz/ns1/ds2").subscriptions.get("my-sub3").lastMarkDeleteAdvancedTimestamp
 > 0L));
         topicStats = admin.topics().getStats("persistent://prop-xyz/ns1/ds2");
-        SubscriptionStats newSubStats1 = 
topicStats.subscriptions.get("my-sub1");
-        assertEquals(newSubStats1.msgBacklog, 0);
-        assertEquals(newSubStats1.lastMarkDeleteAdvancedTimestamp, 
subStats1.lastMarkDeleteAdvancedTimestamp);
-        SubscriptionStats newSubStats2 = 
topicStats.subscriptions.get("my-sub2");
-        assertEquals(newSubStats2.msgBacklog, 0);
-        assertTrue(newSubStats2.lastMarkDeleteAdvancedTimestamp > 
subStats2.lastMarkDeleteAdvancedTimestamp);
-        SubscriptionStats newSubStats3 = 
topicStats.subscriptions.get("my-sub3");
-        assertEquals(newSubStats3.msgBacklog, 0);
-        assertTrue(newSubStats3.lastMarkDeleteAdvancedTimestamp > 
subStats3.lastMarkDeleteAdvancedTimestamp);
+        subStats1 = topicStats.subscriptions.get("my-sub1");
+        assertEquals(subStats1.msgBacklog, 0);
+        assertEquals(subStats1.lastMarkDeleteAdvancedTimestamp, 
subStats1.lastMarkDeleteAdvancedTimestamp);
+        // Wait at most 2 seconds for rest of sub2's message to expire.
+        subStats2 = topicStats.subscriptions.get("my-sub2");
+        assertEquals(subStats2.msgBacklog, 0);
+        assertTrue(subStats2.lastMarkDeleteAdvancedTimestamp > 
sub2lastMarkDeleteAdvancedTimestamp);
+        subStats3 = topicStats.subscriptions.get("my-sub3");
+        assertEquals(subStats3.msgBacklog, 0);
 
         consumer1.close();
         consumer2.close();
         consumer3.close();
+    }
 
+    /**
+     * Verifies that expire-by-position is rejected with an "Invalid parameter" error for a message
+     * id carrying partition index 1 on a topic named {@code ds2-partition-2} (presumably because
+     * the index does not match the topic's partition suffix).
+     */
+    @Test
+    public void testPersistentTopicsExpireMessagesInvalidPartitionIndex() throws Exception {
+        // Force to create a topic
+        publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2-partition-2", 0);
+        assertEquals(admin.topics().getList("prop-xyz/ns1"),
+                Lists.newArrayList("persistent://prop-xyz/ns1/ds2-partition-2"));
+
+        // create consumer and subscription
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getWebServiceAddress())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
+        ConsumerBuilder<byte[]> consumerBuilder = client.newConsumer()
+                .topic("persistent://prop-xyz/ns1/ds2-partition-2")
+                .subscriptionType(SubscriptionType.Shared);
+        @Cleanup
+        Consumer<byte[]> consumer = consumerBuilder.clone().subscriptionName("my-sub").subscribe();
+
+        assertEquals(admin.topics().getSubscriptions("persistent://prop-xyz/ns1/ds2-partition-2").size(), 1);
+        publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2-partition-2", 10);
+        try {
+            admin.topics().expireMessages("persistent://prop-xyz/ns1/ds2-partition-2", "my-sub",
+                    new MessageIdImpl(1, 1, 1), false);
+            // Without this the test passes vacuously when the call is accepted. The AssertionError
+            // raised here is not an Exception, so the catch below does not swallow it.
+            Assert.fail("Expected expireMessages to reject the message id's partition index");
+        } catch (Exception e) {
+            assertTrue(e.getMessage().contains("Invalid parameter for expire message by position"));
+        }
     }
 
     /**
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 0af9b1a..0fca6ea 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -67,6 +67,7 @@ import 
org.apache.pulsar.broker.service.BrokerServiceException.PersistenceExcept
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
 import org.apache.pulsar.client.admin.BrokerStats;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index 36ca8e8..a2b8a1e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -19,6 +19,13 @@
 package org.apache.pulsar.broker.service;
 
 import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.spy;
+import static org.powermock.api.mockito.PowerMockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
@@ -29,6 +36,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.UnpooledByteBufAllocator;
 
 import java.lang.reflect.Field;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -49,12 +57,18 @@ import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.Ledge
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import 
org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
 import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.client.impl.ResetCursorData;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
+import org.awaitility.Awaitility;
 import org.testng.annotations.Test;
 
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.MediaType;
+
 /**
  */
 public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
@@ -206,7 +220,7 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
      * @throws Exception
      */
     @Test
-    void testMessageExpiryWithNonRecoverableException() throws Exception {
+    void testMessageExpiryWithTimestampNonRecoverableException() throws 
Exception {
 
         final String ledgerAndCursorName = 
"testPersistentMessageExpiryWithNonRecoverableLedgers";
         final int entriesPerLedger = 2;
@@ -257,4 +271,74 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
         factory.shutdown();
 
     }
+
+    /**
+     * Exercises position-based expiry on {@code PersistentMessageExpiryMonitor}: expiring up to a
+     * position, a position beyond the topic's last position, the same position twice, an earlier
+     * position, and a later position, checking the cursor's mark-delete position after each step.
+     */
+    @Test
+    void testMessageExpiryWithPosition() throws Exception {
+        final String ledgerAndCursorName = "testPersistentMessageExpiryWithPositionNonRecoverableLedgers";
+        final int entriesPerLedger = 5;
+        final int totalEntries = 30;
+        List<PositionImpl> positions = new ArrayList<>();
+
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setRetentionSizeInMB(10);
+        config.setMaxEntriesPerLedger(entriesPerLedger);
+        config.setRetentionTime(1, TimeUnit.HOURS);
+        config.setAutoSkipNonRecoverableData(true);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerAndCursorName, config);
+        ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);
+
+        // The monitor only needs subscription.getTopic().getLastPosition(), so both are mocked.
+        PersistentSubscription subscription = mock(PersistentSubscription.class);
+        Topic topic = mock(Topic.class);
+        when(subscription.getTopic()).thenReturn(topic);
+
+        for (int i = 0; i < totalEntries; i++) {
+            positions.add((PositionImpl) ledger.addEntry(createMessageWrittenToLedger("msg" + i)));
+        }
+        when(topic.getLastPosition()).thenReturn(positions.get(positions.size() - 1));
+        PersistentMessageExpiryMonitor monitor = spy(new PersistentMessageExpiryMonitor("topicname",
+                cursor.getName(), cursor, subscription));
+        assertEquals((PositionImpl) cursor.getMarkDeletedPosition(),
+                PositionImpl.get(positions.get(0).getLedgerId(), -1));
+
+        // Expire by position and verify mark delete position of cursor.
+        monitor.expireMessages(positions.get(15));
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() ->
+                verify(monitor, times(1)).findEntryComplete(any(), any()));
+        assertEquals((PositionImpl) cursor.getMarkDeletedPosition(),
+                PositionImpl.get(positions.get(15).getLedgerId(), positions.get(15).getEntryId()));
+        clearInvocations(monitor);
+
+        // Expire by position beyond last position and nothing should happen.
+        monitor.expireMessages(PositionImpl.get(100, 100));
+        // The monitor returns before starting a search, so the callback must not have fired.
+        verify(monitor, times(0)).findEntryComplete(any(), any());
+        assertEquals((PositionImpl) cursor.getMarkDeletedPosition(),
+                PositionImpl.get(positions.get(15).getLedgerId(), positions.get(15).getEntryId()));
+
+        // Expire by position again and verify mark delete position of cursor didn't change.
+        monitor.expireMessages(positions.get(15));
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() ->
+                verify(monitor, times(1)).findEntryComplete(any(), any()));
+        assertEquals((PositionImpl) cursor.getMarkDeletedPosition(),
+                PositionImpl.get(positions.get(15).getLedgerId(), positions.get(15).getEntryId()));
+        clearInvocations(monitor);
+
+        // Expire by position before current mark delete position and verify mark delete position
+        // of cursor didn't change.
+        monitor.expireMessages(positions.get(10));
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() ->
+                verify(monitor, times(1)).findEntryComplete(any(), any()));
+        assertEquals((PositionImpl) cursor.getMarkDeletedPosition(),
+                PositionImpl.get(positions.get(15).getLedgerId(), positions.get(15).getEntryId()));
+        clearInvocations(monitor);
+
+        // Expire by position after current mark delete position and verify mark delete position
+        // of cursor move to new position.
+        monitor.expireMessages(positions.get(16));
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() ->
+                verify(monitor, times(1)).findEntryComplete(any(), any()));
+        assertEquals((PositionImpl) cursor.getMarkDeletedPosition(),
+                PositionImpl.get(positions.get(16).getLedgerId(), positions.get(16).getEntryId()));
+        clearInvocations(monitor);
+
+        cursor.close();
+        ledger.close();
+        factory.shutdown();
+    }
+
+    // NOTE(review): this test makes no assertions. It only builds a ResetCursorData, marks it
+    // excluded, and prints the resulting JAX-RS Entity to stdout; it looks like a debugging
+    // leftover. Consider asserting on the serialized form or removing it.
+    @Test
+    public void test() {
+        ResetCursorData resetCursorData = new ResetCursorData(1, 1);
+        resetCursorData.setExcluded(true);
+        System.out.println(Entity.entity(resetCursorData, 
MediaType.APPLICATION_JSON));
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index a7b63b2..f02f5e7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -44,6 +44,7 @@ import 
org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
@@ -212,6 +213,7 @@ public class AuthorizationProducerConsumerTest extends 
ProducerConsumerBase {
         tenantAdmin.topics().skipAllMessages(topicName, subscriptionName);
         tenantAdmin.topics().skipMessages(topicName, subscriptionName, 1);
         tenantAdmin.topics().expireMessages(topicName, subscriptionName, 10);
+        tenantAdmin.topics().expireMessages(topicName, subscriptionName, new 
MessageIdImpl(-1, -1, -1), true);
         tenantAdmin.topics().peekMessages(topicName, subscriptionName, 1);
         tenantAdmin.topics().resetCursor(topicName, subscriptionName, 10);
         tenantAdmin.topics().resetCursor(topicName, subscriptionName, 
MessageId.earliest);
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index d5c35a9..54d831e 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -1183,6 +1183,40 @@ public interface Topics {
             long expireTimeInSeconds);
 
     /**
+     * Expire all messages up to the given position (messageId) for a given subscription.
+     *
+     * @param topic
+     *            topic name
+     * @param subscriptionName
+     *            Subscription name
+     * @param messageId
+     *            Position before which all messages will be expired.
+     * @param isExcluded
+     *            Whether the message at the given position is excluded from expiry; when
+     *            {@code false} that message is expired as well.
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void expireMessages(String topic, String subscriptionName, MessageId 
messageId, boolean isExcluded)
+            throws PulsarAdminException;
+
+    /**
+     * Expire all messages up to the given position (messageId) for a given subscription
+     * asynchronously.
+     *
+     * @param topic
+     *            topic name
+     * @param subscriptionName
+     *            Subscription name
+     * @param messageId
+     *            Position before which all messages will be expired.
+     * @param isExcluded
+     *            Whether the message at the given position is excluded from expiry; when
+     *            {@code false} that message is expired as well.
+     * @return
+     *            A {@link CompletableFuture} that'll be completed when expire message is done.
+     */
+    CompletableFuture<Void> expireMessagesAsync(String topic, String 
subscriptionName,
+                                                MessageId messageId, boolean 
isExcluded);
+
+    /**
      * Expire all messages older than given N seconds for all subscriptions of 
the persistent-topic.
      *
      * @param topic
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 314a50b..550b7d4 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -938,6 +938,33 @@ public class TopicsImpl extends BaseResource implements 
Topics {
     }
 
     @Override
+    public void expireMessages(String topic, String subscriptionName, 
MessageId messageId, boolean isExcluded)
+            throws PulsarAdminException {
+        // Blocking wrapper: waits up to readTimeoutMs for the asynchronous call to finish.
+        try {
+            expireMessagesAsync(topic, subscriptionName, messageId, isExcluded)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            // NOTE(review): assumes the async failure cause is always a PulsarAdminException;
+            // any other cause would surface here as a ClassCastException — confirm.
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag before translating the exception.
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> expireMessagesAsync(String topic, String 
subscriptionName,
+                                                       MessageId messageId, 
boolean isExcluded) {
+        TopicName tn = validateTopic(topic);
+        // The subscription name is encoded before it is used as a path segment.
+        String encodedSubName = Codec.encode(subscriptionName);
+        // The target position and the exclusion flag are sent in the JSON request body.
+        ResetCursorData resetCursorData = new ResetCursorData(messageId);
+        resetCursorData.setExcluded(isExcluded);
+        // Path segments appended to the topic path: subscription / <encoded name> / expireMessages.
+        WebTarget path = topicPath(tn, "subscription", encodedSubName, 
"expireMessages");
+        return asyncPostRequest(path, Entity.entity(resetCursorData, 
MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public void expireMessagesForAllSubscriptions(String topic, long 
expireTimeInSeconds) throws PulsarAdminException {
         try {
             expireMessagesForAllSubscriptionsAsync(topic, expireTimeInSeconds)
diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 7f0f33c..cf92d40 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -730,6 +730,11 @@ public class PulsarAdminToolTest {
         cmdTopics.run(split("expire-messages persistent://myprop/clust/ns1/ds1 
-s sub1 -t 100"));
         verify(mockTopics).expireMessages("persistent://myprop/clust/ns1/ds1", 
"sub1", 100);
 
+        //cmd with option cannot be executed repeatedly.
+        cmdTopics = new CmdTopics(admin);
+        cmdTopics.run(split("expire-messages persistent://myprop/clust/ns1/ds1 
-s sub1 -p 1:1 -e"));
+        
verify(mockTopics).expireMessages(eq("persistent://myprop/clust/ns1/ds1"), 
eq("sub1"), eq(new MessageIdImpl(1, 1, -1)), eq(true));
+
         cmdTopics.run(split("expire-messages-all-subscriptions 
persistent://myprop/clust/ns1/ds1 -t 100"));
         
verify(mockTopics).expireMessagesForAllSubscriptions("persistent://myprop/clust/ns1/ds1",
 100);
 
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java
index ce48ef4..5113daa 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java
@@ -104,10 +104,14 @@ abstract class CliCommand {
     }
 
     static MessageId validateMessageIdString(String resetMessageIdStr) throws 
PulsarAdminException {
+        // No partition index supplied: delegate with -1, the value this method previously
+        // hard-coded when constructing the MessageIdImpl.
+        return validateMessageIdString(resetMessageIdStr, -1);
+    }
+
+    static MessageId validateMessageIdString(String resetMessageIdStr, int 
partitionIndex) throws PulsarAdminException {
         String[] messageId = resetMessageIdStr.split(":");
         try {
             Preconditions.checkArgument(messageId.length == 2);
-            return new MessageIdImpl(Long.parseLong(messageId[0]), 
Long.parseLong(messageId[1]), -1);
+            return new MessageIdImpl(Long.parseLong(messageId[0]), 
Long.parseLong(messageId[1]), partitionIndex);
         } catch (Exception e) {
             throw new PulsarAdminException(
                     "Invalid message id (must be in format: ledgerId:entryId) 
value " + resetMessageIdStr);
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index fc2944d..a0c17a7 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -40,6 +40,8 @@ import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -48,6 +50,7 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.DispatchRate;
@@ -648,13 +651,35 @@ public class CmdTopics extends CmdBase {
                 "--subscription" }, description = "Subscription to be skip 
messages on", required = true)
         private String subName;
 
-        @Parameter(names = { "-t", "--expireTime" }, description = "Expire 
messages older than time in seconds", required = true)
-        private long expireTimeInSeconds;
+        @Parameter(names = { "-t", "--expireTime" }, description = "Expire 
messages older than time in seconds")
+        private long expireTimeInSeconds = -1;
+
+        // Position-based alternative to --expireTime, given as ledgerId:entryId.
+        @Parameter(names = { "--position",
+                "-p" }, description = "message position to expire up to (ledgerId:entryId)", required = false)
+        private String messagePosition;
+
+        // Passed as the isExcluded argument of Topics#expireMessages. The option name is kept
+        // for compatibility even though it mentions "reset".
+        @Parameter(names = { "-e", "--exclude-reset-position" },
+                description = "Exclude the message at the given position from expiry.", required = false)
+        private boolean excludeResetPosition = false;
 
         @Override
         void run() throws PulsarAdminException {
+            if (expireTimeInSeconds >= 0 && isNotBlank(messagePosition)) {
+                throw new ParameterException(String.format("Can't expire 
message by time and " +
+                        "by message position at the same time."));
+            }
             String topic = validateTopicName(params);
-            topics.expireMessages(topic, subName, expireTimeInSeconds);
+            if (expireTimeInSeconds >= 0) {
+                topics.expireMessages(topic, subName, expireTimeInSeconds);
+            } else if (isNotBlank(messagePosition)) {
+                int partitionIndex = TopicName.get(topic).getPartitionIndex();
+                MessageId messageId = validateMessageIdString(messagePosition, 
partitionIndex);
+                topics.expireMessages(topic, subName, messageId, 
excludeResetPosition);
+            } else {
+                throw new ParameterException(
+                        "Either time (--expireTime) or message position 
(--position) has to be provided" +
+                                " to expire messages");
+            }
         }
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java
index 9311fa0..1bc7b7f 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java
@@ -49,10 +49,12 @@ public class ResetCursorData {
             this.ledgerId = batchMessageId.getLedgerId();
             this.entryId = batchMessageId.getEntryId();
             this.batchIndex = batchMessageId.getBatchIndex();
+            this.partitionIndex = batchMessageId.partitionIndex;
         } else if (messageId instanceof MessageIdImpl) {
             MessageIdImpl messageIdImpl = (MessageIdImpl) messageId;
             this.ledgerId = messageIdImpl.getLedgerId();
             this.entryId = messageIdImpl.getEntryId();
+            this.partitionIndex = messageIdImpl.partitionIndex;
         }  else if (messageId instanceof TopicMessageIdImpl) {
             throw new IllegalArgumentException("Not supported operation on 
partitioned-topic");
         }

Reply via email to