Pavel Zeger created KAFKA-20602:
-----------------------------------
Summary: DLQ topic creation has 3 issues: indefinite hang, wrong log level, interrupt not restored
Key: KAFKA-20602
URL: https://issues.apache.org/jira/browse/KAFKA-20602
Project: Kafka
Issue Type: Bug
Components: connect
Reporter: Pavel Zeger
`DeadLetterQueueReporter.createAndSetup` can hang indefinitely on a broker
outage, logs the expected creation of the DLQ topic as ERROR, and does not
restore the interrupt status.
*Where*
`connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java`,
lines 77-98:
{code:java}
public static DeadLetterQueueReporter createAndSetup(Map<String, Object> adminProps,
                                                     ConnectorTaskId id,
                                                     SinkConnectorConfig sinkConfig,
                                                     Map<String, Object> producerProps,
                                                     ErrorHandlingMetrics errorHandlingMetrics) {
    // ... method body elided
}
{code}
{*}Issue 1{*}: indefinite hang on broker outage
Both `admin.listTopics().names().get()` and
`admin.createTopics(...).all().get()` are unbounded `.get()` calls. If the
broker is unreachable:
- The `AdminClient` retries internally until its own `default.api.timeout.ms`
expires (default **60 seconds**); only then does `.get()` throw the failure as
an `ExecutionException`.
- Moreover, the `try-with-resources` block has already started at that point:
the `Admin.create()` call itself blocks on the producer's bootstrap.
So on cold-broker startup, **every connector that has DLQ enabled
blocks for ~60s before its task starts**. For a worker with N such connectors,
these delays serialize into an N × 60s startup delay.
There's a documented Connect best practice of setting a short
`admin.default.api.timeout.ms` for fast startup, but the `adminProps` map here
is built from the sink config and may not include that override. Add an
explicit `.get(timeout, unit)` to put an upper bound on the wait regardless:
{code:java}
// Bounded wait: throws java.util.concurrent.TimeoutException if 30 s elapse first
admin.listTopics().names().get(30, TimeUnit.SECONDS);
{code}
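One way to apply this bound to both admin calls is to give the whole DLQ setup a single deadline, so that `listTopics` and `createTopics` together wait at most one budget instead of 30 s each. This is a minimal sketch under stated assumptions: `CompletableFuture` stands in for `KafkaFuture` (both implement `java.util.concurrent.Future`), and the class `DeadlineGetSketch` and its `getBefore` helper are hypothetical names, not Connect APIs.

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DeadlineGetSketch {

    // Hypothetical helper: waits for the future until an absolute deadline
    // (based on System.nanoTime()). With a zero timeout, get() on an
    // already-completed CompletableFuture still returns its result; on an
    // incomplete one it throws java.util.concurrent.TimeoutException.
    static <T> T getBefore(Future<T> future, long deadlineNanos)
            throws InterruptedException, ExecutionException, TimeoutException {
        long remainingNanos = deadlineNanos - System.nanoTime();
        return future.get(Math.max(0L, remainingNanos), TimeUnit.NANOSECONDS);
    }

    public static void main(String[] args) throws Exception {
        // One budget for the whole setup instead of a separate timeout per call.
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(200);

        // Stand-in for admin.listTopics().names(): already completed.
        Future<String> listed = CompletableFuture.completedFuture("existing-topic");
        System.out.println("listed: " + getBefore(listed, deadline));

        // Stand-in for createTopics(...).all() against an unreachable broker: never completes.
        Future<Void> created = new CompletableFuture<>();
        try {
            getBefore(created, deadline);
        } catch (TimeoutException e) {
            System.out.println("topic creation gave up at the shared deadline");
        }
    }
}
{code}

With a fixed timeout per call, the worst case per connector is the sum of the individual timeouts; a shared deadline caps it at one budget. Either way the per-connector delays still add up across a worker, so the budget should be chosen with N connectors in mind.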
{*}Issue 2{*}: `log.error` for an expected condition
{code:java}
log.error("Topic {} doesn't exist. Will attempt to create topic.", topic);
{code}
This isn't an error: the next line creates the topic. The log message itself
says "will attempt to create"; the code *expects* the topic not to exist on
first startup. It should be `log.info`. As it stands, this shows up in operator
dashboards as an "ERROR" alert on every first start of a connector with DLQ
enabled, which is pure noise.
{*}Issue 3{*}: `InterruptedException` caught without restoring the interrupt flag
{code:java}
} catch (InterruptedException e) {
    throw new ConnectException("Could not initialize dead letter queue with topic="
            + topic, e);
}
{code}
The Java idiom: when you catch `InterruptedException` and don't rethrow it
as-is, you must call `Thread.currentThread().interrupt()` to preserve the
signal for upstream code. Currently the `ConnectException` propagates, but the
interrupt flag stays cleared (the blocking call cleared it when it threw
`InterruptedException`), so neither upstream code nor any later blocking call
on this thread can tell that the thread was interrupted.
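A minimal, self-contained illustration of the idiom. Assumptions: a `CountDownLatch` that is never released stands in for the blocking admin call, and `SetupException` is a hypothetical unchecked exception used in place of `ConnectException` so the example needs no Kafka dependency.

{code:java}
import java.util.concurrent.CountDownLatch;

public class RestoreInterruptSketch {

    // Hypothetical stand-in for ConnectException (also an unchecked exception).
    static class SetupException extends RuntimeException {
        SetupException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Simulates a blocking setup step: the latch is never released,
    // so only an interrupt can end the wait.
    static void initialize(String topic) {
        CountDownLatch neverReleased = new CountDownLatch(1);
        try {
            neverReleased.await();
        } catch (InterruptedException e) {
            // await() cleared the interrupt flag when it threw; set it again so
            // callers and later blocking calls on this thread still see it.
            Thread.currentThread().interrupt();
            throw new SetupException("Could not initialize dead letter queue with topic=" + topic, e);
        }
    }

    public static void main(String[] args) {
        Thread.currentThread().interrupt(); // simulate an interrupt arriving
        try {
            initialize("dlq-topic");
        } catch (SetupException e) {
            // Thread.interrupted() reports the flag and clears it again
            System.out.println("interrupt flag restored: " + Thread.interrupted());
        }
    }
}
{code}

Without the `interrupt()` call in the catch block, `main` would report `false`, which is exactly the lost signal described above.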
*Proposed fix*
Four changes:
1. Bounded `.get(timeoutMs, ...)` on both admin calls.
2. New explicit `catch (TimeoutException e)` for the timeout case
(`java.util.concurrent.TimeoutException`, which is what `KafkaFuture.get(long, TimeUnit)` throws).
3. `log.error` → `log.info`, and include the topic config in the message.
4. Restore the interrupt flag before re-throwing.
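Putting items 1, 2 and 4 together, the waiting code could take roughly this shape. This is a sketch under assumptions, not the patch: `CompletableFuture` stands in for `KafkaFuture`, `SetupException` is a hypothetical substitute for `ConnectException`, and the helper name `awaitAdminResult` and its messages are illustrative. Item 3 is omitted because it depends on Connect's logger.

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DlqSetupCatchSketch {

    // Hypothetical stand-in for org.apache.kafka.connect.errors.ConnectException.
    static class SetupException extends RuntimeException {
        SetupException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical helper: bounded wait (item 1) with each failure mode
    // mapped to one unchecked exception (items 2 and 4).
    static <T> T awaitAdminResult(Future<T> future, String topic, long timeoutMs) {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // java.util.concurrent.TimeoutException: our own deadline expired
            throw new SetupException("Timed out after " + timeoutMs
                    + " ms initializing dead letter queue with topic=" + topic, e);
        } catch (ExecutionException e) {
            // The admin operation itself failed; surface the underlying cause
            throw new SetupException("Could not initialize dead letter queue with topic="
                    + topic, e.getCause());
        } catch (InterruptedException e) {
            // Restore the flag that catching InterruptedException cleared
            Thread.currentThread().interrupt();
            throw new SetupException("Interrupted while initializing dead letter queue with topic="
                    + topic, e);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("broker unreachable"));
        try {
            awaitAdminResult(failed, "dlq-topic", 100);
        } catch (SetupException e) {
            System.out.println(e.getMessage() + " caused by " + e.getCause().getMessage());
        }
    }
}
{code}

Catching `TimeoutException` separately keeps an expired deadline distinguishable from a failure reported by the admin call itself, which arrives wrapped in `ExecutionException`.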
*KIP needed?*
No