This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 8002c7271dacf662f3628b05a2856b3f50442c23 Author: fengyubiao <[email protected]> AuthorDate: Wed Aug 14 16:39:55 2024 +0800 [improve] [broker] Avoid subscription fenced error with consumer.seek whenever possible (#23163) (cherry picked from commit d5ce1cee35363ba2372375c2e8740be6d87488d8) --- .../service/persistent/PersistentSubscription.java | 32 +++++++---- .../broker/service/SubscriptionSeekTest.java | 64 ++++++++++++++++++++++ 2 files changed, 86 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index fdb977aa366..31fc21fa167 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -132,6 +132,7 @@ public class PersistentSubscription extends AbstractSubscription implements Subs private final PendingAckHandle pendingAckHandle; private volatile Map<String, String> subscriptionProperties; private volatile CompletableFuture<Void> fenceFuture; + private volatile CompletableFuture<Void> inProgressResetCursorFuture; static Map<String, Long> getBaseCursorProperties(boolean isReplicated) { return isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES : NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES; @@ -220,6 +221,16 @@ public class PersistentSubscription extends AbstractSubscription implements Subs @Override public CompletableFuture<Void> addConsumer(Consumer consumer) { + CompletableFuture<Void> inProgressResetCursorFuture = this.inProgressResetCursorFuture; + if (inProgressResetCursorFuture != null) { + return inProgressResetCursorFuture.handle((ignore, ignoreEx) -> null) + .thenCompose(ignore -> addConsumerInternal(consumer)); + } else { + return addConsumerInternal(consumer); + } + } + + private CompletableFuture<Void> addConsumerInternal(Consumer consumer) { return pendingAckHandle.pendingAckHandleFuture().thenCompose(future -> { synchronized (PersistentSubscription.this) { cursor.updateLastActive(); @@ -748,7 +759,8 @@ public class PersistentSubscription extends AbstractSubscription implements Subs } else { finalPosition = position.getNext(); } - resetCursor(finalPosition, future); + CompletableFuture<Void> resetCursorFuture = resetCursor(finalPosition); + FutureUtil.completeAfter(future, resetCursorFuture); } @Override @@ -767,18 +779,13 @@ public class PersistentSubscription extends AbstractSubscription implements Subs } @Override - public CompletableFuture<Void> resetCursor(Position position) { - CompletableFuture<Void> future = new CompletableFuture<>(); - resetCursor(position, future); - return future; - } - - private void resetCursor(Position finalPosition, CompletableFuture<Void> future) { + public CompletableFuture<Void> resetCursor(Position finalPosition) { if (!IS_FENCED_UPDATER.compareAndSet(PersistentSubscription.this, FALSE, TRUE)) { - future.completeExceptionally(new SubscriptionBusyException("Failed to fence subscription")); - return; + return CompletableFuture.failedFuture(new SubscriptionBusyException("Failed to fence subscription")); } + final CompletableFuture<Void> future = new CompletableFuture<>(); + inProgressResetCursorFuture = future; final CompletableFuture<Void> disconnectFuture; // Lock the Subscription object before locking the Dispatcher object to avoid deadlocks @@ -798,6 +805,7 @@ public class PersistentSubscription extends AbstractSubscription implements Subs if (throwable != null) { log.error("[{}][{}] Failed to disconnect consumer from subscription", topicName, subName, throwable); IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE); + inProgressResetCursorFuture = null; future.completeExceptionally( new SubscriptionBusyException("Failed to disconnect consumers from subscription")); return; @@ -836,6 +844,7 @@ public class PersistentSubscription extends AbstractSubscription implements Subs dispatcher.cursorIsReset(); } IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE); + inProgressResetCursorFuture = null; future.complete(null); } @@ -844,6 +853,7 @@ public class PersistentSubscription extends AbstractSubscription implements Subs log.error("[{}][{}] Failed to reset subscription to position {}", topicName, subName, finalPosition, exception); IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE); + inProgressResetCursorFuture = null; // todo - retry on InvalidCursorPositionException // or should we just ask user to retry one more time? if (exception instanceof InvalidCursorPositionException) { @@ -858,10 +868,12 @@ public class PersistentSubscription extends AbstractSubscription implements Subs }).exceptionally((e) -> { log.error("[{}][{}] Error while resetting cursor", topicName, subName, e); IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE); + inProgressResetCursorFuture = null; future.completeExceptionally(new BrokerServiceException(e)); return null; }); }); + return future; } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java index b11946069c9..f3647b34a47 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java @@ -33,12 +33,14 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageRoutingMode; @@ -49,8 +51,12 @@ import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.BatchMessageIdImpl; +import org.apache.pulsar.client.impl.ClientBuilderImpl; +import org.apache.pulsar.client.impl.ClientCnx; import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.common.api.proto.CommandError; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.util.RelativeTimeUtil; import org.awaitility.Awaitility; import org.testng.annotations.AfterClass; @@ -757,6 +763,64 @@ public class SubscriptionSeekTest extends BrokerTestBase { assertEquals(count, (msgInTopic1Partition0 + msgInTopic1Partition1 + msgInTopic1Partition2) * 2); } + @Test + public void testSeekWillNotEncounteredFencedError() throws Exception { + String topicName = "persistent://prop/ns-abc/my-topic2"; + admin.topics().createNonPartitionedTopic(topicName); + admin.topicPolicies().setRetention(topicName, new RetentionPolicies(3600, 0)); + // Create a pulsar client with a subscription fenced counter. + ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString()); + AtomicInteger receivedFencedErrorCounter = new AtomicInteger(); + PulsarClient client = InjectedClientCnxClientBuilder.create(clientBuilder, (conf, eventLoopGroup) -> + new ClientCnx(conf, eventLoopGroup) { + protected void handleError(CommandError error) { + if (error.getMessage() != null && error.getMessage().contains("Subscription is fenced")) { + receivedFencedErrorCounter.incrementAndGet(); + } + super.handleError(error); + } + }); + + // publish some messages. + org.apache.pulsar.client.api.Consumer<String> consumer = client.newConsumer(Schema.STRING) + .topic(topicName) + .subscriptionName("s1") + .subscribe(); + Producer<String> producer = client.newProducer(Schema.STRING) + .topic(topicName).create(); + MessageIdImpl msgId1 = (MessageIdImpl) producer.send("0"); + for (int i = 1; i < 11; i++) { + admin.topics().unload(topicName); + producer.send(i + ""); + } + + // Inject a delay for reset-cursor. + mockZooKeeper.delay(3000, (op, path) -> { + if (path.equals("/managed-ledgers/prop/ns-abc/persistent/my-topic2/s1")) { + return op.toString().equalsIgnoreCase("SET"); + } + return false; + }); + + // Verify: consumer will not receive "subscription fenced" error after a seek. + for (int i = 1; i < 11; i++) { + Message<String> msg = consumer.receive(2, TimeUnit.SECONDS); + assertNotNull(msg); + consumer.acknowledge(msg); + } + consumer.seek(msgId1); + Awaitility.await().untilAsserted(() -> { + assertTrue(consumer.isConnected()); + }); + assertEquals(receivedFencedErrorCounter.get(), 0); + + // cleanup. + producer.close(); + consumer.close(); + client.close(); + admin.topics().delete(topicName); + } + @Test public void testExceptionBySeekFunction() throws Exception { final String topicName = "persistent://prop/use/ns-abc/test" + UUID.randomUUID();
