This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.2 by this push:
new 5614f9cf501 [fix][broker] Race condition causes perpetual backlog on internal topics (#25572)
5614f9cf501 is described below
commit 5614f9cf5016ad604a30d4d69895f7078d671109
Author: Darin Spivey <[email protected]>
AuthorDate: Fri May 1 11:58:47 2026 -0400
[fix][broker] Race condition causes perpetual backlog on internal topics (#25572)
Signed-off-by: Darin Spivey <[email protected]>
Co-authored-by: Darin Spivey <[email protected]>
(cherry picked from commit 60efea19cc69377a86620e9c1006ee36d519575a)
---
.../compaction/AbstractTwoPhaseCompactor.java | 55 +++++++++++++++++++++-
.../apache/pulsar/compaction/CompactionTest.java | 32 +++++++++++++
2 files changed, 86 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
index 7aba181cb44..bdcdb50eb5f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
@@ -29,6 +29,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
@@ -38,6 +39,7 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -45,6 +47,7 @@ import org.apache.pulsar.client.impl.RawBatchConverter;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
+import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,6 +65,9 @@ public abstract class AbstractTwoPhaseCompactor<T> extends Compactor {
@VisibleForTesting
static Runnable injectionAfterSeekInPhaseTwo = () -> {};
+ @VisibleForTesting
+ static BiFunction<RawReader, MessageId, CompletableFuture<Void>>
injectionPhaseTwoSeek =
+ RawReader::seekAsync;
private static final Logger log =
LoggerFactory.getLogger(AbstractTwoPhaseCompactor.class);
protected static final int MAX_OUTSTANDING = 500;
protected final Duration phaseOneLoopReadTimeout;
@@ -190,7 +196,7 @@ public abstract class AbstractTwoPhaseCompactor<T> extends Compactor {
LedgerHandle ledger) {
CompletableFuture<Long> promise = new CompletableFuture<>();
- reader.seekAsync(from).thenCompose((v) -> {
+ phaseTwoSeekWithRetry(reader, from).thenCompose((v) -> {
injectionAfterSeekInPhaseTwo.run();
Semaphore outstanding = new Semaphore(MAX_OUTSTANDING);
CompletableFuture<Void> loopPromise = new CompletableFuture<>();
@@ -215,6 +221,53 @@ public abstract class AbstractTwoPhaseCompactor<T> extends Compactor {
return promise;
}
+ /**
+ * Seek the compaction subscription to {@code from}, retrying on transient
+ * {@link PulsarClientException.ConnectException}.
+ *
+ * <p>Server-side, {@code PersistentSubscription.resetCursorInternal}
disconnects the compaction
+ * consumer before resetting the managed cursor, then sends the success
response. This races with
+ * the client: {@code channelInactive} fires on the consumer's {@code
ClientCnx} and fails the
+ * in-flight seek future with {@code ConnectException} before the broker's
success response
+ * arrives. The seek is idempotent and the cursor is already repositioned
server-side, so
+ * retrying after a short backoff lets the client reconnect and the next
seek complete normally.
+ * Non-transient failures propagate immediately.
+ */
+ private CompletableFuture<Void> phaseTwoSeekWithRetry(RawReader reader,
MessageId from) {
+ CompletableFuture<Void> promise = new CompletableFuture<>();
+ Backoff backoff = Backoff.builder()
+ .initialDelay(Duration.ofMillis(100))
+ .maxBackoff(Duration.ofSeconds(1))
+ .mandatoryStop(Duration.ofSeconds(10))
+ .build();
+ attemptPhaseTwoSeek(reader, from, backoff, promise);
+ return promise;
+ }
+
+ private void attemptPhaseTwoSeek(RawReader reader, MessageId from, Backoff
backoff,
+ CompletableFuture<Void> promise) {
+ injectionPhaseTwoSeek.apply(reader, from).whenComplete((v, ex) -> {
+ if (ex == null) {
+ promise.complete(null);
+ return;
+ }
+ Throwable cause = FutureUtil.unwrapCompletionException(ex);
+ if (!(cause instanceof PulsarClientException.ConnectException)) {
+ promise.completeExceptionally(cause);
+ return;
+ }
+ long nextMs = backoff.next().toMillis();
+ if (backoff.isMandatoryStopMade()) {
+ promise.completeExceptionally(cause);
+ return;
+ }
+ log.warn("Phase two seek failed transiently, will retry. topic={}
from={} nextMs={}",
+ reader.getTopic(), from, nextMs, cause);
+ scheduler.schedule(() -> attemptPhaseTwoSeek(reader, from, backoff,
promise),
+ nextMs, TimeUnit.MILLISECONDS);
+ });
+ }
+
private void phaseTwoLoop(RawReader reader, MessageId to, Map<String,
MessageId> latestForKey,
LedgerHandle lh, Semaphore outstanding, CompletableFuture<Void> promise,
MessageId lastCompactedMessageId) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 2f32a72d319..5fc6416ee98 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -56,6 +56,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import lombok.Cleanup;
@@ -91,6 +92,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
@@ -161,6 +163,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
public void beforeMethod() throws Exception {
admin.namespaces().removeRetention("my-tenant/my-ns");
AbstractTwoPhaseCompactor.injectionAfterSeekInPhaseTwo = () -> {};
+ AbstractTwoPhaseCompactor.injectionPhaseTwoSeek = RawReader::seekAsync;
}
protected long compact(String topic) throws ExecutionException,
InterruptedException {
@@ -2518,6 +2521,35 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
"value-2", "key-2", "value-0", "key-2", "value-1"));
}
+ @Test
+ public void testPhaseTwoSeekRetriesOnConnectException() throws Exception {
+ final var topic = "persistent://my-tenant/my-ns/phase-two-seek-retry";
+ @Cleanup final var producer =
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+ producer.newMessage().key("k").value("v0").send();
+ producer.newMessage().key("k").value("v1").send();
+
+ // Simulate the production race:
PersistentSubscription.resetCursorInternal disconnects the
+ // consumer before responding to the seek, which fails the client's
in-flight seek future
+ // with ConnectException. The first attempt here returns a synthetic
failure without invoking
+ // the real seek (so nothing is in flight on the underlying
ConsumerImpl); the retry calls
+ // seekAsync for real and the compaction proceeds.
+ final var attempts = new AtomicInteger(0);
+ AbstractTwoPhaseCompactor.injectionPhaseTwoSeek = (reader, msgId) -> {
+ if (attempts.getAndIncrement() == 0) {
+ return FutureUtil.failedFuture(
+ new PulsarClientException.ConnectException("simulated
disconnect during seek"));
+ }
+ return reader.seekAsync(msgId);
+ };
+
+ final long compactedLedgerId = compact(topic);
+ assertNotEquals(compactedLedgerId, -1L);
+ assertTrue(attempts.get() >= 2,
+ "Seek should have been retried at least once, attempts=" +
attempts.get());
+
+ verifyReadKeyValues(topic, true, List.of("k", "v1"));
+ }
+
private void verifyReadKeyValues(String topic, boolean readCompacted,
List<String> expectedKeyValues)
throws Exception {
@Cleanup final var reader =
pulsarClient.newReader(Schema.STRING).topic(topic).readCompacted(readCompacted)