This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 7dbe24412f5 branch-3.0: [fix](load) print real reason if fetching 
Kafka meta fail #51880 (#52058)
7dbe24412f5 is described below

commit 7dbe24412f5b56ebb0862560bfbb06719c852e1c
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Jun 23 22:04:26 2025 +0800

    branch-3.0: [fix](load) print real reason if fetching Kafka meta fail 
#51880 (#52058)
    
    Cherry-picked from #51880
    
    Co-authored-by: hui lai <[email protected]>
---
 .../main/java/org/apache/doris/datasource/kafka/KafkaUtil.java | 10 ++++++++--
 .../suites/load_p0/routine_load/test_routine_load_error.groovy |  1 +
 2 files changed, 9 insertions(+), 2 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
index 3e78ba0d4a5..a097f052aa9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
@@ -231,6 +231,7 @@ public class KafkaUtil {
         InternalService.PProxyResult result = null;
         Set<Long> failedBeIds = new HashSet<>();
         TStatusCode code = null;
+        String errorMsg = null;
 
         try {
             while (retryTimes < 3) {
@@ -257,7 +258,10 @@ public class KafkaUtil {
                 }
                 if (backendIds.isEmpty()) {
                     
MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT.increase(1L);
-                    throw new LoadException("Failed to get info. No alive 
backends");
+                    if (failedBeIds.isEmpty()) {
+                        errorMsg = "no alive backends";
+                    }
+                    throw new LoadException("failed to get info: " + errorMsg 
+ ",");
                 }
                 Collections.shuffle(backendIds);
                 Backend be = 
Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
@@ -268,6 +272,7 @@ public class KafkaUtil {
                     future = 
BackendServiceProxy.getInstance().getInfo(address, request);
                     result = 
future.get(Config.max_get_kafka_meta_timeout_second, TimeUnit.SECONDS);
                 } catch (Exception e) {
+                    errorMsg = e.getMessage();
                     LOG.warn("failed to get info request to " + address + " 
err " + e.getMessage());
                     failedBeIds.add(beId);
                     retryTimes++;
@@ -275,6 +280,7 @@ public class KafkaUtil {
                 }
                 code = 
TStatusCode.findByValue(result.getStatus().getStatusCode());
                 if (code != TStatusCode.OK) {
+                    errorMsg = 
result.getStatus().getErrorMsgsList().toString();
                     LOG.warn("failed to get info request to "
                             + address + " err " + 
result.getStatus().getErrorMsgsList());
                     failedBeIds.add(beId);
@@ -285,7 +291,7 @@ public class KafkaUtil {
             }
 
             MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT.increase(1L);
-            throw new LoadException("Failed to get info");
+            throw new LoadException("failed to get info: " + errorMsg + ",");
         } finally {
             // Ensure that not all BE added to the blacklist.
             // For single request:
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy 
b/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
index 825752941d5..844d4e5a183 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
@@ -285,6 +285,7 @@ suite("test_routine_load_error","p0") {
                 }
                 log.info("reason of state changed: 
${res[0][17].toString()}".toString())
                 assertTrue(res[0][17].toString().contains("may be Kafka 
properties set in job is error or no partition in this topic that should check 
Kafka"))
+                assertTrue(res[0][17].toString().contains("Unknown topic or 
partition"))
                 break;
             }
         } finally {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to