This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch streaming-job-dev
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/streaming-job-dev by this push:
new 71a8baca13a [Fix](job) fix job bug (#56225)
71a8baca13a is described below
commit 71a8baca13a7d562d8ab7faf93f137302889ec5f
Author: wudi <[email protected]>
AuthorDate: Fri Sep 19 17:06:37 2025 +0800
[Fix](job) fix job bug (#56225)
### What problem does this PR solve?
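[Fix](job) fix job bug
Summary of the changes, as shown in the diff below:
- StorageProperties.createPrimary first honors an explicitly configured provider (FS_PROVIDER_KEY) and only then falls back to the PROVIDERS guess list, in which the HDFS entry moves from last to first.
- S3ObjStorage.globList and globListWithLimit now share globListInternal; on failure globListWithLimit returns an empty string instead of throwing a RuntimeException.
- Adds TaskType.STREAMING: DispatchTaskHandler uses it for jobs whose execute type is STREAMING, and AbstractJob allows such tasks unless the job is STOPPED or FINISHED. SCHEDULED tasks are no longer created while the job is PENDING.
- JobManager rejects altering a RUNNING streaming job to PENDING or RUNNING (new JobStatus.isRunning helper).
- StreamingInsertJob fills originTvfProps lazily when creating a task and always records the failure reason when a task fails.
- The Offset interface is reduced to a single toSerializedJson() method; S3Offset.toString() prints startFile and endFile in a JSON-like form and is used for logs and task display.
- StreamingJobStatistic.toJson uses a plain Gson instance, and SessionVariable converts JSON values to Double for double fields.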
---
.../property/storage/StorageProperties.java | 63 ++++++--
.../java/org/apache/doris/fs/obj/S3ObjStorage.java | 175 ++++++++-------------
.../org/apache/doris/job/base/AbstractJob.java | 4 +-
.../org/apache/doris/job/common/JobStatus.java | 7 +-
.../java/org/apache/doris/job/common/TaskType.java | 3 +-
.../doris/job/executor/DispatchTaskHandler.java | 5 +-
.../insert/streaming/StreamingInsertJob.java | 12 +-
.../insert/streaming/StreamingInsertTask.java | 4 +-
.../streaming/StreamingJobSchedulerTask.java | 2 +-
.../insert/streaming/StreamingJobStatistic.java | 4 +-
.../org/apache/doris/job/manager/JobManager.java | 15 +-
.../java/org/apache/doris/job/offset/Offset.java | 6 +-
.../org/apache/doris/job/offset/s3/S3Offset.java | 18 +--
.../java/org/apache/doris/qe/SessionVariable.java | 2 +-
14 files changed, 159 insertions(+), 161 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
index 422d20a7560..337b897f256 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
@@ -146,19 +146,66 @@ public abstract class StorageProperties extends
ConnectionProperties {
* @throws RuntimeException if no supported storage type is found
*/
public static StorageProperties createPrimary(Map<String, String>
origProps) {
- for (Function<Map<String, String>, StorageProperties> func :
PROVIDERS) {
- StorageProperties p = func.apply(origProps);
- if (p != null) {
- p.initNormalizeAndCheckProps();
- p.initializeHadoopStorageConfig();
- return p;
+ StorageProperties p = createPrimaryInternal(origProps);
+ if (p == null) {
+ for (Function<Map<String, String>, StorageProperties> func :
PROVIDERS) {
+ p = func.apply(origProps);
+ if (p != null) {
+ break;
+ }
}
}
+ if (p != null) {
+ p.initNormalizeAndCheckProps();
+ p.initializeHadoopStorageConfig();
+ return p;
+ }
throw new StoragePropertiesException("No supported storage type found.
Please check your configuration.");
}
+ private static StorageProperties createPrimaryInternal(Map<String, String>
origProps) {
+ String provider = origProps.get(FS_PROVIDER_KEY);
+ if (provider == null) {
+ return null;
+ }
+
+ try {
+ Type type = Type.valueOf(provider.trim().toUpperCase());
+ switch (type) {
+ case HDFS:
+ return new HdfsProperties(origProps);
+ case OSS_HDFS:
+ return new OSSHdfsProperties(origProps);
+ case S3:
+ return new S3Properties(origProps);
+ case OSS:
+ return new OSSProperties(origProps);
+ case OBS:
+ return new OBSProperties(origProps);
+ case COS:
+ return new COSProperties(origProps);
+ case GCS:
+ return new GCSProperties(origProps);
+ case AZURE:
+ return new AzureProperties(origProps);
+ case MINIO:
+ return new MinioProperties(origProps);
+ case BROKER:
+ return new BrokerProperties(origProps);
+ case LOCAL:
+ return new LocalProperties(origProps);
+ default:
+ return null;
+ }
+ } catch (Exception e) {
+ return null;
+ }
+ }
+
private static final List<Function<Map<String, String>,
StorageProperties>> PROVIDERS =
Arrays.asList(
+ props -> (isFsSupport(props, FS_HDFS_SUPPORT)
+ || HdfsProperties.guessIsMe(props)) ? new
HdfsProperties(props) : null,
props -> ((isFsSupport(props, FS_OSS_HDFS_SUPPORT)
|| isFsSupport(props, DEPRECATED_OSS_HDFS_SUPPORT))
|| OSSHdfsProperties.guessIsMe(props)) ? new
OSSHdfsProperties(props) : null,
@@ -179,9 +226,7 @@ public abstract class StorageProperties extends
ConnectionProperties {
props -> (isFsSupport(props, FS_BROKER_SUPPORT)
|| BrokerProperties.guessIsMe(props)) ? new
BrokerProperties(props) : null,
props -> (isFsSupport(props, FS_LOCAL_SUPPORT)
- || LocalProperties.guessIsMe(props)) ? new
LocalProperties(props) : null,
- props -> (isFsSupport(props, FS_HDFS_SUPPORT)
- || HdfsProperties.guessIsMe(props)) ? new
HdfsProperties(props) : null
+ || LocalProperties.guessIsMe(props)) ? new
LocalProperties(props) : null
);
protected StorageProperties(Type type, Map<String, String> origProps) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
index 6f710b74c1b..f7cc644017f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
@@ -529,109 +529,10 @@ public class S3ObjStorage implements
ObjStorage<S3Client> {
* Copy from `AzureObjStorage.GlobList`
*/
public Status globList(String remotePath, List<RemoteFile> result, boolean
fileNameOnly) {
- long roundCnt = 0;
- long elementCnt = 0;
- long matchCnt = 0;
- long startTime = System.nanoTime();
- try {
- S3URI uri = S3URI.create(remotePath, isUsePathStyle,
forceParsingByStandardUri);
- String bucket = uri.getBucket();
- String globPath = uri.getKey(); // eg: path/to/*.csv
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("globList globPath:{}, remotePath:{}", globPath,
remotePath);
- }
- java.nio.file.Path pathPattern = Paths.get(globPath);
- PathMatcher matcher =
FileSystems.getDefault().getPathMatcher("glob:" + pathPattern);
- HashSet<String> directorySet = new HashSet<>();
-
- String listPrefix = S3Util.getLongestPrefix(globPath); // similar
to Azure
- if (LOG.isDebugEnabled()) {
- LOG.debug("globList listPrefix: '{}' (from globPath: '{}')",
listPrefix, globPath);
- }
-
- // For Directory Buckets, ensure proper prefix handling using
standardized approach
- String finalPrefix = listPrefix;
-
- if (uri.useS3DirectoryBucket()) {
- String adjustedPrefix =
S3URI.getDirectoryPrefixForGlob(listPrefix);
- if (LOG.isDebugEnabled() &&
!adjustedPrefix.equals(listPrefix)) {
- LOG.debug("Directory bucket detected, adjusting prefix
from '{}' to '{}'",
- listPrefix, adjustedPrefix);
- }
- finalPrefix = adjustedPrefix;
- }
-
- ListObjectsV2Request request = ListObjectsV2Request.builder()
- .bucket(bucket)
- .prefix(finalPrefix)
- .build();
-
- boolean isTruncated = false;
- do {
- roundCnt++;
- ListObjectsV2Response response = listObjectsV2(request);
- for (S3Object obj : response.contents()) {
- elementCnt++;
- java.nio.file.Path objPath = Paths.get(obj.key());
-
- boolean isPrefix = false;
- while (objPath != null &&
objPath.normalize().toString().startsWith(listPrefix)) {
- if (!matcher.matches(objPath)) {
- isPrefix = true;
- objPath = objPath.getParent();
- continue;
- }
- if
(directorySet.contains(objPath.normalize().toString())) {
- break;
- }
- if (isPrefix) {
- directorySet.add(objPath.normalize().toString());
- }
-
- matchCnt++;
- RemoteFile remoteFile = new RemoteFile(
- fileNameOnly ?
objPath.getFileName().toString() :
- "s3://" + bucket + "/" +
objPath.toString(),
- !isPrefix,
- isPrefix ? -1 : obj.size(),
- isPrefix ? -1 : obj.size(),
- isPrefix ? 0 :
obj.lastModified().toEpochMilli()
- );
- result.add(remoteFile);
- objPath = objPath.getParent();
- isPrefix = true;
- }
- }
-
- isTruncated = response.isTruncated();
- if (isTruncated) {
- request = request.toBuilder()
-
.continuationToken(response.nextContinuationToken())
- .build();
- }
- } while (isTruncated);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("remotePath:{}, result:{}", remotePath, result);
- }
- return Status.OK;
- } catch (Exception e) {
- LOG.warn("Errors while getting file status", e);
- return new Status(Status.ErrCode.COMMON_ERROR,
- "Errors while getting file status " +
Util.getRootCauseMessage(e));
- } finally {
- long endTime = System.nanoTime();
- long duration = endTime - startTime;
- if (LOG.isDebugEnabled()) {
- LOG.debug("process {} elements under prefix {} for {} round,
match {} elements, take {} ms",
- elementCnt, remotePath, roundCnt, matchCnt,
- duration / 1000 / 1000);
- }
- }
+ GlobListResult globListResult = globListInternal(remotePath, result,
fileNameOnly, null, -1, -1);
+ return globListResult.getStatus();
}
-
/**
* List all files under the given path with glob pattern.
* For example, if the path is "s3://bucket/path/to/*.csv",
@@ -644,14 +545,31 @@ public class S3ObjStorage implements ObjStorage<S3Client>
{
*/
public String globListWithLimit(String remotePath, List<RemoteFile>
result, String startFile,
long fileSizeLimit, long fileNumLimit) {
+ GlobListResult globListResult = globListInternal(remotePath, result,
true, startFile, fileSizeLimit,
+ fileNumLimit);
+ return globListResult.getMaxFile();
+ }
+
+ /**
+ * List all files under the given path with glob pattern.
+ * For example, if the path is "s3://bucket/path/to/*.csv",
+ * it will list all files under "s3://bucket/path/to/" with ".csv" suffix.
+ * <p>
+ * Copy from `AzureObjStorage.GlobList`
+ */
+ private GlobListResult globListInternal(String remotePath,
List<RemoteFile> result, boolean fileNameOnly,
+ String startFile, long fileSizeLimit, long fileNumLimit) {
long roundCnt = 0;
long elementCnt = 0;
long matchCnt = 0;
long matchFileSize = 0L;
long startTime = System.nanoTime();
+ String currentMaxFile = "";
+ boolean hasLimits = fileSizeLimit > 0 || fileNumLimit > 0;
try {
S3URI uri = S3URI.create(remotePath, isUsePathStyle,
forceParsingByStandardUri);
- if (uri.useS3DirectoryBucket()) {
+ // Directory bucket check for limit scenario
+ if (hasLimits && uri.useS3DirectoryBucket()) {
throw new RuntimeException("Not support glob with limit for
directory bucket");
}
@@ -670,9 +588,21 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
LOG.debug("globList listPrefix: '{}' (from globPath: '{}')",
listPrefix, globPath);
}
- Builder builder = ListObjectsV2Request.builder();
- builder.bucket(bucket)
- .prefix(listPrefix);
+ // For Directory Buckets, ensure proper prefix handling using
standardized approach
+ String finalPrefix = listPrefix;
+
+ if (!hasLimits && uri.useS3DirectoryBucket()) {
+ String adjustedPrefix =
S3URI.getDirectoryPrefixForGlob(listPrefix);
+ if (LOG.isDebugEnabled() &&
!adjustedPrefix.equals(listPrefix)) {
+ LOG.debug("Directory bucket detected, adjusting prefix
from '{}' to '{}'",
+ listPrefix, adjustedPrefix);
+ }
+ finalPrefix = adjustedPrefix;
+ }
+
+ Builder builder = ListObjectsV2Request.builder()
+ .bucket(bucket)
+ .prefix(finalPrefix);
if (startFile != null) {
builder.startAfter(startFile);
@@ -680,7 +610,6 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
ListObjectsV2Request request = builder.build();
- String currentMaxFile = "";
boolean isTruncated = false;
boolean reachLimit = false;
do {
@@ -705,7 +634,10 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
}
matchCnt++;
- RemoteFile remoteFile = new
RemoteFile(objPath.getFileName().toString(),
+ matchFileSize += obj.size();
+ RemoteFile remoteFile = new RemoteFile(
+ fileNameOnly ?
objPath.getFileName().toString() :
+ "s3://" + bucket + "/" +
objPath.toString(),
!isPrefix,
isPrefix ? -1 : obj.size(),
isPrefix ? -1 : obj.size(),
@@ -713,10 +645,9 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
);
remoteFile.setBucket(bucket);
remoteFile.setParentPath(objPath.getParent().toString());
- matchFileSize += obj.size();
result.add(remoteFile);
- if (reachLimit(result.size(), matchFileSize,
fileSizeLimit, fileNumLimit)) {
+ if (hasLimits && reachLimit(result.size(),
matchFileSize, fileSizeLimit, fileNumLimit)) {
reachLimit = true;
break;
}
@@ -728,8 +659,9 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
break;
}
}
+
+ // Record current max file for limit scenario
if (!response.contents().isEmpty()) {
- //record current last object file name
S3Object lastS3Object =
response.contents().get(response.contents().size() - 1);
currentMaxFile = lastS3Object.key();
}
@@ -745,10 +677,11 @@ public class S3ObjStorage implements ObjStorage<S3Client>
{
if (LOG.isDebugEnabled()) {
LOG.debug("remotePath:{}, result:{}", remotePath, result);
}
- return currentMaxFile;
+ return new GlobListResult(Status.OK, currentMaxFile);
} catch (Exception e) {
LOG.warn("Errors while getting file status", e);
- throw new RuntimeException(e);
+ return new GlobListResult(new Status(Status.ErrCode.COMMON_ERROR,
+ "Errors while getting file status " +
Util.getRootCauseMessage(e)), "");
} finally {
long endTime = System.nanoTime();
long duration = endTime - startTime;
@@ -782,6 +715,24 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
return false;
}
+ private static class GlobListResult {
+ private final Status status;
+ private final String maxFile;
+
+ public GlobListResult(Status status, String maxFile) {
+ this.status = status;
+ this.maxFile = maxFile;
+ }
+
+ public Status getStatus() {
+ return status;
+ }
+
+ public String getMaxFile() {
+ return maxFile;
+ }
+ }
+
@Override
public synchronized void close() throws Exception {
if (client != null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
index 24be6358512..a58dddb55e7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
@@ -247,9 +247,11 @@ public abstract class AbstractJob<T extends AbstractTask,
C> implements Job<T, C
switch (taskType) {
case SCHEDULED:
- return currentJobStatus.equals(JobStatus.RUNNING) ||
currentJobStatus.equals(JobStatus.PENDING);
+ return currentJobStatus.equals(JobStatus.RUNNING);
case MANUAL:
return currentJobStatus.equals(JobStatus.RUNNING) ||
currentJobStatus.equals(JobStatus.PAUSED);
+ case STREAMING:
+ return !jobStatus.equals(JobStatus.STOPPED) &&
!jobStatus.equals(JobStatus.FINISHED);
default:
throw new IllegalArgumentException("Unsupported TaskType: " +
taskType);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/common/JobStatus.java b/fe/fe-core/src/main/java/org/apache/doris/job/common/JobStatus.java
index 25d207110d9..c1f47f8ff59 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/common/JobStatus.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/common/JobStatus.java
@@ -42,5 +42,10 @@ public enum JobStatus {
/**
* When the task is finished, the finished state will be triggered.
*/
- FINISHED
+ FINISHED;
+
+
+ public static boolean isRunning(JobStatus status) {
+ return PENDING.equals(status) || RUNNING.equals(status);
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/common/TaskType.java b/fe/fe-core/src/main/java/org/apache/doris/job/common/TaskType.java
index 7f782cb4121..138325a21bf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/common/TaskType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/common/TaskType.java
@@ -20,5 +20,6 @@ package org.apache.doris.job.common;
public enum TaskType {
SCHEDULED,
- MANUAL;
+ MANUAL,
+ STREAMING;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
index 35b1f351f72..23710c17659 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
@@ -18,6 +18,7 @@
package org.apache.doris.job.executor;
import org.apache.doris.job.base.AbstractJob;
+import org.apache.doris.job.base.JobExecuteType;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.common.JobUtils;
import org.apache.doris.job.common.TaskType;
@@ -56,7 +57,9 @@ public class DispatchTaskHandler<T extends AbstractJob>
implements WorkHandler<T
return;
}
if (event.getJob().isReadyForScheduling(null) &&
JobUtils.checkNeedSchedule(event.getJob())) {
- List<? extends AbstractTask> tasks =
event.getJob().commonCreateTasks(TaskType.SCHEDULED, null);
+ TaskType taskType =
JobExecuteType.STREAMING.equals(event.getJob().getJobConfig().getExecuteType())
+ ? TaskType.STREAMING : TaskType.SCHEDULED;
+ List<? extends AbstractTask> tasks =
event.getJob().commonCreateTasks(taskType, null);
if (CollectionUtils.isEmpty(tasks)) {
log.warn("job is ready for scheduling, but create task is
empty, skip scheduler,"
+ "job id is {}," + " job name is {}",
event.getJob().getJobId(),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 7f882757356..479698b6f44 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -28,7 +28,6 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.base.AbstractJob;
-import org.apache.doris.job.base.JobExecuteType;
import org.apache.doris.job.base.JobExecutionConfiguration;
import org.apache.doris.job.base.TimerDefinition;
import org.apache.doris.job.common.FailureReason;
@@ -88,7 +87,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
protected long latestAutoResumeTimestamp;
@Getter
@Setter
- protected long autoResumeCount = 0L;
+ protected long autoResumeCount;
@Getter
@SerializedName("props")
private Map<String, String> properties;
@@ -218,6 +217,9 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
}
protected StreamingInsertTask createStreamingInsertTask() {
+ if (originTvfProps == null) {
+ this.originTvfProps = getCurrentTvf().getProperties().getMap();
+ }
this.runningStreamTask = new StreamingInsertTask(getJobId(),
Env.getCurrentEnv().getNextId(), getExecuteSql(),
offsetProvider, getCurrentDbName(), jobProperties,
originTvfProps, getCreateUser());
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().registerTask(runningStreamTask);
@@ -277,9 +279,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
try {
failedTaskCount.incrementAndGet();
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
- if
(getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
- this.failureReason = new FailureReason(task.getErrMsg());
- }
+ this.failureReason = new FailureReason(task.getErrMsg());
} finally {
writeUnlock();
}
@@ -420,7 +420,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
loadStatistic.getLoadBytes(),
loadStatistic.getFileNumber(),
loadStatistic.getTotalFileSizeB(),
- runningStreamTask.getRunningOffset().toJson()));
+
runningStreamTask.getRunningOffset().toSerializedJson()));
} finally {
if (shouldReleaseLock) {
writeUnlock();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
index d5eb39cd064..d1a72b8bd24 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
@@ -163,13 +163,13 @@ public class StreamingInsertTask {
}
} catch (Exception e) {
log.warn("execute insert task error, label is {},offset is
{}", taskCommand.getLabelName(),
- runningOffset.toJson(), e);
+ runningOffset.toString(), e);
errMsg = Util.getRootCauseMessage(e);
}
retry++;
}
log.error("streaming insert task failed, job id is {}, task id is {},
offset is {}, errMsg is {}",
- getJobId(), getTaskId(), runningOffset.toJson(), errMsg);
+ getJobId(), getTaskId(), runningOffset.toString(), errMsg);
throw new JobException(errMsg);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
index de46ccedf50..a1c8bce649d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
@@ -155,7 +155,7 @@ public class StreamingJobSchedulerTask extends AbstractTask
{
}
trow.addToColumnValue(new TCell().setStringVal(""));
trow.addToColumnValue(new
TCell().setStringVal(runningTask.getRunningOffset() == null ?
FeConstants.null_string
- : runningTask.getRunningOffset().toJson()));
+ : runningTask.getRunningOffset().toString()));
return trow;
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobStatistic.java
index 70e5c9f6d0f..8b209cd6e81 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobStatistic.java
@@ -17,8 +17,8 @@
package org.apache.doris.job.extensions.insert.streaming;
-import org.apache.doris.persist.gson.GsonUtils;
+import com.google.gson.Gson;
import lombok.Getter;
import lombok.Setter;
@@ -37,6 +37,6 @@ public class StreamingJobStatistic {
private long fileSize;
public String toJson() {
- return GsonUtils.GSON.toJson(this);
+ return new Gson().toJson(this);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
index 34fc29f1c8d..14b12f837e0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
@@ -259,9 +259,7 @@ public class JobManager<T extends AbstractJob<?, C>, C>
implements Writable {
for (T a : jobMap.values()) {
if (a.getJobName().equals(jobName)) {
try {
- if (jobStatus.equals(a.getJobStatus())) {
- throw new JobException("Can't change job status to the
same status");
- }
+ checkSameStatus(a, jobStatus);
alterJobStatus(a.getJobId(), jobStatus);
} catch (JobException e) {
throw new JobException("Alter job status error, jobName is
%s, errorMsg is %s",
@@ -271,6 +269,17 @@ public class JobManager<T extends AbstractJob<?, C>, C>
implements Writable {
}
}
+ private void checkSameStatus(T a, JobStatus newStatus) throws JobException
{
+ if (newStatus.equals(a.getJobStatus())) {
+ throw new JobException("Can't change job status to the same
status");
+ }
+ if (JobExecuteType.STREAMING.equals(a.getJobConfig().getExecuteType())
+ && a.getJobStatus().equals(JobStatus.RUNNING)
+ && JobStatus.isRunning(newStatus)) {
+ throw new JobException("Can't change job status to the same
status, job already running");
+ }
+ }
+
private void checkJobExist(Long jobId) throws JobException {
if (null == jobMap.get(jobId)) {
throw new JobException("job not exist, jobId:" + jobId);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java
index a3b0689bfc5..03e664ccd3e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java
@@ -18,9 +18,5 @@
package org.apache.doris.job.offset;
public interface Offset {
- String toJson();
-
- void setEndOffset(String endOffset);
-
- String endOffset();
+ String toSerializedJson();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
index 3f7956f9fe9..cb783adfbff 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
@@ -35,26 +35,12 @@ public class S3Offset implements Offset {
String fileLists;
@Override
- public void setEndOffset(String endOffset) {
- this.endFile = endOffset;
- }
-
- @Override
- public String endOffset() {
- return endFile;
- }
-
- @Override
- public String toJson() {
+ public String toSerializedJson() {
return GsonUtils.GSON.toJson(this);
}
@Override
public String toString() {
- return "S3Offset: ["
- + "startFile=" + startFile
- + ", endFile=" + endFile
- + ", fileLists=" + fileLists
- + "]";
+ return "{ \"startFile\": \"" + startFile + "\", \"endFile\": \"" +
endFile + "\" }";
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index af197bfd734..6c8506af7d7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -4761,7 +4761,7 @@ public class SessionVariable implements Serializable,
Writable {
field.set(this, root.get(attr.name()));
break;
case "double":
- field.set(this, root.get(attr.name()));
+ field.set(this,
Double.valueOf(root.get(attr.name()).toString()));
break;
case "String":
field.set(this, root.get(attr.name()));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]