This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new c210d328dc2 branch-4.0: [Improve](job) add history task info in task
tvf show #57361 (#57453)
c210d328dc2 is described below
commit c210d328dc2a1b92414c3f382038ead6b542dbef
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Oct 30 10:49:06 2025 +0800
branch-4.0: [Improve](job) add history task info in task tvf show #57361
(#57453)
Cherry-picked from #57361
Co-authored-by: wudi <[email protected]>
---
.../main/java/org/apache/doris/common/Config.java | 5 ++
.../doris/job/extensions/insert/InsertJob.java | 2 +
.../doris/job/extensions/insert/InsertTask.java | 4 +-
.../insert/streaming/StreamingInsertJob.java | 52 +++++++++++++++
.../insert/streaming/StreamingInsertTask.java | 73 ++++++++++++++++++---
.../insert/streaming/StreamingJobProperties.java | 1 +
.../streaming/StreamingJobSchedulerTask.java | 75 +---------------------
.../org/apache/doris/job/offset/s3/S3Offset.java | 2 +-
.../job/scheduler/StreamingTaskScheduler.java | 11 +++-
.../doris/tablefunction/MetadataGenerator.java | 23 +++++--
.../streaming_job/test_streaming_insert_job.groovy | 17 +++--
.../test_streaming_insert_job_offset.groovy | 6 +-
12 files changed, 169 insertions(+), 102 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 9597815fcc4..1b386f98588 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1980,6 +1980,11 @@ public class Config extends ConfigBase {
+ "the value should be greater than 0, if it is <=0,
default is 1024."})
public static int max_streaming_job_num = 1024;
+ @ConfField(masterOnly = true, description = {"一个 Streaming Job 在内存中最多保留的
task的数量,超过将丢弃旧的记录",
+ "The maximum number of tasks a Streaming Job can keep in memory.
If the number exceeds the limit, "
+ + "old records will be discarded."})
+ public static int max_streaming_task_show_count = 100;
+
/* job test config */
/**
* If set to true, we will allow the interval unit to be set to second,
when creating a recurring job.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
index 305089378a0..90a5f0dd895 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
@@ -104,6 +104,7 @@ public class InsertJob extends AbstractJob<InsertTask,
Map<Object, Object>> impl
.add(new Column("EndOffset", ScalarType.createStringType()))
.add(new Column("LoadStatistic", ScalarType.createStringType()))
.add(new Column("ErrorMsg", ScalarType.createStringType()))
+ .add(new Column("JobRuntimeMsg", ScalarType.createStringType()))
.build();
public static final ShowResultSetMetaData TASK_META_DATA =
@@ -567,6 +568,7 @@ public class InsertJob extends AbstractJob<InsertTask,
Map<Object, Object>> impl
trow.addToColumnValue(new TCell().setStringVal(
loadStatistic == null ? FeConstants.null_string :
loadStatistic.toJson()));
trow.addToColumnValue(new TCell().setStringVal(failMsg == null ?
FeConstants.null_string : failMsg.getMsg()));
+ trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
return trow;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
index aa1ecc02a9f..69ce9c309be 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
@@ -68,7 +68,8 @@ public class InsertTask extends AbstractTask {
new Column("TrackingUrl", ScalarType.createStringType()),
new Column("LoadStatistic", ScalarType.createStringType()),
new Column("User", ScalarType.createStringType()),
- new Column("FirstErrorMsg", ScalarType.createStringType()));
+ new Column("FirstErrorMsg", ScalarType.createStringType()),
+ new Column("RunningOffset", ScalarType.createStringType()));
public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
@@ -297,6 +298,7 @@ public class InsertTask extends AbstractTask {
trow.addToColumnValue(new TCell().setStringVal(""));
trow.addToColumnValue(new
TCell().setStringVal(userIdentity.getQualifiedUser()));
trow.addToColumnValue(new TCell().setStringVal(""));
+ trow.addToColumnValue(new TCell().setStringVal(""));
return trow;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index c1c9a77b1c1..b4accf1b15e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -80,9 +80,11 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -117,6 +119,10 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
private long lastScheduleTaskTimestamp = -1L;
private InsertIntoTableCommand baseCommand;
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+ private ConcurrentLinkedQueue<StreamingInsertTask> streamInsertTaskQueue =
new ConcurrentLinkedQueue<>();
+ @Setter
+ @Getter
+ private String jobRuntimeMsg = "";
public StreamingInsertJob(String jobName,
JobStatus jobStatus,
@@ -256,6 +262,20 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
}
}
+ @Override
+ public void cancelAllTasks(boolean needWaitCancelComplete) throws
JobException {
+ lock.writeLock().lock();
+ try {
+ if (runningStreamTask == null) {
+ return;
+ }
+ runningStreamTask.cancel(needWaitCancelComplete);
+ canceledTaskCount.incrementAndGet();
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
@Override
public JobType getJobType() {
return JobType.INSERT;
@@ -298,9 +318,35 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
this.runningStreamTask.setStatus(TaskStatus.PENDING);
log.info("create new streaming insert task for job {}, task {} ",
getJobId(), runningStreamTask.getTaskId());
+ recordTasks(runningStreamTask);
return runningStreamTask;
}
+ public void recordTasks(StreamingInsertTask task) {
+ if (Config.max_streaming_task_show_count < 1) {
+ return;
+ }
+ streamInsertTaskQueue.add(task);
+
+ while (streamInsertTaskQueue.size() >
Config.max_streaming_task_show_count) {
+ streamInsertTaskQueue.poll();
+ }
+ }
+
+ /**
+ * for show command to display all streaming insert tasks of this job.
+ */
+ public List<StreamingInsertTask> queryAllStreamTasks() {
+ if (CollectionUtils.isEmpty(streamInsertTaskQueue)) {
+ return new ArrayList<>();
+ }
+ List<StreamingInsertTask> tasks = new
ArrayList<>(streamInsertTaskQueue);
+ Comparator<StreamingInsertTask> taskComparator =
+
Comparator.comparingLong(StreamingInsertTask::getCreateTimeMs).reversed();
+ tasks.sort(taskComparator);
+ return tasks;
+ }
+
protected void fetchMeta() {
try {
if (originTvfProps == null) {
@@ -480,6 +526,8 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
jobStatistic == null ? FeConstants.null_string :
jobStatistic.toJson()));
trow.addToColumnValue(new TCell().setStringVal(failureReason == null
? FeConstants.null_string : failureReason.getMsg()));
+ trow.addToColumnValue(new TCell().setStringVal(jobRuntimeMsg == null
+ ? FeConstants.null_string : jobRuntimeMsg));
return trow;
}
@@ -703,6 +751,10 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
setCanceledTaskCount(new AtomicLong(0));
}
+ if (null == streamInsertTaskQueue) {
+ streamInsertTaskQueue = new ConcurrentLinkedQueue<>();
+ }
+
if (null == lock) {
this.lock = new ReentrantReadWriteLock(true);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
index d9fa4b918bb..68f40be923c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
@@ -19,7 +19,9 @@ package org.apache.doris.job.extensions.insert.streaming;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Env;
+import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Status;
+import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.job.base.Job;
import org.apache.doris.job.common.TaskStatus;
@@ -27,7 +29,7 @@ import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.insert.InsertTask;
import org.apache.doris.job.offset.Offset;
import org.apache.doris.job.offset.SourceOffsetProvider;
-import org.apache.doris.job.offset.s3.S3Offset;
+import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.parser.NereidsParser;
@@ -35,12 +37,17 @@ import
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableComma
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.thrift.TCell;
+import org.apache.doris.thrift.TRow;
import org.apache.doris.thrift.TStatusCode;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.log4j.Log4j2;
+import org.apache.commons.lang3.StringUtils;
+import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -56,8 +63,6 @@ public class StreamingInsertTask {
@Setter
private TaskStatus status;
private String errMsg;
- @Setter
- private String otherMsg;
private Long createTimeMs;
private Long startTimeMs;
private Long finishTimeMs;
@@ -154,17 +159,11 @@ public class StreamingInsertTask {
}
private void run() throws JobException {
- StreamingInsertJob job =
- (StreamingInsertJob)
Env.getCurrentEnv().getJobManager().getJob(getJobId());
- StreamingInsertTask runningStreamTask = job.getRunningStreamTask();
- log.info("current running stream task id is {} for job id {}",
- runningStreamTask == null ? -1 :
runningStreamTask.getTaskId(), getJobId());
if (isCanceled.get()) {
log.info("task has been canceled, task id is {}", getTaskId());
return;
}
- log.info("start to run streaming insert task, label {}, offset is {},
filepath {}",
- labelName, runningOffset.toString(), ((S3Offset)
runningOffset).getFileLists());
+ log.info("start to run streaming insert task, label {}, offset is {}",
labelName, runningOffset.toString());
String errMsg = null;
try {
taskCommand.run(ctx, stmtExecutor);
@@ -221,10 +220,12 @@ public class StreamingInsertTask {
|| TaskStatus.CANCELED.equals(status)) {
return;
}
+ status = TaskStatus.CANCELED;
if (isCanceled.get()) {
return;
}
isCanceled.getAndSet(true);
+ this.errMsg = "task cancelled";
if (null != stmtExecutor) {
log.info("cancelling streaming insert task, job id is {}, task id
is {}",
getJobId(), getTaskId());
@@ -254,4 +255,56 @@ public class StreamingInsertTask {
}
return false;
}
+
+ /**
+ * show streaming insert task info detail
+ */
+ public TRow getTvfInfo(String jobName) {
+ TRow trow = new TRow();
+ trow.addToColumnValue(new
TCell().setStringVal(String.valueOf(this.getTaskId())));
+ trow.addToColumnValue(new
TCell().setStringVal(String.valueOf(this.getJobId())));
+ trow.addToColumnValue(new TCell().setStringVal(jobName));
+ trow.addToColumnValue(new TCell().setStringVal(this.getLabelName()));
+ trow.addToColumnValue(new
TCell().setStringVal(this.getStatus().name()));
+ // err msg
+ trow.addToColumnValue(new
TCell().setStringVal(StringUtils.isNotBlank(errMsg)
+ ? errMsg : FeConstants.null_string));
+
+ // create time
+ trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(this.getCreateTimeMs())));
+ trow.addToColumnValue(new TCell().setStringVal(null ==
getStartTimeMs() ? FeConstants.null_string
+ : TimeUtils.longToTimeString(this.getStartTimeMs())));
+ // load end time
+ trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(this.getFinishTimeMs())));
+
+ List<LoadJob> loadJobs = Env.getCurrentEnv().getLoadManager()
+ .queryLoadJobsByJobIds(Arrays.asList(this.getTaskId()));
+ if (!loadJobs.isEmpty()) {
+ LoadJob loadJob = loadJobs.get(0);
+ if (loadJob.getLoadingStatus() != null &&
loadJob.getLoadingStatus().getTrackingUrl() != null) {
+ trow.addToColumnValue(new
TCell().setStringVal(loadJob.getLoadingStatus().getTrackingUrl()));
+ } else {
+ trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
+ }
+
+ if (loadJob.getLoadStatistic() != null) {
+ trow.addToColumnValue(new
TCell().setStringVal(loadJob.getLoadStatistic().toJson()));
+ } else {
+ trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
+ }
+ } else {
+ trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
+ trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
+ }
+
+ if (this.getUserIdentity() == null) {
+ trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
+ } else {
+ trow.addToColumnValue(new
TCell().setStringVal(this.getUserIdentity().getQualifiedUser()));
+ }
+ trow.addToColumnValue(new TCell().setStringVal(""));
+ trow.addToColumnValue(new TCell().setStringVal(runningOffset == null
+ ? FeConstants.null_string : runningOffset.toString()));
+ return trow;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
index 207509d57fe..0f20dbd4c1e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
@@ -123,6 +123,7 @@ public class StreamingJobProperties implements
JobProperties {
if (!sessionVarMap.isEmpty()) {
try {
sessionVariable.setInsertTimeoutS(DEFAULT_INSERT_TIMEOUT);
+ sessionVariable.setQueryTimeoutS(DEFAULT_INSERT_TIMEOUT);
sessionVariable.readFromMap(sessionVarMap);
} catch (Exception e) {
throw new JobException("Invalid session variable, " +
e.getMessage());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
index a7a26596e62..7f483a8f587 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
@@ -17,24 +17,15 @@
package org.apache.doris.job.extensions.insert.streaming;
-import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
-import org.apache.doris.common.FeConstants;
import org.apache.doris.common.InternalErrorCode;
-import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.common.FailureReason;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.task.AbstractTask;
-import org.apache.doris.load.loadv2.LoadJob;
-import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TRow;
import lombok.extern.log4j.Log4j2;
-import org.apache.commons.lang3.StringUtils;
-
-import java.util.Arrays;
-import java.util.List;
@Log4j2
public class StreamingJobSchedulerTask extends AbstractTask {
@@ -110,72 +101,12 @@ public class StreamingJobSchedulerTask extends
AbstractTask {
@Override
protected void executeCancelLogic(boolean needWaitCancelComplete) throws
Exception {
- log.info("cancelling streaming insert job scheduler task for job id
{}", streamingInsertJob.getJobId());
- if (streamingInsertJob.getRunningStreamTask() != null) {
-
streamingInsertJob.getRunningStreamTask().cancel(needWaitCancelComplete);
- }
+ // cancel logic in streaming insert task
}
@Override
public TRow getTvfInfo(String jobName) {
- StreamingInsertTask runningTask =
streamingInsertJob.getRunningStreamTask();
- if (runningTask == null) {
- return null;
- }
- if (!streamingInsertJob.needScheduleTask()) {
- //todo: should list history task
- return null;
- }
- TRow trow = new TRow();
- trow.addToColumnValue(new
TCell().setStringVal(String.valueOf(runningTask.getTaskId())));
- trow.addToColumnValue(new
TCell().setStringVal(String.valueOf(runningTask.getJobId())));
- trow.addToColumnValue(new TCell().setStringVal(jobName));
- trow.addToColumnValue(new
TCell().setStringVal(runningTask.getLabelName()));
- trow.addToColumnValue(new
TCell().setStringVal(runningTask.getStatus().name()));
- // err msg
- String errMsg = "";
- if (StringUtils.isNotBlank(runningTask.getErrMsg())
- && !FeConstants.null_string.equals(runningTask.getErrMsg())) {
- errMsg = runningTask.getErrMsg();
- } else {
- errMsg = runningTask.getOtherMsg();
- }
- trow.addToColumnValue(new
TCell().setStringVal(StringUtils.isNotBlank(errMsg)
- ? errMsg : FeConstants.null_string));
-
- // create time
- trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(runningTask.getCreateTimeMs())));
- trow.addToColumnValue(new TCell().setStringVal(null ==
getStartTimeMs() ? FeConstants.null_string
- : TimeUtils.longToTimeString(runningTask.getStartTimeMs())));
- // load end time
- trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(runningTask.getFinishTimeMs())));
-
- List<LoadJob> loadJobs = Env.getCurrentEnv().getLoadManager()
- .queryLoadJobsByJobIds(Arrays.asList(runningTask.getTaskId()));
- if (!loadJobs.isEmpty()) {
- LoadJob loadJob = loadJobs.get(0);
- if (loadJob.getLoadingStatus() != null &&
loadJob.getLoadingStatus().getTrackingUrl() != null) {
- trow.addToColumnValue(new
TCell().setStringVal(loadJob.getLoadingStatus().getTrackingUrl()));
- } else {
- trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
- }
-
- if (loadJob.getLoadStatistic() != null) {
- trow.addToColumnValue(new
TCell().setStringVal(loadJob.getLoadStatistic().toJson()));
- } else {
- trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
- }
- } else {
- trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
- trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
- }
-
- if (runningTask.getUserIdentity() == null) {
- trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
- } else {
- trow.addToColumnValue(new
TCell().setStringVal(runningTask.getUserIdentity().getQualifiedUser()));
- }
- trow.addToColumnValue(new TCell().setStringVal(""));
- return trow;
+ // only show streaming insert task info in job tvf
+ return null;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
index 0e5ae417e73..1605ada23d0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
@@ -50,6 +50,6 @@ public class S3Offset implements Offset {
@Override
public String toString() {
- return "{\"endFile\": \"" + endFile + "\" }";
+ return "{\"endFile\": \"" + endFile + "\"}";
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
index 7e99ca3ada9..27a15fb959e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
@@ -52,6 +52,8 @@ public class StreamingTaskScheduler extends MasterDaemon {
private final ScheduledThreadPoolExecutor delayScheduler
= new ScheduledThreadPoolExecutor(1, new
CustomThreadFactory("streaming-task-delay-scheduler"));
+ private static long DELAY_SCHEDULER_MS = 500;
+
public StreamingTaskScheduler() {
super("Streaming-task-scheduler", 1);
}
@@ -114,12 +116,17 @@ public class StreamingTaskScheduler extends MasterDaemon {
}
// reject task if no more data to consume
if (!job.hasMoreDataToConsume()) {
- scheduleTaskWithDelay(task, 500);
+ String delayMsg = "No data available for consumption at the
moment, will retry after "
+ + (System.currentTimeMillis() + DELAY_SCHEDULER_MS);
+ job.setJobRuntimeMsg(delayMsg);
+ scheduleTaskWithDelay(task, DELAY_SCHEDULER_MS);
return;
}
log.info("prepare to schedule task, task id: {}, job id: {}",
task.getTaskId(), task.getJobId());
job.setLastScheduleTaskTimestamp(System.currentTimeMillis());
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().addRunningTask(task);
+ // clear delay msg
+ job.setJobRuntimeMsg("");
long start = System.currentTimeMillis();
try {
task.execute();
@@ -131,8 +138,6 @@ public class StreamingTaskScheduler extends MasterDaemon {
}
private void scheduleTaskWithDelay(StreamingInsertTask task, long delayMs)
{
- task.setOtherMsg("No data available for consumption at the moment,
will retry after "
- + (System.currentTimeMillis() + delayMs));
delayScheduler.schedule(() -> {
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().registerTask(task);
}, delayMs, TimeUnit.MILLISECONDS);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index 2ce92cb1b1e..c40dd0a03f2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -68,6 +68,7 @@ import
org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog;
import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertTask;
import org.apache.doris.job.extensions.mtmv.MTMVJob;
import org.apache.doris.job.task.AbstractTask;
import org.apache.doris.mtmv.BaseTableInfo;
@@ -1228,11 +1229,23 @@ public class MetadataGenerator {
continue;
}
}
- List<AbstractTask> tasks = job.queryAllTasks();
- for (AbstractTask task : tasks) {
- TRow tvfInfo = task.getTvfInfo(job.getJobName());
- if (tvfInfo != null) {
- dataBatch.add(tvfInfo);
+
+ if (job instanceof StreamingInsertJob) {
+ StreamingInsertJob streamingJob = (StreamingInsertJob) job;
+ List<StreamingInsertTask> streamingInsertTasks =
streamingJob.queryAllStreamTasks();
+ for (StreamingInsertTask task : streamingInsertTasks) {
+ TRow tvfInfo = task.getTvfInfo(job.getJobName());
+ if (tvfInfo != null) {
+ dataBatch.add(tvfInfo);
+ }
+ }
+ } else {
+ List<AbstractTask> tasks = job.queryAllTasks();
+ for (AbstractTask task : tasks) {
+ TRow tvfInfo = task.getTvfInfo(job.getJobName());
+ if (tvfInfo != null) {
+ dataBatch.add(tvfInfo);
+ }
}
}
}
diff --git
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
index 93dc64dfd4e..1e65397742a 100644
---
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
+++
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
@@ -88,12 +88,12 @@ suite("test_streaming_insert_job") {
PAUSE JOB where jobname = '${jobName}'
"""
def pausedJobStatus = sql """
- select status from jobs("type"="insert") where Name='${jobName}'
+ select status, SucceedTaskCount + FailedTaskCount + CanceledTaskCount
from jobs("type"="insert") where Name='${jobName}'
"""
assert pausedJobStatus.get(0).get(0) == "PAUSED"
- def pauseShowTask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
- assert pauseShowTask.size() == 0
+ def pauseShowTask = sql """select count(1) from tasks("type"="insert")
where JobName='${jobName}'"""
+ assert pauseShowTask.get(0).get(0) == pausedJobStatus.get(0).get(1)
// check encrypt sk
def jobExecuteSQL = sql """
@@ -141,19 +141,22 @@ suite("test_streaming_insert_job") {
RESUME JOB where jobname = '${jobName}'
"""
def resumeJobStatus = sql """
- select status,properties,currentOffset from jobs("type"="insert")
where Name='${jobName}'
+ select status,properties,currentOffset, SucceedTaskCount +
FailedTaskCount + CanceledTaskCount from jobs("type"="insert") where
Name='${jobName}'
"""
assert resumeJobStatus.get(0).get(0) == "RUNNING" ||
resumeJobStatus.get(0).get(0) == "PENDING"
assert resumeJobStatus.get(0).get(1) ==
"{\"s3.max_batch_files\":\"1\",\"session.insert_max_filter_ratio\":\"0.5\"}"
assert resumeJobStatus.get(0).get(2) ==
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
+
Awaitility.await().atMost(60, SECONDS)
.pollInterval(1, SECONDS).until(
{
print("check create streaming task count")
- def resumeShowTask = sql """select * from
tasks("type"="insert") where JobName='${jobName}'"""
- // check streaming task create success
- resumeShowTask.size() == 1
+ def resumeShowTask = sql """select count(1) from
tasks("type"="insert") where JobName='${jobName}'"""
+ def lastTaskStatus = sql """select status from
tasks("type"="insert") where JobName='${jobName}' limit 1 """
+ // A new task is generated
+ resumeShowTask.get(0).get(0) > resumeJobStatus.get(0).get(3) &&
+ ( lastTaskStatus.get(0).get(0) == "PENDING" ||
lastTaskStatus.get(0).get(0) == "RUNNING" )
}
)
diff --git
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy
index 31f8421ff42..f1c2f91e00b 100644
---
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy
+++
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy
@@ -160,12 +160,12 @@ suite("test_streaming_insert_job_offset") {
PAUSE JOB where jobname = '${jobName}'
"""
def pausedJobStatus = sql """
- select status from jobs("type"="insert") where Name='${jobName}'
+ select status, SucceedTaskCount + FailedTaskCount + CanceledTaskCount
from jobs("type"="insert") where Name='${jobName}'
"""
assert pausedJobStatus.get(0).get(0) == "PAUSED"
- def pauseShowTask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
- assert pauseShowTask.size() == 0
+ def pauseShowTask = sql """select count(1) from tasks("type"="insert")
where JobName='${jobName}'"""
+ assert pauseShowTask.get(0).get(0) == pausedJobStatus.get(0).get(1)
def jobInfo = sql """
select currentOffset, endoffset, loadStatistic, properties from
jobs("type"="insert") where Name='${jobName}'
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org