This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch streaming-job-dev
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/streaming-job-dev by this push:
     new 777ba161038 [Feature](WIP) add alter job op and fix bug (#56194)
777ba161038 is described below

commit 777ba161038433f520057142d957181666b9a142
Author: wudi <[email protected]>
AuthorDate: Thu Sep 18 22:31:37 2025 +0800

    [Feature](WIP) add alter job op and fix bug (#56194)
    
    ### What problem does this PR solve?
    
    add alter job op and fix bug
---
 .../java/org/apache/doris/fs/obj/S3ObjStorage.java |  8 +-
 .../org/apache/doris/job/base/AbstractJob.java     |  5 ++
 .../doris/job/extensions/insert/InsertJob.java     |  6 +-
 .../doris/job/extensions/insert/InsertTask.java    |  7 +-
 .../insert/streaming/StreamingInsertJob.java       | 62 ++++++++++------
 .../insert/streaming/StreamingInsertTask.java      |  2 +-
 .../streaming/StreamingJobSchedulerTask.java       | 23 ++++--
 .../org/apache/doris/job/manager/JobManager.java   | 13 ++++
 .../org/apache/doris/job/offset/s3/S3Offset.java   |  4 +-
 .../job/offset/s3/S3SourceOffsetProvider.java      | 10 ++-
 .../org/apache/doris/job/task/AbstractTask.java    |  2 +-
 .../org/apache/doris/journal/JournalEntity.java    |  6 ++
 .../trees/plans/commands/AlterJobCommand.java      | 37 ++++------
 .../trees/plans/commands/CreateJobCommand.java     |  2 +-
 .../trees/plans/commands/ResumeJobCommand.java     |  9 ++-
 .../persist/AlterStreamingJobOperationLog.java     | 86 ++++++++++++++++++++++
 .../java/org/apache/doris/persist/EditLog.java     |  9 +++
 .../org/apache/doris/persist/OperationType.java    |  2 +
 .../streaming_job/test_streaming_insert_job.groovy | 66 ++++++++++++-----
 19 files changed, 276 insertions(+), 83 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
index 9b0e5aeb801..6f710b74c1b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
@@ -728,9 +728,11 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
                         break;
                     }
                 }
-                //record current last object file name
-                S3Object lastS3Object = 
response.contents().get(response.contents().size() - 1);
-                currentMaxFile = lastS3Object.key();
+                if (!response.contents().isEmpty()) {
+                    //record current last object file name
+                    S3Object lastS3Object = 
response.contents().get(response.contents().size() - 1);
+                    currentMaxFile = lastS3Object.key();
+                }
 
                 isTruncated = response.isTruncated();
                 if (isTruncated) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
index d08942460f5..24be6358512 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
@@ -31,6 +31,7 @@ import org.apache.doris.job.common.TaskStatus;
 import org.apache.doris.job.common.TaskType;
 import org.apache.doris.job.exception.JobException;
 import org.apache.doris.job.task.AbstractTask;
+import org.apache.doris.persist.AlterStreamingJobOperationLog;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.qe.ShowResultSetMetaData;
 import org.apache.doris.thrift.TCell;
@@ -475,6 +476,10 @@ public abstract class AbstractJob<T extends AbstractTask, 
C> implements Job<T, C
         log.info(new LogBuilder(LogKey.SCHEDULER_JOB, getJobId()).add("msg", 
"replay delete scheduler job").build());
     }
 
