This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e2ab9cf93cd [Improve](job) add history task info in task tvf show
(#57361)
e2ab9cf93cd is described below
commit e2ab9cf93cdc2e7935b181e98eac09e1ba0d54bb
Author: wudi <[email protected]>
AuthorDate: Wed Oct 29 15:10:05 2025 +0800
[Improve](job) add history task info in task tvf show (#57361)
### What problem does this PR solve?
Currently, a job only retains the most recent task execution
information, and when it pauses or reports an error, the task
information is cleared. However, this lacks observability.
To this end, we've added a history task display.
Note: This information is only stored in memory and is cleared after a
restart.
---
.../main/java/org/apache/doris/common/Config.java | 5 ++
.../doris/job/extensions/insert/InsertJob.java | 2 +
.../doris/job/extensions/insert/InsertTask.java | 4 +-
.../insert/streaming/StreamingInsertJob.java | 52 +++++++++++++++
.../insert/streaming/StreamingInsertTask.java | 73 ++++++++++++++++++---
.../insert/streaming/StreamingJobProperties.java | 1 +
.../streaming/StreamingJobSchedulerTask.java | 75 +---------------------
.../org/apache/doris/job/offset/s3/S3Offset.java | 2 +-
.../job/scheduler/StreamingTaskScheduler.java | 11 +++-
.../doris/tablefunction/MetadataGenerator.java | 23 +++++--
.../streaming_job/test_streaming_insert_job.groovy | 17 +++--
.../test_streaming_insert_job_offset.groovy | 6 +-
12 files changed, 169 insertions(+), 102 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index a49d3b941e4..b6dd8b05f36 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1986,6 +1986,11 @@ public class Config extends ConfigBase {
+ "the value should be greater than 0, if it is <=0,
default is 1024."})
public static int max_streaming_job_num = 1024;
+ @ConfField(masterOnly = true, description = {"一个 Streaming Job 在内存中最多保留的
task的数量,超过将丢弃旧的记录",
+ "The maximum number of tasks a Streaming Job can keep in memory.
If the number exceeds the limit, "
+ + "old records will be discarded."})
+ public static int max_streaming_task_show_count = 100;
+
/* job test config */
/**
* If set to true, we will allow the interval unit to be set to second,
when creating a recurring job.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
index 305089378a0..90a5f0dd895 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
@@ -104,6 +104,7 @@ public class InsertJob extends AbstractJob<InsertTask,
Map<Object, Object>> impl
.add(new Column("EndOffset", ScalarType.createStringType()))
.add(new Column("LoadStatistic", ScalarType.createStringType()))
.add(new Column("ErrorMsg", ScalarType.createStringType()))
+ .add(new Column("JobRuntimeMsg", ScalarType.createStringType()))
.build();
public static final ShowResultSetMetaData TASK_META_DATA =
@@ -567,6 +568,7 @@ public class InsertJob extends AbstractJob<InsertTask,
Map<Object, Object>> impl
trow.addToColumnValue(new TCell().setStringVal(
loadStatistic == null ? FeConstants.null_string :
loadStatistic.toJson()));
trow.addToColumnValue(new TCell().setStringVal(failMsg == null ?
FeConstants.null_string : failMsg.getMsg()));
+ trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
return trow;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
index aa1ecc02a9f..69ce9c309be 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
@@ -68,7 +68,8 @@ public class InsertTask extends AbstractTask {
new Column("TrackingUrl", ScalarType.createStringType()),
new Column("LoadStatistic", ScalarType.createStringType()),
new Column("User", ScalarType.createStringType()),
- new Column("FirstErrorMsg", ScalarType.createStringType()));
+ new Column("FirstErrorMsg", ScalarType.createStringType()),
+ new Column("RunningOffset", ScalarType.createStringType()));
public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
@@ -297,6 +298,7 @@ public class InsertTask extends AbstractTask {
trow.addToColumnValue(new TCell().setStringVal(""));
trow.addToColumnValue(new
TCell().setStringVal(userIdentity.getQualifiedUser()));
trow.addToColumnValue(new TCell().setStringVal(""));
+ trow.addToColumnValue(new TCell().setStringVal(""));
return trow;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index c10643867f9..a21a3c854fa 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -80,9 +80,11 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -117,6 +119,10 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
private long lastScheduleTaskTimestamp = -1L;
private InsertIntoTableCommand baseCommand;
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+ private ConcurrentLinkedQueue<StreamingInsertTask> streamInsertTaskQueue =
new ConcurrentLinkedQueue<>();
+ @Setter
+ @Getter
+ private String jobRuntimeMsg = "";
public StreamingInsertJob(String jobName,
JobStatus jobStatus,
@@ -256,6 +262,20 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
}
}
+ @Override
+ public void cancelAllTasks(boolean needWaitCancelComplete) throws
JobException {
+ lock.writeLock().lock();
+ try {
+ if (runningStreamTask == null) {
+ return;
+ }
+ runningStreamTask.cancel(needWaitCancelComplete);
+ canceledTaskCount.incrementAndGet();
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
@Override
public JobType getJobType() {
return JobType.INSERT;
@@ -298,9 +318,35 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
this.runningStreamTask.setStatus(TaskStatus.PENDING);
log.info("create new streaming insert task for job {}, task {} ",
getJobId(), runningStreamTask.getTaskId());
+ recordTasks(runningStreamTask);
return runningStreamTask;
}
+ public void recordTasks(StreamingInsertTask task) {
+ if (Config.max_streaming_task_show_count < 1) {
+ return;
+ }
+ streamInsertTaskQueue.add(task);
+
+ while (streamInsertTaskQueue.size() >
Config.max_streaming_task_show_count) {
+ streamInsertTaskQueue.poll();
+ }
+ }
+
+ /**
+ * for show command to display all streaming insert tasks of this job.
+ */
+ public List<StreamingInsertTask> queryAllStreamTasks() {
+ if (CollectionUtils.isEmpty(streamInsertTaskQueue)) {
+ return new ArrayList<>();
+ }
+ List<StreamingInsertTask> tasks = new
ArrayList<>(streamInsertTaskQueue);
+ Comparator<StreamingInsertTask> taskComparator =
+
Comparator.comparingLong(StreamingInsertTask::getCreateTimeMs).reversed();
+ tasks.sort(taskComparator);
+ return tasks;
+ }
+
protected void fetchMeta() {
try {
if (originTvfProps == null) {
@@ -511,6 +557,8 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
jobStatistic == null ? FeConstants.null_string :
jobStatistic.toJson()));
trow.addToColumnValue(new TCell().setStringVal(failureReason == null
? FeConstants.null_string : failureReason.getMsg()));
+ trow.addToColumnValue(new TCell().setStringVal(jobRuntimeMsg == null
+ ? FeConstants.null_string : jobRuntimeMsg));
return trow;
}
@@ -734,6 +782,10 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
setCanceledTaskCount(new AtomicLong(0));
}
+ if (null == streamInsertTaskQueue) {
+ streamInsertTaskQueue = new ConcurrentLinkedQueue<>();
+ }
+
if (null == lock) {
this.lock = new ReentrantReadWriteLock(true);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
index d9fa4b918bb..68f40be923c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
@@ -19,7 +19,9 @@ package org.apache.doris.job.extensions.insert.streaming;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Env;
+import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Status;
+import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.job.base.Job;
import org.apache.doris.job.common.TaskStatus;
@@ -27,7 +29,7 @@ import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.insert.InsertTask;
import org.apache.doris.job.offset.Offset;
import org.apache.doris.job.offset.SourceOffsetProvider;
-import org.apache.doris.job.offset.s3.S3Offset;
+import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.parser.NereidsParser;
@@ -35,12 +37,17 @@ import
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableComma
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.thrift.TCell;
+import org.apache.doris.thrift.TRow;
import org.apache.doris.thrift.TStatusCode;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.log4j.Log4j2;
+import org.apache.commons.lang3.StringUtils;
+import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -56,8 +63,6 @@ public class StreamingInsertTask {
@Setter
private TaskStatus status;
private String errMsg;
- @Setter
- private String otherMsg;
private Long createTimeMs;
private Long startTimeMs;
private Long finishTimeMs;
@@ -154,17 +159,11 @@ public class StreamingInsertTask {
}
private void run() throws JobException {
- StreamingInsertJob job =
- (StreamingInsertJob)
Env.getCurrentEnv().getJobManager().getJob(getJobId());
- StreamingInsertTask runningStreamTask = job.getRunningStreamTask();
- log.info("current running stream task id is {} for job id {}",
- runningStreamTask == null ? -1 :
runningStreamTask.getTaskId(), getJobId());
if (isCanceled.get()) {
log.info("task has been canceled, task id is {}", getTaskId());
return;
}
- log.info("start to run streaming insert task, label {}, offset is {},
filepath {}",
- labelName, runningOffset.toString(), ((S3Offset)
runningOffset).getFileLists());
+ log.info("start to run streaming insert task, label {}, offset is {}",
labelName, runningOffset.toString());
String errMsg = null;
try {
taskCommand.run(ctx, stmtExecutor);
@@ -221,10 +220,12 @@ public class StreamingInsertTask {
|| TaskStatus.CANCELED.equals(status)) {
return;
}
+ status = TaskStatus.CANCELED;
if (isCanceled.get()) {
return;
}
isCanceled.getAndSet(true);
+ this.errMsg = "task cancelled";
if (null != stmtExecutor) {
log.info("cancelling streaming insert task, job id is {}, task id
is {}",
getJobId(), getTaskId());
@@ -254,4 +255,56 @@ public class StreamingInsertTask {
}
return false;
}
+
+ /**
+ * show streaming insert task info detail
+ */
+ public TRow getTvfInfo(String jobName) {
+ TRow trow = new TRow();
+ trow.addToColumnValue(new
TCell().setStringVal(String.valueOf(this.getTaskId())));
+ trow.addToColumnValue(new
TCell().setStringVal(String.valueOf(this.getJobId())));
+ trow.addToColumnValue(new TCell().setStringVal(jobName));
+ trow.addToColumnValue(new TCell().setStringVal(this.getLabelName()));
+ trow.addToColumnValue(new
TCell().setStringVal(this.getStatus().name()));
+ // err msg
+ trow.addToColumnValue(new
TCell().setStringVal(StringUtils.isNotBlank(errMsg)
+ ? errMsg : FeConstants.null_string));
+
+ // create time
+ trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(this.getCreateTimeMs())));
+ trow.addToColumnValue(new TCell().setStringVal(null ==
getStartTimeMs() ? FeConstants.null_string
+ : TimeUtils.longToTimeString(this.getStartTimeMs())));
+ // load end time
+ trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(this.getFinishTimeMs())));
+
+ List<LoadJob> loadJobs = Env.getCurrentEnv().getLoadManager()
+ .queryLoadJobsByJobIds(Arrays.asList(this.getTaskId()));
+ if (!loadJobs.isEmpty()) {
+ LoadJob loadJob = loadJobs.get(0);
+ if (loadJob.getLoadingStatus() != null &&
loadJob.getLoadingStatus().getTrackingUrl() != null) {
+ trow.addToColumnValue(new
TCell().setStringVal(loadJob.getLoadingStatus().getTrackingUrl()));
+ } else {
+ trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
+ }
+
+ if (loadJob.getLoadStatistic() != null) {
+ trow.addToColumnValue(new
TCell().setStringVal(loadJob.getLoadStatistic().toJson()));
+ } else {
+ trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
+ }
+ } else {
+ trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
+ trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
+ }
+
+ if (this.getUserIdentity() == null) {
+ trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
+ } else {
+ trow.addToColumnValue(new
TCell().setStringVal(this.getUserIdentity().getQualifiedUser()));
+ }
+ trow.addToColumnValue(new TCell().setStringVal(""));
+ trow.addToColumnValue(new TCell().setStringVal(runningOffset == null
+ ? FeConstants.null_string : runningOffset.toString()));
+ return trow;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
index 207509d57fe..0f20dbd4c1e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
@@ -123,6 +123,7 @@ public class StreamingJobProperties implements
JobProperties {
if (!sessionVarMap.isEmpty()) {
try {
sessionVariable.setInsertTimeoutS(DEFAULT_INSERT_TIMEOUT);
+ sessionVariable.setQueryTimeoutS(DEFAULT_INSERT_TIMEOUT);
sessionVariable.readFromMap(sessionVarMap);
} catch (Exception e) {
throw new JobException("Invalid session variable, " +
e.getMessage());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
index a7a26596e62..7f483a8f587 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
@@ -17,24 +17,15 @@
package org.apache.doris.job.extensions.insert.streaming;
-import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
-import org.apache.doris.common.FeConstants;
import org.apache.doris.common.InternalErrorCode;
-import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.common.FailureReason;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.task.AbstractTask;
-import org.apache.doris.load.loadv2.LoadJob;
-import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TRow;
import lombok.extern.log4j.Log4j2;
-import org.apache.commons.lang3.StringUtils;
-
-import java.util.Arrays;
-import java.util.List;
@Log4j2
public class StreamingJobSchedulerTask extends AbstractTask {
@@ -110,72 +101,12 @@ public class StreamingJobSchedulerTask extends
AbstractTask {
@Override
protected void executeCancelLogic(boolean needWaitCancelComplete) throws
Exception {
- log.info("cancelling streaming insert job scheduler task for job id
{}", streamingInsertJob.getJobId());
- if (streamingInsertJob.getRunningStreamTask() != null) {
-
streamingInsertJob.getRunningStreamTask().cancel(needWaitCancelComplete);
- }
+ // cancel logic in streaming insert task
}
@Override
public TRow getTvfInfo(String jobName) {
- StreamingInsertTask runningTask =
streamingInsertJob.getRunningStreamTask();
- if (runningTask == null) {
- return null;
- }
- if (!streamingInsertJob.needScheduleTask()) {
- //todo: should list history task
- return null;
- }
- TRow trow = new TRow();
- trow.addToColumnValue(new
TCell().setStringVal(String.valueOf(runningTask.getTaskId())));
- trow.addToColumnValue(new
TCell().setStringVal(String.valueOf(runningTask.getJobId())));
- trow.addToColumnValue(new TCell().setStringVal(jobName));
- trow.addToColumnValue(new
TCell().setStringVal(runningTask.getLabelName()));
- trow.addToColumnValue(new
TCell().setStringVal(runningTask.getStatus().name()));
- // err msg
- String errMsg = "";
- if (StringUtils.isNotBlank(runningTask.getErrMsg())
- && !FeConstants.null_string.equals(runningTask.getErrMsg())) {
- errMsg = runningTask.getErrMsg();
- } else {
- errMsg = runningTask.getOtherMsg();
- }
- trow.addToColumnValue(new
TCell().setStringVal(StringUtils.isNotBlank(errMsg)
- ? errMsg : FeConstants.null_string));
-
- // create time
- trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(runningTask.getCreateTimeMs())));
- trow.addToColumnValue(new TCell().setStringVal(null ==
getStartTimeMs() ? FeConstants.null_string
- : TimeUtils.longToTimeString(runningTask.getStartTimeMs())));
- // load end time
- trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(runningTask.getFinishTimeMs())));
-
- List<LoadJob> loadJobs = Env.getCurrentEnv().getLoadManager()
- .queryLoadJobsByJobIds(Arrays.asList(runningTask.getTaskId()));
- if (!loadJobs.isEmpty()) {
- LoadJob loadJob = loadJobs.get(0);
- if (loadJob.getLoadingStatus() != null &&
loadJob.getLoadingStatus().getTrackingUrl() != null) {
- trow.addToColumnValue(new
TCell().setStringVal(loadJob.getLoadingStatus().getTrackingUrl()));
- } else {
- trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
- }
-
- if (loadJob.getLoadStatistic() != null) {
- trow.addToColumnValue(new
TCell().setStringVal(loadJob.getLoadStatistic().toJson()));
- } else {
- trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
- }
- } else {
- trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
- trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
- }
-
- if (runningTask.getUserIdentity() == null) {
- trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
- } else {
- trow.addToColumnValue(new
TCell().setStringVal(runningTask.getUserIdentity().getQualifiedUser()));
- }
- trow.addToColumnValue(new TCell().setStringVal(""));
- return trow;
+ // only show streaming insert task info in job tvf
+ return null;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
index 0e5ae417e73..1605ada23d0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
@@ -50,6 +50,6 @@ public class S3Offset implements Offset {
@Override
public String toString() {
- return "{\"endFile\": \"" + endFile + "\" }";
+ return "{\"endFile\": \"" + endFile + "\"}";
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
index 7e99ca3ada9..27a15fb959e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
@@ -52,6 +52,8 @@ public class StreamingTaskScheduler extends MasterDaemon {
private final ScheduledThreadPoolExecutor delayScheduler
= new ScheduledThreadPoolExecutor(1, new
CustomThreadFactory("streaming-task-delay-scheduler"));
+ private static long DELAY_SCHEDULER_MS = 500;
+
public StreamingTaskScheduler() {
super("Streaming-task-scheduler", 1);
}
@@ -114,12 +116,17 @@ public class StreamingTaskScheduler extends MasterDaemon {
}
// reject task if no more data to consume
if (!job.hasMoreDataToConsume()) {
- scheduleTaskWithDelay(task, 500);
+ String delayMsg = "No data available for consumption at the
moment, will retry after "
+ + (System.currentTimeMillis() + DELAY_SCHEDULER_MS);
+ job.setJobRuntimeMsg(delayMsg);
+ scheduleTaskWithDelay(task, DELAY_SCHEDULER_MS);
return;
}
log.info("prepare to schedule task, task id: {}, job id: {}",
task.getTaskId(), task.getJobId());
job.setLastScheduleTaskTimestamp(System.currentTimeMillis());
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().addRunningTask(task);
+ // clear delay msg
+ job.setJobRuntimeMsg("");
long start = System.currentTimeMillis();
try {
task.execute();
@@ -131,8 +138,6 @@ public class StreamingTaskScheduler extends MasterDaemon {
}
private void scheduleTaskWithDelay(StreamingInsertTask task, long delayMs)
{
- task.setOtherMsg("No data available for consumption at the moment,
will retry after "
- + (System.currentTimeMillis() + delayMs));
delayScheduler.schedule(() -> {
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().registerTask(task);
}, delayMs, TimeUnit.MILLISECONDS);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index 3fe659b9bd2..c22147ffdd6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -68,6 +68,7 @@ import
org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog;
import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertTask;
import org.apache.doris.job.extensions.mtmv.MTMVJob;
import org.apache.doris.job.task.AbstractTask;
import org.apache.doris.mtmv.MTMVPartitionUtil;
@@ -1227,11 +1228,23 @@ public class MetadataGenerator {
continue;
}
}
- List<AbstractTask> tasks = job.queryAllTasks();
- for (AbstractTask task : tasks) {
- TRow tvfInfo = task.getTvfInfo(job.getJobName());
- if (tvfInfo != null) {
- dataBatch.add(tvfInfo);
+
+ if (job instanceof StreamingInsertJob) {
+ StreamingInsertJob streamingJob = (StreamingInsertJob) job;
+ List<StreamingInsertTask> streamingInsertTasks =
streamingJob.queryAllStreamTasks();
+ for (StreamingInsertTask task : streamingInsertTasks) {
+ TRow tvfInfo = task.getTvfInfo(job.getJobName());
+ if (tvfInfo != null) {
+ dataBatch.add(tvfInfo);
+ }
+ }
+ } else {
+ List<AbstractTask> tasks = job.queryAllTasks();
+ for (AbstractTask task : tasks) {
+ TRow tvfInfo = task.getTvfInfo(job.getJobName());
+ if (tvfInfo != null) {
+ dataBatch.add(tvfInfo);
+ }
}
}
}
diff --git
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
index d87b5aa17b8..18632929329 100644
---
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
+++
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
@@ -88,12 +88,12 @@ suite("test_streaming_insert_job") {
PAUSE JOB where jobname = '${jobName}'
"""
def pausedJobStatus = sql """
- select status from jobs("type"="insert") where Name='${jobName}'
+ select status, SucceedTaskCount + FailedTaskCount + CanceledTaskCount
from jobs("type"="insert") where Name='${jobName}'
"""
assert pausedJobStatus.get(0).get(0) == "PAUSED"
- def pauseShowTask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
- assert pauseShowTask.size() == 0
+ def pauseShowTask = sql """select count(1) from tasks("type"="insert")
where JobName='${jobName}'"""
+ assert pauseShowTask.get(0).get(0) == pausedJobStatus.get(0).get(1)
// check encrypt sk
def jobExecuteSQL = sql """
@@ -141,19 +141,22 @@ suite("test_streaming_insert_job") {
RESUME JOB where jobname = '${jobName}'
"""
def resumeJobStatus = sql """
- select status,properties,currentOffset from jobs("type"="insert")
where Name='${jobName}'
+ select status,properties,currentOffset, SucceedTaskCount +
FailedTaskCount + CanceledTaskCount from jobs("type"="insert") where
Name='${jobName}'
"""
assert resumeJobStatus.get(0).get(0) == "RUNNING" ||
resumeJobStatus.get(0).get(0) == "PENDING"
assert resumeJobStatus.get(0).get(1) ==
"{\"s3.max_batch_files\":\"1\",\"session.insert_max_filter_ratio\":\"0.5\"}"
assert resumeJobStatus.get(0).get(2) ==
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
+
Awaitility.await().atMost(60, SECONDS)
.pollInterval(1, SECONDS).until(
{
print("check create streaming task count")
- def resumeShowTask = sql """select * from
tasks("type"="insert") where JobName='${jobName}'"""
- // check streaming task create success
- resumeShowTask.size() == 1
+ def resumeShowTask = sql """select count(1) from
tasks("type"="insert") where JobName='${jobName}'"""
+ def lastTaskStatus = sql """select status from
tasks("type"="insert") where JobName='${jobName}' limit 1 """
+ // A new task is generated
+ resumeShowTask.get(0).get(0) > resumeJobStatus.get(0).get(3) &&
+ ( lastTaskStatus.get(0).get(0) == "PENDING" ||
lastTaskStatus.get(0).get(0) == "RUNNING" )
}
)
diff --git
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy
index 2971222e9a0..f0dc425abdd 100644
---
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy
+++
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy
@@ -160,12 +160,12 @@ suite("test_streaming_insert_job_offset") {
PAUSE JOB where jobname = '${jobName}'
"""
def pausedJobStatus = sql """
- select status from jobs("type"="insert") where Name='${jobName}'
+ select status, SucceedTaskCount + FailedTaskCount + CanceledTaskCount
from jobs("type"="insert") where Name='${jobName}'
"""
assert pausedJobStatus.get(0).get(0) == "PAUSED"
- def pauseShowTask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
- assert pauseShowTask.size() == 0
+ def pauseShowTask = sql """select count(1) from tasks("type"="insert")
where JobName='${jobName}'"""
+ assert pauseShowTask.get(0).get(0) == pausedJobStatus.get(0).get(1)
def jobInfo = sql """
select currentOffset, endoffset, loadStatistic, properties from
jobs("type"="insert") where Name='${jobName}'
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]