ashwindakappagari opened a new issue, #25936:
URL: https://github.com/apache/pulsar/issues/25936

   ### Search before reporting
   
- [x] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.
   
   
   ### Read release policy
   
- [x] I understand that [unsupported versions](https://pulsar.apache.org/contribute/release-policy/#supported-versions) don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.
   
   
   ### User environment
   
Pulsar broker version (where the bug was first observed):

- Apache Pulsar 2.11.2 (internal custom build with broker-side plugins; the affected pulsar-functions/worker code path is the unmodified upstream version, with no Boomi changes there)
- Linux on Kubernetes (Amazon EKS, eu-central-1)
- Amazon Linux 2 base, x86_64
- OpenJDK 17 (Amazon Corretto 17.0.2)
   
Pulsar broker versions on which the code path was verified to be unchanged:

- master at HEAD (commit f638a892d5, 2026-06-04), confirmed by reading pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java. After the SLF4J → slog migration (#25508), the catch block in createExclusiveProducerWithRetry still does not dispose of the in-flight future.
- branch-3.0, branch-3.3, branch-4.0: same pattern (unchanged by #24539, which fixed the InterruptedException path in ProducerBuilderImpl rather than the TimeoutException path in WorkerUtils)
- branch-2.11: same pattern (EOL per the release policy; not requesting a fix here)
   
Pulsar client library:

- pulsar-client-java embedded in pulsar-functions-worker (same Pulsar version as the broker; the function-worker process constructs its PulsarClient against the local broker)
   
Client OS / hardware / Java:

- Same as broker: Linux on EKS, Amazon Linux 2, x86_64, OpenJDK 17 (Corretto 17.0.2). Workers and brokers run as separate StatefulSets on the same Kubernetes cluster.
   
Deployment shape:

- 3 broker pods and 4 function-worker pods, both as Kubernetes StatefulSets deployed via a derivative of the Apache Pulsar Helm chart
- brokerDeduplicationEnabled is left unset (the upstream default), so the persistent://functions/internal/* topics are created with deduplication disabled. This is confirmed in the broker logs: BrokerService: Created topic ... - dedup is disabled. It is relevant because it rules out the inactivity-based reaper as a recovery path for the orphan producer.
   
   
   ### Issue Description
   
     
   
Root cause (from source analysis)

WorkerUtils.createExclusiveProducerWithRetry in pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java (lines 415-444 in 2.11.2; structurally identical on master) bounds producer creation with createAsync().get(10, TimeUnit.SECONDS). When get() throws TimeoutException, the catch block logs the failure and hands control back to the retry loop. The in-flight `CompletableFuture<Producer>` is neither cancelled nor disposed of.
   
The underlying ProducerImpl state machine keeps running on the PulsarClient IO thread. If the producer eventually registers with the broker (DNS resolves and the broker accepts the create request), it becomes an orphan: it is registered with the broker, but no caller holds a reference to it. Every subsequent retry tries to register a fresh ProducerImpl with the same exclusive producer name, and the broker rejects it.
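The sequence can be reproduced in a small JDK-only model. This is a sketch, not Pulsar code. `FakeBroker`, `createAsync` and `buggyRetry` are hypothetical stand-ins, and the timings are scaled down from 10 s / ~12 s to milliseconds. The fake broker allows one holder per exclusive producer name, the first creation attempt takes longer than the bounded `get()`, and the timed-out future is dropped as described above.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// JDK-only model of the leak. FakeBroker and createAsync are hypothetical
// stand-ins for the broker and ProducerBuilder#createAsync, not Pulsar classes.
class OrphanProducerModel {

    /** Allows one registered producer per exclusive producer name. */
    static final class FakeBroker {
        final Map<String, Object> exclusive = new ConcurrentHashMap<>();

        Object register(String name) {
            Object producer = new Object();
            if (exclusive.putIfAbsent(name, producer) != null) {
                throw new IllegalStateException(
                        "Topic has an existing exclusive producer: " + name);
            }
            return producer;
        }
    }

    /** Registration completes later, on a separate "IO" thread. */
    static CompletableFuture<Object> createAsync(FakeBroker broker, String name,
            long delayMs, ScheduledExecutorService io) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        io.schedule(() -> {
            try {
                future.complete(broker.register(name));
            } catch (IllegalStateException e) {
                future.completeExceptionally(e);
            }
        }, delayMs, TimeUnit.MILLISECONDS);
        return future;
    }

    /**
     * Simplified, hypothetical rendering of the retry pattern: a bounded get(),
     * and on TimeoutException the in-flight future is simply dropped.
     * Returns the number of attempts rejected as fenced.
     */
    static int buggyRetry(FakeBroker broker, String name, int attempts)
            throws InterruptedException {
        ScheduledExecutorService io = Executors.newSingleThreadScheduledExecutor();
        int fenced = 0;
        try {
            for (int i = 0; i < attempts; i++) {
                // The first attempt is slow (DNS lag); later attempts are fast.
                long delayMs = (i == 0) ? 300 : 10;
                CompletableFuture<Object> future = createAsync(broker, name, delayMs, io);
                try {
                    future.get(100, TimeUnit.MILLISECONDS);
                    return fenced; // acquired the producer
                } catch (TimeoutException e) {
                    // Bug being modelled: nothing disposes of `future`,
                    // which still completes (and registers) later.
                } catch (ExecutionException e) {
                    fenced++; // rejected because the orphan holds the name
                }
                Thread.sleep(500); // stands in for sleepInBetweenMs
            }
            return fenced;
        } finally {
            io.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        FakeBroker broker = new FakeBroker();
        int fenced = buggyRetry(broker, "worker-1-scheduler-manager", 3);
        System.out.println("fenced attempts: " + fenced);
        System.out.println("name still held: "
                + broker.exclusive.containsKey("worker-1-scheduler-manager"));
    }
}
```

With three attempts, the model should report two fenced attempts, and the name stays held by an object that no caller can reach. That is the same shape as the production cascade.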
   
Note that CompletableFuture.cancel(true) would not be sufficient. Per its Javadoc, the mayInterruptIfRunning parameter "has no effect in this implementation because interrupts are not used to control processing." The producer state machine does not observe cancellation, because cancel() only updates the future's terminal state. The orphan still registers.
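The `cancel()` behaviour can be checked with plain JDK types. In this sketch, a background thread stands in for the producer state machine (a hypothetical simplification). It performs its "registration" after the caller has already called `cancel(true)`. The cancellation neither interrupts that thread nor prevents the side effect, and the late `complete()` call just returns `false`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// JDK-only illustration; the "io" thread is a hypothetical stand-in for the
// ProducerImpl state machine running on the client's IO thread.
class CancelDoesNotStopWork {

    /** Returns {cancelReturned, workStillRan, lateCompleteAccepted}. */
    static boolean[] run() throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean registered = new AtomicBoolean(false);
        AtomicBoolean lateComplete = new AtomicBoolean(true);
        CompletableFuture<String> future = new CompletableFuture<>();

        Thread io = new Thread(() -> {
            try {
                release.await();                 // runs after the caller gave up
                registered.set(true);            // the side effect still happens
                lateComplete.set(future.complete("producer")); // false: already cancelled
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                finished.countDown();
            }
        });
        io.start();

        boolean cancelled = future.cancel(true); // does not interrupt `io`
        release.countDown();
        finished.await(5, TimeUnit.SECONDS);
        return new boolean[] {cancelled, registered.get(), lateComplete.get()};
    }

    public static void main(String[] args) throws Exception {
        boolean[] r = run();
        System.out.println("cancel returned " + r[0]
                + ", registration still ran " + r[1]
                + ", late complete() accepted " + r[2]);
    }
}
```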
   
The orphan is never reaped because:

1. The client never sends CloseProducer, since no caller holds a reference that could invoke close().
2. The TCP connection stays alive, because it is shared with other producers and consumers on the same PulsarClient (e.g., the worker's assignment-tailer Reader).
3. The internal topics used by Pulsar Functions are created with brokerDeduplicationEnabled=false (verified in our broker logs: BrokerService: Created topic persistent://functions/internal/assignments - dedup is disabled), so the broker's inactivity-based producer reaper does not apply.
   
Relationship to existing PRs

Two recent PRs addressed adjacent slices of the orphan-producer problem space:

- #24539 ("Close orphan producer or consumer when the creation is interrupted", merged 2025-07, in 4.1.0, backported to 3.0/3.3/4.0) fixes the orphan on InterruptedException. It introduces what looks like a reusable FutureUtil helper in pulsar-client for cleanup-on-completion, but it does not cover TimeoutException.
- #23853 ("Orphan producer when concurrently calling producer closing and reconnection", merged 2025-01, in 4.1.0) fixes a race during normal reconnect/close, which is a different scenario from this one.
   
This issue extends the orphan-producer fix surface from #24539 to cover the timeout path in pulsar-functions/worker. I'd like maintainer input on the following possible approaches before writing a PR:

1. Reuse FutureUtil from #24539. If that helper is generic enough to handle the timeout case, the fix in WorkerUtils would be a one-line conversion. This is cleaner, but I'd want to confirm that the helper API fits.
2. Add inline whenComplete cleanup in WorkerUtils. The catch block attaches a callback that calls closeAsync() if the future eventually resolves with a producer. The diff is ~5 lines plus a comment.
3. Push the disposal logic into ProducerBuilderImpl/ProducerImpl itself. This is a broader change, but it would keep this bug class out of every createAsync().get(timeout) caller. It is more invasive and likely requires a PIP.
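Option 2 could look roughly like the following minimal sketch, which uses JDK types only. It assumes the resolved object exposes a closeAsync() method the way Pulsar's Producer does. `AsyncCloseable` and `getOrDisposeLater` are hypothetical names, not existing Pulsar APIs. In a real patch, the equivalent logic would sit in the TimeoutException catch block of createExclusiveProducerWithRetry.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of option 2 with JDK types only. AsyncCloseable is a hypothetical
// stand-in for org.apache.pulsar.client.api.Producer#closeAsync().
class DisposeOnLateCompletion {

    interface AsyncCloseable {
        CompletableFuture<Void> closeAsync();
    }

    /**
     * Bounded wait. On timeout, arranges for whatever the future eventually
     * yields to be closed instead of leaking, then rethrows so the caller's
     * retry loop behaves as before.
     */
    static <T extends AsyncCloseable> T getOrDisposeLater(
            CompletableFuture<T> future, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            future.whenComplete((resource, error) -> {
                if (resource != null) {
                    resource.closeAsync(); // release the late (orphan) registration
                }
            });
            throw e;
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger closes = new AtomicInteger();
        AsyncCloseable producer = () -> {
            closes.incrementAndGet();
            return CompletableFuture.completedFuture(null);
        };
        CompletableFuture<AsyncCloseable> pending = new CompletableFuture<>();
        try {
            getOrDisposeLater(pending, 50, TimeUnit.MILLISECONDS);
        } catch (TimeoutException expected) {
            // the retry loop would log and try again here
        }
        pending.complete(producer); // registration finishes after the timeout
        System.out.println("closeAsync() calls: " + closes.get());
    }
}
```

Using whenComplete also covers the case where the future completes between get() timing out and the callback being attached. On an already-completed future, whenComplete runs the action immediately in the calling thread, so there is no window in which the late producer escapes. A real patch would presumably also log any failure reported by closeAsync().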
   
   
   
   
   
   ### Error messages
   
   ```text
We observed this in production on Pulsar 2.11.2 on 2026-05-23. During a rolling
restart of the broker StatefulSet on Kubernetes, DNS resolution of the new bundle
owner lagged by ~12 seconds. The new leader stayed stuck for ~1 hour, until a
manual kubectl rollout restart of the functions-worker StatefulSet. Internal RCA
evidence is available on request.
   
Concrete log fingerprint (with worker-specific identifiers redacted):

Broker side:

INFO o.a.p.b.s.BrokerService - Created topic persistent://functions/internal/assignments - dedup is disabled
INFO o.a.p.b.s.p.PersistentTopic - [persistent://functions/internal/assignments] Updated topic epoch to 1
INFO o.a.p.b.s.ServerCnx - [/<worker-ip>] Created new producer: Producer{
        topic=persistent://functions/internal/assignments,
        producerName=<workerId>-scheduler-manager,
        producerId=0}
( ... no `Removed producer` for the lifetime of the worker JVM ... )

Worker side:

INFO ProducerStatsRecorderImpl - Starting Pulsar producer perf with config:
        {producerName=<workerId>-scheduler-manager, ...}        <- Producer #1
( ... DNS resolution failures for several seconds ... )
INFO ProducerImpl - [<workerId>-scheduler-manager] Created producer on cnx [id: ...]
        Topic has an existing exclusive producer: <workerId>-scheduler-manager
( ... repeats every 10s, matching sleepInBetweenMs=10000 ... )
   
The two distinct "Starting Pulsar producer perf" lines (each from a separate
ProducerImpl constructor), combined with the fact that
WorkerUtils.createExclusiveProducerWithRetry only retries on exception, prove
that the first createAsync().get(10s) threw TimeoutException. The successful
"Created producer on cnx" line in between (logged from the IO thread) proves
that the broker accepted the first registration. That producer is the orphan.
   ```
   
   ### Reproducing the issue
   
   Minimal reproduce step
   
The bug surfaces during a function-worker leader handoff when two conditions hold. First, the broker that owns the assignments-topic bundle becomes unavailable. Second, DNS resolution of the new bundle owner lags by more than 10 seconds, which is the hard-coded timeout in WorkerUtils.createExclusiveProducerWithRetry.
   
Synthetic reproduction (no production cluster needed):

1. Deploy a 3-broker, 3-worker Pulsar Functions cluster (e.g., via apache/pulsar-helm-chart on kind/minikube).
2. Identify which broker currently owns persistent://functions/internal/assignments and which worker is the leader (via the participants Failover subscription on persistent://functions/internal/coordinate).
3. Add a network delay rule that makes DNS resolution for the broker pod that owns the assignments bundle fail for ~15 seconds. Tools that work:
   - Toxiproxy with a latency toxic on the DNS port
   - tc qdisc add ... delay 15000ms on the broker pod's veth
   - A CoreDNS plugin that temporarily blocks the headless-service A record
4. Delete the broker pod that owns the bundle, forcing the bundle to migrate to the broker affected by the DNS delay.
5. Tail the new leader worker's logs.
   
What did you expect to see?

After at most a few retries, the new leader acquires its `<workerId>-scheduler-manager` exclusive producer on persistent://functions/internal/assignments and can publish assignment updates.
   
What did you see instead?

The new leader logs `ProducerFencedException: Topic has an existing exclusive producer: <workerId>-scheduler-manager` every 10 seconds, indefinitely. The producer name in the error message is the leader's own producer name. The cascade persists until the function-worker process is restarted.
   
   ### Additional information
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [x] I'm willing to submit a PR!

