Copilot commented on code in PR #56175:
URL: https://github.com/apache/doris/pull/56175#discussion_r2357461606


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java:
##########
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import org.apache.doris.analysis.StmtType;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.job.base.AbstractJob;
+import org.apache.doris.job.base.JobExecuteType;
+import org.apache.doris.job.common.JobStatus;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.StmtExecutor;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Map;
+
+/**
+ * alter job command.
+ */
+public class AlterJobCommand extends AlterCommand implements ForwardWithSync {
+    // exclude job name prefix, which is used by inner job
+    private static final String excludeJobNamePrefix = "inner_";
+    private String jobName;
+    private Map<String, String> properties;
+    private String sql;
+
+    public AlterJobCommand(String jobName, Map<String, String> properties, String sql) {
+        super(PlanType.ALTER_JOB_COMMAND);
+        this.jobName = jobName;
+        this.properties = properties;
+        this.sql = sql;
+    }
+
+    public String getJobName() {
+        return jobName;
+    }
+
+    @Override
+    public StmtType stmtType() {
+        return StmtType.ALTER;
+    }
+
+    @Override
+    public void doRun(ConnectContext ctx, StmtExecutor executor) throws Exception {
+        validate();
+        AbstractJob job = analyzeAndBuildJobInfo(ctx);
+        ctx.getEnv().getJobManager().alterJob(job);
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitAlterJobCommand(this, context);
+    }
+
+    private AbstractJob analyzeAndBuildJobInfo(ConnectContext ctx) throws JobException {
+        AbstractJob job = Env.getCurrentEnv().getJobManager().getJobByName(jobName);
+        if (job instanceof StreamingInsertJob) {
+            StreamingInsertJob originJob = (StreamingInsertJob) job;
+            String updateSQL = StringUtils.isEmpty(sql) ? originJob.getExecuteSql() : sql;
+            Map<String, String> updateProps =
+                    properties == null || properties.isEmpty() ? originJob.getProperties() : properties;
+
+            StreamingInsertJob streamingInsertJob = new StreamingInsertJob(jobName,
+                    job.getJobStatus(),
+                    job.getCurrentDbName(),
+                    job.getComment(),
+                    ConnectContext.get().getCurrentUserIdentity(),
+                    originJob.getJobConfig(),
+                    System.currentTimeMillis(),
+                    updateSQL,
+                    updateProps);
+            streamingInsertJob.setJobId(job.getJobId());
+            return streamingInsertJob;
+        } else {
+            throw new JobException("Unsupported job type for ALTER:" + 
job.getJobType());
+        }
+    }
+
+    private void validate() throws Exception {
+        if (jobName.startsWith(excludeJobNamePrefix)) {
+            throw new AnalysisException("Can't alter inner job");
+        }
+        AbstractJob job = Env.getCurrentEnv().getJobManager().getJobByName(jobName);
+        if (!JobStatus.PAUSED.equals(job.getJobStatus())) {
+            throw new AnalysisException("Only PAUSED job can be altered");
+        }
+
+        if (job.getJobConfig().getExecuteType().equals(JobExecuteType.STREAMING)
+                && job instanceof StreamingInsertJob) {
+            StreamingInsertJob streamingJob = (StreamingInsertJob) job;
+            boolean proCheck = checkProperties(streamingJob.getProperties());
+            boolean sqlCheck = checkSql(streamingJob.getExecuteSql());
+            if (!proCheck && !sqlCheck) {
+                throw new AnalysisException("No properties or sql changed in 
ALTER JOB");
+            }
+        } else {
+            throw new AnalysisException("Unsupported job type for ALTER:" + 
job.getJobType());
+        }
+    }
+
+    private boolean checkProperties(Map<String, String> originProps) {
+        if (originProps.isEmpty()) {
+            return false;
+        }
+        if (!originProps.equals(properties)) {
+            return true;
+        }
+        return false;
+    }
+
+    private boolean checkSql(String sql) {
+        if (sql == null || sql.isEmpty()) {
+            return false;
+        }
+        if (!sql.equals(sql)) {

Review Comment:
   This condition will always evaluate to false, since a string is always equal to itself, so the method can never detect a change. This appears to be a copy-paste error: the method should compare the job's current SQL with a different SQL value, such as the one supplied to ALTER JOB.
   ```suggestion
       private boolean checkSql(String originSql) {
           if (originSql == null || originSql.isEmpty()) {
               return false;
           }
           if (!originSql.equals(this.sql)) {
   ```
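
   Completing the suggestion, the whole helper might read as below (a sketch: only the signature and guard come from the diff, the final return is assumed):
   ```java
       private boolean checkSql(String originSql) {
           // No current SQL recorded for the job means nothing to compare against.
           if (originSql == null || originSql.isEmpty()) {
               return false;
           }
           // Changed only if the job's current SQL differs from the SQL supplied to ALTER JOB.
           return !originSql.equals(this.sql);
       }
   ```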



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java:
##########
+    private void validate() throws Exception {
+        if (jobName.startsWith(excludeJobNamePrefix)) {
+            throw new AnalysisException("Can't alter inner job");
+        }
+        AbstractJob job = Env.getCurrentEnv().getJobManager().getJobByName(jobName);
+        if (!JobStatus.PAUSED.equals(job.getJobStatus())) {
+            throw new AnalysisException("Only PAUSED job can be altered");
+        }
+
+        if (job.getJobConfig().getExecuteType().equals(JobExecuteType.STREAMING)
+                && job instanceof StreamingInsertJob) {
+            StreamingInsertJob streamingJob = (StreamingInsertJob) job;
+            boolean proCheck = checkProperties(streamingJob.getProperties());
+            boolean sqlCheck = checkSql(streamingJob.getExecuteSql());
+            if (!proCheck && !sqlCheck) {
+                throw new AnalysisException("No properties or sql changed in 
ALTER JOB");
+            }
+        } else {
+            throw new AnalysisException("Unsupported job type for ALTER:" + 
job.getJobType());
+        }
+    }
+
+    private boolean checkProperties(Map<String, String> originProps) {
+        if (originProps.isEmpty()) {
+            return false;
+        }
+        if (!originProps.equals(properties)) {
+            return true;
+        }
+        return false;
+    }

Review Comment:
   The parameter name `originProps` is misleading here: the emptiness guard should test the new `properties` parameter, not `originProps`. The logic suggests the first check should be `if (properties == null || properties.isEmpty())`.
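
   A minimal sketch of that reading (assuming the method's purpose is to report whether ALTER JOB supplies properties that differ from the job's current ones):
   ```java
       private boolean checkProperties(Map<String, String> originProps) {
           // No properties supplied in ALTER JOB means nothing to change.
           if (properties == null || properties.isEmpty()) {
               return false;
           }
           // Changed only if the supplied properties differ from the job's current ones.
           return !properties.equals(originProps);
       }
   ```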



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java:
##########
@@ -0,0 +1,492 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.extensions.insert.streaming;
+
+import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.proto.Cloud;
+import org.apache.doris.cloud.rpc.MetaServiceProxy;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.job.base.AbstractJob;
+import org.apache.doris.job.base.JobExecuteType;
+import org.apache.doris.job.base.JobExecutionConfiguration;
+import org.apache.doris.job.base.TimerDefinition;
+import org.apache.doris.job.common.FailureReason;
+import org.apache.doris.job.common.IntervalUnit;
+import org.apache.doris.job.common.JobStatus;
+import org.apache.doris.job.common.JobType;
+import org.apache.doris.job.common.TaskStatus;
+import org.apache.doris.job.common.TaskType;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.InsertJob;
+import org.apache.doris.job.offset.SourceOffsetProvider;
+import org.apache.doris.job.offset.SourceOffsetProviderFactory;
+import org.apache.doris.job.task.AbstractTask;
+import org.apache.doris.load.loadv2.LoadJob;
+import org.apache.doris.load.loadv2.LoadStatistic;
+import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
+import org.apache.doris.nereids.parser.NereidsParser;
+import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import org.apache.doris.persist.gson.GsonPostProcessable;
+import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.ShowResultSetMetaData;
+import org.apache.doris.rpc.RpcException;
+import org.apache.doris.thrift.TCell;
+import org.apache.doris.thrift.TRow;
+import org.apache.doris.transaction.TransactionException;
+import org.apache.doris.transaction.TransactionState;
+import org.apache.doris.transaction.TxnStateChangeCallback;
+
+import com.google.common.base.Preconditions;
+import com.google.gson.annotations.SerializedName;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.log4j.Log4j2;
+import org.apache.commons.collections.CollectionUtils;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+@Log4j2
+public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, Map<Object, Object>> implements
+        TxnStateChangeCallback, GsonPostProcessable {
+    private final long dbId;
+    private StreamingJobStatistic jobStatistic = new StreamingJobStatistic();
+    @Getter
+    @SerializedName("fr")
+    protected FailureReason failureReason;
+    @Getter
+    @Setter
+    protected long latestAutoResumeTimestamp;
+    @Getter
+    @Setter
+    protected long autoResumeCount;
+    @Getter
+    @SerializedName("props")
+    private final Map<String, String> properties;
+    private StreamingJobProperties jobProperties;
+    @Getter
+    @SerializedName("tvf")
+    private String tvfType;
+    private Map<String, String> originTvfProps;
+    @Getter
+    StreamingInsertTask runningStreamTask;
+    SourceOffsetProvider offsetProvider;
+    @Setter
+    @Getter
+    private long lastScheduleTaskTimestamp = -1L;
+    private InsertIntoTableCommand baseCommand;
+    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+    public StreamingInsertJob(String jobName,
+            JobStatus jobStatus,
+            String dbName,
+            String comment,
+            UserIdentity createUser,
+            JobExecutionConfiguration jobConfig,
+            Long createTimeMs,
+            String executeSql,
+            Map<String, String> properties) {
+        super(getNextJobId(), jobName, jobStatus, dbName, comment, createUser,
+                jobConfig, createTimeMs, executeSql);
+        this.dbId = ConnectContext.get().getCurrentDbId();
+        this.properties = properties;
+        init();
+    }
+
+    private void init() {
+        try {
+            this.jobProperties = new StreamingJobProperties(properties);
+            jobProperties.validate();
+            // build time definition
+            JobExecutionConfiguration execConfig = getJobConfig();
+            TimerDefinition timerDefinition = new TimerDefinition();
+            timerDefinition.setInterval(jobProperties.getMaxIntervalSecond());
+            timerDefinition.setIntervalUnit(IntervalUnit.SECOND);
+            timerDefinition.setStartTimeMs(execConfig.getTimerDefinition().getStartTimeMs());
+            execConfig.setTimerDefinition(timerDefinition);
+
+            UnboundTVFRelation currentTvf = getCurrentTvf();
+            this.tvfType = currentTvf.getFunctionName();
+            this.originTvfProps = currentTvf.getProperties().getMap();
+            this.offsetProvider = SourceOffsetProviderFactory.createSourceOffsetProvider(currentTvf.getFunctionName());
+        } catch (AnalysisException ae) {
+            log.warn("parse streaming insert job failed, props: {}", properties, ae);
+            throw new RuntimeException("parse streaming insert job failed, " + ae.getMessage());
+        } catch (Exception ex) {
+            log.warn("init streaming insert job failed, sql: {}", getExecuteSql(), ex);
+            throw new RuntimeException("init streaming insert job failed, " + ex.getMessage());
+        }
+    }
+
+    private UnboundTVFRelation getCurrentTvf() {
+        if (baseCommand == null) {
+            this.baseCommand = (InsertIntoTableCommand) new NereidsParser().parseSingle(getExecuteSql());
+        }
+        List<UnboundTVFRelation> allTVFRelation = baseCommand.getAllTVFRelation();
+        Preconditions.checkArgument(allTVFRelation.size() == 1, "Only support one source in insert streaming job");
+        UnboundTVFRelation unboundTVFRelation = allTVFRelation.get(0);
+        return unboundTVFRelation;
+    }
+
+    @Override
+    public void updateJobStatus(JobStatus status) throws JobException {
+        lock.writeLock().lock();
+        try {
+            super.updateJobStatus(status);
+            if (isFinalStatus()) {
+                Env.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(getJobId());
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public JobType getJobType() {
+        return JobType.INSERT;
+    }
+
+    @Override
+    protected void checkJobParamsInternal() {
+    }
+
+    @Override
+    public boolean isReadyForScheduling(Map<Object, Object> taskContext) {
+        return CollectionUtils.isEmpty(getRunningTasks());
+    }
+
+    @Override
+    public List<StreamingJobSchedulerTask> createTasks(TaskType taskType, Map<Object, Object> taskContext) {
+        List<StreamingJobSchedulerTask> newTasks = new ArrayList<>();
+        StreamingJobSchedulerTask streamingJobSchedulerTask = new StreamingJobSchedulerTask(this);
+        newTasks.add(streamingJobSchedulerTask);
+        super.initTasks(newTasks, taskType);
+        return newTasks;
+    }
+
+    protected StreamingInsertTask createStreamingInsertTask() {
+        this.runningStreamTask = new StreamingInsertTask(getJobId(), AbstractTask.getNextTaskId(), getExecuteSql(),
+                offsetProvider, getCurrentDbName(), jobProperties, originTvfProps, getCreateUser());
+        Env.getCurrentEnv().getJobManager().getStreamingTaskManager().registerTask(runningStreamTask);
+        this.runningStreamTask.setStatus(TaskStatus.PENDING);
+        return runningStreamTask;
+    }
+
+    protected void fetchMeta() {
+        try {
+            if (originTvfProps == null) {
+                this.originTvfProps = getCurrentTvf().getProperties().getMap();
+            }
+            offsetProvider.fetchRemoteMeta(originTvfProps);
+        } catch (Exception ex) {
+            log.warn("fetch remote meta failed, job id: {}", getJobId(), ex);
+        }
+    }
+
+    public boolean needScheduleTask() {
+        return (getJobStatus().equals(JobStatus.RUNNING) || getJobStatus().equals(JobStatus.PENDING));
+    }
+
+    // When consumer to EOF, delay schedule task appropriately can avoid too many small transactions.
+    public boolean needDelayScheduleTask() {
+        return System.currentTimeMillis() - lastScheduleTaskTimestamp > jobProperties.getMaxIntervalSecond() * 1000;
+    }
+
+    public boolean hasMoreDataToConsume() {
+        return offsetProvider.hasMoreDataToConsume();
+    }
+
+    @Override
+    public void onTaskFail(StreamingJobSchedulerTask task) throws JobException {
+        if (task.getErrMsg() != null) {
+            this.failureReason = new FailureReason(task.getErrMsg());
+        }
+        // Here is the failure of StreamingJobSchedulerTask, no processing is required
+        getRunningTasks().remove(task);
+    }
+
+    @Override
+    public void onTaskSuccess(StreamingJobSchedulerTask task) throws JobException {
+        // Here is the success of StreamingJobSchedulerTask, no processing is required
+        getRunningTasks().remove(task);
+    }
+
+    public void onStreamTaskFail(StreamingInsertTask task) throws JobException {
+        try {
+            failedTaskCount.incrementAndGet();
+            Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
+            if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
+                this.failureReason = new FailureReason(task.getErrMsg());
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+        updateJobStatus(JobStatus.PAUSED);
+    }
+
+    public void onStreamTaskSuccess(StreamingInsertTask task) {
+        try {
+            succeedTaskCount.incrementAndGet();
+            Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
+            StreamingInsertTask nextTask = createStreamingInsertTask();
+            this.runningStreamTask = nextTask;
+            //todo: maybe fetch from txn attachment?
+            offsetProvider.updateOffset(task.getRunningOffset());
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private void updateJobStatisticAndOffset(StreamingTaskTxnCommitAttachment attachment) {
+        if (this.jobStatistic == null) {
+            this.jobStatistic = new StreamingJobStatistic();
+        }
+        this.jobStatistic.setScannedRows(this.jobStatistic.getScannedRows() + attachment.getScannedRows());
+        this.jobStatistic.setLoadBytes(this.jobStatistic.getLoadBytes() + attachment.getLoadBytes());
+        this.jobStatistic.setFileNumber(this.jobStatistic.getFileNumber() + attachment.getFileNumber());
+        this.jobStatistic.setFileSize(this.jobStatistic.getFileSize() + attachment.getFileSize());
+        offsetProvider.updateOffset(offsetProvider.deserializeOffset(attachment.getOffset()));
+    }
+
+    @Override
+    public void onRegister() throws JobException {
+        Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(this);
+    }
+
+    @Override
+    public void onReplayCreate() throws JobException {
+        onRegister();
+        super.onReplayCreate();
+    }
+
+    @Override
+    public ShowResultSetMetaData getTaskMetaData() {
+        return InsertJob.TASK_META_DATA;
+    }
+
+    @Override
+    public List<String> getShowInfo() {
+        return getCommonShowInfo();
+    }
+
+    @Override
+    public TRow getTvfInfo() {
+        TRow trow = new TRow();
+        trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getJobId())));
+        trow.addToColumnValue(new TCell().setStringVal(getJobName()));
+        trow.addToColumnValue(new TCell().setStringVal(getCreateUser().getQualifiedUser()));
+        trow.addToColumnValue(new TCell().setStringVal(getJobConfig().getExecuteType().name()));
+        trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
+        trow.addToColumnValue(new TCell().setStringVal(getJobStatus().name()));
+        trow.addToColumnValue(new TCell().setStringVal(getExecuteSql()));
+        trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs())));
+        trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getSucceedTaskCount().get())));
+        trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getFailedTaskCount().get())));
+        trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getCanceledTaskCount().get())));
+        trow.addToColumnValue(new TCell().setStringVal(getComment()));
+        trow.addToColumnValue(new TCell().setStringVal(properties != null
+                ? GsonUtils.GSON.toJson(properties) : FeConstants.null_string));
+
+        if (offsetProvider != null && offsetProvider.getSyncOffset() != null) {
+            trow.addToColumnValue(new TCell().setStringVal(offsetProvider.getSyncOffset()));
+        } else {
+            trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
+        }
+
+        if (offsetProvider != null && offsetProvider.getRemoteOffset() != null) {
+            trow.addToColumnValue(new TCell().setStringVal(offsetProvider.getRemoteOffset()));
+        } else {
+            trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
+        }
+
+        trow.addToColumnValue(new TCell().setStringVal(
+                jobStatistic == null ? FeConstants.null_string : jobStatistic.toJson()));
+        trow.addToColumnValue(new TCell().setStringVal(failureReason == null
+                ? FeConstants.null_string : failureReason.getMsg()));
+        return trow;
+    }
+
+    @Override
+    public String formatMsgWhenExecuteQueueFull(Long taskId) {
+        return commonFormatMsgWhenExecuteQueueFull(taskId, "streaming_task_queue_size",
+                "job_streaming_task_consumer_thread_num");
+    }
+
+    @Override
+    public List<StreamingJobSchedulerTask> queryTasks() {
+        if (!getRunningTasks().isEmpty()) {
+            return getRunningTasks();
+        } else {
+            return Arrays.asList(new StreamingJobSchedulerTask(this));
+        }
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        Text.writeString(out, GsonUtils.GSON.toJson(this));
+    }
+
+    @Override
+    public long getId() {
+        return getJobId();
+    }
+
+    @Override
+    public void beforeCommitted(TransactionState txnState) throws TransactionException {
+        boolean shouldRealseLock = false;

Review Comment:
   Variable name contains a typo: 'shouldRealseLock' should be 'shouldReleaseLock'.



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java:
##########
+    @Override
+    public void beforeCommitted(TransactionState txnState) throws TransactionException {
+        boolean shouldRealseLock = false;
+        lock.writeLock().lock();
+        try {
+            ArrayList<Long> taskIds = new ArrayList<>();
+            taskIds.add(runningStreamTask.getTaskId());
+            List<LoadJob> loadJobs = Env.getCurrentEnv().getLoadManager().queryLoadJobsByJobIds(taskIds);
+            if (loadJobs.size() != 1) {
+                shouldRealseLock = true;
+                throw new TransactionException("load job not found, insert job id is " + runningStreamTask.getTaskId());
+            }
+            LoadJob loadJob = loadJobs.get(0);
+
+            if (txnState.getTransactionId() != loadJob.getTransactionId()
+                    || !runningStreamTask.getStatus().equals(TaskStatus.RUNNING)) {
+                shouldRealseLock = true;
+                throw new TransactionException("txn " + txnState.getTransactionId() + "should be aborted.");
+            }
+
+            LoadStatistic loadStatistic = loadJob.getLoadStatistic();
+            txnState.setTxnCommitAttachment(new StreamingTaskTxnCommitAttachment(
+                        getJobId(),
+                        runningStreamTask.getTaskId(),
+                        loadStatistic.getScannedRows(),
+                        loadStatistic.getLoadBytes(),
+                        loadStatistic.getFileNumber(),
+                        loadStatistic.getTotalFileSizeB(),
+                        runningStreamTask.getRunningOffset().toJson()));
+        } finally {
+            if (shouldRealseLock) {

Review Comment:
   Variable name contains a typo: 'shouldRealseLock' should be 'shouldReleaseLock'.
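
   The rename in context, keeping the existing release-on-error-paths semantics (a sketch; the elided body is unchanged):
   ```java
       boolean shouldReleaseLock = false;
       lock.writeLock().lock();
       try {
           // ... on each error path: set shouldReleaseLock = true, then throw ...
       } finally {
           if (shouldReleaseLock) {
               lock.writeLock().unlock();
           }
       }
   ```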



##########
fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java:
##########
@@ -630,6 +631,155 @@ public Status globList(String remotePath, List<RemoteFile> result, boolean fileN
         }
     }
 
+
+    /**
+     * List all files under the given path with glob pattern.
+     * For example, if the path is "s3://bucket/path/to/*.csv",
+     * it will list all files under "s3://bucket/path/to/" with ".csv" suffix.
+     * <p>
+     * Limit: Starting from startFile, until the total file size is greater than fileSizeLimit,
+     * or the number of files is greater than fileNumLimit.
+     *
+     * @return The largest file name after listObject this time
+     */
+    public String globListWithLimit(String remotePath, List<RemoteFile> result, String startFile,
+            long fileSizeLimit, long fileNumLimit) {
+        long roundCnt = 0;
+        long elementCnt = 0;
+        long matchCnt = 0;
+        long matchFileSize = 0L;
+        long startTime = System.nanoTime();
+        try {
+            S3URI uri = S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri);
+            if (uri.useS3DirectoryBucket()) {
+                throw new RuntimeException("Not support glob with limit for directory bucket");
+            }
+
+            String bucket = uri.getBucket();
+            String globPath = uri.getKey(); // eg: path/to/*.csv
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("globList globPath:{}, remotePath:{}", globPath, 
remotePath);
+            }
+            java.nio.file.Path pathPattern = Paths.get(globPath);
+            PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + pathPattern);
+            HashSet<String> directorySet = new HashSet<>();
+
+            String listPrefix = S3Util.getLongestPrefix(globPath); // similar to Azure
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("globList listPrefix: '{}' (from globPath: '{}')", 
listPrefix, globPath);
+            }
+
+            Builder builder = ListObjectsV2Request.builder();
+            builder.bucket(bucket)
+                    .prefix(listPrefix);
+
+            if (startFile != null) {
+                builder.startAfter(startFile);
+            }
+
+            ListObjectsV2Request request = builder.build();
+
+            String currentMaxFile = "";
+            boolean isTruncated = false;
+            boolean reachLimit = false;
+            do {
+                roundCnt++;
+                ListObjectsV2Response response = listObjectsV2(request);
+                for (S3Object obj : response.contents()) {
+                    elementCnt++;
+                    java.nio.file.Path objPath = Paths.get(obj.key());
+
+                    boolean isPrefix = false;
+                    while (objPath != null && 
objPath.normalize().toString().startsWith(listPrefix)) {
+                        if (!matcher.matches(objPath)) {
+                            isPrefix = true;
+                            objPath = objPath.getParent();
+                            continue;
+                        }
+                        if (directorySet.contains(objPath.normalize().toString())) {
+                            break;
+                        }
+                        if (isPrefix) {
+                            directorySet.add(objPath.normalize().toString());
+                        }
+
+                        matchCnt++;
+                        RemoteFile remoteFile = new RemoteFile(objPath.getFileName().toString(),
+                                !isPrefix,
+                                isPrefix ? -1 : obj.size(),
+                                isPrefix ? -1 : obj.size(),
+                                isPrefix ? 0 : obj.lastModified().toEpochMilli()
+                        );
+                        remoteFile.setBucket(bucket);
+                        remoteFile.setParentPath(objPath.getParent().toString());
+                        matchFileSize += obj.size();
+                        result.add(remoteFile);
+
+                        if (reachLimit(result.size(), matchFileSize, fileSizeLimit, fileNumLimit)) {
+                            reachLimit = true;
+                            break;
+                        }
+
+                        objPath = objPath.getParent();
+                        isPrefix = true;
+                    }
+                    if (reachLimit) {
+                        break;
+                    }
+                }
+                //record current last object file name
+                S3Object lastS3Object = response.contents().get(response.contents().size() - 1);
+                currentMaxFile = lastS3Object.key();

Review Comment:
   Potential IndexOutOfBoundsException if response.contents() is empty. Should check if the list is not empty before accessing the last element.
   ```suggestion
                   if (!response.contents().isEmpty()) {
                    S3Object lastS3Object = response.contents().get(response.contents().size() - 1);
                       currentMaxFile = lastS3Object.key();
                   }
   ```
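
   Alternatively, the guarded tail read could live in a small helper (a sketch; `lastKeyOrDefault` is a hypothetical name, and the types are the AWS SDK v2 ones already used here):
   ```java
   import java.util.List;
   import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
   import software.amazon.awssdk.services.s3.model.S3Object;

   // Returns the key of the last object in this page, or the fallback when the page is empty.
   static String lastKeyOrDefault(ListObjectsV2Response response, String fallback) {
       List<S3Object> contents = response.contents();
       return contents.isEmpty() ? fallback : contents.get(contents.size() - 1).key();
   }
   ```
   The loop body would then be `currentMaxFile = lastKeyOrDefault(response, currentMaxFile);`.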



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java:
##########
@@ -0,0 +1,247 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.extensions.insert.streaming;
+
+import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Status;
+import org.apache.doris.common.util.Util;
+import org.apache.doris.job.base.Job;
+import org.apache.doris.job.common.TaskStatus;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.InsertTask;
+import org.apache.doris.job.offset.Offset;
+import org.apache.doris.job.offset.SourceOffsetProvider;
+import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.glue.LogicalPlanAdapter;
+import org.apache.doris.nereids.parser.NereidsParser;
+import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryState;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.thrift.TStatusCode;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.log4j.Log4j2;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@Log4j2
+@Getter
+public class StreamingInsertTask {
+    private static final String LABEL_SPLITTER = "_";
+    private static final int MAX_RETRY = 3;
+    private long jobId;
+    private long taskId;
+    private String labelName;
+    @Setter
+    private TaskStatus status;
+    private String errMsg;
+    @Setter
+    private String otherMsg;
+    private Long createTimeMs;
+    private Long startTimeMs;
+    private Long finishTimeMs;
+    private String sql;
+    private StmtExecutor stmtExecutor;
+    private InsertIntoTableCommand taskCommand;
+    private String currentDb;
+    private UserIdentity userIdentity;
+    private ConnectContext ctx;
+    private Offset runningOffset;
+    private AtomicBoolean isCanceled = new AtomicBoolean(false);
+    private StreamingJobProperties jobProperties;
+    private Map<String, String> originTvfProps;
+    SourceOffsetProvider offsetProvider;
+
+    public StreamingInsertTask(long jobId,
+                               long taskId,
+                               String sql,
+                               SourceOffsetProvider offsetProvider,
+                               String currentDb,
+                               StreamingJobProperties jobProperties,
+                               Map<String, String> originTvfProps,
+                               UserIdentity userIdentity) {
+        this.jobId = jobId;
+        this.taskId = taskId;
+        this.sql = sql;
+        this.userIdentity = userIdentity;
+        this.currentDb = currentDb;
+        this.offsetProvider = offsetProvider;
+        this.jobProperties = jobProperties;
+        this.originTvfProps = originTvfProps;
+        this.labelName = getJobId() + LABEL_SPLITTER + getTaskId();
+        this.createTimeMs = System.currentTimeMillis();
+    }
+
+    public void execute() throws JobException {
+        try {
+            before();
+            run();
+            onSuccess();
+        } catch (Exception e) {
+            if (TaskStatus.CANCELED.equals(status)) {
+                return;
+            }
+            log.warn("execute task error, job id is {}, task id is {}", jobId, 
taskId, e);
+            onFail(e.getMessage());
+        } finally {
+            // The cancel logic will call the closeOrReleaseResources method by itself.
+            // If it is also called here,
+            // it may result in the inability to obtain relevant information when canceling the task
+            if (!TaskStatus.CANCELED.equals(status)) {
+                closeOrReleaseResources();
+            }
+        }
+    }
+
+    private void before() throws Exception {
+        this.status = TaskStatus.RUNNING;
+        this.startTimeMs = System.currentTimeMillis();
+
+        if (isCanceled.get()) {
+            throw new JobException("Export executor has been canceled, task 
id: {}", getTaskId());

Review Comment:
   Error message mentions 'Export executor' but this is actually a streaming insert task. The message should reference 'Streaming insert task' instead.
   ```suggestion
               throw new JobException("Streaming insert task has been canceled, 
task id: {}", getTaskId());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
