This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 1be7d551b93 branch-4.0: [fix](job) fix show job and task offset info #57736 (#57845)
1be7d551b93 is described below
commit 1be7d551b933e1767843c0853699dc159c784822
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Nov 11 09:22:40 2025 +0800
branch-4.0: [fix](job) fix show job and task offset info #57736 (#57845)
Cherry-picked from #57736
Co-authored-by: wudi <[email protected]>
---
.../doris/job/extensions/insert/InsertTask.java | 2 ++
.../insert/streaming/StreamingInsertJob.java | 8 +++++++-
.../insert/streaming/StreamingInsertTask.java | 3 ++-
.../java/org/apache/doris/job/offset/Offset.java | 2 ++
.../org/apache/doris/job/offset/s3/S3Offset.java | 10 +++++++++-
.../doris/job/offset/s3/S3SourceOffsetProvider.java | 10 ++++++++--
.../streaming_job/test_streaming_insert_job.groovy | 20 +++++++++++++++-----
.../test_streaming_insert_job_alter.groovy | 2 +-
.../test_streaming_insert_job_crud.groovy | 7 +++++--
.../test_streaming_insert_job_offset.groovy | 8 ++++----
...test_streaming_job_alter_offset_restart_fe.groovy | 12 ++++++------
.../test_streaming_job_restart_fe.groovy | 8 ++++----
12 files changed, 65 insertions(+), 27 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
index 69ce9c309be..b384c35ebdd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
@@ -21,6 +21,7 @@ import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Status;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
@@ -277,6 +278,7 @@ public class InsertTask extends AbstractTask {
trow.addToColumnValue(new TCell().setStringVal(userIdentity.getQualifiedUser()));
}
trow.addToColumnValue(new TCell().setStringVal(firstErrorMsg == null ? "" : firstErrorMsg));
+ trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
return trow;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 57eab5cb816..0940011527d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -368,7 +368,13 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
}
public boolean needScheduleTask() {
- return (getJobStatus().equals(JobStatus.RUNNING) || getJobStatus().equals(JobStatus.PENDING));
+ readLock();
+ try {
+ return (getJobStatus().equals(JobStatus.RUNNING)
+ || getJobStatus().equals(JobStatus.PENDING));
+ } finally {
+ readUnlock();
+ }
}
public void clearRunningStreamTask(JobStatus newJobStatus) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
index ec3c6407be7..f25ebe794a0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
@@ -144,6 +144,7 @@ public class StreamingInsertTask {
ctx.setStatementContext(statementContext);
this.runningOffset = offsetProvider.getNextOffset(jobProperties, originTvfProps);
+ log.info("streaming insert task {} get running offset: {}", taskId, runningOffset.toString());
InsertIntoTableCommand baseCommand = (InsertIntoTableCommand) new NereidsParser().parseSingle(sql);
baseCommand.setJobId(getTaskId());
StmtExecutor baseStmtExecutor =
@@ -303,7 +304,7 @@ public class StreamingInsertTask {
}
trow.addToColumnValue(new TCell().setStringVal(""));
trow.addToColumnValue(new TCell().setStringVal(runningOffset == null
- ? FeConstants.null_string : runningOffset.toString()));
+ ? FeConstants.null_string : runningOffset.showRange()));
return trow;
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java
index 601a77a65b8..4f0ff5b2280 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java
@@ -26,4 +26,6 @@ public interface Offset {
boolean isEmpty();
boolean isValidOffset();
+
+ String showRange();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
index 1605ada23d0..ebd33803490 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
@@ -28,10 +28,12 @@ import org.apache.commons.lang3.StringUtils;
@Getter
@Setter
public class S3Offset implements Offset {
+ String startFile;
@SerializedName("endFile")
String endFile;
// s3://bucket/path/{1.csv,2.csv}
String fileLists;
+ int fileNum;
@Override
public String toSerializedJson() {
@@ -48,8 +50,14 @@ public class S3Offset implements Offset {
return StringUtils.isNotBlank(endFile);
}
+ @Override
+ public String showRange() {
+ return "{\"startFileName\":\"" + startFile + "\",\"endFileName\":\"" +
endFile + "\"}";
+ }
+
@Override
public String toString() {
- return "{\"endFile\": \"" + endFile + "\"}";
+ return "{\"startFileName\":\"" + startFile + "\","
+ + "\"endFileName\":\"" + endFile + "\",\"fileNum\":" + fileNum
+ "}";
}
}
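A minimal usage sketch (not part of the patch) of how the reworked S3Offset renders, assuming a hypothetical offset covering example_0.csv through example_1.csv; the setters come from Lombok's @Setter on the class. showRange() feeds the RunningOffset column of tasks("type"="insert"), while toString() additionally carries fileNum and is what the new log line in StreamingInsertTask prints:

    // Hypothetical values for illustration only.
    S3Offset offset = new S3Offset();
    offset.setStartFile("regression/load/data/example_0.csv");
    offset.setEndFile("regression/load/data/example_1.csv");
    offset.setFileNum(2);
    // RunningOffset column:
    // {"startFileName":"regression/load/data/example_0.csv","endFileName":"regression/load/data/example_1.csv"}
    System.out.println(offset.showRange());
    // Log output, with the extra fileNum field:
    // {"startFileName":"regression/load/data/example_0.csv","endFileName":"regression/load/data/example_1.csv","fileNum":2}
    System.out.println(offset.toString());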
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
index ac5ff623494..15a961bb666 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
@@ -88,16 +88,20 @@ public class S3SourceOffsetProvider implements SourceOffsetProvider {
// base is a single file
offset.setFileLists(filePathBase);
String lastFile = rfiles.get(rfiles.size() - 1).getName().replace(bucketBase, "");
+ offset.setStartFile(lastFile);
offset.setEndFile(lastFile);
} else {
// base is dir
String normalizedPrefix = basePrefix.endsWith("/")
? basePrefix.substring(0, basePrefix.length() - 1)
: basePrefix;
String finalFileLists = String.format("s3://%s/%s/{%s}", bucket, normalizedPrefix, joined);
+ String beginFile = rfiles.get(0).getName().replace(bucketBase, "");
String lastFile = rfiles.get(rfiles.size() - 1).getName().replace(bucketBase, "");
offset.setFileLists(finalFileLists);
+ offset.setStartFile(beginFile);
offset.setEndFile(lastFile);
}
+ offset.setFileNum(rfiles.size());
maxEndFile = globListResult.getMaxFile();
} else {
throw new RuntimeException("No new files found in path: " + filePath);
@@ -112,7 +116,9 @@ public class S3SourceOffsetProvider implements SourceOffsetProvider {
@Override
public String getShowCurrentOffset() {
if (currentOffset != null) {
- return currentOffset.toSerializedJson();
+ Map<String, String> res = new HashMap<>();
+ res.put("fileName", currentOffset.getEndFile());
+ return new Gson().toJson(res);
}
return null;
}
@@ -121,7 +127,7 @@ public class S3SourceOffsetProvider implements SourceOffsetProvider {
public String getShowMaxOffset() {
if (maxEndFile != null) {
Map<String, String> res = new HashMap<>();
- res.put("endFile", maxEndFile);
+ res.put("fileName", maxEndFile);
return new Gson().toJson(res);
}
return null;
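For context, a small sketch of what getShowCurrentOffset()/getShowMaxOffset() now emit: both build a single-entry map keyed by "fileName" and serialize it with Gson, which is the exact string the updated regression tests assert for the currentOffset and endoffset columns. The file path below is a hypothetical example.

    // Sketch only; assumes java.util.Map/HashMap and com.google.gson.Gson are available.
    Map<String, String> res = new HashMap<>();
    res.put("fileName", "regression/load/data/example_1.csv"); // hypothetical end file
    String json = new Gson().toJson(res);
    // json -> {"fileName":"regression/load/data/example_1.csv"}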
diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
index 18632929329..4d9998d3fa9 100644
--- a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
+++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
@@ -106,10 +106,20 @@ suite("test_streaming_insert_job") {
select currentOffset, endoffset, loadStatistic from jobs("type"="insert") where Name='${jobName}'
"""
log.info("jobInfo: " + jobInfo)
- assert jobInfo.get(0).get(0) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
- assert jobInfo.get(0).get(1) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
+ assert jobInfo.get(0).get(0) == "{\"fileName\":\"regression/load/data/example_1.csv\"}";
+ assert jobInfo.get(0).get(1) == "{\"fileName\":\"regression/load/data/example_1.csv\"}";
assert jobInfo.get(0).get(2) == "{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":2,\"fileSize\":256}"
-
+
+ def showTask = sql """select * from tasks("type"="insert") where jobName='${jobName}'"""
+ log.info("showTask is : " + showTask )
+
+ // check task show
+ def taskInfo = sql """select Status,RunningOffset from tasks("type"="insert") where jobName='${jobName}'"""
+ log.info("taskInfo is : " + taskInfo + ", size: " + taskInfo.size())
+ assert taskInfo.size() > 0
+ assert taskInfo.get(taskInfo.size()-1).get(0) == "SUCCESS"
+ assert taskInfo.get(taskInfo.size()-1).get(1) == "{\"startFileName\":\"regression/load/data/example_0.csv\",\"endFileName\":\"regression/load/data/example_0.csv\"}"
+
// alter streaming job
sql """
ALTER JOB ${jobName}
@@ -135,7 +145,7 @@ suite("test_streaming_insert_job") {
"""
assert alterJobProperties.get(0).get(0) == "PAUSED"
assert alterJobProperties.get(0).get(1) == "{\"s3.max_batch_files\":\"1\",\"session.insert_max_filter_ratio\":\"0.5\"}"
- assert alterJobProperties.get(0).get(2) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
+ assert alterJobProperties.get(0).get(2) == "{\"fileName\":\"regression/load/data/example_1.csv\"}";
sql """
RESUME JOB where jobname = '${jobName}'
@@ -145,7 +155,7 @@ suite("test_streaming_insert_job") {
"""
assert resumeJobStatus.get(0).get(0) == "RUNNING" || resumeJobStatus.get(0).get(0) == "PENDING"
assert resumeJobStatus.get(0).get(1) == "{\"s3.max_batch_files\":\"1\",\"session.insert_max_filter_ratio\":\"0.5\"}"
- assert resumeJobStatus.get(0).get(2) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
+ assert resumeJobStatus.get(0).get(2) == "{\"fileName\":\"regression/load/data/example_1.csv\"}";
Awaitility.await().atMost(60, SECONDS)
diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_alter.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_alter.groovy
index 097515292c2..aa7fbea124b 100644
--- a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_alter.groovy
+++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_alter.groovy
@@ -141,7 +141,7 @@ suite("test_streaming_insert_job_alter") {
def jobOffset = sql """
select currentOffset from jobs("type"="insert") where Name='${jobName}'
"""
- assert jobOffset.get(0).get(0) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
+ assert jobOffset.get(0).get(0) == "{\"fileName\":\"regression/load/data/example_1.csv\"}";
qt_select """ SELECT * FROM ${tableName} order by c1 """
diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_crud.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_crud.groovy
index 4d654fcb116..f648767fdaf 100644
--- a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_crud.groovy
+++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_crud.groovy
@@ -28,6 +28,9 @@ suite("test_streaming_insert_job_crud") {
sql """
DROP JOB IF EXISTS where jobname = '${jobName}'
"""
+ sql """
+ DROP JOB IF EXISTS where jobname = '${jobNameError}'
+ """
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
`c1` int NULL,
@@ -331,8 +334,8 @@ suite("test_streaming_insert_job_crud") {
def jobOffset = sql """
select currentOffset, endoffset from jobs("type"="insert") where Name='${jobName}'
"""
- assert jobOffset.get(0).get(0) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
- assert jobOffset.get(0).get(1) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
+ assert jobOffset.get(0).get(0) == "{\"fileName\":\"regression/load/data/example_1.csv\"}";
+ assert jobOffset.get(0).get(1) == "{\"fileName\":\"regression/load/data/example_1.csv\"}";
qt_select """ SELECT * FROM ${tableName} order by c1 """
diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy
index f0dc425abdd..a9439095a64 100644
--- a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy
+++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy
@@ -171,8 +171,8 @@ suite("test_streaming_insert_job_offset") {
select currentOffset, endoffset, loadStatistic, properties from jobs("type"="insert") where Name='${jobName}'
"""
log.info("jobInfo: " + jobInfo)
- assert jobInfo.get(0).get(0) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
- assert jobInfo.get(0).get(1) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
+ assert jobInfo.get(0).get(0) == "{\"fileName\":\"regression/load/data/example_1.csv\"}";
+ assert jobInfo.get(0).get(1) == "{\"fileName\":\"regression/load/data/example_1.csv\"}";
assert jobInfo.get(0).get(2) == "{\"scannedRows\":10,\"loadBytes\":218,\"fileNumber\":1,\"fileSize\":138}"
assert jobInfo.get(0).get(3) == "{\"offset\":\"{\\\"fileName\\\":\\\"regression/load/data/example_0.csv\\\"}\"}"
@@ -209,8 +209,8 @@ suite("test_streaming_insert_job_offset") {
select currentOffset, endoffset, loadStatistic, properties from jobs("type"="insert") where Name='${jobName}'
"""
log.info("jobInfo: " + jobInfo)
- assert jobInfo.get(0).get(0) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
- assert jobInfo.get(0).get(1) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
+ assert jobInfo.get(0).get(0) == "{\"fileName\":\"regression/load/data/example_1.csv\"}";
+ assert jobInfo.get(0).get(1) == "{\"fileName\":\"regression/load/data/example_1.csv\"}";
assert jobInfo.get(0).get(2) == "{\"scannedRows\":30,\"loadBytes\":643,\"fileNumber\":3,\"fileSize\":394}"
assert jobInfo.get(0).get(3) == "{\"offset\":\"{\\\"fileName\\\":\\\"regression/load/data/anoexist1234.csv\\\"}\"}"
diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_job_alter_offset_restart_fe.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_job_alter_offset_restart_fe.groovy
index e6deaeceeb8..66e18248774 100644
--- a/regression-test/suites/job_p0/streaming_job/test_streaming_job_alter_offset_restart_fe.groovy
+++ b/regression-test/suites/job_p0/streaming_job/test_streaming_job_alter_offset_restart_fe.groovy
@@ -91,8 +91,8 @@ suite("test_streaming_job_alter_offset_restart_fe", "docker") {
select currentOffset, endoffset, loadStatistic from jobs("type"="insert") where Name='${jobName}'
"""
log.info("jobInfo: " + jobInfo)
- assert jobInfo.get(0).get(0) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
- assert jobInfo.get(0).get(1) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
+ assert jobInfo.get(0).get(0) == "{\"fileName\":\"regression/load/data/example_1.csv\"}";
+ assert jobInfo.get(0).get(1) == "{\"fileName\":\"regression/load/data/example_1.csv\"}";
assert jobInfo.get(0).get(2) == "{\"scannedRows\":10,\"loadBytes\":218,\"fileNumber\":0,\"fileSize\":0}"
sql """
@@ -111,7 +111,7 @@ suite("test_streaming_job_alter_offset_restart_fe", "docker") {
select currentOffset, loadStatistic, properties from jobs("type"="insert") where Name='${jobName}'
"""
log.info("jobInfo: " + jobInfo)
- assert jobInfo.get(0).get(0) == "{\"endFile\":\"regression/load/data/anoexist1234.csv\"}";
+ assert jobInfo.get(0).get(0) == "{\"fileName\":\"regression/load/data/anoexist1234.csv\"}";
assert jobInfo.get(0).get(1) == "{\"scannedRows\":10,\"loadBytes\":218,\"fileNumber\":0,\"fileSize\":0}"
assert jobInfo.get(0).get(2) == "{\"offset\":\"{\\\"fileName\\\":\\\"regression/load/data/anoexist1234.csv\\\"}\"}"
@@ -130,7 +130,7 @@ suite("test_streaming_job_alter_offset_restart_fe", "docker") {
select currentOffset, loadStatistic, properties from jobs("type"="insert") where Name='${jobName}'
"""
log.info("jobInfo: " + jobInfo)
- assert jobInfo.get(0).get(0) == "{\"endFile\":\"regression/load/data/anoexist1234.csv\"}";
+ assert jobInfo.get(0).get(0) == "{\"fileName\":\"regression/load/data/anoexist1234.csv\"}";
assert jobInfo.get(0).get(1) == "{\"scannedRows\":10,\"loadBytes\":218,\"fileNumber\":0,\"fileSize\":0}"
assert jobInfo.get(0).get(2) == "{\"offset\":\"{\\\"fileName\\\":\\\"regression/load/data/anoexist1234.csv\\\"}\"}"
@@ -161,8 +161,8 @@ suite("test_streaming_job_alter_offset_restart_fe", "docker") {
select currentOffset, endoffset, loadStatistic, properties from jobs("type"="insert") where Name='${jobName}'
"""
log.info("jobInfo: " + jobInfo)
- assert jobInfo.get(0).get(0) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
- assert jobInfo.get(0).get(1) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
+ assert jobInfo.get(0).get(0) == "{\"fileName\":\"regression/load/data/example_1.csv\"}";
+ assert jobInfo.get(0).get(1) == "{\"fileName\":\"regression/load/data/example_1.csv\"}";
assert jobInfo.get(0).get(2) == "{\"scannedRows\":30,\"loadBytes\":643,\"fileNumber\":0,\"fileSize\":0}"
assert jobInfo.get(0).get(3) == "{\"offset\":\"{\\\"fileName\\\":\\\"regression/load/data/anoexist1234.csv\\\"}\"}"
diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_job_restart_fe.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_job_restart_fe.groovy
index f2e8bef87ab..11d2113ce5d 100644
--- a/regression-test/suites/job_p0/streaming_job/test_streaming_job_restart_fe.groovy
+++ b/regression-test/suites/job_p0/streaming_job/test_streaming_job_restart_fe.groovy
@@ -95,8 +95,8 @@ suite("test_streaming_job_restart_fe", "docker") {
select currentOffset, endoffset, loadStatistic from jobs("type"="insert") where Name='${jobName}'
"""
log.info("jobInfo: " + jobInfo)
- assert jobInfo.get(0).get(0) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
- assert jobInfo.get(0).get(1) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
+ assert jobInfo.get(0).get(0) == "{\"fileName\":\"regression/load/data/example_1.csv\"}";
+ assert jobInfo.get(0).get(1) == "{\"fileName\":\"regression/load/data/example_1.csv\"}";
assert jobInfo.get(0).get(2) == "{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":2,\"fileSize\":256}"
// Restart FE
@@ -113,8 +113,8 @@ suite("test_streaming_job_restart_fe", "docker") {
select currentOffset, endoffset, loadStatistic from jobs("type"="insert") where Name='${jobName}'
"""
log.info("jobInfo: " + jobInfo)
- assert jobInfo.get(0).get(0) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
- assert jobInfo.get(0).get(1) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
+ assert jobInfo.get(0).get(0) == "{\"fileName\":\"regression/load/data/example_1.csv\"}";
+ assert jobInfo.get(0).get(1) == "{\"fileName\":\"regression/load/data/example_1.csv\"}";
assert jobInfo.get(0).get(2) == "{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":2,\"fileSize\":256}"
sql """ DROP JOB IF EXISTS where jobname = '${jobName}' """
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]