This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 60efea19cc6 [fix][broker] Race condition causes perpetual backlog on
internal topics (#25572)
60efea19cc6 is described below
commit 60efea19cc69377a86620e9c1006ee36d519575a
Author: Darin Spivey <[email protected]>
AuthorDate: Fri May 1 11:58:47 2026 -0400
[fix][broker] Race condition causes perpetual backlog on internal topics
(#25572)
Signed-off-by: Darin Spivey <[email protected]>
Co-authored-by: Darin Spivey <[email protected]>
---
.../compaction/AbstractTwoPhaseCompactor.java | 59 +++++++++++++++++++++-
.../apache/pulsar/compaction/CompactionTest.java | 32 ++++++++++++
2 files changed, 90 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
index 6139bfb8802..390bd9a0ce0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
@@ -29,6 +29,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import lombok.CustomLog;
import org.apache.bookkeeper.client.BKException;
@@ -39,6 +40,7 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -46,6 +48,7 @@ import org.apache.pulsar.client.impl.RawBatchConverter;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
+import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.FutureUtil;
/**
@@ -62,6 +65,9 @@ public abstract class AbstractTwoPhaseCompactor<T> extends Compactor {
@VisibleForTesting
static Runnable injectionAfterSeekInPhaseTwo = () -> {};
+    /**
+     * Hook through which phase two performs its seek on the compaction subscription. Defaults to
+     * {@link RawReader#seekAsync(MessageId)}; tests replace it to inject seek failures.
+     */
+    @VisibleForTesting
+    static BiFunction<RawReader, MessageId, CompletableFuture<Void>> injectionPhaseTwoSeek = RawReader::seekAsync;
protected static final int MAX_OUTSTANDING = 500;
protected final Duration phaseOneLoopReadTimeout;
protected final boolean topicCompactionRetainNullKey;
@@ -197,7 +203,7 @@ public abstract class AbstractTwoPhaseCompactor<T> extends Compactor {
LedgerHandle ledger) {
CompletableFuture<Long> promise = new CompletableFuture<>();
- reader.seekAsync(from).thenCompose((v) -> {
+ phaseTwoSeekWithRetry(reader, from).thenCompose((v) -> {
injectionAfterSeekInPhaseTwo.run();
Semaphore outstanding = new Semaphore(MAX_OUTSTANDING);
CompletableFuture<Void> loopPromise = new CompletableFuture<>();
@@ -222,6 +228,57 @@ public abstract class AbstractTwoPhaseCompactor<T> extends Compactor {
return promise;
}
+    /**
+     * Seeks the compaction subscription to {@code from}, retrying while the seek fails with a
+     * transient {@link PulsarClientException.ConnectException}.
+     *
+     * <p>Why a retry is needed: on the broker, {@code PersistentSubscription.resetCursorInternal}
+     * disconnects the compaction consumer before resetting the managed cursor, and only afterwards
+     * sends the success response. The client can observe the disconnect first
+     * ({@code channelInactive} on the consumer's {@code ClientCnx}) and fail the in-flight seek with
+     * {@code ConnectException}, even though the cursor has already been repositioned server-side.
+     * The seek is idempotent, so seeking again after a short backoff is safe and lets the client
+     * reconnect first.
+     *
+     * <p>Retry policy: initial delay 100 ms, maximum backoff 1 s, mandatory stop 10 s. Once the
+     * backoff reports that its mandatory stop has been reached, the last {@code ConnectException}
+     * is returned instead of scheduling another attempt. Any seek failure that is not a
+     * {@code ConnectException} is returned immediately.
+     *
+     * <p>NOTE(review): if the consumer is still reconnecting when a retry fires, the client may
+     * reject the seek with a different {@link PulsarClientException} subtype (not a
+     * {@code ConnectException}). This policy treats that as non-transient. Confirm against the
+     * seek precondition checks in {@code ConsumerImpl}.
+     *
+     * @param reader the raw reader on the compaction subscription
+     * @param from   the position to seek to
+     * @return a future completed when a seek succeeds, or exceptionally with the unwrapped cause
+     */
+    private CompletableFuture<Void> phaseTwoSeekWithRetry(RawReader reader, MessageId from) {
+        final CompletableFuture<Void> seekResult = new CompletableFuture<>();
+        attemptPhaseTwoSeek(reader, from,
+                Backoff.builder()
+                        .initialDelay(Duration.ofMillis(100))
+                        .maxBackoff(Duration.ofSeconds(1))
+                        .mandatoryStop(Duration.ofSeconds(10))
+                        .build(),
+                seekResult);
+        return seekResult;
+    }
+
+    /**
+     * Runs one phase-two seek attempt via {@link #injectionPhaseTwoSeek}. If the attempt fails
+     * with {@link PulsarClientException.ConnectException}, the next attempt is scheduled on
+     * {@code scheduler} after the delay returned by {@code backoff}.
+     *
+     * <p>{@code promise} is always completed:
+     * <ul>
+     *   <li>normally, when a seek succeeds;</li>
+     *   <li>exceptionally with the unwrapped cause, when the failure is not a
+     *       {@code ConnectException} or the backoff's mandatory stop has been reached;</li>
+     *   <li>exceptionally, when the seek function throws instead of returning a future, or when
+     *       the retry cannot be scheduled (for example because {@code scheduler} rejects it).</li>
+     * </ul>
+     *
+     * @param reader  the raw reader whose subscription is repositioned
+     * @param from    the position to seek to
+     * @param backoff retry delay policy, shared by every attempt of this seek
+     * @param promise the future to complete with the overall outcome
+     */
+    private void attemptPhaseTwoSeek(RawReader reader, MessageId from, Backoff backoff,
+            CompletableFuture<Void> promise) {
+        final CompletableFuture<Void> seekFuture;
+        try {
+            seekFuture = injectionPhaseTwoSeek.apply(reader, from);
+        } catch (Throwable t) {
+            // On a retry this runs inside a scheduler task, where a synchronous throw would be
+            // swallowed by the task's future and leave the promise incomplete forever.
+            promise.completeExceptionally(t);
+            return;
+        }
+        seekFuture.whenComplete((v, ex) -> {
+            if (ex == null) {
+                promise.complete(null);
+                return;
+            }
+            Throwable cause = FutureUtil.unwrapCompletionException(ex);
+            if (!(cause instanceof PulsarClientException.ConnectException)) {
+                promise.completeExceptionally(cause);
+                return;
+            }
+            long nextMs = backoff.next().toMillis();
+            if (backoff.isMandatoryStopMade()) {
+                promise.completeExceptionally(cause);
+                return;
+            }
+            log.warn()
+                    .attr("topic", reader.getTopic())
+                    .attr("from", from)
+                    .attr("nextMs", nextMs)
+                    .exceptionMessage(cause)
+                    .log("Phase two seek failed transiently, will retry");
+            try {
+                scheduler.schedule(() -> attemptPhaseTwoSeek(reader, from, backoff, promise),
+                        nextMs, TimeUnit.MILLISECONDS);
+            } catch (RuntimeException scheduleFailure) {
+                // e.g. RejectedExecutionException once the scheduler is shut down. An exception
+                // thrown from a whenComplete action is otherwise dropped, so phase two would hang.
+                cause.addSuppressed(scheduleFailure);
+                promise.completeExceptionally(cause);
+            }
+        });
+    }
+
private void phaseTwoLoop(RawReader reader, MessageId to, Map<String,
MessageId> latestForKey,
LedgerHandle lh, Semaphore outstanding, CompletableFuture<Void> promise,
MessageId lastCompactedMessageId) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 4e839362055..99f3ce40409 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -56,6 +56,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import lombok.Cleanup;
@@ -91,6 +92,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
@@ -161,6 +163,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
public void beforeMethod() throws Exception {
admin.namespaces().removeRetention("my-tenant/my-ns");
AbstractTwoPhaseCompactor.injectionAfterSeekInPhaseTwo = () -> {};
+ // Reset the static phase-two seek hook to the real RawReader::seekAsync, so a failure injected
+ // by one test (e.g. testPhaseTwoSeekRetriesOnConnectException) does not carry over to later tests.
+ AbstractTwoPhaseCompactor.injectionPhaseTwoSeek = RawReader::seekAsync;
}
protected long compact(String topic) throws ExecutionException,
InterruptedException {
@@ -2523,6 +2526,35 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
"value-2", "key-2", "value-0", "key-2", "value-1"));
}
+    @Test
+    public void testPhaseTwoSeekRetriesOnConnectException() throws Exception {
+        final String topic = "persistent://my-tenant/my-ns/phase-two-seek-retry";
+        @Cleanup final Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+        for (String value : List.of("v0", "v1")) {
+            producer.newMessage().key("k").value(value).send();
+        }
+
+        // Reproduce the production race in which PersistentSubscription.resetCursorInternal
+        // disconnects the consumer before answering the seek, failing the client's in-flight
+        // seek with ConnectException. Here the first call returns a synthetic failure without
+        // touching the real seek, so nothing is left in flight on the underlying ConsumerImpl.
+        // Every later call delegates to the real seekAsync, so compaction can proceed.
+        final AtomicInteger seekCalls = new AtomicInteger();
+        AbstractTwoPhaseCompactor.injectionPhaseTwoSeek = (reader, msgId) -> {
+            if (seekCalls.incrementAndGet() > 1) {
+                return reader.seekAsync(msgId);
+            }
+            return FutureUtil.failedFuture(
+                    new PulsarClientException.ConnectException("simulated disconnect during seek"));
+        };
+
+        final long compactedLedgerId = compact(topic);
+        assertNotEquals(compactedLedgerId, -1L);
+        assertTrue(seekCalls.get() >= 2,
+                "Seek should have been retried at least once, attempts=" + seekCalls.get());
+
+        // Only the latest value for key "k" must survive compaction.
+        verifyReadKeyValues(topic, true, List.of("k", "v1"));
+    }
+
private void verifyReadKeyValues(String topic, boolean readCompacted,
List<String> expectedKeyValues)
throws Exception {
@Cleanup final var reader =
pulsarClient.newReader(Schema.STRING).topic(topic).readCompacted(readCompacted)