This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e2ab9cf93cd [Improve](job) add history task info in task tvf show 
(#57361)
e2ab9cf93cd is described below

commit e2ab9cf93cdc2e7935b181e98eac09e1ba0d54bb
Author: wudi <[email protected]>
AuthorDate: Wed Oct 29 15:10:05 2025 +0800

    [Improve](job) add history task info in task tvf show (#57361)
    
    ### What problem does this PR solve?
    
    Currently, a job only retains the most recent task execution
    information, and when it pauses or reports an error, the task
    information is cleared. However, this lacks observability.
    To this end, we've added a history task display.
    Note: This information is only stored in memory and is cleared after a
    restart.
---
 .../main/java/org/apache/doris/common/Config.java  |  5 ++
 .../doris/job/extensions/insert/InsertJob.java     |  2 +
 .../doris/job/extensions/insert/InsertTask.java    |  4 +-
 .../insert/streaming/StreamingInsertJob.java       | 52 +++++++++++++++
 .../insert/streaming/StreamingInsertTask.java      | 73 ++++++++++++++++++---
 .../insert/streaming/StreamingJobProperties.java   |  1 +
 .../streaming/StreamingJobSchedulerTask.java       | 75 +---------------------
 .../org/apache/doris/job/offset/s3/S3Offset.java   |  2 +-
 .../job/scheduler/StreamingTaskScheduler.java      | 11 +++-
 .../doris/tablefunction/MetadataGenerator.java     | 23 +++++--
 .../streaming_job/test_streaming_insert_job.groovy | 17 +++--
 .../test_streaming_insert_job_offset.groovy        |  6 +-
 12 files changed, 169 insertions(+), 102 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index a49d3b941e4..b6dd8b05f36 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1986,6 +1986,11 @@ public class Config extends ConfigBase {
                     + "the value should be greater than 0, if it is <=0, 
default is 1024."})
     public static int max_streaming_job_num = 1024;
 
+    @ConfField(masterOnly = true, description = {"一个 Streaming Job 在内存中最多保留的 
task的数量,超过将丢弃旧的记录",
+            "The maximum number of tasks a Streaming Job can keep in memory. 
If the number exceeds the limit, "
+                    + "old records will be discarded."})
+    public static int max_streaming_task_show_count = 100;
+
     /* job test config */
     /**
      * If set to true, we will allow the interval unit to be set to second, 
when creating a recurring job.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
index 305089378a0..90a5f0dd895 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
@@ -104,6 +104,7 @@ public class InsertJob extends AbstractJob<InsertTask, 
Map<Object, Object>> impl
             .add(new Column("EndOffset", ScalarType.createStringType()))
             .add(new Column("LoadStatistic", ScalarType.createStringType()))
             .add(new Column("ErrorMsg", ScalarType.createStringType()))
+            .add(new Column("JobRuntimeMsg", ScalarType.createStringType()))
             .build();
 
     public static final ShowResultSetMetaData TASK_META_DATA =
@@ -567,6 +568,7 @@ public class InsertJob extends AbstractJob<InsertTask, 
Map<Object, Object>> impl
         trow.addToColumnValue(new TCell().setStringVal(
                 loadStatistic == null ? FeConstants.null_string : 
loadStatistic.toJson()));
         trow.addToColumnValue(new TCell().setStringVal(failMsg == null ? 
FeConstants.null_string : failMsg.getMsg()));
+        trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
         return trow;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
index aa1ecc02a9f..69ce9c309be 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
@@ -68,7 +68,8 @@ public class InsertTask extends AbstractTask {
             new Column("TrackingUrl", ScalarType.createStringType()),
             new Column("LoadStatistic", ScalarType.createStringType()),
             new Column("User", ScalarType.createStringType()),
-            new Column("FirstErrorMsg", ScalarType.createStringType()));
+            new Column("FirstErrorMsg", ScalarType.createStringType()),
+            new Column("RunningOffset", ScalarType.createStringType()));
 
     public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
 
@@ -297,6 +298,7 @@ public class InsertTask extends AbstractTask {
         trow.addToColumnValue(new TCell().setStringVal(""));
         trow.addToColumnValue(new 
TCell().setStringVal(userIdentity.getQualifiedUser()));
         trow.addToColumnValue(new TCell().setStringVal(""));
+        trow.addToColumnValue(new TCell().setStringVal(""));
         return trow;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index c10643867f9..a21a3c854fa 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -80,9 +80,11 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -117,6 +119,10 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
     private long lastScheduleTaskTimestamp = -1L;
     private InsertIntoTableCommand baseCommand;
     private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+    private ConcurrentLinkedQueue<StreamingInsertTask> streamInsertTaskQueue = 
new ConcurrentLinkedQueue<>();
+    @Setter
+    @Getter
+    private String jobRuntimeMsg = "";
 
     public StreamingInsertJob(String jobName,
             JobStatus jobStatus,
@@ -256,6 +262,20 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         }
     }
 
+    @Override
+    public void cancelAllTasks(boolean needWaitCancelComplete) throws 
JobException {
+        lock.writeLock().lock();
+        try {
+            if (runningStreamTask == null) {
+                return;
+            }
+            runningStreamTask.cancel(needWaitCancelComplete);
+            canceledTaskCount.incrementAndGet();
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
     @Override
     public JobType getJobType() {
         return JobType.INSERT;
@@ -298,9 +318,35 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         this.runningStreamTask.setStatus(TaskStatus.PENDING);
         log.info("create new streaming insert task for job {}, task {} ",
                 getJobId(), runningStreamTask.getTaskId());
+        recordTasks(runningStreamTask);
         return runningStreamTask;
     }
 
+    public void recordTasks(StreamingInsertTask task) {
+        if (Config.max_streaming_task_show_count < 1) {
+            return;
+        }
+        streamInsertTaskQueue.add(task);
+
+        while (streamInsertTaskQueue.size() > 
Config.max_streaming_task_show_count) {
+            streamInsertTaskQueue.poll();
+        }
+    }
+
+    /**
+     * for show command to display all streaming insert tasks of this job.
+     */
+    public List<StreamingInsertTask> queryAllStreamTasks() {
+        if (CollectionUtils.isEmpty(streamInsertTaskQueue)) {
+            return new ArrayList<>();
+        }
+        List<StreamingInsertTask> tasks = new 
ArrayList<>(streamInsertTaskQueue);
+        Comparator<StreamingInsertTask> taskComparator =
+                
Comparator.comparingLong(StreamingInsertTask::getCreateTimeMs).reversed();
+        tasks.sort(taskComparator);
+        return tasks;
+    }
+
     protected void fetchMeta() {
         try {
             if (originTvfProps == null) {
@@ -511,6 +557,8 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
                 jobStatistic == null ? FeConstants.null_string : 
jobStatistic.toJson()));
         trow.addToColumnValue(new TCell().setStringVal(failureReason == null
                 ? FeConstants.null_string : failureReason.getMsg()));
+        trow.addToColumnValue(new TCell().setStringVal(jobRuntimeMsg == null
+                ? FeConstants.null_string : jobRuntimeMsg));
         return trow;
     }
 
@@ -734,6 +782,10 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
             setCanceledTaskCount(new AtomicLong(0));
         }
 
+        if (null == streamInsertTaskQueue) {
+            streamInsertTaskQueue = new ConcurrentLinkedQueue<>();
+        }
+
         if (null == lock) {
             this.lock = new ReentrantReadWriteLock(true);
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
index d9fa4b918bb..68f40be923c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
@@ -19,7 +19,9 @@ package org.apache.doris.job.extensions.insert.streaming;
 
 import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Status;
+import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.job.base.Job;
 import org.apache.doris.job.common.TaskStatus;
@@ -27,7 +29,7 @@ import org.apache.doris.job.exception.JobException;
 import org.apache.doris.job.extensions.insert.InsertTask;
 import org.apache.doris.job.offset.Offset;
 import org.apache.doris.job.offset.SourceOffsetProvider;
-import org.apache.doris.job.offset.s3.S3Offset;
+import org.apache.doris.load.loadv2.LoadJob;
 import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.glue.LogicalPlanAdapter;
 import org.apache.doris.nereids.parser.NereidsParser;
@@ -35,12 +37,17 @@ import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableComma
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.QueryState;
 import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.thrift.TCell;
+import org.apache.doris.thrift.TRow;
 import org.apache.doris.thrift.TStatusCode;
 
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.log4j.Log4j2;
+import org.apache.commons.lang3.StringUtils;
 
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -56,8 +63,6 @@ public class StreamingInsertTask {
     @Setter
     private TaskStatus status;
     private String errMsg;
-    @Setter
-    private String otherMsg;
     private Long createTimeMs;
     private Long startTimeMs;
     private Long finishTimeMs;
@@ -154,17 +159,11 @@ public class StreamingInsertTask {
     }
 
     private void run() throws JobException {
-        StreamingInsertJob job =
-                (StreamingInsertJob) 
Env.getCurrentEnv().getJobManager().getJob(getJobId());
-        StreamingInsertTask runningStreamTask = job.getRunningStreamTask();
-        log.info("current running stream task id is {} for job id {}",
-                runningStreamTask == null ? -1 : 
runningStreamTask.getTaskId(), getJobId());
         if (isCanceled.get()) {
             log.info("task has been canceled, task id is {}", getTaskId());
             return;
         }
-        log.info("start to run streaming insert task, label {}, offset is {}, 
filepath {}",
-                labelName, runningOffset.toString(), ((S3Offset) 
runningOffset).getFileLists());
+        log.info("start to run streaming insert task, label {}, offset is {}", 
labelName, runningOffset.toString());
         String errMsg = null;
         try {
             taskCommand.run(ctx, stmtExecutor);
@@ -221,10 +220,12 @@ public class StreamingInsertTask {
                 || TaskStatus.CANCELED.equals(status)) {
             return;
         }
+        status = TaskStatus.CANCELED;
         if (isCanceled.get()) {
             return;
         }
         isCanceled.getAndSet(true);
+        this.errMsg = "task cancelled";
         if (null != stmtExecutor) {
             log.info("cancelling streaming insert task, job id is {}, task id 
is {}",
                     getJobId(), getTaskId());
@@ -254,4 +255,56 @@ public class StreamingInsertTask {
         }
         return false;
     }
+
+    /**
+     * show streaming insert task info detail
+     */
+    public TRow getTvfInfo(String jobName) {
+        TRow trow = new TRow();
+        trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(this.getTaskId())));
+        trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(this.getJobId())));
+        trow.addToColumnValue(new TCell().setStringVal(jobName));
+        trow.addToColumnValue(new TCell().setStringVal(this.getLabelName()));
+        trow.addToColumnValue(new 
TCell().setStringVal(this.getStatus().name()));
+        // err msg
+        trow.addToColumnValue(new 
TCell().setStringVal(StringUtils.isNotBlank(errMsg)
+                ? errMsg : FeConstants.null_string));
+
+        // create time
+        trow.addToColumnValue(new 
TCell().setStringVal(TimeUtils.longToTimeString(this.getCreateTimeMs())));
+        trow.addToColumnValue(new TCell().setStringVal(null == 
getStartTimeMs() ? FeConstants.null_string
+                : TimeUtils.longToTimeString(this.getStartTimeMs())));
+        // load end time
+        trow.addToColumnValue(new 
TCell().setStringVal(TimeUtils.longToTimeString(this.getFinishTimeMs())));
+
+        List<LoadJob> loadJobs = Env.getCurrentEnv().getLoadManager()
+                .queryLoadJobsByJobIds(Arrays.asList(this.getTaskId()));
+        if (!loadJobs.isEmpty()) {
+            LoadJob loadJob = loadJobs.get(0);
+            if (loadJob.getLoadingStatus() != null && 
loadJob.getLoadingStatus().getTrackingUrl() != null) {
+                trow.addToColumnValue(new 
TCell().setStringVal(loadJob.getLoadingStatus().getTrackingUrl()));
+            } else {
+                trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
+            }
+
+            if (loadJob.getLoadStatistic() != null) {
+                trow.addToColumnValue(new 
TCell().setStringVal(loadJob.getLoadStatistic().toJson()));
+            } else {
+                trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
+            }
+        } else {
+            trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
+            trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
+        }
+
+        if (this.getUserIdentity() == null) {
+            trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
+        } else {
+            trow.addToColumnValue(new 
TCell().setStringVal(this.getUserIdentity().getQualifiedUser()));
+        }
+        trow.addToColumnValue(new TCell().setStringVal(""));
+        trow.addToColumnValue(new TCell().setStringVal(runningOffset == null
+                ? FeConstants.null_string : runningOffset.toString()));
+        return trow;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
index 207509d57fe..0f20dbd4c1e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
@@ -123,6 +123,7 @@ public class StreamingJobProperties implements 
JobProperties {
         if (!sessionVarMap.isEmpty()) {
             try {
                 sessionVariable.setInsertTimeoutS(DEFAULT_INSERT_TIMEOUT);
+                sessionVariable.setQueryTimeoutS(DEFAULT_INSERT_TIMEOUT);
                 sessionVariable.readFromMap(sessionVarMap);
             } catch (Exception e) {
                 throw new JobException("Invalid session variable, " + 
e.getMessage());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
index a7a26596e62..7f483a8f587 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
@@ -17,24 +17,15 @@
 
 package org.apache.doris.job.extensions.insert.streaming;
 
-import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
-import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.InternalErrorCode;
-import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.job.common.FailureReason;
 import org.apache.doris.job.common.JobStatus;
 import org.apache.doris.job.exception.JobException;
 import org.apache.doris.job.task.AbstractTask;
-import org.apache.doris.load.loadv2.LoadJob;
-import org.apache.doris.thrift.TCell;
 import org.apache.doris.thrift.TRow;
 
 import lombok.extern.log4j.Log4j2;
-import org.apache.commons.lang3.StringUtils;
-
-import java.util.Arrays;
-import java.util.List;
 
 @Log4j2
 public class StreamingJobSchedulerTask extends AbstractTask {
@@ -110,72 +101,12 @@ public class StreamingJobSchedulerTask extends 
AbstractTask {
 
     @Override
     protected void executeCancelLogic(boolean needWaitCancelComplete) throws 
Exception {
-        log.info("cancelling streaming insert job scheduler task for job id 
{}", streamingInsertJob.getJobId());
-        if (streamingInsertJob.getRunningStreamTask() != null) {
-            
streamingInsertJob.getRunningStreamTask().cancel(needWaitCancelComplete);
-        }
+        // cancel logic in streaming insert task
     }
 
     @Override
     public TRow getTvfInfo(String jobName) {
-        StreamingInsertTask runningTask = 
streamingInsertJob.getRunningStreamTask();
-        if (runningTask == null) {
-            return null;
-        }
-        if (!streamingInsertJob.needScheduleTask()) {
-            //todo: should list history task
-            return null;
-        }
-        TRow trow = new TRow();
-        trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(runningTask.getTaskId())));
-        trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(runningTask.getJobId())));
-        trow.addToColumnValue(new TCell().setStringVal(jobName));
-        trow.addToColumnValue(new 
TCell().setStringVal(runningTask.getLabelName()));
-        trow.addToColumnValue(new 
TCell().setStringVal(runningTask.getStatus().name()));
-        // err msg
-        String errMsg = "";
-        if (StringUtils.isNotBlank(runningTask.getErrMsg())
-                && !FeConstants.null_string.equals(runningTask.getErrMsg())) {
-            errMsg = runningTask.getErrMsg();
-        } else {
-            errMsg = runningTask.getOtherMsg();
-        }
-        trow.addToColumnValue(new 
TCell().setStringVal(StringUtils.isNotBlank(errMsg)
-                ? errMsg : FeConstants.null_string));
-
-        // create time
-        trow.addToColumnValue(new 
TCell().setStringVal(TimeUtils.longToTimeString(runningTask.getCreateTimeMs())));
-        trow.addToColumnValue(new TCell().setStringVal(null == 
getStartTimeMs() ? FeConstants.null_string
-                : TimeUtils.longToTimeString(runningTask.getStartTimeMs())));
-        // load end time
-        trow.addToColumnValue(new 
TCell().setStringVal(TimeUtils.longToTimeString(runningTask.getFinishTimeMs())));
-
-        List<LoadJob> loadJobs = Env.getCurrentEnv().getLoadManager()
-                .queryLoadJobsByJobIds(Arrays.asList(runningTask.getTaskId()));
-        if (!loadJobs.isEmpty()) {
-            LoadJob loadJob = loadJobs.get(0);
-            if (loadJob.getLoadingStatus() != null && 
loadJob.getLoadingStatus().getTrackingUrl() != null) {
-                trow.addToColumnValue(new 
TCell().setStringVal(loadJob.getLoadingStatus().getTrackingUrl()));
-            } else {
-                trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
-            }
-
-            if (loadJob.getLoadStatistic() != null) {
-                trow.addToColumnValue(new 
TCell().setStringVal(loadJob.getLoadStatistic().toJson()));
-            } else {
-                trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
-            }
-        } else {
-            trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
-            trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
-        }
-
-        if (runningTask.getUserIdentity() == null) {
-            trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
-        } else {
-            trow.addToColumnValue(new 
TCell().setStringVal(runningTask.getUserIdentity().getQualifiedUser()));
-        }
-        trow.addToColumnValue(new TCell().setStringVal(""));
-        return trow;
+        // only show streaming insert task info in job tvf
+        return null;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
index 0e5ae417e73..1605ada23d0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
@@ -50,6 +50,6 @@ public class S3Offset implements Offset {
 
     @Override
     public String toString() {
-        return "{\"endFile\": \"" + endFile + "\" }";
+        return "{\"endFile\": \"" + endFile + "\"}";
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
index 7e99ca3ada9..27a15fb959e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
@@ -52,6 +52,8 @@ public class StreamingTaskScheduler extends MasterDaemon {
     private final ScheduledThreadPoolExecutor delayScheduler
                 = new ScheduledThreadPoolExecutor(1, new 
CustomThreadFactory("streaming-task-delay-scheduler"));
 
+    private static long DELAY_SCHEDULER_MS = 500;
+
     public StreamingTaskScheduler() {
         super("Streaming-task-scheduler", 1);
     }
@@ -114,12 +116,17 @@ public class StreamingTaskScheduler extends MasterDaemon {
         }
         // reject task if no more data to consume
         if (!job.hasMoreDataToConsume()) {
-            scheduleTaskWithDelay(task, 500);
+            String delayMsg = "No data available for consumption at the 
moment, will retry after "
+                    + (System.currentTimeMillis() + DELAY_SCHEDULER_MS);
+            job.setJobRuntimeMsg(delayMsg);
+            scheduleTaskWithDelay(task, DELAY_SCHEDULER_MS);
             return;
         }
         log.info("prepare to schedule task, task id: {}, job id: {}", 
task.getTaskId(), task.getJobId());
         job.setLastScheduleTaskTimestamp(System.currentTimeMillis());
         
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().addRunningTask(task);
+        // clear delay msg
+        job.setJobRuntimeMsg("");
         long start = System.currentTimeMillis();
         try {
             task.execute();
@@ -131,8 +138,6 @@ public class StreamingTaskScheduler extends MasterDaemon {
     }
 
     private void scheduleTaskWithDelay(StreamingInsertTask task, long delayMs) 
{
-        task.setOtherMsg("No data available for consumption at the moment, 
will retry after "
-                + (System.currentTimeMillis() + delayMs));
         delayScheduler.schedule(() -> {
             
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().registerTask(task);
         }, delayMs, TimeUnit.MILLISECONDS);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index 3fe659b9bd2..c22147ffdd6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -68,6 +68,7 @@ import 
org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog;
 import org.apache.doris.datasource.mvcc.MvccUtil;
 import org.apache.doris.job.common.JobType;
 import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertTask;
 import org.apache.doris.job.extensions.mtmv.MTMVJob;
 import org.apache.doris.job.task.AbstractTask;
 import org.apache.doris.mtmv.MTMVPartitionUtil;
@@ -1227,11 +1228,23 @@ public class MetadataGenerator {
                     continue;
                 }
             }
-            List<AbstractTask> tasks = job.queryAllTasks();
-            for (AbstractTask task : tasks) {
-                TRow tvfInfo = task.getTvfInfo(job.getJobName());
-                if (tvfInfo != null) {
-                    dataBatch.add(tvfInfo);
+
+            if (job instanceof StreamingInsertJob) {
+                StreamingInsertJob streamingJob = (StreamingInsertJob) job;
+                List<StreamingInsertTask> streamingInsertTasks = 
streamingJob.queryAllStreamTasks();
+                for (StreamingInsertTask task : streamingInsertTasks) {
+                    TRow tvfInfo = task.getTvfInfo(job.getJobName());
+                    if (tvfInfo != null) {
+                        dataBatch.add(tvfInfo);
+                    }
+                }
+            } else {
+                List<AbstractTask> tasks = job.queryAllTasks();
+                for (AbstractTask task : tasks) {
+                    TRow tvfInfo = task.getTvfInfo(job.getJobName());
+                    if (tvfInfo != null) {
+                        dataBatch.add(tvfInfo);
+                    }
                 }
             }
         }
diff --git 
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy 
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
index d87b5aa17b8..18632929329 100644
--- 
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
@@ -88,12 +88,12 @@ suite("test_streaming_insert_job") {
         PAUSE JOB where jobname =  '${jobName}'
     """
     def pausedJobStatus = sql """
-        select status from jobs("type"="insert") where Name='${jobName}'
+        select status, SucceedTaskCount + FailedTaskCount + CanceledTaskCount 
from jobs("type"="insert") where Name='${jobName}'
     """
     assert pausedJobStatus.get(0).get(0) == "PAUSED"
 
-    def pauseShowTask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
-    assert pauseShowTask.size() == 0
+    def pauseShowTask = sql """select count(1) from tasks("type"="insert") 
where JobName='${jobName}'"""
+    assert pauseShowTask.get(0).get(0) == pausedJobStatus.get(0).get(1)
 
     // check encrypt sk
     def jobExecuteSQL = sql """
@@ -141,19 +141,22 @@ suite("test_streaming_insert_job") {
         RESUME JOB where jobname =  '${jobName}'
     """
     def resumeJobStatus = sql """
-        select status,properties,currentOffset from jobs("type"="insert") 
where Name='${jobName}'
+        select status,properties,currentOffset, SucceedTaskCount + 
FailedTaskCount + CanceledTaskCount from jobs("type"="insert") where 
Name='${jobName}'
     """
     assert resumeJobStatus.get(0).get(0) == "RUNNING" || 
resumeJobStatus.get(0).get(0) == "PENDING"
     assert resumeJobStatus.get(0).get(1) == 
"{\"s3.max_batch_files\":\"1\",\"session.insert_max_filter_ratio\":\"0.5\"}"
     assert resumeJobStatus.get(0).get(2) == 
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
 
+
     Awaitility.await().atMost(60, SECONDS)
             .pollInterval(1, SECONDS).until(
             {
                 print("check create streaming task count")
-                def resumeShowTask = sql """select * from 
tasks("type"="insert") where JobName='${jobName}'"""
-                // check streaming task create success
-                resumeShowTask.size() == 1
+                def resumeShowTask = sql """select count(1) from 
tasks("type"="insert") where JobName='${jobName}'"""
+                def lastTaskStatus = sql """select status from 
tasks("type"="insert") where JobName='${jobName}' limit 1 """
+                // A new task is generated
+                resumeShowTask.get(0).get(0) > resumeJobStatus.get(0).get(3) &&
+                        ( lastTaskStatus.get(0).get(0) == "PENDING" || 
lastTaskStatus.get(0).get(0) == "RUNNING" )
             }
     )
 
diff --git 
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy
 
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy
index 2971222e9a0..f0dc425abdd 100644
--- 
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy
@@ -160,12 +160,12 @@ suite("test_streaming_insert_job_offset") {
         PAUSE JOB where jobname =  '${jobName}'
     """
     def pausedJobStatus = sql """
-        select status from jobs("type"="insert") where Name='${jobName}'
+        select status, SucceedTaskCount + FailedTaskCount + CanceledTaskCount  
from jobs("type"="insert") where Name='${jobName}'
     """
     assert pausedJobStatus.get(0).get(0) == "PAUSED"
 
-    def pauseShowTask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
-    assert pauseShowTask.size() == 0
+    def pauseShowTask = sql """select count(1) from tasks("type"="insert") 
where JobName='${jobName}'"""
+    assert pauseShowTask.get(0).get(0) == pausedJobStatus.get(0).get(1)
 
     def jobInfo = sql """
         select currentOffset, endoffset, loadStatistic, properties from 
jobs("type"="insert") where Name='${jobName}'


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to