This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8002c7271dacf662f3628b05a2856b3f50442c23
Author: fengyubiao <[email protected]>
AuthorDate: Wed Aug 14 16:39:55 2024 +0800

    [improve] [broker] Avoid subscription fenced error with consumer.seek 
whenever possible (#23163)
    
    (cherry picked from commit d5ce1cee35363ba2372375c2e8740be6d87488d8)
---
 .../service/persistent/PersistentSubscription.java | 32 +++++++----
 .../broker/service/SubscriptionSeekTest.java       | 64 ++++++++++++++++++++++
 2 files changed, 86 insertions(+), 10 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index fdb977aa366..31fc21fa167 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -132,6 +132,7 @@ public class PersistentSubscription extends 
AbstractSubscription implements Subs
     private final PendingAckHandle pendingAckHandle;
     private volatile Map<String, String> subscriptionProperties;
     private volatile CompletableFuture<Void> fenceFuture;
+    private volatile CompletableFuture<Void> inProgressResetCursorFuture;
 
     static Map<String, Long> getBaseCursorProperties(boolean isReplicated) {
         return isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES : 
NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
@@ -220,6 +221,16 @@ public class PersistentSubscription extends 
AbstractSubscription implements Subs
 
     @Override
     public CompletableFuture<Void> addConsumer(Consumer consumer) {
+        CompletableFuture<Void> inProgressResetCursorFuture = 
this.inProgressResetCursorFuture;
+        if (inProgressResetCursorFuture != null) {
+            return inProgressResetCursorFuture.handle((ignore, ignoreEx) -> 
null)
+                    .thenCompose(ignore -> addConsumerInternal(consumer));
+        } else {
+            return addConsumerInternal(consumer);
+        }
+    }
+
+    private CompletableFuture<Void> addConsumerInternal(Consumer consumer) {
         return pendingAckHandle.pendingAckHandleFuture().thenCompose(future -> 
{
             synchronized (PersistentSubscription.this) {
                 cursor.updateLastActive();
@@ -748,7 +759,8 @@ public class PersistentSubscription extends 
AbstractSubscription implements Subs
                 } else {
                     finalPosition = position.getNext();
                 }
-                resetCursor(finalPosition, future);
+                CompletableFuture<Void> resetCursorFuture = 
resetCursor(finalPosition);
+                FutureUtil.completeAfter(future, resetCursorFuture);
             }
 
             @Override
@@ -767,18 +779,13 @@ public class PersistentSubscription extends 
AbstractSubscription implements Subs
     }
 
     @Override
-    public CompletableFuture<Void> resetCursor(Position position) {
-        CompletableFuture<Void> future = new CompletableFuture<>();
-        resetCursor(position, future);
-        return future;
-    }
-
-    private void resetCursor(Position finalPosition, CompletableFuture<Void> 
future) {
+    public CompletableFuture<Void> resetCursor(Position finalPosition) {
         if (!IS_FENCED_UPDATER.compareAndSet(PersistentSubscription.this, 
FALSE, TRUE)) {
-            future.completeExceptionally(new SubscriptionBusyException("Failed 
to fence subscription"));
-            return;
+            return CompletableFuture.failedFuture(new 
SubscriptionBusyException("Failed to fence subscription"));
         }
 
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+        inProgressResetCursorFuture = future;
         final CompletableFuture<Void> disconnectFuture;
 
         // Lock the Subscription object before locking the Dispatcher object 
to avoid deadlocks
@@ -798,6 +805,7 @@ public class PersistentSubscription extends 
AbstractSubscription implements Subs
             if (throwable != null) {
                 log.error("[{}][{}] Failed to disconnect consumer from 
subscription", topicName, subName, throwable);
                 IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
+                inProgressResetCursorFuture = null;
                 future.completeExceptionally(
                         new SubscriptionBusyException("Failed to disconnect 
consumers from subscription"));
                 return;
@@ -836,6 +844,7 @@ public class PersistentSubscription extends 
AbstractSubscription implements Subs
                             dispatcher.cursorIsReset();
                         }
                         IS_FENCED_UPDATER.set(PersistentSubscription.this, 
FALSE);
+                        inProgressResetCursorFuture = null;
                         future.complete(null);
                     }
 
@@ -844,6 +853,7 @@ public class PersistentSubscription extends 
AbstractSubscription implements Subs
                         log.error("[{}][{}] Failed to reset subscription to 
position {}", topicName, subName,
                                 finalPosition, exception);
                         IS_FENCED_UPDATER.set(PersistentSubscription.this, 
FALSE);
+                        inProgressResetCursorFuture = null;
                         // todo - retry on InvalidCursorPositionException
                         // or should we just ask user to retry one more time?
                         if (exception instanceof 
InvalidCursorPositionException) {
@@ -858,10 +868,12 @@ public class PersistentSubscription extends 
AbstractSubscription implements Subs
             }).exceptionally((e) -> {
                 log.error("[{}][{}] Error while resetting cursor", topicName, 
subName, e);
                 IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
+                inProgressResetCursorFuture = null;
                 future.completeExceptionally(new BrokerServiceException(e));
                 return null;
             });
         });
+        return future;
     }
 
     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
index b11946069c9..f3647b34a47 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
@@ -33,12 +33,14 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
@@ -49,8 +51,12 @@ import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.api.proto.CommandError;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.util.RelativeTimeUtil;
 import org.awaitility.Awaitility;
 import org.testng.annotations.AfterClass;
@@ -757,6 +763,64 @@ public class SubscriptionSeekTest extends BrokerTestBase {
         assertEquals(count, (msgInTopic1Partition0 + msgInTopic1Partition1 + 
msgInTopic1Partition2) * 2);
     }
 
+    @Test
+    public void testSeekWillNotEncounteredFencedError() throws Exception {
+        String topicName = "persistent://prop/ns-abc/my-topic2";
+        admin.topics().createNonPartitionedTopic(topicName);
+        admin.topicPolicies().setRetention(topicName, new 
RetentionPolicies(3600, 0));
+        // Create a pulsar client with a subscription fenced counter.
+        ClientBuilderImpl clientBuilder = (ClientBuilderImpl) 
PulsarClient.builder().serviceUrl(lookupUrl.toString());
+        AtomicInteger receivedFencedErrorCounter = new AtomicInteger();
+        PulsarClient client = 
InjectedClientCnxClientBuilder.create(clientBuilder, (conf, eventLoopGroup) ->
+                new ClientCnx(conf, eventLoopGroup) {
+                    protected void handleError(CommandError error) {
+                        if (error.getMessage() != null && 
error.getMessage().contains("Subscription is fenced")) {
+                            receivedFencedErrorCounter.incrementAndGet();
+                        }
+                        super.handleError(error);
+                    }
+                });
+
+        // publish some messages.
+        org.apache.pulsar.client.api.Consumer<String> consumer = 
client.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName("s1")
+                .subscribe();
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topicName).create();
+        MessageIdImpl msgId1 = (MessageIdImpl) producer.send("0");
+        for (int i = 1; i < 11; i++) {
+            admin.topics().unload(topicName);
+            producer.send(i + "");
+        }
+
+        // Inject a delay for reset-cursor.
+        mockZooKeeper.delay(3000, (op, path) -> {
+            if 
(path.equals("/managed-ledgers/prop/ns-abc/persistent/my-topic2/s1")) {
+                return op.toString().equalsIgnoreCase("SET");
+            }
+            return false;
+        });
+
+        // Verify: consumer will not receive "subscription fenced" error after 
a seek.
+        for (int i = 1; i < 11; i++) {
+            Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
+            assertNotNull(msg);
+            consumer.acknowledge(msg);
+        }
+        consumer.seek(msgId1);
+        Awaitility.await().untilAsserted(() -> {
+            assertTrue(consumer.isConnected());
+        });
+        assertEquals(receivedFencedErrorCounter.get(), 0);
+
+        // cleanup.
+        producer.close();
+        consumer.close();
+        client.close();
+        admin.topics().delete(topicName);
+    }
+
     @Test
     public void testExceptionBySeekFunction() throws Exception {
         final String topicName = "persistent://prop/use/ns-abc/test" + 
UUID.randomUUID();

Reply via email to