This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch streaming-job-dev
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/streaming-job-dev by this push:
new 777ba161038 [Feature](WIP) add alter job op and fix bug (#56194)
777ba161038 is described below
commit 777ba161038433f520057142d957181666b9a142
Author: wudi <[email protected]>
AuthorDate: Thu Sep 18 22:31:37 2025 +0800
[Feature](WIP) add alter job op and fix bug (#56194)
### What problem does this PR solve?
add alter job op and fix bug
---
.../java/org/apache/doris/fs/obj/S3ObjStorage.java | 8 +-
.../org/apache/doris/job/base/AbstractJob.java | 5 ++
.../doris/job/extensions/insert/InsertJob.java | 6 +-
.../doris/job/extensions/insert/InsertTask.java | 7 +-
.../insert/streaming/StreamingInsertJob.java | 62 ++++++++++------
.../insert/streaming/StreamingInsertTask.java | 2 +-
.../streaming/StreamingJobSchedulerTask.java | 23 ++++--
.../org/apache/doris/job/manager/JobManager.java | 13 ++++
.../org/apache/doris/job/offset/s3/S3Offset.java | 4 +-
.../job/offset/s3/S3SourceOffsetProvider.java | 10 ++-
.../org/apache/doris/job/task/AbstractTask.java | 2 +-
.../org/apache/doris/journal/JournalEntity.java | 6 ++
.../trees/plans/commands/AlterJobCommand.java | 37 ++++------
.../trees/plans/commands/CreateJobCommand.java | 2 +-
.../trees/plans/commands/ResumeJobCommand.java | 9 ++-
.../persist/AlterStreamingJobOperationLog.java | 86 ++++++++++++++++++++++
.../java/org/apache/doris/persist/EditLog.java | 9 +++
.../org/apache/doris/persist/OperationType.java | 2 +
.../streaming_job/test_streaming_insert_job.groovy | 66 ++++++++++++-----
19 files changed, 276 insertions(+), 83 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
index 9b0e5aeb801..6f710b74c1b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
@@ -728,9 +728,11 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
break;
}
}
- //record current last object file name
- S3Object lastS3Object =
response.contents().get(response.contents().size() - 1);
- currentMaxFile = lastS3Object.key();
+ if (!response.contents().isEmpty()) {
+ //record current last object file name
+ S3Object lastS3Object =
response.contents().get(response.contents().size() - 1);
+ currentMaxFile = lastS3Object.key();
+ }
isTruncated = response.isTruncated();
if (isTruncated) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
index d08942460f5..24be6358512 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
@@ -31,6 +31,7 @@ import org.apache.doris.job.common.TaskStatus;
import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.task.AbstractTask;
+import org.apache.doris.persist.AlterStreamingJobOperationLog;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.thrift.TCell;
@@ -475,6 +476,10 @@ public abstract class AbstractJob<T extends AbstractTask,
C> implements Job<T, C
log.info(new LogBuilder(LogKey.SCHEDULER_JOB, getJobId()).add("msg",
"replay delete scheduler job").build());
}
+ public void onReplayUpdateStreaming(AlterStreamingJobOperationLog
operationLog) {
+ log.info(new LogBuilder(LogKey.SCHEDULER_JOB, getJobId()).add("msg",
"replay update streaming job").build());
+ }
+
public boolean needPersist() {
return true;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
index 7658c3cb54d..cc16cbb603d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
@@ -100,8 +100,8 @@ public class InsertJob extends AbstractJob<InsertTask,
Map<Object, Object>> impl
.add(new Column("Comment", ScalarType.createStringType()))
// only execute type = streaming need record
.add(new Column("Properties", ScalarType.createStringType()))
- .add(new Column("Progress", ScalarType.createStringType()))
- .add(new Column("RemoteOffset", ScalarType.createStringType()))
+ .add(new Column("ConsumedOffset", ScalarType.createStringType()))
+ .add(new Column("MaxOffset", ScalarType.createStringType()))
.add(new Column("LoadStatistic", ScalarType.createStringType()))
.add(new Column("ErrorMsg", ScalarType.createStringType()))
.build();
@@ -120,9 +120,9 @@ public class InsertJob extends AbstractJob<InsertTask,
Map<Object, Object>> impl
.addColumn(new Column("TrackingUrl",
ScalarType.createVarchar(200)))
.addColumn(new Column("LoadStatistic",
ScalarType.createVarchar(200)))
.addColumn(new Column("User",
ScalarType.createVarchar(50)))
+ .addColumn(new Column("FirstErrorMsg",
ScalarType.createVarchar(200)))
// only execute type = streaming need record
.addColumn(new Column("Offset",
ScalarType.createStringType()))
- .addColumn(new Column("FirstErrorMsg",
ScalarType.createVarchar(200)))
.build();
public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
index 8ff292ea8c0..5875699fb0f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
@@ -68,9 +68,8 @@ public class InsertTask extends AbstractTask {
new Column("TrackingUrl", ScalarType.createStringType()),
new Column("LoadStatistic", ScalarType.createStringType()),
new Column("User", ScalarType.createStringType()),
- new Column("Offset", ScalarType.createStringType()),
- new Column("OtherMsg", ScalarType.createStringType()),
- new Column("FirstErrorMsg", ScalarType.createStringType()));
+ new Column("FirstErrorMsg", ScalarType.createStringType()),
+ new Column("Offset", ScalarType.createStringType()));
public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
@@ -277,8 +276,8 @@ public class InsertTask extends AbstractTask {
} else {
trow.addToColumnValue(new
TCell().setStringVal(userIdentity.getQualifiedUser()));
}
- trow.addToColumnValue(new TCell().setStringVal(""));
trow.addToColumnValue(new TCell().setStringVal(firstErrorMsg == null ?
"" : firstErrorMsg));
+ trow.addToColumnValue(new TCell().setStringVal(""));
return trow;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 157629520a9..70463894a66 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -39,14 +39,15 @@ import org.apache.doris.job.common.TaskStatus;
import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.insert.InsertJob;
+import org.apache.doris.job.extensions.insert.InsertTask;
import org.apache.doris.job.offset.SourceOffsetProvider;
import org.apache.doris.job.offset.SourceOffsetProviderFactory;
-import org.apache.doris.job.task.AbstractTask;
-import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.load.loadv2.LoadStatistic;
+import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
import org.apache.doris.nereids.parser.NereidsParser;
import
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import org.apache.doris.persist.AlterStreamingJobOperationLog;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
@@ -87,10 +88,10 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
protected long latestAutoResumeTimestamp;
@Getter
@Setter
- protected long autoResumeCount;
+ protected long autoResumeCount = 0L;
@Getter
@SerializedName("props")
- private final Map<String, String> properties;
+ private Map<String, String> properties;
private StreamingJobProperties jobProperties;
@Getter
@SerializedName("tvf")
@@ -114,7 +115,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
Long createTimeMs,
String executeSql,
Map<String, String> properties) {
- super(getNextJobId(), jobName, jobStatus, dbName, comment, createUser,
+ super(Env.getCurrentEnv().getNextId(), jobName, jobStatus, dbName,
comment, createUser,
jobConfig, createTimeMs, executeSql);
this.dbId = ConnectContext.get().getCurrentDbId();
this.properties = properties;
@@ -148,6 +149,9 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
private UnboundTVFRelation getCurrentTvf() {
if (baseCommand == null) {
+ ConnectContext ctx =
InsertTask.makeConnectContext(getCreateUser(), getCurrentDbName());
+ StatementContext statementContext = new StatementContext();
+ ctx.setStatementContext(statementContext);
this.baseCommand = (InsertIntoTableCommand) new
NereidsParser().parseSingle(getExecuteSql());
}
List<UnboundTVFRelation> allTVFRelation =
baseCommand.getAllTVFRelation();
@@ -161,6 +165,9 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
lock.writeLock().lock();
try {
super.updateJobStatus(status);
+ if (JobStatus.PAUSED.equals(getJobStatus())) {
+ clearRunningStreamTask();
+ }
if (isFinalStatus()) {
Env.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(getJobId());
}
@@ -193,7 +200,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
}
protected StreamingInsertTask createStreamingInsertTask() {
- this.runningStreamTask = new StreamingInsertTask(getJobId(),
AbstractTask.getNextTaskId(), getExecuteSql(),
+ this.runningStreamTask = new StreamingInsertTask(getJobId(),
Env.getCurrentEnv().getNextId(), getExecuteSql(),
offsetProvider, getCurrentDbName(), jobProperties,
originTvfProps, getCreateUser());
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().registerTask(runningStreamTask);
this.runningStreamTask.setStatus(TaskStatus.PENDING);
@@ -215,6 +222,13 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
return (getJobStatus().equals(JobStatus.RUNNING) ||
getJobStatus().equals(JobStatus.PENDING));
}
+ public void clearRunningStreamTask() {
+ if (runningStreamTask != null) {
+ runningStreamTask.closeOrReleaseResources();
+ runningStreamTask = null;
+ }
+ }
+
// When consumer to EOF, delay schedule task appropriately can avoid too
many small transactions.
public boolean needDelayScheduleTask() {
return System.currentTimeMillis() - lastScheduleTaskTimestamp >
jobProperties.getMaxIntervalSecond() * 1000;
@@ -224,6 +238,13 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
return offsetProvider.hasMoreDataToConsume();
}
+ @Override
+ public void logUpdateOperation() {
+ AlterStreamingJobOperationLog log =
+ new AlterStreamingJobOperationLog(this.getJobId(),
this.getJobStatus(), properties, getExecuteSql());
+ Env.getCurrentEnv().getEditLog().logUpdateStreamingJob(log);
+ }
+
@Override
public void onTaskFail(StreamingJobSchedulerTask task) throws JobException
{
if (task.getErrMsg() != null) {
@@ -287,6 +308,15 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
super.onReplayCreate();
}
+ @Override
+ public void onReplayUpdateStreaming(AlterStreamingJobOperationLog
operationLog) {
+ super.onReplayUpdateStreaming(operationLog);
+ setJobStatus(operationLog.getStatus());
+ this.properties = operationLog.getJobProperties();
+ this.jobProperties = new StreamingJobProperties(properties);
+ setExecuteSql(operationLog.getExecuteSql());
+ }
+
@Override
public ShowResultSetMetaData getTaskMetaData() {
return InsertJob.TASK_META_DATA;
@@ -361,25 +391,15 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
@Override
public void beforeCommitted(TransactionState txnState) throws
TransactionException {
- boolean shouldRealseLock = false;
+ boolean shouldReleaseLock = false;
lock.writeLock().lock();
try {
ArrayList<Long> taskIds = new ArrayList<>();
taskIds.add(runningStreamTask.getTaskId());
- List<LoadJob> loadJobs =
Env.getCurrentEnv().getLoadManager().queryLoadJobsByJobIds(taskIds);
- if (loadJobs.size() != 1) {
- shouldRealseLock = true;
- throw new TransactionException("load job not found, insert job
id is " + runningStreamTask.getTaskId());
- }
- LoadJob loadJob = loadJobs.get(0);
-
- if (txnState.getTransactionId() != loadJob.getTransactionId()
- ||
!runningStreamTask.getStatus().equals(TaskStatus.RUNNING)) {
- shouldRealseLock = true;
- throw new TransactionException("txn " +
txnState.getTransactionId() + "should be aborted.");
- }
+ // todo: Check whether the taskid of runningtask is consistent
with the taskid associated with txn
- LoadStatistic loadStatistic = loadJob.getLoadStatistic();
+ // todo: need get loadStatistic, load manager statistic is empty
+ LoadStatistic loadStatistic = new LoadStatistic();
txnState.setTxnCommitAttachment(new
StreamingTaskTxnCommitAttachment(
getJobId(),
runningStreamTask.getTaskId(),
@@ -389,7 +409,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
loadStatistic.getTotalFileSizeB(),
runningStreamTask.getRunningOffset().toJson()));
} finally {
- if (shouldRealseLock) {
+ if (shouldReleaseLock) {
lock.writeLock().unlock();
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
index 3073142c151..d5eb39cd064 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
@@ -118,7 +118,7 @@ public class StreamingInsertTask {
this.startTimeMs = System.currentTimeMillis();
if (isCanceled.get()) {
- throw new JobException("Export executor has been canceled, task
id: {}", getTaskId());
+ throw new JobException("Streaming insert task has been canceled,
task id: {}", getTaskId());
}
ctx = InsertTask.makeConnectContext(userIdentity, currentDb);
ctx.setSessionVariable(jobProperties.getSessionVariable());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
index b658c97c828..ec4eb0036d0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
@@ -29,6 +29,8 @@ import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TRow;
+import org.apache.commons.lang3.StringUtils;
+
import java.util.Arrays;
import java.util.List;
@@ -47,7 +49,6 @@ public class StreamingJobSchedulerTask extends AbstractTask {
case PENDING:
streamingInsertJob.createStreamingInsertTask();
streamingInsertJob.updateJobStatus(JobStatus.RUNNING);
- streamingInsertJob.setAutoResumeCount(0);
break;
case RUNNING:
streamingInsertJob.fetchMeta();
@@ -78,6 +79,7 @@ public class StreamingJobSchedulerTask extends AbstractTask {
if (autoResumeCount < Long.MAX_VALUE) {
streamingInsertJob.setAutoResumeCount(autoResumeCount + 1);
}
+ streamingInsertJob.createStreamingInsertTask();
streamingInsertJob.updateJobStatus(JobStatus.RUNNING);
return;
}
@@ -86,9 +88,7 @@ public class StreamingJobSchedulerTask extends AbstractTask {
@Override
protected void closeOrReleaseResources() {
- if (streamingInsertJob.getRunningStreamTask() != null) {
-
streamingInsertJob.getRunningStreamTask().closeOrReleaseResources();
- }
+ // do nothing
}
@Override
@@ -110,7 +110,17 @@ public class StreamingJobSchedulerTask extends
AbstractTask {
trow.addToColumnValue(new TCell().setStringVal(jobName));
trow.addToColumnValue(new
TCell().setStringVal(runningTask.getLabelName()));
trow.addToColumnValue(new
TCell().setStringVal(runningTask.getStatus().name()));
- trow.addToColumnValue(new
TCell().setStringVal(runningTask.getErrMsg()));
+ // err msg
+ String errMsg = "";
+ if (StringUtils.isNotBlank(runningTask.getErrMsg())
+ && !FeConstants.null_string.equals(runningTask.getErrMsg())) {
+ errMsg = runningTask.getErrMsg();
+ } else {
+ errMsg = runningTask.getOtherMsg();
+ }
+ trow.addToColumnValue(new
TCell().setStringVal(StringUtils.isNotBlank(errMsg)
+ ? errMsg : FeConstants.null_string));
+
// create time
trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(runningTask.getCreateTimeMs())));
trow.addToColumnValue(new TCell().setStringVal(null ==
getStartTimeMs() ? FeConstants.null_string
@@ -143,10 +153,9 @@ public class StreamingJobSchedulerTask extends
AbstractTask {
} else {
trow.addToColumnValue(new
TCell().setStringVal(runningTask.getUserIdentity().getQualifiedUser()));
}
+ trow.addToColumnValue(new TCell().setStringVal(""));
trow.addToColumnValue(new
TCell().setStringVal(runningTask.getRunningOffset() == null ?
FeConstants.null_string
: runningTask.getRunningOffset().toJson()));
- trow.addToColumnValue(new TCell().setStringVal(null ==
runningTask.getOtherMsg()
- ? FeConstants.null_string : runningTask.getOtherMsg()));
return trow;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
index 2b8c7225b2a..34fc29f1c8d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
@@ -45,6 +45,7 @@ import org.apache.doris.load.loadv2.JobState;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.expressions.And;
import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.persist.AlterStreamingJobOperationLog;
import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.Lists;
@@ -363,6 +364,18 @@ public class JobManager<T extends AbstractJob<?, C>, C>
implements Writable {
.add("msg", "replay update scheduler job").build());
}
+ public void replayUpdateStreamingJob(AlterStreamingJobOperationLog log) {
+ Long jobId = log.getJobId();
+ if (!jobMap.containsKey(jobId)) {
+ LOG.warn("replayUpdateStreamingJob not normal, jobId: {}, jobMap:
{}", jobId, log);
+ return;
+ }
+ T job = jobMap.get(jobId);
+ job.onReplayUpdateStreaming(log);
+ LOG.info(new LogBuilder(LogKey.SCHEDULER_JOB, jobId)
+ .add("msg", "replay update streaming job").build());
+ }
+
/**
* Replay delete load job. we need to remove job from job map
*/
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
index 3d260218886..3f7956f9fe9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
@@ -27,9 +27,11 @@ import lombok.Setter;
@Getter
@Setter
public class S3Offset implements Offset {
+ // path/1.csv
String startFile;
- @SerializedName("ef")
+ @SerializedName("endFile")
String endFile;
+ // s3://bucket/path/{1.csv,2.csv}
String fileLists;
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
index e429ae7375b..d433e42f73c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
@@ -33,6 +33,7 @@ import org.apache.doris.persist.gson.GsonUtils;
import com.google.common.collect.Maps;
import lombok.extern.log4j.Log4j2;
+import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.HashMap;
@@ -136,8 +137,13 @@ public class S3SourceOffsetProvider implements
SourceOffsetProvider {
try (RemoteFileSystem fileSystem =
FileSystemFactory.get(storageProperties)) {
String uri = storageProperties.validateAndGetUri(copiedProps);
String filePath = storageProperties.validateAndNormalizeUri(uri);
- maxRemoteEndFile = fileSystem.globListWithLimit(filePath, new
ArrayList<>(), startFile,
- 1, 1);
+ List<RemoteFile> objects = new ArrayList<>();
+ String endFile = fileSystem.globListWithLimit(filePath, objects,
startFile, 1, 1);
+ if (!objects.isEmpty() && StringUtils.isNotEmpty(endFile)) {
+ maxRemoteEndFile = endFile;
+ } else {
+ maxRemoteEndFile = startFile;
+ }
} catch (Exception e) {
throw e;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
index 18c5f525295..4e2ac653cf7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
@@ -60,7 +60,7 @@ public abstract class AbstractTask implements Task {
taskId = getNextTaskId();
}
- public static long getNextTaskId() {
+ private static long getNextTaskId() {
// do not use Env.getNextId(), just generate id without logging
return System.nanoTime() + RandomUtils.nextInt();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index cb6dee000de..f0f9d471ed9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -67,6 +67,7 @@ import org.apache.doris.persist.AlterDatabasePropertyInfo;
import org.apache.doris.persist.AlterLightSchemaChangeInfo;
import org.apache.doris.persist.AlterMTMV;
import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
+import org.apache.doris.persist.AlterStreamingJobOperationLog;
import org.apache.doris.persist.AlterUserOperationLog;
import org.apache.doris.persist.AlterViewInfo;
import org.apache.doris.persist.AnalyzeDeletionLog;
@@ -550,6 +551,11 @@ public class JournalEntity implements Writable {
isRead = true;
break;
}
+ case OperationType.OP_UPDATE_STREAMING_JOB: {
+ data = AlterStreamingJobOperationLog.read(in);
+ isRead = true;
+ break;
+ }
case OperationType.OP_CREATE_SCHEDULER_TASK:
case OperationType.OP_DELETE_SCHEDULER_TASK: {
//todo improve
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
index 786c8184e40..adde81fbfe9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
@@ -33,6 +33,7 @@ import org.apache.doris.qe.StmtExecutor;
import org.apache.commons.lang3.StringUtils;
import java.util.Map;
+import java.util.Objects;
/**
* alter job command.
@@ -75,22 +76,16 @@ public class AlterJobCommand extends AlterCommand
implements ForwardWithSync {
private AbstractJob analyzeAndBuildJobInfo(ConnectContext ctx) throws
JobException {
AbstractJob job =
Env.getCurrentEnv().getJobManager().getJobByName(jobName);
if (job instanceof StreamingInsertJob) {
- StreamingInsertJob originJob = (StreamingInsertJob) job;
- String updateSQL = StringUtils.isEmpty(sql) ?
originJob.getExecuteSql() : sql;
- Map<String, String> updateProps =
- properties == null || properties.isEmpty() ?
originJob.getProperties() : properties;
-
- StreamingInsertJob streamingInsertJob = new
StreamingInsertJob(jobName,
- job.getJobStatus(),
- job.getCurrentDbName(),
- job.getComment(),
- ConnectContext.get().getCurrentUserIdentity(),
- originJob.getJobConfig(),
- System.currentTimeMillis(),
- updateSQL,
- updateProps);
- streamingInsertJob.setJobId(job.getJobId());
- return streamingInsertJob;
+ StreamingInsertJob updateJob = (StreamingInsertJob) job;
+ // update sql
+ if (StringUtils.isNotEmpty(sql)) {
+ updateJob.setExecuteSql(sql);
+ }
+ // update properties
+ if (!properties.isEmpty()) {
+ updateJob.getProperties().putAll(properties);
+ }
+ return updateJob;
} else {
throw new JobException("Unsupported job type for ALTER:" +
job.getJobType());
}
@@ -119,20 +114,20 @@ public class AlterJobCommand extends AlterCommand
implements ForwardWithSync {
}
private boolean checkProperties(Map<String, String> originProps) {
- if (originProps.isEmpty()) {
+ if (this.properties == null || this.properties.isEmpty()) {
return false;
}
- if (!originProps.equals(properties)) {
+ if (!Objects.equals(this.properties, originProps)) {
return true;
}
return false;
}
- private boolean checkSql(String sql) {
- if (sql == null || sql.isEmpty()) {
+ private boolean checkSql(String originSql) {
+ if (originSql == null || originSql.isEmpty()) {
return false;
}
- if (!sql.equals(sql)) {
+ if (!originSql.equals(this.sql)) {
return true;
}
return false;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java
index fe81921211f..baac59fd7b9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java
@@ -67,7 +67,7 @@ public class CreateJobCommand extends Command implements
ForwardWithSync {
if (createJobInfo.streamingJob()) {
int streamingJobCnt =
Env.getCurrentEnv().getJobManager().getStreamingJobCnt();
if (streamingJobCnt >= Config.max_streaming_job_num) {
- throw new JobException("Exceed max streaming job num limit " +
Config.max_streaming_job_num);
+ throw new JobException("Exceed max streaming job num limit in
fe.conf:" + Config.max_streaming_job_num);
}
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java
index 3935f73f0e9..1ad21237a39 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java
@@ -21,7 +21,9 @@ import org.apache.doris.analysis.StmtType;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
+import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.common.JobStatus;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
@@ -46,7 +48,12 @@ public class ResumeJobCommand extends AlterJobStatusCommand
implements ForwardWi
if
(!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(),
PrivPredicate.ADMIN)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
"ADMIN");
}
- ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(),
JobStatus.RUNNING);
+ AbstractJob job =
ctx.getEnv().getJobManager().getJobByName(super.getJobName());
+ if (job instanceof StreamingInsertJob) {
+ ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(),
JobStatus.PENDING);
+ } else {
+ ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(),
JobStatus.RUNNING);
+ }
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/AlterStreamingJobOperationLog.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/AlterStreamingJobOperationLog.java
new file mode 100644
index 00000000000..785531fa0d1
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/persist/AlterStreamingJobOperationLog.java
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.persist;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.job.common.JobStatus;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+public class AlterStreamingJobOperationLog implements Writable {
+ @SerializedName(value = "jid")
+ private long jobId;
+ @SerializedName(value = "js")
+ private JobStatus status;
+ @SerializedName(value = "jp")
+ private Map<String, String> jobProperties;
+ @SerializedName(value = "sql")
+ String executeSql;
+
+ public AlterStreamingJobOperationLog(long jobId, JobStatus status,
+ Map<String, String> jobProperties, String executeSql) {
+ this.jobId = jobId;
+ this.status = status;
+ this.jobProperties = jobProperties;
+ this.executeSql = executeSql;
+ }
+
+ public long getJobId() {
+ return jobId;
+ }
+
+ public Map<String, String> getJobProperties() {
+ return jobProperties;
+ }
+
+ public String getExecuteSql() {
+ return executeSql;
+ }
+
+ public JobStatus getStatus() {
+ return status;
+ }
+
+ public static AlterStreamingJobOperationLog read(DataInput in) throws
IOException {
+ String json = Text.readString(in);
+ return GsonUtils.GSON.fromJson(json,
AlterStreamingJobOperationLog.class);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ String json = GsonUtils.GSON.toJson(this);
+ Text.writeString(out, json);
+ }
+
+ @Override
+ public String toString() {
+ return "AlterStreamingJobOperationLog{"
+ + "jobId=" + jobId
+ + ", status=" + status
+ + ", jobProperties=" + jobProperties
+ + ", executeSql='" + executeSql + '\''
+ + '}';
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 5abcc4e0464..74be0f5e475 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -862,6 +862,11 @@ public class EditLog {
Env.getCurrentEnv().getJobManager().replayDeleteJob(job);
break;
}
+ case OperationType.OP_UPDATE_STREAMING_JOB: {
+ AlterStreamingJobOperationLog log =
(AlterStreamingJobOperationLog) journal.getData();
+
Env.getCurrentEnv().getJobManager().replayUpdateStreamingJob(log);
+ break;
+ }
/*case OperationType.OP_CREATE_SCHEDULER_TASK: {
JobTask task = (JobTask) journal.getData();
Env.getCurrentEnv().getJobTaskManager().replayCreateTask(task);
@@ -2038,6 +2043,10 @@ public class EditLog {
logEdit(OperationType.OP_UPDATE_SCHEDULER_JOB, job);
}
+ // Appends an ALTER-streaming-job record to the edit log under the new
+ // OP_UPDATE_STREAMING_JOB (493) opcode; replayed by followers via
+ // JobManager.replayUpdateStreamingJob (see the switch case added above).
+ public void logUpdateStreamingJob(AlterStreamingJobOperationLog log) {
+ logEdit(OperationType.OP_UPDATE_STREAMING_JOB, log);
+ }
+
public void logDeleteJob(AbstractJob job) {
logEdit(OperationType.OP_DELETE_SCHEDULER_JOB, job);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index 2bbc03d4aea..12d59d086b9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -414,6 +414,8 @@ public class OperationType {
public static final short OP_OPERATE_KEY = 492;
+ public static final short OP_UPDATE_STREAMING_JOB = 493;
+
// For cloud.
public static final short OP_UPDATE_CLOUD_REPLICA = 1000;
@Deprecated
diff --git
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
index 3982876b9f4..d89bc8d2f0f 100644
---
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
+++
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
@@ -61,15 +61,28 @@ suite("test_streaming_insert_job") {
"s3.secret_key" = "${getS3SK()}"
);
"""
- Awaitility.await().atMost(30, SECONDS)
- .pollInterval(1, SECONDS).until(
- {
- print("check success task count")
- def jobSuccendCount = sql """ select SucceedTaskCount from
jobs("type"="insert") where Name like '%${jobName}%' and
ExecuteType='STREAMING' """
- // check job status and succeed task count larger than 2
- jobSuccendCount.size() == 1 && '2' <=
jobSuccendCount.get(0).get(0)
- }
- )
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(1, SECONDS).until(
+ {
+ print("check success task count")
+ def jobSuccendCount = sql """ select SucceedTaskCount from
jobs("type"="insert") where Name like '%${jobName}%' and
ExecuteType='STREAMING' """
+ // check job status and succeed task count larger than 2
+ jobSuccendCount.size() == 1 && '2' <=
jobSuccendCount.get(0).get(0)
+ }
+ )
+ } catch (Exception ex){
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ println("show job: " + showjob)
+ println("show task: " + showtask)
+ throw ex;
+ }
+
+ def jobResult = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ println("show success job: " + jobResult)
+
+ qt_select """ SELECT * FROM ${tableName} order by c1 """
sql """
PAUSE JOB where jobname = '${jobName}'
@@ -79,20 +92,20 @@ suite("test_streaming_insert_job") {
"""
assert pausedJobStatus.get(0).get(0) == "PAUSED"
- qt_select """ SELECT * FROM ${tableName} order by c1 """
+ def pauseShowTask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ assert pauseShowTask.size() == 0
+
def jobOffset = sql """
- select progress, remoteoffset from jobs("type"="insert") where
Name='${jobName}'
+ select ConsumedOffset, MaxOffset from jobs("type"="insert") where
Name='${jobName}'
"""
assert jobOffset.get(0).get(0) == "regression/load/data/example_1.csv"
assert jobOffset.get(0).get(1) == "regression/load/data/example_1.csv"
- //todo check status
// alter streaming job
sql """
ALTER JOB FOR ${jobName}
PROPERTIES(
- "s3.batch_files" = "1",
"session.insert_max_filter_ratio" = "0.5"
)
INSERT INTO ${tableName}
@@ -110,10 +123,31 @@ suite("test_streaming_insert_job") {
"""
def alterJobProperties = sql """
- select properties from jobs("type"="insert") where Name='${jobName}'
+ select status,properties,ConsumedOffset from jobs("type"="insert")
where Name='${jobName}'
+ """
+ assert alterJobProperties.get(0).get(0) == "PAUSED"
+ assert alterJobProperties.get(0).get(1) ==
"{\"s3.batch_files\":\"1\",\"session.insert_max_filter_ratio\":\"0.5\"}"
+ assert alterJobProperties.get(0).get(2) ==
"regression/load/data/example_1.csv"
+
+ sql """
+ RESUME JOB where jobname = '${jobName}'
+ """
+ def resumeJobStatus = sql """
+ select status,properties,ConsumedOffset from jobs("type"="insert")
where Name='${jobName}'
"""
- assert alterJobProperties.get(0).get(0) ==
"{\"s3.batch_files\":\"1\",\"session.insert_max_filter_ratio\":\"0.5\"}"
+ assert resumeJobStatus.get(0).get(0) == "RUNNING" ||
resumeJobStatus.get(0).get(0) == "PENDING"
+ assert resumeJobStatus.get(0).get(1) ==
"{\"s3.batch_files\":\"1\",\"session.insert_max_filter_ratio\":\"0.5\"}"
+ assert resumeJobStatus.get(0).get(2) ==
"regression/load/data/example_1.csv"
+ Awaitility.await().atMost(60, SECONDS)
+ .pollInterval(1, SECONDS).until(
+ {
+ print("check create streaming task count")
+ def resumeShowTask = sql """select * from
tasks("type"="insert") where JobName='${jobName}'"""
+ // check streaming task create success
+ resumeShowTask.size() == 1
+ }
+ )
sql """
DROP JOB IF EXISTS where jobname = '${jobName}'
@@ -122,6 +156,4 @@ suite("test_streaming_insert_job") {
def jobCountRsp = sql """select count(1) from jobs("type"="insert") where
Name ='${jobName}'"""
assert jobCountRsp.get(0).get(0) == 0
-
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]