ashwindakappagari opened a new issue, #25936: URL: https://github.com/apache/pulsar/issues/25936
### Search before reporting

- [x] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.

### Read release policy

- [x] I understand that [unsupported versions](https://pulsar.apache.org/contribute/release-policy/#supported-versions) don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

### User environment

Pulsar broker version (where the bug was first observed):
- Apache Pulsar 2.11.2 (internal custom build with broker-side plugins; the affected pulsar-functions/worker code path is the unmodified upstream version, no Boomi changes there)
- Linux on Kubernetes (Amazon EKS, eu-central-1)
- Amazon Linux 2 base, x86_64
- OpenJDK 17 (Amazon Corretto 17.0.2)

Pulsar broker version (verified the code path is unchanged on):
- master at HEAD (commit f638a892d5, 2026-06-04): confirmed by reading pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java. The catch block in createExclusiveProducerWithRetry still has no future disposal after the SLF4J → slog migration (#25508)
- branch-3.0, branch-3.3, branch-4.0: same pattern (unchanged by #24539, which fixed the InterruptedException path in ProducerBuilderImpl rather than the TimeoutException path in WorkerUtils)
- branch-2.11: same pattern (EOL per release policy; not requesting a fix here)

Pulsar client library:
- pulsar-client-java embedded in pulsar-functions-worker (same Pulsar version as the broker; the function-worker process constructs its PulsarClient against the local broker)

Client OS / hardware / Java:
- Same as broker: Linux on EKS, Amazon Linux 2, x86_64, OpenJDK 17 (Corretto 17.0.2). Workers and brokers run as separate StatefulSets on the same Kubernetes cluster.
Deployment shape:
- 3 broker pods, 4 function-worker pods, both as Kubernetes StatefulSets deployed via an Apache Pulsar Helm chart derivative
- brokerDeduplicationEnabled is unset on persistent://functions/internal/* topics, so they're created with dedup disabled (confirmed in broker logs: BrokerService: Created topic ... - dedup is disabled). This is the upstream default; it is relevant because it eliminates the inactivity-based reaper as a recovery path for the orphan producer.

### Issue Description

Root cause (from source analysis)

WorkerUtils.createExclusiveProducerWithRetry at pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java (lines 415-444 in 2.11.2; structurally identical on master) bounds producer creation with createAsync().get(10, TimeUnit.SECONDS). When get() throws TimeoutException, the catch block logs and returns to the retry loop. The in-flight CompletableFuture<Producer> is neither cancelled nor disposed.

The underlying ProducerImpl state machine continues to run on the PulsarClient IO thread. If the producer eventually registers with the broker (DNS resolves, broker accepts the create), it does so as an orphan: registered with the broker, but with no caller holding a reference. Every subsequent retry attempts to register a fresh ProducerImpl with the same exclusive producer name and is rejected.

Note that CompletableFuture.cancel(true) would not be sufficient. Per its Javadoc, "the mayInterruptIfRunning parameter has no effect in this implementation". The producer state machine doesn't observe cancellation; cancel() only updates the future's terminal state. The orphan still registers.

The orphan is never reaped because:
1. The client never sends CloseProducer (no caller reference exists to invoke close())
2. The TCP connection stays alive (shared with other producers/consumers on the same PulsarClient, e.g., the worker's assignment-tailer Reader)
3. The internal topics used by Pulsar Functions are created with brokerDeduplicationEnabled=false (verified in our broker logs: BrokerService: Created topic persistent://functions/internal/assignments - dedup is disabled), so the broker's inactivity-based producer reaper does not apply

Relationship to existing PRs

Two recent PRs addressed adjacent slices of the orphan-producer problem space:
- #24539 ("Close orphan producer or consumer when the creation is interrupted", merged 2025-07, in 4.1.0, backported to 3.0/3.3/4.0): fixes the orphan on InterruptedException. Introduces what looks like a reusable FutureUtil helper in pulsar-client for cleanup-on-completion. Does not cover TimeoutException.
- #23853 ("Orphan producer when concurrently calling producer closing and reconnection", merged 2025-01, in 4.1.0): fixes a race during normal reconnect/close. Different scenario from this one.

This issue extends the orphan-producer fix surface from #24539 to cover the timeout path in pulsar-functions/worker.

Possible paths I'd want maintainer input on before writing a PR:
1. Reuse FutureUtil from #24539: if the helper from #24539 is generic enough to handle the timeout case, the fix in WorkerUtils would be a one-line conversion. Cleaner, but I'd want to confirm the helper API fits.
2. Inline whenComplete cleanup in WorkerUtils: the catch block attaches a callback that calls closeAsync() if the future eventually resolves with a producer. Diff is ~5 lines plus comment.
3. Push the disposal logic into ProducerBuilderImpl/ProducerImpl itself: a broader change, but it would prevent this bug class from showing up in any createAsync().get(timeout) caller. More invasive; likely requires a PIP.

### Error messages

```text
We observed this in production on Pulsar 2.11.2 on 2026-05-23. DNS resolution to the new bundle owner lagged ~12 seconds during a broker StatefulSet rolling restart on Kubernetes. The new leader was stuck for ~1 hour until manual kubectl rollout restart of the functions-worker StatefulSet. Internal RCA evidence available on request.

Concrete log fingerprint (with worker-specific identifiers redacted):

Broker side:
INFO o.a.p.b.s.BrokerService - Created topic persistent://functions/internal/assignments - dedup is disabled
INFO o.a.p.b.s.p.PersistentTopic - [persistent://functions/internal/assignments] Updated topic epoch to 1
INFO o.a.p.b.s.ServerCnx - [/<worker-ip>] Created new producer: Producer{ topic=persistent://functions/internal/assignments, producerName=<workerId>-scheduler-manager, producerId=0}
( ... no `Removed producer` for the lifetime of the worker JVM ... )

Worker side:
INFO ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {producerName=<workerId>-scheduler-manager, ...} <- Producer #1
( ... DNS resolution failures for several seconds ... )
INFO ProducerImpl - [<workerId>-scheduler-manager] Created producer on cnx [id: ...]
Topic has an existing exclusive producer: <workerId>-scheduler-manager
( ... repeats every 10s, matching sleepInBetweenMs=10000 ... )

The two distinct Starting Pulsar producer perf lines, each from a separate ProducerImpl constructor, combined with the fact that WorkerUtils.createExclusiveProducerWithRetry only retries on exception, prove the first createAsync().get(10s) threw TimeoutException. The successful Created producer on cnx line in between (from the IO thread) proves the broker accepted the first registration. That's the orphan.
```

### Reproducing the issue

Minimal reproduce step

The bug surfaces during a function-worker leader handoff when the broker that owns the assignments-topic bundle becomes unavailable AND the new bundle owner has DNS resolution lag exceeding 10 seconds (the hard-coded timeout in WorkerUtils.createExclusiveProducerWithRetry).

Synthetic reproduction (no production cluster needed):
1. Deploy a 3-broker, 3-worker Pulsar Functions cluster (e.g., via apache/pulsar-helm-chart on kind/minikube).
2. Identify which broker currently owns persistent://functions/internal/assignments, and which worker is the leader (via the participants Failover subscription on persistent://functions/internal/coordinate).
3. Add a network delay rule that fails DNS resolution for the broker pod that owns the assignments bundle for ~15 seconds. Tools that work:
   - Toxiproxy with a latency toxic on the DNS port
   - tc qdisc add ... delay 15000ms on the broker pod's veth
   - A CoreDNS plugin temporarily blocking the headless-service A record
4. Delete the broker pod that owns the bundle, forcing it to migrate to the broker affected by the DNS delay.
5. Tail the new leader worker's logs.

What did you expect to see?

After at most a few retries, the new leader successfully acquires its <workerId>-scheduler-manager exclusive producer on persistent://functions/internal/assignments, and can publish assignment updates.

What did you see instead?

The new leader logs ProducerFencedException: Topic has an existing exclusive producer: <workerId>-scheduler-manager every 10 seconds indefinitely. The producer name in the error message is the leader's own producer. The cascade persists until the function-worker process is restarted.

### Additional information

_No response_

### Are you willing to submit a PR?

- [x] I'm willing to submit a PR!
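
For reference, the inline whenComplete cleanup listed as option 2 under "Possible paths" might look roughly like the sketch below. It uses only java.util.concurrent so it can run without a broker. FakeProducer, getOrScheduleClose, and OrphanCleanupSketch are hypothetical names made up for illustration, not Pulsar APIs, and this is not the actual WorkerUtils code; in a real patch the future would come from createAsync() and the callback would call the real Producer.closeAsync().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

class OrphanCleanupSketch {

    /** Hypothetical stand-in for org.apache.pulsar.client.api.Producer. */
    interface FakeProducer {
        CompletableFuture<Void> closeAsync();
    }

    /**
     * Waits up to timeoutMs for the creation future. On timeout it attaches a
     * callback so that a producer created later is closed instead of lingering
     * as an orphan, then returns null so the caller's retry loop can continue.
     */
    static FakeProducer getOrScheduleClose(CompletableFuture<FakeProducer> future, long timeoutMs)
            throws InterruptedException, ExecutionException {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // If the future completed between the timeout and this call,
            // whenComplete runs the callback immediately, so no window is missed.
            future.whenComplete((producer, error) -> {
                if (producer != null) {
                    producer.closeAsync();
                }
            });
            return null;
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicBoolean closed = new AtomicBoolean(false);
        FakeProducer late = () -> {
            closed.set(true);
            return CompletableFuture.completedFuture(null);
        };

        // Simulates a creation that is still in flight when the timeout fires.
        CompletableFuture<FakeProducer> pending = new CompletableFuture<>();
        FakeProducer result = getOrScheduleClose(pending, 50);
        System.out.println("returned on timeout: " + result);

        // Simulates the IO thread finishing the registration afterwards.
        pending.complete(late);
        System.out.println("late producer closed: " + closed.get());
    }
}
```

The sketch deliberately does not call cancel() on the future, since (as noted in the root-cause section) cancellation only changes the future's state and does not stop the registration. Whether the retry should also wait for the late close to finish before re-attempting is a design question this sketch leaves open.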
