This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch streaming-job-dev
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/streaming-job-dev by this push:
new cc4c616df09 [Feature](WIP) add StreamingInsertTask and improve
StreamInsertJob (#55862)
cc4c616df09 is described below
commit cc4c616df092be1928f9a93657b590641faf8cd9
Author: wudi <[email protected]>
AuthorDate: Thu Sep 11 09:45:50 2025 +0800
[Feature](WIP) add StreamingInsertTask and improve StreamInsertJob (#55862)
### What problem does this PR solve?
1. Add StreamingInsertTask for StreamingInsertJob
2. Improve StreamingInsertJob
3. Add InsertIntoTableCommand rewriting of TVF parameters
---
.../doris/job/extensions/insert/InsertJob.java | 2 +-
.../insert/streaming/StreamingInsertJob.java | 195 +++++++++++++++++---
.../insert/streaming/StreamingInsertTask.java | 196 ++++++++++++++++++++-
.../insert/streaming/StreamingJobProperties.java | 28 ++-
.../streaming/StreamingJobSchedulerTask.java | 33 +++-
.../org/apache/doris/job/manager/JobManager.java | 17 ++
.../doris/job/offset/SourceOffsetProvider.java | 14 +-
.../job/offset/SourceOffsetProviderFactory.java | 19 +-
.../org/apache/doris/job/offset/s3/S3Offset.java | 5 +-
.../job/offset/s3/S3SourceOffsetProvider.java | 35 +++-
.../job/scheduler/StreamingTaskScheduler.java | 11 +-
.../org/apache/doris/job/task/AbstractTask.java | 2 +-
.../trees/plans/commands/AlterJobCommand.java | 1 -
.../trees/plans/commands/CreateJobCommand.java | 12 ++
.../trees/plans/commands/info/CreateJobInfo.java | 6 +-
.../commands/insert/InsertIntoTableCommand.java | 47 +++++
16 files changed, 568 insertions(+), 55 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
index 5b386886b19..e855aa4f836 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
@@ -105,7 +105,7 @@ public class InsertJob extends AbstractJob<InsertTask,
Map<Object, Object>> impl
.add(new Column("ErrorMsg", ScalarType.createStringType()))
.build();
- private static final ShowResultSetMetaData TASK_META_DATA =
+ public static final ShowResultSetMetaData TASK_META_DATA =
ShowResultSetMetaData.builder()
.addColumn(new Column("TaskId",
ScalarType.createVarchar(80)))
.addColumn(new Column("Label",
ScalarType.createVarchar(80)))
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index a4bbf51b8a4..c88fd019c46 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -18,46 +18,70 @@
package org.apache.doris.job.extensions.insert.streaming;
import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
+import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.base.AbstractJob;
+import org.apache.doris.job.base.JobExecuteType;
import org.apache.doris.job.base.JobExecutionConfiguration;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.common.PauseReason;
+import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.InsertJob;
+import org.apache.doris.job.offset.SourceOffsetProvider;
+import org.apache.doris.job.offset.SourceOffsetProviderFactory;
+import org.apache.doris.job.task.AbstractTask;
+import org.apache.doris.load.FailMsg;
+import org.apache.doris.load.loadv2.LoadStatistic;
+import org.apache.doris.nereids.parser.NereidsParser;
+import
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.ShowResultSetMetaData;
+import org.apache.doris.thrift.TCell;
+import org.apache.doris.thrift.TRow;
+import org.apache.doris.transaction.TransactionException;
+import org.apache.doris.transaction.TransactionState;
+import org.apache.doris.transaction.TxnStateChangeCallback;
import com.google.gson.annotations.SerializedName;
import lombok.Getter;
import lombok.Setter;
+import org.apache.commons.collections.CollectionUtils;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
-public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask,
Map<Object, Object>> {
+public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask,
Map<Object, Object>> implements
+ TxnStateChangeCallback {
@SerializedName("did")
private final long dbId;
-
+ private LoadStatistic loadStatistic = new LoadStatistic();
+ @SerializedName("fm")
+ private FailMsg failMsg;
@Getter
protected PauseReason pauseReason;
-
@Getter
@Setter
protected long latestAutoResumeTimestamp;
-
@Getter
@Setter
protected long autoResumeCount;
-
@Getter
@SerializedName("jp")
private StreamingJobProperties jobProperties;
-
+ @Getter
+ StreamingInsertTask runningStreamTask;
+ SourceOffsetProvider offsetProvider;
private long lastScheduleTaskTimestamp = -1L;
public StreamingInsertJob(String jobName,
@@ -73,6 +97,14 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
jobConfig, createTimeMs, executeSql);
this.dbId = ConnectContext.get().getCurrentDbId();
this.jobProperties = jobProperties;
+ String tvfType = parseTvfType();
+ this.offsetProvider =
SourceOffsetProviderFactory.createSourceOffsetProvider(tvfType);
+ }
+
+ private String parseTvfType() {
+ NereidsParser parser = new NereidsParser();
+ InsertIntoTableCommand command = (InsertIntoTableCommand)
parser.parseSingle(getExecuteSql());
+ return command.getFirstTvfName();
}
@Override
@@ -80,35 +112,109 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
super.updateJobStatus(status);
}
- protected void createStreamingInsertTask() {
+ @Override
+ public JobType getJobType() {
+ return JobType.INSERT;
+ }
+
+ @Override
+ protected void checkJobParamsInternal() {
+ }
+
+ @Override
+ public boolean isReadyForScheduling(Map<Object, Object> taskContext) {
+ return CollectionUtils.isEmpty(getRunningTasks());
+ }
+
+ @Override
+ public List<StreamingJobSchedulerTask> createTasks(TaskType taskType,
Map<Object, Object> taskContext) {
+ List<StreamingJobSchedulerTask> newTasks = new ArrayList<>();
+ StreamingJobSchedulerTask streamingJobSchedulerTask = new
StreamingJobSchedulerTask(this);
+ newTasks.add(streamingJobSchedulerTask);
+ super.initTasks(newTasks, taskType);
+ return newTasks;
+ }
+
+ protected StreamingInsertTask createStreamingInsertTask() {
+ InsertIntoTableCommand command =
offsetProvider.rewriteTvfParams(getExecuteSql());
+ this.runningStreamTask = new StreamingInsertTask(getJobId(),
AbstractTask.getNextTaskId(), command,
+ loadStatistic, getCurrentDbName(),
offsetProvider.getCurrentOffset(), jobProperties);
+ return this.runningStreamTask;
}
protected void fetchMeta() {
+ offsetProvider.fetchRemoteMeta();
+ }
+
+ public boolean needScheduleTask() {
+ return (getJobStatus().equals(JobStatus.RUNNING) ||
getJobStatus().equals(JobStatus.PENDING));
+ }
+
+ // When consumer to EOF, delay schedule task appropriately can avoid too
many small transactions.
+ public boolean needDelayScheduleTask() {
+ return System.currentTimeMillis() - lastScheduleTaskTimestamp >
jobProperties.getMaxIntervalSecond() * 1000;
+ }
+
+ public boolean hasMoreDataToConsume() {
+ return offsetProvider.hasMoreDataToConsume();
}
@Override
- public JobType getJobType() {
- return JobType.INSERT;
+ public void onTaskFail(StreamingJobSchedulerTask task) throws JobException
{
+ // Here is the failure of StreamingJobSchedulerTask, no processing is
required
+ getRunningTasks().remove(task);
}
@Override
- protected void checkJobParamsInternal() {
+ public void onTaskSuccess(StreamingJobSchedulerTask task) throws
JobException {
+ // Here is the success of StreamingJobSchedulerTask, no processing is
required
+ getRunningTasks().remove(task);
+ }
+
+ public void onStreamTaskFail(StreamingInsertTask task) throws JobException
{
+ if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
+ this.failMsg = new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL,
task.getErrMsg());
+ }
+ updateJobStatus(JobStatus.PAUSED);
}
+ public void onStreamTaskSuccess(StreamingInsertTask task) {
+ StreamingInsertTask nextTask = createStreamingInsertTask();
+ this.runningStreamTask = nextTask;
+
Env.getCurrentEnv().getJobManager().getStreamingTaskScheduler().registerTask(runningStreamTask);
+ }
+
+
@Override
- public boolean isReadyForScheduling(Map<Object, Object> taskContext) {
- return true;
+ public ShowResultSetMetaData getTaskMetaData() {
+ return InsertJob.TASK_META_DATA;
}
@Override
- public java.util.List<StreamingJobSchedulerTask>
createTasks(org.apache.doris.job.common.TaskType taskType,
- Map<Object,
Object> taskContext) {
- return java.util.Collections.emptyList();
+ public List<String> getShowInfo() {
+ return getCommonShowInfo();
}
@Override
- public org.apache.doris.qe.ShowResultSetMetaData getTaskMetaData() {
- return org.apache.doris.qe.ShowResultSetMetaData.builder().build();
+ public TRow getTvfInfo() {
+ TRow trow = new TRow();
+ trow.addToColumnValue(new
TCell().setStringVal(String.valueOf(getJobId())));
+ trow.addToColumnValue(new TCell().setStringVal(getJobName()));
+ trow.addToColumnValue(new
TCell().setStringVal(getCreateUser().getQualifiedUser()));
+ trow.addToColumnValue(new
TCell().setStringVal(getJobConfig().getExecuteType().name()));
+ trow.addToColumnValue(new
TCell().setStringVal(getJobConfig().convertRecurringStrategyToString()));
+ trow.addToColumnValue(new TCell().setStringVal(getJobStatus().name()));
+ trow.addToColumnValue(new TCell().setStringVal(getExecuteSql()));
+ trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs())));
+ trow.addToColumnValue(new
TCell().setStringVal(String.valueOf(getSucceedTaskCount().get())));
+ trow.addToColumnValue(new
TCell().setStringVal(String.valueOf(getFailedTaskCount().get())));
+ trow.addToColumnValue(new
TCell().setStringVal(String.valueOf(getCanceledTaskCount().get())));
+ trow.addToColumnValue(new TCell().setStringVal(getComment()));
+ trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
+ trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
+ trow.addToColumnValue(new
TCell().setStringVal(loadStatistic.toJson()));
+ trow.addToColumnValue(new TCell().setStringVal(failMsg == null ?
FeConstants.null_string : failMsg.getMsg()));
+ return trow;
}
@Override
@@ -119,7 +225,11 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
@Override
public List<StreamingJobSchedulerTask> queryTasks() {
- return new ArrayList<>();
+ if (!getRunningTasks().isEmpty()) {
+ return getRunningTasks();
+ } else {
+ return Arrays.asList(new StreamingJobSchedulerTask(this));
+ }
}
@Override
@@ -127,17 +237,50 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
Text.writeString(out, GsonUtils.GSON.toJson(this));
}
- public boolean needScheduleTask() {
- return (getJobStatus().equals(JobStatus.RUNNING) ||
getJobStatus().equals(JobStatus.PENDING));
+ @Override
+ public long getId() {
+ return getJobId();
}
- // When consumer to EOF, delay schedule task appropriately can avoid too
many small transactions.
- public boolean needDelayScheduleTask() {
- return System.currentTimeMillis() - lastScheduleTaskTimestamp >
jobProperties.getMaxIntervalSecond() * 1000;
+ @Override
+ public void beforeCommitted(TransactionState txnState) throws
TransactionException {
+
}
- public boolean hasMoreDataToConsume() {
- // TODO: implement this
- return true;
+ @Override
+ public void beforeAborted(TransactionState txnState) throws
TransactionException {
+
+ }
+
+ @Override
+ public void afterCommitted(TransactionState txnState, boolean txnOperated)
throws UserException {
+
+ }
+
+ @Override
+ public void replayOnCommitted(TransactionState txnState) {
+
+ }
+
+ @Override
+ public void afterAborted(TransactionState txnState, boolean txnOperated,
String txnStatusChangeReason)
+ throws UserException {
+
+ }
+
+ @Override
+ public void replayOnAborted(TransactionState txnState) {
+
+ }
+
+ @Override
+ public void afterVisible(TransactionState txnState, boolean txnOperated) {
+
+ }
+
+ @Override
+ public void replayOnVisible(TransactionState txnState) {
+
+
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
index 4e07ee01e30..56a59d3a7f6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
@@ -17,20 +17,206 @@
package org.apache.doris.job.extensions.insert.streaming;
+import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Status;
+import org.apache.doris.common.util.Util;
+import org.apache.doris.job.base.Job;
+import org.apache.doris.job.common.TaskStatus;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.InsertTask;
+import org.apache.doris.job.offset.Offset;
+import org.apache.doris.load.loadv2.LoadStatistic;
+import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.glue.LogicalPlanAdapter;
+import
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryState;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.thrift.TStatusCode;
+
import lombok.Getter;
+import lombok.extern.log4j.Log4j2;
+
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+@Log4j2
+@Getter
public class StreamingInsertTask {
- @Getter
+ private static final String LABEL_SPLITTER = "_";
+ private static final int MAX_RETRY = 3;
private long jobId;
-
- @Getter
private long taskId;
+ private String labelName;
+ private TaskStatus status;
+ private String errMsg;
+ private Long createTimeMs;
+ private Long startTimeMs;
+ private Long finishTimeMs;
+ private InsertIntoTableCommand command;
+ private StmtExecutor stmtExecutor;
+ private String currentDb;
+ private UserIdentity userIdentity;
+ private ConnectContext ctx;
+ private LoadStatistic loadStatistic;
+ private Offset offset;
+ private AtomicBoolean isCanceled = new AtomicBoolean(false);
+ private StreamingJobProperties jobProperties;
- public StreamingInsertTask(long jobId, long taskId) {
+ public StreamingInsertTask(long jobId,
+ long taskId,
+ InsertIntoTableCommand command,
+ LoadStatistic loadStatistic,
+ String currentDb,
+ Offset offset,
+ StreamingJobProperties jobProperties) {
this.jobId = jobId;
this.taskId = taskId;
+ this.command = command;
+ this.loadStatistic = loadStatistic;
+ this.userIdentity = ctx.getCurrentUserIdentity();
+ this.currentDb = currentDb;
+ this.offset = offset;
+ this.jobProperties = jobProperties;
+ this.labelName = getJobId() + LABEL_SPLITTER + getTaskId();
+ this.createTimeMs = System.currentTimeMillis();
+ }
+
+ public void execute() throws JobException {
+ try {
+ before();
+ run();
+ onSuccess();
+ } catch (Exception e) {
+ if (TaskStatus.CANCELED.equals(status)) {
+ return;
+ }
+ onFail(e.getMessage());
+ log.warn("execute task error, job id is {}, task id is {}", jobId,
taskId, e);
+ } finally {
+ // The cancel logic will call the closeOrReleased Resources method
by itself.
+ // If it is also called here,
+ // it may result in the inability to obtain relevant information
when canceling the task
+ if (!TaskStatus.CANCELED.equals(status)) {
+ closeOrReleaseResources();
+ }
+ }
+ }
+
+ private void before() throws JobException {
+ this.startTimeMs = System.currentTimeMillis();
+ if (isCanceled.get()) {
+ throw new JobException("Export executor has been canceled, task
id: {}", getTaskId());
+ }
+ ctx = InsertTask.makeConnectContext(userIdentity, currentDb);
+ ctx.setSessionVariable(jobProperties.getSessionVariable());
+ StatementContext statementContext = new StatementContext();
+ ctx.setStatementContext(statementContext);
+ this.command.setLabelName(Optional.of(this.labelName));
+ this.command.setJobId(getTaskId());
+ stmtExecutor = new StmtExecutor(ctx, new LogicalPlanAdapter(command,
ctx.getStatementContext()));
+ }
+
+ private void run() throws JobException {
+ String errMsg = null;
+ int retry = 0;
+ while (retry <= MAX_RETRY) {
+ try {
+ if (isCanceled.get()) {
+ log.info("task has been canceled, task id is {}",
getTaskId());
+ return;
+ }
+ command.runWithUpdateInfo(ctx, stmtExecutor, loadStatistic);
+ if (ctx.getState().getStateType() ==
QueryState.MysqlStateType.OK) {
+ return;
+ } else {
+ errMsg = ctx.getState().getErrorMessage();
+ }
+ log.error(
+ "streaming insert failed with {}, reason {}, to retry",
+ command.getLabelName(),
+ errMsg);
+ if (retry == MAX_RETRY) {
+ errMsg = "reached max retry times, failed with" + errMsg;
+ }
+ } catch (Exception e) {
+ log.warn("execute insert task error, label is {},offset is
{}", command.getLabelName(),
+ offset.toJson(), e);
+ errMsg = Util.getRootCauseMessage(e);
+ }
+ retry++;
+ }
+ log.error("streaming insert task failed, job id is {}, task id is {},
offset is {}, errMsg is {}",
+ getJobId(), getTaskId(), offset.toJson(), errMsg);
+ throw new JobException(errMsg);
+ }
+
+ public boolean onSuccess() throws JobException {
+ if (TaskStatus.CANCELED.equals(status)) {
+ return false;
+ }
+ this.status = TaskStatus.SUCCESS;
+ this.finishTimeMs = System.currentTimeMillis();
+ if (!isCallable()) {
+ return false;
+ }
+ Job job = Env.getCurrentEnv().getJobManager().getJob(getJobId());
+ if (null == job) {
+ log.info("job is null, job id is {}", jobId);
+ return false;
+ }
+
+ StreamingInsertJob streamingInsertJob = (StreamingInsertJob) job;
+ streamingInsertJob.onStreamTaskSuccess(this);
+ return true;
+ }
+
+ public void onFail(String errMsg) throws JobException {
+ this.errMsg = errMsg;
+ if (TaskStatus.CANCELED.equals(status)) {
+ return;
+ }
+ this.status = TaskStatus.FAILED;
+ this.finishTimeMs = System.currentTimeMillis();
+ if (!isCallable()) {
+ return;
+ }
+ Job job = Env.getCurrentEnv().getJobManager().getJob(getJobId());
+ StreamingInsertJob streamingInsertJob = (StreamingInsertJob) job;
+ streamingInsertJob.onStreamTaskFail(this);
+ }
+
+ public void cancel(boolean needWaitCancelComplete) throws Exception {
+ if (isCanceled.get()) {
+ return;
+ }
+ isCanceled.getAndSet(true);
+ if (null != stmtExecutor) {
+ stmtExecutor.cancel(new Status(TStatusCode.CANCELLED, "streaming
insert task cancelled"),
+ needWaitCancelComplete);
+ }
+ }
+
+ public void closeOrReleaseResources() {
+ if (null != stmtExecutor) {
+ stmtExecutor = null;
+ }
+ if (null != command) {
+ command = null;
+ }
+ if (null != ctx) {
+ ctx = null;
+ }
}
- public void execute() {
+ private boolean isCallable() {
+ if (status.equals(TaskStatus.CANCELED)) {
+ return false;
+ }
+ if (null != Env.getCurrentEnv().getJobManager().getJob(jobId)) {
+ return true;
+ }
+ return false;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
index 25d256b127b..4f463a090d5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
@@ -20,9 +20,13 @@ package org.apache.doris.job.extensions.insert.streaming;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.Util;
import org.apache.doris.job.base.JobProperties;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.qe.SessionVariable;
import lombok.Data;
+import java.util.HashMap;
import java.util.Map;
@Data
@@ -30,13 +34,16 @@ public class StreamingJobProperties implements
JobProperties {
public static final String MAX_INTERVAL_SECOND_PROPERTY = "max_interval";
public static final String S3_BATCH_FILES_PROPERTY = "s3.batch_files";
public static final String S3_BATCH_SIZE_PROPERTY = "s3.batch_size";
+ public static final String SESSION_VAR_PREFIX = "session.";
+
public static final long DEFAULT_MAX_INTERVAL_SECOND = 10;
public static final long DEFAULT_S3_BATCH_FILES = 256;
public static final long DEFAULT_S3_BATCH_SIZE = 10 * 1024 * 1024 * 1024L;
// 10GB
- public static final long DEFAULT_INSERT_TIMEOUT = 30 * 60; // 30min
+ public static final int DEFAULT_INSERT_TIMEOUT = 30 * 60; // 30min
private final Map<String, String> properties;
private long maxIntervalSecond;
+ private int maxRetry;
private long s3BatchFiles;
private long s3BatchSize;
@@ -61,4 +68,23 @@ public class StreamingJobProperties implements JobProperties
{
&& v <= (long) (1024 * 1024 * 1024) * 10,
StreamingJobProperties.S3_BATCH_SIZE_PROPERTY + " should
between 100MB and 10GB");
}
+
+ public SessionVariable getSessionVariable() throws JobException {
+ final Map<String, String> sessionVarMap = new HashMap<>();
+ for (Map.Entry<String, String> entry : properties.entrySet()) {
+ if (entry.getKey().startsWith(SESSION_VAR_PREFIX)) {
+ String subKey =
entry.getKey().substring(SESSION_VAR_PREFIX.length());
+ sessionVarMap.put(subKey, entry.getValue());
+ }
+ }
+
+ SessionVariable sessionVariable = new SessionVariable();
+ try {
+ sessionVariable.setInsertTimeoutS(DEFAULT_INSERT_TIMEOUT);
+ sessionVariable.readFromJson(GsonUtils.GSON.toJson(sessionVarMap));
+ } catch (Exception e) {
+ throw new JobException("Invalid session variable, " +
e.getMessage());
+ }
+ return sessionVariable;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
index 888669f428a..d724b09fbf0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
@@ -18,17 +18,17 @@
package org.apache.doris.job.extensions.insert.streaming;
import org.apache.doris.common.InternalErrorCode;
+import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.common.PauseReason;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.task.AbstractTask;
+import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TRow;
public class StreamingJobSchedulerTask extends AbstractTask {
-
private static final long BACK_OFF_BASIC_TIME_SEC = 10L;
private static final long MAX_BACK_OFF_TIME_SEC = 60 * 5;
-
private StreamingInsertJob streamingInsertJob;
public StreamingJobSchedulerTask(StreamingInsertJob streamingInsertJob) {
@@ -80,14 +80,43 @@ public class StreamingJobSchedulerTask extends AbstractTask
{
@Override
protected void closeOrReleaseResources() {
+ if (streamingInsertJob.getRunningStreamTask() != null) {
+
streamingInsertJob.getRunningStreamTask().closeOrReleaseResources();
+ }
}
@Override
protected void executeCancelLogic(boolean needWaitCancelComplete) throws
Exception {
+ if (streamingInsertJob.getRunningStreamTask() != null) {
+
streamingInsertJob.getRunningStreamTask().cancel(needWaitCancelComplete);
+ }
}
@Override
public TRow getTvfInfo(String jobName) {
+ StreamingInsertTask runningTask =
streamingInsertJob.getRunningStreamTask();
+ TRow trow = new TRow();
+ trow.addToColumnValue(new
TCell().setStringVal(String.valueOf(runningTask.getTaskId())));
+ trow.addToColumnValue(new
TCell().setStringVal(String.valueOf(runningTask.getJobId())));
+ trow.addToColumnValue(new TCell().setStringVal(jobName));
+ trow.addToColumnValue(new
TCell().setStringVal(runningTask.getLabelName()));
+ trow.addToColumnValue(new
TCell().setStringVal(runningTask.getStatus().name()));
+ trow.addToColumnValue(new
TCell().setStringVal(runningTask.getErrMsg()));
+ // create time
+ trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(runningTask.getCreateTimeMs())));
+ trow.addToColumnValue(new TCell().setStringVal(null ==
getStartTimeMs() ? ""
+ : TimeUtils.longToTimeString(runningTask.getStartTimeMs())));
+ // load end time
+ trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(runningTask.getFinishTimeMs())));
+ // tracking url
+ trow.addToColumnValue(new TCell().setStringVal("trackingUrl"));
+ trow.addToColumnValue(new TCell().setStringVal("statistic"));
+ if (runningTask.getUserIdentity() == null) {
+ trow.addToColumnValue(new TCell().setStringVal(""));
+ } else {
+ trow.addToColumnValue(new
TCell().setStringVal(runningTask.getUserIdentity().getQualifiedUser()));
+ }
+ trow.addToColumnValue(new
TCell().setStringVal(runningTask.getOffset().toJson()));
return null;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
index ae6bad07066..9954d26b6f1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
@@ -48,6 +48,7 @@ import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.Lists;
+import lombok.Getter;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -73,6 +74,7 @@ public class JobManager<T extends AbstractJob<?, C>, C>
implements Writable {
private JobScheduler<T, C> jobScheduler;
+ @Getter
private StreamingTaskScheduler streamingTaskScheduler;
// lock for job
@@ -112,6 +114,21 @@ public class JobManager<T extends AbstractJob<?, C>, C>
implements Writable {
return jobMap.get(jobId);
}
+ /**
+ * get streaming running job
+ *
+ * @return running job
+ */
+ public int getStreamingJobCnt() {
+ int count = 0;
+ for (T job : jobMap.values()) {
+ if
(job.getJobConfig().getExecuteType().equals(JobExecuteType.STREAMING)) {
+ count++;
+ }
+ }
+ return count;
+ }
+
public void registerJob(T job) throws JobException {
job.initParams();
createJobInternal(job, false);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
index f88079617de..3d620739290 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
@@ -36,11 +36,17 @@ public interface SourceOffsetProvider {
Offset getNextOffset();
/**
- * Rewrite the TVF parameters in the InsertIntoTableCommand based on the
current offset.
- * @param command
+ * Get current offset
* @return
*/
- InsertIntoTableCommand rewriteTvfParamsInCommand(InsertIntoTableCommand
command);
+ Offset getCurrentOffset();
+
+ /**
+ * Rewrite the TVF parameters in the SQL based on the current offset.
+ * @param sql
+ * @return rewritten InsertIntoTableCommand
+ */
+ InsertIntoTableCommand rewriteTvfParams(String sql);
/**
* Update the progress of the source.
@@ -57,6 +63,6 @@ public interface SourceOffsetProvider {
* Whether there is more data to consume
* @return
*/
- boolean hasMoreData();
+ boolean hasMoreDataToConsume();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProviderFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProviderFactory.java
index 5ba1d903d78..9cefa4e9d42 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProviderFactory.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProviderFactory.java
@@ -17,11 +17,15 @@
package org.apache.doris.job.offset;
+import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.offset.s3.S3SourceOffsetProvider;
+import lombok.extern.log4j.Log4j2;
+
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+@Log4j2
public class SourceOffsetProviderFactory {
private static final Map<String, Class<? extends SourceOffsetProvider>>
map = new ConcurrentHashMap<>();
@@ -29,9 +33,16 @@ public class SourceOffsetProviderFactory {
map.put("s3", S3SourceOffsetProvider.class);
}
- public static SourceOffsetProvider createSourceOffsetProvider(String
sourceType) throws InstantiationException,
- IllegalAccessException {
- Class<? extends SourceOffsetProvider> cla =
map.get(sourceType.toUpperCase());
- return cla.newInstance();
+ public static SourceOffsetProvider createSourceOffsetProvider(String
sourceType) {
+ try {
+ Class<? extends SourceOffsetProvider> cla =
map.get(sourceType.toUpperCase());
+ if (cla == null) {
+ throw new JobException("Unsupported source type: " +
sourceType);
+ }
+ return cla.newInstance();
+ } catch (Exception e) {
+ log.error("Failed to create source provider for type: " +
sourceType, e);
+ throw new RuntimeException("Failed to create source provider for
type: " + sourceType);
+ }
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
index 86ff467796a..a175575757f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
@@ -20,13 +20,14 @@ package org.apache.doris.job.offset.s3;
import org.apache.doris.job.offset.Offset;
import org.apache.doris.persist.gson.GsonUtils;
+import lombok.Getter;
+
import java.util.List;
public class S3Offset implements Offset {
String startFile;
-
String endFile;
-
+ @Getter
List<String> fileLists;
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
index 087d9c2beb7..771736a9559 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
@@ -19,35 +19,60 @@ package org.apache.doris.job.offset.s3;
import org.apache.doris.job.offset.Offset;
import org.apache.doris.job.offset.SourceOffsetProvider;
+import org.apache.doris.nereids.parser.NereidsParser;
import
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import java.util.HashMap;
+import java.util.Map;
+
public class S3SourceOffsetProvider implements SourceOffsetProvider {
+ S3Offset currentOffset;
+ String maxRemoteEndFile;
@Override
public String getSourceType() {
- return null;
+ return "s3";
}
@Override
- public Offset getNextOffset() {
+ public S3Offset getNextOffset() {
+ //todo: listObjects from end file
return null;
}
@Override
- public InsertIntoTableCommand
rewriteTvfParamsInCommand(InsertIntoTableCommand command) {
- return null;
+ public Offset getCurrentOffset() {
+ return currentOffset;
+ }
+
+ @Override
+ public InsertIntoTableCommand rewriteTvfParams(String sql) {
+ S3Offset nextOffset = getNextOffset();
+ Map<String, String> props = new HashMap<>();
+ // TODO: convert the file list into a glob pattern string
+ props.put("uri", nextOffset.getFileLists().toString());
+
+ NereidsParser parser = new NereidsParser();
+ InsertIntoTableCommand command = (InsertIntoTableCommand)
parser.parseSingle(sql);
+ command.rewriteTvfProperties(getSourceType(), props);
+ return command;
}
@Override
public void updateProgress(Offset offset) {
+ this.currentOffset = (S3Offset) offset;
}
@Override
public void fetchRemoteMeta() {
+ // TODO: list remote objects to refresh the max remote end file
}
@Override
- public boolean hasMoreData() {
+ public boolean hasMoreDataToConsume() {
+ if (currentOffset.endFile.compareTo(maxRemoteEndFile) < 0) {
+ return true;
+ }
return false;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
index d87a2b7833d..bdc27e983e1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.CustomThreadFactory;
import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
import org.apache.doris.job.extensions.insert.streaming.StreamingInsertTask;
@@ -75,11 +76,17 @@ public class StreamingTaskScheduler extends MasterDaemon {
private void scheduleTasks(List<StreamingInsertTask> tasks) {
for (StreamingInsertTask task : tasks) {
- threadPool.execute(() -> scheduleOneTask(task));
+ threadPool.execute(() -> {
+ try {
+ scheduleOneTask(task);
+ } catch (Exception e) {
+ log.error("Failed to schedule task, task id: {}, job id:
{}", task.getTaskId(), task.getJobId(), e);
+ }
+ });
}
}
- private void scheduleOneTask(StreamingInsertTask task) {
+ private void scheduleOneTask(StreamingInsertTask task) throws JobException
{
StreamingInsertJob job = (StreamingInsertJob)
Env.getCurrentEnv().getJobManager().getJob(task.getJobId());
if (job == null) {
log.warn("Job not found, job id: {}", task.getJobId());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
index 4e2ac653cf7..18c5f525295 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
@@ -60,7 +60,7 @@ public abstract class AbstractTask implements Task {
taskId = getNextTaskId();
}
- private static long getNextTaskId() {
+ public static long getNextTaskId() {
// do not use Env.getNextId(), just generate id without logging
return System.nanoTime() + RandomUtils.nextInt();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
index ef8c9bb8b7a..53a90893b81 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
@@ -66,7 +66,6 @@ public class AlterJobCommand extends AlterCommand implements
ForwardWithSync {
@Override
public void doRun(ConnectContext ctx, StmtExecutor executor) throws
Exception {
-
validate();
AbstractJob job = analyzeAndBuildJobInfo(ctx);
ctx.getEnv().getJobManager().alterJob(job);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java
index fecd457ada5..fe81921211f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java
@@ -19,7 +19,9 @@ package org.apache.doris.nereids.trees.plans.commands;
import org.apache.doris.analysis.StmtType;
import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
import org.apache.doris.job.base.AbstractJob;
+import org.apache.doris.job.exception.JobException;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.info.CreateJobInfo;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
@@ -56,10 +58,20 @@ public class CreateJobCommand extends Command implements
ForwardWithSync {
@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws
Exception {
+ validate();
AbstractJob job = createJobInfo.analyzeAndBuildJobInfo(ctx);
Env.getCurrentEnv().getJobManager().registerJob(job);
}
+ private void validate() throws JobException {
+ if (createJobInfo.streamingJob()) {
+ int streamingJobCnt =
Env.getCurrentEnv().getJobManager().getStreamingJobCnt();
+ if (streamingJobCnt >= Config.max_streaming_job_num) {
+ throw new JobException("Exceed max streaming job num limit " +
Config.max_streaming_job_num);
+ }
+ }
+ }
+
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitCreateJobCommand(this, context);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
index 0d52e23ece5..4334526630e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
@@ -274,7 +274,7 @@ public class CreateJobInfo {
LogicalPlan logicalPlan = parser.parseSingle(sql);
if (logicalPlan instanceof InsertIntoTableCommand) {
return new StreamingInsertJob(labelNameOptional.get(),
- JobStatus.RUNNING,
+ JobStatus.PENDING,
currentDbName,
comment,
ConnectContext.get().getCurrentUserIdentity(),
@@ -314,4 +314,8 @@ public class CreateJobInfo {
}
return TimeUtils.timeStringToLong(str.trim());
}
+
+ public boolean streamingJob() {
+ return streamingJob;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index 8ffa8884dd9..a5a428c7815 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -38,6 +38,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
@@ -79,6 +80,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
@@ -536,6 +538,51 @@ public class InsertIntoTableCommand extends Command
implements NeedAuditEncrypti
}
}
+ // TODO: add unit test
+ public String getFirstTvfName() {
+ return getFirstTvfInPlan(getLogicalQuery());
+ }
+
+ private String getFirstTvfInPlan(LogicalPlan plan) {
+ if (plan instanceof UnboundTVFRelation) {
+ UnboundTVFRelation tvfRelation = (UnboundTVFRelation) plan;
+ return tvfRelation.getFunctionName();
+ }
+
+ for (Plan child : plan.children()) {
+ if (child instanceof LogicalPlan) {
+ String result = getFirstTvfInPlan((LogicalPlan) child);
+ if (!result.isEmpty()) {
+ return result;
+ }
+ }
+ }
+ return "";
+ }
+
+ // TODO: add unit test
+ public void rewriteTvfProperties(String functionName, Map<String, String>
props) {
+ rewriteTvfInPlan(originLogicalQuery, functionName, props);
+ if (logicalQuery.isPresent()) {
+ rewriteTvfInPlan(logicalQuery.get(), functionName, props);
+ }
+ }
+
+ private void rewriteTvfInPlan(LogicalPlan plan, String functionName,
Map<String, String> props) {
+ if (plan instanceof UnboundTVFRelation) {
+ UnboundTVFRelation tvfRelation = (UnboundTVFRelation) plan;
+ if (functionName.equalsIgnoreCase(tvfRelation.getFunctionName())) {
+ tvfRelation.getProperties().getMap().putAll(props);
+ }
+ }
+
+ for (Plan child : plan.children()) {
+ if (child instanceof LogicalPlan) {
+ rewriteTvfInPlan((LogicalPlan) child, functionName, props);
+ }
+ }
+ }
+
@Override
public Plan getExplainPlan(ConnectContext ctx) {
Optional<CascadesContext> analyzeContext = Optional.of(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]