This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 19fe2e40392 [cleanup][broker] Remove PersistentSubscription.getStats (#23095)
19fe2e40392 is described below
commit 19fe2e4039205d20b9d715e5483c4695b7fbe606
Author: Zixuan Liu <[email protected]>
AuthorDate: Tue Jul 30 14:50:24 2024 +0800
[cleanup][broker] Remove PersistentSubscription.getStats (#23095)
Signed-off-by: Zixuan Liu <[email protected]>
---
.../service/persistent/PersistentSubscription.java | 43 ++++++----------------
.../broker/service/persistent/PersistentTopic.java | 22 ++++++-----
.../pulsar/broker/service/PersistentTopicTest.java | 34 ++++++++++++-----
3 files changed, 49 insertions(+), 50 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 2dd890cfd29..0a57f98eb7a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -31,9 +31,6 @@ import java.util.Optional;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -334,17 +331,18 @@ public class PersistentSubscription extends AbstractSubscription {
// when topic closes: it iterates through
concurrent-subscription map to close each subscription. so,
// topic.remove again try to access same map which creates
deadlock. so, execute it in different thread.
topic.getBrokerService().pulsar().getExecutor().execute(() -> {
- topic.removeSubscription(subName);
- // Also need remove the cursor here, otherwise the data
deletion will not work well.
- // Because data deletion depends on the mark delete
position of all cursors.
- if (!isResetCursor) {
- try {
-
topic.getManagedLedger().deleteCursor(cursor.getName());
-
topic.getManagedLedger().removeWaitingCursor(cursor);
- } catch (InterruptedException | ManagedLedgerException
e) {
- log.warn("[{}] [{}] Failed to remove non durable
cursor", topic.getName(), subName, e);
+ topic.removeSubscription(subName).thenRunAsync(() -> {
+ // Also need remove the cursor here, otherwise the
data deletion will not work well.
+ // Because data deletion depends on the mark delete
position of all cursors.
+ if (!isResetCursor) {
+ try {
+
topic.getManagedLedger().deleteCursor(cursor.getName());
+
topic.getManagedLedger().removeWaitingCursor(cursor);
+ } catch (InterruptedException |
ManagedLedgerException e) {
+ log.warn("[{}] [{}] Failed to remove non
durable cursor", topic.getName(), subName, e);
+ }
}
- }
+ }, topic.getBrokerService().pulsar().getExecutor());
});
} else {
topic.getManagedLedger().removeWaitingCursor(cursor);
@@ -1202,25 +1200,6 @@ public class PersistentSubscription extends AbstractSubscription {
return cursor.getEstimatedSizeSinceMarkDeletePosition();
}
- /**
- * @deprecated please call {@link #getStatsAsync(GetStatsOptions)}.
- */
- @Deprecated
- public SubscriptionStatsImpl getStats(GetStatsOptions getStatsOptions) {
- // So far, there is no case hits this check.
- if (getStatsOptions.isGetEarliestTimeInBacklog()) {
- throw new IllegalArgumentException("Calling the sync method
subscription.getStats with"
- + " getEarliestTimeInBacklog, it may encountered a
deadlock error.");
- }
- // The method "getStatsAsync" will be a sync method if the param
"isGetEarliestTimeInBacklog" is false.
- try {
- return getStatsAsync(getStatsOptions).get(5, TimeUnit.SECONDS);
- } catch (InterruptedException | ExecutionException | TimeoutException
e) {
- // This error will never occur.
- throw new RuntimeException(e);
- }
- }
-
public CompletableFuture<SubscriptionStatsImpl>
getStatsAsync(GetStatsOptions getStatsOptions) {
SubscriptionStatsImpl subStats = new SubscriptionStatsImpl();
subStats.lastExpireTimestamp = lastExpireTimestamp;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 42487d7239c..7926545647e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1402,19 +1402,23 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}, null);
}
- void removeSubscription(String subscriptionName) {
+ CompletableFuture<Void> removeSubscription(String subscriptionName) {
PersistentSubscription sub = subscriptions.remove(subscriptionName);
if (sub != null) {
// preserve accumulative stats form removed subscription
- SubscriptionStatsImpl stats = sub.getStats(new
GetStatsOptions(false, false, false, false, false));
- bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
- msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
-
- if (isSystemCursor(subscriptionName)
- ||
subscriptionName.startsWith(SystemTopicNames.SYSTEM_READER_PREFIX)) {
-
bytesOutFromRemovedSystemSubscriptions.add(stats.bytesOutCounter);
- }
+ return sub
+ .getStatsAsync(new GetStatsOptions(false, false, false,
false, false))
+ .thenAccept(stats -> {
+
bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
+
msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
+
+ if (isSystemCursor(subscriptionName)
+ ||
subscriptionName.startsWith(SystemTopicNames.SYSTEM_READER_PREFIX)) {
+
bytesOutFromRemovedSystemSubscriptions.add(stats.bytesOutCounter);
+ }
+ });
}
+ return CompletableFuture.completedFuture(null);
}
/**
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 8c21301c15b..e83b1bd9b7b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations;
import static
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -53,6 +54,7 @@ import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URL;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -2187,9 +2189,14 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
sub1.addConsumer(consumer1);
consumer1.close();
- SubscriptionStatsImpl stats1 = sub1.getStats(new
GetStatsOptions(false, false, false, false, false));
- assertEquals(stats1.keySharedMode, "AUTO_SPLIT");
- assertFalse(stats1.allowOutOfOrderDelivery);
+ CompletableFuture<SubscriptionStatsImpl> stats1Async =
+ sub1.getStatsAsync(new GetStatsOptions(false, false, false,
false, false));
+ assertThat(stats1Async).succeedsWithin(Duration.ofSeconds(3))
+ .matches(stats1 -> {
+ assertEquals(stats1.keySharedMode, "AUTO_SPLIT");
+ assertFalse(stats1.allowOutOfOrderDelivery);
+ return true;
+ });
Consumer consumer2 = new Consumer(sub2, SubType.Key_Shared,
topic.getName(), 2, 0, "Cons2", true, serverCnx,
"myrole-1", Collections.emptyMap(), false,
@@ -2198,9 +2205,14 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
sub2.addConsumer(consumer2);
consumer2.close();
- SubscriptionStatsImpl stats2 = sub2.getStats(new
GetStatsOptions(false, false, false, false, false));
- assertEquals(stats2.keySharedMode, "AUTO_SPLIT");
- assertTrue(stats2.allowOutOfOrderDelivery);
+ CompletableFuture<SubscriptionStatsImpl> stats2Async =
+ sub2.getStatsAsync(new GetStatsOptions(false, false, false,
false, false));
+ assertThat(stats2Async).succeedsWithin(Duration.ofSeconds(3))
+ .matches(stats2 -> {
+ assertEquals(stats2.keySharedMode, "AUTO_SPLIT");
+ assertTrue(stats2.allowOutOfOrderDelivery);
+ return true;
+ });
KeySharedMeta ksm = new
KeySharedMeta().setKeySharedMode(KeySharedMode.STICKY)
.setAllowOutOfOrderDelivery(false);
@@ -2210,9 +2222,13 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
sub3.addConsumer(consumer3);
consumer3.close();
- SubscriptionStatsImpl stats3 = sub3.getStats(new
GetStatsOptions(false, false, false, false, false));
- assertEquals(stats3.keySharedMode, "STICKY");
- assertFalse(stats3.allowOutOfOrderDelivery);
+ CompletableFuture<SubscriptionStatsImpl> stats3Async =
sub3.getStatsAsync(new GetStatsOptions(false, false, false, false, false));
+ assertThat(stats3Async).succeedsWithin(Duration.ofSeconds(3))
+ .matches(stats3 -> {
+ assertEquals(stats3.keySharedMode, "STICKY");
+ assertFalse(stats3.allowOutOfOrderDelivery);
+ return true;
+ });
}
private ByteBuf getMessageWithMetadata(byte[] data) {