This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 168abaeb279 [improve](routine-load) optimize error msg when failed to fetch Kafka info #30298
168abaeb279 is described below
commit 168abaeb2793f784813bc8d4f2f7c2856fea97f7
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Thu Jan 25 14:08:40 2024 +0800
    [improve](routine-load) optimize error msg when failed to fetch Kafka info #30298
---
.../load/routineload/KafkaRoutineLoadJob.java | 8 ++--
.../routine_load/test_routine_load_error.groovy | 43 ++++++++++++++++++++++
2 files changed, 48 insertions(+), 3 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index faad0a0248a..77171b6a4cb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -325,13 +325,15 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         try {
             this.newCurrentKafkaPartition = getAllKafkaPartitions();
         } catch (Exception e) {
+            String msg = e.getMessage()
+                    + ", maybe the Kafka properties set in the job are incorrect"
+                    + " or the topic has no partitions, please check Kafka";
             LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
-                    .add("error_msg", "Job failed to fetch all current partition with error " + e.getMessage())
+                    .add("error_msg", msg)
                     .build(), e);
             if (this.state == JobState.NEED_SCHEDULE) {
                 unprotectUpdateState(JobState.PAUSED,
-                        new ErrorReason(InternalErrorCode.PARTITIONS_ERR,
-                                "Job failed to fetch all current partition with error " + e.getMessage()),
+                        new ErrorReason(InternalErrorCode.PARTITIONS_ERR, msg),
                         false /* not replay */);
             }
         }
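
The hint added above asks the user to verify the Kafka properties set in the job and to confirm that the topic actually has partitions. As a rough, standalone illustration of that manual check (not part of this commit; the broker address, topic name, timeout, and class name below are placeholders), the standard kafka-clients consumer can be used to list a topic's partition metadata:

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class TopicPartitionCheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Placeholder broker address; use the kafka_broker_list configured in the job.
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // partitionsFor() fetches partition metadata for the topic; depending on the
                // client version it returns an empty/null list or throws a TimeoutException
                // when the topic is missing or the brokers are unreachable.
                List<PartitionInfo> partitions =
                        consumer.partitionsFor("my_topic", Duration.ofSeconds(10));
                if (partitions == null || partitions.isEmpty()) {
                    System.out.println("No partitions found: check the topic name and broker list.");
                } else {
                    System.out.println("Topic has " + partitions.size() + " partition(s).");
                }
            } catch (Exception e) {
                System.out.println("Failed to fetch partition metadata: " + e.getMessage());
            }
        }
    }

If the call times out or returns an empty list, the broker list or topic name configured in the routine load job is the likely culprit.
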
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
index 191ea4381fd..5b42e84be00 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
@@ -248,4 +248,47 @@ suite("test_routine_load_error","p0") {
sql """ DROP TABLE IF EXISTS ${tableName} """
}
}
+
+ // test failed to fetch all current partition
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ def jobName = "invalid_topic"
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${jobName}
+ COLUMNS TERMINATED BY "|"
+ PROPERTIES
+ (
+ "max_batch_interval" = "5",
+ "max_batch_rows" = "300000",
+ "max_batch_size" = "209715200"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "invalid_topic",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ sql "sync"
+
+ def count = 0
+ while (true) {
+ sleep(1000)
+ def res = sql "show routine load for ${jobName}"
+ def state = res[0][8].toString()
+ if (state != "PAUSED") {
+ count++
+ if (count > 60) {
+ assertEquals(1, 2)
+ }
+ continue;
+ }
+ log.info("reason of state changed:
${res[0][17].toString()}".toString())
+ assertTrue(res[0][17].toString().contains("may be Kafka
properties set in job is error or no partition in this topic that should check
Kafka"))
+ break;
+ }
+ } finally {
+ sql "stop routine load for ${jobName}"
+ }
+ }
}
\ No newline at end of file
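
The regression case above creates a routine load job on a topic that does not exist, polls "show routine load" once per second for up to 60 seconds until the job state (column 8 of the result row) becomes PAUSED, and then asserts that the reason of state changed (column 17) contains the new hint. The loop is an instance of a generic poll-until-condition pattern; a minimal Java sketch of that pattern is shown below (the class, method, and queryJobState() names are illustrative, not part of the test harness):

    import java.time.Duration;
    import java.time.Instant;
    import java.util.function.Supplier;

    public final class Await {

        private Await() {
        }

        // Polls the condition once per interval until it returns true or the timeout elapses.
        // Returns true if the condition was met, false if the deadline passed first.
        public static boolean until(Supplier<Boolean> condition, Duration timeout, Duration interval)
                throws InterruptedException {
            Instant deadline = Instant.now().plus(timeout);
            while (Instant.now().isBefore(deadline)) {
                if (Boolean.TRUE.equals(condition.get())) {
                    return true;
                }
                Thread.sleep(interval.toMillis());
            }
            return false;
        }
    }

    // Example usage, mirroring the test loop (queryJobState() is a hypothetical helper):
    //     boolean paused = Await.until(() -> "PAUSED".equals(queryJobState()),
    //             Duration.ofSeconds(60), Duration.ofSeconds(1));
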
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]