andrewt-8x8 opened a new issue, #25290:
URL: https://github.com/apache/pulsar/issues/25290

   ### Search before reporting
   
   - [x] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   ### Read release policy
   
   - [x] I understand that [unsupported 
versions](https://pulsar.apache.org/contribute/release-policy/#supported-versions)
 don't get bug fixes. I will attempt to reproduce the issue on a supported 
version of Pulsar client and Pulsar broker.
   
   ### User environment
   
   - Broker version: 4.0.6
   - Client library type: Java (Kafka Source Connector / `pulsar-io-kafka`)
   - Deployed on Kubernetes via Function Mesh
   
   ### Issue Description
   
   When the Kafka Source connector's consumer thread encounters a fatal 
exception (e.g. due to a transient Pulsar authentication outage), the error 
notification is silently lost and the connector pod remains alive but 
permanently idle.
   
   **What happens:**
   
   1. The consumer thread in `KafkaAbstractSource.start()` catches an exception 
at [KafkaAbstractSource.java 
L192-L195](https://github.com/apache/pulsar/blob/v4.0.6/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java#L192-L195):
      ```java
      } catch (Exception e) {
          LOG.error("Error while processing records", e);
          notifyError(e);
          break;  // consumer thread exits permanently
      }
      ```
   2. `notifyError(e)` (in `AbstractPushSource`) enqueues an 
`ErrorNotifierRecord` onto a bounded `LinkedBlockingQueue(1000)` via 
`consume()` -> `queue.put()`.
   3. The intent is that the instance thread dequeues it via `readNext()`, 
which throws the exception, causing the function framework to restart the 
instance.
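   The handoff in steps 2-3 can be sketched as follows. This is a simplified illustration of the pattern, not the actual `AbstractPushSource` implementation; the class body and field names here are assumptions, though `notifyError()`/`readNext()` mirror the real method names:

   ```java
   import java.util.concurrent.LinkedBlockingQueue;

   // Simplified sketch of the AbstractPushSource error-notification handoff.
   public class PushSourceSketch {
       // Bounded queue shared between the consumer thread and the instance thread.
       private final LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>(1000);

       // Marker record carrying the consumer thread's fatal exception.
       static final class ErrorNotifierRecord {
           final Exception error;
           ErrorNotifierRecord(Exception error) { this.error = error; }
       }

       // Called on the consumer thread; blocks if the queue already holds 1000 records.
       public void notifyError(Exception e) throws InterruptedException {
           queue.put(new ErrorNotifierRecord(e));
       }

       // Called on the instance thread between sends; rethrows a queued error
       // so the function framework restarts the instance.
       public Object readNext() throws Exception {
           Object record = queue.take();
           if (record instanceof ErrorNotifierRecord) {
               throw ((ErrorNotifierRecord) record).error;
           }
           return record;
       }
   }
   ```

   The whole mechanism hinges on the instance thread coming back around to call `readNext()`.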
   
   **The bug:** If the instance thread is blocked in `sendOutputMessage()` 
(e.g. when Pulsar client cannot contact an external authz service because of a 
Cloudflare outage 🤦), it never returns to call `readNext()`. The 
`ErrorNotifierRecord` sits in the queue (or `queue.put()` blocks if the queue 
is full), and the error is never propagated. The consumer thread is dead, the 
instance thread is stuck, but the pod passes liveness/readiness checks - it's 
alive but doing no work.
   
   **What was expected:** The connector should crash (or the instance should 
restart) so that Kubernetes can restart the pod and recovery can happen 
automatically once the transient outage resolves.
   
   **Why this is a bug:** The `notifyError()` mechanism added in PR #20791 / 
PIP-281 relies on the instance thread being in a state where it will call 
`readNext()`. When the instance thread is blocked on I/O (sending to Pulsar), 
this assumption is violated and the error signal is silently dropped. This is a 
liveness bug - the connector appears healthy but is permanently stalled.
   
   ### Error messages
   
   No error messages beyond the initial `LOG.error("Error while processing 
records", e)` on the consumer thread. The instance thread produces no output 
because it is blocked.
   
   ### Reproducing the issue
   
   1. Deploy a Kafka Source connector (e.g. `KafkaBytesSource`) on Kubernetes 
via Function Mesh
   2. Ensure the connector is actively consuming from Kafka and producing to 
Pulsar
   3. Cause a transient Pulsar authentication failure (e.g. make the SSO/OAuth 
token endpoint return 500s)
   4. The consumer thread will eventually hit an exception when 
`CompletableFuture.allOf(futures).get()` times out (because messages can't be 
sent to Pulsar), call `notifyError()`, and break
   5. The instance thread is blocked in `sendOutputMessage()` trying to write a 
previously-consumed record to the unauthenticated Pulsar client
   6. Observe that the pod remains running, passes health checks, but consumes 
no further messages from Kafka
   7. Even after the auth outage recovers, consumption does not resume - the 
consumer thread is dead
   
   ### Additional information
   
   **Related issues and PRs:**
   - Discussion #19880 - "Should Kafka Source Connector close itself after 
unrecoverable error?"
   - PR #20424 - `[fix][io] Close the kafka source connector if there is 
uncaught exception` (merged, 3.1.0)
   - PR #20698 - `[fix][io] Close the kafka source connector got stuck` 
(merged, 3.1.0)
   - PR #20795 - `[fix][io] Not restart instance when kafka source poll 
exception` (merged, 3.1.0) - this PR introduced the `notifyError()` + `break` 
pattern
   - PR #20791 / PR #20807 (PIP-281) - Added `notifyError()` to `PushSource` / 
`AbstractPushSource`
   - PR #22511 - `[fix][io] Kafka Source connector maybe stuck` (merged, 3.3.0) 
- added timeout on futures, but doesn't address this scenario
   
   The fixes in 3.1.0–3.3.0 addressed the case where the consumer thread itself 
gets stuck, but did not address the case where the consumer thread correctly 
signals an error via `notifyError()` but the instance thread is blocked and 
never reads it.
   
   **Possible fix directions:**
   - Have `notifyError()` call `System.exit(1)` to force a pod restart (this is 
what we've done as a downstream workaround)
   - Have `KafkaAbstractSource` catch the exception and call `close()` on a 
separate thread with a timeout, then `System.exit(1)` if close doesn't complete
   - Have `notifyError()` throw a `RuntimeException` instead of (or in addition 
to) enqueuing - however, since `notifyError()` runs on the consumer thread, an 
uncaught exception would only kill that thread (which is already dying via 
`break`); it would not affect the stuck instance thread unless combined with a 
`Thread.UncaughtExceptionHandler` that calls `System.exit(1)`
   - Add a watchdog/heartbeat mechanism to detect that the consumer thread has 
died
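   As a rough illustration of the last option, a watchdog could periodically check the consumer thread and escalate once it dies. All names here are hypothetical and the escalation action is injected so the idea is testable; in production it would be `System.exit(1)` or `Runtime.getRuntime().halt(1)`. This is a sketch of the idea, not a proposed patch:

   ```java
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.TimeUnit;

   // Hypothetical watchdog: if the consumer thread dies while the instance
   // thread is stuck, run an escalation action so Kubernetes restarts the pod.
   public final class ConsumerThreadWatchdog {
       private final ScheduledExecutorService scheduler =
               Executors.newSingleThreadScheduledExecutor(r -> {
                   Thread t = new Thread(r, "consumer-watchdog");
                   t.setDaemon(true); // must not keep a dying JVM alive
                   return t;
               });

       /** Check the consumer thread every periodMillis; escalate once it dies. */
       public void watch(Thread consumerThread, long periodMillis, Runnable escalate) {
           scheduler.scheduleAtFixedRate(() -> {
               if (!consumerThread.isAlive()) {
                   escalate.run(); // e.g. () -> Runtime.getRuntime().halt(1)
                   scheduler.shutdown();
               }
           }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
       }
   }
   ```

   `halt(1)` (rather than `System.exit(1)`) may be preferable for the escalation action, since `exit()` runs shutdown hooks that could themselves block on the same stuck Pulsar client.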
   
   ### Are you willing to submit a PR?
   
   - [ ] Not at this time.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
