This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch streaming-job-dev
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/streaming-job-dev by this push:
     new 33902107d3c [Feature](WIP) Add create job case and fix job bug (#56119)
33902107d3c is described below

commit 33902107d3cd2ef387d6847f62f0ea3f2174d40c
Author: wudi <[email protected]>
AuthorDate: Wed Sep 17 09:47:00 2025 +0800

    [Feature](WIP) Add create job case and fix job bug (#56119)
    
    ### What problem does this PR solve?
    
     Add create job case and fix job bug
---
 .../property/storage/StorageProperties.java        |   6 +-
 .../main/java/org/apache/doris/fs/FileSystem.java  |   2 +-
 .../java/org/apache/doris/fs/obj/S3ObjStorage.java |  16 ++--
 .../org/apache/doris/fs/remote/RemoteFile.java     |  18 +++++
 .../org/apache/doris/fs/remote/S3FileSystem.java   |   2 +-
 .../org/apache/doris/job/base/AbstractJob.java     |   6 +-
 .../org/apache/doris/job/base/JobProperties.java   |   3 -
 .../insert/streaming/StreamingInsertJob.java       |  55 +++++++++----
 .../insert/streaming/StreamingInsertTask.java      |  16 +++-
 .../insert/streaming/StreamingJobProperties.java   |   8 +-
 .../doris/job/offset/SourceOffsetProvider.java     |   2 +-
 .../org/apache/doris/job/offset/s3/S3Offset.java   |   2 +-
 .../job/offset/s3/S3SourceOffsetProvider.java      |  60 +++++++-------
 .../trees/plans/commands/AlterJobCommand.java      |  23 ++----
 .../trees/plans/commands/info/CreateJobInfo.java   |  24 +++---
 .../commands/insert/InsertIntoTableCommand.java    |   7 ++
 .../org/apache/doris/catalog/DropFunctionTest.java |  29 ++++++-
 .../streaming_job/test_streaming_insert_job.out    | Bin 0 -> 364 bytes
 .../streaming_job/test_streaming_insert_job.groovy |  86 +++++++++++++++++++++
 19 files changed, 266 insertions(+), 99 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
index 2ce87f9ffb9..422d20a7560 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
@@ -159,8 +159,6 @@ public abstract class StorageProperties extends ConnectionProperties {
 
     private static final List<Function<Map<String, String>, StorageProperties>> PROVIDERS =
             Arrays.asList(
-                    props -> (isFsSupport(props, FS_HDFS_SUPPORT)
-                            || HdfsProperties.guessIsMe(props)) ? new HdfsProperties(props) : null,
                     props -> ((isFsSupport(props, FS_OSS_HDFS_SUPPORT)
                             || isFsSupport(props, DEPRECATED_OSS_HDFS_SUPPORT))
                             || OSSHdfsProperties.guessIsMe(props)) ? new OSSHdfsProperties(props) : null,
@@ -181,7 +179,9 @@ public abstract class StorageProperties extends ConnectionProperties {
                     props -> (isFsSupport(props, FS_BROKER_SUPPORT)
                             || BrokerProperties.guessIsMe(props)) ? new BrokerProperties(props) : null,
                     props -> (isFsSupport(props, FS_LOCAL_SUPPORT)
-                            || LocalProperties.guessIsMe(props)) ? new LocalProperties(props) : null
+                            || LocalProperties.guessIsMe(props)) ? new LocalProperties(props) : null,
+                    props -> (isFsSupport(props, FS_HDFS_SUPPORT)
+                            || HdfsProperties.guessIsMe(props)) ? new HdfsProperties(props) : null
             );
 
     protected StorageProperties(Type type, Map<String, String> origProps) {
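
The reordering above is behavior-sensitive: the provider list is evaluated front to back and the first non-null result wins (the selection loop itself is not shown in this hunk), so the broad HDFS guess has to sit behind the more specific OSS-on-HDFS match. A minimal, self-contained sketch of that first-match dispatch, using stand-in types and an illustrative "fs.type" key rather than the real Doris classes and property names:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.Objects;
    import java.util.function.Function;

    public class ProviderOrderSketch {
        // Stand-ins for the real StorageProperties subclasses.
        interface Props {}
        static class OssHdfsProps implements Props {}
        static class HdfsProps implements Props {}

        // Each entry returns an instance when it recognizes the map, else null.
        static final List<Function<Map<String, String>, Props>> PROVIDERS = Arrays.asList(
                m -> "oss-hdfs".equals(m.get("fs.type")) ? new OssHdfsProps() : null,
                // The generic HDFS guess sits last so it cannot shadow more specific matches.
                m -> m.containsKey("fs.type") ? new HdfsProps() : null);

        static Props create(Map<String, String> m) {
            return PROVIDERS.stream()
                    .map(f -> f.apply(m))
                    .filter(Objects::nonNull)
                    .findFirst()
                    .orElseThrow(() -> new IllegalArgumentException("no matching storage provider"));
        }

        public static void main(String[] args) {
            System.out.println(create(Map.of("fs.type", "oss-hdfs")).getClass().getSimpleName()); // OssHdfsProps
            System.out.println(create(Map.of("fs.type", "hdfs")).getClass().getSimpleName());     // HdfsProps
        }
    }
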
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java
index cfbb3e560f3..b2c6957ce38 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java
@@ -124,7 +124,7 @@ public interface FileSystem {
      * @param fileNumLimit limit the total number of files to be listed.
      * @return
      */
-    default String globListWithLimit(String remotePath, List<String> result,
+    default String globListWithLimit(String remotePath, List<RemoteFile> result,
             String startFile, long fileSizeLimit, long fileNumLimit) {
         throw new UnsupportedOperationException("Unsupported operation glob list with limit on current file system.");
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
index 0a4c9159881..4c79df0acf7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
@@ -642,7 +642,7 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
      *
      * @return The largest file name after listObject this time
      */
-    public String globListWithLimit(String remotePath, List<String> result, String startFile,
+    public String globListWithLimit(String remotePath, List<RemoteFile> result, String startFile,
             long fileSizeLimit, long fileNumLimit) {
         long roundCnt = 0;
         long elementCnt = 0;
@@ -704,9 +704,16 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
                         }
 
                         matchCnt++;
+                        RemoteFile remoteFile = new RemoteFile(objPath.getFileName().toString(),
+                                !isPrefix,
+                                isPrefix ? -1 : obj.size(),
+                                isPrefix ? -1 : obj.size(),
+                                isPrefix ? 0 : obj.lastModified().toEpochMilli()
+                        );
+                        remoteFile.setBucket(bucket);
+                        remoteFile.setParentPath(objPath.getParent().toString());
                         matchFileSize += obj.size();
-                        String remoteFileName = "s3://" + bucket + "/" + objPath;
-                        result.add(remoteFileName);
+                        result.add(remoteFile);
 
                         if (reachLimit(result.size(), matchFileSize, fileSizeLimit, fileNumLimit)) {
                             break;
@@ -718,8 +725,7 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
                 }
                 //record current last object file name
                 S3Object lastS3Object = response.contents().get(response.contents().size() - 1);
-                java.nio.file.Path lastObjPath = Paths.get(lastS3Object.key());
-                currentMaxFile =  "s3://" + bucket + "/" + lastObjPath;
+                currentMaxFile = lastS3Object.key();
 
                 isTruncated = response.isTruncated();
                 if (isTruncated) {
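
With this change globListWithLimit fills its result list with structured RemoteFile entries (name, size, modification time, plus the bucket and parent path set explicitly) instead of pre-rendered "s3://bucket/key" strings, and the resume cursor becomes the raw object key. A hedged sketch of how a caller could still render the old-style URI from one entry, assuming only the accessors added in this patch (the uriOf helper itself is illustrative, not part of the change):

    import org.apache.doris.fs.remote.RemoteFile;

    public final class RemoteFileUris {
        private RemoteFileUris() {}

        // Rebuilds the "s3://bucket/parent/name" string the old code returned directly.
        public static String uriOf(RemoteFile file) {
            return "s3://" + file.getBucket() + "/" + file.getParentPath() + "/" + file.getName();
        }
    }
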
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java
index 1f6f0225278..ac1f8add947 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java
@@ -36,6 +36,8 @@ public class RemoteFile {
     private long modificationTime;
     private Path path;
     BlockLocation[] blockLocations;
+    private String parentPath;
+    private String bucket;
 
     public RemoteFile(String name, boolean isFile, long size, long blockSize) {
         this(name, null, isFile, !isFile, size, blockSize, 0, null);
@@ -75,6 +77,22 @@ public class RemoteFile {
         this.path = path;
     }
 
+    public String getBucket() {
+        return bucket;
+    }
+
+    public void setBucket(String bucket) {
+        this.bucket = bucket;
+    }
+
+    public String getParentPath() {
+        return parentPath;
+    }
+
+    public void setParentPath(String parentPath) {
+        this.parentPath = parentPath;
+    }
+
     public boolean isFile() {
         return isFile;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
index 9c409a66a29..d4e5a23c20e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
@@ -69,7 +69,7 @@ public class S3FileSystem extends ObjFileSystem {
     }
 
     @Override
-    public String globListWithLimit(String remotePath, List<String> result, String startFile,
+    public String globListWithLimit(String remotePath, List<RemoteFile> result, String startFile,
             long fileSizeLimit, long fileNumLimit) {
         S3ObjStorage objStorage = (S3ObjStorage) this.objStorage;
         return objStorage.globListWithLimit(remotePath, result, startFile, fileSizeLimit, fileNumLimit);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
index 83b84348ff7..d08942460f5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
@@ -99,13 +99,13 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
 
 
     @SerializedName(value = "stc")
-    private AtomicLong succeedTaskCount = new AtomicLong(0);
+    protected AtomicLong succeedTaskCount = new AtomicLong(0);
 
     @SerializedName(value = "ftc")
-    private AtomicLong failedTaskCount = new AtomicLong(0);
+    protected AtomicLong failedTaskCount = new AtomicLong(0);
 
     @SerializedName(value = "ctc")
-    private AtomicLong canceledTaskCount = new AtomicLong(0);
+    protected AtomicLong canceledTaskCount = new AtomicLong(0);
 
     public AbstractJob() {
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobProperties.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobProperties.java
index 3985b59bf16..0cb08747858 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobProperties.java
@@ -17,9 +17,6 @@
 
 package org.apache.doris.job.base;
 
-import org.apache.doris.common.AnalysisException;
 
 public interface JobProperties {
-    default void validate() throws AnalysisException {
-    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 24df73cbbac..387714e8805 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -21,14 +21,18 @@ import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.cloud.proto.Cloud;
 import org.apache.doris.cloud.rpc.MetaServiceProxy;
+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.InternalErrorCode;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.job.base.AbstractJob;
 import org.apache.doris.job.base.JobExecuteType;
 import org.apache.doris.job.base.JobExecutionConfiguration;
+import org.apache.doris.job.base.TimerDefinition;
+import org.apache.doris.job.common.IntervalUnit;
 import org.apache.doris.job.common.JobStatus;
 import org.apache.doris.job.common.JobType;
 import org.apache.doris.job.common.PauseReason;
@@ -75,8 +79,6 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
         TxnStateChangeCallback, GsonPostProcessable {
     private final long dbId;
     private StreamingJobStatistic jobStatistic = new StreamingJobStatistic();
-    @SerializedName("fm")
-    private FailMsg failMsg;
     @Getter
     protected PauseReason pauseReason;
     @Getter
@@ -86,7 +88,8 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
     @Setter
     protected long autoResumeCount;
     @Getter
-    @SerializedName("jp")
+    @SerializedName("props")
+    private final Map<String, String> properties;
     private StreamingJobProperties jobProperties;
     @Getter
     @SerializedName("tvf")
@@ -108,26 +111,40 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
             JobExecutionConfiguration jobConfig,
             Long createTimeMs,
             String executeSql,
-            StreamingJobProperties jobProperties) {
+            Map<String, String> properties) {
         super(getNextJobId(), jobName, jobStatus, dbName, comment, createUser,
                 jobConfig, createTimeMs, executeSql);
         this.dbId = ConnectContext.get().getCurrentDbId();
-        this.jobProperties = jobProperties;
+        this.properties = properties;
         init();
     }
 
     private void init() {
         try {
+            this.jobProperties = new StreamingJobProperties(properties);
+            jobProperties.validate();
+            // build time definition
+            JobExecutionConfiguration execConfig = getJobConfig();
+            TimerDefinition timerDefinition = new TimerDefinition();
+            timerDefinition.setInterval(jobProperties.getMaxIntervalSecond());
+            timerDefinition.setIntervalUnit(IntervalUnit.SECOND);
+            timerDefinition.setStartTimeMs(execConfig.getTimerDefinition().getStartTimeMs());
+            execConfig.setTimerDefinition(timerDefinition);
+
             UnboundTVFRelation currentTvf = getCurrentTvf();
+            this.tvfType = currentTvf.getFunctionName();
             this.originTvfProps = currentTvf.getProperties().getMap();
+            this.offsetProvider = SourceOffsetProviderFactory.createSourceOffsetProvider(currentTvf.getFunctionName());
+        } catch (AnalysisException ae) {
+            log.warn("parse streaming insert job failed, props: {}", properties, ae);
+            throw new RuntimeException("parse streaming insert job failed, " + ae.getMessage());
         } catch (Exception ex) {
             log.warn("init streaming insert job failed, sql: {}", getExecuteSql(), ex);
-            throw new RuntimeException("init streaming insert job failed, sql: " + getExecuteSql(), ex);
+            throw new RuntimeException("init streaming insert job failed, " + ex.getMessage());
         }
     }
 
-    private UnboundTVFRelation getCurrentTvf() throws Exception {
+    private UnboundTVFRelation getCurrentTvf() {
         if (baseCommand == null) {
             this.baseCommand = (InsertIntoTableCommand) new NereidsParser().parseSingle(getExecuteSql());
         }
@@ -170,7 +187,7 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
 
     protected StreamingInsertTask createStreamingInsertTask() {
         this.runningStreamTask = new StreamingInsertTask(getJobId(), AbstractTask.getNextTaskId(), getExecuteSql(),
-                offsetProvider, getCurrentDbName(), jobProperties, getCreateUser());
+                offsetProvider, getCurrentDbName(), jobProperties, originTvfProps, getCreateUser());
         Env.getCurrentEnv().getJobManager().getStreamingTaskManager().registerTask(runningStreamTask);
         this.runningStreamTask.setStatus(TaskStatus.PENDING);
         return runningStreamTask;
@@ -213,17 +230,21 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
     }
 
     public void onStreamTaskFail(StreamingInsertTask task) throws JobException {
+        failedTaskCount.incrementAndGet();
         Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
         if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
-            this.failMsg = new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, task.getErrMsg());
+            this.pauseReason = new PauseReason(InternalErrorCode.INTERNAL_ERR, task.getErrMsg());
         }
         updateJobStatus(JobStatus.PAUSED);
     }
 
     public void onStreamTaskSuccess(StreamingInsertTask task) {
+        succeedTaskCount.incrementAndGet();
         Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
         StreamingInsertTask nextTask = createStreamingInsertTask();
         this.runningStreamTask = nextTask;
+        //todo: maybe fetch from txn attachment?
+        offsetProvider.updateOffset(task.getRunningOffset());
     }
 
     private void updateJobStatisticAndOffset(StreamingTaskTxnCommitAttachment attachment) {
@@ -262,7 +283,7 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
         trow.addToColumnValue(new TCell().setStringVal(getJobName()));
         trow.addToColumnValue(new TCell().setStringVal(getCreateUser().getQualifiedUser()));
         trow.addToColumnValue(new TCell().setStringVal(getJobConfig().getExecuteType().name()));
-        trow.addToColumnValue(new TCell().setStringVal(getJobConfig().convertRecurringStrategyToString()));
+        trow.addToColumnValue(new TCell().setStringVal(""));
         trow.addToColumnValue(new TCell().setStringVal(getJobStatus().name()));
         trow.addToColumnValue(new TCell().setStringVal(getExecuteSql()));
         trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs())));
@@ -276,16 +297,16 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
         } else {
             trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
         }
+
         if (offsetProvider != null && offsetProvider.getRemoteOffset() != null) {
             trow.addToColumnValue(new TCell().setStringVal(offsetProvider.getRemoteOffset()));
         } else {
             trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
         }
 
-        trow.addToColumnValue(new TCell().setStringVal(offsetProvider.getRemoteOffset()));
         trow.addToColumnValue(new TCell().setStringVal(
                 jobStatistic == null ? FeConstants.null_string : jobStatistic.toJson()));
-        trow.addToColumnValue(new TCell().setStringVal(failMsg == null ? FeConstants.null_string : failMsg.getMsg()));
+        trow.addToColumnValue(new TCell().setStringVal(pauseReason == null ? FeConstants.null_string : pauseReason.getMsg()));
         return trow;
     }
 
@@ -336,7 +357,6 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
 
     @Override
     public void beforeAborted(TransactionState txnState) throws TransactionException {
-
     }
 
     @Override
@@ -387,7 +407,6 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
     @Override
     public void afterAborted(TransactionState txnState, boolean txnOperated, String txnStatusChangeReason)
             throws UserException {
-
     }
 
     @Override
@@ -402,14 +421,16 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
 
     @Override
     public void replayOnVisible(TransactionState txnState) {
-
-
     }
 
     @Override
     public void gsonPostProcess() throws IOException {
-        if (offsetProvider == null) {
+        if (offsetProvider == null && tvfType != null) {
             offsetProvider = SourceOffsetProviderFactory.createSourceOffsetProvider(tvfType);
         }
+
+        if (jobProperties == null && properties != null) {
+            jobProperties = new StreamingJobProperties(properties);
+        }
     }
 }
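
A consequence of moving this logic into init() above: the scheduling interval is now always derived from the job's own properties (max interval, in seconds) while the previously chosen start time is preserved, which is why AlterJobCommand below no longer rebuilds the timer by hand. A rough sketch of that wiring with plain stand-ins; the "max_interval_second" key name and the default value are assumptions for illustration, not confirmed by this diff:

    import java.util.Map;

    public class TimerWiringSketch {
        static final long ASSUMED_DEFAULT_MAX_INTERVAL_SECOND = 10L; // assumed default

        static class TimerDef { // stand-in for Doris's TimerDefinition
            long intervalSeconds;
            long startTimeMs;
        }

        static TimerDef buildTimer(Map<String, String> jobProps, long existingStartTimeMs) {
            TimerDef def = new TimerDef();
            def.intervalSeconds = Long.parseLong(jobProps.getOrDefault(
                    "max_interval_second", String.valueOf(ASSUMED_DEFAULT_MAX_INTERVAL_SECOND)));
            // Keep the start time chosen when the job configuration was first built,
            // mirroring execConfig.getTimerDefinition().getStartTimeMs() in the patch.
            def.startTimeMs = existingStartTimeMs;
            return def;
        }

        public static void main(String[] args) {
            TimerDef def = buildTimer(Map.of("max_interval_second", "30"), System.currentTimeMillis());
            System.out.println(def.intervalSeconds + "s interval, start=" + def.startTimeMs);
        }
    }
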
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
index 73ac267e678..b1e22a0a7cb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
@@ -29,6 +29,7 @@ import org.apache.doris.job.offset.Offset;
 import org.apache.doris.job.offset.SourceOffsetProvider;
 import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.glue.LogicalPlanAdapter;
+import org.apache.doris.nereids.parser.NereidsParser;
 import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.QueryState;
@@ -39,6 +40,7 @@ import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.log4j.Log4j2;
 
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -65,6 +67,7 @@ public class StreamingInsertTask {
     private Offset runningOffset;
     private AtomicBoolean isCanceled = new AtomicBoolean(false);
     private StreamingJobProperties jobProperties;
+    private Map<String, String> originTvfProps;
     SourceOffsetProvider offsetProvider;
 
     public StreamingInsertTask(long jobId,
@@ -73,6 +76,7 @@ public class StreamingInsertTask {
                                SourceOffsetProvider offsetProvider,
                                String currentDb,
                                StreamingJobProperties jobProperties,
+                               Map<String, String> originTvfProps,
                                UserIdentity userIdentity) {
         this.jobId = jobId;
         this.taskId = taskId;
@@ -81,6 +85,7 @@ public class StreamingInsertTask {
         this.currentDb = currentDb;
         this.offsetProvider = offsetProvider;
         this.jobProperties = jobProperties;
+        this.originTvfProps = originTvfProps;
         this.labelName = getJobId() + LABEL_SPLITTER + getTaskId();
         this.createTimeMs = System.currentTimeMillis();
     }
@@ -118,8 +123,15 @@ public class StreamingInsertTask {
         StatementContext statementContext = new StatementContext();
         ctx.setStatementContext(statementContext);
 
-        this.runningOffset = offsetProvider.getNextOffset(jobProperties, jobProperties.getProperties());
-        this.taskCommand = offsetProvider.rewriteTvfParams(sql, runningOffset);
+        this.runningOffset = offsetProvider.getNextOffset(jobProperties, originTvfProps);
+        InsertIntoTableCommand baseCommand = (InsertIntoTableCommand) new NereidsParser().parseSingle(sql);
+        StmtExecutor baseStmtExecutor =
+                new StmtExecutor(ctx, new LogicalPlanAdapter(baseCommand, ctx.getStatementContext()));
+        baseCommand.initPlan(ctx, baseStmtExecutor, false);
+        if (!baseCommand.getParsedPlan().isPresent()) {
+            throw new JobException("Can not get Parsed plan");
+        }
+        this.taskCommand = offsetProvider.rewriteTvfParams(baseCommand, runningOffset);
         this.taskCommand.setLabelName(Optional.of(getJobId() + LABEL_SPLITTER + getTaskId()));
         this.taskCommand.setJobId(getTaskId());
         this.stmtExecutor = new StmtExecutor(ctx, new LogicalPlanAdapter(taskCommand, ctx.getStatementContext()));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
index 4f463a090d5..e71b169ef21 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
@@ -24,6 +24,7 @@ import org.apache.doris.job.exception.JobException;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.qe.SessionVariable;
 
+import com.google.gson.annotations.SerializedName;
 import lombok.Data;
 
 import java.util.HashMap;
@@ -43,15 +44,18 @@ public class StreamingJobProperties implements JobProperties {
 
     private final Map<String, String> properties;
     private long maxIntervalSecond;
-    private int  maxRetry;
     private long s3BatchFiles;
     private long s3BatchSize;
 
     public StreamingJobProperties(Map<String, String> jobProperties) {
         this.properties = jobProperties;
+        if (properties.isEmpty()) {
+            this.maxIntervalSecond = DEFAULT_MAX_INTERVAL_SECOND;
+            this.s3BatchFiles = DEFAULT_S3_BATCH_FILES;
+            this.s3BatchSize = DEFAULT_S3_BATCH_SIZE;
+        }
     }
 
-    @Override
     public void validate() throws AnalysisException {
         this.maxIntervalSecond = Util.getLongPropertyOrDefault(
                 properties.get(StreamingJobProperties.MAX_INTERVAL_SECOND_PROPERTY),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
index d9b2264d5b6..e670dac553c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
@@ -62,7 +62,7 @@ public interface SourceOffsetProvider {
      * @param nextOffset
      * @return rewritten InsertIntoTableCommand
      */
-    InsertIntoTableCommand rewriteTvfParams(String executeSql, Offset nextOffset) throws Exception;
+    InsertIntoTableCommand rewriteTvfParams(InsertIntoTableCommand originCommand, Offset nextOffset);
 
     /**
      * Update the offset of the source.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
index dd6927935bb..cbd8645cc59 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
@@ -30,7 +30,7 @@ import java.util.List;
 public class S3Offset implements Offset {
     String startFile;
     String endFile;
-    List<String> fileLists;
+    String fileLists;
 
     @Override
     public void setEndOffset(String endOffset) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
index a95f4af65c9..df4a344f064 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
@@ -19,20 +19,18 @@ package org.apache.doris.job.offset.s3;
 
 import org.apache.doris.datasource.property.storage.StorageProperties;
 import org.apache.doris.fs.FileSystemFactory;
+import org.apache.doris.fs.remote.RemoteFile;
 import org.apache.doris.fs.remote.RemoteFileSystem;
 import org.apache.doris.job.extensions.insert.streaming.StreamingJobProperties;
 import org.apache.doris.job.offset.Offset;
 import org.apache.doris.job.offset.SourceOffsetProvider;
-import org.apache.doris.nereids.parser.NereidsParser;
+import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
 import org.apache.doris.nereids.trees.expressions.Properties;
-import org.apache.doris.nereids.trees.expressions.functions.table.S3;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
-import org.apache.doris.nereids.trees.plans.logical.LogicalTVFRelation;
-import org.apache.doris.qe.ConnectContext;
 
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
 import lombok.extern.log4j.Log4j2;
 
 import java.util.ArrayList;
@@ -40,12 +38,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 @Log4j2
 public class S3SourceOffsetProvider implements SourceOffsetProvider {
     S3Offset currentOffset;
     String maxRemoteEndFile;
-    InsertIntoTableCommand baseCommand;
 
     @Override
     public String getSourceType() {
@@ -54,19 +52,29 @@ public class S3SourceOffsetProvider implements SourceOffsetProvider {
 
     @Override
     public S3Offset getNextOffset(StreamingJobProperties jobProps, Map<String, String> properties) {
+        Map<String, String> copiedProps = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+        copiedProps.putAll(properties);
         S3Offset offset = new S3Offset();
-        List<String> rfiles = new ArrayList<>();
+        List<RemoteFile> rfiles = new ArrayList<>();
         String startFile = currentOffset == null ? null : currentOffset.endFile;
         String filePath = null;
-        StorageProperties storageProperties = StorageProperties.createPrimary(properties);
+        StorageProperties storageProperties = StorageProperties.createPrimary(copiedProps);
         try (RemoteFileSystem fileSystem = FileSystemFactory.get(storageProperties)) {
-            String uri = storageProperties.validateAndGetUri(properties);
+            String uri = storageProperties.validateAndGetUri(copiedProps);
             filePath = storageProperties.validateAndNormalizeUri(uri);
             maxRemoteEndFile = fileSystem.globListWithLimit(filePath, rfiles, startFile,
                     jobProps.getS3BatchFiles(), jobProps.getS3BatchSize());
             offset.setStartFile(startFile);
-            offset.setEndFile(rfiles.get(rfiles.size() - 1));
-            offset.setFileLists(rfiles);
+            //todo: The path may be in the form of bucket/dir/*/*,
+            // but currently only the case where the last segment is * is handled.
+            if (!rfiles.isEmpty()) {
+                String bucket = rfiles.get(0).getBucket();
+                String parentPath = rfiles.get(0).getParentPath();
+                String filePaths = rfiles.stream().map(RemoteFile::getName).collect(Collectors.joining(",", "{", "}"));
+                String finalFiles = String.format("s3://%s/%s/%s", bucket, parentPath, filePaths);
+                offset.setEndFile(String.format("s3://%s/%s/%s", bucket, parentPath, rfiles.get(rfiles.size() - 1).getName()));
+                offset.setFileLists(finalFiles);
+            }
         } catch (Exception e) {
             log.warn("list path exception, path={}", filePath, e);
             throw new RuntimeException(e);
@@ -93,23 +101,18 @@ public class S3SourceOffsetProvider implements SourceOffsetProvider {
     }
 
     @Override
-    public InsertIntoTableCommand rewriteTvfParams(String executeSql, Offset runningOffset) throws Exception {
+    public InsertIntoTableCommand rewriteTvfParams(InsertIntoTableCommand originCommand, Offset runningOffset) {
         S3Offset offset = (S3Offset) runningOffset;
         Map<String, String> props = new HashMap<>();
-        String finalUri = "{" + String.join(",", offset.getFileLists()) + "}";
-        props.put("uri", finalUri);
-        if (baseCommand == null) {
-            this.baseCommand = (InsertIntoTableCommand) new NereidsParser().parseSingle(executeSql);
-            this.baseCommand.initPlan(ConnectContext.get(), ConnectContext.get().getExecutor(), false);
-        }
-
         // rewrite plan
-        Plan rewritePlan = baseCommand.getLogicalQuery().rewriteUp(plan -> {
-            if (plan instanceof LogicalTVFRelation) {
-                LogicalTVFRelation originTvfRel = (LogicalTVFRelation) plan;
-                LogicalTVFRelation newRvfRel = new LogicalTVFRelation(
-                        originTvfRel.getRelationId(), new S3(new Properties(props)), ImmutableList.of());
-                return newRvfRel;
+        Plan rewritePlan = originCommand.getParsedPlan().get().rewriteUp(plan -> {
+            if (plan instanceof UnboundTVFRelation) {
+                UnboundTVFRelation originTvfRel = (UnboundTVFRelation) plan;
+                Map<String, String> oriMap = originTvfRel.getProperties().getMap();
+                props.putAll(oriMap);
+                props.put("uri", offset.getFileLists());
+                return new UnboundTVFRelation(
+                        originTvfRel.getRelationId(), originTvfRel.getFunctionName(), new Properties(props));
             }
             return plan;
         });
@@ -120,14 +123,17 @@ public class S3SourceOffsetProvider implements SourceOffsetProvider {
     @Override
     public void updateOffset(Offset offset) {
         this.currentOffset = (S3Offset) offset;
+        this.currentOffset.setFileLists(null);
     }
 
     @Override
     public void fetchRemoteMeta(Map<String, String> properties) throws Exception {
-        StorageProperties storageProperties = StorageProperties.createPrimary(properties);
+        Map<String, String> copiedProps = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+        copiedProps.putAll(properties);
+        StorageProperties storageProperties = StorageProperties.createPrimary(copiedProps);
         String startFile = currentOffset == null ? null : currentOffset.endFile;
         try (RemoteFileSystem fileSystem = FileSystemFactory.get(storageProperties)) {
-            String uri = storageProperties.validateAndGetUri(properties);
+            String uri = storageProperties.validateAndGetUri(copiedProps);
             String filePath = storageProperties.validateAndNormalizeUri(uri);
             maxRemoteEndFile = fileSystem.globListWithLimit(filePath, new ArrayList<>(), startFile,
                     1, 1);
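
The offset now carries a single brace-expanded URI instead of a list of fully qualified paths: one batch becomes "s3://bucket/parent/{f1,f2,...}" for the rewritten TVF, and the end file records the last member. A self-contained sketch of that assembly, assuming (as the code above does) that every file in a batch shares one bucket and parent path; the values are illustrative:

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class S3BatchUriSketch {
        public static void main(String[] args) {
            String bucket = "my-bucket";     // illustrative
            String parentPath = "load/data"; // illustrative
            List<String> names = Arrays.asList("example_0.csv", "example_1.csv");

            // Same joining pattern as the patch: brace expansion over file names.
            String filePaths = names.stream().collect(Collectors.joining(",", "{", "}"));
            String finalFiles = String.format("s3://%s/%s/%s", bucket, parentPath, filePaths);
            String endFile = String.format("s3://%s/%s/%s", bucket, parentPath, names.get(names.size() - 1));

            System.out.println(finalFiles); // s3://my-bucket/load/data/{example_0.csv,example_1.csv}
            System.out.println(endFile);    // s3://my-bucket/load/data/example_1.csv
        }
    }
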
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
index 53a90893b81..b1823de4a1d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
@@ -22,13 +22,9 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.job.base.AbstractJob;
 import org.apache.doris.job.base.JobExecuteType;
-import org.apache.doris.job.base.JobExecutionConfiguration;
-import org.apache.doris.job.base.TimerDefinition;
-import org.apache.doris.job.common.IntervalUnit;
 import org.apache.doris.job.common.JobStatus;
 import org.apache.doris.job.exception.JobException;
 import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
-import org.apache.doris.job.extensions.insert.streaming.StreamingJobProperties;
 import org.apache.doris.nereids.trees.plans.PlanType;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
 import org.apache.doris.qe.ConnectContext;
@@ -81,25 +77,18 @@ public class AlterJobCommand extends AlterCommand implements ForwardWithSync {
         if (job instanceof StreamingInsertJob) {
             StreamingInsertJob originJob = (StreamingInsertJob) job;
             String updateSQL = StringUtils.isEmpty(sql) ? originJob.getExecuteSql() : sql;
-            Map<String, String> updateProps = properties == null || properties.isEmpty() ? originJob.getJobProperties()
-                    .getProperties() : properties;
-            StreamingJobProperties streamJobProps = new StreamingJobProperties(updateProps);
-            // rebuild time definition
-            JobExecutionConfiguration execConfig = originJob.getJobConfig();
-            TimerDefinition timerDefinition = new TimerDefinition();
-            timerDefinition.setInterval(streamJobProps.getMaxIntervalSecond());
-            timerDefinition.setIntervalUnit(IntervalUnit.SECOND);
-            timerDefinition.setStartTimeMs(execConfig.getTimerDefinition().getStartTimeMs());
-            execConfig.setTimerDefinition(timerDefinition);
+            Map<String, String> updateProps =
+                    properties == null || properties.isEmpty() ? originJob.getProperties() : properties;
+
             return new StreamingInsertJob(jobName,
                     job.getJobStatus(),
                     job.getCurrentDbName(),
                     job.getComment(),
                     ConnectContext.get().getCurrentUserIdentity(),
-                    execConfig,
+                    originJob.getJobConfig(),
                     System.currentTimeMillis(),
                     updateSQL,
-                    streamJobProps);
+                    updateProps);
         } else {
             throw new JobException("Unsupported job type for ALTER:" + job.getJobType());
         }
@@ -117,7 +106,7 @@ public class AlterJobCommand extends AlterCommand implements ForwardWithSync {
         if (job.getJobConfig().getExecuteType().equals(JobExecuteType.STREAMING)
                 && job instanceof StreamingInsertJob) {
             StreamingInsertJob streamingJob = (StreamingInsertJob) job;
-            boolean proCheck = checkProperties(streamingJob.getJobProperties().getProperties());
+            boolean proCheck = checkProperties(streamingJob.getProperties());
             boolean sqlCheck = checkSql(streamingJob.getExecuteSql());
             if (!proCheck && !sqlCheck) {
                 throw new AnalysisException("No properties or sql changed in ALTER JOB");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
index 51d0dddc7cb..40a4d583059 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
@@ -125,11 +125,8 @@ public class CreateJobInfo {
         // check its insert stmt,currently only support insert stmt
         JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration();
         JobExecuteType executeType = intervalOptional.isPresent() ? JobExecuteType.RECURRING : JobExecuteType.ONE_TIME;
-        JobProperties properties = null;
         if (streamingJob) {
             executeType = JobExecuteType.STREAMING;
-            properties = new StreamingJobProperties(jobProperties);
-            properties.validate();
             jobExecutionConfiguration.setImmediate(true);
         }
         jobExecutionConfiguration.setExecuteType(executeType);
@@ -138,21 +135,18 @@ public class CreateJobInfo {
         if (executeType.equals(JobExecuteType.ONE_TIME)) {
             buildOnceJob(timerDefinition, jobExecutionConfiguration);
         } else if (executeType.equals(JobExecuteType.STREAMING)) {
-            buildStreamingJob(timerDefinition, properties);
+            buildStreamingJob(timerDefinition);
         } else {
             buildRecurringJob(timerDefinition, jobExecutionConfiguration);
         }
         jobExecutionConfiguration.setTimerDefinition(timerDefinition);
-        return analyzeAndCreateJob(executeSql, dbName, jobExecutionConfiguration, properties);
+        return analyzeAndCreateJob(executeSql, dbName, jobExecutionConfiguration, jobProperties);
     }
 
-    private void buildStreamingJob(TimerDefinition timerDefinition, JobProperties props)
-            throws AnalysisException {
-        StreamingJobProperties properties = (StreamingJobProperties) props;
-        timerDefinition.setInterval(properties.getMaxIntervalSecond());
+    private void buildStreamingJob(TimerDefinition timerDefinition) {
+        // timerDefinition.setInterval(properties.getMaxIntervalSecond());
         timerDefinition.setIntervalUnit(IntervalUnit.SECOND);
         timerDefinition.setStartTimeMs(System.currentTimeMillis());
-        properties.validate();
     }
 
     /**
@@ -237,7 +231,7 @@ public class CreateJobInfo {
      */
     private AbstractJob analyzeAndCreateJob(String sql, String currentDbName,
                                             JobExecutionConfiguration jobExecutionConfiguration,
-                                            JobProperties properties) throws UserException {
+                                            Map<String, String> properties) throws UserException {
         if (jobExecutionConfiguration.getExecuteType().equals(JobExecuteType.STREAMING)) {
             return analyzeAndCreateStreamingInsertJob(sql, currentDbName, jobExecutionConfiguration, properties);
         } else {
@@ -271,13 +265,13 @@ public class CreateJobInfo {
     }
 
     private AbstractJob analyzeAndCreateStreamingInsertJob(String sql, String currentDbName,
-            JobExecutionConfiguration jobExecutionConfiguration, JobProperties properties) throws UserException {
+            JobExecutionConfiguration jobExecutionConfiguration, Map<String, String> properties) throws UserException {
         NereidsParser parser = new NereidsParser();
         LogicalPlan logicalPlan = parser.parseSingle(sql);
         if (logicalPlan instanceof InsertIntoTableCommand) {
-            // InsertIntoTableCommand insertIntoTableCommand = (InsertIntoTableCommand) logicalPlan;
+            InsertIntoTableCommand insertIntoTableCommand = (InsertIntoTableCommand) logicalPlan;
             try {
-                // insertIntoTableCommand.initPlan(ConnectContext.get(), ConnectContext.get().getExecutor(), false);
+                insertIntoTableCommand.initPlan(ConnectContext.get(), ConnectContext.get().getExecutor(), false);
                 return new StreamingInsertJob(labelNameOptional.get(),
                         JobStatus.PENDING,
                         currentDbName,
@@ -286,7 +280,7 @@ public class CreateJobInfo {
                         jobExecutionConfiguration,
                         System.currentTimeMillis(),
                         sql,
-                        (StreamingJobProperties) properties);
+                        properties);
             } catch (Exception e) {
                 throw new AnalysisException(e.getMessage());
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index 6736b16f8af..e774499f361 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -61,6 +61,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
@@ -103,6 +104,7 @@ public class InsertIntoTableCommand extends Command implements NeedAuditEncrypti
     private Optional<LogicalPlan> logicalQuery;
     private Optional<String> labelName;
     private Optional<String> branchName;
+    private Optional<Plan> parsedPlan;
     /**
      * When source it's from job scheduler,it will be set.
      */
@@ -153,6 +155,10 @@ public class InsertIntoTableCommand extends Command implements NeedAuditEncrypti
         return logicalQuery.orElse(originLogicalQuery);
     }
 
+    public Optional<Plan> getParsedPlan() {
+        return parsedPlan;
+    }
+
     protected void setLogicalQuery(LogicalPlan logicalQuery) {
         this.logicalQuery = Optional.of(logicalQuery);
     }
@@ -233,6 +239,7 @@ public class InsertIntoTableCommand extends Command implements NeedAuditEncrypti
                 throw new IllegalStateException(e.getMessage(), e);
             }
             insertExecutor = buildResult.executor;
+            parsedPlan = Optional.ofNullable(buildResult.planner.getParsedPlan());
             if (!needBeginTransaction) {
                 return insertExecutor;
             }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/DropFunctionTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/DropFunctionTest.java
index bef0bb92d20..f05f1791f2f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/DropFunctionTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/DropFunctionTest.java
@@ -20,22 +20,31 @@ package org.apache.doris.catalog;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.parser.NereidsParser;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.expressions.functions.table.S3;
+import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.commands.CreateDatabaseCommand;
 import org.apache.doris.nereids.trees.plans.commands.CreateFunctionCommand;
 import org.apache.doris.nereids.trees.plans.commands.DropFunctionCommand;
+import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalTVFRelation;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.utframe.DorisAssert;
 import org.apache.doris.utframe.UtFrameUtils;
 
+import com.google.common.collect.ImmutableList;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.File;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.UUID;
 
 
@@ -63,9 +72,27 @@ public class DropFunctionTest {
     public void testDropGlobalFunction() throws Exception {
         ConnectContext ctx = UtFrameUtils.createDefaultCtx();
         // 1. create database db1
-        String sql = "create database db1;";
+        //String sql = "create database db1;";
+        String sql = "insert into db1.tb select * from s3('url'='s3://a/*.csv')";
         NereidsParser nereidsParser = new NereidsParser();
         LogicalPlan logicalPlan = nereidsParser.parseSingle(sql);
+        InsertIntoTableCommand baseCommand = (InsertIntoTableCommand) new NereidsParser().parseSingle(sql);
+        baseCommand.initPlan(ConnectContext.get(), ConnectContext.get().getExecutor(), false);
+        Map<String, String> map = new HashMap<>();
+        map.put("url" ,"s3:/xxxx/*.");
+        // rewrite plan
+        Plan rewritePlan = baseCommand.getLogicalQuery().rewriteUp(plan -> {
+            if (plan instanceof LogicalTVFRelation) {
+                LogicalTVFRelation originTvfRel = (LogicalTVFRelation) plan;
+                LogicalTVFRelation newRvfRel = new LogicalTVFRelation(
+                        originTvfRel.getRelationId(), new S3(new Properties(map)), ImmutableList.of());
+                return newRvfRel;
+            }
+            return plan;
+        });
+        InsertIntoTableCommand s = new InsertIntoTableCommand((LogicalPlan) rewritePlan, Optional.empty(), Optional.empty(),
+                Optional.empty(), true, Optional.empty());
+
         StmtExecutor stmtExecutor = new StmtExecutor(connectContext, sql);
         if (logicalPlan instanceof CreateDatabaseCommand) {
             ((CreateDatabaseCommand) logicalPlan).run(connectContext, stmtExecutor);
diff --git a/regression-test/data/job_p0/streaming_job/test_streaming_insert_job.out b/regression-test/data/job_p0/streaming_job/test_streaming_insert_job.out
new file mode 100644
index 00000000000..867b1f9432b
Binary files /dev/null and b/regression-test/data/job_p0/streaming_job/test_streaming_insert_job.out differ
diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
new file mode 100644
index 00000000000..bd8ee2759fd
--- /dev/null
+++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_insert_job") {
+    def tableName = "test_streaming_insert_job_tbl"
+    def jobName = "test_streaming_insert_job_name"
+
+    sql """drop table if exists `${tableName}` force"""
+    sql """
+        DROP JOB IF EXISTS where jobname =  '${jobName}'
+    """
+
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `c1` int NULL,
+            `c2` string NULL,
+            `c3` int  NULL,
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`c1`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`c1`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+
+
+    // create recurring job
+    sql """
+       CREATE JOB ${jobName}  ON STREAMING DO INSERT INTO ${tableName} 
+       SELECT * FROM S3
+        (
+            "uri" = 
"s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+            "format" = "csv",
+            "provider" = "${getS3Provider()}",
+            "column_separator" = ",",
+            "s3.endpoint" = "${getS3Endpoint()}",
+            "s3.region" = "${getS3Region()}",
+            "s3.access_key" = "${getS3AK()}",
+            "s3.secret_key" = "${getS3SK()}"
+        );
+    """
+    Awaitility.await().atMost(30, SECONDS).until(
+            {
+                print("check success task count")
+                def jobSuccendCount = sql """ select SucceedTaskCount from 
jobs("type"="insert") where Name like '%${jobName}%' and 
ExecuteType='STREAMING' """
+                // check job status and succeed task count larger than 1
+                jobSuccendCount.size() == 1 && '1' <= 
jobSuccendCount.get(0).get(0)
+            }
+    )
+
+    sql """
+        PAUSE JOB where jobname =  '${jobName}'
+    """
+    def pausedJobStatus = sql """
+        select status from jobs("type"="insert") where Name='${jobName}'
+    """
+    assert pausedJobStatus.get(0).get(0) == "PAUSED"
+
+    qt_select """ SELECT * FROM ${tableName} order by c1 """
+
+    sql """
+        DROP JOB IF EXISTS where jobname =  '${jobName}'
+    """
+
+    def jobCountRsp = sql """select count(1) from jobs("type"="insert")  where 
Name ='${jobName}'"""
+    assert jobCountRsp.get(0).get(0) == 0
+
+}

