This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch streaming-job-dev
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/streaming-job-dev by this push:
new d471124ebd7 [Fix](job) improve job api (#56352)
d471124ebd7 is described below
commit d471124ebd71ee72c911d7c6a36628d88fb3a934
Author: wudi <[email protected]>
AuthorDate: Tue Sep 23 16:44:41 2025 +0800
[Fix](job) improve job api (#56352)
### What problem does this PR solve?
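improve job api: add `Job#isJobRunning()` in place of `JobUtils.checkNeedSchedule`, rename the streaming offset columns (`ConsumedOffset`/`MaxOffset` → `CurrentOffset`/`EndOffset`) and show offsets as JSON, and remove the per-task `Offset` column.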
---
.../org/apache/doris/job/base/AbstractJob.java | 9 ++++---
.../main/java/org/apache/doris/job/base/Job.java | 7 +++++
.../java/org/apache/doris/job/common/JobUtils.java | 30 ----------------------
.../doris/job/executor/DispatchTaskHandler.java | 3 +--
.../doris/job/executor/TimerJobSchedulerTask.java | 3 +--
.../doris/job/extensions/insert/InsertJob.java | 7 ++---
.../doris/job/extensions/insert/InsertTask.java | 5 +---
.../insert/streaming/StreamingInsertJob.java | 20 +++++++++++----
.../streaming/StreamingJobSchedulerTask.java | 2 --
.../doris/job/offset/SourceOffsetProvider.java | 8 +++---
.../org/apache/doris/job/offset/s3/S3Offset.java | 4 +--
.../job/offset/s3/S3SourceOffsetProvider.java | 12 +++++----
.../apache/doris/job/scheduler/JobScheduler.java | 7 +++--
.../streaming_job/test_streaming_insert_job.groovy | 14 +++++-----
14 files changed, 54 insertions(+), 77 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
index 5b03ae6d18b..6a113d1a042 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
@@ -178,6 +178,11 @@ public abstract class AbstractJob<T extends AbstractTask,
C> implements Job<T, C
return System.nanoTime() + RandomUtils.nextInt();
}
+ @Override
+ public boolean isJobRunning() {
+ return JobStatus.RUNNING.equals(getJobStatus());
+ }
+
@Override
public void cancelTaskById(long taskId) throws JobException {
if (CollectionUtils.isEmpty(runningTasks)) {
@@ -484,8 +489,4 @@ public abstract class AbstractJob<T extends AbstractTask,
C> implements Job<T, C
public boolean needPersist() {
return true;
}
-
- public boolean isFinalStatus() {
- return jobStatus.equals(JobStatus.STOPPED) ||
jobStatus.equals(JobStatus.FINISHED);
- }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java
b/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java
index 69d1e5e55fb..3afcd5eef9b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java
@@ -68,6 +68,13 @@ public interface Job<T extends AbstractTask, C> {
*/
boolean isReadyForScheduling(C taskContext);
+ /**
+ * Checks if the job is running.
+ *
+ * @return True if the job is running.
+ */
+ boolean isJobRunning();
+
/**
* Retrieves the metadata for the job, which is used to display job
information.
*
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/common/JobUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/job/common/JobUtils.java
deleted file mode 100644
index 93ff5f41480..00000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/job/common/JobUtils.java
+++ /dev/null
@@ -1,30 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.job.common;
-
-import org.apache.doris.job.base.AbstractJob;
-import org.apache.doris.job.base.JobExecuteType;
-
-public class JobUtils {
- public static boolean checkNeedSchedule(AbstractJob job) {
- if (job.getJobConfig().getExecuteType() == JobExecuteType.STREAMING) {
- return !job.isFinalStatus();
- }
- return job.getJobStatus() == JobStatus.RUNNING;
- }
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
index 35b1f351f72..228b9724863 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
@@ -19,7 +19,6 @@ package org.apache.doris.job.executor;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.common.JobType;
-import org.apache.doris.job.common.JobUtils;
import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.disruptor.TimerJobEvent;
import org.apache.doris.job.task.AbstractTask;
@@ -55,7 +54,7 @@ public class DispatchTaskHandler<T extends AbstractJob>
implements WorkHandler<T
log.info("job is null,may be job is deleted, ignore");
return;
}
- if (event.getJob().isReadyForScheduling(null) &&
JobUtils.checkNeedSchedule(event.getJob())) {
+ if (event.getJob().isReadyForScheduling(null) &&
event.getJob().isJobRunning()) {
List<? extends AbstractTask> tasks =
event.getJob().commonCreateTasks(TaskType.SCHEDULED, null);
if (CollectionUtils.isEmpty(tasks)) {
log.warn("job is ready for scheduling, but create task is
empty, skip scheduler,"
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java
index 0f58452ff7b..4abf72023a4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java
@@ -18,7 +18,6 @@
package org.apache.doris.job.executor;
import org.apache.doris.job.base.AbstractJob;
-import org.apache.doris.job.common.JobUtils;
import org.apache.doris.job.disruptor.TaskDisruptor;
import io.netty.util.Timeout;
@@ -40,7 +39,7 @@ public class TimerJobSchedulerTask<T extends AbstractJob>
implements TimerTask {
@Override
public void run(Timeout timeout) {
try {
- if (!JobUtils.checkNeedSchedule(job)) {
+ if (!job.isJobRunning()) {
log.info("job status is not running, job id is {}, skip
dispatch", this.job.getJobId());
return;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
index cc16cbb603d..305089378a0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
@@ -100,8 +100,8 @@ public class InsertJob extends AbstractJob<InsertTask,
Map<Object, Object>> impl
.add(new Column("Comment", ScalarType.createStringType()))
// only execute type = streaming need record
.add(new Column("Properties", ScalarType.createStringType()))
- .add(new Column("ConsumedOffset", ScalarType.createStringType()))
- .add(new Column("MaxOffset", ScalarType.createStringType()))
+ .add(new Column("CurrentOffset", ScalarType.createStringType()))
+ .add(new Column("EndOffset", ScalarType.createStringType()))
.add(new Column("LoadStatistic", ScalarType.createStringType()))
.add(new Column("ErrorMsg", ScalarType.createStringType()))
.build();
@@ -114,15 +114,12 @@ public class InsertJob extends AbstractJob<InsertTask,
Map<Object, Object>> impl
.addColumn(new Column("EtlInfo",
ScalarType.createVarchar(100)))
.addColumn(new Column("TaskInfo",
ScalarType.createVarchar(100)))
.addColumn(new Column("ErrorMsg",
ScalarType.createVarchar(100)))
-
.addColumn(new Column("CreateTimeMs",
ScalarType.createVarchar(20)))
.addColumn(new Column("FinishTimeMs",
ScalarType.createVarchar(20)))
.addColumn(new Column("TrackingUrl",
ScalarType.createVarchar(200)))
.addColumn(new Column("LoadStatistic",
ScalarType.createVarchar(200)))
.addColumn(new Column("User",
ScalarType.createVarchar(50)))
.addColumn(new Column("FirstErrorMsg",
ScalarType.createVarchar(200)))
- // only execute type = streaming need record
- .addColumn(new Column("Offset",
ScalarType.createStringType()))
.build();
public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
index 5875699fb0f..aa1ecc02a9f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
@@ -68,8 +68,7 @@ public class InsertTask extends AbstractTask {
new Column("TrackingUrl", ScalarType.createStringType()),
new Column("LoadStatistic", ScalarType.createStringType()),
new Column("User", ScalarType.createStringType()),
- new Column("FirstErrorMsg", ScalarType.createStringType()),
- new Column("Offset", ScalarType.createStringType()));
+ new Column("FirstErrorMsg", ScalarType.createStringType()));
public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
@@ -277,7 +276,6 @@ public class InsertTask extends AbstractTask {
trow.addToColumnValue(new
TCell().setStringVal(userIdentity.getQualifiedUser()));
}
trow.addToColumnValue(new TCell().setStringVal(firstErrorMsg == null ?
"" : firstErrorMsg));
- trow.addToColumnValue(new TCell().setStringVal(""));
return trow;
}
@@ -299,7 +297,6 @@ public class InsertTask extends AbstractTask {
trow.addToColumnValue(new TCell().setStringVal(""));
trow.addToColumnValue(new
TCell().setStringVal(userIdentity.getQualifiedUser()));
trow.addToColumnValue(new TCell().setStringVal(""));
- trow.addToColumnValue(new TCell().setStringVal(""));
return trow;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index b32ecdcb94e..f70cbc37fca 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -63,6 +63,7 @@ import lombok.Getter;
import lombok.Setter;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import java.io.DataOutput;
import java.io.IOException;
@@ -203,7 +204,16 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
@Override
public boolean isReadyForScheduling(Map<Object, Object> taskContext) {
- return CollectionUtils.isEmpty(getRunningTasks());
+ return CollectionUtils.isEmpty(getRunningTasks()) && !isFinalStatus();
+ }
+
+ @Override
+ public boolean isJobRunning() {
+ return !isFinalStatus();
+ }
+
+ private boolean isFinalStatus() {
+ return getJobStatus().equals(JobStatus.STOPPED) ||
getJobStatus().equals(JobStatus.FINISHED);
}
@Override
@@ -354,14 +364,14 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
trow.addToColumnValue(new TCell().setStringVal(properties != null
? GsonUtils.GSON.toJson(properties) :
FeConstants.null_string));
- if (offsetProvider != null && offsetProvider.getConsumedOffset() !=
null) {
- trow.addToColumnValue(new
TCell().setStringVal(offsetProvider.getConsumedOffset()));
+ if (offsetProvider != null &&
StringUtils.isNotEmpty(offsetProvider.getShowCurrentOffset())) {
+ trow.addToColumnValue(new
TCell().setStringVal(offsetProvider.getShowCurrentOffset()));
} else {
trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
}
- if (offsetProvider != null && offsetProvider.getMaxOffset() != null) {
- trow.addToColumnValue(new
TCell().setStringVal(offsetProvider.getMaxOffset()));
+ if (offsetProvider != null &&
StringUtils.isNotEmpty(offsetProvider.getShowMaxOffset())) {
+ trow.addToColumnValue(new
TCell().setStringVal(offsetProvider.getShowMaxOffset()));
} else {
trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
index a1c8bce649d..0cd023b7d1d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
@@ -154,8 +154,6 @@ public class StreamingJobSchedulerTask extends AbstractTask
{
trow.addToColumnValue(new
TCell().setStringVal(runningTask.getUserIdentity().getQualifiedUser()));
}
trow.addToColumnValue(new TCell().setStringVal(""));
- trow.addToColumnValue(new
TCell().setStringVal(runningTask.getRunningOffset() == null ?
FeConstants.null_string
- : runningTask.getRunningOffset().toString()));
return trow;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
index a27b66812c2..d2cf4e62053 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
@@ -40,16 +40,16 @@ public interface SourceOffsetProvider {
Offset getNextOffset(StreamingJobProperties jobProps, Map<String, String>
properties);
/**
- * Get consumered offset to show
+ * Get current offset to show
* @return
*/
- String getConsumedOffset();
+ String getShowCurrentOffset();
/**
- * Get remote datasource max offset
+ * Get remote datasource max offset to show
* @return
*/
- String getMaxOffset();
+ String getShowMaxOffset();
/**
* Rewrite the TVF parameters in the SQL based on the current offset.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
index b152660873a..6257eed8c70 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
@@ -28,8 +28,6 @@ import org.apache.commons.lang3.StringUtils;
@Getter
@Setter
public class S3Offset implements Offset {
- // path/1.csv
- String startFile;
@SerializedName("endFile")
String endFile;
// s3://bucket/path/{1.csv,2.csv}
@@ -47,6 +45,6 @@ public class S3Offset implements Offset {
@Override
public String toString() {
- return "{ \"startFile\": \"" + startFile + "\", \"endFile\": \"" +
endFile + "\" }";
+ return "{\"endFile\": \"" + endFile + "\" }";
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
index e6a5cf809a5..74743d09399 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
@@ -33,6 +33,7 @@ import
org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.common.collect.Maps;
+import com.google.gson.Gson;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.StringUtils;
@@ -72,7 +73,6 @@ public class S3SourceOffsetProvider implements
SourceOffsetProvider {
String bucket = globListResult.getBucket();
String prefix = globListResult.getPrefix();
- offset.setStartFile(startFile);
String bucketBase = "s3://" + bucket + "/";
// Get the path of the last directory
int lastSlash = prefix.lastIndexOf('/');
@@ -109,16 +109,18 @@ public class S3SourceOffsetProvider implements
SourceOffsetProvider {
}
@Override
- public String getConsumedOffset() {
+ public String getShowCurrentOffset() {
if (currentOffset != null) {
- return currentOffset.getEndFile();
+ return currentOffset.toSerializedJson();
}
return null;
}
@Override
- public String getMaxOffset() {
- return maxEndFile;
+ public String getShowMaxOffset() {
+ Map<String, String> res = new HashMap<>();
+ res.put("endFile", maxEndFile);
+ return new Gson().toJson(res);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
index ab03cce7c91..dde5f891efd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
@@ -24,7 +24,6 @@ import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.base.JobExecuteType;
import org.apache.doris.job.common.JobStatus;
-import org.apache.doris.job.common.JobUtils;
import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.disruptor.TaskDisruptor;
import org.apache.doris.job.exception.JobException;
@@ -110,7 +109,7 @@ public class JobScheduler<T extends AbstractJob<?, C>, C>
implements Closeable {
}
public void scheduleOneJob(T job) throws JobException {
- if (!JobUtils.checkNeedSchedule(job)) {
+ if (!job.isJobRunning()) {
return;
}
// not-schedule task
@@ -145,7 +144,7 @@ public class JobScheduler<T extends AbstractJob<?, C>, C>
implements Closeable {
}
public void cycleTimerJobScheduler(T job) {
- if (!JobUtils.checkNeedSchedule(job)) {
+ if (!job.isJobRunning()) {
return;
}
if
(!JobExecuteType.RECURRING.equals(job.getJobConfig().getExecuteType())) {
@@ -226,7 +225,7 @@ public class JobScheduler<T extends AbstractJob<?, C>, C>
implements Closeable {
clearEndJob(job);
continue;
}
- if (JobUtils.checkNeedSchedule(job) &&
job.getJobConfig().checkIsTimerJob()) {
+ if (job.isJobRunning() && job.getJobConfig().checkIsTimerJob()) {
cycleTimerJobScheduler(job, lastTimeWindowMs);
}
}
diff --git
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
index 54fc5faa2e8..6865cedd842 100644
---
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
+++
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
@@ -97,10 +97,10 @@ suite("test_streaming_insert_job") {
def jobOffset = sql """
- select ConsumedOffset, MaxOffset from jobs("type"="insert") where
Name='${jobName}'
+ select currentOffset, endoffset from jobs("type"="insert") where
Name='${jobName}'
"""
- assert jobOffset.get(0).get(0) == "regression/load/data/example_1.csv"
- assert jobOffset.get(0).get(1) == "regression/load/data/example_1.csv"
+ assert jobOffset.get(0).get(0) ==
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
+ assert jobOffset.get(0).get(1) ==
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
// alter streaming job
sql """
@@ -123,21 +123,21 @@ suite("test_streaming_insert_job") {
"""
def alterJobProperties = sql """
- select status,properties,ConsumedOffset from jobs("type"="insert")
where Name='${jobName}'
+ select status,properties,currentOffset from jobs("type"="insert")
where Name='${jobName}'
"""
assert alterJobProperties.get(0).get(0) == "PAUSED"
assert alterJobProperties.get(0).get(1) ==
"{\"s3.max_batch_files\":\"1\",\"session.insert_max_filter_ratio\":\"0.5\"}"
- assert alterJobProperties.get(0).get(2) ==
"regression/load/data/example_1.csv"
+ assert alterJobProperties.get(0).get(2) ==
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
sql """
RESUME JOB where jobname = '${jobName}'
"""
def resumeJobStatus = sql """
- select status,properties,ConsumedOffset from jobs("type"="insert")
where Name='${jobName}'
+ select status,properties,currentOffset from jobs("type"="insert")
where Name='${jobName}'
"""
assert resumeJobStatus.get(0).get(0) == "RUNNING" ||
resumeJobStatus.get(0).get(0) == "PENDING"
assert resumeJobStatus.get(0).get(1) ==
"{\"s3.max_batch_files\":\"1\",\"session.insert_max_filter_ratio\":\"0.5\"}"
- assert resumeJobStatus.get(0).get(2) ==
"regression/load/data/example_1.csv"
+ assert resumeJobStatus.get(0).get(2) ==
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
Awaitility.await().atMost(60, SECONDS)
.pollInterval(1, SECONDS).until(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]