This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new a014a377160 [fix](routine-load) optimize error msg when meet out of range (#30118) (#30210)
a014a377160 is described below

commit a014a377160618f6df48c4abbf908112f0fa01de
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Mon Jan 22 20:34:36 2024 +0800

    [fix](routine-load) optimize error msg when meet out of range (#30118) (#30210)
---
 .../org/apache/doris/load/routineload/RoutineLoadJob.java | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 10b2d0cf734..62e82d95457 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -1124,11 +1124,24 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
                 if (txnStatusChangeReasonString != null) {
                     txnStatusChangeReason =
                             
TransactionState.TxnStatusChangeReason.fromString(txnStatusChangeReasonString);
+                    String msg;
                     if (txnStatusChangeReason != null) {
                         switch (txnStatusChangeReason) {
                             case OFFSET_OUT_OF_RANGE:
+                                msg = "be " + taskBeId + " abort task,"
+                                        + " task id: " + 
routineLoadTaskInfo.getId()
+                                        + " job id: " + 
routineLoadTaskInfo.getJobId()
+                                        + " with reason: " + 
txnStatusChangeReasonString
+                                        + " the offset used by job does not 
exist in kafka,"
+                                        + " please check the offset,"
+                                        + " using the Alter ROUTINE LOAD 
command to modify it,"
+                                        + " and resume the job";
+                                updateState(JobState.PAUSED,
+                                        new 
ErrorReason(InternalErrorCode.TASKS_ABORT_ERR, msg),
+                                        false /* not replay */);
+                                return;
                             case PAUSE:
-                                String msg = "be " + taskBeId + " abort task "
+                                msg = "be " + taskBeId + " abort task "
                                         + "with reason: " + 
txnStatusChangeReasonString;
                                 updateState(JobState.PAUSED,
                                         new 
ErrorReason(InternalErrorCode.TASKS_ABORT_ERR, msg),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to