This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bf66fbe77f [Fix](Streaming) fix get remote meta failed to pause 
streaming job (#59760)
4bf66fbe77f is described below

commit 4bf66fbe77fc0b1ef89b1a1e34e8480948064312
Author: wudi <[email protected]>
AuthorDate: Tue Jan 13 11:04:30 2026 +0800

    [Fix](Streaming) fix get remote meta failed to pause streaming job (#59760)
    
    ### What problem does this PR solve?
    
    Fix: a streaming job was not paused when fetching remote meta failed.
    
    Related PR: #58898
---
 .../insert/streaming/StreamingInsertJob.java       | 18 +++++++++++++-----
 .../reader/postgres/PostgresSourceReader.java      |  5 +++--
 .../cdc/test_streaming_mysql_job_priv.groovy       | 22 +++++++++++++++++-----
 .../cdc/test_streaming_postgres_job_priv.groovy    |  4 ++--
 4 files changed, 35 insertions(+), 14 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index d75427d7554..30f115b3ba5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -111,7 +111,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
     @Getter
     @Setter
     @SerializedName("fr")
-    protected FailureReason failureReason;
+    protected volatile FailureReason failureReason;
     @Getter
     @Setter
     protected long latestAutoResumeTimestamp;
@@ -505,7 +505,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         return tasks;
     }
 
-    protected void fetchMeta() {
+    protected void fetchMeta() throws JobException {
         try {
             if (tvfType != null) {
                 if (originTvfProps == null) {
@@ -516,10 +516,18 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
                 offsetProvider.fetchRemoteMeta(new HashMap<>());
             }
         } catch (Exception ex) {
-            //todo: The job status = MANUAL_PAUSE_ERR, No need to set 
failureReason again
             log.warn("fetch remote meta failed, job id: {}", getJobId(), ex);
-            failureReason = new 
FailureReason(InternalErrorCode.GET_REMOTE_DATA_ERROR,
-                    "Failed to fetch meta, " + ex.getMessage());
+            if (this.getFailureReason() == null
+                    || 
!InternalErrorCode.MANUAL_PAUSE_ERR.equals(this.getFailureReason().getCode())) {
+                // When a job is manually paused, it does not need to be set 
again,
+                // otherwise, it may be woken up by auto resume.
+                this.setFailureReason(
+                        new 
FailureReason(InternalErrorCode.GET_REMOTE_DATA_ERROR,
+                                "Failed to fetch meta, " + ex.getMessage()));
+                // If fetching meta fails, the job is paused
+                // and auto resume will automatically wake it up.
+                this.updateJobStatus(JobStatus.PAUSED);
+            }
         }
     }
 
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
index 3e94700f2b4..52c3674444b 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
@@ -26,6 +26,7 @@ import org.apache.doris.job.cdc.request.CompareOffsetRequest;
 import org.apache.doris.job.cdc.request.JobBaseConfig;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
 import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
@@ -116,8 +117,8 @@ public class PostgresSourceReader extends 
JdbcIncrementalSourceReader {
         } catch (Throwable t) {
             throw new CdcClientException(
                     String.format(
-                            "Fail to get or create slot for global stream 
split, the slot name is %s. Due to: ",
-                            postgresDialect.getSlotName()),
+                            "Fail to get or create slot, the slot name is %s. 
Due to: %s ",
+                            postgresDialect.getSlotName(), 
ExceptionUtils.getRootCauseMessage(t)),
                     t);
         }
     }
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy
index b6a0926265a..23f3ce7f679 100644
--- 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy
@@ -173,11 +173,23 @@ suite("test_streaming_mysql_job_priv", 
"p0,external,mysql,external_docker,extern
             sql """INSERT INTO ${mysqlDb}.${tableName} (name,age) VALUES 
('DorisTestPriv',28);"""
         }
 
-       sleep(20000)
-
-       def jobErrorMsg = sql """select ErrorMsg from jobs("type"="insert") 
where Name='${jobName}'"""
-       log.info("jobErrorMsg: " + jobErrorMsg)
-       assert jobErrorMsg.get(0).get(0).contains("Failed to fetch meta")
+       try {
+           Awaitility.await().atMost(300, SECONDS)
+                   .pollInterval(1, SECONDS).until(
+                   {
+                       def jobStatus = sql """ select status, ErrorMsg from 
jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+                       log.info("jobStatus: " + jobStatus)
+                       // check job status
+                       jobStatus.size() == 1 && 'PAUSED' == 
jobStatus.get(0).get(0) && jobStatus.get(0).get(1).contains("Failed to fetch 
meta")
+                   }
+           )
+       } catch (Exception ex){
+           def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+           def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+           log.info("show job: " + showjob)
+           log.info("show task: " + showtask)
+           throw ex;
+       }
 
         // grant binlog priv to mysqluser
         connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy
index 7b114b2ca97..682e575596e 100644
--- 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy
@@ -90,12 +90,12 @@ suite("test_streaming_postgres_job_priv", 
"p0,external,pg,external_docker,extern
         // check job running
         try {
             Awaitility.await().atMost(300, SECONDS)
-                    .pollInterval(3, SECONDS).until(
+                    .pollInterval(1, SECONDS).until(
                     {
                         def jobStatus = sql """ select status, ErrorMsg from 
jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
                         log.info("jobStatus: " + jobStatus)
                         // check job status
-                        jobStatus.size() == 1 && 'RUNNING' == 
jobStatus.get(0).get(0) && jobStatus.get(0).get(1).contains("Failed to fetch 
meta")
+                        jobStatus.size() == 1 && 'PAUSED' == 
jobStatus.get(0).get(0) && jobStatus.get(0).get(1).contains("Failed to fetch 
meta")
                     }
             )
         } catch (Exception ex){


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to