This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit d0dd0904586bc72213955f1b6a599c6514a169f4
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Mon Jan 22 13:15:46 2024 +0800

    [fix](routine-load) optimize error msg when meet out of range (#30118)
---
 .../org/apache/doris/load/routineload/RoutineLoadJob.java | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 0a947701ef5..889d240ce29 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -1142,11 +1142,24 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
                 if (txnStatusChangeReasonString != null) {
                     txnStatusChangeReason =
                             
TransactionState.TxnStatusChangeReason.fromString(txnStatusChangeReasonString);
+                    String msg;
                     if (txnStatusChangeReason != null) {
                         switch (txnStatusChangeReason) {
                             case OFFSET_OUT_OF_RANGE:
+                                msg = "be " + taskBeId + " abort task,"
+                                        + " task id: " + 
routineLoadTaskInfo.getId()
+                                        + " job id: " + 
routineLoadTaskInfo.getJobId()
+                                        + " with reason: " + 
txnStatusChangeReasonString
+                                        + " the offset used by job does not 
exist in kafka,"
+                                        + " please check the offset,"
+                                        + " using the Alter ROUTINE LOAD 
command to modify it,"
+                                        + " and resume the job";
+                                updateState(JobState.PAUSED,
+                                        new 
ErrorReason(InternalErrorCode.TASKS_ABORT_ERR, msg),
+                                        false /* not replay */);
+                                return;
                             case PAUSE:
-                                String msg = "be " + taskBeId + " abort task "
+                                msg = "be " + taskBeId + " abort task "
                                         + "with reason: " + 
txnStatusChangeReasonString;
                                 updateState(JobState.PAUSED,
                                         new 
ErrorReason(InternalErrorCode.TASKS_ABORT_ERR, msg),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to