Pavel Zeger created KAFKA-20602:
-----------------------------------

             Summary: DLQ topic creation has 3 issues: indefinite hang, wrong log level, interrupt not restored
                 Key: KAFKA-20602
                 URL: https://issues.apache.org/jira/browse/KAFKA-20602
             Project: Kafka
          Issue Type: Bug
          Components: connect
            Reporter: Pavel Zeger


`DeadLetterQueueReporter.createAndSetup` can hang indefinitely during a broker outage, logs expected DLQ topic creation at ERROR level, and does not restore the thread's interrupt status.

*Where*

`connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java`, lines 77-98:

{code:java}
public static DeadLetterQueueReporter createAndSetup(Map<String, Object> adminProps,
                                                     ConnectorTaskId id,
                                                     SinkConnectorConfig sinkConfig,
                                                     Map<String, Object> producerProps,
                                                     ErrorHandlingMetrics errorHandlingMetrics) {
{code}


{*}Issue 1{*}: indefinite hang on broker outage

Both `admin.listTopics().names().get()` and `admin.createTopics(...).all().get()` are unbounded `.get()` calls: the caller passes no timeout. If the broker is unreachable:
- The `AdminClient` retries internally until its own `default.api.timeout.ms` expires (default *60 seconds*), after which `.get()` surfaces the failure as an `ExecutionException`.
- Before either call is even issued, the `try-with-resources` header has already run `Admin.create()`, and the admin client cannot complete any request until it bootstraps against the unreachable cluster.

So when the brokers are unavailable at startup, *every connector that has DLQ enabled blocks for ~60s before its task starts*. For a worker with N such connectors, these waits serialize into an N × 60s startup delay.

There is a documented Connect best practice of setting a short `admin.default.api.timeout.ms` for fast startup, but the `adminProps` map here is constructed from the sink config and may not include that override. Add an explicit `.get(timeout, unit)` to upper-bound the wait regardless:
{code:java}
admin.listTopics().names().get(30, TimeUnit.SECONDS);
{code}

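
For illustration, a minimal JDK-only sketch of what the bounded wait buys. `KafkaFuture` implements `java.util.concurrent.Future`, so `get(long, TimeUnit)` is available on the futures returned by `listTopics().names()` and `createTopics(...).all()`. The class `BoundedWait`, its `Outcome` enum and the `await` method are hypothetical names, and in the checks a plain `CompletableFuture` stands in for the admin futures. It distinguishes the ways control can come back: the client's own failure (as `ExecutionException`), the caller-side bound expiring (as `java.util.concurrent.TimeoutException`), and interruption.

{code:java}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper for illustration only; not part of Kafka Connect.
final class BoundedWait {

    /** How a bounded wait on an admin-style future ended. */
    enum Outcome { COMPLETED, CLIENT_FAILED, CALLER_TIMED_OUT, INTERRUPTED }

    private BoundedWait() {}

    /** Waits up to timeoutMs for the future instead of waiting indefinitely. */
    static Outcome await(Future<?> future, long timeoutMs) {
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return Outcome.COMPLETED;
        } catch (ExecutionException e) {
            // The client gave up on its own (for example after its
            // default.api.timeout.ms); the underlying error is e.getCause().
            return Outcome.CLIENT_FAILED;
        } catch (TimeoutException e) {
            // The caller-side bound expired first. This is
            // java.util.concurrent.TimeoutException, not Kafka's TimeoutException.
            return Outcome.CALLER_TIMED_OUT;
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack still sees the interrupt.
            Thread.currentThread().interrupt();
            return Outcome.INTERRUPTED;
        }
    }
}
{code}

With a future that never completes and a 100 ms bound, `await` returns `CALLER_TIMED_OUT` after roughly 100 ms rather than blocking until the admin client's own deadline.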
{*}Issue 2{*}: `log.error` for an expected condition
{code:java}
log.error("Topic {} doesn't exist. Will attempt to create topic.", topic); 
{code}
This isn't an error: the next line creates the topic, and the log message itself says "will attempt to create", so the code *expects* the topic to be missing on first startup. It should be `log.info`. As written, it shows up in operator dashboards as an ERROR alert on every first start of a connector with DLQ enabled, which is pure noise.
 
{*}Issue 3{*}: `InterruptedException` wrapped without restoring the interrupt flag
{code:java}
} catch (InterruptedException e) {
    throw new ConnectException("Could not initialize dead letter queue with topic=" + topic, e);
}
{code}


The Java idiom: when you catch `InterruptedException` and don't re-throw it as-is, you must call `Thread.currentThread().interrupt()` to preserve the signal for upstream code. Currently the `ConnectException` propagates, but the interrupt flag was already cleared when `InterruptedException` was thrown, so any subsequent blocking call on this thread does not know it was interrupted.
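
A small, self-contained illustration of the idiom, assuming nothing beyond the JDK: `Thread.sleep` stands in for the blocking admin call, and `InterruptIdiom`, `SetupException` (a stand-in for `ConnectException`) and `flagSurvives` are hypothetical names. Calling `flagSurvives(false)` returns `false` (the interrupt is lost), while `flagSurvives(true)` returns `true`.

{code:java}
// Illustration of the catch / restore / wrap idiom; all names are hypothetical.
final class InterruptIdiom {

    /** Hypothetical stand-in for ConnectException. */
    static final class SetupException extends RuntimeException {
        SetupException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    private InterruptIdiom() {}

    /** Stand-in for a blocking admin call: sleeps up to 10 s unless interrupted. */
    static void blockingStep(boolean restoreFlag) {
        try {
            Thread.sleep(10_000);
        } catch (InterruptedException e) {
            // Throwing InterruptedException has already cleared the interrupt status.
            if (restoreFlag) {
                Thread.currentThread().interrupt(); // re-assert it for callers
            }
            throw new SetupException("Could not initialize dead letter queue", e);
        }
    }

    /**
     * Interrupts the current thread, runs blockingStep, and reports whether the
     * interrupt status is still set afterwards. Thread.interrupted() both reads
     * and clears the flag, so the thread is left clean for the next caller.
     */
    static boolean flagSurvives(boolean restoreFlag) {
        Thread.currentThread().interrupt(); // simulate a shutdown request
        try {
            blockingStep(restoreFlag);
        } catch (SetupException expected) {
            // The wrapped failure propagates in both variants.
        }
        return Thread.interrupted();
    }
}
{code}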
 
*Proposed fix*

Four changes:

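1. Bounded `.get(timeoutMs, ...)` on both admin calls.
2. A new explicit `catch (TimeoutException e)` for the timeout case. This is `java.util.concurrent.TimeoutException`, which `Future.get(long, TimeUnit)` throws; it is distinct from Kafka's `org.apache.kafka.common.errors.TimeoutException`, which arrives wrapped in an `ExecutionException`.
3. `log.error` → `log.info`, with the topic config included in the message.
4. Restore the interrupt flag before re-throwing.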
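
Putting the four changes together, a hedged sketch of how the `try` block could be restructured. It is not the actual patch: `TopicOps` is a hypothetical two-method view of the `Admin` calls, `DlqSetupException` stands in for `ConnectException`, `java.util.logging` stands in for the SLF4J `log` used in Connect, and the replication factor is used as the example of "topic config" in the log message. It also collapses whatever per-cause handling the real `ExecutionException` branch does into a single wrap.

{code:java}
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;

// Hypothetical sketch of the four proposed changes; not the actual patch.
final class DlqTopicSetup {

    private static final Logger LOG = Logger.getLogger(DlqTopicSetup.class.getName());

    /** Hypothetical narrow view of the two Admin calls the reporter makes. */
    interface TopicOps {
        Future<Set<String>> listTopicNames();             // ~ admin.listTopics().names()
        Future<Void> createTopic(String topic, short rf); // ~ admin.createTopics(...).all()
    }

    /** Hypothetical stand-in for ConnectException. */
    static final class DlqSetupException extends RuntimeException {
        DlqSetupException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    private DlqTopicSetup() {}

    static void ensureTopic(TopicOps ops, String topic, short replicationFactor, long timeoutMs) {
        try {
            // (1) Both admin calls get a caller-side upper bound.
            Set<String> names = ops.listTopicNames().get(timeoutMs, TimeUnit.MILLISECONDS);
            if (!names.contains(topic)) {
                // (3) A missing topic is expected on first start: INFO, with its settings.
                LOG.info("Topic " + topic + " doesn't exist. Will attempt to create it with replication factor "
                        + replicationFactor + ".");
                ops.createTopic(topic, replicationFactor).get(timeoutMs, TimeUnit.MILLISECONDS);
            }
        } catch (TimeoutException e) {
            // (2) The caller-side bound expired (java.util.concurrent.TimeoutException).
            throw new DlqSetupException("Timed out after " + timeoutMs
                    + " ms initializing dead letter queue with topic=" + topic, e);
        } catch (ExecutionException e) {
            // The admin client itself reported a failure; the cause is e.getCause().
            throw new DlqSetupException("Could not initialize dead letter queue with topic=" + topic, e);
        } catch (InterruptedException e) {
            // (4) Restore the interrupt flag before wrapping and re-throwing.
            Thread.currentThread().interrupt();
            throw new DlqSetupException("Interrupted while initializing dead letter queue with topic=" + topic, e);
        }
    }
}
{code}

Keeping the timeout in its own `catch` lets the exception message name the bound that expired, so a stalled cluster reads differently in operator logs from a failure the broker actually reported.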

*KIP needed?*

No



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
