This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new d4434cb4ea2 [improve](routine-load) timely pause job if Kafka cluster 
exception when consume (#33372) (#37446)
d4434cb4ea2 is described below

commit d4434cb4ea2b1982c5527d14bd9c1f2dadaab983
Author: hui lai <[email protected]>
AuthorDate: Mon Jul 8 17:24:47 2024 +0800

    [improve](routine-load) timely pause job if Kafka cluster exception when 
consume (#33372) (#37446)
    
    pick (#33372)
    
    Co-authored-by: HHoflittlefish777 
<[email protected]>
---
 .../org/apache/doris/load/routineload/KafkaRoutineLoadJob.java    | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index fad213bc6af..5df7714ffd3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -746,7 +746,13 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
                 cachedPartitionWithLatestOffsets.put(pair.first, pair.second);
             }
         } catch (Exception e) {
-            LOG.warn("failed to get latest partition offset. {}", 
e.getMessage(), e);
+            // It needs to pause job when can not get partition meta.
+            // To ensure the stability of the routine load,
+            // the scheduler will automatically pull up routine load job in 
this scenario,
+            // to avoid some network and Kafka exceptions causing the routine 
load job to stop
+            updateState(JobState.PAUSED, new 
ErrorReason(InternalErrorCode.PARTITIONS_ERR,
+                        "failed to get latest partition offset. {}" + 
e.getMessage()),
+                        false /* not replay */);
             return false;
         }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to