This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch streaming-job-dev
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/streaming-job-dev by this push:
     new 71a8baca13a [Fix](job) fix job bug (#56225)
71a8baca13a is described below

commit 71a8baca13a7d562d8ab7faf93f137302889ec5f
Author: wudi <[email protected]>
AuthorDate: Fri Sep 19 17:06:37 2025 +0800

    [Fix](job) fix job bug (#56225)
    
    ### What problem does this PR solve?
    
    [Fix](job) fix job bug
---
 .../property/storage/StorageProperties.java        |  63 ++++++--
 .../java/org/apache/doris/fs/obj/S3ObjStorage.java | 175 ++++++++-------------
 .../org/apache/doris/job/base/AbstractJob.java     |   4 +-
 .../org/apache/doris/job/common/JobStatus.java     |   7 +-
 .../java/org/apache/doris/job/common/TaskType.java |   3 +-
 .../doris/job/executor/DispatchTaskHandler.java    |   5 +-
 .../insert/streaming/StreamingInsertJob.java       |  12 +-
 .../insert/streaming/StreamingInsertTask.java      |   4 +-
 .../streaming/StreamingJobSchedulerTask.java       |   2 +-
 .../insert/streaming/StreamingJobStatistic.java    |   4 +-
 .../org/apache/doris/job/manager/JobManager.java   |  15 +-
 .../java/org/apache/doris/job/offset/Offset.java   |   6 +-
 .../org/apache/doris/job/offset/s3/S3Offset.java   |  18 +--
 .../java/org/apache/doris/qe/SessionVariable.java  |   2 +-
 14 files changed, 159 insertions(+), 161 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
index 422d20a7560..337b897f256 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
@@ -146,19 +146,66 @@ public abstract class StorageProperties extends 
ConnectionProperties {
      * @throws RuntimeException if no supported storage type is found
      */
     public static StorageProperties createPrimary(Map<String, String> 
origProps) {
-        for (Function<Map<String, String>, StorageProperties> func : 
PROVIDERS) {
-            StorageProperties p = func.apply(origProps);
-            if (p != null) {
-                p.initNormalizeAndCheckProps();
-                p.initializeHadoopStorageConfig();
-                return p;
+        StorageProperties p = createPrimaryInternal(origProps);
+        if (p == null) {
+            for (Function<Map<String, String>, StorageProperties> func : 
PROVIDERS) {
+                p = func.apply(origProps);
+                if (p != null) {
+                    break;
+                }
             }
         }
+        if (p != null) {
+            p.initNormalizeAndCheckProps();
+            p.initializeHadoopStorageConfig();
+            return p;
+        }
         throw new StoragePropertiesException("No supported storage type found. 
Please check your configuration.");
     }
 
+    private static StorageProperties createPrimaryInternal(Map<String, String> 
origProps) {
+        String provider = origProps.get(FS_PROVIDER_KEY);
+        if (provider == null) {
+            return null;
+        }
+
+        try {
+            Type type = Type.valueOf(provider.trim().toUpperCase());
+            switch (type) {
+                case HDFS:
+                    return new HdfsProperties(origProps);
+                case OSS_HDFS:
+                    return new OSSHdfsProperties(origProps);
+                case S3:
+                    return new S3Properties(origProps);
+                case OSS:
+                    return new OSSProperties(origProps);
+                case OBS:
+                    return new OBSProperties(origProps);
+                case COS:
+                    return new COSProperties(origProps);
+                case GCS:
+                    return new GCSProperties(origProps);
+                case AZURE:
+                    return new AzureProperties(origProps);
+                case MINIO:
+                    return new MinioProperties(origProps);
+                case BROKER:
+                    return new BrokerProperties(origProps);
+                case LOCAL:
+                    return new LocalProperties(origProps);
+                default:
+                    return null;
+            }
+        } catch (Exception e) {
+            return null;
+        }
+    }
+
     private static final List<Function<Map<String, String>, 
StorageProperties>> PROVIDERS =
             Arrays.asList(
+                    props -> (isFsSupport(props, FS_HDFS_SUPPORT)
+                            || HdfsProperties.guessIsMe(props)) ? new 
HdfsProperties(props) : null,
                     props -> ((isFsSupport(props, FS_OSS_HDFS_SUPPORT)
                             || isFsSupport(props, DEPRECATED_OSS_HDFS_SUPPORT))
                             || OSSHdfsProperties.guessIsMe(props)) ? new 
OSSHdfsProperties(props) : null,
@@ -179,9 +226,7 @@ public abstract class StorageProperties extends 
ConnectionProperties {
                     props -> (isFsSupport(props, FS_BROKER_SUPPORT)
                             || BrokerProperties.guessIsMe(props)) ? new 
BrokerProperties(props) : null,
                     props -> (isFsSupport(props, FS_LOCAL_SUPPORT)
-                            || LocalProperties.guessIsMe(props)) ? new 
LocalProperties(props) : null,
-                    props -> (isFsSupport(props, FS_HDFS_SUPPORT)
-                            || HdfsProperties.guessIsMe(props)) ? new 
HdfsProperties(props) : null
+                            || LocalProperties.guessIsMe(props)) ? new 
LocalProperties(props) : null
             );
 
     protected StorageProperties(Type type, Map<String, String> origProps) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
index 6f710b74c1b..f7cc644017f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
@@ -529,109 +529,10 @@ public class S3ObjStorage implements 
ObjStorage<S3Client> {
      * Copy from `AzureObjStorage.GlobList`
      */
     public Status globList(String remotePath, List<RemoteFile> result, boolean 
fileNameOnly) {
-        long roundCnt = 0;
-        long elementCnt = 0;
-        long matchCnt = 0;
-        long startTime = System.nanoTime();
-        try {
-            S3URI uri = S3URI.create(remotePath, isUsePathStyle, 
forceParsingByStandardUri);
-            String bucket = uri.getBucket();
-            String globPath = uri.getKey(); // eg: path/to/*.csv
-
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("globList globPath:{}, remotePath:{}", globPath, 
remotePath);
-            }
-            java.nio.file.Path pathPattern = Paths.get(globPath);
-            PathMatcher matcher = 
FileSystems.getDefault().getPathMatcher("glob:" + pathPattern);
-            HashSet<String> directorySet = new HashSet<>();
-
-            String listPrefix = S3Util.getLongestPrefix(globPath); // similar 
to Azure
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("globList listPrefix: '{}' (from globPath: '{}')", 
listPrefix, globPath);
-            }
-
-            // For Directory Buckets, ensure proper prefix handling using 
standardized approach
-            String finalPrefix = listPrefix;
-
-            if (uri.useS3DirectoryBucket()) {
-                String adjustedPrefix = 
S3URI.getDirectoryPrefixForGlob(listPrefix);
-                if (LOG.isDebugEnabled() && 
!adjustedPrefix.equals(listPrefix)) {
-                    LOG.debug("Directory bucket detected, adjusting prefix 
from '{}' to '{}'",
-                            listPrefix, adjustedPrefix);
-                }
-                finalPrefix = adjustedPrefix;
-            }
-
-            ListObjectsV2Request request = ListObjectsV2Request.builder()
-                    .bucket(bucket)
-                    .prefix(finalPrefix)
-                    .build();
-
-            boolean isTruncated = false;
-            do {
-                roundCnt++;
-                ListObjectsV2Response response = listObjectsV2(request);
-                for (S3Object obj : response.contents()) {
-                    elementCnt++;
-                    java.nio.file.Path objPath = Paths.get(obj.key());
-
-                    boolean isPrefix = false;
-                    while (objPath != null && 
objPath.normalize().toString().startsWith(listPrefix)) {
-                        if (!matcher.matches(objPath)) {
-                            isPrefix = true;
-                            objPath = objPath.getParent();
-                            continue;
-                        }
-                        if 
(directorySet.contains(objPath.normalize().toString())) {
-                            break;
-                        }
-                        if (isPrefix) {
-                            directorySet.add(objPath.normalize().toString());
-                        }
-
-                        matchCnt++;
-                        RemoteFile remoteFile = new RemoteFile(
-                                fileNameOnly ? 
objPath.getFileName().toString() :
-                                        "s3://" + bucket + "/" + 
objPath.toString(),
-                                !isPrefix,
-                                isPrefix ? -1 : obj.size(),
-                                isPrefix ? -1 : obj.size(),
-                                isPrefix ? 0 : 
obj.lastModified().toEpochMilli()
-                        );
-                        result.add(remoteFile);
-                        objPath = objPath.getParent();
-                        isPrefix = true;
-                    }
-                }
-
-                isTruncated = response.isTruncated();
-                if (isTruncated) {
-                    request = request.toBuilder()
-                            
.continuationToken(response.nextContinuationToken())
-                            .build();
-                }
-            } while (isTruncated);
-
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("remotePath:{}, result:{}", remotePath, result);
-            }
-            return Status.OK;
-        } catch (Exception e) {
-            LOG.warn("Errors while getting file status", e);
-            return new Status(Status.ErrCode.COMMON_ERROR,
-                    "Errors while getting file status " + 
Util.getRootCauseMessage(e));
-        } finally {
-            long endTime = System.nanoTime();
-            long duration = endTime - startTime;
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("process {} elements under prefix {} for {} round, 
match {} elements, take {} ms",
-                        elementCnt, remotePath, roundCnt, matchCnt,
-                        duration / 1000 / 1000);
-            }
-        }
+        GlobListResult globListResult = globListInternal(remotePath, result, 
fileNameOnly, null, -1, -1);
+        return globListResult.getStatus();
     }
 
-
     /**
      * List all files under the given path with glob pattern.
      * For example, if the path is "s3://bucket/path/to/*.csv",
@@ -644,14 +545,31 @@ public class S3ObjStorage implements ObjStorage<S3Client> 
{
      */
     public String globListWithLimit(String remotePath, List<RemoteFile> 
result, String startFile,
             long fileSizeLimit, long fileNumLimit) {
+        GlobListResult globListResult = globListInternal(remotePath, result, 
true, startFile, fileSizeLimit,
+                fileNumLimit);
+        return globListResult.getMaxFile();
+    }
+
+    /**
+     * List all files under the given path with glob pattern.
+     * For example, if the path is "s3://bucket/path/to/*.csv",
+     * it will list all files under "s3://bucket/path/to/" with ".csv" suffix.
+     * <p>
+     * Copy from `AzureObjStorage.GlobList`
+     */
+    private GlobListResult globListInternal(String remotePath, 
List<RemoteFile> result, boolean fileNameOnly,
+            String startFile, long fileSizeLimit, long fileNumLimit) {
         long roundCnt = 0;
         long elementCnt = 0;
         long matchCnt = 0;
         long matchFileSize = 0L;
         long startTime = System.nanoTime();
+        String currentMaxFile = "";
+        boolean hasLimits = fileSizeLimit > 0 || fileNumLimit > 0;
         try {
             S3URI uri = S3URI.create(remotePath, isUsePathStyle, 
forceParsingByStandardUri);
-            if (uri.useS3DirectoryBucket()) {
+            // Directory bucket check for limit scenario
+            if (hasLimits && uri.useS3DirectoryBucket()) {
                 throw new RuntimeException("Not support glob with limit for 
directory bucket");
             }
 
@@ -670,9 +588,21 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
                 LOG.debug("globList listPrefix: '{}' (from globPath: '{}')", 
listPrefix, globPath);
             }
 
-            Builder builder = ListObjectsV2Request.builder();
-            builder.bucket(bucket)
-                    .prefix(listPrefix);
+            // For Directory Buckets, ensure proper prefix handling using 
standardized approach
+            String finalPrefix = listPrefix;
+
+            if (!hasLimits && uri.useS3DirectoryBucket()) {
+                String adjustedPrefix = 
S3URI.getDirectoryPrefixForGlob(listPrefix);
+                if (LOG.isDebugEnabled() && 
!adjustedPrefix.equals(listPrefix)) {
+                    LOG.debug("Directory bucket detected, adjusting prefix 
from '{}' to '{}'",
+                            listPrefix, adjustedPrefix);
+                }
+                finalPrefix = adjustedPrefix;
+            }
+
+            Builder builder = ListObjectsV2Request.builder()
+                    .bucket(bucket)
+                    .prefix(finalPrefix);
 
             if (startFile != null) {
                 builder.startAfter(startFile);
@@ -680,7 +610,6 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
 
             ListObjectsV2Request request = builder.build();
 
-            String currentMaxFile = "";
             boolean isTruncated = false;
             boolean reachLimit = false;
             do {
@@ -705,7 +634,10 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
                         }
 
                         matchCnt++;
-                        RemoteFile remoteFile = new 
RemoteFile(objPath.getFileName().toString(),
+                        matchFileSize += obj.size();
+                        RemoteFile remoteFile = new RemoteFile(
+                                fileNameOnly ? 
objPath.getFileName().toString() :
+                                        "s3://" + bucket + "/" + 
objPath.toString(),
                                 !isPrefix,
                                 isPrefix ? -1 : obj.size(),
                                 isPrefix ? -1 : obj.size(),
@@ -713,10 +645,9 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
                         );
                         remoteFile.setBucket(bucket);
                         
remoteFile.setParentPath(objPath.getParent().toString());
-                        matchFileSize += obj.size();
                         result.add(remoteFile);
 
-                        if (reachLimit(result.size(), matchFileSize, 
fileSizeLimit, fileNumLimit)) {
+                        if (hasLimits && reachLimit(result.size(), 
matchFileSize, fileSizeLimit, fileNumLimit)) {
                             reachLimit = true;
                             break;
                         }
@@ -728,8 +659,9 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
                         break;
                     }
                 }
+
+                // Record current max file for limit scenario
                 if (!response.contents().isEmpty()) {
-                    //record current last object file name
                     S3Object lastS3Object = 
response.contents().get(response.contents().size() - 1);
                     currentMaxFile = lastS3Object.key();
                 }
@@ -745,10 +677,11 @@ public class S3ObjStorage implements ObjStorage<S3Client> 
{
             if (LOG.isDebugEnabled()) {
                 LOG.debug("remotePath:{}, result:{}", remotePath, result);
             }
-            return currentMaxFile;
+            return new GlobListResult(Status.OK, currentMaxFile);
         } catch (Exception e) {
             LOG.warn("Errors while getting file status", e);
-            throw new RuntimeException(e);
+            return new GlobListResult(new Status(Status.ErrCode.COMMON_ERROR,
+                    "Errors while getting file status " + 
Util.getRootCauseMessage(e)), "");
         } finally {
             long endTime = System.nanoTime();
             long duration = endTime - startTime;
@@ -782,6 +715,24 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
         return false;
     }
 
+    private static class GlobListResult {
+        private final Status status;
+        private final String maxFile;
+
+        public GlobListResult(Status status, String maxFile) {
+            this.status = status;
+            this.maxFile = maxFile;
+        }
+
+        public Status getStatus() {
+            return status;
+        }
+
+        public String getMaxFile() {
+            return maxFile;
+        }
+    }
+
     @Override
     public synchronized void close() throws Exception {
         if (client != null) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
index 24be6358512..a58dddb55e7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
@@ -247,9 +247,11 @@ public abstract class AbstractJob<T extends AbstractTask, 
C> implements Job<T, C
 
         switch (taskType) {
             case SCHEDULED:
-                return currentJobStatus.equals(JobStatus.RUNNING) || 
currentJobStatus.equals(JobStatus.PENDING);
+                return currentJobStatus.equals(JobStatus.RUNNING);
             case MANUAL:
                 return currentJobStatus.equals(JobStatus.RUNNING) || 
currentJobStatus.equals(JobStatus.PAUSED);
+            case STREAMING:
+                return !jobStatus.equals(JobStatus.STOPPED) && 
!jobStatus.equals(JobStatus.FINISHED);
             default:
                 throw new IllegalArgumentException("Unsupported TaskType: " + 
taskType);
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/common/JobStatus.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/common/JobStatus.java
index 25d207110d9..c1f47f8ff59 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/common/JobStatus.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/common/JobStatus.java
@@ -42,5 +42,10 @@ public enum JobStatus {
     /**
      * When the task is finished, the finished state will be triggered.
      */
-    FINISHED
+    FINISHED;
+
+
+    public static boolean isRunning(JobStatus status) {
+        return PENDING.equals(status) || RUNNING.equals(status);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/common/TaskType.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/common/TaskType.java
index 7f782cb4121..138325a21bf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/common/TaskType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/common/TaskType.java
@@ -20,5 +20,6 @@ package org.apache.doris.job.common;
 public enum TaskType {
 
     SCHEDULED,
-    MANUAL;
+    MANUAL,
+    STREAMING;
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
index 35b1f351f72..23710c17659 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
@@ -18,6 +18,7 @@
 package org.apache.doris.job.executor;
 
 import org.apache.doris.job.base.AbstractJob;
+import org.apache.doris.job.base.JobExecuteType;
 import org.apache.doris.job.common.JobType;
 import org.apache.doris.job.common.JobUtils;
 import org.apache.doris.job.common.TaskType;
@@ -56,7 +57,9 @@ public class DispatchTaskHandler<T extends AbstractJob> 
implements WorkHandler<T
                 return;
             }
             if (event.getJob().isReadyForScheduling(null) && 
JobUtils.checkNeedSchedule(event.getJob())) {
-                List<? extends AbstractTask> tasks = 
event.getJob().commonCreateTasks(TaskType.SCHEDULED, null);
+                TaskType taskType = 
JobExecuteType.STREAMING.equals(event.getJob().getJobConfig().getExecuteType())
+                        ? TaskType.STREAMING : TaskType.SCHEDULED;
+                List<? extends AbstractTask> tasks = 
event.getJob().commonCreateTasks(taskType, null);
                 if (CollectionUtils.isEmpty(tasks)) {
                     log.warn("job is ready for scheduling, but create task is 
empty, skip scheduler,"
                                     + "job id is {}," + " job name is {}", 
event.getJob().getJobId(),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 7f882757356..479698b6f44 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -28,7 +28,6 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.job.base.AbstractJob;
-import org.apache.doris.job.base.JobExecuteType;
 import org.apache.doris.job.base.JobExecutionConfiguration;
 import org.apache.doris.job.base.TimerDefinition;
 import org.apache.doris.job.common.FailureReason;
@@ -88,7 +87,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
     protected long latestAutoResumeTimestamp;
     @Getter
     @Setter
-    protected long autoResumeCount = 0L;
+    protected long autoResumeCount;
     @Getter
     @SerializedName("props")
     private Map<String, String> properties;
@@ -218,6 +217,9 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
     }
 
     protected StreamingInsertTask createStreamingInsertTask() {
+        if (originTvfProps == null) {
+            this.originTvfProps = getCurrentTvf().getProperties().getMap();
+        }
         this.runningStreamTask = new StreamingInsertTask(getJobId(), 
Env.getCurrentEnv().getNextId(), getExecuteSql(),
                 offsetProvider, getCurrentDbName(), jobProperties, 
originTvfProps, getCreateUser());
         
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().registerTask(runningStreamTask);
@@ -277,9 +279,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         try {
             failedTaskCount.incrementAndGet();
             
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
-            if 
(getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
-                this.failureReason = new FailureReason(task.getErrMsg());
-            }
+            this.failureReason = new FailureReason(task.getErrMsg());
         } finally {
             writeUnlock();
         }
@@ -420,7 +420,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
                         loadStatistic.getLoadBytes(),
                         loadStatistic.getFileNumber(),
                         loadStatistic.getTotalFileSizeB(),
-                        runningStreamTask.getRunningOffset().toJson()));
+                        
runningStreamTask.getRunningOffset().toSerializedJson()));
         } finally {
             if (shouldReleaseLock) {
                 writeUnlock();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
index d5eb39cd064..d1a72b8bd24 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
@@ -163,13 +163,13 @@ public class StreamingInsertTask {
                 }
             } catch (Exception e) {
                 log.warn("execute insert task error, label is {},offset is 
{}", taskCommand.getLabelName(),
-                         runningOffset.toJson(), e);
+                         runningOffset.toString(), e);
                 errMsg = Util.getRootCauseMessage(e);
             }
             retry++;
         }
         log.error("streaming insert task failed, job id is {}, task id is {}, 
offset is {}, errMsg is {}",
-                getJobId(), getTaskId(), runningOffset.toJson(), errMsg);
+                getJobId(), getTaskId(), runningOffset.toString(), errMsg);
         throw new JobException(errMsg);
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
index de46ccedf50..a1c8bce649d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
@@ -155,7 +155,7 @@ public class StreamingJobSchedulerTask extends AbstractTask 
{
         }
         trow.addToColumnValue(new TCell().setStringVal(""));
         trow.addToColumnValue(new 
TCell().setStringVal(runningTask.getRunningOffset() == null ? 
FeConstants.null_string
-                : runningTask.getRunningOffset().toJson()));
+                : runningTask.getRunningOffset().toString()));
         return trow;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobStatistic.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobStatistic.java
index 70e5c9f6d0f..8b209cd6e81 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobStatistic.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobStatistic.java
@@ -17,8 +17,8 @@
 
 package org.apache.doris.job.extensions.insert.streaming;
 
-import org.apache.doris.persist.gson.GsonUtils;
 
+import com.google.gson.Gson;
 import lombok.Getter;
 import lombok.Setter;
 
@@ -37,6 +37,6 @@ public class StreamingJobStatistic {
     private long fileSize;
 
     public String toJson() {
-        return GsonUtils.GSON.toJson(this);
+        return new Gson().toJson(this);
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
index 34fc29f1c8d..14b12f837e0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
@@ -259,9 +259,7 @@ public class JobManager<T extends AbstractJob<?, C>, C> 
implements Writable {
         for (T a : jobMap.values()) {
             if (a.getJobName().equals(jobName)) {
                 try {
-                    if (jobStatus.equals(a.getJobStatus())) {
-                        throw new JobException("Can't change job status to the 
same status");
-                    }
+                    checkSameStatus(a, jobStatus);
                     alterJobStatus(a.getJobId(), jobStatus);
                 } catch (JobException e) {
                     throw new JobException("Alter job status error, jobName is 
%s, errorMsg is %s",
@@ -271,6 +269,17 @@ public class JobManager<T extends AbstractJob<?, C>, C> 
implements Writable {
         }
     }
 
+    private void checkSameStatus(T a, JobStatus newStatus) throws JobException 
{
+        if (newStatus.equals(a.getJobStatus())) {
+            throw new JobException("Can't change job status to the same 
status");
+        }
+        if (JobExecuteType.STREAMING.equals(a.getJobConfig().getExecuteType())
+                && a.getJobStatus().equals(JobStatus.RUNNING)
+                && JobStatus.isRunning(newStatus)) {
+            throw new JobException("Can't change job status to the same 
status, job already running");
+        }
+    }
+
     private void checkJobExist(Long jobId) throws JobException {
         if (null == jobMap.get(jobId)) {
             throw new JobException("job not exist, jobId:" + jobId);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java
index a3b0689bfc5..03e664ccd3e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java
@@ -18,9 +18,5 @@
 package org.apache.doris.job.offset;
 
 public interface Offset {
-    String toJson();
-
-    void setEndOffset(String endOffset);
-
-    String endOffset();
+    String toSerializedJson();
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
index 3f7956f9fe9..cb783adfbff 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
@@ -35,26 +35,12 @@ public class S3Offset implements Offset {
     String fileLists;
 
     @Override
-    public void setEndOffset(String endOffset) {
-        this.endFile = endOffset;
-    }
-
-    @Override
-    public String endOffset() {
-        return endFile;
-    }
-
-    @Override
-    public String toJson() {
+    public String toSerializedJson() {
         return GsonUtils.GSON.toJson(this);
     }
 
     @Override
     public String toString() {
-        return "S3Offset: ["
-                + "startFile=" + startFile
-                + ", endFile=" + endFile
-                + ", fileLists=" + fileLists
-                + "]";
+        return "{ \"startFile\": \"" + startFile + "\", \"endFile\": \"" + 
endFile + "\" }";
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index af197bfd734..6c8506af7d7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -4761,7 +4761,7 @@ public class SessionVariable implements Serializable, 
Writable {
                         field.set(this, root.get(attr.name()));
                         break;
                     case "double":
-                        field.set(this, root.get(attr.name()));
+                        field.set(this, 
Double.valueOf(root.get(attr.name()).toString()));
                         break;
                     case "String":
                         field.set(this, root.get(attr.name()));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to