This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch streaming-job-dev
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/streaming-job-dev by this push:
     new b2adc933555 [Feature](WIP) Fix Alter Job and schedule bug etc (#56166)
b2adc933555 is described below

commit b2adc933555c4654984cac65c861a0a089b5da4d
Author: wudi <[email protected]>
AuthorDate: Wed Sep 17 22:35:48 2025 +0800

    [Feature](WIP) Fix Alter Job and schedule bug etc (#56166)
    
    ### What problem does this PR solve?
    
    Fixes the ALTER JOB command handling and job-scheduling bugs, among other issues.
---
 .../antlr4/org/apache/doris/nereids/DorisParser.g4 |  2 +-
 .../apache/doris/cloud/transaction/TxnUtil.java    |  2 +-
 .../java/org/apache/doris/fs/obj/S3ObjStorage.java |  7 ++-
 .../doris/job/base/JobExecutionConfiguration.java  | 17 +-------
 .../{PauseReason.java => FailureReason.java}       |  8 +++-
 .../doris/job/extensions/insert/InsertJob.java     |  2 +
 .../doris/job/extensions/insert/InsertTask.java    |  4 +-
 .../insert/streaming/StreamingInsertJob.java       | 44 +++++++++++++++----
 .../insert/streaming/StreamingInsertTask.java      |  4 +-
 .../streaming/StreamingJobSchedulerTask.java       | 50 ++++++++++++++++-----
 .../StreamingTaskTxnCommitAttachment.java          |  9 ++--
 .../doris/job/offset/SourceOffsetProvider.java     |  6 +++
 .../org/apache/doris/job/offset/s3/S3Offset.java   |  2 +
 .../job/offset/s3/S3SourceOffsetProvider.java      |  9 +++-
 .../job/scheduler/StreamingTaskScheduler.java      |  2 +
 .../doris/nereids/parser/LogicalPlanBuilder.java   |  3 +-
 .../trees/plans/commands/AlterJobCommand.java      |  4 +-
 .../org/apache/doris/persist/gson/GsonUtils.java   |  5 ++-
 .../org/apache/doris/catalog/DropFunctionTest.java | 11 +----
 .../streaming_job/test_streaming_insert_job.groovy | 51 +++++++++++++++++++---
 20 files changed, 172 insertions(+), 70 deletions(-)

diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 
b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
index a0ae7c6916e..4b6dd3668d4 100644
--- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
+++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
@@ -111,7 +111,7 @@ supportedJobStatement
        commentSpec?
        DO supportedDmlStatement                                                
                                                             #createScheduledJob
    | PAUSE JOB WHERE (jobNameKey=identifier) EQ (jobNameValue=STRING_LITERAL)  
                                                              #pauseJob
-   | ALTER JOB FOR (jobNameKey=identifier) (propertyClause | 
supportedDmlStatement | propertyClause  supportedDmlStatement)                  
#alterJob
+   | ALTER JOB FOR (jobName=multipartIdentifier) (propertyClause | 
supportedDmlStatement | propertyClause  supportedDmlStatement)                  
#alterJob
    | DROP JOB (IF EXISTS)? WHERE (jobNameKey=identifier) EQ 
(jobNameValue=STRING_LITERAL)                                                   
 #dropJob
    | RESUME JOB WHERE (jobNameKey=identifier) EQ (jobNameValue=STRING_LITERAL) 
                                                              #resumeJob
    | CANCEL TASK WHERE (jobNameKey=identifier) EQ 
(jobNameValue=STRING_LITERAL) AND (taskIdKey=identifier) EQ 
(taskIdValue=INTEGER_VALUE)    #cancelJobTask
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
index 4155e6c5e67..424ce238544 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
@@ -289,7 +289,7 @@ public class TxnUtil {
                 .setFileSize(streamingTaskTxnCommitAttachment.getFileSize());
 
         if (streamingTaskTxnCommitAttachment.getOffset() != null) {
-            
builder.setOffset(streamingTaskTxnCommitAttachment.getOffset().endOffset());
+            builder.setOffset(streamingTaskTxnCommitAttachment.getOffset());
         }
 
         
attachementBuilder.setStreamingTaskTxnCommitAttachment(builder.build());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
index 4c79df0acf7..9b0e5aeb801 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
@@ -682,6 +682,7 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
 
             String currentMaxFile = "";
             boolean isTruncated = false;
+            boolean reachLimit = false;
             do {
                 roundCnt++;
                 ListObjectsV2Response response = listObjectsV2(request);
@@ -716,12 +717,16 @@ public class S3ObjStorage implements ObjStorage<S3Client> 
{
                         result.add(remoteFile);
 
                         if (reachLimit(result.size(), matchFileSize, 
fileSizeLimit, fileNumLimit)) {
+                            reachLimit = true;
                             break;
                         }
 
                         objPath = objPath.getParent();
                         isPrefix = true;
                     }
+                    if (reachLimit) {
+                        break;
+                    }
                 }
                 //record current last object file name
                 S3Object lastS3Object = 
response.contents().get(response.contents().size() - 1);
@@ -733,7 +738,7 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
                             
.continuationToken(response.nextContinuationToken())
                             .build();
                 }
-            } while (isTruncated);
+            } while (isTruncated && !reachLimit);
 
             if (LOG.isDebugEnabled()) {
                 LOG.debug("remotePath:{}, result:{}", remotePath, result);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
index 3d8c9afa36b..423b436979f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
@@ -124,22 +124,7 @@ public class JobExecutionConfiguration {
             return delayTimeSeconds;
         }
 
-        if (JobExecuteType.STREAMING.equals(executeType) && null != 
timerDefinition) {
-            if (null == timerDefinition.getStartTimeMs() || null != 
timerDefinition.getLatestSchedulerTimeMs()) {
-                return delayTimeSeconds;
-            }
-
-            // If the job is already executed or in the schedule queue, or not 
within this schedule window
-            if (endTimeMs < timerDefinition.getStartTimeMs()) {
-                return delayTimeSeconds;
-            }
-
-            delayTimeSeconds.add(queryDelayTimeSecond(currentTimeMs, 
timerDefinition.getStartTimeMs()));
-            
this.timerDefinition.setLatestSchedulerTimeMs(timerDefinition.getStartTimeMs());
-            return delayTimeSeconds;
-        }
-
-        if (JobExecuteType.RECURRING.equals(executeType)) {
+        if (JobExecuteType.RECURRING.equals(executeType) || 
JobExecuteType.STREAMING.equals(executeType)) {
             if (timerDefinition.getStartTimeMs() > endTimeMs || null != 
timerDefinition.getEndTimeMs()
                     && timerDefinition.getEndTimeMs() < startTimeMs) {
                 return delayTimeSeconds;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/common/PauseReason.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/common/FailureReason.java
similarity index 91%
rename from 
fe/fe-core/src/main/java/org/apache/doris/job/common/PauseReason.java
rename to 
fe/fe-core/src/main/java/org/apache/doris/job/common/FailureReason.java
index 49a46327b32..2acfb472a40 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/common/PauseReason.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/common/FailureReason.java
@@ -27,17 +27,21 @@ import com.google.gson.annotations.SerializedName;
 import java.io.DataOutput;
 import java.io.IOException;
 
-public class PauseReason implements Writable {
+public class FailureReason implements Writable {
     @SerializedName(value = "code")
     private InternalErrorCode code;
     @SerializedName(value = "msg")
     private String msg;
 
-    public PauseReason(InternalErrorCode errCode, String msg) {
+    public FailureReason(InternalErrorCode errCode, String msg) {
         this.code = errCode;
         this.msg = msg;
     }
 
+    public FailureReason(String msg) {
+        this.msg = msg;
+    }
+
     public InternalErrorCode getCode() {
         return code;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
index 67b56dbc13d..a4d6793b0b8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
@@ -99,6 +99,7 @@ public class InsertJob extends AbstractJob<InsertTask, 
Map<Object, Object>> impl
             .addAll(COMMON_SCHEMA)
             .add(new Column("Comment", ScalarType.createStringType()))
             // only execute type = streaming need record
+            .add(new Column("Properties", ScalarType.createStringType()))
             .add(new Column("Progress", ScalarType.createStringType()))
             .add(new Column("RemoteOffset", ScalarType.createStringType()))
             .add(new Column("LoadStatistic", ScalarType.createStringType()))
@@ -549,6 +550,7 @@ public class InsertJob extends AbstractJob<InsertTask, 
Map<Object, Object>> impl
         trow.addToColumnValue(new TCell().setStringVal(getComment()));
         trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
         trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
+        trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
         trow.addToColumnValue(new TCell().setStringVal(
                 loadStatistic == null ? FeConstants.null_string : 
loadStatistic.toJson()));
         trow.addToColumnValue(new TCell().setStringVal(failMsg == null ? 
FeConstants.null_string : failMsg.getMsg()));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
index 34cfdf6edea..261ca012311 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
@@ -68,7 +68,8 @@ public class InsertTask extends AbstractTask {
             new Column("TrackingUrl", ScalarType.createStringType()),
             new Column("LoadStatistic", ScalarType.createStringType()),
             new Column("User", ScalarType.createStringType()),
-            new Column("Offset", ScalarType.createStringType()));
+            new Column("Offset", ScalarType.createStringType()),
+            new Column("OtherMsg", ScalarType.createStringType()));
 
     public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
 
@@ -295,6 +296,7 @@ public class InsertTask extends AbstractTask {
         trow.addToColumnValue(new TCell().setStringVal(""));
         trow.addToColumnValue(new 
TCell().setStringVal(userIdentity.getQualifiedUser()));
         trow.addToColumnValue(new TCell().setStringVal(""));
+        trow.addToColumnValue(new TCell().setStringVal(""));
         return trow;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index b872040c4c3..157629520a9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -24,7 +24,6 @@ import org.apache.doris.cloud.rpc.MetaServiceProxy;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.InternalErrorCode;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.util.TimeUtils;
@@ -32,10 +31,10 @@ import org.apache.doris.job.base.AbstractJob;
 import org.apache.doris.job.base.JobExecuteType;
 import org.apache.doris.job.base.JobExecutionConfiguration;
 import org.apache.doris.job.base.TimerDefinition;
+import org.apache.doris.job.common.FailureReason;
 import org.apache.doris.job.common.IntervalUnit;
 import org.apache.doris.job.common.JobStatus;
 import org.apache.doris.job.common.JobType;
-import org.apache.doris.job.common.PauseReason;
 import org.apache.doris.job.common.TaskStatus;
 import org.apache.doris.job.common.TaskType;
 import org.apache.doris.job.exception.JobException;
@@ -72,6 +71,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 @Log4j2
@@ -80,7 +80,8 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
     private final long dbId;
     private StreamingJobStatistic jobStatistic = new StreamingJobStatistic();
     @Getter
-    protected PauseReason pauseReason;
+    @SerializedName("fr")
+    protected FailureReason failureReason;
     @Getter
     @Setter
     protected long latestAutoResumeTimestamp;
@@ -225,6 +226,9 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
 
     @Override
     public void onTaskFail(StreamingJobSchedulerTask task) throws JobException 
{
+        if (task.getErrMsg() != null) {
+            this.failureReason = new FailureReason(task.getErrMsg());
+        }
         // Here is the failure of StreamingJobSchedulerTask, no processing is 
required
         getRunningTasks().remove(task);
     }
@@ -240,7 +244,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
             failedTaskCount.incrementAndGet();
             
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
             if 
(getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
-                this.pauseReason = new 
PauseReason(InternalErrorCode.INTERNAL_ERR, task.getErrMsg());
+                this.failureReason = new FailureReason(task.getErrMsg());
             }
         } finally {
             lock.writeLock().unlock();
@@ -262,11 +266,14 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
     }
 
     private void updateJobStatisticAndOffset(StreamingTaskTxnCommitAttachment 
attachment) {
+        if (this.jobStatistic == null) {
+            this.jobStatistic = new StreamingJobStatistic();
+        }
         this.jobStatistic.setScannedRows(this.jobStatistic.getScannedRows() + 
attachment.getScannedRows());
         this.jobStatistic.setLoadBytes(this.jobStatistic.getLoadBytes() + 
attachment.getLoadBytes());
         this.jobStatistic.setFileNumber(this.jobStatistic.getFileNumber() + 
attachment.getFileNumber());
         this.jobStatistic.setFileSize(this.jobStatistic.getFileSize() + 
attachment.getFileSize());
-        offsetProvider.updateOffset(attachment.getOffset());
+        
offsetProvider.updateOffset(offsetProvider.deserializeOffset(attachment.getOffset()));
     }
 
     @Override
@@ -297,7 +304,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         trow.addToColumnValue(new TCell().setStringVal(getJobName()));
         trow.addToColumnValue(new 
TCell().setStringVal(getCreateUser().getQualifiedUser()));
         trow.addToColumnValue(new 
TCell().setStringVal(getJobConfig().getExecuteType().name()));
-        trow.addToColumnValue(new TCell().setStringVal(""));
+        trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
         trow.addToColumnValue(new TCell().setStringVal(getJobStatus().name()));
         trow.addToColumnValue(new TCell().setStringVal(getExecuteSql()));
         trow.addToColumnValue(new 
TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs())));
@@ -305,6 +312,8 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(getFailedTaskCount().get())));
         trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(getCanceledTaskCount().get())));
         trow.addToColumnValue(new TCell().setStringVal(getComment()));
+        trow.addToColumnValue(new TCell().setStringVal(properties != null
+                ? GsonUtils.GSON.toJson(properties) : 
FeConstants.null_string));
 
         if (offsetProvider != null && offsetProvider.getSyncOffset() != null) {
             trow.addToColumnValue(new 
TCell().setStringVal(offsetProvider.getSyncOffset()));
@@ -320,8 +329,8 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
 
         trow.addToColumnValue(new TCell().setStringVal(
                 jobStatistic == null ? FeConstants.null_string : 
jobStatistic.toJson()));
-        trow.addToColumnValue(
-                new TCell().setStringVal(pauseReason == null ? 
FeConstants.null_string : pauseReason.getMsg()));
+        trow.addToColumnValue(new TCell().setStringVal(failureReason == null
+                ? FeConstants.null_string : failureReason.getMsg()));
         return trow;
     }
 
@@ -359,6 +368,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
             taskIds.add(runningStreamTask.getTaskId());
             List<LoadJob> loadJobs = 
Env.getCurrentEnv().getLoadManager().queryLoadJobsByJobIds(taskIds);
             if (loadJobs.size() != 1) {
+                shouldRealseLock = true;
                 throw new TransactionException("load job not found, insert job 
id is " + runningStreamTask.getTaskId());
             }
             LoadJob loadJob = loadJobs.get(0);
@@ -377,7 +387,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
                         loadStatistic.getLoadBytes(),
                         loadStatistic.getFileNumber(),
                         loadStatistic.getTotalFileSizeB(),
-                        runningStreamTask.getRunningOffset()));
+                        runningStreamTask.getRunningOffset().toJson()));
         } finally {
             if (shouldRealseLock) {
                 lock.writeLock().unlock();
@@ -387,6 +397,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
 
     @Override
     public void beforeAborted(TransactionState txnState) throws 
TransactionException {
+
     }
 
     @Override
@@ -403,6 +414,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         StreamingTaskTxnCommitAttachment attachment =
                 (StreamingTaskTxnCommitAttachment) 
txnState.getTxnCommitAttachment();
         updateJobStatisticAndOffset(attachment);
+        succeedTaskCount.incrementAndGet();
     }
 
     public void replayOnCloudMode() throws UserException {
@@ -462,5 +474,19 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         if (jobProperties == null && properties != null) {
             jobProperties = new StreamingJobProperties(properties);
         }
+
+        if (null == getSucceedTaskCount()) {
+            setSucceedTaskCount(new AtomicLong(0));
+        }
+        if (null == getFailedTaskCount()) {
+            setFailedTaskCount(new AtomicLong(0));
+        }
+        if (null == getCanceledTaskCount()) {
+            setCanceledTaskCount(new AtomicLong(0));
+        }
+
+        if (null == lock) {
+            this.lock = new ReentrantReadWriteLock(true);
+        }
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
index c171173fd3c..3073142c151 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
@@ -55,6 +55,8 @@ public class StreamingInsertTask {
     @Setter
     private TaskStatus status;
     private String errMsg;
+    @Setter
+    private String otherMsg;
     private Long createTimeMs;
     private Long startTimeMs;
     private Long finishTimeMs;
@@ -99,8 +101,8 @@ public class StreamingInsertTask {
             if (TaskStatus.CANCELED.equals(status)) {
                 return;
             }
-            onFail(e.getMessage());
             log.warn("execute task error, job id is {}, task id is {}", jobId, 
taskId, e);
+            onFail(e.getMessage());
         } finally {
             // The cancel logic will call the closeOrReleased Resources method 
by itself.
             // If it is also called here,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
index 6f083a82c55..b658c97c828 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
@@ -17,15 +17,21 @@
 
 package org.apache.doris.job.extensions.insert.streaming;
 
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.InternalErrorCode;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.job.common.FailureReason;
 import org.apache.doris.job.common.JobStatus;
-import org.apache.doris.job.common.PauseReason;
 import org.apache.doris.job.exception.JobException;
 import org.apache.doris.job.task.AbstractTask;
+import org.apache.doris.load.loadv2.LoadJob;
 import org.apache.doris.thrift.TCell;
 import org.apache.doris.thrift.TRow;
 
+import java.util.Arrays;
+import java.util.List;
+
 public class StreamingJobSchedulerTask extends AbstractTask {
     private static final long BACK_OFF_BASIC_TIME_SEC = 10L;
     private static final long MAX_BACK_OFF_TIME_SEC = 60 * 5;
@@ -55,15 +61,15 @@ public class StreamingJobSchedulerTask extends AbstractTask 
{
     }
 
     private void autoResumeHandler() throws JobException {
-        final PauseReason pauseReason = streamingInsertJob.getPauseReason();
+        final FailureReason failureReason = 
streamingInsertJob.getFailureReason();
         final long latestAutoResumeTimestamp = 
streamingInsertJob.getLatestAutoResumeTimestamp();
         final long autoResumeCount = streamingInsertJob.getAutoResumeCount();
         final long current = System.currentTimeMillis();
 
-        if (pauseReason != null
-                && pauseReason.getCode() != InternalErrorCode.MANUAL_PAUSE_ERR
-                && pauseReason.getCode() != 
InternalErrorCode.TOO_MANY_FAILURE_ROWS_ERR
-                && pauseReason.getCode() != 
InternalErrorCode.CANNOT_RESUME_ERR) {
+        if (failureReason != null
+                && failureReason.getCode() != 
InternalErrorCode.MANUAL_PAUSE_ERR
+                && failureReason.getCode() != 
InternalErrorCode.TOO_MANY_FAILURE_ROWS_ERR
+                && failureReason.getCode() != 
InternalErrorCode.CANNOT_RESUME_ERR) {
             long autoResumeIntervalTimeSec = autoResumeCount < 5
                         ? Math.min((long) Math.pow(2, autoResumeCount) * 
BACK_OFF_BASIC_TIME_SEC,
                                 MAX_BACK_OFF_TIME_SEC) : MAX_BACK_OFF_TIME_SEC;
@@ -107,20 +113,40 @@ public class StreamingJobSchedulerTask extends 
AbstractTask {
         trow.addToColumnValue(new 
TCell().setStringVal(runningTask.getErrMsg()));
         // create time
         trow.addToColumnValue(new 
TCell().setStringVal(TimeUtils.longToTimeString(runningTask.getCreateTimeMs())));
-        trow.addToColumnValue(new TCell().setStringVal(null == 
getStartTimeMs() ? ""
+        trow.addToColumnValue(new TCell().setStringVal(null == 
getStartTimeMs() ? FeConstants.null_string
                 : TimeUtils.longToTimeString(runningTask.getStartTimeMs())));
         // load end time
         trow.addToColumnValue(new 
TCell().setStringVal(TimeUtils.longToTimeString(runningTask.getFinishTimeMs())));
-        // tracking url
-        trow.addToColumnValue(new TCell().setStringVal("trackingUrl"));
-        trow.addToColumnValue(new TCell().setStringVal("statistic"));
+
+        List<LoadJob> loadJobs = Env.getCurrentEnv().getLoadManager()
+                .queryLoadJobsByJobIds(Arrays.asList(runningTask.getTaskId()));
+        if (!loadJobs.isEmpty()) {
+            LoadJob loadJob = loadJobs.get(0);
+            if (loadJob.getLoadingStatus() != null && 
loadJob.getLoadingStatus().getTrackingUrl() != null) {
+                trow.addToColumnValue(new 
TCell().setStringVal(loadJob.getLoadingStatus().getTrackingUrl()));
+            } else {
+                trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
+            }
+
+            if (loadJob.getLoadStatistic() != null) {
+                trow.addToColumnValue(new 
TCell().setStringVal(loadJob.getLoadStatistic().toJson()));
+            } else {
+                trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
+            }
+        } else {
+            trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
+            trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
+        }
+
         if (runningTask.getUserIdentity() == null) {
-            trow.addToColumnValue(new TCell().setStringVal(""));
+            trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
         } else {
             trow.addToColumnValue(new 
TCell().setStringVal(runningTask.getUserIdentity().getQualifiedUser()));
         }
-        trow.addToColumnValue(new 
TCell().setStringVal(runningTask.getRunningOffset() == null ? ""
+        trow.addToColumnValue(new 
TCell().setStringVal(runningTask.getRunningOffset() == null ? 
FeConstants.null_string
                 : runningTask.getRunningOffset().toJson()));
+        trow.addToColumnValue(new TCell().setStringVal(null == 
runningTask.getOtherMsg()
+                ? FeConstants.null_string : runningTask.getOtherMsg()));
         return trow;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java
index 8660ed94739..4b7590824b4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java
@@ -18,7 +18,6 @@
 package org.apache.doris.job.extensions.insert.streaming;
 
 import org.apache.doris.cloud.proto.Cloud.StreamingTaskCommitAttachmentPB;
-import org.apache.doris.job.offset.Offset;
 import org.apache.doris.transaction.TransactionState;
 import org.apache.doris.transaction.TxnCommitAttachment;
 
@@ -28,7 +27,7 @@ import lombok.Getter;
 public class StreamingTaskTxnCommitAttachment extends TxnCommitAttachment {
 
     public StreamingTaskTxnCommitAttachment(long jobId, long taskId,
-                long scannedRows, long loadBytes, long fileNumber, long 
fileSize, Offset offset) {
+                long scannedRows, long loadBytes, long fileNumber, long 
fileSize, String offset) {
         super(TransactionState.LoadJobSourceType.STREAMING_JOB);
         this.jobId = jobId;
         this.taskId = taskId;
@@ -45,7 +44,7 @@ public class StreamingTaskTxnCommitAttachment extends 
TxnCommitAttachment {
         this.loadBytes = pb.getLoadBytes();
         this.fileNumber = pb.getFileNumber();
         this.fileSize = pb.getFileSize();
-        this.offset.setEndOffset(pb.getOffset());
+        this.offset = pb.getOffset();
     }
 
     @Getter
@@ -66,7 +65,7 @@ public class StreamingTaskTxnCommitAttachment extends 
TxnCommitAttachment {
     private long fileSize;
     @SerializedName(value = "of")
     @Getter
-    private Offset offset;
+    private String offset;
 
     @Override
     public String toString() {
@@ -75,7 +74,7 @@ public class StreamingTaskTxnCommitAttachment extends 
TxnCommitAttachment {
                 + ", loadBytes=" + loadBytes
                 + ", fileNumber=" + fileNumber
                 + ", fileSize=" + fileSize
-                + ", offset=" + offset.toString()
+                + ", offset=" + offset
                 + "]";
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
index e670dac553c..d7e1bee4669 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
@@ -81,5 +81,11 @@ public interface SourceOffsetProvider {
      */
     boolean hasMoreDataToConsume();
 
+    /**
+     * Deserialize string offset to Offset
+     * @return
+     */
+    Offset deserializeOffset(String offset);
+
 }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
index 2ab2030fbbb..3d260218886 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
@@ -20,6 +20,7 @@ package org.apache.doris.job.offset.s3;
 import org.apache.doris.job.offset.Offset;
 import org.apache.doris.persist.gson.GsonUtils;
 
+import com.google.gson.annotations.SerializedName;
 import lombok.Getter;
 import lombok.Setter;
 
@@ -27,6 +28,7 @@ import lombok.Setter;
 @Setter
 public class S3Offset implements Offset {
     String startFile;
+    @SerializedName("ef")
     String endFile;
     String fileLists;
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
index f63333468fa..e429ae7375b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
@@ -29,6 +29,7 @@ import org.apache.doris.nereids.trees.expressions.Properties;
 import org.apache.doris.nereids.trees.plans.Plan;
 import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.persist.gson.GsonUtils;
 
 import com.google.common.collect.Maps;
 import lombok.extern.log4j.Log4j2;
@@ -72,8 +73,7 @@ public class S3SourceOffsetProvider implements 
SourceOffsetProvider {
                 String parentPath = rfiles.get(0).getParentPath();
                 String filePaths = 
rfiles.stream().map(RemoteFile::getName).collect(Collectors.joining(",", "{", 
"}"));
                 String finalFiles = String.format("s3://%s/%s/%s", bucket, 
parentPath, filePaths);
-                offset.setEndFile(
-                        String.format("s3://%s/%s/%s", bucket, parentPath, 
rfiles.get(rfiles.size() - 1).getName()));
+                offset.setEndFile(String.format("%s/%s", parentPath, 
rfiles.get(rfiles.size() - 1).getName()));
                 offset.setFileLists(finalFiles);
             }
         } catch (Exception e) {
@@ -153,4 +153,9 @@ public class S3SourceOffsetProvider implements 
SourceOffsetProvider {
         }
         return false;
     }
+
+    @Override
+    public Offset deserializeOffset(String offset) {
+        return GsonUtils.GSON.fromJson(offset, S3Offset.class);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
index 3fbae399303..6e664b404da 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
@@ -109,6 +109,8 @@ public class StreamingTaskScheduler extends MasterDaemon {
     }
 
     private void scheduleTaskWithDelay(StreamingInsertTask task, long delayMs) 
{
+        task.setOtherMsg("No data available for consumption at the moment, 
will retry after "
+                + (System.currentTimeMillis() + delayMs));
         delayScheduler.schedule(() -> {
             
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().registerTask(task);
         }, delayMs, TimeUnit.MILLISECONDS);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index 80a0886017e..a7b67f2ffda 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -1163,11 +1163,10 @@ public class LogicalPlanBuilder extends 
DorisParserBaseVisitor<Object> {
 
     @Override
     public LogicalPlan visitAlterJob(DorisParser.AlterJobContext ctx) {
-        checkJobNameKey(stripQuotes(ctx.jobNameKey.getText()), JOB_NAME, ctx);
         Map<String, String> properties = ctx.propertyClause() != null
                 ? Maps.newHashMap(visitPropertyClause(ctx.propertyClause())) : 
Maps.newHashMap();
         String executeSql = getOriginSql(ctx.supportedDmlStatement());
-        return new AlterJobCommand(stripQuotes(ctx.jobNameKey.getText()), 
properties, executeSql);
+        return new AlterJobCommand(ctx.jobName.getText(), properties, 
executeSql);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
index b1823de4a1d..786c8184e40 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
@@ -80,7 +80,7 @@ public class AlterJobCommand extends AlterCommand implements 
ForwardWithSync {
             Map<String, String> updateProps =
                     properties == null || properties.isEmpty() ? 
originJob.getProperties() : properties;
 
-            return new StreamingInsertJob(jobName,
+            StreamingInsertJob streamingInsertJob = new 
StreamingInsertJob(jobName,
                     job.getJobStatus(),
                     job.getCurrentDbName(),
                     job.getComment(),
@@ -89,6 +89,8 @@ public class AlterJobCommand extends AlterCommand implements 
ForwardWithSync {
                     System.currentTimeMillis(),
                     updateSQL,
                     updateProps);
+            streamingInsertJob.setJobId(job.getJobId());
+            return streamingInsertJob;
         } else {
             throw new JobException("Unsupported job type for ALTER:" + 
job.getJobType());
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
index af79329f681..dffd7a8c80e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
@@ -183,6 +183,7 @@ import org.apache.doris.fs.remote.dfs.JFSFileSystem;
 import org.apache.doris.fs.remote.dfs.OFSFileSystem;
 import org.apache.doris.job.extensions.insert.InsertJob;
 import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
+import 
org.apache.doris.job.extensions.insert.streaming.StreamingTaskTxnCommitAttachment;
 import org.apache.doris.job.extensions.mtmv.MTMVJob;
 import org.apache.doris.load.loadv2.BrokerLoadJob;
 import org.apache.doris.load.loadv2.BulkLoadJob;
@@ -557,7 +558,9 @@ public class GsonUtils {
             .registerDefaultSubtype(TxnCommitAttachment.class)
             .registerSubtype(LoadJobFinalOperation.class, 
LoadJobFinalOperation.class.getSimpleName())
             .registerSubtype(MiniLoadTxnCommitAttachment.class, 
MiniLoadTxnCommitAttachment.class.getSimpleName())
-            .registerSubtype(RLTaskTxnCommitAttachment.class, 
RLTaskTxnCommitAttachment.class.getSimpleName());
+            .registerSubtype(RLTaskTxnCommitAttachment.class, 
RLTaskTxnCommitAttachment.class.getSimpleName())
+            .registerSubtype(StreamingTaskTxnCommitAttachment.class,
+                    StreamingTaskTxnCommitAttachment.class.getSimpleName());
 
     // runtime adapter for class "RoutineLoadProgress".
     private static RuntimeTypeAdapterFactory<RoutineLoadProgress> 
routineLoadTypeAdapterFactory
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/catalog/DropFunctionTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/catalog/DropFunctionTest.java
index b2de57f9110..bef0bb92d20 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/DropFunctionTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/DropFunctionTest.java
@@ -23,7 +23,6 @@ import org.apache.doris.nereids.parser.NereidsParser;
 import org.apache.doris.nereids.trees.plans.commands.CreateDatabaseCommand;
 import org.apache.doris.nereids.trees.plans.commands.CreateFunctionCommand;
 import org.apache.doris.nereids.trees.plans.commands.DropFunctionCommand;
-import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.StmtExecutor;
@@ -36,9 +35,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.File;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 
 
@@ -66,15 +63,9 @@ public class DropFunctionTest {
     public void testDropGlobalFunction() throws Exception {
         ConnectContext ctx = UtFrameUtils.createDefaultCtx();
         // 1. create database db1
-        //String sql = "create database db1;";
-        String sql = "insert into db1.tb select * from 
s3('url'='s3://a/*.csv')";
+        String sql = "create database db1;";
         NereidsParser nereidsParser = new NereidsParser();
         LogicalPlan logicalPlan = nereidsParser.parseSingle(sql);
-        InsertIntoTableCommand baseCommand = (InsertIntoTableCommand) new 
NereidsParser().parseSingle(sql);
-        baseCommand.initPlan(ConnectContext.get(), 
ConnectContext.get().getExecutor(), false);
-        Map<String, String> map = new HashMap<>();
-        map.put("url", "s3:/xxxx/*.");
-
         StmtExecutor stmtExecutor = new StmtExecutor(connectContext, sql);
         if (logicalPlan instanceof CreateDatabaseCommand) {
             ((CreateDatabaseCommand) logicalPlan).run(connectContext, 
stmtExecutor);
diff --git 
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy 
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
index bd8ee2759fd..3982876b9f4 100644
--- 
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
@@ -42,9 +42,13 @@ suite("test_streaming_insert_job") {
     """
 
 
-    // create recurring job
+    // create streaming job
     sql """
-       CREATE JOB ${jobName}  ON STREAMING DO INSERT INTO ${tableName} 
+       CREATE JOB ${jobName}  
+       PROPERTIES(
+        "s3.batch_files" = "1"
+       )
+       ON STREAMING DO INSERT INTO ${tableName} 
        SELECT * FROM S3
         (
             "uri" = 
"s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
@@ -57,12 +61,13 @@ suite("test_streaming_insert_job") {
             "s3.secret_key" = "${getS3SK()}"
         );
     """
-    Awaitility.await().atMost(30, SECONDS).until(
+    Awaitility.await().atMost(30, SECONDS)
+            .pollInterval(1, SECONDS).until(
             {
                 print("check success task count")
                 def jobSuccendCount = sql """ select SucceedTaskCount from 
jobs("type"="insert") where Name like '%${jobName}%' and 
ExecuteType='STREAMING' """
-                // check job status and succeed task count larger than 1
-                jobSuccendCount.size() == 1 && '1' <= 
jobSuccendCount.get(0).get(0)
+                // check job status and that the succeeded task count is at least 2
+                jobSuccendCount.size() == 1 && '2' <= 
jobSuccendCount.get(0).get(0)
             }
     )
 
@@ -76,6 +81,40 @@ suite("test_streaming_insert_job") {
 
     qt_select """ SELECT * FROM ${tableName} order by c1 """
 
+    def jobOffset = sql """
+        select progress, remoteoffset from jobs("type"="insert") where 
Name='${jobName}'
+    """
+    assert jobOffset.get(0).get(0) == "regression/load/data/example_1.csv"
+    assert jobOffset.get(0).get(1) == "regression/load/data/example_1.csv"
+    // TODO: check job status
+
+    // alter streaming job
+    sql """
+       ALTER JOB FOR ${jobName}
+       PROPERTIES(
+        "s3.batch_files" = "1",
+        "session.insert_max_filter_ratio" = "0.5"
+       )
+       INSERT INTO ${tableName}
+       SELECT * FROM S3
+        (
+            "uri" = 
"s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+            "format" = "csv",
+            "provider" = "${getS3Provider()}",
+            "column_separator" = ",",
+            "s3.endpoint" = "${getS3Endpoint()}",
+            "s3.region" = "${getS3Region()}",
+            "s3.access_key" = "${getS3AK()}",
+            "s3.secret_key" = "${getS3SK()}"
+        );
+    """
+
+    def alterJobProperties = sql """
+        select properties from jobs("type"="insert") where Name='${jobName}'
+    """
+    assert alterJobProperties.get(0).get(0) == 
"{\"s3.batch_files\":\"1\",\"session.insert_max_filter_ratio\":\"0.5\"}"
+
+
     sql """
         DROP JOB IF EXISTS where jobname =  '${jobName}'
     """
@@ -83,4 +122,6 @@ suite("test_streaming_insert_job") {
     def jobCountRsp = sql """select count(1) from jobs("type"="insert")  where 
Name ='${jobName}'"""
     assert jobCountRsp.get(0).get(0) == 0
 
+
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to