This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.2 by this push:
     new 5614f9cf501 [fix][broker] Race condition causes perpetual backlog on internal topics (#25572)
5614f9cf501 is described below

commit 5614f9cf5016ad604a30d4d69895f7078d671109
Author: Darin Spivey <[email protected]>
AuthorDate: Fri May 1 11:58:47 2026 -0400

    [fix][broker] Race condition causes perpetual backlog on internal topics (#25572)
    
    Signed-off-by: Darin Spivey <[email protected]>
    Co-authored-by: Darin Spivey <[email protected]>
    (cherry picked from commit 60efea19cc69377a86620e9c1006ee36d519575a)
---
 .../compaction/AbstractTwoPhaseCompactor.java      | 55 +++++++++++++++++++++-
 .../apache/pulsar/compaction/CompactionTest.java   | 32 +++++++++++++
 2 files changed, 86 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
index 7aba181cb44..bdcdb50eb5f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
@@ -29,6 +29,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
 import java.util.function.BiPredicate;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -38,6 +39,7 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.RawMessage;
 import org.apache.pulsar.client.api.RawReader;
 import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -45,6 +47,7 @@ import org.apache.pulsar.client.impl.RawBatchConverter;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Markers;
+import org.apache.pulsar.common.util.Backoff;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,6 +65,9 @@ public abstract class AbstractTwoPhaseCompactor<T> extends 
Compactor {
 
   @VisibleForTesting
   static Runnable injectionAfterSeekInPhaseTwo = () -> {};
+  @VisibleForTesting
+  static BiFunction<RawReader, MessageId, CompletableFuture<Void>> 
injectionPhaseTwoSeek =
+      RawReader::seekAsync;
   private static final Logger log = 
LoggerFactory.getLogger(AbstractTwoPhaseCompactor.class);
   protected static final int MAX_OUTSTANDING = 500;
   protected final Duration phaseOneLoopReadTimeout;
@@ -190,7 +196,7 @@ public abstract class AbstractTwoPhaseCompactor<T> extends 
Compactor {
       LedgerHandle ledger) {
     CompletableFuture<Long> promise = new CompletableFuture<>();
 
-    reader.seekAsync(from).thenCompose((v) -> {
+    phaseTwoSeekWithRetry(reader, from).thenCompose((v) -> {
           injectionAfterSeekInPhaseTwo.run();
           Semaphore outstanding = new Semaphore(MAX_OUTSTANDING);
           CompletableFuture<Void> loopPromise = new CompletableFuture<>();
@@ -215,6 +221,53 @@ public abstract class AbstractTwoPhaseCompactor<T> extends 
Compactor {
     return promise;
   }
 
+  /**
+   * Seek the compaction subscription to {@code from}, retrying on transient
+   * {@link PulsarClientException.ConnectException}.
+   *
+   * <p>Server-side, {@code PersistentSubscription.resetCursorInternal} disconnects the compaction
+   * consumer before resetting the managed cursor, then sends the success response. This races with
+   * the client: {@code channelInactive} fires on the consumer's {@code ClientCnx} and fails the
+   * in-flight seek future with {@code ConnectException} before the broker's success response
+   * arrives. The seek is idempotent and the cursor is already repositioned server-side, so
+   * retrying after a short backoff lets the client reconnect and the next seek complete normally.
+   * Non-transient failures propagate immediately.
+   */
+  private CompletableFuture<Void> phaseTwoSeekWithRetry(RawReader reader, 
MessageId from) {
+    CompletableFuture<Void> promise = new CompletableFuture<>();
+    Backoff backoff = Backoff.builder()
+        .initialDelay(Duration.ofMillis(100))
+        .maxBackoff(Duration.ofSeconds(1))
+        .mandatoryStop(Duration.ofSeconds(10))
+        .build();
+    attemptPhaseTwoSeek(reader, from, backoff, promise);
+    return promise;
+  }
+
+  private void attemptPhaseTwoSeek(RawReader reader, MessageId from, Backoff 
backoff,
+      CompletableFuture<Void> promise) {
+    injectionPhaseTwoSeek.apply(reader, from).whenComplete((v, ex) -> {
+      if (ex == null) {
+        promise.complete(null);
+        return;
+      }
+      Throwable cause = FutureUtil.unwrapCompletionException(ex);
+      if (!(cause instanceof PulsarClientException.ConnectException)) {
+        promise.completeExceptionally(cause);
+        return;
+      }
+      long nextMs = backoff.next().toMillis();
+      if (backoff.isMandatoryStopMade()) {
+        promise.completeExceptionally(cause);
+        return;
+      }
+      log.warn("Phase two seek failed transiently, will retry. topic={} 
from={} nextMs={}",
+          reader.getTopic(), from, nextMs, cause);
+      scheduler.schedule(() -> attemptPhaseTwoSeek(reader, from, backoff, 
promise),
+          nextMs, TimeUnit.MILLISECONDS);
+    });
+  }
+
   private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, 
MessageId> latestForKey,
       LedgerHandle lh, Semaphore outstanding, CompletableFuture<Void> promise,
       MessageId lastCompactedMessageId) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 2f32a72d319..5fc6416ee98 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -56,6 +56,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
 import lombok.Cleanup;
@@ -91,6 +92,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.RawReader;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
@@ -161,6 +163,7 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
     public void beforeMethod() throws Exception {
         admin.namespaces().removeRetention("my-tenant/my-ns");
         AbstractTwoPhaseCompactor.injectionAfterSeekInPhaseTwo = () -> {};
+        AbstractTwoPhaseCompactor.injectionPhaseTwoSeek = RawReader::seekAsync;
     }
 
     protected long compact(String topic) throws ExecutionException, 
InterruptedException {
@@ -2518,6 +2521,35 @@ public class CompactionTest extends 
MockedPulsarServiceBaseTest {
                 "value-2", "key-2", "value-0", "key-2", "value-1"));
     }
 
+    @Test
+    public void testPhaseTwoSeekRetriesOnConnectException() throws Exception {
+        final var topic = "persistent://my-tenant/my-ns/phase-two-seek-retry";
+        @Cleanup final var producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+        producer.newMessage().key("k").value("v0").send();
+        producer.newMessage().key("k").value("v1").send();
+
+        // Simulate the production race: PersistentSubscription.resetCursorInternal disconnects the
+        // consumer before responding to the seek, which fails the client's in-flight seek future
+        // with ConnectException. The first attempt here returns a synthetic failure without invoking
+        // the real seek (so nothing is in flight on the underlying ConsumerImpl); the retry calls
+        // seekAsync for real and the compaction proceeds.
+        final var attempts = new AtomicInteger(0);
+        AbstractTwoPhaseCompactor.injectionPhaseTwoSeek = (reader, msgId) -> {
+            if (attempts.getAndIncrement() == 0) {
+                return FutureUtil.failedFuture(
+                        new PulsarClientException.ConnectException("simulated 
disconnect during seek"));
+            }
+            return reader.seekAsync(msgId);
+        };
+
+        final long compactedLedgerId = compact(topic);
+        assertNotEquals(compactedLedgerId, -1L);
+        assertTrue(attempts.get() >= 2,
+                "Seek should have been retried at least once, attempts=" + 
attempts.get());
+
+        verifyReadKeyValues(topic, true, List.of("k", "v1"));
+    }
+
     private void verifyReadKeyValues(String topic, boolean readCompacted, 
List<String> expectedKeyValues)
             throws Exception {
         @Cleanup final var reader = 
pulsarClient.newReader(Schema.STRING).topic(topic).readCompacted(readCompacted)

Reply via email to