This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch streaming-job-dev
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/streaming-job-dev by this push:
     new a69c8fdcc2a [Feature](WIP) Add Create StreamingJob and Alter Job 
(#55814)
a69c8fdcc2a is described below

commit a69c8fdcc2aad853ab3313a1e2fad50b95321e69
Author: wudi <[email protected]>
AuthorDate: Tue Sep 9 17:16:09 2025 +0800

    [Feature](WIP) Add Create StreamingJob and Alter Job (#55814)
    
    ### What problem does this PR solve?
    
    1. Add Create StreamingJob and Alter Job
    2. add job and task tvf schema
    3. add offset api
---
 .../antlr4/org/apache/doris/nereids/DorisParser.g4 |  13 +-
 .../org/apache/doris/job/base/JobProperties.java   |  25 ++++
 .../doris/job/extensions/insert/InsertJob.java     |  31 +++++
 .../doris/job/extensions/insert/InsertTask.java    |   5 +-
 .../insert/streaming/StreamingInsertJob.java       |  25 ++++
 .../insert/streaming/StreamingJobProperties.java   |  64 +++++++++
 .../org/apache/doris/job/manager/JobManager.java   |  23 ++++
 .../java/org/apache/doris/job/offset/Offset.java   |  22 +++
 .../doris/job/offset/SourceOffsetProvider.java     |  62 +++++++++
 .../job/offset/SourceOffsetProviderFactory.java    |  37 +++++
 .../org/apache/doris/job/offset/s3/S3Offset.java   |  36 +++++
 .../job/offset/s3/S3SourceOffsetProvider.java      |  53 ++++++++
 .../doris/nereids/parser/LogicalPlanBuilder.java   |  14 +-
 .../apache/doris/nereids/trees/plans/PlanType.java |   1 +
 .../trees/plans/commands/AlterJobCommand.java      | 151 +++++++++++++++++++++
 .../trees/plans/commands/info/CreateJobInfo.java   |  64 ++++++++-
 .../trees/plans/visitor/CommandVisitor.java        |   5 +
 17 files changed, 619 insertions(+), 12 deletions(-)

diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 
b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
index d8f5f810eb0..a0ae7c6916e 100644
--- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
+++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
@@ -99,16 +99,19 @@ materializedViewStatement
     | SHOW CREATE MATERIALIZED VIEW mvName=multipartIdentifier                 
                 #showCreateMTMV
     ;
 supportedJobStatement
-    : CREATE JOB label=multipartIdentifier ON SCHEDULE
-        (
+    : CREATE JOB label=multipartIdentifier propertyClause?
+      ON (STREAMING | SCHEDULE(
             (EVERY timeInterval=INTEGER_VALUE timeUnit=identifier
             (STARTS (startTime=STRING_LITERAL | CURRENT_TIMESTAMP))?
             (ENDS endsTime=STRING_LITERAL)?)
             |
-            (AT (atTime=STRING_LITERAL | CURRENT_TIMESTAMP)))
-        commentSpec?
-        DO supportedDmlStatement                                               
                                                              
#createScheduledJob
+            (AT (atTime=STRING_LITERAL | CURRENT_TIMESTAMP))
+            )
+         )
+       commentSpec?
+       DO supportedDmlStatement                                                
                                                             #createScheduledJob
    | PAUSE JOB WHERE (jobNameKey=identifier) EQ (jobNameValue=STRING_LITERAL)  
                                                              #pauseJob
+   | ALTER JOB FOR (jobNameKey=identifier) (propertyClause | 
supportedDmlStatement | propertyClause  supportedDmlStatement)                  
#alterJob
    | DROP JOB (IF EXISTS)? WHERE (jobNameKey=identifier) EQ 
(jobNameValue=STRING_LITERAL)                                                   
 #dropJob
    | RESUME JOB WHERE (jobNameKey=identifier) EQ (jobNameValue=STRING_LITERAL) 
                                                              #resumeJob
    | CANCEL TASK WHERE (jobNameKey=identifier) EQ 
(jobNameValue=STRING_LITERAL) AND (taskIdKey=identifier) EQ 
(taskIdValue=INTEGER_VALUE)    #cancelJobTask
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobProperties.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobProperties.java
new file mode 100644
index 00000000000..3985b59bf16
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobProperties.java
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.base;
+
+import org.apache.doris.common.AnalysisException;
+
+public interface JobProperties {
+    default void validate() throws AnalysisException {
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
index dd0e0d3a228..5b386886b19 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
@@ -51,6 +51,8 @@ import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.ShowResultSetMetaData;
 import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.thrift.TCell;
+import org.apache.doris.thrift.TRow;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.ErrorTabletInfo;
 import org.apache.doris.transaction.TabletCommitInfo;
@@ -96,6 +98,11 @@ public class InsertJob extends AbstractJob<InsertTask, 
Map<Object, Object>> impl
             .add(new Column("CreateTime", ScalarType.createStringType()))
             .addAll(COMMON_SCHEMA)
             .add(new Column("Comment", ScalarType.createStringType()))
+            // only execute type = streaming need record
+            .add(new Column("Progress", ScalarType.createStringType()))
+            .add(new Column("RemoteOffset", ScalarType.createStringType()))
+            .add(new Column("LoadStatistic", ScalarType.createStringType()))
+            .add(new Column("ErrorMsg", ScalarType.createStringType()))
             .build();
 
     private static final ShowResultSetMetaData TASK_META_DATA =
@@ -112,6 +119,8 @@ public class InsertJob extends AbstractJob<InsertTask, 
Map<Object, Object>> impl
                     .addColumn(new Column("TrackingUrl", 
ScalarType.createVarchar(200)))
                     .addColumn(new Column("LoadStatistic", 
ScalarType.createVarchar(200)))
                     .addColumn(new Column("User", 
ScalarType.createVarchar(50)))
+                    // only execute type = streaming need record
+                    .addColumn(new Column("Offset", 
ScalarType.createStringType()))
                     .build();
 
     public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
@@ -523,6 +532,28 @@ public class InsertJob extends AbstractJob<InsertTask, 
Map<Object, Object>> impl
         }
     }
 
+    @Override
+    public TRow getTvfInfo() {
+        TRow trow = new TRow();
+        trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(getJobId())));
+        trow.addToColumnValue(new TCell().setStringVal(getJobName()));
+        trow.addToColumnValue(new 
TCell().setStringVal(getCreateUser().getQualifiedUser()));
+        trow.addToColumnValue(new 
TCell().setStringVal(getJobConfig().getExecuteType().name()));
+        trow.addToColumnValue(new 
TCell().setStringVal(getJobConfig().convertRecurringStrategyToString()));
+        trow.addToColumnValue(new TCell().setStringVal(getJobStatus().name()));
+        trow.addToColumnValue(new TCell().setStringVal(getExecuteSql()));
+        trow.addToColumnValue(new 
TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs())));
+        trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(getSucceedTaskCount().get())));
+        trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(getFailedTaskCount().get())));
+        trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(getCanceledTaskCount().get())));
+        trow.addToColumnValue(new TCell().setStringVal(getComment()));
+        trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
+        trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
+        trow.addToColumnValue(new 
TCell().setStringVal(loadStatistic.toJson()));
+        trow.addToColumnValue(new TCell().setStringVal(failMsg == null ? 
FeConstants.null_string : failMsg.getMsg()));
+        return trow;
+    }
+
     @Override
     public String formatMsgWhenExecuteQueueFull(Long taskId) {
         return commonFormatMsgWhenExecuteQueueFull(taskId, 
"insert_task_queue_size",
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
index f6633d6c5e8..34cfdf6edea 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
@@ -67,7 +67,8 @@ public class InsertTask extends AbstractTask {
             new Column("FinishTime", ScalarType.createStringType()),
             new Column("TrackingUrl", ScalarType.createStringType()),
             new Column("LoadStatistic", ScalarType.createStringType()),
-            new Column("User", ScalarType.createStringType()));
+            new Column("User", ScalarType.createStringType()),
+            new Column("Offset", ScalarType.createStringType()));
 
     public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
 
@@ -272,6 +273,7 @@ public class InsertTask extends AbstractTask {
         } else {
             trow.addToColumnValue(new 
TCell().setStringVal(userIdentity.getQualifiedUser()));
         }
+        trow.addToColumnValue(new TCell().setStringVal(""));
         return trow;
     }
 