+    public void onReplayUpdateStreaming(AlterStreamingJobOperationLog 
operationLog) {
+        log.info(new LogBuilder(LogKey.SCHEDULER_JOB, getJobId()).add("msg", 
"replay update streaming job").build());
+    }
+
     public boolean needPersist() {
         return true;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
index 7658c3cb54d..cc16cbb603d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
@@ -100,8 +100,8 @@ public class InsertJob extends AbstractJob<InsertTask, 
Map<Object, Object>> impl
             .add(new Column("Comment", ScalarType.createStringType()))
             // only execute type = streaming need record
             .add(new Column("Properties", ScalarType.createStringType()))
-            .add(new Column("Progress", ScalarType.createStringType()))
-            .add(new Column("RemoteOffset", ScalarType.createStringType()))
+            .add(new Column("ConsumedOffset", ScalarType.createStringType()))
+            .add(new Column("MaxOffset", ScalarType.createStringType()))
             .add(new Column("LoadStatistic", ScalarType.createStringType()))
             .add(new Column("ErrorMsg", ScalarType.createStringType()))
             .build();
@@ -120,9 +120,9 @@ public class InsertJob extends AbstractJob<InsertTask, 
Map<Object, Object>> impl
                     .addColumn(new Column("TrackingUrl", 
ScalarType.createVarchar(200)))
                     .addColumn(new Column("LoadStatistic", 
ScalarType.createVarchar(200)))
                     .addColumn(new Column("User", 
ScalarType.createVarchar(50)))
+                    .addColumn(new Column("FirstErrorMsg", 
ScalarType.createVarchar(200)))
                     // only execute type = streaming need record
                     .addColumn(new Column("Offset", 
ScalarType.createStringType()))
-                    .addColumn(new Column("FirstErrorMsg", 
ScalarType.createVarchar(200)))
                     .build();
 
     public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
index 8ff292ea8c0..5875699fb0f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
@@ -68,9 +68,8 @@ public class InsertTask extends AbstractTask {
             new Column("TrackingUrl", ScalarType.createStringType()),
             new Column("LoadStatistic", ScalarType.createStringType()),
             new Column("User", ScalarType.createStringType()),
-            new Column("Offset", ScalarType.createStringType()),
-            new Column("OtherMsg", ScalarType.createStringType()),
-            new Column("FirstErrorMsg", ScalarType.createStringType()));
+            new Column("FirstErrorMsg", ScalarType.createStringType()),
+            new Column("Offset", ScalarType.createStringType()));
 
     public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
 
@@ -277,8 +276,8 @@ public class InsertTask extends AbstractTask {
         } else {
             trow.addToColumnValue(new 
TCell().setStringVal(userIdentity.getQualifiedUser()));
         }
-        trow.addToColumnValue(new TCell().setStringVal(""));
         trow.addToColumnValue(new TCell().setStringVal(firstErrorMsg == null ? 
"" : firstErrorMsg));
+        trow.addToColumnValue(new TCell().setStringVal(""));
         return trow;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 157629520a9..70463894a66 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -39,14 +39,15 @@ import org.apache.doris.job.common.TaskStatus;
 import org.apache.doris.job.common.TaskType;
 import org.apache.doris.job.exception.JobException;
 import org.apache.doris.job.extensions.insert.InsertJob;
+import org.apache.doris.job.extensions.insert.InsertTask;
 import org.apache.doris.job.offset.SourceOffsetProvider;
 import org.apache.doris.job.offset.SourceOffsetProviderFactory;
-import org.apache.doris.job.task.AbstractTask;
-import org.apache.doris.load.loadv2.LoadJob;
 import org.apache.doris.load.loadv2.LoadStatistic;
+import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
 import org.apache.doris.nereids.parser.NereidsParser;
 import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import org.apache.doris.persist.AlterStreamingJobOperationLog;
 import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.qe.ConnectContext;
@@ -87,10 +88,10 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
     protected long latestAutoResumeTimestamp;
     @Getter
     @Setter
-    protected long autoResumeCount;
+    protected long autoResumeCount = 0L;
     @Getter
     @SerializedName("props")
-    private final Map<String, String> properties;
+    private Map<String, String> properties;
     private StreamingJobProperties jobProperties;
     @Getter
     @SerializedName("tvf")
