This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4bf66fbe77f [Fix](Streaming) fix get remote meta failed to pause
streaming job (#59760)
4bf66fbe77f is described below
commit 4bf66fbe77fc0b1ef89b1a1e34e8480948064312
Author: wudi <[email protected]>
AuthorDate: Tue Jan 13 11:04:30 2026 +0800
[Fix](Streaming) fix get remote meta failed to pause streaming job (#59760)
### What problem does this PR solve?
Fix the streaming job not being paused when fetching remote meta fails.
Related PR: #58898
---
.../insert/streaming/StreamingInsertJob.java | 18 +++++++++++++-----
.../reader/postgres/PostgresSourceReader.java | 5 +++--
.../cdc/test_streaming_mysql_job_priv.groovy | 22 +++++++++++++++++-----
.../cdc/test_streaming_postgres_job_priv.groovy | 4 ++--
4 files changed, 35 insertions(+), 14 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index d75427d7554..30f115b3ba5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -111,7 +111,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
@Getter
@Setter
@SerializedName("fr")
- protected FailureReason failureReason;
+ protected volatile FailureReason failureReason;
@Getter
@Setter
protected long latestAutoResumeTimestamp;
@@ -505,7 +505,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
return tasks;
}
- protected void fetchMeta() {
+ protected void fetchMeta() throws JobException {
try {
if (tvfType != null) {
if (originTvfProps == null) {
@@ -516,10 +516,18 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
offsetProvider.fetchRemoteMeta(new HashMap<>());
}
} catch (Exception ex) {
- //todo: The job status = MANUAL_PAUSE_ERR, No need to set
failureReason again
log.warn("fetch remote meta failed, job id: {}", getJobId(), ex);
- failureReason = new
FailureReason(InternalErrorCode.GET_REMOTE_DATA_ERROR,
- "Failed to fetch meta, " + ex.getMessage());
+ if (this.getFailureReason() == null
+ ||
!InternalErrorCode.MANUAL_PAUSE_ERR.equals(this.getFailureReason().getCode())) {
+ // When a job is manually paused, it does not need to be set
again,
+ // otherwise, it may be woken up by auto resume.
+ this.setFailureReason(
+ new
FailureReason(InternalErrorCode.GET_REMOTE_DATA_ERROR,
+ "Failed to fetch meta, " + ex.getMessage()));
+ // If fetching meta fails, the job is paused
+ // and auto resume will automatically wake it up.
+ this.updateJobStatus(JobStatus.PAUSED);
+ }
}
}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
index 3e94700f2b4..52c3674444b 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
@@ -26,6 +26,7 @@ import org.apache.doris.job.cdc.request.CompareOffsetRequest;
import org.apache.doris.job.cdc.request.JobBaseConfig;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
@@ -116,8 +117,8 @@ public class PostgresSourceReader extends
JdbcIncrementalSourceReader {
} catch (Throwable t) {
throw new CdcClientException(
String.format(
- "Fail to get or create slot for global stream
split, the slot name is %s. Due to: ",
- postgresDialect.getSlotName()),
+ "Fail to get or create slot, the slot name is %s.
Due to: %s ",
+ postgresDialect.getSlotName(),
ExceptionUtils.getRootCauseMessage(t)),
t);
}
}
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy
index b6a0926265a..23f3ce7f679 100644
---
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy
@@ -173,11 +173,23 @@ suite("test_streaming_mysql_job_priv",
"p0,external,mysql,external_docker,extern
sql """INSERT INTO ${mysqlDb}.${tableName} (name,age) VALUES
('DorisTestPriv',28);"""
}
- sleep(20000)
-
- def jobErrorMsg = sql """select ErrorMsg from jobs("type"="insert")
where Name='${jobName}'"""
- log.info("jobErrorMsg: " + jobErrorMsg)
- assert jobErrorMsg.get(0).get(0).contains("Failed to fetch meta")
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(1, SECONDS).until(
+ {
+ def jobStatus = sql """ select status, ErrorMsg from
jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+ log.info("jobStatus: " + jobStatus)
+ // check job status
+ jobStatus.size() == 1 && 'PAUSED' ==
jobStatus.get(0).get(0) && jobStatus.get(0).get(1).contains("Failed to fetch
meta")
+ }
+ )
+ } catch (Exception ex){
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex;
+ }
// grant binlog priv to mysqluser
connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy
index 7b114b2ca97..682e575596e 100644
---
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy
@@ -90,12 +90,12 @@ suite("test_streaming_postgres_job_priv",
"p0,external,pg,external_docker,extern
// check job running
try {
Awaitility.await().atMost(300, SECONDS)
- .pollInterval(3, SECONDS).until(
+ .pollInterval(1, SECONDS).until(
{
def jobStatus = sql """ select status, ErrorMsg from
jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
log.info("jobStatus: " + jobStatus)
// check job status
- jobStatus.size() == 1 && 'RUNNING' ==
jobStatus.get(0).get(0) && jobStatus.get(0).get(1).contains("Failed to fetch
meta")
+ jobStatus.size() == 1 && 'PAUSED' ==
jobStatus.get(0).get(0) && jobStatus.get(0).get(1).contains("Failed to fetch
meta")
}
)
} catch (Exception ex){
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]