This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c96a36c587 [fix] [admin] Fix get topic stats fail if a subscription 
catch up concurrently (#20971)
7c96a36c587 is described below

commit 7c96a36c58768e71cc445371bb3d98c5ac6e05cd
Author: fengyubiao <[email protected]>
AuthorDate: Mon Aug 21 04:49:30 2023 +0800

    [fix] [admin] Fix get topic stats fail if a subscription catch up 
concurrently (#20971)
    
    ### Motivation
    
    **Background**: when calling `pulsar-admin topics stats 
--get-earliest-time-in-backlog <topic name>`, Pulsar will read the first entry 
which is not acknowledged, and respond with the entry write time. The flow is 
like this:
    - get the mark deleted position of the subscription
    - if there is no backlog, respond with `-1`
    - else read the entry at the position following the mark deleted 
position, and respond with the entry write time.
    
    **Issue**: if the command `pulsar-admin topics stats 
--get-earliest-time-in-backlog <topic name>` and `consumer.acknowledge` are 
executed at the same time, the backlog check in step 2 of the above flow can 
pass just before the acknowledgment advances the mark deleted position, so 
step 3 computes a position which is larger than the last confirmed position, 
leading to a read entry error.
    
    | time | `pulsar-admin topics stats --get-earliest-time-in-backlog <topic 
name>` | `consumer.acknowledge` |
    | --- | --- | --- |
    | 1 | mark deleted position is `3:1` and LAC is `3:2` now |
    | 2 | the check `whether has backlog` is passed |
    | 3 | | acknowledged `3:2`, mark deleted position is `3:2` now |
    | 4 | calculate next position: `3:3` |
    | 5 | Read `3:3` and get an error: `read entry failed` |
    
    Note: the test in PR is not intended to reproduce the issue.
    
    ### Modifications
    
    Respond with `-1` if the position following the mark deleted position is 
larger than the LAC (last confirmed entry).
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  4 ++++
 .../service/persistent/PersistentSubscription.java | 22 ++++++++++--------
 .../apache/pulsar/broker/admin/AdminApi2Test.java  | 27 ++++++++++++++++++++++
 .../apache/pulsar/broker/admin/AdminApiTest.java   |  4 ++--
 4 files changed, 46 insertions(+), 11 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 14f4bfed871..c31a0c38cd3 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1240,6 +1240,10 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         }
         PositionImpl nextPos = getNextValidPosition(pos);
 
+        if (nextPos.compareTo(lastConfirmedEntry) > 0) {
+            return CompletableFuture.completedFuture(-1L);
+        }
+
         asyncReadEntry(nextPos, new ReadEntryCallback() {
             @Override
             public void readEntryComplete(Entry entry, Object ctx) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 009d00cd89c..2f5485afbaa 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1160,16 +1160,20 @@ public class PersistentSubscription extends 
AbstractSubscription implements Subs
         } else {
             subStats.backlogSize = -1;
         }
-        if (getEarliestTimeInBacklog && subStats.msgBacklog > 0) {
-            ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl) 
cursor.getManagedLedger());
-            PositionImpl markDeletedPosition = (PositionImpl) 
cursor.getMarkDeletedPosition();
-            long result = 0;
-            try {
-                result = 
managedLedger.getEarliestMessagePublishTimeOfPos(markDeletedPosition).get();
-            } catch (InterruptedException | ExecutionException e) {
-                result = -1;
+        if (getEarliestTimeInBacklog) {
+            if (subStats.msgBacklog > 0) {
+                ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl) 
cursor.getManagedLedger());
+                PositionImpl markDeletedPosition = (PositionImpl) 
cursor.getMarkDeletedPosition();
+                long result = 0;
+                try {
+                    result = 
managedLedger.getEarliestMessagePublishTimeOfPos(markDeletedPosition).get();
+                } catch (InterruptedException | ExecutionException e) {
+                    result = -1;
+                }
+                subStats.earliestMsgPublishTimeInBacklog = result;
+            } else {
+                subStats.earliestMsgPublishTimeInBacklog = -1;
             }
-            subStats.earliestMsgPublishTimeInBacklog = result;
         }
         subStats.msgBacklogNoDelayed = subStats.msgBacklog - 
subStats.msgDelayed;
         subStats.msgRateExpired = expiryMonitor.getMessageExpiryRate();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index d6176966d85..0adf945a555 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -61,6 +61,7 @@ import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminApiTest.MockedPulsarService;
@@ -3191,6 +3192,32 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
         
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions,
 newPartitions);
     }
 
+    /**
+     * Validate that the earliest message publish time in backlog is -1 when there is no backlog.
+     * @throws Exception
+     */
+    @Test
+    public void testTopicStatsWithEarliestTimeInBacklogIfNoBacklog() throws 
Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ defaultNamespace + "/tp_");
+        final String subscriptionName = "s1";
+        admin.topics().createNonPartitionedTopic(topicName);
+        admin.topics().createSubscription(topicName, subscriptionName, 
MessageId.earliest);
+
+        // Send one message.
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topicName).enableBatching(false)
+                .create();
+        MessageIdImpl messageId = (MessageIdImpl) producer.send("123");
+        // Catch up.
+        admin.topics().skipAllMessages(topicName, subscriptionName);
+        // Get topic stats with earliestTimeInBacklog
+        TopicStats topicStats = admin.topics().getStats(topicName, false, 
false, true);
+        
assertEquals(topicStats.getSubscriptions().get(subscriptionName).getEarliestMsgPublishTimeInBacklog(),
 -1L);
+
+        // cleanup.
+        producer.close();
+        admin.topics().delete(topicName);
+    }
+
     @Test(dataProvider = "topicType")
     public void testPartitionedStatsAggregationByProducerName(String 
topicType) throws Exception {
         restartClusterIfReused();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 342a409c4ae..a4be829b977 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -1289,7 +1289,7 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         TopicStats topicStats = admin.topics().getStats(topic, false, false, 
true);
 
         assertEquals(topicStats.getEarliestMsgPublishTimeInBacklogs(), 0);
-        
assertEquals(topicStats.getSubscriptions().get(subName).getEarliestMsgPublishTimeInBacklog(),
 0);
+        
assertEquals(topicStats.getSubscriptions().get(subName).getEarliestMsgPublishTimeInBacklog(),
 -1);
         
assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), -1);
 
         // publish several messages
@@ -1309,7 +1309,7 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
 
         topicStats = admin.topics().getStats(topic, false, true, true);
         assertEquals(topicStats.getEarliestMsgPublishTimeInBacklogs(), 0);
-        
assertEquals(topicStats.getSubscriptions().get(subName).getEarliestMsgPublishTimeInBacklog(),
 0);
+        
assertEquals(topicStats.getSubscriptions().get(subName).getEarliestMsgPublishTimeInBacklog(),
 -1);
         
assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 0);
     }
 

Reply via email to