This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch streaming-job-dev
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/streaming-job-dev by this push:
new b2adc933555 [Feature](WIP) Fix Alter Job and schedule bug etc (#56166)
b2adc933555 is described below
commit b2adc933555c4654984cac65c861a0a089b5da4d
Author: wudi <[email protected]>
AuthorDate: Wed Sep 17 22:35:48 2025 +0800
[Feature](WIP) Fix Alter Job and schedule bug etc (#56166)
### What problem does this PR solve?
Fix Alter Job and scheduling bugs, etc.
---
.../antlr4/org/apache/doris/nereids/DorisParser.g4 | 2 +-
.../apache/doris/cloud/transaction/TxnUtil.java | 2 +-
.../java/org/apache/doris/fs/obj/S3ObjStorage.java | 7 ++-
.../doris/job/base/JobExecutionConfiguration.java | 17 +-------
.../{PauseReason.java => FailureReason.java} | 8 +++-
.../doris/job/extensions/insert/InsertJob.java | 2 +
.../doris/job/extensions/insert/InsertTask.java | 4 +-
.../insert/streaming/StreamingInsertJob.java | 44 +++++++++++++++----
.../insert/streaming/StreamingInsertTask.java | 4 +-
.../streaming/StreamingJobSchedulerTask.java | 50 ++++++++++++++++-----
.../StreamingTaskTxnCommitAttachment.java | 9 ++--
.../doris/job/offset/SourceOffsetProvider.java | 6 +++
.../org/apache/doris/job/offset/s3/S3Offset.java | 2 +
.../job/offset/s3/S3SourceOffsetProvider.java | 9 +++-
.../job/scheduler/StreamingTaskScheduler.java | 2 +
.../doris/nereids/parser/LogicalPlanBuilder.java | 3 +-
.../trees/plans/commands/AlterJobCommand.java | 4 +-
.../org/apache/doris/persist/gson/GsonUtils.java | 5 ++-
.../org/apache/doris/catalog/DropFunctionTest.java | 11 +----
.../streaming_job/test_streaming_insert_job.groovy | 51 +++++++++++++++++++---
20 files changed, 172 insertions(+), 70 deletions(-)
diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
index a0ae7c6916e..4b6dd3668d4 100644
--- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
+++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
@@ -111,7 +111,7 @@ supportedJobStatement
commentSpec?
DO supportedDmlStatement
#createScheduledJob
| PAUSE JOB WHERE (jobNameKey=identifier) EQ (jobNameValue=STRING_LITERAL)
#pauseJob
- | ALTER JOB FOR (jobNameKey=identifier) (propertyClause |
supportedDmlStatement | propertyClause supportedDmlStatement)
#alterJob
+ | ALTER JOB FOR (jobName=multipartIdentifier) (propertyClause |
supportedDmlStatement | propertyClause supportedDmlStatement)
#alterJob
| DROP JOB (IF EXISTS)? WHERE (jobNameKey=identifier) EQ
(jobNameValue=STRING_LITERAL)
#dropJob
| RESUME JOB WHERE (jobNameKey=identifier) EQ (jobNameValue=STRING_LITERAL)
#resumeJob
| CANCEL TASK WHERE (jobNameKey=identifier) EQ
(jobNameValue=STRING_LITERAL) AND (taskIdKey=identifier) EQ
(taskIdValue=INTEGER_VALUE) #cancelJobTask
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
index 4155e6c5e67..424ce238544 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
@@ -289,7 +289,7 @@ public class TxnUtil {
.setFileSize(streamingTaskTxnCommitAttachment.getFileSize());
if (streamingTaskTxnCommitAttachment.getOffset() != null) {
-
builder.setOffset(streamingTaskTxnCommitAttachment.getOffset().endOffset());
+ builder.setOffset(streamingTaskTxnCommitAttachment.getOffset());
}
attachementBuilder.setStreamingTaskTxnCommitAttachment(builder.build());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
index 4c79df0acf7..9b0e5aeb801 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
@@ -682,6 +682,7 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
String currentMaxFile = "";
boolean isTruncated = false;
+ boolean reachLimit = false;
do {
roundCnt++;
ListObjectsV2Response response = listObjectsV2(request);
@@ -716,12 +717,16 @@ public class S3ObjStorage implements ObjStorage<S3Client>
{
result.add(remoteFile);
if (reachLimit(result.size(), matchFileSize,
fileSizeLimit, fileNumLimit)) {
+ reachLimit = true;
break;
}
objPath = objPath.getParent();
isPrefix = true;
}
+ if (reachLimit) {
+ break;
+ }
}
//record current last object file name
S3Object lastS3Object =
response.contents().get(response.contents().size() - 1);
@@ -733,7 +738,7 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
.continuationToken(response.nextContinuationToken())
.build();
}
- } while (isTruncated);
+ } while (isTruncated && !reachLimit);
if (LOG.isDebugEnabled()) {
LOG.debug("remotePath:{}, result:{}", remotePath, result);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
index 3d8c9afa36b..423b436979f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
@@ -124,22 +124,7 @@ public class JobExecutionConfiguration {
return delayTimeSeconds;
}
- if (JobExecuteType.STREAMING.equals(executeType) && null !=
timerDefinition) {
- if (null == timerDefinition.getStartTimeMs() || null !=
timerDefinition.getLatestSchedulerTimeMs()) {
- return delayTimeSeconds;
- }
-
- // If the job is already executed or in the schedule queue, or not
within this schedule window
- if (endTimeMs < timerDefinition.getStartTimeMs()) {
- return delayTimeSeconds;
- }
-
- delayTimeSeconds.add(queryDelayTimeSecond(currentTimeMs,
timerDefinition.getStartTimeMs()));
-
this.timerDefinition.setLatestSchedulerTimeMs(timerDefinition.getStartTimeMs());
- return delayTimeSeconds;
- }
-
- if (JobExecuteType.RECURRING.equals(executeType)) {
+ if (JobExecuteType.RECURRING.equals(executeType) ||
JobExecuteType.STREAMING.equals(executeType)) {
if (timerDefinition.getStartTimeMs() > endTimeMs || null !=
timerDefinition.getEndTimeMs()
&& timerDefinition.getEndTimeMs() < startTimeMs) {
return delayTimeSeconds;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/common/PauseReason.java
b/fe/fe-core/src/main/java/org/apache/doris/job/common/FailureReason.java
similarity index 91%
rename from
fe/fe-core/src/main/java/org/apache/doris/job/common/PauseReason.java
rename to
fe/fe-core/src/main/java/org/apache/doris/job/common/FailureReason.java
index 49a46327b32..2acfb472a40 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/common/PauseReason.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/common/FailureReason.java
@@ -27,17 +27,21 @@ import com.google.gson.annotations.SerializedName;
import java.io.DataOutput;
import java.io.IOException;
-public class PauseReason implements Writable {
+public class FailureReason implements Writable {
@SerializedName(value = "code")
private InternalErrorCode code;
@SerializedName(value = "msg")
private String msg;
- public PauseReason(InternalErrorCode errCode, String msg) {
+ public FailureReason(InternalErrorCode errCode, String msg) {
this.code = errCode;
this.msg = msg;
}
+ public FailureReason(String msg) {
+ this.msg = msg;
+ }
+
public InternalErrorCode getCode() {
return code;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
index 67b56dbc13d..a4d6793b0b8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
@@ -99,6 +99,7 @@ public class InsertJob extends AbstractJob<InsertTask,
Map<Object, Object>> impl
.addAll(COMMON_SCHEMA)
.add(new Column("Comment", ScalarType.createStringType()))
// only execute type = streaming need record
+ .add(new Column("Properties", ScalarType.createStringType()))
.add(new Column("Progress", ScalarType.createStringType()))
.add(new Column("RemoteOffset", ScalarType.createStringType()))
.add(new Column("LoadStatistic", ScalarType.createStringType()))
@@ -549,6 +550,7 @@ public class InsertJob extends AbstractJob<InsertTask,
Map<Object, Object>> impl
trow.addToColumnValue(new TCell().setStringVal(getComment()));
trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
+ trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
trow.addToColumnValue(new TCell().setStringVal(
loadStatistic == null ? FeConstants.null_string :
loadStatistic.toJson()));
trow.addToColumnValue(new TCell().setStringVal(failMsg == null ?
FeConstants.null_string : failMsg.getMsg()));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
index 34cfdf6edea..261ca012311 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
@@ -68,7 +68,8 @@ public class InsertTask extends AbstractTask {
new Column("TrackingUrl", ScalarType.createStringType()),
new Column("LoadStatistic", ScalarType.createStringType()),
new Column("User", ScalarType.createStringType()),
- new Column("Offset", ScalarType.createStringType()));
+ new Column("Offset", ScalarType.createStringType()),
+ new Column("OtherMsg", ScalarType.createStringType()));
public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
@@ -295,6 +296,7 @@ public class InsertTask extends AbstractTask {
trow.addToColumnValue(new TCell().setStringVal(""));
trow.addToColumnValue(new
TCell().setStringVal(userIdentity.getQualifiedUser()));
trow.addToColumnValue(new TCell().setStringVal(""));
+ trow.addToColumnValue(new TCell().setStringVal(""));
return trow;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index b872040c4c3..157629520a9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -24,7 +24,6 @@ import org.apache.doris.cloud.rpc.MetaServiceProxy;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.TimeUtils;
@@ -32,10 +31,10 @@ import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.base.JobExecuteType;
import org.apache.doris.job.base.JobExecutionConfiguration;
import org.apache.doris.job.base.TimerDefinition;
+import org.apache.doris.job.common.FailureReason;
import org.apache.doris.job.common.IntervalUnit;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.common.JobType;
-import org.apache.doris.job.common.PauseReason;
import org.apache.doris.job.common.TaskStatus;
import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.exception.JobException;
@@ -72,6 +71,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@Log4j2
@@ -80,7 +80,8 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
private final long dbId;
private StreamingJobStatistic jobStatistic = new StreamingJobStatistic();
@Getter
- protected PauseReason pauseReason;
+ @SerializedName("fr")
+ protected FailureReason failureReason;
@Getter
@Setter
protected long latestAutoResumeTimestamp;
@@ -225,6 +226,9 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
@Override
public void onTaskFail(StreamingJobSchedulerTask task) throws JobException
{
+ if (task.getErrMsg() != null) {
+ this.failureReason = new FailureReason(task.getErrMsg());
+ }
// Here is the failure of StreamingJobSchedulerTask, no processing is
required
getRunningTasks().remove(task);
}
@@ -240,7 +244,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
failedTaskCount.incrementAndGet();
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
if
(getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
- this.pauseReason = new
PauseReason(InternalErrorCode.INTERNAL_ERR, task.getErrMsg());
+ this.failureReason = new FailureReason(task.getErrMsg());
}
} finally {
lock.writeLock().unlock();
@@ -262,11 +266,14 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
}
private void updateJobStatisticAndOffset(StreamingTaskTxnCommitAttachment
attachment) {
+ if (this.jobStatistic == null) {
+ this.jobStatistic = new StreamingJobStatistic();
+ }
this.jobStatistic.setScannedRows(this.jobStatistic.getScannedRows() +
attachment.getScannedRows());
this.jobStatistic.setLoadBytes(this.jobStatistic.getLoadBytes() +
attachment.getLoadBytes());
this.jobStatistic.setFileNumber(this.jobStatistic.getFileNumber() +
attachment.getFileNumber());
this.jobStatistic.setFileSize(this.jobStatistic.getFileSize() +
attachment.getFileSize());
- offsetProvider.updateOffset(attachment.getOffset());
+
offsetProvider.updateOffset(offsetProvider.deserializeOffset(attachment.getOffset()));
}
@Override
@@ -297,7 +304,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
trow.addToColumnValue(new TCell().setStringVal(getJobName()));
trow.addToColumnValue(new
TCell().setStringVal(getCreateUser().getQualifiedUser()));
trow.addToColumnValue(new
TCell().setStringVal(getJobConfig().getExecuteType().name()));
- trow.addToColumnValue(new TCell().setStringVal(""));
+ trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
trow.addToColumnValue(new TCell().setStringVal(getJobStatus().name()));
trow.addToColumnValue(new TCell().setStringVal(getExecuteSql()));
trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs())));
@@ -305,6 +312,8 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
trow.addToColumnValue(new
TCell().setStringVal(String.valueOf(getFailedTaskCount().get())));
trow.addToColumnValue(new
TCell().setStringVal(String.valueOf(getCanceledTaskCount().get())));
trow.addToColumnValue(new TCell().setStringVal(getComment()));
+ trow.addToColumnValue(new TCell().setStringVal(properties != null
+ ? GsonUtils.GSON.toJson(properties) :
FeConstants.null_string));
if (offsetProvider != null && offsetProvider.getSyncOffset() != null) {
trow.addToColumnValue(new
TCell().setStringVal(offsetProvider.getSyncOffset()));
@@ -320,8 +329,8 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
trow.addToColumnValue(new TCell().setStringVal(
jobStatistic == null ? FeConstants.null_string :
jobStatistic.toJson()));
- trow.addToColumnValue(
- new TCell().setStringVal(pauseReason == null ?
FeConstants.null_string : pauseReason.getMsg()));
+ trow.addToColumnValue(new TCell().setStringVal(failureReason == null
+ ? FeConstants.null_string : failureReason.getMsg()));
return trow;
}
@@ -359,6 +368,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
taskIds.add(runningStreamTask.getTaskId());
List<LoadJob> loadJobs =
Env.getCurrentEnv().getLoadManager().queryLoadJobsByJobIds(taskIds);
if (loadJobs.size() != 1) {
+ shouldRealseLock = true;
throw new TransactionException("load job not found, insert job
id is " + runningStreamTask.getTaskId());
}
LoadJob loadJob = loadJobs.get(0);
@@ -377,7 +387,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
loadStatistic.getLoadBytes(),
loadStatistic.getFileNumber(),
loadStatistic.getTotalFileSizeB(),
- runningStreamTask.getRunningOffset()));
+ runningStreamTask.getRunningOffset().toJson()));
} finally {
if (shouldRealseLock) {
lock.writeLock().unlock();
@@ -387,6 +397,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
@Override
public void beforeAborted(TransactionState txnState) throws
TransactionException {
+
}
@Override
@@ -403,6 +414,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
StreamingTaskTxnCommitAttachment attachment =
(StreamingTaskTxnCommitAttachment)
txnState.getTxnCommitAttachment();
updateJobStatisticAndOffset(attachment);
+ succeedTaskCount.incrementAndGet();
}
public void replayOnCloudMode() throws UserException {
@@ -462,5 +474,19 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
if (jobProperties == null && properties != null) {
jobProperties = new StreamingJobProperties(properties);
}
+
+ if (null == getSucceedTaskCount()) {
+ setSucceedTaskCount(new AtomicLong(0));
+ }
+ if (null == getFailedTaskCount()) {
+ setFailedTaskCount(new AtomicLong(0));
+ }
+ if (null == getCanceledTaskCount()) {
+ setCanceledTaskCount(new AtomicLong(0));
+ }
+
+ if (null == lock) {
+ this.lock = new ReentrantReadWriteLock(true);
+ }
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
index c171173fd3c..3073142c151 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
@@ -55,6 +55,8 @@ public class StreamingInsertTask {
@Setter
private TaskStatus status;
private String errMsg;
+ @Setter
+ private String otherMsg;
private Long createTimeMs;
private Long startTimeMs;
private Long finishTimeMs;
@@ -99,8 +101,8 @@ public class StreamingInsertTask {
if (TaskStatus.CANCELED.equals(status)) {
return;
}
- onFail(e.getMessage());
log.warn("execute task error, job id is {}, task id is {}", jobId,
taskId, e);
+ onFail(e.getMessage());
} finally {
// The cancel logic will call the closeOrReleased Resources method
by itself.
// If it is also called here,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
index 6f083a82c55..b658c97c828 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
@@ -17,15 +17,21 @@
package org.apache.doris.job.extensions.insert.streaming;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.FeConstants;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.job.common.FailureReason;
import org.apache.doris.job.common.JobStatus;
-import org.apache.doris.job.common.PauseReason;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.task.AbstractTask;
+import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TRow;
+import java.util.Arrays;
+import java.util.List;
+
public class StreamingJobSchedulerTask extends AbstractTask {
private static final long BACK_OFF_BASIC_TIME_SEC = 10L;
private static final long MAX_BACK_OFF_TIME_SEC = 60 * 5;
@@ -55,15 +61,15 @@ public class StreamingJobSchedulerTask extends AbstractTask
{
}
private void autoResumeHandler() throws JobException {
- final PauseReason pauseReason = streamingInsertJob.getPauseReason();
+ final FailureReason failureReason =
streamingInsertJob.getFailureReason();
final long latestAutoResumeTimestamp =
streamingInsertJob.getLatestAutoResumeTimestamp();
final long autoResumeCount = streamingInsertJob.getAutoResumeCount();
final long current = System.currentTimeMillis();
- if (pauseReason != null
- && pauseReason.getCode() != InternalErrorCode.MANUAL_PAUSE_ERR
- && pauseReason.getCode() !=
InternalErrorCode.TOO_MANY_FAILURE_ROWS_ERR
- && pauseReason.getCode() !=
InternalErrorCode.CANNOT_RESUME_ERR) {
+ if (failureReason != null
+ && failureReason.getCode() !=
InternalErrorCode.MANUAL_PAUSE_ERR
+ && failureReason.getCode() !=
InternalErrorCode.TOO_MANY_FAILURE_ROWS_ERR
+ && failureReason.getCode() !=
InternalErrorCode.CANNOT_RESUME_ERR) {
long autoResumeIntervalTimeSec = autoResumeCount < 5
? Math.min((long) Math.pow(2, autoResumeCount) *
BACK_OFF_BASIC_TIME_SEC,
MAX_BACK_OFF_TIME_SEC) : MAX_BACK_OFF_TIME_SEC;
@@ -107,20 +113,40 @@ public class StreamingJobSchedulerTask extends
AbstractTask {
trow.addToColumnValue(new
TCell().setStringVal(runningTask.getErrMsg()));
// create time
trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(runningTask.getCreateTimeMs())));
- trow.addToColumnValue(new TCell().setStringVal(null ==
getStartTimeMs() ? ""
+ trow.addToColumnValue(new TCell().setStringVal(null ==
getStartTimeMs() ? FeConstants.null_string
: TimeUtils.longToTimeString(runningTask.getStartTimeMs())));
// load end time
trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(runningTask.getFinishTimeMs())));
- // tracking url
- trow.addToColumnValue(new TCell().setStringVal("trackingUrl"));
- trow.addToColumnValue(new TCell().setStringVal("statistic"));
+
+ List<LoadJob> loadJobs = Env.getCurrentEnv().getLoadManager()
+ .queryLoadJobsByJobIds(Arrays.asList(runningTask.getTaskId()));
+ if (!loadJobs.isEmpty()) {
+ LoadJob loadJob = loadJobs.get(0);
+ if (loadJob.getLoadingStatus() != null &&
loadJob.getLoadingStatus().getTrackingUrl() != null) {
+ trow.addToColumnValue(new
TCell().setStringVal(loadJob.getLoadingStatus().getTrackingUrl()));
+ } else {
+ trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
+ }
+
+ if (loadJob.getLoadStatistic() != null) {
+ trow.addToColumnValue(new
TCell().setStringVal(loadJob.getLoadStatistic().toJson()));
+ } else {
+ trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
+ }
+ } else {
+ trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
+ trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
+ }
+
if (runningTask.getUserIdentity() == null) {
- trow.addToColumnValue(new TCell().setStringVal(""));
+ trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
} else {
trow.addToColumnValue(new
TCell().setStringVal(runningTask.getUserIdentity().getQualifiedUser()));
}
- trow.addToColumnValue(new
TCell().setStringVal(runningTask.getRunningOffset() == null ? ""
+ trow.addToColumnValue(new
TCell().setStringVal(runningTask.getRunningOffset() == null ?
FeConstants.null_string
: runningTask.getRunningOffset().toJson()));
+ trow.addToColumnValue(new TCell().setStringVal(null ==
runningTask.getOtherMsg()
+ ? FeConstants.null_string : runningTask.getOtherMsg()));
return trow;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java
index 8660ed94739..4b7590824b4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java
@@ -18,7 +18,6 @@
package org.apache.doris.job.extensions.insert.streaming;
import org.apache.doris.cloud.proto.Cloud.StreamingTaskCommitAttachmentPB;
-import org.apache.doris.job.offset.Offset;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TxnCommitAttachment;
@@ -28,7 +27,7 @@ import lombok.Getter;
public class StreamingTaskTxnCommitAttachment extends TxnCommitAttachment {
public StreamingTaskTxnCommitAttachment(long jobId, long taskId,
- long scannedRows, long loadBytes, long fileNumber, long
fileSize, Offset offset) {
+ long scannedRows, long loadBytes, long fileNumber, long
fileSize, String offset) {
super(TransactionState.LoadJobSourceType.STREAMING_JOB);
this.jobId = jobId;
this.taskId = taskId;
@@ -45,7 +44,7 @@ public class StreamingTaskTxnCommitAttachment extends
TxnCommitAttachment {
this.loadBytes = pb.getLoadBytes();
this.fileNumber = pb.getFileNumber();
this.fileSize = pb.getFileSize();
- this.offset.setEndOffset(pb.getOffset());
+ this.offset = pb.getOffset();
}
@Getter
@@ -66,7 +65,7 @@ public class StreamingTaskTxnCommitAttachment extends
TxnCommitAttachment {
private long fileSize;
@SerializedName(value = "of")
@Getter
- private Offset offset;
+ private String offset;
@Override
public String toString() {
@@ -75,7 +74,7 @@ public class StreamingTaskTxnCommitAttachment extends
TxnCommitAttachment {
+ ", loadBytes=" + loadBytes
+ ", fileNumber=" + fileNumber
+ ", fileSize=" + fileSize
- + ", offset=" + offset.toString()
+ + ", offset=" + offset
+ "]";
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
index e670dac553c..d7e1bee4669 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
@@ -81,5 +81,11 @@ public interface SourceOffsetProvider {
*/
boolean hasMoreDataToConsume();
+ /**
+ * Deserialize string offset to Offset
+ * @return
+ */
+ Offset deserializeOffset(String offset);
+
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
index 2ab2030fbbb..3d260218886 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
@@ -20,6 +20,7 @@ package org.apache.doris.job.offset.s3;
import org.apache.doris.job.offset.Offset;
import org.apache.doris.persist.gson.GsonUtils;
+import com.google.gson.annotations.SerializedName;
import lombok.Getter;
import lombok.Setter;
@@ -27,6 +28,7 @@ import lombok.Setter;
@Setter
public class S3Offset implements Offset {
String startFile;
+ @SerializedName("ef")
String endFile;
String fileLists;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
index f63333468fa..e429ae7375b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
@@ -29,6 +29,7 @@ import org.apache.doris.nereids.trees.expressions.Properties;
import org.apache.doris.nereids.trees.plans.Plan;
import
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.persist.gson.GsonUtils;
import com.google.common.collect.Maps;
import lombok.extern.log4j.Log4j2;
@@ -72,8 +73,7 @@ public class S3SourceOffsetProvider implements
SourceOffsetProvider {
String parentPath = rfiles.get(0).getParentPath();
String filePaths =
rfiles.stream().map(RemoteFile::getName).collect(Collectors.joining(",", "{",
"}"));
String finalFiles = String.format("s3://%s/%s/%s", bucket,
parentPath, filePaths);
- offset.setEndFile(
- String.format("s3://%s/%s/%s", bucket, parentPath,
rfiles.get(rfiles.size() - 1).getName()));
+ offset.setEndFile(String.format("%s/%s", parentPath,
rfiles.get(rfiles.size() - 1).getName()));
offset.setFileLists(finalFiles);
}
} catch (Exception e) {
@@ -153,4 +153,9 @@ public class S3SourceOffsetProvider implements
SourceOffsetProvider {
}
return false;
}
+
+ @Override
+ public Offset deserializeOffset(String offset) {
+ return GsonUtils.GSON.fromJson(offset, S3Offset.class);
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
index 3fbae399303..6e664b404da 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
@@ -109,6 +109,8 @@ public class StreamingTaskScheduler extends MasterDaemon {
}
private void scheduleTaskWithDelay(StreamingInsertTask task, long delayMs)
{
+ task.setOtherMsg("No data available for consumption at the moment,
will retry after "
+ + (System.currentTimeMillis() + delayMs));
delayScheduler.schedule(() -> {
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().registerTask(task);
}, delayMs, TimeUnit.MILLISECONDS);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index 80a0886017e..a7b67f2ffda 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -1163,11 +1163,10 @@ public class LogicalPlanBuilder extends
DorisParserBaseVisitor<Object> {
@Override
public LogicalPlan visitAlterJob(DorisParser.AlterJobContext ctx) {
- checkJobNameKey(stripQuotes(ctx.jobNameKey.getText()), JOB_NAME, ctx);
Map<String, String> properties = ctx.propertyClause() != null
? Maps.newHashMap(visitPropertyClause(ctx.propertyClause())) :
Maps.newHashMap();
String executeSql = getOriginSql(ctx.supportedDmlStatement());
- return new AlterJobCommand(stripQuotes(ctx.jobNameKey.getText()),
properties, executeSql);
+ return new AlterJobCommand(ctx.jobName.getText(), properties,
executeSql);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
index b1823de4a1d..786c8184e40 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
@@ -80,7 +80,7 @@ public class AlterJobCommand extends AlterCommand implements
ForwardWithSync {
Map<String, String> updateProps =
properties == null || properties.isEmpty() ?
originJob.getProperties() : properties;
- return new StreamingInsertJob(jobName,
+ StreamingInsertJob streamingInsertJob = new
StreamingInsertJob(jobName,
job.getJobStatus(),
job.getCurrentDbName(),
job.getComment(),
@@ -89,6 +89,8 @@ public class AlterJobCommand extends AlterCommand implements
ForwardWithSync {
System.currentTimeMillis(),
updateSQL,
updateProps);
+ streamingInsertJob.setJobId(job.getJobId());
+ return streamingInsertJob;
} else {
throw new JobException("Unsupported job type for ALTER:" +
job.getJobType());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
index af79329f681..dffd7a8c80e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
@@ -183,6 +183,7 @@ import org.apache.doris.fs.remote.dfs.JFSFileSystem;
import org.apache.doris.fs.remote.dfs.OFSFileSystem;
import org.apache.doris.job.extensions.insert.InsertJob;
import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
+import
org.apache.doris.job.extensions.insert.streaming.StreamingTaskTxnCommitAttachment;
import org.apache.doris.job.extensions.mtmv.MTMVJob;
import org.apache.doris.load.loadv2.BrokerLoadJob;
import org.apache.doris.load.loadv2.BulkLoadJob;
@@ -557,7 +558,9 @@ public class GsonUtils {
.registerDefaultSubtype(TxnCommitAttachment.class)
.registerSubtype(LoadJobFinalOperation.class,
LoadJobFinalOperation.class.getSimpleName())
.registerSubtype(MiniLoadTxnCommitAttachment.class,
MiniLoadTxnCommitAttachment.class.getSimpleName())
- .registerSubtype(RLTaskTxnCommitAttachment.class,
RLTaskTxnCommitAttachment.class.getSimpleName());
+ .registerSubtype(RLTaskTxnCommitAttachment.class,
RLTaskTxnCommitAttachment.class.getSimpleName())
+ .registerSubtype(StreamingTaskTxnCommitAttachment.class,
+ StreamingTaskTxnCommitAttachment.class.getSimpleName());
// runtime adapter for class "RoutineLoadProgress".
private static RuntimeTypeAdapterFactory<RoutineLoadProgress>
routineLoadTypeAdapterFactory
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/catalog/DropFunctionTest.java
b/fe/fe-core/src/test/java/org/apache/doris/catalog/DropFunctionTest.java
index b2de57f9110..bef0bb92d20 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/DropFunctionTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/DropFunctionTest.java
@@ -23,7 +23,6 @@ import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.plans.commands.CreateDatabaseCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateFunctionCommand;
import org.apache.doris.nereids.trees.plans.commands.DropFunctionCommand;
-import
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
@@ -36,9 +35,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.UUID;
@@ -66,15 +63,9 @@ public class DropFunctionTest {
public void testDropGlobalFunction() throws Exception {
ConnectContext ctx = UtFrameUtils.createDefaultCtx();
// 1. create database db1
- //String sql = "create database db1;";
- String sql = "insert into db1.tb select * from
s3('url'='s3://a/*.csv')";
+ String sql = "create database db1;";
NereidsParser nereidsParser = new NereidsParser();
LogicalPlan logicalPlan = nereidsParser.parseSingle(sql);
- InsertIntoTableCommand baseCommand = (InsertIntoTableCommand) new
NereidsParser().parseSingle(sql);
- baseCommand.initPlan(ConnectContext.get(),
ConnectContext.get().getExecutor(), false);
- Map<String, String> map = new HashMap<>();
- map.put("url", "s3:/xxxx/*.");
-
StmtExecutor stmtExecutor = new StmtExecutor(connectContext, sql);
if (logicalPlan instanceof CreateDatabaseCommand) {
((CreateDatabaseCommand) logicalPlan).run(connectContext,
stmtExecutor);
diff --git
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
index bd8ee2759fd..3982876b9f4 100644
---
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
+++
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
@@ -42,9 +42,13 @@ suite("test_streaming_insert_job") {
"""
- // create recurring job
+ // create streaming job
sql """
- CREATE JOB ${jobName} ON STREAMING DO INSERT INTO ${tableName}
+ CREATE JOB ${jobName}
+ PROPERTIES(
+ "s3.batch_files" = "1"
+ )
+ ON STREAMING DO INSERT INTO ${tableName}
SELECT * FROM S3
(
"uri" =
"s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
@@ -57,12 +61,13 @@ suite("test_streaming_insert_job") {
"s3.secret_key" = "${getS3SK()}"
);
"""
- Awaitility.await().atMost(30, SECONDS).until(
+ Awaitility.await().atMost(30, SECONDS)
+ .pollInterval(1, SECONDS).until(
{
print("check success task count")
def jobSuccendCount = sql """ select SucceedTaskCount from
jobs("type"="insert") where Name like '%${jobName}%' and
ExecuteType='STREAMING' """
- // check job status and succeed task count larger than 1
- jobSuccendCount.size() == 1 && '1' <=
jobSuccendCount.get(0).get(0)
+ // check job status and succeed task count larger than 2
+ jobSuccendCount.size() == 1 && '2' <=
jobSuccendCount.get(0).get(0)
}
)
@@ -76,6 +81,40 @@ suite("test_streaming_insert_job") {
qt_select """ SELECT * FROM ${tableName} order by c1 """
+ def jobOffset = sql """
+ select progress, remoteoffset from jobs("type"="insert") where
Name='${jobName}'
+ """
+ assert jobOffset.get(0).get(0) == "regression/load/data/example_1.csv"
+ assert jobOffset.get(0).get(1) == "regression/load/data/example_1.csv"
+ //todo check status
+
+ // alter streaming job
+ sql """
+ ALTER JOB FOR ${jobName}
+ PROPERTIES(
+ "s3.batch_files" = "1",
+ "session.insert_max_filter_ratio" = "0.5"
+ )
+ INSERT INTO ${tableName}
+ SELECT * FROM S3
+ (
+ "uri" =
"s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+ "format" = "csv",
+ "provider" = "${getS3Provider()}",
+ "column_separator" = ",",
+ "s3.endpoint" = "${getS3Endpoint()}",
+ "s3.region" = "${getS3Region()}",
+ "s3.access_key" = "${getS3AK()}",
+ "s3.secret_key" = "${getS3SK()}"
+ );
+ """
+
+ def alterJobProperties = sql """
+ select properties from jobs("type"="insert") where Name='${jobName}'
+ """
+ assert alterJobProperties.get(0).get(0) ==
"{\"s3.batch_files\":\"1\",\"session.insert_max_filter_ratio\":\"0.5\"}"
+
+
sql """
DROP JOB IF EXISTS where jobname = '${jobName}'
"""
@@ -83,4 +122,6 @@ suite("test_streaming_insert_job") {
def jobCountRsp = sql """select count(1) from jobs("type"="insert") where
Name ='${jobName}'"""
assert jobCountRsp.get(0).get(0) == 0
+
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]