This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 7dbe24412f5 branch-3.0: [fix](load) print real reason if fetching
Kafka meta fail #51880 (#52058)
7dbe24412f5 is described below
commit 7dbe24412f5b56ebb0862560bfbb06719c852e1c
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Jun 23 22:04:26 2025 +0800
branch-3.0: [fix](load) print real reason if fetching Kafka meta fail
#51880 (#52058)
Cherry-picked from #51880
Co-authored-by: hui lai <[email protected]>
---
.../main/java/org/apache/doris/datasource/kafka/KafkaUtil.java | 10 ++++++++--
.../suites/load_p0/routine_load/test_routine_load_error.groovy | 1 +
2 files changed, 9 insertions(+), 2 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
index 3e78ba0d4a5..a097f052aa9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
@@ -231,6 +231,7 @@ public class KafkaUtil {
InternalService.PProxyResult result = null;
Set<Long> failedBeIds = new HashSet<>();
TStatusCode code = null;
+ String errorMsg = null;
try {
while (retryTimes < 3) {
@@ -257,7 +258,10 @@ public class KafkaUtil {
}
if (backendIds.isEmpty()) {
MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT.increase(1L);
- throw new LoadException("Failed to get info. No alive backends");
+ if (failedBeIds.isEmpty()) {
+ errorMsg = "no alive backends";
+ }
+ throw new LoadException("failed to get info: " + errorMsg + ",");
}
Collections.shuffle(backendIds);
Backend be =
Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
@@ -268,6 +272,7 @@ public class KafkaUtil {
future =
BackendServiceProxy.getInstance().getInfo(address, request);
result =
future.get(Config.max_get_kafka_meta_timeout_second, TimeUnit.SECONDS);
} catch (Exception e) {
+ errorMsg = e.getMessage();
LOG.warn("failed to get info request to " + address + " err " + e.getMessage());
failedBeIds.add(beId);
retryTimes++;
@@ -275,6 +280,7 @@ public class KafkaUtil {
}
code =
TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
+ errorMsg =
result.getStatus().getErrorMsgsList().toString();
LOG.warn("failed to get info request to "
+ address + " err " +
result.getStatus().getErrorMsgsList());
failedBeIds.add(beId);
@@ -285,7 +291,7 @@ public class KafkaUtil {
}
MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT.increase(1L);
- throw new LoadException("Failed to get info");
+ throw new LoadException("failed to get info: " + errorMsg + ",");
} finally {
// Ensure that not all BE added to the blacklist.
// For single request:
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
index 825752941d5..844d4e5a183 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
@@ -285,6 +285,7 @@ suite("test_routine_load_error","p0") {
}
log.info("reason of state changed:
${res[0][17].toString()}".toString())
assertTrue(res[0][17].toString().contains("may be Kafka properties set in job is error or no partition in this topic that should check Kafka"))
+ assertTrue(res[0][17].toString().contains("Unknown topic or partition"))
break;
}
} finally {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]