This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 8616265b399 branch-4.1: [fix](job) fix NPE in routine load Kafka meta request #63180 (#63511)
8616265b399 is described below
commit 8616265b399d8565d003ad5888f8a2677a01508a
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri May 22 22:49:28 2026 +0800
branch-4.1: [fix](job) fix NPE in routine load Kafka meta request #63180 (#63511)
Cherry-picked from #63180
Co-authored-by: hui lai <[email protected]>
---
.../apache/doris/datasource/kafka/KafkaUtil.java | 23 +++++++--
.../load_p0/routine_load/test_black_list.groovy | 56 ++++++++++++++++++++++
2 files changed, 76 insertions(+), 3 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
index a097f052aa9..00fd28c88da 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
@@ -252,8 +252,16 @@ public class KafkaUtil {
// 2. If that sole backend is decommissioned, the
aliveBackends list becomes empty.
// Hence, in such cases, it's essential to rely on the
blacklist to obtain meta information.
if (backendIds.isEmpty()) {
- for (Long beId :
Env.getCurrentEnv().getRoutineLoadManager().getBlacklist().keySet()) {
- backendIds.add(beId);
+ Map<Long, Long> blacklist =
Env.getCurrentEnv().getRoutineLoadManager().getBlacklist();
+ for (Long beId : blacklist.keySet()) {
+ Backend backend =
Env.getCurrentSystemInfo().getBackend(beId);
+ if (backend != null) {
+ backendIds.add(beId);
+ } else {
+ blacklist.remove(beId);
+ LOG.warn("remove stale backend {} from routine
load blacklist when getting kafka meta",
+ beId);
+ }
}
}
if (backendIds.isEmpty()) {
@@ -264,7 +272,16 @@ public class KafkaUtil {
throw new LoadException("failed to get info: " + errorMsg
+ ",");
}
Collections.shuffle(backendIds);
- Backend be =
Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
+ long selectedBeId = backendIds.get(0);
+ Backend be =
Env.getCurrentSystemInfo().getBackend(selectedBeId);
+ if (be == null) {
+ if (errorMsg == null) {
+ errorMsg = "backend " + selectedBeId + " does not
exist";
+ }
+ LOG.warn("skip stale backend {} when getting kafka meta",
selectedBeId);
+ retryTimes++;
+ continue;
+ }
address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
long beId = be.getId();
diff --git a/regression-test/suites/load_p0/routine_load/test_black_list.groovy
b/regression-test/suites/load_p0/routine_load/test_black_list.groovy
index 29fc336492b..0e19cbca1e2 100644
--- a/regression-test/suites/load_p0/routine_load/test_black_list.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_black_list.groovy
@@ -150,5 +150,61 @@ suite("test_black_list","nonConcurrent,p0") {
GetDebugPoint().disableDebugPointForAllBEs(inject)
sql "stop routine load for ${job}"
}
+
+ def invalidBrokerTableName = "test_black_list_invalid_broker"
+ def invalidBrokerJob = "test_black_list_invalid_broker_job"
+ sql """ DROP TABLE IF EXISTS ${invalidBrokerTableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${invalidBrokerTableName} (
+ `k1` int(20) NULL,
+ `k2` string NULL,
+ `v1` date NULL,
+ `v2` string NULL,
+ `v3` datetime NULL,
+ `v4` string NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${invalidBrokerJob} ON
${invalidBrokerTableName}
+ COLUMNS TERMINATED BY ","
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "127.0.0.1:1",
+ "kafka_topic" = "${kafkaCsvTpoics[0]}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+
+ def count = 0
+ while (true) {
+ sleep(1000)
+ def state = sql "show routine load for ${invalidBrokerJob}"
+ def stateChangedReason = state[0][17].toString()
+ def otherMsg = state[0][19].toString()
+ def errorMsg = "${stateChangedReason} ${otherMsg}"
+ log.info("invalid broker routine load state:
${state[0][8].toString()}".toString())
+ log.info("invalid broker reason of state changed:
${stateChangedReason}".toString())
+ log.info("invalid broker other msg: ${otherMsg}".toString())
+ if (errorMsg.contains("Failed to get all partitions of kafka
topic")) {
+ assertTrue(errorMsg.contains("failed to get info"))
+ assertTrue(errorMsg.contains("failed to get partition
meta: Local: Broker transport failure"))
+ break
+ }
+ if (count >= 90) {
+ log.error("routine load invalid broker test fail")
+ assertEquals(1, 2)
+ break
+ }
+ count++
+ }
+ } finally {
+ try_sql "stop routine load for ${invalidBrokerJob}"
+ }
}
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]