This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch streaming-job-dev
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/streaming-job-dev by this push:
     new e0ee446b8d6 [Feature](wip) improve streamjob and delete alterstreamjob 
op (#56308)
e0ee446b8d6 is described below

commit e0ee446b8d6458cfc495a594d7948a2f464953e2
Author: wudi <[email protected]>
AuthorDate: Mon Sep 22 18:39:03 2025 +0800

    [Feature](wip) improve streamjob and delete alterstreamjob op (#56308)
    
    ### What problem does this PR solve?
    
    Improve the streaming job and remove the alter-streaming-job edit log operation.
---
 .../antlr4/org/apache/doris/nereids/DorisParser.g4 |  2 +-
 .../property/storage/HdfsPropertiesUtils.java      | 12 ++-
 .../property/storage/StorageProperties.java        | 57 ++------------
 .../main/java/org/apache/doris/fs/FileSystem.java  |  2 +-
 .../java/org/apache/doris/fs/GlobListResult.java   | 57 ++++++++++++++
 .../java/org/apache/doris/fs/obj/S3ObjStorage.java | 33 ++-------
 .../org/apache/doris/fs/remote/RemoteFile.java     | 18 -----
 .../org/apache/doris/fs/remote/S3FileSystem.java   |  3 +-
 .../org/apache/doris/job/base/AbstractJob.java     |  9 +--
 .../doris/job/executor/DispatchTaskHandler.java    |  5 +-
 .../insert/streaming/StreamingInsertJob.java       | 30 ++++----
 .../insert/streaming/StreamingJobProperties.java   | 31 ++++----
 .../org/apache/doris/job/manager/JobManager.java   | 23 +++---
 .../java/org/apache/doris/job/offset/Offset.java   |  2 +
 .../doris/job/offset/SourceOffsetProvider.java     | 14 +---
 .../org/apache/doris/job/offset/s3/S3Offset.java   |  6 ++
 .../job/offset/s3/S3SourceOffsetProvider.java      | 69 ++++++++++-------
 .../org/apache/doris/journal/JournalEntity.java    |  6 --
 .../trees/plans/commands/AlterJobCommand.java      |  6 +-
 .../trees/plans/commands/info/CreateJobInfo.java   |  4 +-
 .../commands/insert/InsertIntoTableCommand.java    | 19 +----
 .../persist/AlterStreamingJobOperationLog.java     | 86 ----------------------
 .../java/org/apache/doris/persist/EditLog.java     |  9 ---
 .../org/apache/doris/persist/OperationType.java    |  2 -
 .../java/org/apache/doris/qe/SessionVariable.java  |  1 +
 .../streaming_job/test_streaming_insert_job.groovy | 10 +--
 26 files changed, 195 insertions(+), 321 deletions(-)

diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 
b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
index d4cfe9ead5d..3062dfef988 100644
--- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
+++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
@@ -112,7 +112,7 @@ supportedJobStatement
        commentSpec?
        DO supportedDmlStatement                                                
                                                             #createScheduledJob
    | PAUSE JOB WHERE (jobNameKey=identifier) EQ (jobNameValue=STRING_LITERAL)  
                                                              #pauseJob
-   | ALTER JOB FOR (jobName=multipartIdentifier) (propertyClause | 
supportedDmlStatement | propertyClause  supportedDmlStatement)                  
#alterJob
+   | ALTER JOB (jobName=multipartIdentifier) (propertyClause | 
supportedDmlStatement | propertyClause  supportedDmlStatement)                  
#alterJob
    | DROP JOB (IF EXISTS)? WHERE (jobNameKey=identifier) EQ 
(jobNameValue=STRING_LITERAL)                                                   
 #dropJob
    | RESUME JOB WHERE (jobNameKey=identifier) EQ (jobNameValue=STRING_LITERAL) 
                                                              #resumeJob
    | CANCEL TASK WHERE (jobNameKey=identifier) EQ 
(jobNameValue=STRING_LITERAL) AND (taskIdKey=identifier) EQ 
(taskIdValue=INTEGER_VALUE)    #cancelJobTask
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HdfsPropertiesUtils.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HdfsPropertiesUtils.java
index 7fc091daa64..8f2e92d1e64 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HdfsPropertiesUtils.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HdfsPropertiesUtils.java
@@ -23,6 +23,8 @@ import 
org.apache.doris.datasource.property.storage.exception.StoragePropertiesE
 import com.google.common.base.Strings;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import java.io.UnsupportedEncodingException;
 import java.net.URI;
@@ -38,6 +40,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 public class HdfsPropertiesUtils {
+    private static final Logger LOG = 
LogManager.getLogger(HdfsPropertiesUtils.class);
     private static final String URI_KEY = "uri";
     private static final String STANDARD_HDFS_PREFIX = "hdfs://";
     private static final String EMPTY_HDFS_PREFIX = "hdfs:///";
@@ -63,7 +66,14 @@ public class HdfsPropertiesUtils {
         if (StringUtils.isBlank(uriStr)) {
             return false;
         }
-        URI uri = URI.create(uriStr);
+        URI uri;
+        try {
+            uri = URI.create(uriStr);
+        } catch (Exception ex) {
+            // The glob syntax of s3 contains {, which will cause an error 
here.
+            LOG.warn("Failed to validate uri is hdfs uri, {}", 
ex.getMessage());
+            return false;
+        }
         String schema = uri.getScheme();
         if (StringUtils.isBlank(schema)) {
             throw new IllegalArgumentException("Invalid uri: " + uriStr + ", 
extract schema is null");
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
index 337b897f256..2ce87f9ffb9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
@@ -146,62 +146,17 @@ public abstract class StorageProperties extends 
ConnectionProperties {
      * @throws RuntimeException if no supported storage type is found
      */
     public static StorageProperties createPrimary(Map<String, String> 
origProps) {
-        StorageProperties p = createPrimaryInternal(origProps);
-        if (p == null) {
-            for (Function<Map<String, String>, StorageProperties> func : 
PROVIDERS) {
-                p = func.apply(origProps);
-                if (p != null) {
-                    break;
-                }
+        for (Function<Map<String, String>, StorageProperties> func : 
PROVIDERS) {
+            StorageProperties p = func.apply(origProps);
+            if (p != null) {
+                p.initNormalizeAndCheckProps();
+                p.initializeHadoopStorageConfig();
+                return p;
             }
         }
-        if (p != null) {
-            p.initNormalizeAndCheckProps();
-            p.initializeHadoopStorageConfig();
-            return p;
-        }
         throw new StoragePropertiesException("No supported storage type found. 
Please check your configuration.");
     }
 
-    private static StorageProperties createPrimaryInternal(Map<String, String> 
origProps) {
-        String provider = origProps.get(FS_PROVIDER_KEY);
-        if (provider == null) {
-            return null;
-        }
-
-        try {
-            Type type = Type.valueOf(provider.trim().toUpperCase());
-            switch (type) {
-                case HDFS:
-                    return new HdfsProperties(origProps);
-                case OSS_HDFS:
-                    return new OSSHdfsProperties(origProps);
-                case S3:
-                    return new S3Properties(origProps);
-                case OSS:
-                    return new OSSProperties(origProps);
-                case OBS:
-                    return new OBSProperties(origProps);
-                case COS:
-                    return new COSProperties(origProps);
-                case GCS:
-                    return new GCSProperties(origProps);
-                case AZURE:
-                    return new AzureProperties(origProps);
-                case MINIO:
-                    return new MinioProperties(origProps);
-                case BROKER:
-                    return new BrokerProperties(origProps);
-                case LOCAL:
-                    return new LocalProperties(origProps);
-                default:
-                    return null;
-            }
-        } catch (Exception e) {
-            return null;
-        }
-    }
-
     private static final List<Function<Map<String, String>, 
StorageProperties>> PROVIDERS =
             Arrays.asList(
                     props -> (isFsSupport(props, FS_HDFS_SUPPORT)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java
index b2c6957ce38..81cdaf5d2a6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java
@@ -124,7 +124,7 @@ public interface FileSystem {
      * @param fileNumLimit limit the total number of files to be listed.
      * @return
      */
-    default String globListWithLimit(String remotePath, List<RemoteFile> 
result,
+    default GlobListResult globListWithLimit(String remotePath, 
List<RemoteFile> result,
             String startFile, long fileSizeLimit, long fileNumLimit) {
         throw new UnsupportedOperationException("Unsupported operation glob 
list with limit on current file system.");
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/GlobListResult.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/GlobListResult.java
new file mode 100644
index 00000000000..af0d2817ffc
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/GlobListResult.java
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.fs;
+
+import org.apache.doris.backup.Status;
+
+public class GlobListResult {
+    private final Status status;
+    private final String maxFile;
+    private final String bucket;
+    private final String prefix;
+
+    public GlobListResult(Status status, String maxFile, String bucket, String 
prefix) {
+        this.status = status;
+        this.maxFile = maxFile;
+        this.bucket = bucket;
+        this.prefix = prefix;
+    }
+
+    public GlobListResult(Status status) {
+        this.status = status;
+        this.maxFile = "";
+        this.bucket = "";
+        this.prefix = "";
+    }
+
+    public Status getStatus() {
+        return status;
+    }
+
+    public String getMaxFile() {
+        return maxFile;
+    }
+
+    public String getBucket() {
+        return bucket;
+    }
+
+    public String getPrefix() {
+        return prefix;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
index f7cc644017f..9522449c7d9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
@@ -24,6 +24,7 @@ import org.apache.doris.common.util.S3URI;
 import org.apache.doris.common.util.S3Util;
 import org.apache.doris.common.util.Util;
 import 
org.apache.doris.datasource.property.storage.AbstractS3CompatibleProperties;
+import org.apache.doris.fs.GlobListResult;
 import org.apache.doris.fs.remote.RemoteFile;
 
 import org.apache.commons.lang3.StringUtils;
@@ -541,13 +542,11 @@ public class S3ObjStorage implements ObjStorage<S3Client> 
{
      * Limit: Starting from startFile, until the total file size is greater 
than fileSizeLimit,
      * or the number of files is greater than fileNumLimit.
      *
-     * @return The largest file name after listObject this time
+     * @return GlobListResult
      */
-    public String globListWithLimit(String remotePath, List<RemoteFile> 
result, String startFile,
+    public GlobListResult globListWithLimit(String remotePath, 
List<RemoteFile> result, String startFile,
             long fileSizeLimit, long fileNumLimit) {
-        GlobListResult globListResult = globListInternal(remotePath, result, 
true, startFile, fileSizeLimit,
-                fileNumLimit);
-        return globListResult.getMaxFile();
+        return globListInternal(remotePath, result, false, startFile, 
fileSizeLimit, fileNumLimit);
     }
 
     /**
@@ -643,8 +642,6 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
                                 isPrefix ? -1 : obj.size(),
                                 isPrefix ? 0 : 
obj.lastModified().toEpochMilli()
                         );
-                        remoteFile.setBucket(bucket);
-                        
remoteFile.setParentPath(objPath.getParent().toString());
                         result.add(remoteFile);
 
                         if (hasLimits && reachLimit(result.size(), 
matchFileSize, fileSizeLimit, fileNumLimit)) {
@@ -677,11 +674,11 @@ public class S3ObjStorage implements ObjStorage<S3Client> 
{
             if (LOG.isDebugEnabled()) {
                 LOG.debug("remotePath:{}, result:{}", remotePath, result);
             }
-            return new GlobListResult(Status.OK, currentMaxFile);
+            return new GlobListResult(Status.OK, currentMaxFile, bucket, 
finalPrefix);
         } catch (Exception e) {
             LOG.warn("Errors while getting file status", e);
             return new GlobListResult(new Status(Status.ErrCode.COMMON_ERROR,
-                    "Errors while getting file status " + 
Util.getRootCauseMessage(e)), "");
+                    "Errors while getting file status " + 
Util.getRootCauseMessage(e)));
         } finally {
             long endTime = System.nanoTime();
             long duration = endTime - startTime;
@@ -715,24 +712,6 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
         return false;
     }
 
-    private static class GlobListResult {
-        private final Status status;
-        private final String maxFile;
-
-        public GlobListResult(Status status, String maxFile) {
-            this.status = status;
-            this.maxFile = maxFile;
-        }
-
-        public Status getStatus() {
-            return status;
-        }
-
-        public String getMaxFile() {
-            return maxFile;
-        }
-    }
-
     @Override
     public synchronized void close() throws Exception {
         if (client != null) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java
index ac1f8add947..1f6f0225278 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java
@@ -36,8 +36,6 @@ public class RemoteFile {
     private long modificationTime;
     private Path path;
     BlockLocation[] blockLocations;
-    private String parentPath;
-    private String bucket;
 
     public RemoteFile(String name, boolean isFile, long size, long blockSize) {
         this(name, null, isFile, !isFile, size, blockSize, 0, null);
@@ -77,22 +75,6 @@ public class RemoteFile {
         this.path = path;
     }
 
-    public String getBucket() {
-        return bucket;
-    }
-
-    public void setBucket(String bucket) {
-        this.bucket = bucket;
-    }
-
-    public String getParentPath() {
-        return parentPath;
-    }
-
-    public void setParentPath(String parentPath) {
-        this.parentPath = parentPath;
-    }
-
     public boolean isFile() {
         return isFile;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
index d4e5a23c20e..007b67de84d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
@@ -23,6 +23,7 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.S3URI;
 import 
org.apache.doris.datasource.property.storage.AbstractS3CompatibleProperties;
 import org.apache.doris.datasource.property.storage.StorageProperties;
+import org.apache.doris.fs.GlobListResult;
 import org.apache.doris.fs.obj.S3ObjStorage;
 
 import org.apache.logging.log4j.LogManager;
@@ -69,7 +70,7 @@ public class S3FileSystem extends ObjFileSystem {
     }
 
     @Override
-    public String globListWithLimit(String remotePath, List<RemoteFile> 
result, String startFile,
+    public GlobListResult globListWithLimit(String remotePath, 
List<RemoteFile> result, String startFile,
             long fileSizeLimit, long fileNumLimit) {
         S3ObjStorage objStorage = (S3ObjStorage) this.objStorage;
         return objStorage.globListWithLimit(remotePath, result, startFile, 
fileSizeLimit, fileNumLimit);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
index a58dddb55e7..5b03ae6d18b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
@@ -31,7 +31,6 @@ import org.apache.doris.job.common.TaskStatus;
 import org.apache.doris.job.common.TaskType;
 import org.apache.doris.job.exception.JobException;
 import org.apache.doris.job.task.AbstractTask;
-import org.apache.doris.persist.AlterStreamingJobOperationLog;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.qe.ShowResultSetMetaData;
 import org.apache.doris.thrift.TCell;
@@ -218,6 +217,10 @@ public abstract class AbstractJob<T extends AbstractTask, 
C> implements Job<T, C
     }
 
     public List<T> commonCreateTasks(TaskType taskType, C taskContext) {
+        if (JobExecuteType.STREAMING.equals(getJobConfig().getExecuteType())) {
+            taskType = TaskType.STREAMING;
+        }
+
         if (!canCreateTask(taskType)) {
             log.info("job is not ready for scheduling, job id is {},job status 
is {}, taskType is {}", jobId,
                     jobStatus, taskType);
@@ -478,10 +481,6 @@ public abstract class AbstractJob<T extends AbstractTask, 
C> implements Job<T, C
         log.info(new LogBuilder(LogKey.SCHEDULER_JOB, getJobId()).add("msg", 
"replay delete scheduler job").build());
     }
 
-    public void onReplayUpdateStreaming(AlterStreamingJobOperationLog 
operationLog) {
-        log.info(new LogBuilder(LogKey.SCHEDULER_JOB, getJobId()).add("msg", 
"replay update streaming job").build());
-    }
-
     public boolean needPersist() {
         return true;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
index 23710c17659..35b1f351f72 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
@@ -18,7 +18,6 @@
 package org.apache.doris.job.executor;
 
 import org.apache.doris.job.base.AbstractJob;
-import org.apache.doris.job.base.JobExecuteType;
 import org.apache.doris.job.common.JobType;
 import org.apache.doris.job.common.JobUtils;
 import org.apache.doris.job.common.TaskType;
@@ -57,9 +56,7 @@ public class DispatchTaskHandler<T extends AbstractJob> 
implements WorkHandler<T
                 return;
             }
             if (event.getJob().isReadyForScheduling(null) && 
JobUtils.checkNeedSchedule(event.getJob())) {
-                TaskType taskType = 
JobExecuteType.STREAMING.equals(event.getJob().getJobConfig().getExecuteType())
-                        ? TaskType.STREAMING : TaskType.SCHEDULED;
-                List<? extends AbstractTask> tasks = 
event.getJob().commonCreateTasks(taskType, null);
+                List<? extends AbstractTask> tasks = 
event.getJob().commonCreateTasks(TaskType.SCHEDULED, null);
                 if (CollectionUtils.isEmpty(tasks)) {
                     log.warn("job is ready for scheduling, but create task is 
empty, skip scheduler,"
                                     + "job id is {}," + " job name is {}", 
event.getJob().getJobId(),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index e8fd03c8de4..b32ecdcb94e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -46,7 +46,6 @@ import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
 import org.apache.doris.nereids.parser.NereidsParser;
 import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
-import org.apache.doris.persist.AlterStreamingJobOperationLog;
 import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.qe.ConnectContext;
@@ -253,12 +252,6 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         return offsetProvider.hasMoreDataToConsume();
     }
 
-    @Override
-    public void logUpdateOperation() {
-        AlterStreamingJobOperationLog log =
-                new AlterStreamingJobOperationLog(this.getJobId(), 
this.getJobStatus(), properties, getExecuteSql());
-        Env.getCurrentEnv().getEditLog().logUpdateStreamingJob(log);
-    }
 
     @Override
     public void onTaskFail(StreamingJobSchedulerTask task) throws JobException 
{
@@ -321,13 +314,16 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         super.onReplayCreate();
     }
 
-    @Override
-    public void onReplayUpdateStreaming(AlterStreamingJobOperationLog 
operationLog) {
-        super.onReplayUpdateStreaming(operationLog);
-        setJobStatus(operationLog.getStatus());
-        this.properties = operationLog.getJobProperties();
+    /**
+     * Because the offset statistics of the streamingInsertJob are all stored 
in txn,
+     * only some fields are replayed here.
+     * @param replayJob
+     */
+    public void replayOnUpdated(StreamingInsertJob replayJob) {
+        setJobStatus(replayJob.getJobStatus());
+        this.properties = replayJob.getProperties();
         this.jobProperties = new StreamingJobProperties(properties);
-        setExecuteSql(operationLog.getExecuteSql());
+        setExecuteSql(replayJob.getExecuteSql());
     }
 
     @Override
@@ -358,14 +354,14 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         trow.addToColumnValue(new TCell().setStringVal(properties != null
                 ? GsonUtils.GSON.toJson(properties) : 
FeConstants.null_string));
 
-        if (offsetProvider != null && offsetProvider.getSyncOffset() != null) {
-            trow.addToColumnValue(new 
TCell().setStringVal(offsetProvider.getSyncOffset()));
+        if (offsetProvider != null && offsetProvider.getConsumedOffset() != 
null) {
+            trow.addToColumnValue(new 
TCell().setStringVal(offsetProvider.getConsumedOffset()));
         } else {
             trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
         }
 
-        if (offsetProvider != null && offsetProvider.getRemoteOffset() != 
null) {
-            trow.addToColumnValue(new 
TCell().setStringVal(offsetProvider.getRemoteOffset()));
+        if (offsetProvider != null && offsetProvider.getMaxOffset() != null) {
+            trow.addToColumnValue(new 
TCell().setStringVal(offsetProvider.getMaxOffset()));
         } else {
             trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
index a79bc1b230d..35aa601fceb 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
@@ -32,44 +32,45 @@ import java.util.Map;
 @Data
 public class StreamingJobProperties implements JobProperties {
     public static final String MAX_INTERVAL_SECOND_PROPERTY = "max_interval";
-    public static final String S3_BATCH_FILES_PROPERTY = "s3.batch_files";
-    public static final String S3_BATCH_SIZE_PROPERTY = "s3.batch_size";
+    public static final String S3_MAX_BATCH_FILES_PROPERTY = 
"s3.max_batch_files";
+    public static final String S3_MAX_BATCH_BYTES_PROPERTY = 
"s3.max_batch_bytes";
     public static final String SESSION_VAR_PREFIX = "session.";
 
     public static final long DEFAULT_MAX_INTERVAL_SECOND = 10;
-    public static final long DEFAULT_S3_BATCH_FILES = 256;
-    public static final long DEFAULT_S3_BATCH_SIZE = 10 * 1024 * 1024 * 1024L; 
// 10GB
+    public static final long DEFAULT_MAX_S3_BATCH_FILES = 256;
+    public static final long DEFAULT_MAX_S3_BATCH_BYTES = 10 * 1024 * 1024 * 
1024L; // 10GB
     public static final int DEFAULT_INSERT_TIMEOUT = 30 * 60; // 30min
 
     private final Map<String, String> properties;
     private long maxIntervalSecond;
     private long s3BatchFiles;
-    private long s3BatchSize;
+    private long s3BatchBytes;
 
     public StreamingJobProperties(Map<String, String> jobProperties) {
         this.properties = jobProperties;
         if (properties.isEmpty()) {
             this.maxIntervalSecond = DEFAULT_MAX_INTERVAL_SECOND;
-            this.s3BatchFiles = DEFAULT_S3_BATCH_FILES;
-            this.s3BatchSize = DEFAULT_S3_BATCH_SIZE;
+            this.s3BatchFiles = DEFAULT_MAX_S3_BATCH_FILES;
+            this.s3BatchBytes = DEFAULT_MAX_S3_BATCH_BYTES;
         }
     }
 
     public void validate() throws AnalysisException {
         this.maxIntervalSecond = Util.getLongPropertyOrDefault(
-                
properties.get(StreamingJobProperties.MAX_INTERVAL_SECOND_PROPERTY),
-                StreamingJobProperties.DEFAULT_MAX_INTERVAL_SECOND, (v) -> v 
>= 1,
+                        
properties.get(StreamingJobProperties.MAX_INTERVAL_SECOND_PROPERTY),
+                        StreamingJobProperties.DEFAULT_MAX_INTERVAL_SECOND, 
(v) -> v >= 1,
                 StreamingJobProperties.MAX_INTERVAL_SECOND_PROPERTY + " should 
> 1");
 
         this.s3BatchFiles = Util.getLongPropertyOrDefault(
-                properties.get(StreamingJobProperties.S3_BATCH_FILES_PROPERTY),
-                StreamingJobProperties.DEFAULT_S3_BATCH_FILES, (v) -> v >= 1,
-                StreamingJobProperties.S3_BATCH_FILES_PROPERTY + " should >=1 
");
+                        
properties.get(StreamingJobProperties.S3_MAX_BATCH_FILES_PROPERTY),
+                        StreamingJobProperties.DEFAULT_MAX_S3_BATCH_FILES, (v) 
-> v >= 1,
+                StreamingJobProperties.S3_MAX_BATCH_FILES_PROPERTY + " should 
>=1 ");
 
-        this.s3BatchSize = 
Util.getLongPropertyOrDefault(properties.get(StreamingJobProperties.S3_BATCH_SIZE_PROPERTY),
-                StreamingJobProperties.DEFAULT_S3_BATCH_SIZE, (v) -> v >= 100 
* 1024 * 1024
+        this.s3BatchBytes = Util.getLongPropertyOrDefault(
+                        
properties.get(StreamingJobProperties.S3_MAX_BATCH_BYTES_PROPERTY),
+                        StreamingJobProperties.DEFAULT_MAX_S3_BATCH_BYTES, (v) 
-> v >= 100 * 1024 * 1024
                         && v <= (long) (1024 * 1024 * 1024) * 10,
-                StreamingJobProperties.S3_BATCH_SIZE_PROPERTY + " should 
between 100MB and 10GB");
+                StreamingJobProperties.S3_MAX_BATCH_BYTES_PROPERTY + " should 
between 100MB and 10GB");
     }
 
     public SessionVariable getSessionVariable() throws JobException {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
index 14b12f837e0..8ce9fb7ea73 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
@@ -38,6 +38,7 @@ import org.apache.doris.job.common.JobType;
 import org.apache.doris.job.common.TaskType;
 import org.apache.doris.job.exception.JobException;
 import org.apache.doris.job.extensions.insert.InsertJob;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
 import org.apache.doris.job.extensions.mtmv.MTMVJob;
 import org.apache.doris.job.scheduler.JobScheduler;
 import org.apache.doris.job.scheduler.StreamingTaskScheduler;
@@ -45,7 +46,6 @@ import org.apache.doris.load.loadv2.JobState;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.nereids.trees.expressions.And;
 import org.apache.doris.nereids.trees.expressions.Expression;
-import org.apache.doris.persist.AlterStreamingJobOperationLog;
 import org.apache.doris.qe.ConnectContext;
 
 import com.google.common.collect.Lists;
@@ -368,21 +368,16 @@ public class JobManager<T extends AbstractJob<?, C>, C> 
implements Writable {
             LOG.warn("replayUpdateJob not normal, job: {}, jobId: {}, jobMap: 
{}", job, jobId, jobMap);
             return;
         }
-        jobMap.put(jobId, job);
-        log.info(new LogBuilder(LogKey.SCHEDULER_JOB, jobId)
-                .add("msg", "replay update scheduler job").build());
-    }
 
-    public void replayUpdateStreamingJob(AlterStreamingJobOperationLog log) {
-        Long jobId = log.getJobId();
-        if (!jobMap.containsKey(jobId)) {
-            LOG.warn("replayUpdateStreamingJob not normal, jobId: {}, jobMap: 
{}", jobId, log);
-            return;
+        if (job instanceof StreamingInsertJob) {
+            // for a streaming job, only some of the properties are updated
+            StreamingInsertJob currentJob = (StreamingInsertJob) 
jobMap.get(jobId);
+            currentJob.replayOnUpdated((StreamingInsertJob) job);
+        } else {
+            jobMap.put(jobId, job);
         }
-        T job = jobMap.get(jobId);
-        job.onReplayUpdateStreaming(log);
-        LOG.info(new LogBuilder(LogKey.SCHEDULER_JOB, jobId)
-                .add("msg", "replay update streaming job").build());
+        log.info(new LogBuilder(LogKey.SCHEDULER_JOB, jobId)
+                .add("msg", "replay update scheduler job").build());
     }
 
     /**
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java
index 03e664ccd3e..cebff9a39b9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java
@@ -19,4 +19,6 @@ package org.apache.doris.job.offset;
 
 public interface Offset {
     String toSerializedJson();
+
+    boolean isEmpty();
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
index d7e1bee4669..a27b66812c2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
@@ -40,22 +40,16 @@ public interface SourceOffsetProvider {
     Offset getNextOffset(StreamingJobProperties jobProps, Map<String, String> 
properties);
 
     /**
-     * Get current offset
+     * Get consumed offset to show
      * @return
      */
-    Offset getCurrentOffset();
+    String getConsumedOffset();
 
     /**
-     * Get sync offset to show
+     * Get remote datasource max offset
      * @return
      */
-    String getSyncOffset();
-
-    /**
-     * Get remote offset
-     * @return
-     */
-    String getRemoteOffset();
+    String getMaxOffset();
 
     /**
      * Rewrite the TVF parameters in the SQL based on the current offset.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
index cb783adfbff..b152660873a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
@@ -23,6 +23,7 @@ import org.apache.doris.persist.gson.GsonUtils;
 import com.google.gson.annotations.SerializedName;
 import lombok.Getter;
 import lombok.Setter;
+import org.apache.commons.lang3.StringUtils;
 
 @Getter
 @Setter
@@ -39,6 +40,11 @@ public class S3Offset implements Offset {
         return GsonUtils.GSON.toJson(this);
     }
 
+    @Override
+    public boolean isEmpty() {
+        return StringUtils.isEmpty(fileLists);
+    }
+
     @Override
     public String toString() {
         return "{ \"startFile\": \"" + startFile + "\", \"endFile\": \"" + 
endFile + "\" }";
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
index d433e42f73c..e6a5cf809a5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
@@ -19,6 +19,7 @@ package org.apache.doris.job.offset.s3;
 
 import org.apache.doris.datasource.property.storage.StorageProperties;
 import org.apache.doris.fs.FileSystemFactory;
+import org.apache.doris.fs.GlobListResult;
 import org.apache.doris.fs.remote.RemoteFile;
 import org.apache.doris.fs.remote.RemoteFileSystem;
 import org.apache.doris.job.extensions.insert.streaming.StreamingJobProperties;
@@ -45,7 +46,7 @@ import java.util.stream.Collectors;
 @Log4j2
 public class S3SourceOffsetProvider implements SourceOffsetProvider {
     S3Offset currentOffset;
-    String maxRemoteEndFile;
+    String maxEndFile;
 
     @Override
     public String getSourceType() {
@@ -64,18 +65,41 @@ public class S3SourceOffsetProvider implements 
SourceOffsetProvider {
         try (RemoteFileSystem fileSystem = 
FileSystemFactory.get(storageProperties)) {
             String uri = storageProperties.validateAndGetUri(copiedProps);
             filePath = storageProperties.validateAndNormalizeUri(uri);
-            maxRemoteEndFile = fileSystem.globListWithLimit(filePath, rfiles, 
startFile,
-                    jobProps.getS3BatchFiles(), jobProps.getS3BatchSize());
-            offset.setStartFile(startFile);
-            //todo: The path may be in the form of bucket/dir/*/*,
-            // but currently only the case where the last segment is * is 
handled.
+            GlobListResult globListResult = 
fileSystem.globListWithLimit(filePath, rfiles, startFile,
+                    jobProps.getS3BatchBytes(), jobProps.getS3BatchFiles());
+
             if (!rfiles.isEmpty()) {
-                String bucket = rfiles.get(0).getBucket();
-                String parentPath = rfiles.get(0).getParentPath();
-                String filePaths = 
rfiles.stream().map(RemoteFile::getName).collect(Collectors.joining(",", "{", 
"}"));
-                String finalFiles = String.format("s3://%s/%s/%s", bucket, 
parentPath, filePaths);
-                offset.setEndFile(String.format("%s/%s", parentPath, 
rfiles.get(rfiles.size() - 1).getName()));
-                offset.setFileLists(finalFiles);
+                String bucket = globListResult.getBucket();
+                String prefix = globListResult.getPrefix();
+
+                offset.setStartFile(startFile);
+                String bucketBase = "s3://" + bucket + "/";
+                // Get the path of the last directory
+                int lastSlash = prefix.lastIndexOf('/');
+                String basePrefix = (lastSlash >= 0) ? prefix.substring(0, 
lastSlash + 1) : "";
+                String filePathBase = bucketBase + basePrefix;
+                String joined = rfiles.stream()
+                        .filter(name -> !name.equals(filePathBase)) // Single 
file case
+                        .map(path -> path.getName().replace(filePathBase, ""))
+                        .collect(Collectors.joining(","));
+
+                if (joined.isEmpty()) {
+                    // base is a single file
+                    offset.setFileLists(filePathBase);
+                    String lastFile = rfiles.get(rfiles.size() - 
1).getName().replace(bucketBase, "");
+                    offset.setEndFile(lastFile);
+                } else {
+                    // base is dir
+                    String normalizedPrefix = basePrefix.endsWith("/")
+                            ? basePrefix.substring(0, basePrefix.length() - 1) 
: basePrefix;
+                    String finalFileLists = String.format("s3://%s/%s/{%s}", 
bucket, normalizedPrefix, joined);
+                    String lastFile = rfiles.get(rfiles.size() - 
1).getName().replace(bucketBase, "");
+                    offset.setFileLists(finalFileLists);
+                    offset.setEndFile(lastFile);
+                }
+                maxEndFile = globListResult.getMaxFile();
+            } else {
+                throw new RuntimeException("No new files found in path: " + 
filePath);
             }
         } catch (Exception e) {
             log.warn("list path exception, path={}", filePath, e);
@@ -85,12 +109,7 @@ public class S3SourceOffsetProvider implements 
SourceOffsetProvider {
     }
 
     @Override
-    public Offset getCurrentOffset() {
-        return currentOffset;
-    }
-
-    @Override
-    public String getSyncOffset() {
+    public String getConsumedOffset() {
         if (currentOffset != null) {
             return currentOffset.getEndFile();
         }
@@ -98,8 +117,8 @@ public class S3SourceOffsetProvider implements 
SourceOffsetProvider {
     }
 
     @Override
-    public String getRemoteOffset() {
-        return maxRemoteEndFile;
+    public String getMaxOffset() {
+        return maxEndFile;
     }
 
     @Override
@@ -138,11 +157,11 @@ public class S3SourceOffsetProvider implements 
SourceOffsetProvider {
             String uri = storageProperties.validateAndGetUri(copiedProps);
             String filePath = storageProperties.validateAndNormalizeUri(uri);
             List<RemoteFile> objects = new ArrayList<>();
-            String endFile = fileSystem.globListWithLimit(filePath, objects, 
startFile, 1, 1);
-            if (!objects.isEmpty() && StringUtils.isNotEmpty(endFile)) {
-                maxRemoteEndFile = endFile;
+            GlobListResult globListResult = 
fileSystem.globListWithLimit(filePath, objects, startFile, 1, 1);
+            if (globListResult != null && !objects.isEmpty() && 
StringUtils.isNotEmpty(globListResult.getMaxFile())) {
+                maxEndFile = globListResult.getMaxFile();
             } else {
-                maxRemoteEndFile = startFile;
+                maxEndFile = startFile;
             }
         } catch (Exception e) {
             throw e;
@@ -154,7 +173,7 @@ public class S3SourceOffsetProvider implements 
SourceOffsetProvider {
         if (currentOffset == null) {
             return true;
         }
-        if (currentOffset.endFile.compareTo(maxRemoteEndFile) < 0) {
+        if (currentOffset.endFile.compareTo(maxEndFile) < 0) {
             return true;
         }
         return false;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java 
b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index f0f9d471ed9..cb6dee000de 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -67,7 +67,6 @@ import org.apache.doris.persist.AlterDatabasePropertyInfo;
 import org.apache.doris.persist.AlterLightSchemaChangeInfo;
 import org.apache.doris.persist.AlterMTMV;
 import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
-import org.apache.doris.persist.AlterStreamingJobOperationLog;
 import org.apache.doris.persist.AlterUserOperationLog;
 import org.apache.doris.persist.AlterViewInfo;
 import org.apache.doris.persist.AnalyzeDeletionLog;
@@ -551,11 +550,6 @@ public class JournalEntity implements Writable {
                 isRead = true;
                 break;
             }
-            case OperationType.OP_UPDATE_STREAMING_JOB: {
-                data = AlterStreamingJobOperationLog.read(in);
-                isRead = true;
-                break;
-            }
             case OperationType.OP_CREATE_SCHEDULER_TASK:
             case OperationType.OP_DELETE_SCHEDULER_TASK: {
                 //todo improve
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
index adde81fbfe9..8d5bd244df7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
@@ -41,9 +41,9 @@ import java.util.Objects;
 public class AlterJobCommand extends AlterCommand implements ForwardWithSync {
     // exclude job name prefix, which is used by inner job
     private static final String excludeJobNamePrefix = "inner_";
-    private String jobName;
-    private Map<String, String> properties;
-    private String sql;
+    private final String jobName;
+    private final Map<String, String> properties;
+    private final String sql;
 
     public AlterJobCommand(String jobName, Map<String, String> properties, 
String sql) {
         super(PlanType.ALTER_JOB_COMMAND);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
index 924f2bd4842..fecac5e135c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
@@ -283,8 +283,8 @@ public class CreateJobInfo {
                 throw new AnalysisException(e.getMessage());
             }
         } else {
-            throw new AnalysisException("Not support this sql : " + sql + " 
Command class is "
-                    + logicalPlan.getClass().getName() + ".");
+            throw new AnalysisException("Only " + 
logicalPlan.getClass().getName()
+                    + " is supported to use with streaming job together");
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index 6f7cec71857..9e5c54099b8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -79,7 +79,6 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
@@ -544,24 +543,8 @@ public class InsertIntoTableCommand extends Command 
implements NeedAuditEncrypti
         }
     }
 
-    // todo: add ut
     public List<UnboundTVFRelation> getAllTVFRelation() {
-        List<UnboundTVFRelation> tvfs = new ArrayList<>();
-        findAllTVFInPlan(getLogicalQuery(), tvfs);
-        return tvfs;
-    }
-
-    private void findAllTVFInPlan(LogicalPlan plan, List<UnboundTVFRelation> 
tvfs) {
-        if (plan instanceof UnboundTVFRelation) {
-            UnboundTVFRelation tvfRelation = (UnboundTVFRelation) plan;
-            tvfs.add(tvfRelation);
-        }
-
-        for (Plan child : plan.children()) {
-            if (child instanceof LogicalPlan) {
-                findAllTVFInPlan((LogicalPlan) child, tvfs);
-            }
-        }
+        return 
getLogicalQuery().collectToList(UnboundTVFRelation.class::isInstance);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/AlterStreamingJobOperationLog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/persist/AlterStreamingJobOperationLog.java
deleted file mode 100644
index 785531fa0d1..00000000000
--- 
a/fe/fe-core/src/main/java/org/apache/doris/persist/AlterStreamingJobOperationLog.java
+++ /dev/null
@@ -1,86 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.persist;
-
-import org.apache.doris.common.io.Text;
-import org.apache.doris.common.io.Writable;
-import org.apache.doris.job.common.JobStatus;
-import org.apache.doris.persist.gson.GsonUtils;
-
-import com.google.gson.annotations.SerializedName;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Map;
-
-public class AlterStreamingJobOperationLog implements Writable {
-    @SerializedName(value = "jid")
-    private long jobId;
-    @SerializedName(value = "js")
-    private JobStatus status;
-    @SerializedName(value = "jp")
-    private Map<String, String> jobProperties;
-    @SerializedName(value = "sql")
-    String executeSql;
-
-    public AlterStreamingJobOperationLog(long jobId, JobStatus status,
-            Map<String, String> jobProperties, String executeSql) {
-        this.jobId = jobId;
-        this.status = status;
-        this.jobProperties = jobProperties;
-        this.executeSql = executeSql;
-    }
-
-    public long getJobId() {
-        return jobId;
-    }
-
-    public Map<String, String> getJobProperties() {
-        return jobProperties;
-    }
-
-    public String getExecuteSql() {
-        return executeSql;
-    }
-
-    public JobStatus getStatus() {
-        return status;
-    }
-
-    public static AlterStreamingJobOperationLog read(DataInput in) throws 
IOException {
-        String json = Text.readString(in);
-        return GsonUtils.GSON.fromJson(json, 
AlterStreamingJobOperationLog.class);
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-        String json = GsonUtils.GSON.toJson(this);
-        Text.writeString(out, json);
-    }
-
-    @Override
-    public String toString() {
-        return "AlterStreamingJobOperationLog{"
-                +  "jobId=" + jobId
-                + ", status=" + status
-                + ", jobProperties=" + jobProperties
-                + ", executeSql='" + executeSql + '\''
-                + '}';
-    }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 74be0f5e475..5abcc4e0464 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -862,11 +862,6 @@ public class EditLog {
                     Env.getCurrentEnv().getJobManager().replayDeleteJob(job);
                     break;
                 }
-                case OperationType.OP_UPDATE_STREAMING_JOB: {
-                    AlterStreamingJobOperationLog log = 
(AlterStreamingJobOperationLog) journal.getData();
-                    
Env.getCurrentEnv().getJobManager().replayUpdateStreamingJob(log);
-                    break;
-                }
                 /*case OperationType.OP_CREATE_SCHEDULER_TASK: {
                     JobTask task = (JobTask) journal.getData();
                     
Env.getCurrentEnv().getJobTaskManager().replayCreateTask(task);
@@ -2043,10 +2038,6 @@ public class EditLog {
         logEdit(OperationType.OP_UPDATE_SCHEDULER_JOB, job);
     }
 
-    public void logUpdateStreamingJob(AlterStreamingJobOperationLog log) {
-        logEdit(OperationType.OP_UPDATE_STREAMING_JOB, log);
-    }
-
     public void logDeleteJob(AbstractJob job) {
         logEdit(OperationType.OP_DELETE_SCHEDULER_JOB, job);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index 12d59d086b9..2bbc03d4aea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -414,8 +414,6 @@ public class OperationType {
 
     public static final short OP_OPERATE_KEY = 492;
 
-    public static final short OP_UPDATE_STREAMING_JOB = 493;
-
     // For cloud.
     public static final short OP_UPDATE_CLOUD_REPLICA = 1000;
     @Deprecated
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 6c8506af7d7..d5262d36a96 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -4761,6 +4761,7 @@ public class SessionVariable implements Serializable, 
Writable {
                         field.set(this, root.get(attr.name()));
                         break;
                     case "double":
+                        // root.get(attr.name()) always returns a Double, so it 
needs to be converted.
                         field.set(this, 
Double.valueOf(root.get(attr.name()).toString()));
                         break;
                     case "String":
diff --git 
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy 
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
index d89bc8d2f0f..54fc5faa2e8 100644
--- 
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
@@ -46,7 +46,7 @@ suite("test_streaming_insert_job") {
     sql """
        CREATE JOB ${jobName}  
        PROPERTIES(
-        "s3.batch_files" = "1"
+        "s3.max_batch_files" = "1"
        )
        ON STREAMING DO INSERT INTO ${tableName} 
        SELECT * FROM S3
@@ -104,9 +104,9 @@ suite("test_streaming_insert_job") {
 
     // alter streaming job
     sql """
-       ALTER JOB FOR ${jobName}
+       ALTER JOB ${jobName}
        PROPERTIES(
-        "session.insert_max_filter_ratio" = "0.5"
+        "session.insert_max_filter_ratio" = 0.5
        )
        INSERT INTO ${tableName}
        SELECT * FROM S3
@@ -126,7 +126,7 @@ suite("test_streaming_insert_job") {
         select status,properties,ConsumedOffset from jobs("type"="insert") 
where Name='${jobName}'
     """
     assert alterJobProperties.get(0).get(0) == "PAUSED"
-    assert alterJobProperties.get(0).get(1) == 
"{\"s3.batch_files\":\"1\",\"session.insert_max_filter_ratio\":\"0.5\"}"
+    assert alterJobProperties.get(0).get(1) == 
"{\"s3.max_batch_files\":\"1\",\"session.insert_max_filter_ratio\":\"0.5\"}"
     assert alterJobProperties.get(0).get(2) == 
"regression/load/data/example_1.csv"
 
     sql """
@@ -136,7 +136,7 @@ suite("test_streaming_insert_job") {
         select status,properties,ConsumedOffset from jobs("type"="insert") 
where Name='${jobName}'
     """
     assert resumeJobStatus.get(0).get(0) == "RUNNING" || 
resumeJobStatus.get(0).get(0) == "PENDING"
-    assert resumeJobStatus.get(0).get(1) == 
"{\"s3.batch_files\":\"1\",\"session.insert_max_filter_ratio\":\"0.5\"}"
+    assert resumeJobStatus.get(0).get(1) == 
"{\"s3.max_batch_files\":\"1\",\"session.insert_max_filter_ratio\":\"0.5\"}"
     assert resumeJobStatus.get(0).get(2) == 
"regression/load/data/example_1.csv"
 
     Awaitility.await().atMost(60, SECONDS)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to