This is an automated email from the ASF dual-hosted git repository.
liaoxin01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new cbe23a31d8d [fix](job) fix NPE in routine load Kafka meta request
(#63180)
cbe23a31d8d is described below
commit cbe23a31d8d37f4e25bcc40e945d6559299903b7
Author: hui lai <[email protected]>
AuthorDate: Fri May 22 11:16:12 2026 +0800
[fix](job) fix NPE in routine load Kafka meta request (#63180)
### What problem does this PR solve?
Problem Summary:
In a single-BE deployment, Kafka routine load fetches topic metadata
through the only BE. If that BE cannot connect to Kafka, the metadata
request fails and the BE is skipped in the current retry loop. The FE
may then have no normal candidate backend left and fall back to the
backend ids in the routine load blacklist.
The blacklist can contain stale backend ids that no longer exist in
`SystemInfoService`. In that case, `KafkaUtil` may get a null `Backend`
and throw a NullPointerException when calling `be.getHost()`. This hides
the real Kafka metadata error, such as broker connection failure.
This PR filters stale backend ids when reading the routine load
blacklist and adds a final null check before creating the BE address.
The original Kafka metadata error is preserved instead of being replaced
by the secondary NPE.
A regression case is added with an invalid `kafka_broker_list` to verify
that routine load reports the expected Kafka metadata error path.
---
.../apache/doris/datasource/kafka/KafkaUtil.java | 23 +++++++--
.../load_p0/routine_load/test_black_list.groovy | 56 ++++++++++++++++++++++
2 files changed, 76 insertions(+), 3 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
index a097f052aa9..00fd28c88da 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
@@ -252,8 +252,16 @@ public class KafkaUtil {
// 2. If that sole backend is decommissioned, the
aliveBackends list becomes empty.
// Hence, in such cases, it's essential to rely on the
blacklist to obtain meta information.
if (backendIds.isEmpty()) {
- for (Long beId :
Env.getCurrentEnv().getRoutineLoadManager().getBlacklist().keySet()) {
- backendIds.add(beId);
+ Map<Long, Long> blacklist =
Env.getCurrentEnv().getRoutineLoadManager().getBlacklist();
+ for (Long beId : blacklist.keySet()) {
+ Backend backend =
Env.getCurrentSystemInfo().getBackend(beId);
+ if (backend != null) {
+ backendIds.add(beId);
+ } else {
+ blacklist.remove(beId);
+ LOG.warn("remove stale backend {} from routine
load blacklist when getting kafka meta",
+ beId);
+ }
}
}
if (backendIds.isEmpty()) {
@@ -264,7 +272,16 @@ public class KafkaUtil {
throw new LoadException("failed to get info: " + errorMsg
+ ",");
}
Collections.shuffle(backendIds);
- Backend be =
Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
+ long selectedBeId = backendIds.get(0);
+ Backend be =
Env.getCurrentSystemInfo().getBackend(selectedBeId);
+ if (be == null) {
+ if (errorMsg == null) {
+ errorMsg = "backend " + selectedBeId + " does not
exist";
+ }
+ LOG.warn("skip stale backend {} when getting kafka meta",
selectedBeId);
+ retryTimes++;
+ continue;
+ }
address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
long beId = be.getId();
diff --git a/regression-test/suites/load_p0/routine_load/test_black_list.groovy
b/regression-test/suites/load_p0/routine_load/test_black_list.groovy
index a68ced241d6..84c1a925caf 100644
--- a/regression-test/suites/load_p0/routine_load/test_black_list.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_black_list.groovy
@@ -150,5 +150,61 @@ suite("test_black_list","nonConcurrent,p0") {
GetDebugPoint().disableDebugPointForAllBEs(inject)
sql "stop routine load for ${job}"
}
+
+ def invalidBrokerTableName = "test_black_list_invalid_broker"
+ def invalidBrokerJob = "test_black_list_invalid_broker_job"
+ sql """ DROP TABLE IF EXISTS ${invalidBrokerTableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${invalidBrokerTableName} (
+ `k1` int(20) NULL,
+ `k2` string NULL,
+ `v1` date NULL,
+ `v2` string NULL,
+ `v3` datetime NULL,
+ `v4` string NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${invalidBrokerJob} ON
${invalidBrokerTableName}
+ COLUMNS TERMINATED BY ","
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "127.0.0.1:1",
+ "kafka_topic" = "${kafkaCsvTpoics[0]}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+
+ def count = 0
+ while (true) {
+ sleep(1000)
+ def state = sql "show routine load for ${invalidBrokerJob}"
+ def stateChangedReason = state[0][17].toString()
+ def otherMsg = state[0][19].toString()
+ def errorMsg = "${stateChangedReason} ${otherMsg}"
+ log.info("invalid broker routine load state:
${state[0][8].toString()}".toString())
+ log.info("invalid broker reason of state changed:
${stateChangedReason}".toString())
+ log.info("invalid broker other msg: ${otherMsg}".toString())
+ if (errorMsg.contains("Failed to get all partitions of kafka
topic")) {
+ assertTrue(errorMsg.contains("failed to get info"))
+ assertTrue(errorMsg.contains("failed to get partition
meta: Local: Broker transport failure"))
+ break
+ }
+ if (count >= 90) {
+ log.error("routine load invalid broker test fail")
+ assertEquals(1, 2)
+ break
+ }
+ count++
+ }
+ } finally {
+ try_sql "stop routine load for ${invalidBrokerJob}"
+ }
}
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]