This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 19fe2e40392 [cleanup][broker] Remove PersistentSubscription.getStats 
(#23095)
19fe2e40392 is described below

commit 19fe2e4039205d20b9d715e5483c4695b7fbe606
Author: Zixuan Liu <[email protected]>
AuthorDate: Tue Jul 30 14:50:24 2024 +0800

    [cleanup][broker] Remove PersistentSubscription.getStats (#23095)
    
    Signed-off-by: Zixuan Liu <[email protected]>
---
 .../service/persistent/PersistentSubscription.java | 43 ++++++----------------
 .../broker/service/persistent/PersistentTopic.java | 22 ++++++-----
 .../pulsar/broker/service/PersistentTopicTest.java | 34 ++++++++++++-----
 3 files changed, 49 insertions(+), 50 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 2dd890cfd29..0a57f98eb7a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -31,9 +31,6 @@ import java.util.Optional;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -334,17 +331,18 @@ public class PersistentSubscription extends 
AbstractSubscription {
                 // when topic closes: it iterates through 
concurrent-subscription map to close each subscription. so,
                 // topic.remove again try to access same map which creates 
deadlock. so, execute it in different thread.
                 topic.getBrokerService().pulsar().getExecutor().execute(() -> {
-                    topic.removeSubscription(subName);
-                    // Also need remove the cursor here, otherwise the data 
deletion will not work well.
-                    // Because data deletion depends on the mark delete 
position of all cursors.
-                    if (!isResetCursor) {
-                        try {
-                            
topic.getManagedLedger().deleteCursor(cursor.getName());
-                            
topic.getManagedLedger().removeWaitingCursor(cursor);
-                        } catch (InterruptedException | ManagedLedgerException 
e) {
-                            log.warn("[{}] [{}] Failed to remove non durable 
cursor", topic.getName(), subName, e);
+                    topic.removeSubscription(subName).thenRunAsync(() -> {
+                        // Also need remove the cursor here, otherwise the 
data deletion will not work well.
+                        // Because data deletion depends on the mark delete 
position of all cursors.
+                        if (!isResetCursor) {
+                            try {
+                                
topic.getManagedLedger().deleteCursor(cursor.getName());
+                                
topic.getManagedLedger().removeWaitingCursor(cursor);
+                            } catch (InterruptedException | 
ManagedLedgerException e) {
+                                log.warn("[{}] [{}] Failed to remove non 
durable cursor", topic.getName(), subName, e);
+                            }
                         }
-                    }
+                    }, topic.getBrokerService().pulsar().getExecutor());
                 });
             } else {
                 topic.getManagedLedger().removeWaitingCursor(cursor);
@@ -1202,25 +1200,6 @@ public class PersistentSubscription extends 
AbstractSubscription {
         return cursor.getEstimatedSizeSinceMarkDeletePosition();
     }
 
-    /**
-     * @deprecated please call {@link #getStatsAsync(GetStatsOptions)}.
-     */
-    @Deprecated
-    public SubscriptionStatsImpl getStats(GetStatsOptions getStatsOptions) {
-        // So far, there is no case hits this check.
-        if (getStatsOptions.isGetEarliestTimeInBacklog()) {
-            throw new IllegalArgumentException("Calling the sync method 
subscription.getStats with"
-                    + " getEarliestTimeInBacklog, it may encountered a 
deadlock error.");
-        }
-        // The method "getStatsAsync" will be a sync method if the param 
"isGetEarliestTimeInBacklog" is false.
-        try {
-            return getStatsAsync(getStatsOptions).get(5, TimeUnit.SECONDS);
-        } catch (InterruptedException | ExecutionException | TimeoutException 
e) {
-            // This error will never occur.
-            throw new RuntimeException(e);
-        }
-    }
-
     public CompletableFuture<SubscriptionStatsImpl> 
getStatsAsync(GetStatsOptions getStatsOptions) {
         SubscriptionStatsImpl subStats = new SubscriptionStatsImpl();
         subStats.lastExpireTimestamp = lastExpireTimestamp;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 42487d7239c..7926545647e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1402,19 +1402,23 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         }, null);
     }
 
-    void removeSubscription(String subscriptionName) {
+    CompletableFuture<Void> removeSubscription(String subscriptionName) {
         PersistentSubscription sub = subscriptions.remove(subscriptionName);
         if (sub != null) {
             // preserve accumulative stats form removed subscription
-            SubscriptionStatsImpl stats = sub.getStats(new 
GetStatsOptions(false, false, false, false, false));
-            bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
-            msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
-
-            if (isSystemCursor(subscriptionName)
-                    || 
subscriptionName.startsWith(SystemTopicNames.SYSTEM_READER_PREFIX)) {
-                
bytesOutFromRemovedSystemSubscriptions.add(stats.bytesOutCounter);
-            }
+            return sub
+                    .getStatsAsync(new GetStatsOptions(false, false, false, 
false, false))
+                    .thenAccept(stats -> {
+                        
bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
+                        
msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
+
+                        if (isSystemCursor(subscriptionName)
+                                || 
subscriptionName.startsWith(SystemTopicNames.SYSTEM_READER_PREFIX)) {
+                            
bytesOutFromRemovedSystemSubscriptions.add(stats.bytesOutCounter);
+                        }
+                    });
         }
+        return CompletableFuture.completedFuture(null);
     }
 
     /**
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 8c21301c15b..e83b1bd9b7b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
 
 import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations;
 import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -53,6 +54,7 @@ import java.lang.reflect.Method;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URL;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -2187,9 +2189,14 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         sub1.addConsumer(consumer1);
         consumer1.close();
 
-        SubscriptionStatsImpl stats1 = sub1.getStats(new 
GetStatsOptions(false, false, false, false, false));
-        assertEquals(stats1.keySharedMode, "AUTO_SPLIT");
-        assertFalse(stats1.allowOutOfOrderDelivery);
+        CompletableFuture<SubscriptionStatsImpl> stats1Async =
+                sub1.getStatsAsync(new GetStatsOptions(false, false, false, 
false, false));
+        assertThat(stats1Async).succeedsWithin(Duration.ofSeconds(3))
+                .matches(stats1 -> {
+                    assertEquals(stats1.keySharedMode, "AUTO_SPLIT");
+                    assertFalse(stats1.allowOutOfOrderDelivery);
+                    return true;
+                });
 
         Consumer consumer2 = new Consumer(sub2, SubType.Key_Shared, 
topic.getName(), 2, 0, "Cons2", true, serverCnx,
                 "myrole-1", Collections.emptyMap(), false,
@@ -2198,9 +2205,14 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         sub2.addConsumer(consumer2);
         consumer2.close();
 
-        SubscriptionStatsImpl stats2 = sub2.getStats(new 
GetStatsOptions(false, false, false, false, false));
-        assertEquals(stats2.keySharedMode, "AUTO_SPLIT");
-        assertTrue(stats2.allowOutOfOrderDelivery);
+        CompletableFuture<SubscriptionStatsImpl> stats2Async =
+                sub2.getStatsAsync(new GetStatsOptions(false, false, false, 
false, false));
+        assertThat(stats2Async).succeedsWithin(Duration.ofSeconds(3))
+                .matches(stats2 -> {
+                    assertEquals(stats2.keySharedMode, "AUTO_SPLIT");
+                    assertTrue(stats2.allowOutOfOrderDelivery);
+                    return true;
+                });
 
         KeySharedMeta ksm = new 
KeySharedMeta().setKeySharedMode(KeySharedMode.STICKY)
                 .setAllowOutOfOrderDelivery(false);
@@ -2210,9 +2222,13 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         sub3.addConsumer(consumer3);
         consumer3.close();
 
-        SubscriptionStatsImpl stats3 = sub3.getStats(new 
GetStatsOptions(false, false, false, false, false));
-        assertEquals(stats3.keySharedMode, "STICKY");
-        assertFalse(stats3.allowOutOfOrderDelivery);
+        CompletableFuture<SubscriptionStatsImpl> stats3Async = 
sub3.getStatsAsync(new GetStatsOptions(false, false, false, false, false));
+        assertThat(stats3Async).succeedsWithin(Duration.ofSeconds(3))
+                .matches(stats3 -> {
+                    assertEquals(stats3.keySharedMode, "STICKY");
+                    assertFalse(stats3.allowOutOfOrderDelivery);
+                    return true;
+                });
     }
 
     private ByteBuf getMessageWithMetadata(byte[] data) {

Reply via email to