This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 351cb049e41b94e0fefc8f44b79c574ce48df2f5
Author: Marvin Cai <[email protected]>
AuthorDate: Sun Feb 14 21:03:28 2021 -0800

    [Admin CLI] Inform user when expire message request is not executed. (#9561)
    
    Fixes #9560
    
    ### Motivation
    When there's ongoing expire message request or subscription is almost catch 
up, an expire-message request won't be executed while admin cli won't inform 
user about that.
    
    ### Modifications
    Add a boolean as return value for 
PersistentMessageExpiryMonitor.expireMessage indicating if the expire message 
operation is issued or not, and return corresponding message to user if the 
operation is not executed.
    
    ### Verifying this change
    
    This change added tests and can be verified as follows:
     - Add test case for new change.
    
    (cherry picked from commit b4dfeeee8be5e2e5ce61b671d249b0bc69c16849)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 42 +++++++++++++++++-----
 .../apache/pulsar/broker/service/Subscription.java |  4 +--
 .../nonpersistent/NonPersistentSubscription.java   |  7 ++--
 .../persistent/PersistentMessageExpiryMonitor.java | 15 ++++++--
 .../service/persistent/PersistentReplicator.java   | 18 ++++------
 .../service/persistent/PersistentSubscription.java | 13 +++----
 .../apache/pulsar/broker/admin/AdminApiTest.java   | 29 ++++++++++-----
 .../pulsar/broker/admin/v1/V1_AdminApiTest.java    |  7 +++-
 .../service/PersistentMessageFinderTest.java       | 36 ++++++++++++++-----
 .../api/AuthorizationProducerConsumerTest.java     | 16 +++++++--
 10 files changed, 134 insertions(+), 53 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 4c4db2b..b2c2324 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2849,18 +2849,32 @@ public class PersistentTopicsBase extends AdminResource 
{
 
         PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
         try {
+            boolean issued;
             if (subName.startsWith(topic.getReplicatorPrefix())) {
                 String remoteCluster = 
PersistentReplicator.getRemoteCluster(subName);
                 PersistentReplicator repl = (PersistentReplicator) 
topic.getPersistentReplicator(remoteCluster);
                 checkNotNull(repl);
-                repl.expireMessages(expireTimeInSeconds);
+                issued = repl.expireMessages(expireTimeInSeconds);
             } else {
                 PersistentSubscription sub = topic.getSubscription(subName);
                 checkNotNull(sub);
-                sub.expireMessages(expireTimeInSeconds);
+                issued = sub.expireMessages(expireTimeInSeconds);
+            }
+            if (issued) {
+                log.info("[{}] Message expire started up to {} on {} {}", 
clientAppId(), expireTimeInSeconds, topicName,
+                        subName);
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug("Expire message by timestamp not issued on topic 
{} for subscription {} due to ongoing "
+                            + "message expiration not finished or subscription 
almost catch up. If it's performed on "
+                            + "a partitioned topic operation might succeeded 
on other partitions, please check "
+                            + "stats of individual partition.", topicName, 
subName);
+                }
+                throw new RestException(Status.CONFLICT, "Expire message by 
timestamp not issued on topic "
+                + topicName + " for subscription " + subName + " due to 
ongoing message expiration not finished or "
+                + " subscription almost catch up. If it's performed on a 
partitioned topic operation might succeeded "
+                + "on other partitions, please check stats of individual 
partition.");
             }
-            log.info("[{}] Message expire started up to {} on {} {}", 
clientAppId(), expireTimeInSeconds, topicName,
-                    subName);
         } catch (NullPointerException npe) {
             throw new RestException(Status.NOT_FOUND, "Subscription not 
found");
         } catch (Exception exception) {
@@ -2918,19 +2932,31 @@ public class PersistentTopicsBase extends AdminResource 
{
                 getEntryBatchSize(batchSizeFuture, topic, messageId, 
batchIndex);
                 batchSizeFuture.thenAccept(bi -> {
                     PositionImpl position = 
calculatePositionAckSet(isExcluded, bi, batchIndex, messageId);
+                    boolean issued;
                     try {
                         if (subName.startsWith(topic.getReplicatorPrefix())) {
                             String remoteCluster = 
PersistentReplicator.getRemoteCluster(subName);
                             PersistentReplicator repl = (PersistentReplicator)
                                     
topic.getPersistentReplicator(remoteCluster);
                             checkNotNull(repl);
-                            repl.expireMessages(position);
+                            issued = repl.expireMessages(position);
                         } else {
                             checkNotNull(sub);
-                            sub.expireMessages(position);
+                            issued = sub.expireMessages(position);
+                        }
+                        if (issued) {
+                            log.info("[{}] Message expire started up to {} on 
{} {}", clientAppId(), position,
+                                    topicName, subName);
+                        } else {
+                            if (log.isDebugEnabled()) {
+                                log.debug("Expire message by position not 
issued on topic {} for subscription {} "
+                                        + "due to ongoing message expiration 
not finished or subscription "
+                                        + "almost catch up.", topicName, 
subName);
+                            }
+                            throw new RestException(Status.CONFLICT, "Expire 
message by position not issued on topic "
+                                    + topicName + " for subscription " + 
subName + " due to ongoing message expiration"
+                                    + " not finished or invalid message 
position provided.");
                         }
-                        log.info("[{}] Message expire issued up to {} on {} 
{}", clientAppId(), position, topicName,
-                                subName);
                     } catch (NullPointerException npe) {
                         throw new RestException(Status.NOT_FOUND, 
"Subscription not found");
                     } catch (Exception exception) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
index 0cd5585..2ebb3f0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
@@ -84,9 +84,9 @@ public interface Subscription {
 
     CompletableFuture<Entry> peekNthMessage(int messagePosition);
 
-    void expireMessages(int messageTTLInSeconds);
+    boolean expireMessages(int messageTTLInSeconds);
 
-    void expireMessages(Position position);
+    boolean expireMessages(Position position);
 
     void redeliverUnacknowledgedMessages(Consumer consumer);
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index bb1cb27..7aa26fa 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -430,12 +430,13 @@ public class NonPersistentSubscription implements 
Subscription {
     }
 
     @Override
-    public void expireMessages(int messageTTLInSeconds) {
-        // No-op
+    public boolean expireMessages(int messageTTLInSeconds) {
+        throw new UnsupportedOperationException("Expire message by timestamp 
is not supported for"
+                + " non-persistent topic.");
     }
 
     @Override
-    public void expireMessages(Position position) {
+    public boolean expireMessages(Position position) {
         throw new UnsupportedOperationException("Expire message by position is 
not supported for"
                 + " non-persistent topic.");
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index df0b84c..299c60f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -65,7 +65,7 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback {
                 && 
this.cursor.getManagedLedger().getConfig().isAutoSkipNonRecoverableData();
     }
 
-    public void expireMessages(int messageTTLInSeconds) {
+    public boolean expireMessages(int messageTTLInSeconds) {
         if (expirationCheckInProgressUpdater.compareAndSet(this, FALSE, TRUE)) 
{
             log.info("[{}][{}] Starting message expiry check, ttl= {} 
seconds", topicName, subName,
                     messageTTLInSeconds);
@@ -85,18 +85,25 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback {
                 }
                 return false;
             }, this, null);
+            return true;
         } else {
             if (log.isDebugEnabled()) {
                 log.debug("[{}][{}] Ignore expire-message scheduled task, last 
check is still running", topicName,
                         subName);
             }
+            return false;
         }
     }
 
-    public void expireMessages(Position messagePosition) {
+    public boolean expireMessages(Position messagePosition) {
         // If it's beyond last position of this topic, do nothing.
         if (((PositionImpl) 
subscription.getTopic().getLastPosition()).compareTo((PositionImpl) 
messagePosition) < 0) {
-            return;
+            if (log.isDebugEnabled()) {
+                log.debug("[{}][{}] Ignore expire-message scheduled task, 
given position {} is beyond "
+                         + "current topic's last position {}", topicName, 
subName, messagePosition,
+                        subscription.getTopic().getLastPosition());
+            }
+            return false;
         }
         if (expirationCheckInProgressUpdater.compareAndSet(this, FALSE, TRUE)) 
{
             log.info("[{}][{}] Starting message expiry check, position= {} 
seconds", topicName, subName,
@@ -110,11 +117,13 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback {
                     entry.release();
                 }
             }, this, null);
+            return true;
         } else {
             if (log.isDebugEnabled()) {
                 log.debug("[{}][{}] Ignore expire-message scheduled task, last 
check is still running", topicName,
                         subName);
             }
+            return false;
         }
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index ec3397d..9a68a25 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -700,28 +700,24 @@ public class PersistentReplicator extends 
AbstractReplicator implements Replicat
         return 0L;
     }
 
-    public void expireMessages(int messageTTLInSeconds) {
+    public boolean expireMessages(int messageTTLInSeconds) {
         if ((cursor.getNumberOfEntriesInBacklog(false) == 0)
                 || (cursor.getNumberOfEntriesInBacklog(false) < 
MINIMUM_BACKLOG_FOR_EXPIRY_CHECK
                         && !topic.isOldestMessageExpired(cursor, 
messageTTLInSeconds))) {
             // don't do anything for almost caught-up connected subscriptions
-            return;
+            return false;
         }
         if (expiryMonitor != null) {
-            expiryMonitor.expireMessages(messageTTLInSeconds);
+            return expiryMonitor.expireMessages(messageTTLInSeconds);
         }
+        return false;
     }
 
-    public void expireMessages(Position position) {
-        if ((cursor.getNumberOfEntriesInBacklog(false) == 0)
-                || (cursor.getNumberOfEntriesInBacklog(false) < 
MINIMUM_BACKLOG_FOR_EXPIRY_CHECK
-                && !topic.isOldestMessageExpired(cursor, 
messageTTLInSeconds))) {
-            // don't do anything for almost caught-up connected subscriptions
-            return;
-        }
+    public boolean expireMessages(Position position) {
         if (expiryMonitor != null) {
-            expiryMonitor.expireMessages(messageTTLInSeconds);
+            return expiryMonitor.expireMessages(position);
         }
+        return false;
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 4a8186a..9c0b524 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -883,20 +883,21 @@ public class PersistentSubscription implements 
Subscription {
     }
 
     @Override
-    public void expireMessages(int messageTTLInSeconds) {
-        this.lastExpireTimestamp = System.currentTimeMillis();
+    public boolean expireMessages(int messageTTLInSeconds) {
         if ((getNumberOfEntriesInBacklog(false) == 0) || (dispatcher != null 
&& dispatcher.isConsumerConnected()
                 && getNumberOfEntriesInBacklog(false) < 
MINIMUM_BACKLOG_FOR_EXPIRY_CHECK
                 && !topic.isOldestMessageExpired(cursor, 
messageTTLInSeconds))) {
             // don't do anything for almost caught-up connected subscriptions
-            return;
+            return false;
         }
-        expiryMonitor.expireMessages(messageTTLInSeconds);
+        this.lastExpireTimestamp = System.currentTimeMillis();
+        return expiryMonitor.expireMessages(messageTTLInSeconds);
     }
 
     @Override
-    public void expireMessages(Position position) {
-        expiryMonitor.expireMessages(position);
+    public boolean expireMessages(Position position) {
+        this.lastExpireTimestamp = System.currentTimeMillis();
+        return expiryMonitor.expireMessages(position);
     }
 
     public double getExpiredMessageRate() {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index c2f1b11..cc9c616 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -93,6 +93,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.lookup.data.LookupData;
@@ -2100,7 +2101,12 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(subStats3.msgBacklog, 10);
         assertEquals(subStats3.lastMarkDeleteAdvancedTimestamp, 0L);
 
-        
admin.topics().expireMessagesForAllSubscriptions("persistent://prop-xyz/ns1/ds2",
 1);
+        try {
+            
admin.topics().expireMessagesForAllSubscriptions("persistent://prop-xyz/ns1/ds2",
 1);
+        } catch (Exception e) {
+            // my-sub1 has no msg backlog, so expire message won't be issued 
on that subscription
+            assertTrue(e.getMessage().startsWith("Expire message by timestamp 
not issued on topic"));
+        }
         // Wait at most 2 seconds for sub3's message to expire.
         Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> 
assertTrue(
                 
admin.topics().getStats("persistent://prop-xyz/ns1/ds2").subscriptions.get("my-sub3").lastMarkDeleteAdvancedTimestamp
 > 0L));
@@ -2498,21 +2504,28 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         
assertTrue(admin.topics().compactionStatus(topicName).lastError.contains("Failed
 at something"));
     }
 
-    @Test(timeOut = 90000)
+    @Test(timeOut = 20000)
     public void testTopicStatsLastExpireTimestampForSubscription() throws 
PulsarAdminException, PulsarClientException, InterruptedException {
-        admin.namespaces().setNamespaceMessageTTL("prop-xyz/ns1", 60);
+        admin.namespaces().setNamespaceMessageTTL("prop-xyz/ns1", 10);
         final String topic = 
"persistent://prop-xyz/ns1/testTopicStatsLastExpireTimestampForSubscription";
-        Consumer<byte[]> producer = pulsarClient.newConsumer()
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .create();
+        for (int i = 0; i < 10; i++) {
+            producer.send(new byte[1024 * i * 5]);
+        }
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
             .topic(topic)
             .subscriptionName("sub-1")
+            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
             .subscribe();
 
         
Assert.assertEquals(admin.topics().getStats(topic).subscriptions.size(), 1);
         
Assert.assertEquals(admin.topics().getStats(topic).subscriptions.values().iterator().next().lastExpireTimestamp,
 0L);
-
-        Thread.sleep(60000);
-
-        
Assert.assertTrue(admin.topics().getStats(topic).subscriptions.values().iterator().next().lastExpireTimestamp
 > 0L);
+        Thread.sleep(10000);
+        // Update policy to trigger message expiry check.
+        admin.namespaces().setNamespaceMessageTTL("prop-xyz/ns1", 5);
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> 
admin.topics().getStats(topic).subscriptions.values().iterator().next().lastExpireTimestamp
 > 0L);
     }
 
     @Test(timeOut = 150000)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
index b202ffc..f3e7089 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
@@ -1755,7 +1755,12 @@ public class V1_AdminApiTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(topicStats.subscriptions.get("my-sub2").msgBacklog, 10);
         assertEquals(topicStats.subscriptions.get("my-sub3").msgBacklog, 10);
 
-        
admin.topics().expireMessagesForAllSubscriptions("persistent://prop-xyz/use/ns1/ds2",
 1);
+        try {
+            
admin.topics().expireMessagesForAllSubscriptions("persistent://prop-xyz/use/ns1/ds2",
 1);
+        } catch (Exception e) {
+            // my-sub1 has no msg backlog, so expire message won't be issued 
on that subscription
+            assertTrue(e.getMessage().startsWith("Expire message by timestamp 
not issued on topic"));
+        }
         Thread.sleep(1000); // wait for 1 seconds to execute expire message as 
it is async
 
         topicStats = 
admin.topics().getStats("persistent://prop-xyz/use/ns1/ds2");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index a2b8a1e..49e7488 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -23,10 +23,12 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.doAnswer;
 import static org.powermock.api.mockito.PowerMockito.mock;
 import static org.powermock.api.mockito.PowerMockito.spy;
 import static org.powermock.api.mockito.PowerMockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
@@ -43,6 +45,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.api.LedgerEntries;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -64,6 +68,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
 import org.awaitility.Awaitility;
+import org.mockito.stubbing.Answer;
 import org.testng.annotations.Test;
 
 import javax.ws.rs.client.Entity;
@@ -295,41 +300,56 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
             positions.add((PositionImpl) 
ledger.addEntry(createMessageWrittenToLedger("msg" + i)));
         }
         
when(topic.getLastPosition()).thenReturn(positions.get(positions.size() - 1));
-        for (Position p : positions) {
-            System.out.println(p);
-        }
+
         PersistentMessageExpiryMonitor monitor = spy(new 
PersistentMessageExpiryMonitor("topicname",
                 cursor.getName(), cursor, subscription));
         assertEquals((PositionImpl) cursor.getMarkDeletedPosition(), 
PositionImpl.get(positions.get(0).getLedgerId(), -1));
+        boolean issued;
 
         // Expire by position and verify mark delete position of cursor.
-        monitor.expireMessages(positions.get(15));
+        issued = monitor.expireMessages(positions.get(15));
         Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> 
verify(monitor, times(1)).findEntryComplete(any(), any()));
         assertEquals((PositionImpl) cursor.getMarkDeletedPosition(), 
PositionImpl.get(positions.get(15).getLedgerId(), 
positions.get(15).getEntryId()));
+        assertTrue(issued);
         clearInvocations(monitor);
 
         // Expire by position beyond last position and nothing should happen.
-        monitor.expireMessages(PositionImpl.get(100, 100));
+        issued = monitor.expireMessages(PositionImpl.get(100, 100));
         assertEquals((PositionImpl) cursor.getMarkDeletedPosition(), 
PositionImpl.get(positions.get(15).getLedgerId(), 
positions.get(15).getEntryId()));
+        assertFalse(issued);
 
         // Expire by position again and verify mark delete position of cursor 
didn't change.
-        monitor.expireMessages(positions.get(15));
+        issued = monitor.expireMessages(positions.get(15));
         Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> 
verify(monitor, times(1)).findEntryComplete(any(), any()));
         assertEquals((PositionImpl) cursor.getMarkDeletedPosition(), 
PositionImpl.get(positions.get(15).getLedgerId(), 
positions.get(15).getEntryId()));
+        assertTrue(issued);
         clearInvocations(monitor);
 
         // Expire by position before current mark delete position and verify 
mark delete position of cursor didn't change.
-        monitor.expireMessages(positions.get(10));
+        issued = monitor.expireMessages(positions.get(10));
         Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> 
verify(monitor, times(1)).findEntryComplete(any(), any()));
         assertEquals((PositionImpl) cursor.getMarkDeletedPosition(), 
PositionImpl.get(positions.get(15).getLedgerId(), 
positions.get(15).getEntryId()));
+        assertTrue(issued);
         clearInvocations(monitor);
 
         // Expire by position after current mark delete position and verify 
mark delete position of cursor move to new position.
-        monitor.expireMessages(positions.get(16));
+        issued = monitor.expireMessages(positions.get(16));
         Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> 
verify(monitor, times(1)).findEntryComplete(any(), any()));
         assertEquals((PositionImpl) cursor.getMarkDeletedPosition(), 
PositionImpl.get(positions.get(16).getLedgerId(), 
positions.get(16).getEntryId()));
+        assertTrue(issued);
         clearInvocations(monitor);
 
+        ManagedCursorImpl mockCursor = mock(ManagedCursorImpl.class);
+        PersistentMessageExpiryMonitor mockMonitor = spy(new 
PersistentMessageExpiryMonitor("topicname",
+                cursor.getName(), mockCursor, subscription));
+        // Not calling findEntryComplete to clear expirationCheckInProgress 
condition, so following call to
+        // expire message shouldn't issue.
+        doAnswer(invocation -> 
null).when(mockCursor).asyncFindNewestMatching(any(), any(), any(), any());
+        issued = mockMonitor.expireMessages(positions.get(15));
+        assertTrue(issued);
+        issued = mockMonitor.expireMessages(positions.get(15));
+        assertFalse(issued);
+
         cursor.close();
         ledger.close();
         factory.shutdown();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index f02f5e7..d43f657 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.api;
 
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 import com.google.common.collect.Maps;
@@ -212,7 +213,12 @@ public class AuthorizationProducerConsumerTest extends 
ProducerConsumerBase {
         // verify tenant is able to perform all subscription-admin api
         tenantAdmin.topics().skipAllMessages(topicName, subscriptionName);
         tenantAdmin.topics().skipMessages(topicName, subscriptionName, 1);
-        tenantAdmin.topics().expireMessages(topicName, subscriptionName, 10);
+        try {
+            tenantAdmin.topics().expireMessages(topicName, subscriptionName, 
10);
+        } catch (Exception e) {
+            // my-sub1 has no msg backlog, so expire message won't be issued 
on that subscription
+            assertTrue(e.getMessage().startsWith("Expire message by timestamp 
not issued on topic"));
+        }
         tenantAdmin.topics().expireMessages(topicName, subscriptionName, new 
MessageIdImpl(-1, -1, -1), true);
         tenantAdmin.topics().peekMessages(topicName, subscriptionName, 1);
         tenantAdmin.topics().resetCursor(topicName, subscriptionName, 10);
@@ -244,8 +250,12 @@ public class AuthorizationProducerConsumerTest extends 
ProducerConsumerBase {
 
         sub1Admin.topics().skipAllMessages(topicName, subscriptionName);
         sub1Admin.topics().skipMessages(topicName, subscriptionName, 1);
-        sub1Admin.topics().expireMessages(topicName, subscriptionName, 10);
-        sub1Admin.topics().peekMessages(topicName, subscriptionName, 1);
+        try {
+            tenantAdmin.topics().expireMessages(topicName, subscriptionName, 
10);
+        } catch (Exception e) {
+            // my-sub1 has no msg backlog, so expire message won't be issued 
on that subscription
+            assertTrue(e.getMessage().startsWith("Expire message by timestamp 
not issued on topic"));
+        }        sub1Admin.topics().peekMessages(topicName, subscriptionName, 
1);
         sub1Admin.topics().resetCursor(topicName, subscriptionName, 10);
         sub1Admin.topics().resetCursor(topicName, subscriptionName, 
MessageId.earliest);
 

Reply via email to