This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 351cb049e41b94e0fefc8f44b79c574ce48df2f5 Author: Marvin Cai <[email protected]> AuthorDate: Sun Feb 14 21:03:28 2021 -0800 [Admin CLI] Inform user when expire message request is not executed. (#9561) Fixes #9560 ### Motivation When there is an ongoing expire-message request, or the subscription has almost caught up, a new expire-message request is not executed, but the admin CLI does not inform the user about that. ### Modifications Add a boolean return value to PersistentMessageExpiryMonitor.expireMessages indicating whether the expire-message operation was issued, and return a corresponding message to the user if the operation is not executed. ### Verifying this change This change added tests and can be verified as follows: - Add test case for new change. (cherry picked from commit b4dfeeee8be5e2e5ce61b671d249b0bc69c16849) --- .../broker/admin/impl/PersistentTopicsBase.java | 42 +++++++++++++++++----- .../apache/pulsar/broker/service/Subscription.java | 4 +-- .../nonpersistent/NonPersistentSubscription.java | 7 ++-- .../persistent/PersistentMessageExpiryMonitor.java | 15 ++++++-- .../service/persistent/PersistentReplicator.java | 18 ++++------ .../service/persistent/PersistentSubscription.java | 13 +++---- .../apache/pulsar/broker/admin/AdminApiTest.java | 29 ++++++++++----- .../pulsar/broker/admin/v1/V1_AdminApiTest.java | 7 +++- .../service/PersistentMessageFinderTest.java | 36 ++++++++++++++----- .../api/AuthorizationProducerConsumerTest.java | 16 +++++++-- 10 files changed, 134 insertions(+), 53 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 4c4db2b..b2c2324 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2849,18 +2849,32 @@ public class PersistentTopicsBase 
extends AdminResource { PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); try { + boolean issued; if (subName.startsWith(topic.getReplicatorPrefix())) { String remoteCluster = PersistentReplicator.getRemoteCluster(subName); PersistentReplicator repl = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster); checkNotNull(repl); - repl.expireMessages(expireTimeInSeconds); + issued = repl.expireMessages(expireTimeInSeconds); } else { PersistentSubscription sub = topic.getSubscription(subName); checkNotNull(sub); - sub.expireMessages(expireTimeInSeconds); + issued = sub.expireMessages(expireTimeInSeconds); + } + if (issued) { + log.info("[{}] Message expire started up to {} on {} {}", clientAppId(), expireTimeInSeconds, topicName, + subName); + } else { + if (log.isDebugEnabled()) { + log.debug("Expire message by timestamp not issued on topic {} for subscription {} due to ongoing " + + "message expiration not finished or subscription almost catch up. If it's performed on " + + "a partitioned topic operation might succeeded on other partitions, please check " + + "stats of individual partition.", topicName, subName); + } + throw new RestException(Status.CONFLICT, "Expire message by timestamp not issued on topic " + + topicName + " for subscription " + subName + " due to ongoing message expiration not finished or " + + " subscription almost catch up. 
If it's performed on a partitioned topic operation might succeeded " + + "on other partitions, please check stats of individual partition."); } - log.info("[{}] Message expire started up to {} on {} {}", clientAppId(), expireTimeInSeconds, topicName, - subName); } catch (NullPointerException npe) { throw new RestException(Status.NOT_FOUND, "Subscription not found"); } catch (Exception exception) { @@ -2918,19 +2932,31 @@ public class PersistentTopicsBase extends AdminResource { getEntryBatchSize(batchSizeFuture, topic, messageId, batchIndex); batchSizeFuture.thenAccept(bi -> { PositionImpl position = calculatePositionAckSet(isExcluded, bi, batchIndex, messageId); + boolean issued; try { if (subName.startsWith(topic.getReplicatorPrefix())) { String remoteCluster = PersistentReplicator.getRemoteCluster(subName); PersistentReplicator repl = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster); checkNotNull(repl); - repl.expireMessages(position); + issued = repl.expireMessages(position); } else { checkNotNull(sub); - sub.expireMessages(position); + issued = sub.expireMessages(position); + } + if (issued) { + log.info("[{}] Message expire started up to {} on {} {}", clientAppId(), position, + topicName, subName); + } else { + if (log.isDebugEnabled()) { + log.debug("Expire message by position not issued on topic {} for subscription {} " + + "due to ongoing message expiration not finished or subscription " + + "almost catch up.", topicName, subName); + } + throw new RestException(Status.CONFLICT, "Expire message by position not issued on topic " + + topicName + " for subscription " + subName + " due to ongoing message expiration" + + " not finished or invalid message position provided."); } - log.info("[{}] Message expire issued up to {} on {} {}", clientAppId(), position, topicName, - subName); } catch (NullPointerException npe) { throw new RestException(Status.NOT_FOUND, "Subscription not found"); } catch (Exception exception) { diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java index 0cd5585..2ebb3f0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java @@ -84,9 +84,9 @@ public interface Subscription { CompletableFuture<Entry> peekNthMessage(int messagePosition); - void expireMessages(int messageTTLInSeconds); + boolean expireMessages(int messageTTLInSeconds); - void expireMessages(Position position); + boolean expireMessages(Position position); void redeliverUnacknowledgedMessages(Consumer consumer); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java index bb1cb27..7aa26fa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java @@ -430,12 +430,13 @@ public class NonPersistentSubscription implements Subscription { } @Override - public void expireMessages(int messageTTLInSeconds) { - // No-op + public boolean expireMessages(int messageTTLInSeconds) { + throw new UnsupportedOperationException("Expire message by timestamp is not supported for" + + " non-persistent topic."); } @Override - public void expireMessages(Position position) { + public boolean expireMessages(Position position) { throw new UnsupportedOperationException("Expire message by position is not supported for" + " non-persistent topic."); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java index df0b84c..299c60f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java @@ -65,7 +65,7 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback { && this.cursor.getManagedLedger().getConfig().isAutoSkipNonRecoverableData(); } - public void expireMessages(int messageTTLInSeconds) { + public boolean expireMessages(int messageTTLInSeconds) { if (expirationCheckInProgressUpdater.compareAndSet(this, FALSE, TRUE)) { log.info("[{}][{}] Starting message expiry check, ttl= {} seconds", topicName, subName, messageTTLInSeconds); @@ -85,18 +85,25 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback { } return false; }, this, null); + return true; } else { if (log.isDebugEnabled()) { log.debug("[{}][{}] Ignore expire-message scheduled task, last check is still running", topicName, subName); } + return false; } } - public void expireMessages(Position messagePosition) { + public boolean expireMessages(Position messagePosition) { // If it's beyond last position of this topic, do nothing. 
if (((PositionImpl) subscription.getTopic().getLastPosition()).compareTo((PositionImpl) messagePosition) < 0) { - return; + if (log.isDebugEnabled()) { + log.debug("[{}][{}] Ignore expire-message scheduled task, given position {} is beyond " + + "current topic's last position {}", topicName, subName, messagePosition, + subscription.getTopic().getLastPosition()); + } + return false; } if (expirationCheckInProgressUpdater.compareAndSet(this, FALSE, TRUE)) { log.info("[{}][{}] Starting message expiry check, position= {} seconds", topicName, subName, @@ -110,11 +117,13 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback { entry.release(); } }, this, null); + return true; } else { if (log.isDebugEnabled()) { log.debug("[{}][{}] Ignore expire-message scheduled task, last check is still running", topicName, subName); } + return false; } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index ec3397d..9a68a25 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -700,28 +700,24 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat return 0L; } - public void expireMessages(int messageTTLInSeconds) { + public boolean expireMessages(int messageTTLInSeconds) { if ((cursor.getNumberOfEntriesInBacklog(false) == 0) || (cursor.getNumberOfEntriesInBacklog(false) < MINIMUM_BACKLOG_FOR_EXPIRY_CHECK && !topic.isOldestMessageExpired(cursor, messageTTLInSeconds))) { // don't do anything for almost caught-up connected subscriptions - return; + return false; } if (expiryMonitor != null) { - expiryMonitor.expireMessages(messageTTLInSeconds); + return expiryMonitor.expireMessages(messageTTLInSeconds); } + return 
false; } - public void expireMessages(Position position) { - if ((cursor.getNumberOfEntriesInBacklog(false) == 0) - || (cursor.getNumberOfEntriesInBacklog(false) < MINIMUM_BACKLOG_FOR_EXPIRY_CHECK - && !topic.isOldestMessageExpired(cursor, messageTTLInSeconds))) { - // don't do anything for almost caught-up connected subscriptions - return; - } + public boolean expireMessages(Position position) { if (expiryMonitor != null) { - expiryMonitor.expireMessages(messageTTLInSeconds); + return expiryMonitor.expireMessages(position); } + return false; } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 4a8186a..9c0b524 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -883,20 +883,21 @@ public class PersistentSubscription implements Subscription { } @Override - public void expireMessages(int messageTTLInSeconds) { - this.lastExpireTimestamp = System.currentTimeMillis(); + public boolean expireMessages(int messageTTLInSeconds) { if ((getNumberOfEntriesInBacklog(false) == 0) || (dispatcher != null && dispatcher.isConsumerConnected() && getNumberOfEntriesInBacklog(false) < MINIMUM_BACKLOG_FOR_EXPIRY_CHECK && !topic.isOldestMessageExpired(cursor, messageTTLInSeconds))) { // don't do anything for almost caught-up connected subscriptions - return; + return false; } - expiryMonitor.expireMessages(messageTTLInSeconds); + this.lastExpireTimestamp = System.currentTimeMillis(); + return expiryMonitor.expireMessages(messageTTLInSeconds); } @Override - public void expireMessages(Position position) { - expiryMonitor.expireMessages(position); + public boolean expireMessages(Position position) { + this.lastExpireTimestamp = 
System.currentTimeMillis(); + return expiryMonitor.expireMessages(position); } public double getExpiredMessageRate() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index c2f1b11..cc9c616 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -93,6 +93,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.lookup.data.LookupData; @@ -2100,7 +2101,12 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { assertEquals(subStats3.msgBacklog, 10); assertEquals(subStats3.lastMarkDeleteAdvancedTimestamp, 0L); - admin.topics().expireMessagesForAllSubscriptions("persistent://prop-xyz/ns1/ds2", 1); + try { + admin.topics().expireMessagesForAllSubscriptions("persistent://prop-xyz/ns1/ds2", 1); + } catch (Exception e) { + // my-sub1 has no msg backlog, so expire message won't be issued on that subscription + assertTrue(e.getMessage().startsWith("Expire message by timestamp not issued on topic")); + } // Wait at most 2 seconds for sub3's message to expire. 
Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> assertTrue( admin.topics().getStats("persistent://prop-xyz/ns1/ds2").subscriptions.get("my-sub3").lastMarkDeleteAdvancedTimestamp > 0L)); @@ -2498,21 +2504,28 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { assertTrue(admin.topics().compactionStatus(topicName).lastError.contains("Failed at something")); } - @Test(timeOut = 90000) + @Test(timeOut = 20000) public void testTopicStatsLastExpireTimestampForSubscription() throws PulsarAdminException, PulsarClientException, InterruptedException { - admin.namespaces().setNamespaceMessageTTL("prop-xyz/ns1", 60); + admin.namespaces().setNamespaceMessageTTL("prop-xyz/ns1", 10); final String topic = "persistent://prop-xyz/ns1/testTopicStatsLastExpireTimestampForSubscription"; - Consumer<byte[]> producer = pulsarClient.newConsumer() + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topic) + .create(); + for (int i = 0; i < 10; i++) { + producer.send(new byte[1024 * i * 5]); + } + Consumer<byte[]> consumer = pulsarClient.newConsumer() .topic(topic) .subscriptionName("sub-1") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscribe(); Assert.assertEquals(admin.topics().getStats(topic).subscriptions.size(), 1); Assert.assertEquals(admin.topics().getStats(topic).subscriptions.values().iterator().next().lastExpireTimestamp, 0L); - - Thread.sleep(60000); - - Assert.assertTrue(admin.topics().getStats(topic).subscriptions.values().iterator().next().lastExpireTimestamp > 0L); + Thread.sleep(10000); + // Update policy to trigger message expiry check. 
+ admin.namespaces().setNamespaceMessageTTL("prop-xyz/ns1", 5); + Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> admin.topics().getStats(topic).subscriptions.values().iterator().next().lastExpireTimestamp > 0L); } @Test(timeOut = 150000) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java index b202ffc..f3e7089 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java @@ -1755,7 +1755,12 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest { assertEquals(topicStats.subscriptions.get("my-sub2").msgBacklog, 10); assertEquals(topicStats.subscriptions.get("my-sub3").msgBacklog, 10); - admin.topics().expireMessagesForAllSubscriptions("persistent://prop-xyz/use/ns1/ds2", 1); + try { + admin.topics().expireMessagesForAllSubscriptions("persistent://prop-xyz/use/ns1/ds2", 1); + } catch (Exception e) { + // my-sub1 has no msg backlog, so expire message won't be issued on that subscription + assertTrue(e.getMessage().startsWith("Expire message by timestamp not issued on topic")); + } Thread.sleep(1000); // wait for 1 seconds to execute expire message as it is async topicStats = admin.topics().getStats("persistent://prop-xyz/use/ns1/ds2"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index a2b8a1e..49e7488 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -23,10 +23,12 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.clearInvocations; import static 
org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.powermock.api.mockito.PowerMockito.doAnswer; import static org.powermock.api.mockito.PowerMockito.mock; import static org.powermock.api.mockito.PowerMockito.spy; import static org.powermock.api.mockito.PowerMockito.when; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; @@ -43,6 +45,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -64,6 +68,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.protocol.ByteBufPair; import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream; import org.awaitility.Awaitility; +import org.mockito.stubbing.Answer; import org.testng.annotations.Test; import javax.ws.rs.client.Entity; @@ -295,41 +300,56 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { positions.add((PositionImpl) ledger.addEntry(createMessageWrittenToLedger("msg" + i))); } when(topic.getLastPosition()).thenReturn(positions.get(positions.size() - 1)); - for (Position p : positions) { - System.out.println(p); - } + PersistentMessageExpiryMonitor monitor = spy(new PersistentMessageExpiryMonitor("topicname", cursor.getName(), cursor, subscription)); assertEquals((PositionImpl) cursor.getMarkDeletedPosition(), PositionImpl.get(positions.get(0).getLedgerId(), -1)); + boolean issued; // Expire by position and verify mark delete position of cursor. 
- monitor.expireMessages(positions.get(15)); + issued = monitor.expireMessages(positions.get(15)); Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> verify(monitor, times(1)).findEntryComplete(any(), any())); assertEquals((PositionImpl) cursor.getMarkDeletedPosition(), PositionImpl.get(positions.get(15).getLedgerId(), positions.get(15).getEntryId())); + assertTrue(issued); clearInvocations(monitor); // Expire by position beyond last position and nothing should happen. - monitor.expireMessages(PositionImpl.get(100, 100)); + issued = monitor.expireMessages(PositionImpl.get(100, 100)); assertEquals((PositionImpl) cursor.getMarkDeletedPosition(), PositionImpl.get(positions.get(15).getLedgerId(), positions.get(15).getEntryId())); + assertFalse(issued); // Expire by position again and verify mark delete position of cursor didn't change. - monitor.expireMessages(positions.get(15)); + issued = monitor.expireMessages(positions.get(15)); Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> verify(monitor, times(1)).findEntryComplete(any(), any())); assertEquals((PositionImpl) cursor.getMarkDeletedPosition(), PositionImpl.get(positions.get(15).getLedgerId(), positions.get(15).getEntryId())); + assertTrue(issued); clearInvocations(monitor); // Expire by position before current mark delete position and verify mark delete position of cursor didn't change. - monitor.expireMessages(positions.get(10)); + issued = monitor.expireMessages(positions.get(10)); Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> verify(monitor, times(1)).findEntryComplete(any(), any())); assertEquals((PositionImpl) cursor.getMarkDeletedPosition(), PositionImpl.get(positions.get(15).getLedgerId(), positions.get(15).getEntryId())); + assertTrue(issued); clearInvocations(monitor); // Expire by position after current mark delete position and verify mark delete position of cursor move to new position. 
- monitor.expireMessages(positions.get(16)); + issued = monitor.expireMessages(positions.get(16)); Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> verify(monitor, times(1)).findEntryComplete(any(), any())); assertEquals((PositionImpl) cursor.getMarkDeletedPosition(), PositionImpl.get(positions.get(16).getLedgerId(), positions.get(16).getEntryId())); + assertTrue(issued); clearInvocations(monitor); + ManagedCursorImpl mockCursor = mock(ManagedCursorImpl.class); + PersistentMessageExpiryMonitor mockMonitor = spy(new PersistentMessageExpiryMonitor("topicname", + cursor.getName(), mockCursor, subscription)); + // Not calling findEntryComplete to clear expirationCheckInProgress condition, so following call to + // expire message shouldn't issue. + doAnswer(invocation -> null).when(mockCursor).asyncFindNewestMatching(any(), any(), any(), any()); + issued = mockMonitor.expireMessages(positions.get(15)); + assertTrue(issued); + issued = mockMonitor.expireMessages(positions.get(15)); + assertFalse(issued); + cursor.close(); ledger.close(); factory.shutdown(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java index f02f5e7..d43f657 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java @@ -20,6 +20,7 @@ package org.apache.pulsar.client.api; import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.mockito.Mockito.spy; +import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.google.common.collect.Maps; @@ -212,7 +213,12 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase { // verify tenant is able to perform all subscription-admin api 
tenantAdmin.topics().skipAllMessages(topicName, subscriptionName); tenantAdmin.topics().skipMessages(topicName, subscriptionName, 1); - tenantAdmin.topics().expireMessages(topicName, subscriptionName, 10); + try { + tenantAdmin.topics().expireMessages(topicName, subscriptionName, 10); + } catch (Exception e) { + // my-sub1 has no msg backlog, so expire message won't be issued on that subscription + assertTrue(e.getMessage().startsWith("Expire message by timestamp not issued on topic")); + } tenantAdmin.topics().expireMessages(topicName, subscriptionName, new MessageIdImpl(-1, -1, -1), true); tenantAdmin.topics().peekMessages(topicName, subscriptionName, 1); tenantAdmin.topics().resetCursor(topicName, subscriptionName, 10); @@ -244,8 +250,12 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase { sub1Admin.topics().skipAllMessages(topicName, subscriptionName); sub1Admin.topics().skipMessages(topicName, subscriptionName, 1); - sub1Admin.topics().expireMessages(topicName, subscriptionName, 10); - sub1Admin.topics().peekMessages(topicName, subscriptionName, 1); + try { + tenantAdmin.topics().expireMessages(topicName, subscriptionName, 10); + } catch (Exception e) { + // my-sub1 has no msg backlog, so expire message won't be issued on that subscription + assertTrue(e.getMessage().startsWith("Expire message by timestamp not issued on topic")); + } sub1Admin.topics().peekMessages(topicName, subscriptionName, 1); sub1Admin.topics().resetCursor(topicName, subscriptionName, 10); sub1Admin.topics().resetCursor(topicName, subscriptionName, MessageId.earliest);