@@ -292,6 +294,7 @@ public class InsertTask extends AbstractTask {
         trow.addToColumnValue(new TCell().setStringVal(""));
         trow.addToColumnValue(new TCell().setStringVal(""));
         trow.addToColumnValue(new 
TCell().setStringVal(userIdentity.getQualifiedUser()));
+        trow.addToColumnValue(new TCell().setStringVal(""));
         return trow;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 9e5baccd134..a2016c52cb9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -17,13 +17,16 @@
 
 package org.apache.doris.job.extensions.insert.streaming;
 
+import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.job.base.AbstractJob;
+import org.apache.doris.job.base.JobExecutionConfiguration;
 import org.apache.doris.job.common.JobStatus;
 import org.apache.doris.job.common.JobType;
 import org.apache.doris.job.common.PauseReason;
 import org.apache.doris.job.exception.JobException;
 import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.qe.ConnectContext;
 
 import com.google.gson.annotations.SerializedName;
 import lombok.Getter;
@@ -37,6 +40,8 @@ import java.util.Map;
 
 public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, 
Map<Object, Object>> {
 
+    @SerializedName("did")
+    private final long dbId;
     @Getter
     @SerializedName("st")
     protected JobStatus status;
@@ -52,6 +57,26 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
     @Setter
     protected long autoResumeCount;
 
+    @Getter
+    @SerializedName("jp")
+    private StreamingJobProperties jobProperties;
+
+    public StreamingInsertJob(String jobName,
+            JobStatus jobStatus,
+            String dbName,
+            String comment,
+            UserIdentity createUser,
+            JobExecutionConfiguration jobConfig,
+            Long createTimeMs,
+            String executeSql,
+            StreamingJobProperties jobProperties) {
+        super(getNextJobId(), jobName, jobStatus, dbName, comment, createUser,
+                jobConfig, createTimeMs, executeSql);
+        this.dbId = ConnectContext.get().getCurrentDbId();
+        this.jobProperties = jobProperties;
+    }
+
+
     @Override
     public void updateJobStatus(JobStatus status) throws JobException {
         super.updateJobStatus(status);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
new file mode 100644
index 00000000000..25d256b127b
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.extensions.insert.streaming;
+
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.util.Util;
+import org.apache.doris.job.base.JobProperties;
+
+import lombok.Data;
+
+import java.util.Map;
+
+@Data
+public class StreamingJobProperties implements JobProperties {
+    public static final String MAX_INTERVAL_SECOND_PROPERTY = "max_interval";
+    public static final String S3_BATCH_FILES_PROPERTY = "s3.batch_files";
+    public static final String S3_BATCH_SIZE_PROPERTY = "s3.batch_size";
+    public static final long DEFAULT_MAX_INTERVAL_SECOND = 10;
+    public static final long DEFAULT_S3_BATCH_FILES = 256;
+    public static final long DEFAULT_S3_BATCH_SIZE = 10 * 1024 * 1024 * 1024L; 
// 10GB
+    public static final long DEFAULT_INSERT_TIMEOUT = 30 * 60; // 30min
+
+    private final Map<String, String> properties;
+    private long maxIntervalSecond;
+    private long s3BatchFiles;
+    private long s3BatchSize;
+
+    public StreamingJobProperties(Map<String, String> jobProperties) {
+        this.properties = jobProperties;
+    }
+
+    @Override
+    public void validate() throws AnalysisException {
+        this.maxIntervalSecond = Util.getLongPropertyOrDefault(
+                
properties.get(StreamingJobProperties.MAX_INTERVAL_SECOND_PROPERTY),
+                StreamingJobProperties.DEFAULT_MAX_INTERVAL_SECOND, (v) -> v 
>= 1,
+                StreamingJobProperties.MAX_INTERVAL_SECOND_PROPERTY + " should 
> 1");
+
+        this.s3BatchFiles = Util.getLongPropertyOrDefault(
+                properties.get(StreamingJobProperties.S3_BATCH_FILES_PROPERTY),
+                StreamingJobProperties.DEFAULT_S3_BATCH_FILES, (v) -> v >= 1,
+                StreamingJobProperties.S3_BATCH_FILES_PROPERTY + " should >=1 
");
+
+        this.s3BatchSize = 
Util.getLongPropertyOrDefault(properties.get(StreamingJobProperties.S3_BATCH_SIZE_PROPERTY),
+                StreamingJobProperties.DEFAULT_S3_BATCH_SIZE, (v) -> v >= 100 
* 1024 * 1024
+                        && v <= (long) (1024 * 1024 * 1024) * 10,
+                StreamingJobProperties.S3_BATCH_SIZE_PROPERTY + " should 
between 100MB and 10GB");
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
index 3d705b945e7..e763f8de590 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
@@ -32,6 +32,7 @@ import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
 import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.job.base.AbstractJob;
+import org.apache.doris.job.base.JobExecuteType;
 import org.apache.doris.job.common.JobStatus;
 import org.apache.doris.job.common.JobType;
 import org.apache.doris.job.common.TaskType;
@@ -219,6 +220,17 @@ public class JobManager<T extends AbstractJob<?, C>, C> 
implements Writable {
         jobMap.get(jobId).logUpdateOperation();
     }
 
+    public void alterJob(T job) {
+        writeLock();
+        try {
+            jobMap.put(job.getJobId(), job);
+            job.logUpdateOperation();
+        } finally {
+            writeUnlock();
+        }
+        log.info("update job success, jobId: {}", job.getJobId());
+    }
+
     public void alterJobStatus(String jobName, JobStatus jobStatus) throws 
JobException {
         for (T a : jobMap.values()) {
             if (a.getJobName().equals(jobName)) {
@@ -349,6 +361,9 @@ public class JobManager<T extends AbstractJob<?, C>, C> 
implements Writable {
      */
     public void cancelTaskById(String jobName, Long taskId) throws 
JobException {
         for (T job : jobMap.values()) {
+            if 
(job.getJobConfig().getExecuteType().equals(JobExecuteType.STREAMING)) {
+                throw new JobException("streaming job not support cancel task 
by id");
+            }
             if (job.getJobName().equals(jobName)) {
                 job.cancelTaskById(taskId);
                 job.logUpdateOperation();
@@ -392,6 +407,14 @@ public class JobManager<T extends AbstractJob<?, C>, C> 
implements Writable {
         return jobMap.get(jobId);
     }
 
+    public T getJobByName(String jobName) throws JobException {
+        for (T a : jobMap.values()) {
+            if (a.getJobName().equals(jobName)) {
+                return a;
+            }
+        }
+        throw new JobException("job not exist, jobName:" + jobName);
+    }
 
     /**
      * get load info by db
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java
new file mode 100644
index 00000000000..095f0a5e6bf
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java
@@ -0,0 +1,22 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.offset;
+
+public interface Offset {
+    String toJson();
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
new file mode 100644
index 00000000000..f88079617de
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.offset;
+
+import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+
+/**
+ * Interface for managing offsets and metadata of a data source.
+ */
+public interface SourceOffsetProvider {
+    /**
+     * Get source type, e.g. s3, kafka
+     * @return
+     */
+    String getSourceType();
+
+    /**
+     * Get next offset to consume
+     * @return
+     */
+    Offset getNextOffset();
+
+    /**
+     * Rewrite the TVF parameters in the InsertIntoTableCommand based on the 
current offset.
+     * @param command
+     * @return
+     */
+    InsertIntoTableCommand rewriteTvfParamsInCommand(InsertIntoTableCommand 
command);
+
+    /**
+     * Update the progress of the source.
+     * @param offset
+     */
+    void updateProgress(Offset offset);
+
+    /**
+     * Fetch remote meta information, such as listing files in S3 or getting 
latest offsets in Kafka.
+     */
+    void fetchRemoteMeta();
+
+    /**
+     * Whether there is more data to consume
+     * @return
+     */
+    boolean hasMoreData();
+}
+
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProviderFactory.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProviderFactory.java
new file mode 100644
index 00000000000..5ba1d903d78
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProviderFactory.java
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.offset;
+
+import org.apache.doris.job.offset.s3.S3SourceOffsetProvider;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class SourceOffsetProviderFactory {
+    private static final Map<String, Class<? extends SourceOffsetProvider>> 
map = new ConcurrentHashMap<>();
+
+    static {
+        map.put("s3", S3SourceOffsetProvider.class);
+    }
+
+    public static SourceOffsetProvider createSourceOffsetProvider(String 
sourceType) throws InstantiationException,
+            IllegalAccessException {
+        Class<? extends SourceOffsetProvider> cla = 
map.get(sourceType.toUpperCase());
+        return cla.newInstance();
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
new file mode 100644
index 00000000000..86ff467796a
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.offset.s3;
+
+import org.apache.doris.job.offset.Offset;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import java.util.List;
+
+public class S3Offset implements Offset {
+    String startFile;
+
+    String endFile;
+
+    List<String> fileLists;
+
+    @Override
+    public String toJson() {
+        return GsonUtils.GSON.toJson(this);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
new file mode 100644
index 00000000000..087d9c2beb7
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.offset.s3;
+
+import org.apache.doris.job.offset.Offset;
+import org.apache.doris.job.offset.SourceOffsetProvider;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+
+public class S3SourceOffsetProvider implements SourceOffsetProvider {
+
+    @Override
+    public String getSourceType() {
+        return null;
+    }
+
+    @Override
+    public Offset getNextOffset() {
+        return null;
+    }
+
+    @Override
+    public InsertIntoTableCommand 
rewriteTvfParamsInCommand(InsertIntoTableCommand command) {
+        return null;
+    }
+
+    @Override
+    public void updateProgress(Offset offset) {
+    }
+
+    @Override
+    public void fetchRemoteMeta() {
+    }
+
+    @Override
+    public boolean hasMoreData() {
+        return false;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index abb468a19cc..80a0886017e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -608,6 +608,7 @@ import 
org.apache.doris.nereids.trees.plans.commands.AlterCatalogRenameCommand;
 import org.apache.doris.nereids.trees.plans.commands.AlterColocateGroupCommand;
 import org.apache.doris.nereids.trees.plans.commands.AlterColumnStatsCommand;
 import 
org.apache.doris.nereids.trees.plans.commands.AlterDatabasePropertiesCommand;
+import org.apache.doris.nereids.trees.plans.commands.AlterJobCommand;
 import org.apache.doris.nereids.trees.plans.commands.AlterMTMVCommand;
 import org.apache.doris.nereids.trees.plans.commands.AlterResourceCommand;
 import org.apache.doris.nereids.trees.plans.commands.AlterRoleCommand;
@@ -1144,11 +1145,13 @@ public class LogicalPlanBuilder extends 
DorisParserBaseVisitor<Object> {
         Optional<Long> interval = ctx.timeInterval == null ? Optional.empty() :
                 Optional.of(Long.valueOf(ctx.timeInterval.getText()));
         Optional<String> intervalUnit = ctx.timeUnit == null ? 
Optional.empty() : Optional.of(ctx.timeUnit.getText());
+        Map<String, String> properties = ctx.propertyClause() != null
+                ? Maps.newHashMap(visitPropertyClause(ctx.propertyClause())) : 
Maps.newHashMap();
         String comment =
                 visitCommentSpec(ctx.commentSpec());
         String executeSql = getOriginSql(ctx.supportedDmlStatement());
         CreateJobInfo createJobInfo = new CreateJobInfo(label, atTime, 
interval, intervalUnit, startTime,
-                endsTime, immediateStartOptional, comment, executeSql);
+                endsTime, immediateStartOptional, comment, executeSql, 
ctx.STREAMING() != null, properties);
         return new CreateJobCommand(createJobInfo);
     }
 
@@ -1158,6 +1161,15 @@ public class LogicalPlanBuilder extends 
DorisParserBaseVisitor<Object> {
         }
     }
 
+    @Override
+    public LogicalPlan visitAlterJob(DorisParser.AlterJobContext ctx) {
+        checkJobNameKey(stripQuotes(ctx.jobNameKey.getText()), JOB_NAME, ctx);
+        Map<String, String> properties = ctx.propertyClause() != null
+                ? Maps.newHashMap(visitPropertyClause(ctx.propertyClause())) : 
Maps.newHashMap();
+        String executeSql = getOriginSql(ctx.supportedDmlStatement());
+        return new AlterJobCommand(stripQuotes(ctx.jobNameKey.getText()), 
properties, executeSql);
+    }
+
     @Override
     public LogicalPlan visitPauseJob(DorisParser.PauseJobContext ctx) {
         checkJobNameKey(stripQuotes(ctx.jobNameKey.getText()), JOB_NAME, ctx);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
index e150899f95c..367e6d641e0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
@@ -162,6 +162,7 @@ public enum PlanType {
     CREATE_MTMV_COMMAND,
     CREATE_MATERIALIZED_VIEW_COMMAND,
     CREATE_JOB_COMMAND,
+    ALTER_JOB_COMMAND,
     PAUSE_JOB_COMMAND,
     CANCEL_JOB_COMMAND,
     DROP_CATALOG_COMMAND,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
new file mode 100644
index 00000000000..ef8c9bb8b7a
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import org.apache.doris.analysis.StmtType;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.job.base.AbstractJob;
+import org.apache.doris.job.base.JobExecuteType;
+import org.apache.doris.job.base.JobExecutionConfiguration;
+import org.apache.doris.job.base.TimerDefinition;
+import org.apache.doris.job.common.IntervalUnit;
+import org.apache.doris.job.common.JobStatus;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
+import org.apache.doris.job.extensions.insert.streaming.StreamingJobProperties;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.StmtExecutor;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Map;
+
+/**
+ * alter job command.
+ */
+public class AlterJobCommand extends AlterCommand implements ForwardWithSync {
+    // exclude job name prefix, which is used by inner job
+    private static final String excludeJobNamePrefix = "inner_";
+    private String jobName;
+    private Map<String, String> properties;
+    private String sql;
+
+    public AlterJobCommand(String jobName, Map<String, String> properties, 
String sql) {
+        super(PlanType.ALTER_JOB_COMMAND);
+        this.jobName = jobName;
+        this.properties = properties;
+        this.sql = sql;
+    }
+
+    public String getJobName() {
+        return jobName;
+    }
+
+    @Override
+    public StmtType stmtType() {
+        return StmtType.ALTER;
+    }
+
+    @Override
+    public void doRun(ConnectContext ctx, StmtExecutor executor) throws 
Exception {
+
+        validate();
+        AbstractJob job = analyzeAndBuildJobInfo(ctx);
+        ctx.getEnv().getJobManager().alterJob(job);
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitAlterJobCommand(this, context);
+    }
+
+    private AbstractJob analyzeAndBuildJobInfo(ConnectContext ctx) throws 
JobException {
+        AbstractJob job = 
Env.getCurrentEnv().getJobManager().getJobByName(jobName);
+        if (job instanceof StreamingInsertJob) {
+            StreamingInsertJob originJob = (StreamingInsertJob) job;
+            String updateSQL = StringUtils.isEmpty(sql) ? 
originJob.getExecuteSql() : sql;
+            Map<String, String> updateProps = properties == null || 
properties.isEmpty() ? originJob.getJobProperties()
+                    .getProperties() : properties;
+            StreamingJobProperties streamJobProps = new 
StreamingJobProperties(updateProps);
+            // rebuild time definition
+            JobExecutionConfiguration execConfig = originJob.getJobConfig();
+            TimerDefinition timerDefinition = new TimerDefinition();
+            timerDefinition.setInterval(streamJobProps.getMaxIntervalSecond());
+            timerDefinition.setIntervalUnit(IntervalUnit.SECOND);
+            
timerDefinition.setStartTimeMs(execConfig.getTimerDefinition().getStartTimeMs());
+            execConfig.setTimerDefinition(timerDefinition);
+            return new StreamingInsertJob(jobName,
+                    job.getJobStatus(),
+                    job.getCurrentDbName(),
+                    job.getComment(),
+                    ConnectContext.get().getCurrentUserIdentity(),
+                    execConfig,
+                    System.currentTimeMillis(),
+                    updateSQL,
+                    streamJobProps);
+        } else {
+            throw new JobException("Unsupported job type for ALTER:" + 
job.getJobType());
+        }
+    }
+
+    private void validate() throws Exception {
+        if (jobName.startsWith(excludeJobNamePrefix)) {
+            throw new AnalysisException("Can't alter inner job");
+        }
+        AbstractJob job = 
Env.getCurrentEnv().getJobManager().getJobByName(jobName);
+        if (!JobStatus.PAUSED.equals(job.getJobStatus())) {
+            throw new AnalysisException("Only PAUSED job can be altered");
+        }
+
+        if 
(job.getJobConfig().getExecuteType().equals(JobExecuteType.STREAMING)
+                && job instanceof StreamingInsertJob) {
+            StreamingInsertJob streamingJob = (StreamingInsertJob) job;
+            boolean proCheck = 
checkProperties(streamingJob.getJobProperties().getProperties());
+            boolean sqlCheck = checkSql(streamingJob.getExecuteSql());
+            if (!proCheck && !sqlCheck) {
+                throw new AnalysisException("No properties or sql changed in 
ALTER JOB");
+            }
+        } else {
+            throw new AnalysisException("Unsupported job type for ALTER:" + 
job.getJobType());
+        }
+    }
+
+    private boolean checkProperties(Map<String, String> originProps) {
+        if (originProps.isEmpty()) {
+            return false;
+        }
+        if (!originProps.equals(properties)) {
+            return true;
+        }
+        return false;
+    }
+
+    private boolean checkSql(String sql) {
+        if (sql == null || sql.isEmpty()) {
+            return false;
+        }
+        if (!sql.equals(sql)) {
+            return true;
+        }
+        return false;
+    }
+
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
index 6cef7ee89ec..0d52e23ece5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
@@ -27,10 +27,13 @@ import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.job.base.AbstractJob;
 import org.apache.doris.job.base.JobExecuteType;
 import org.apache.doris.job.base.JobExecutionConfiguration;
+import org.apache.doris.job.base.JobProperties;
 import org.apache.doris.job.base.TimerDefinition;
 import org.apache.doris.job.common.IntervalUnit;
 import org.apache.doris.job.common.JobStatus;
 import org.apache.doris.job.extensions.insert.InsertJob;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
+import org.apache.doris.job.extensions.insert.streaming.StreamingJobProperties;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.nereids.parser.NereidsParser;
 import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
@@ -39,6 +42,7 @@ import org.apache.doris.qe.ConnectContext;
 
 import com.google.common.base.Strings;
 
+import java.util.Map;
 import java.util.Optional;
 
 /**
@@ -66,6 +70,8 @@ public class CreateJobInfo {
     private final String comment;
 
     private final String executeSql;
+    private final boolean streamingJob;
+    private final Map<String, String> jobProperties;
 
     /**
      * Constructor for CreateJobInfo.
@@ -83,7 +89,8 @@ public class CreateJobInfo {
     public CreateJobInfo(Optional<String> labelNameOptional, Optional<String> 
onceJobStartTimestampOptional,
                          Optional<Long> intervalOptional, Optional<String> 
intervalTimeUnitOptional,
                          Optional<String> startsTimeStampOptional, 
Optional<String> endsTimeStampOptional,
-                         Optional<Boolean> immediateStartOptional, String 
comment, String executeSql) {
+                         Optional<Boolean> immediateStartOptional, String 
comment, String executeSql,
+                         boolean streamingJob, Map<String, String> 
jobProperties) {
         this.labelNameOptional = labelNameOptional;
         this.onceJobStartTimestampOptional = onceJobStartTimestampOptional;
         this.intervalOptional = intervalOptional;
@@ -93,7 +100,8 @@ public class CreateJobInfo {
         this.immediateStartOptional = immediateStartOptional;
         this.comment = comment;
         this.executeSql = executeSql;
-
+        this.streamingJob = streamingJob;
+        this.jobProperties = jobProperties;
     }
 
     /**
@@ -117,16 +125,32 @@ public class CreateJobInfo {
         // check its insert stmt,currently only support insert stmt
         JobExecutionConfiguration jobExecutionConfiguration = new 
JobExecutionConfiguration();
         JobExecuteType executeType = intervalOptional.isPresent() ? 
JobExecuteType.RECURRING : JobExecuteType.ONE_TIME;
+        JobProperties properties = null;
+        if (streamingJob) {
+            executeType = JobExecuteType.STREAMING;
+            properties = new StreamingJobProperties(jobProperties);
+        }
         jobExecutionConfiguration.setExecuteType(executeType);
-        TimerDefinition timerDefinition = new TimerDefinition();
 
+        TimerDefinition timerDefinition = new TimerDefinition();
         if (executeType.equals(JobExecuteType.ONE_TIME)) {
             buildOnceJob(timerDefinition, jobExecutionConfiguration);
+        } else if (executeType.equals(JobExecuteType.STREAMING)) {
+            buildStreamingJob(timerDefinition, properties);
         } else {
             buildRecurringJob(timerDefinition, jobExecutionConfiguration);
         }
         jobExecutionConfiguration.setTimerDefinition(timerDefinition);
-        return analyzeAndCreateJob(executeSql, dbName, 
jobExecutionConfiguration);
+        return analyzeAndCreateJob(executeSql, dbName, 
jobExecutionConfiguration, properties);
+    }
+
+    private void buildStreamingJob(TimerDefinition timerDefinition, 
JobProperties props)
+            throws AnalysisException {
+        StreamingJobProperties properties = (StreamingJobProperties) props;
+        timerDefinition.setInterval(properties.getMaxIntervalSecond());
+        timerDefinition.setIntervalUnit(IntervalUnit.SECOND);
+        timerDefinition.setStartTimeMs(System.currentTimeMillis());
+        properties.validate();
     }
 
     /**
@@ -210,7 +234,17 @@ public class CreateJobInfo {
      * @throws UserException if there is an error during SQL analysis or job 
creation
      */
     private AbstractJob analyzeAndCreateJob(String sql, String currentDbName,
-                                            JobExecutionConfiguration 
jobExecutionConfiguration) throws UserException {
+                                            JobExecutionConfiguration 
jobExecutionConfiguration,
+                                            JobProperties properties) throws 
UserException {
+        if 
(jobExecutionConfiguration.getExecuteType().equals(JobExecuteType.STREAMING)) {
+            return analyzeAndCreateStreamingInsertJob(sql, currentDbName, 
jobExecutionConfiguration, properties);
+        } else {
+            return analyzeAndCreateInsertJob(sql, currentDbName, 
jobExecutionConfiguration);
+        }
+    }
+
+    private AbstractJob analyzeAndCreateInsertJob(String sql, String 
currentDbName,
+            JobExecutionConfiguration jobExecutionConfiguration) throws 
UserException {
         NereidsParser parser = new NereidsParser();
         LogicalPlan logicalPlan = parser.parseSingle(sql);
         if (logicalPlan instanceof InsertIntoTableCommand) {
@@ -234,6 +268,26 @@ public class CreateJobInfo {
         }
     }
 
+    private AbstractJob analyzeAndCreateStreamingInsertJob(String sql, String 
currentDbName,
+            JobExecutionConfiguration jobExecutionConfiguration, JobProperties 
properties) throws UserException {
+        NereidsParser parser = new NereidsParser();
+        LogicalPlan logicalPlan = parser.parseSingle(sql);
+        if (logicalPlan instanceof InsertIntoTableCommand) {
+            return new StreamingInsertJob(labelNameOptional.get(),
+                    JobStatus.RUNNING,
+                    currentDbName,
+                    comment,
+                    ConnectContext.get().getCurrentUserIdentity(),
+                    jobExecutionConfiguration,
+                    System.currentTimeMillis(),
+                    sql,
+                    (StreamingJobProperties) properties);
+        } else {
+            throw new AnalysisException("Not support this sql : " + sql + " 
Command class is "
+                    + logicalPlan.getClass().getName() + ".");
+        }
+    }
+
     private void checkJobName(String jobName) throws AnalysisException {
         if (Strings.isNullOrEmpty(jobName)) {
             throw new AnalysisException("job name can not be null");
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
index a5f9a00be1d..45abfa0c250 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
@@ -37,6 +37,7 @@ import 
org.apache.doris.nereids.trees.plans.commands.AlterCatalogRenameCommand;
 import org.apache.doris.nereids.trees.plans.commands.AlterColocateGroupCommand;
 import org.apache.doris.nereids.trees.plans.commands.AlterColumnStatsCommand;
 import 
org.apache.doris.nereids.trees.plans.commands.AlterDatabasePropertiesCommand;
+import org.apache.doris.nereids.trees.plans.commands.AlterJobCommand;
 import org.apache.doris.nereids.trees.plans.commands.AlterJobStatusCommand;
 import org.apache.doris.nereids.trees.plans.commands.AlterMTMVCommand;
 import org.apache.doris.nereids.trees.plans.commands.AlterResourceCommand;
@@ -387,6 +388,10 @@ public interface CommandVisitor<R, C> {
         return visitCommand(createJobCommand, context);
     }
 
+    default R visitAlterJobCommand(AlterJobCommand alterJobCommand, C context) 
{
+        return visitCommand(alterJobCommand, context);
+    }
+
     default R visitCreateFileCommand(CreateFileCommand createFileCommand, C 
context) {
         return visitCommand(createFileCommand, context);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to