This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 8616265b399 branch-4.1: [fix](job) fix NPE in routine load Kafka meta request #63180 (#63511)
8616265b399 is described below

commit 8616265b399d8565d003ad5888f8a2677a01508a
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri May 22 22:49:28 2026 +0800

    branch-4.1: [fix](job) fix NPE in routine load Kafka meta request #63180 (#63511)
    
    Cherry-picked from #63180
    
    Co-authored-by: hui lai <[email protected]>
---
 .../apache/doris/datasource/kafka/KafkaUtil.java   | 23 +++++++--
 .../load_p0/routine_load/test_black_list.groovy    | 56 ++++++++++++++++++++++
 2 files changed, 76 insertions(+), 3 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
index a097f052aa9..00fd28c88da 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
@@ -252,8 +252,16 @@ public class KafkaUtil {
                 // 2. If that sole backend is decommissioned, the 
aliveBackends list becomes empty.
                 // Hence, in such cases, it's essential to rely on the 
blacklist to obtain meta information.
                 if (backendIds.isEmpty()) {
-                    for (Long beId : 
Env.getCurrentEnv().getRoutineLoadManager().getBlacklist().keySet()) {
-                        backendIds.add(beId);
+                    Map<Long, Long> blacklist = 
Env.getCurrentEnv().getRoutineLoadManager().getBlacklist();
+                    for (Long beId : blacklist.keySet()) {
+                        Backend backend = 
Env.getCurrentSystemInfo().getBackend(beId);
+                        if (backend != null) {
+                            backendIds.add(beId);
+                        } else {
+                            blacklist.remove(beId);
+                            LOG.warn("remove stale backend {} from routine 
load blacklist when getting kafka meta",
+                                    beId);
+                        }
                     }
                 }
                 if (backendIds.isEmpty()) {
@@ -264,7 +272,16 @@ public class KafkaUtil {
                     throw new LoadException("failed to get info: " + errorMsg 
+ ",");
                 }
                 Collections.shuffle(backendIds);
-                Backend be = 
Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
+                long selectedBeId = backendIds.get(0);
+                Backend be = 
Env.getCurrentSystemInfo().getBackend(selectedBeId);
+                if (be == null) {
+                    if (errorMsg == null) {
+                        errorMsg = "backend " + selectedBeId + " does not 
exist";
+                    }
+                    LOG.warn("skip stale backend {} when getting kafka meta", 
selectedBeId);
+                    retryTimes++;
+                    continue;
+                }
                 address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
                 long beId = be.getId();
 
diff --git a/regression-test/suites/load_p0/routine_load/test_black_list.groovy 
b/regression-test/suites/load_p0/routine_load/test_black_list.groovy
index 29fc336492b..0e19cbca1e2 100644
--- a/regression-test/suites/load_p0/routine_load/test_black_list.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_black_list.groovy
@@ -150,5 +150,61 @@ suite("test_black_list","nonConcurrent,p0") {
             GetDebugPoint().disableDebugPointForAllBEs(inject)
             sql "stop routine load for ${job}"
         }
+
+        def invalidBrokerTableName = "test_black_list_invalid_broker"
+        def invalidBrokerJob = "test_black_list_invalid_broker_job"
+        sql """ DROP TABLE IF EXISTS ${invalidBrokerTableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${invalidBrokerTableName} (
+                `k1` int(20) NULL,
+                `k2` string NULL,
+                `v1` date  NULL,
+                `v2` string  NULL,
+                `v3` datetime  NULL,
+                `v4` string  NULL
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`k1`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${invalidBrokerJob} ON 
${invalidBrokerTableName}
+                COLUMNS TERMINATED BY ","
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "127.0.0.1:1",
+                    "kafka_topic" = "${kafkaCsvTpoics[0]}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            def count = 0
+            while (true) {
+                sleep(1000)
+                def state = sql "show routine load for ${invalidBrokerJob}"
+                def stateChangedReason = state[0][17].toString()
+                def otherMsg = state[0][19].toString()
+                def errorMsg = "${stateChangedReason} ${otherMsg}"
+                log.info("invalid broker routine load state: 
${state[0][8].toString()}".toString())
+                log.info("invalid broker reason of state changed: 
${stateChangedReason}".toString())
+                log.info("invalid broker other msg: ${otherMsg}".toString())
+                if (errorMsg.contains("Failed to get all partitions of kafka 
topic")) {
+                    assertTrue(errorMsg.contains("failed to get info"))
+                    assertTrue(errorMsg.contains("failed to get partition 
meta: Local: Broker transport failure"))
+                    break
+                }
+                if (count >= 90) {
+                    log.error("routine load invalid broker test fail")
+                    assertEquals(1, 2)
+                    break
+                }
+                count++
+            }
+        } finally {
+            try_sql "stop routine load for ${invalidBrokerJob}"
+        }
     }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to