@@ -114,7 +115,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
             Long createTimeMs,
             String executeSql,
             Map<String, String> properties) {
-        super(getNextJobId(), jobName, jobStatus, dbName, comment, createUser,
+        super(Env.getCurrentEnv().getNextId(), jobName, jobStatus, dbName, 
comment, createUser,
                 jobConfig, createTimeMs, executeSql);
         this.dbId = ConnectContext.get().getCurrentDbId();
         this.properties = properties;
@@ -148,6 +149,9 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
 
     private UnboundTVFRelation getCurrentTvf() {
         if (baseCommand == null) {
+            ConnectContext ctx = 
InsertTask.makeConnectContext(getCreateUser(), getCurrentDbName());
+            StatementContext statementContext = new StatementContext();
+            ctx.setStatementContext(statementContext);
             this.baseCommand = (InsertIntoTableCommand) new 
NereidsParser().parseSingle(getExecuteSql());
         }
         List<UnboundTVFRelation> allTVFRelation = 
baseCommand.getAllTVFRelation();
@@ -161,6 +165,9 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         lock.writeLock().lock();
         try {
             super.updateJobStatus(status);
+            if (JobStatus.PAUSED.equals(getJobStatus())) {
+                clearRunningStreamTask();
+            }
             if (isFinalStatus()) {
                 
Env.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(getJobId());
             }
@@ -193,7 +200,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
     }
 
     protected StreamingInsertTask createStreamingInsertTask() {
-        this.runningStreamTask = new StreamingInsertTask(getJobId(), 
AbstractTask.getNextTaskId(), getExecuteSql(),
+        this.runningStreamTask = new StreamingInsertTask(getJobId(), 
Env.getCurrentEnv().getNextId(), getExecuteSql(),
                 offsetProvider, getCurrentDbName(), jobProperties, 
originTvfProps, getCreateUser());
         
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().registerTask(runningStreamTask);
         this.runningStreamTask.setStatus(TaskStatus.PENDING);
@@ -215,6 +222,13 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         return (getJobStatus().equals(JobStatus.RUNNING) || 
getJobStatus().equals(JobStatus.PENDING));
     }
 
+    public void clearRunningStreamTask() {
+        if (runningStreamTask != null) {
+            runningStreamTask.closeOrReleaseResources();
+            runningStreamTask = null;
+        }
+    }
+
     // When consumer to EOF, delay schedule task appropriately can avoid too 
many small transactions.
     public boolean needDelayScheduleTask() {
         return System.currentTimeMillis() - lastScheduleTaskTimestamp > 
jobProperties.getMaxIntervalSecond() * 1000;
@@ -224,6 +238,13 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         return offsetProvider.hasMoreDataToConsume();
     }
 
+    @Override
+    public void logUpdateOperation() {
+        AlterStreamingJobOperationLog log =
+                new AlterStreamingJobOperationLog(this.getJobId(), 
this.getJobStatus(), properties, getExecuteSql());
+        Env.getCurrentEnv().getEditLog().logUpdateStreamingJob(log);
+    }
+
     @Override
     public void onTaskFail(StreamingJobSchedulerTask task) throws JobException 
{
         if (task.getErrMsg() != null) {
@@ -287,6 +308,15 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         super.onReplayCreate();
     }
 
+    @Override
+    public void onReplayUpdateStreaming(AlterStreamingJobOperationLog 
operationLog) {
+        super.onReplayUpdateStreaming(operationLog);
+        setJobStatus(operationLog.getStatus());
+        this.properties = operationLog.getJobProperties();
+        this.jobProperties = new StreamingJobProperties(properties);
+        setExecuteSql(operationLog.getExecuteSql());
+    }
+
     @Override
     public ShowResultSetMetaData getTaskMetaData() {
         return InsertJob.TASK_META_DATA;
@@ -361,25 +391,15 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
 
     @Override
     public void beforeCommitted(TransactionState txnState) throws 
TransactionException {
-        boolean shouldRealseLock = false;
+        boolean shouldReleaseLock = false;
         lock.writeLock().lock();
         try {
             ArrayList<Long> taskIds = new ArrayList<>();
             taskIds.add(runningStreamTask.getTaskId());
-            List<LoadJob> loadJobs = 
Env.getCurrentEnv().getLoadManager().queryLoadJobsByJobIds(taskIds);
-            if (loadJobs.size() != 1) {
-                shouldRealseLock = true;
-                throw new TransactionException("load job not found, insert job 
id is " + runningStreamTask.getTaskId());
-            }
-            LoadJob loadJob = loadJobs.get(0);
-
-            if (txnState.getTransactionId() != loadJob.getTransactionId()
-                    || 
!runningStreamTask.getStatus().equals(TaskStatus.RUNNING)) {
-                shouldRealseLock = true;
-                throw new TransactionException("txn " + 
txnState.getTransactionId() + "should be aborted.");
-            }
+            // todo: Check whether the taskid of runningtask is consistent 
with the taskid associated with txn
 
-            LoadStatistic loadStatistic = loadJob.getLoadStatistic();
+            // todo: need get loadStatistic, load manager statistic is empty
+            LoadStatistic loadStatistic = new LoadStatistic();
             txnState.setTxnCommitAttachment(new 
StreamingTaskTxnCommitAttachment(
                         getJobId(),
                         runningStreamTask.getTaskId(),
@@ -389,7 +409,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
                         loadStatistic.getTotalFileSizeB(),
                         runningStreamTask.getRunningOffset().toJson()));
         } finally {
-            if (shouldRealseLock) {
+            if (shouldReleaseLock) {
                 lock.writeLock().unlock();
             }
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
index 3073142c151..d5eb39cd064 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
@@ -118,7 +118,7 @@ public class StreamingInsertTask {
         this.startTimeMs = System.currentTimeMillis();
 
         if (isCanceled.get()) {
-            throw new JobException("Export executor has been canceled, task 
id: {}", getTaskId());
+            throw new JobException("Streaming insert task has been canceled, 
task id: {}", getTaskId());
         }
         ctx = InsertTask.makeConnectContext(userIdentity, currentDb);
         ctx.setSessionVariable(jobProperties.getSessionVariable());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
index b658c97c828..ec4eb0036d0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
@@ -29,6 +29,8 @@ import org.apache.doris.load.loadv2.LoadJob;
 import org.apache.doris.thrift.TCell;
 import org.apache.doris.thrift.TRow;
 
+import org.apache.commons.lang3.StringUtils;
+
 import java.util.Arrays;
 import java.util.List;
 
@@ -47,7 +49,6 @@ public class StreamingJobSchedulerTask extends AbstractTask {
             case PENDING:
                 streamingInsertJob.createStreamingInsertTask();
                 streamingInsertJob.updateJobStatus(JobStatus.RUNNING);
-                streamingInsertJob.setAutoResumeCount(0);
                 break;
             case RUNNING:
                 streamingInsertJob.fetchMeta();
@@ -78,6 +79,7 @@ public class StreamingJobSchedulerTask extends AbstractTask {
                 if (autoResumeCount < Long.MAX_VALUE) {
                     streamingInsertJob.setAutoResumeCount(autoResumeCount + 1);
                 }
+                streamingInsertJob.createStreamingInsertTask();
                 streamingInsertJob.updateJobStatus(JobStatus.RUNNING);
                 return;
             }
@@ -86,9 +88,7 @@ public class StreamingJobSchedulerTask extends AbstractTask {
 
     @Override
     protected void closeOrReleaseResources() {
-        if (streamingInsertJob.getRunningStreamTask() != null) {
-            
streamingInsertJob.getRunningStreamTask().closeOrReleaseResources();
-        }
+        // do nothing
     }
 
     @Override
@@ -110,7 +110,17 @@ public class StreamingJobSchedulerTask extends 
AbstractTask {
         trow.addToColumnValue(new TCell().setStringVal(jobName));
         trow.addToColumnValue(new 
TCell().setStringVal(runningTask.getLabelName()));
         trow.addToColumnValue(new 
TCell().setStringVal(runningTask.getStatus().name()));
-        trow.addToColumnValue(new 
TCell().setStringVal(runningTask.getErrMsg()));
+        // err msg
+        String errMsg = "";
+        if (StringUtils.isNotBlank(runningTask.getErrMsg())
+                && !FeConstants.null_string.equals(runningTask.getErrMsg())) {
+            errMsg = runningTask.getErrMsg();
+        } else {
+            errMsg = runningTask.getOtherMsg();
+        }
+        trow.addToColumnValue(new 
TCell().setStringVal(StringUtils.isNotBlank(errMsg)
+                ? errMsg : FeConstants.null_string));
+
         // create time
         trow.addToColumnValue(new 
TCell().setStringVal(TimeUtils.longToTimeString(runningTask.getCreateTimeMs())));
         trow.addToColumnValue(new TCell().setStringVal(null == 
getStartTimeMs() ? FeConstants.null_string
@@ -143,10 +153,9 @@ public class StreamingJobSchedulerTask extends 
AbstractTask {
         } else {
             trow.addToColumnValue(new 
TCell().setStringVal(runningTask.getUserIdentity().getQualifiedUser()));
         }
+        trow.addToColumnValue(new TCell().setStringVal(""));
         trow.addToColumnValue(new 
TCell().setStringVal(runningTask.getRunningOffset() == null ? 
FeConstants.null_string
                 : runningTask.getRunningOffset().toJson()));
-        trow.addToColumnValue(new TCell().setStringVal(null == 
runningTask.getOtherMsg()
-                ? FeConstants.null_string : runningTask.getOtherMsg()));
         return trow;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
index 2b8c7225b2a..34fc29f1c8d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
@@ -45,6 +45,7 @@ import org.apache.doris.load.loadv2.JobState;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.nereids.trees.expressions.And;
 import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.persist.AlterStreamingJobOperationLog;
 import org.apache.doris.qe.ConnectContext;
 
 import com.google.common.collect.Lists;
@@ -363,6 +364,18 @@ public class JobManager<T extends AbstractJob<?, C>, C> 
implements Writable {
                 .add("msg", "replay update scheduler job").build());
     }
 
+    public void replayUpdateStreamingJob(AlterStreamingJobOperationLog log) {
+        Long jobId = log.getJobId();
+        if (!jobMap.containsKey(jobId)) {
+            LOG.warn("replayUpdateStreamingJob not normal, jobId: {}, jobMap: 
{}", jobId, log);
+            return;
+        }
+        T job = jobMap.get(jobId);
+        job.onReplayUpdateStreaming(log);
+        LOG.info(new LogBuilder(LogKey.SCHEDULER_JOB, jobId)
+                .add("msg", "replay update streaming job").build());
+    }
+
     /**
      * Replay delete load job. we need to remove job from job map
      */
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
index 3d260218886..3f7956f9fe9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
@@ -27,9 +27,11 @@ import lombok.Setter;
 @Getter
 @Setter
 public class S3Offset implements Offset {
+    // path/1.csv
     String startFile;
-    @SerializedName("ef")
+    @SerializedName("endFile")
     String endFile;
+    // s3://bucket/path/{1.csv,2.csv}
     String fileLists;
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
index e429ae7375b..d433e42f73c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
@@ -33,6 +33,7 @@ import org.apache.doris.persist.gson.GsonUtils;
 
 import com.google.common.collect.Maps;
 import lombok.extern.log4j.Log4j2;
+import org.apache.commons.lang3.StringUtils;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -136,8 +137,13 @@ public class S3SourceOffsetProvider implements 
SourceOffsetProvider {
         try (RemoteFileSystem fileSystem = 
FileSystemFactory.get(storageProperties)) {
             String uri = storageProperties.validateAndGetUri(copiedProps);
             String filePath = storageProperties.validateAndNormalizeUri(uri);
-            maxRemoteEndFile = fileSystem.globListWithLimit(filePath, new 
ArrayList<>(), startFile,
-                    1, 1);
+            List<RemoteFile> objects = new ArrayList<>();
+            String endFile = fileSystem.globListWithLimit(filePath, objects, 
startFile, 1, 1);
+            if (!objects.isEmpty() && StringUtils.isNotEmpty(endFile)) {
+                maxRemoteEndFile = endFile;
+            } else {
+                maxRemoteEndFile = startFile;
+            }
         } catch (Exception e) {
             throw e;
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
index 18c5f525295..4e2ac653cf7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
@@ -60,7 +60,7 @@ public abstract class AbstractTask implements Task {
         taskId = getNextTaskId();
     }
 
-    public static long getNextTaskId() {
+    private static long getNextTaskId() {
         // do not use Env.getNextId(), just generate id without logging
         return System.nanoTime() + RandomUtils.nextInt();
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java 
b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index cb6dee000de..f0f9d471ed9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -67,6 +67,7 @@ import org.apache.doris.persist.AlterDatabasePropertyInfo;
 import org.apache.doris.persist.AlterLightSchemaChangeInfo;
 import org.apache.doris.persist.AlterMTMV;
 import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
+import org.apache.doris.persist.AlterStreamingJobOperationLog;
 import org.apache.doris.persist.AlterUserOperationLog;
 import org.apache.doris.persist.AlterViewInfo;
 import org.apache.doris.persist.AnalyzeDeletionLog;
@@ -550,6 +551,11 @@ public class JournalEntity implements Writable {
                 isRead = true;
                 break;
             }
+            case OperationType.OP_UPDATE_STREAMING_JOB: {
+                data = AlterStreamingJobOperationLog.read(in);
+                isRead = true;
+                break;
+            }
             case OperationType.OP_CREATE_SCHEDULER_TASK:
             case OperationType.OP_DELETE_SCHEDULER_TASK: {
                 //todo improve
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
index 786c8184e40..adde81fbfe9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
@@ -33,6 +33,7 @@ import org.apache.doris.qe.StmtExecutor;
 import org.apache.commons.lang3.StringUtils;
 
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * alter job command.
@@ -75,22 +76,16 @@ public class AlterJobCommand extends AlterCommand 
implements ForwardWithSync {
     private AbstractJob analyzeAndBuildJobInfo(ConnectContext ctx) throws 
JobException {
         AbstractJob job = 
Env.getCurrentEnv().getJobManager().getJobByName(jobName);
         if (job instanceof StreamingInsertJob) {
-            StreamingInsertJob originJob = (StreamingInsertJob) job;
-            String updateSQL = StringUtils.isEmpty(sql) ? 
originJob.getExecuteSql() : sql;
-            Map<String, String> updateProps =
-                    properties == null || properties.isEmpty() ? 
originJob.getProperties() : properties;
-
-            StreamingInsertJob streamingInsertJob = new 
StreamingInsertJob(jobName,
-                    job.getJobStatus(),
-                    job.getCurrentDbName(),
-                    job.getComment(),
-                    ConnectContext.get().getCurrentUserIdentity(),
-                    originJob.getJobConfig(),
-                    System.currentTimeMillis(),
-                    updateSQL,
-                    updateProps);
-            streamingInsertJob.setJobId(job.getJobId());
-            return streamingInsertJob;
+            StreamingInsertJob updateJob = (StreamingInsertJob) job;
+            // update sql
+            if (StringUtils.isNotEmpty(sql)) {
+                updateJob.setExecuteSql(sql);
+            }
+            // update properties
+            if (!properties.isEmpty()) {
+                updateJob.getProperties().putAll(properties);
+            }
+            return updateJob;
         } else {
             throw new JobException("Unsupported job type for ALTER:" + 
job.getJobType());
         }
@@ -119,20 +114,20 @@ public class AlterJobCommand extends AlterCommand 
implements ForwardWithSync {
     }
 
     private boolean checkProperties(Map<String, String> originProps) {
-        if (originProps.isEmpty()) {
+        if (this.properties == null || this.properties.isEmpty()) {
             return false;
         }
-        if (!originProps.equals(properties)) {
+        if (!Objects.equals(this.properties, originProps)) {
             return true;
         }
         return false;
     }
 
-    private boolean checkSql(String sql) {
-        if (sql == null || sql.isEmpty()) {
+    private boolean checkSql(String originSql) {
+        if (originSql == null || originSql.isEmpty()) {
             return false;
         }
-        if (!sql.equals(sql)) {
+        if (!originSql.equals(this.sql)) {
             return true;
         }
         return false;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java
index fe81921211f..baac59fd7b9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java
@@ -67,7 +67,7 @@ public class CreateJobCommand extends Command implements 
ForwardWithSync {
         if (createJobInfo.streamingJob()) {
             int streamingJobCnt = 
Env.getCurrentEnv().getJobManager().getStreamingJobCnt();
             if (streamingJobCnt >= Config.max_streaming_job_num) {
-                throw new JobException("Exceed max streaming job num limit " + 
Config.max_streaming_job_num);
+                throw new JobException("Exceed max streaming job num limit in 
fe.conf:" + Config.max_streaming_job_num);
             }
         }
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java
index 3935f73f0e9..1ad21237a39 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java
@@ -21,7 +21,9 @@ import org.apache.doris.analysis.StmtType;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
+import org.apache.doris.job.base.AbstractJob;
 import org.apache.doris.job.common.JobStatus;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.nereids.trees.plans.PlanType;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
@@ -46,7 +48,12 @@ public class ResumeJobCommand extends AlterJobStatusCommand 
implements ForwardWi
         if 
(!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), 
PrivPredicate.ADMIN)) {
             
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, 
"ADMIN");
         }
-        ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(), 
JobStatus.RUNNING);
+        AbstractJob job = 
ctx.getEnv().getJobManager().getJobByName(super.getJobName());
+        if (job instanceof StreamingInsertJob) {
+            ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(), 
JobStatus.PENDING);
+        } else {
+            ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(), 
JobStatus.RUNNING);
+        }
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/AlterStreamingJobOperationLog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/persist/AlterStreamingJobOperationLog.java
new file mode 100644
index 00000000000..785531fa0d1
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/persist/AlterStreamingJobOperationLog.java
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.persist;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.job.common.JobStatus;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+public class AlterStreamingJobOperationLog implements Writable {
+    @SerializedName(value = "jid")
+    private long jobId;
+    @SerializedName(value = "js")
+    private JobStatus status;
+    @SerializedName(value = "jp")
+    private Map<String, String> jobProperties;
+    @SerializedName(value = "sql")
+    String executeSql;
+
+    public AlterStreamingJobOperationLog(long jobId, JobStatus status,
+            Map<String, String> jobProperties, String executeSql) {
+        this.jobId = jobId;
+        this.status = status;
+        this.jobProperties = jobProperties;
+        this.executeSql = executeSql;
+    }
+
+    public long getJobId() {
+        return jobId;
+    }
+
+    public Map<String, String> getJobProperties() {
+        return jobProperties;
+    }
+
+    public String getExecuteSql() {
+        return executeSql;
+    }
+
+    public JobStatus getStatus() {
+        return status;
+    }
+
+    public static AlterStreamingJobOperationLog read(DataInput in) throws 
IOException {
+        String json = Text.readString(in);
+        return GsonUtils.GSON.fromJson(json, 
AlterStreamingJobOperationLog.class);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        String json = GsonUtils.GSON.toJson(this);
+        Text.writeString(out, json);
+    }
+
+    @Override
+    public String toString() {
+        return "AlterStreamingJobOperationLog{"
+                +  "jobId=" + jobId
+                + ", status=" + status
+                + ", jobProperties=" + jobProperties
+                + ", executeSql='" + executeSql + '\''
+                + '}';
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 5abcc4e0464..74be0f5e475 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -862,6 +862,11 @@ public class EditLog {
                     Env.getCurrentEnv().getJobManager().replayDeleteJob(job);
                     break;
                 }
+                case OperationType.OP_UPDATE_STREAMING_JOB: {
+                    AlterStreamingJobOperationLog log = 
(AlterStreamingJobOperationLog) journal.getData();
+                    
Env.getCurrentEnv().getJobManager().replayUpdateStreamingJob(log);
+                    break;
+                }
                 /*case OperationType.OP_CREATE_SCHEDULER_TASK: {
                     JobTask task = (JobTask) journal.getData();
                     
Env.getCurrentEnv().getJobTaskManager().replayCreateTask(task);
@@ -2038,6 +2043,10 @@ public class EditLog {
         logEdit(OperationType.OP_UPDATE_SCHEDULER_JOB, job);
     }
 
+    public void logUpdateStreamingJob(AlterStreamingJobOperationLog log) {
+        logEdit(OperationType.OP_UPDATE_STREAMING_JOB, log);
+    }
+
     public void logDeleteJob(AbstractJob job) {
         logEdit(OperationType.OP_DELETE_SCHEDULER_JOB, job);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index 2bbc03d4aea..12d59d086b9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -414,6 +414,8 @@ public class OperationType {
 
     public static final short OP_OPERATE_KEY = 492;
 
+    public static final short OP_UPDATE_STREAMING_JOB = 493;
+
     // For cloud.
     public static final short OP_UPDATE_CLOUD_REPLICA = 1000;
     @Deprecated
diff --git 
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy 
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
index 3982876b9f4..d89bc8d2f0f 100644
--- 
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
@@ -61,15 +61,28 @@ suite("test_streaming_insert_job") {
             "s3.secret_key" = "${getS3SK()}"
         );
     """
-    Awaitility.await().atMost(30, SECONDS)
-            .pollInterval(1, SECONDS).until(
-            {
-                print("check success task count")
-                def jobSuccendCount = sql """ select SucceedTaskCount from 
jobs("type"="insert") where Name like '%${jobName}%' and 
ExecuteType='STREAMING' """
-                // check job status and succeed task count larger than 2
-                jobSuccendCount.size() == 1 && '2' <= 
jobSuccendCount.get(0).get(0)
-            }
-    )
+    try {
+        Awaitility.await().atMost(300, SECONDS)
+                .pollInterval(1, SECONDS).until(
+                {
+                    print("check success task count")
+                    def jobSuccendCount = sql """ select SucceedTaskCount from 
jobs("type"="insert") where Name like '%${jobName}%' and 
ExecuteType='STREAMING' """
+                    // check that the job exists and its succeeded task count has reached at least 2
+                    jobSuccendCount.size() == 1 && '2' <= 
jobSuccendCount.get(0).get(0)
+                }
+        )
+    } catch (Exception ex){
+        def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+        def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+        println("show job: " + showjob)
+        println("show task: " + showtask)
+        throw ex;
+    }
+
+    def jobResult = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+    println("show success job: " + jobResult)
+
+    qt_select """ SELECT * FROM ${tableName} order by c1 """
 
     sql """
         PAUSE JOB where jobname =  '${jobName}'
@@ -79,20 +92,20 @@ suite("test_streaming_insert_job") {
     """
     assert pausedJobStatus.get(0).get(0) == "PAUSED"
 
-    qt_select """ SELECT * FROM ${tableName} order by c1 """
+    def pauseShowTask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+    assert pauseShowTask.size() == 0
+
 
     def jobOffset = sql """
-        select progress, remoteoffset from jobs("type"="insert") where 
Name='${jobName}'
+        select ConsumedOffset, MaxOffset from jobs("type"="insert") where 
Name='${jobName}'
     """
     assert jobOffset.get(0).get(0) == "regression/load/data/example_1.csv"
     assert jobOffset.get(0).get(1) == "regression/load/data/example_1.csv"
-    //todo check status
 
     // alter streaming job
     sql """
        ALTER JOB FOR ${jobName}
        PROPERTIES(
-        "s3.batch_files" = "1",
         "session.insert_max_filter_ratio" = "0.5"
        )
        INSERT INTO ${tableName}
@@ -110,10 +123,31 @@ suite("test_streaming_insert_job") {
     """
 
     def alterJobProperties = sql """
-        select properties from jobs("type"="insert") where Name='${jobName}'
+        select status,properties,ConsumedOffset from jobs("type"="insert") 
where Name='${jobName}'
+    """
+    assert alterJobProperties.get(0).get(0) == "PAUSED"
+    assert alterJobProperties.get(0).get(1) == 
"{\"s3.batch_files\":\"1\",\"session.insert_max_filter_ratio\":\"0.5\"}"
+    assert alterJobProperties.get(0).get(2) == 
"regression/load/data/example_1.csv"
+
+    sql """
+        RESUME JOB where jobname =  '${jobName}'
+    """
+    def resumeJobStatus = sql """
+        select status,properties,ConsumedOffset from jobs("type"="insert") 
where Name='${jobName}'
     """
-    assert alterJobProperties.get(0).get(0) == 
"{\"s3.batch_files\":\"1\",\"session.insert_max_filter_ratio\":\"0.5\"}"
+    assert resumeJobStatus.get(0).get(0) == "RUNNING" || 
resumeJobStatus.get(0).get(0) == "PENDING"
+    assert resumeJobStatus.get(0).get(1) == 
"{\"s3.batch_files\":\"1\",\"session.insert_max_filter_ratio\":\"0.5\"}"
+    assert resumeJobStatus.get(0).get(2) == 
"regression/load/data/example_1.csv"
 
+    Awaitility.await().atMost(60, SECONDS)
+            .pollInterval(1, SECONDS).until(
+            {
+                print("check create streaming task count")
+                def resumeShowTask = sql """select * from 
tasks("type"="insert") where JobName='${jobName}'"""
+                // check that the streaming task was created successfully
+                resumeShowTask.size() == 1
+            }
+    )
 
     sql """
         DROP JOB IF EXISTS where jobname =  '${jobName}'
@@ -122,6 +156,4 @@ suite("test_streaming_insert_job") {
     def jobCountRsp = sql """select count(1) from jobs("type"="insert")  where 
Name ='${jobName}'"""
     assert jobCountRsp.get(0).get(0) == 0
 
-
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to