This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 509cfea99ad [feature](Load)(step2)support nereids load job schedule (#26356)
509cfea99ad is described below
commit 509cfea99ad55fa9d45fe7237fe2cbe5aa9d5a1a
Author: slothever <[email protected]>
AuthorDate: Tue Dec 26 12:29:05 2023 +0800
[feature](Load)(step2)support nereids load job schedule (#26356)
We integrate the new load job manager into the new job scheduling framework
so that insert tasks can be scheduled after a broker load SQL statement is
converted into an INSERT INTO TVF (table value function) SQL statement, as
sketched below.
issue: https://github.com/apache/doris/issues/24221
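For illustration, the rewrite might look roughly as follows; table, path, and
property names are hypothetical, and the exact S3 TVF parameters depend on the
deployment:

    -- original broker load (hypothetical names)
    LOAD LABEL test_db.label1
    (
        DATA INFILE("s3://bucket/path/file.csv")
        INTO TABLE tbl1
        COLUMNS TERMINATED BY ","
        FORMAT AS "csv"
    )
    WITH S3 ("s3.endpoint" = "...", "s3.access_key" = "...", "s3.secret_key" = "...");

    -- roughly equivalent TVF-based insert after conversion
    INSERT INTO tbl1
    SELECT * FROM S3(
        "uri" = "s3://bucket/path/file.csv",
        "format" = "csv",
        "s3.endpoint" = "...",
        "s3.access_key" = "...",
        "s3.secret_key" = "...");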
Now supported:
1. load data via the TVF-based INSERT INTO SQL, but only for simple loads
(columns must be defined in the table)
2. SHOW LOAD statement
- job id, label name, job state, time info
- simple progress
3. CANCEL LOAD from a database
4. enable the new load path through Config.enable_nereids_load
5. replay jobs after restarting Doris
TODO:
- support partition insert jobs
- support showing statistics from BE
- support multiple tasks and collect task statistics
- support transactional tasks
- add unit test cases
---
.../main/java/org/apache/doris/common/Config.java | 12 +
.../antlr4/org/apache/doris/nereids/DorisParser.g4 | 15 +-
.../org/apache/doris/analysis/CreateJobStmt.java | 22 +-
.../org/apache/doris/analysis/ShowLoadStmt.java | 7 +
.../main/java/org/apache/doris/catalog/Env.java | 9 +
.../org/apache/doris/job/base/AbstractJob.java | 98 +++-
.../main/java/org/apache/doris/job/base/Job.java | 25 +-
.../org/apache/doris/job/base/JobExecuteType.java | 2 +-
.../org/apache/doris/job/common/JobStatus.java | 1 -
.../java/org/apache/doris/job/common/JobType.java | 2 +-
.../doris/job/extensions/insert/InsertJob.java | 496 ++++++++++++++++++---
.../doris/job/extensions/insert/InsertTask.java | 128 +++---
.../apache/doris/job/extensions/mtmv/MTMVJob.java | 2 +-
.../org/apache/doris/job/manager/JobManager.java | 236 ++++++++--
.../org/apache/doris/job/task/AbstractTask.java | 10 +
.../main/java/org/apache/doris/load/ExportJob.java | 9 +-
.../main/java/org/apache/doris/load/ExportMgr.java | 26 +-
.../org/apache/doris/load/ExportTaskExecutor.java | 6 +
.../java/org/apache/doris/load/loadv2/LoadJob.java | 105 -----
.../org/apache/doris/load/loadv2/LoadManager.java | 8 +-
.../apache/doris/load/loadv2/LoadStatistic.java | 142 ++++++
.../doris/nereids/jobs/load/LabelProcessor.java | 181 ++++++++
.../doris/nereids/parser/LogicalPlanBuilder.java | 14 +-
.../trees/plans/commands/InsertExecutor.java | 13 +-
.../plans/commands/InsertIntoTableCommand.java | 15 +
.../nereids/trees/plans/commands/LoadCommand.java | 51 ++-
.../java/org/apache/doris/persist/EditLog.java | 30 +-
.../main/java/org/apache/doris/qe/DdlExecutor.java | 5 +-
.../java/org/apache/doris/qe/ShowExecutor.java | 17 +-
.../scheduler/executor/TransientTaskExecutor.java | 2 +
.../scheduler/manager/TransientTaskManager.java | 5 +-
.../scheduler/registry/ExportTaskRegister.java | 2 +-
.../suites/job_p0/test_base_insert_job.groovy | 2 +-
33 files changed, 1324 insertions(+), 374 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 04d33fd4cc0..c038a9f1780 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1931,6 +1931,18 @@ public class Config extends ConfigBase {
@ConfField(masterOnly = true)
public static boolean enable_hms_events_incremental_sync = false;
+ /**
+ * If set to true, the LOAD statement is executed through the new optimizer's
+ * load path; if that fails, execution falls back to the legacy load path.
+ */
+ @ConfField(mutable = true, varType = VariableAnnotation.EXPERIMENTAL, description = {
+ "Defaults to false. When enabled, data can be imported with the new optimizer's LOAD statement; "
+ + "if that fails, execution degrades to the legacy LOAD statement."})
+ public static boolean enable_nereids_load = false;
+
/**
* Maximum number of events to poll in each RPC.
*/
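Since the flag is declared mutable, it should be possible to flip it at runtime
with an admin statement rather than editing fe.conf; a minimal sketch (the
config name is taken from the diff above):

    ADMIN SET FRONTEND CONFIG ("enable_nereids_load" = "true");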
diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
index a2f90ea2166..dc202fef743 100644
--- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
+++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
@@ -71,11 +71,6 @@ statement
(withRemoteStorageSystem)?
(PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)?
(commentSpec)? #load
- | LOAD LABEL lableName=identifier
- LEFT_PAREN dataDescs+=dataDesc (COMMA dataDescs+=dataDesc)* RIGHT_PAREN
- resourceDesc
- (PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)?
- (commentSpec)?
#resourceLoad
| LOAD mysqlDataDesc
(PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)?
(commentSpec)?
#mysqlLoad
@@ -131,7 +126,7 @@ dataDesc
(PARTITION partition=identifierList)?
(COLUMNS TERMINATED BY comma=STRING_LITERAL)?
(LINES TERMINATED BY separator=STRING_LITERAL)?
- (FORMAT AS format=identifier)?
+ (FORMAT AS format=identifierOrStringLiteral)?
(columns=identifierList)?
(columnsFromPath=colFromPath)?
(columnMapping=colMappingList)?
@@ -167,6 +162,11 @@ refreshMethod
: COMPLETE | AUTO
;
+identifierOrStringLiteral
+ : identifier
+ | STRING_LITERAL
+ ;
+
identifierOrText
: errorCapturingIdentifier
| STRING_LITERAL
@@ -224,7 +224,8 @@ mappingExpr
;
withRemoteStorageSystem
- : WITH S3 LEFT_PAREN
+ : resourceDesc
+ | WITH S3 LEFT_PAREN
brokerProperties=propertyItemList
RIGHT_PAREN
| WITH HDFS LEFT_PAREN
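With the new identifierOrStringLiteral rule, FORMAT AS accepts a quoted string
as well as a bare identifier, and withRemoteStorageSystem now also matches a
resourceDesc. A hypothetical statement exercising both changes (names are
illustrative, and it assumes resourceDesc keeps the legacy WITH RESOURCE
syntax):

    LOAD LABEL test_db.label2
    (
        DATA INFILE("s3://bucket/path/file.csv")
        INTO TABLE tbl1
        FORMAT AS "csv"              -- quoted literal, newly allowed
    )
    WITH RESOURCE "my_s3_resource"   -- resourceDesc branch of withRemoteStorageSystem
    PROPERTIES ("timeout" = "3600");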
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
index ececccc3169..ef76aedba2b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
@@ -117,10 +117,8 @@ public class CreateJobStmt extends DdlStmt {
analyzerSqlStmt();
// check it is an insert stmt; currently only insert stmt is supported
//todo when supporting other stmt types, check the stmt type and generate jobInstance
- InsertJob job = new InsertJob();
JobExecutionConfiguration jobExecutionConfiguration = new
JobExecutionConfiguration();
jobExecutionConfiguration.setExecuteType(executeType);
- job.setCreateTimeMs(System.currentTimeMillis());
TimerDefinition timerDefinition = new TimerDefinition();
if (null != onceJobStartTimestamp) {
@@ -148,17 +146,19 @@ public class CreateJobStmt extends DdlStmt {
}
checkJobName(labelName.getLabelName());
jobExecutionConfiguration.setTimerDefinition(timerDefinition);
- job.setJobConfig(jobExecutionConfiguration);
-
- job.setComment(comment);
- job.setCurrentDbName(labelName.getDbName());
- job.setJobName(labelName.getLabelName());
- job.setCreateUser(ConnectContext.get().getCurrentUserIdentity());
- job.setJobStatus(JobStatus.RUNNING);
- job.setJobId(Env.getCurrentEnv().getNextId());
String originStmt = getOrigStmt().originStmt;
String executeSql = parseExecuteSql(originStmt);
- job.setExecuteSql(executeSql);
+ // the created job uses the label name as its job name
+ String jobName = labelName.getLabelName();
+ InsertJob job = new InsertJob(jobName,
+ JobStatus.RUNNING,
+ labelName.getDbName(),
+ comment,
+ ConnectContext.get().getCurrentUserIdentity(),
+ jobExecutionConfiguration,
+ System.currentTimeMillis(),
+ executeSql);
+ //job.checkJobParams();
jobInstance = job;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowLoadStmt.java
index ae8283f6ddf..1e00b5d8897 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowLoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowLoadStmt.java
@@ -108,6 +108,13 @@ public class ShowLoadStmt extends ShowStmt {
return states;
}
+ public org.apache.doris.load.loadv2.JobState getStateV2() {
+ if (Strings.isNullOrEmpty(stateValue)) {
+ return null;
+ }
+ return org.apache.doris.load.loadv2.JobState.valueOf(stateValue);
+ }
+
public boolean isAccurateMatch() {
return isAccurateMatch;
}
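getStateV2 maps the user-supplied state string onto the load-v2 JobState enum,
so a filter such as the following (database and state value are illustrative)
can presumably be answered by the new job manager when Config.enable_nereids_load
is on:

    SHOW LOAD FROM test_db WHERE STATE = "LOADING";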
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 9ae7d4f1617..cd77f70f59c 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -182,6 +182,7 @@ import org.apache.doris.mtmv.MTMVStatus;
import org.apache.doris.mysql.privilege.AccessControllerManager;
import org.apache.doris.mysql.privilege.Auth;
import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.nereids.jobs.load.LabelProcessor;
import
org.apache.doris.nereids.trees.plans.commands.info.AlterMTMVPropertyInfo;
import org.apache.doris.nereids.trees.plans.commands.info.AlterMTMVRefreshInfo;
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
@@ -362,6 +363,7 @@ public class Env {
private ExportTaskRegister exportTaskRegister;
private JobManager<? extends AbstractJob<?, ?>, ?> jobManager;
+ private LabelProcessor labelProcessor;
private TransientTaskManager transientTaskManager;
private MasterDaemon labelCleaner; // To clean old LabelInfo,
ExportJobInfos
@@ -641,8 +643,11 @@ public class Env {
}
this.metastoreEventsProcessor = new MetastoreEventsProcessor();
this.jobManager = new JobManager<>();
+ this.labelProcessor = new LabelProcessor();
this.transientTaskManager = new TransientTaskManager();
this.exportTaskRegister = new ExportTaskRegister(transientTaskManager);
+
this.replayedJournalId = new AtomicLong(0L);
this.stmtIdCounter = new AtomicLong(0L);
this.isElectable = false;
@@ -3907,6 +3912,10 @@ public class Env {
return jobManager;
}
+ public LabelProcessor getLabelProcessor() {
+ return labelProcessor;
+ }
+
public TransientTaskManager getTransientTaskManager() {
return transientTaskManager;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
index 83f02326d82..6e9cb48da1c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
@@ -23,6 +23,8 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.util.LogBuilder;
+import org.apache.doris.common.util.LogKey;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.common.TaskStatus;
@@ -76,14 +78,55 @@ public abstract class AbstractJob<T extends AbstractTask,
C> implements Job<T, C
private JobExecutionConfiguration jobConfig;
@SerializedName(value = "ctms")
- private Long createTimeMs;
+ private long createTimeMs;
- @SerializedName(value = "sql")
- String executeSql;
+ @SerializedName(value = "stm")
+ private long startTimeMs = -1L;
@SerializedName(value = "ftm")
private long finishTimeMs;
+ @SerializedName(value = "sql")
+ String executeSql;
+
+ public AbstractJob() {}
+
+ public AbstractJob(Long id) {
+ jobId = id;
+ }
+
+ /**
+ * executeSql and runningTasks are not required for load.
+ */
+ public AbstractJob(Long jobId, String jobName, JobStatus jobStatus,
+ String currentDbName,
+ String comment,
+ UserIdentity createUser,
+ JobExecutionConfiguration jobConfig) {
+ this(jobId, jobName, jobStatus, currentDbName, comment,
+ createUser, jobConfig, System.currentTimeMillis(), null, null);
+ }
+
+ public AbstractJob(Long jobId, String jobName, JobStatus jobStatus,
+ String currentDbName,
+ String comment,
+ UserIdentity createUser,
+ JobExecutionConfiguration jobConfig,
+ Long createTimeMs,
+ String executeSql,
+ List<T> runningTasks) {
+ this.jobId = jobId;
+ this.jobName = jobName;
+ this.jobStatus = jobStatus;
+ this.currentDbName = currentDbName;
+ this.comment = comment;
+ this.createUser = createUser;
+ this.jobConfig = jobConfig;
+ this.createTimeMs = createTimeMs;
+ this.executeSql = executeSql;
+ this.runningTasks = runningTasks;
+ }
+
private List<T> runningTasks = new ArrayList<>();
@Override
@@ -109,6 +152,10 @@ public abstract class AbstractJob<T extends AbstractTask,
C> implements Job<T, C
.add("Comment")
.build();
+ protected static long getNextJobId() {
+ return System.nanoTime() + RandomUtils.nextInt();
+ }
+
@Override
public void cancelTaskById(long taskId) throws JobException {
if (CollectionUtils.isEmpty(runningTasks)) {
@@ -154,17 +201,18 @@ public abstract class AbstractJob<T extends AbstractTask,
C> implements Job<T, C
return createTasks(taskType, taskContext);
}
- public void initTasks(List<? extends AbstractTask> tasks) {
+ public void initTasks(Collection<? extends T> tasks, TaskType taskType) {
+ if (CollectionUtils.isEmpty(getRunningTasks())) {
+ runningTasks = new ArrayList<>();
+ }
tasks.forEach(task -> {
- task.setJobId(jobId);
- task.setTaskId(getNextId());
+ task.setTaskType(taskType);
+ task.setJobId(getJobId());
task.setCreateTimeMs(System.currentTimeMillis());
task.setStatus(TaskStatus.PENDING);
});
- if (CollectionUtils.isEmpty(getRunningTasks())) {
- setRunningTasks(new ArrayList<>());
- }
- getRunningTasks().addAll((Collection<? extends T>) tasks);
+ getRunningTasks().addAll(tasks);
+ this.startTimeMs = System.currentTimeMillis();
}
public void checkJobParams() {
@@ -208,10 +256,22 @@ public abstract class AbstractJob<T extends AbstractTask,
C> implements Job<T, C
public static AbstractJob readFields(DataInput in) throws IOException {
String jsonJob = Text.readString(in);
AbstractJob job = GsonUtils.GSON.fromJson(jsonJob, AbstractJob.class);
- job.setRunningTasks(new ArrayList<>());
+ job.runningTasks = new ArrayList<>();
return job;
}
+ public void logCreateOperation() {
+ Env.getCurrentEnv().getEditLog().logCreateJob(this);
+ }
+
+ public void logFinalOperation() {
+ Env.getCurrentEnv().getEditLog().logEndJob(this);
+ }
+
+ public void logUpdateOperation() {
+ Env.getCurrentEnv().getEditLog().logUpdateJob(this);
+ }
+
@Override
public void onTaskFail(T task) throws JobException {
updateJobStatusIfEnd();
@@ -303,7 +363,19 @@ public abstract class AbstractJob<T extends AbstractTask,
C> implements Job<T, C
return builder.build();
}
- private static long getNextId() {
- return System.nanoTime() + RandomUtils.nextInt();
+ @Override
+ public void onRegister() throws JobException {}
+
+ @Override
+ public void onUnRegister() throws JobException {}
+
+ @Override
+ public void onReplayCreate() throws JobException {
+ log.info(new LogBuilder(LogKey.SCHEDULER_JOB, getJobId()).add("msg",
"replay create scheduler job").build());
+ }
+
+ @Override
+ public void onReplayEnd(AbstractJob<?, C> replayJob) throws JobException {
+ log.info(new LogBuilder(LogKey.SCHEDULER_JOB, getJobId()).add("msg",
"replay delete scheduler job").build());
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java
index ee352a0f417..1124e7f2d28 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java
@@ -99,11 +99,34 @@ public interface Job<T extends AbstractTask, C> {
/**
* Cancels all running tasks of this job.
- *
* @throws JobException If cancelling a running task fails.
*/
void cancelAllTasks() throws JobException;
+ /**
+ * register job
+ * @throws JobException If registering the job failed.
+ */
+ void onRegister() throws JobException;
+
+ /**
+ * unregister job
+ * @throws JobException If unregistering the job failed.
+ */
+ void onUnRegister() throws JobException;
+
+ /**
+ * replay create job
+ * @throws JobException If replay create failed.
+ */
+ void onReplayCreate() throws JobException;
+
+ /**
+ * replay finished or cancelled job
+ * @throws JobException If replay end failed.
+ */
+ void onReplayEnd(AbstractJob<?, C> replayJob) throws JobException;
+
/**
* Notifies the job when a task execution fails.
*
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecuteType.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecuteType.java
index ea9ddb3b020..3529a2efeff 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecuteType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecuteType.java
@@ -37,7 +37,7 @@ public enum JobExecuteType {
*/
MANUAL,
/**
- * The job will be executed immediately.
+ * The job will be executed only once and immediately.
*/
INSTANT,
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/common/JobStatus.java b/fe/fe-core/src/main/java/org/apache/doris/job/common/JobStatus.java
index 2df65e4654d..22b799225d1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/common/JobStatus.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/common/JobStatus.java
@@ -36,7 +36,6 @@ public enum JobStatus {
* The stop state cannot be resumed
*/
STOPPED,
-
/**
* When the task is finished, the finished state will be triggered.
*/
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/common/JobType.java b/fe/fe-core/src/main/java/org/apache/doris/job/common/JobType.java
index 1beb4e0a384..084a39ddf09 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/common/JobType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/common/JobType.java
@@ -19,5 +19,5 @@ package org.apache.doris.job.common;
public enum JobType {
INSERT,
- MV
+ MV,
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
index 74581b8f1b0..9256864efca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
@@ -17,43 +17,74 @@
package org.apache.doris.job.extensions.insert;
+import org.apache.doris.analysis.LoadStmt;
+import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.AuthorizationInfo;
import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.Config;
-import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.io.Text;
+import org.apache.doris.common.util.LogBuilder;
+import org.apache.doris.common.util.LogKey;
+import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.base.JobExecuteType;
+import org.apache.doris.job.base.JobExecutionConfiguration;
+import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.exception.JobException;
-import org.apache.doris.load.loadv2.LoadJob;
+import org.apache.doris.load.FailMsg;
+import org.apache.doris.load.loadv2.LoadStatistic;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.mysql.privilege.Privilege;
import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.thrift.TUniqueId;
+import org.apache.doris.transaction.ErrorTabletInfo;
+import org.apache.doris.transaction.TabletCommitInfo;
+import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
import com.google.gson.annotations.SerializedName;
import lombok.Data;
+import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
+import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.Collectors;
+@EqualsAndHashCode(callSuper = true)
@Data
@Slf4j
-public class InsertJob extends AbstractJob<InsertTask, Map> {
+public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> {
public static final ImmutableList<Column> SCHEMA = ImmutableList.of(
new Column("Id", ScalarType.createStringType()),
@@ -66,40 +97,145 @@ public class InsertJob extends AbstractJob<InsertTask,
Map> {
new Column("CreateTime", ScalarType.createStringType()),
new Column("Comment", ScalarType.createStringType()));
+ private static final ShowResultSetMetaData TASK_META_DATA =
+ ShowResultSetMetaData.builder()
+ .addColumn(new Column("TaskId",
ScalarType.createVarchar(80)))
+ .addColumn(new Column("Label",
ScalarType.createVarchar(80)))
+ .addColumn(new Column("Status",
ScalarType.createVarchar(20)))
+ .addColumn(new Column("EtlInfo",
ScalarType.createVarchar(100)))
+ .addColumn(new Column("TaskInfo",
ScalarType.createVarchar(100)))
+ .addColumn(new Column("ErrorMsg",
ScalarType.createVarchar(100)))
+
+ .addColumn(new Column("CreateTimeMs",
ScalarType.createVarchar(20)))
+ .addColumn(new Column("FinishTimeMs",
ScalarType.createVarchar(20)))
+ .addColumn(new Column("TrackingUrl",
ScalarType.createVarchar(200)))
+ .addColumn(new Column("LoadStatistic",
ScalarType.createVarchar(200)))
+ .addColumn(new Column("User",
ScalarType.createVarchar(50)))
+ .build();
+
public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
static {
- ImmutableMap.Builder<String, Integer> builder = new
ImmutableMap.Builder();
+ ImmutableMap.Builder<String, Integer> builder = new
ImmutableMap.Builder<>();
for (int i = 0; i < SCHEMA.size(); i++) {
builder.put(SCHEMA.get(i).getName().toLowerCase(), i);
}
COLUMN_TO_INDEX = builder.build();
}
- @SerializedName(value = "lp")
- String labelPrefix;
+ @SerializedName("tis")
+ ConcurrentLinkedQueue<Long> historyTaskIdList;
+ @SerializedName("did")
+ private final long dbId;
+ @SerializedName("ln")
+ private String labelName;
+ @SerializedName("lt")
+ private InsertJob.LoadType loadType;
+ // 0: the job status is pending
+ // n/100: n is the number of tasks that have finished
+ // 99: all tasks have been finished
+ // 100: txn status is visible and load has been finished
+ @SerializedName("pg")
+ private int progress;
+ @SerializedName("fm")
+ private FailMsg failMsg;
+ @SerializedName("plans")
+ private List<InsertIntoTableCommand> plans = new ArrayList<>();
+ private LoadStatistic loadStatistic = new LoadStatistic();
+ private Set<Long> finishedTaskIds = new HashSet<>();
+ private ConcurrentHashMap<Long, InsertTask> idToTasks = new
ConcurrentHashMap<>();
+ private Map<String, String> properties = new HashMap<>();
+ private Set<String> tableNames;
+ private AuthorizationInfo authorizationInfo;
- InsertIntoTableCommand command;
+ private ConnectContext ctx;
+ private StmtExecutor stmtExecutor;
+ private List<ErrorTabletInfo> errorTabletInfos = new ArrayList<>();
+ private List<TabletCommitInfo> commitInfos = new ArrayList<>();
- StmtExecutor stmtExecutor;
+ // maximum number of tasks to keep; should this be made configurable?
+ private static final int MAX_SAVE_TASK_NUM = 100;
- ConnectContext ctx;
+ /**
+ * load job type
+ */
+ public enum LoadType {
+ BULK,
+ SPARK,
+ LOCAL_FILE,
+ UNKNOWN
- @SerializedName("tis")
- ConcurrentLinkedQueue<Long> historyTaskIdList;
+ }
+
+ public enum Priority {
+ HIGH(0),
+ NORMAL(1),
+ LOW(2);
+
+ Priority(int value) {
+ this.value = value;
+ }
+
+ private final int value;
+
+ public int getValue() {
+ return value;
+ }
+ }
+
+ public InsertJob(String jobName,
+ JobStatus jobStatus,
+ String dbName,
+ String comment,
+ UserIdentity createUser,
+ JobExecutionConfiguration jobConfig,
+ Long createTimeMs,
+ String executeSql) {
+ super(getNextJobId(), jobName, jobStatus, dbName, comment, createUser,
+ jobConfig, createTimeMs, executeSql, null);
+ this.dbId = ConnectContext.get().getCurrentDbId();
+ }
+
+ public InsertJob(ConnectContext ctx,
+ StmtExecutor executor,
+ String labelName,
+ List<InsertIntoTableCommand> plans,
+ Set<String> sinkTableNames,
+ Map<String, String> properties,
+ String comment,
+ JobExecutionConfiguration jobConfig) {
+ super(getNextJobId(), labelName, JobStatus.RUNNING, null,
+ comment, ctx.getCurrentUserIdentity(), jobConfig);
+ this.ctx = ctx;
+ this.plans = plans;
+ this.stmtExecutor = executor;
+ this.dbId = ctx.getCurrentDbId();
+ this.labelName = labelName;
+ this.tableNames = sinkTableNames;
+ this.properties = properties;
+ // TODO: other types are not supported yet
+ this.loadType = InsertJob.LoadType.BULK;
+ }
@Override
- public List<InsertTask> createTasks(TaskType taskType, Map taskContext) {
- //nothing need to do in insert job
- InsertTask task = new InsertTask(null, getCurrentDbName(),
getExecuteSql(), getCreateUser());
- task.setJobId(getJobId());
- task.setTaskType(taskType);
- task.setTaskId(Env.getCurrentEnv().getNextId());
- ArrayList<InsertTask> tasks = new ArrayList<>();
- tasks.add(task);
- super.initTasks(tasks);
- recordTask(task.getTaskId());
- return tasks;
+ public List<InsertTask> createTasks(TaskType taskType, Map<Object, Object>
taskContext) {
+ if (plans.isEmpty()) {
+ InsertTask task = new InsertTask(labelName, getCurrentDbName(),
getExecuteSql(), getCreateUser());
+ idToTasks.put(task.getTaskId(), task);
+ recordTask(task.getTaskId());
+ } else {
+ // used for the load stmt
+ for (InsertIntoTableCommand logicalPlan : plans) {
+ if (!logicalPlan.getLabelName().isPresent()) {
+ throw new IllegalArgumentException("Load plan needs a label name.");
+ }
+ InsertTask task = new InsertTask(logicalPlan, ctx,
stmtExecutor, loadStatistic);
+ idToTasks.put(task.getTaskId(), task);
+ recordTask(task.getTaskId());
+ }
+ }
+ initTasks(idToTasks.values(), taskType);
+ return new ArrayList<>(idToTasks.values());
}
public void recordTask(long id) {
@@ -116,7 +252,6 @@ public class InsertJob extends AbstractJob<InsertTask, Map>
{
if (historyTaskIdList.size() >= Config.max_persistence_task_count) {
historyTaskIdList.poll();
}
- Env.getCurrentEnv().getEditLog().logUpdateJob(this);
}
@Override
@@ -125,23 +260,27 @@ public class InsertJob extends AbstractJob<InsertTask,
Map> {
}
@Override
- public boolean isReadyForScheduling(Map taskContext) {
- return CollectionUtils.isEmpty(getRunningTasks());
+ public void cancelAllTasks() throws JobException {
+ try {
+ checkAuth("CANCEL LOAD");
+ super.cancelAllTasks();
+ this.failMsg = new FailMsg(FailMsg.CancelType.USER_CANCEL, "user
cancel");
+ } catch (DdlException e) {
+ throw new JobException(e);
+ }
}
-
@Override
- public void cancelAllTasks() throws JobException {
- super.cancelAllTasks();
+ public boolean isReadyForScheduling(Map<Object, Object> taskContext) {
+ return CollectionUtils.isEmpty(getRunningTasks());
}
-
@Override
protected void checkJobParamsInternal() {
- if (command == null && StringUtils.isBlank(getExecuteSql())) {
+ if (plans.isEmpty() && StringUtils.isBlank(getExecuteSql())) {
throw new IllegalArgumentException("command or sql is null,must be
set");
}
- if (null != command &&
!getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
+ if (!plans.isEmpty() &&
!getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
throw new IllegalArgumentException("command must be null when
executeType is not instant");
}
}
@@ -153,27 +292,22 @@ public class InsertJob extends AbstractJob<InsertTask,
Map> {
}
//TODO this will be refactored: task info will be stored inside the job and queried from it
List<Long> taskIdList = new ArrayList<>(this.historyTaskIdList);
+
Collections.reverse(taskIdList);
- List<LoadJob> loadJobs =
Env.getCurrentEnv().getLoadManager().queryLoadJobsByJobIds(taskIdList);
- if (CollectionUtils.isEmpty(loadJobs)) {
+ return queryLoadTasksByTaskIds(taskIdList);
+ }
+
+ public List<InsertTask> queryLoadTasksByTaskIds(List<Long> taskIdList) {
+ if (taskIdList.isEmpty()) {
return new ArrayList<>();
}
- List<InsertTask> tasks = new ArrayList<>();
- loadJobs.forEach(loadJob -> {
- InsertTask task;
- try {
- task = new InsertTask(loadJob.getLabel(),
loadJob.getDb().getFullName(), null, getCreateUser());
- task.setCreateTimeMs(loadJob.getCreateTimestamp());
- } catch (MetaNotFoundException e) {
- log.warn("load job not found, job id is {}", loadJob.getId());
- return;
+ List<InsertTask> jobs = new ArrayList<>();
+ taskIdList.forEach(id -> {
+ if (null != idToTasks.get(id)) {
+ jobs.add(idToTasks.get(id));
}
- task.setJobId(getJobId());
- task.setTaskId(loadJob.getId());
- task.setLoadJob(loadJob);
- tasks.add(task);
});
- return tasks;
+ return jobs;
}
@Override
@@ -193,6 +327,12 @@ public class InsertJob extends AbstractJob<InsertTask,
Map> {
@Override
public void onTaskFail(InsertTask task) {
+ try {
+ updateJobStatus(JobStatus.STOPPED);
+ this.failMsg = new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL,
task.getErrMsg());
+ } catch (JobException e) {
+ throw new RuntimeException(e);
+ }
getRunningTasks().remove(task);
}
@@ -203,7 +343,129 @@ public class InsertJob extends AbstractJob<InsertTask,
Map> {
@Override
public List<String> getShowInfo() {
- return super.getCommonShowInfo();
+ try {
+ // check auth
+ checkAuth("SHOW LOAD");
+ List<String> jobInfo = Lists.newArrayList();
+ // jobId
+ jobInfo.add(getJobId().toString());
+ // label
+ if (StringUtils.isEmpty(getLabelName())) {
+ jobInfo.add(FeConstants.null_string);
+ } else {
+ jobInfo.add(getLabelName());
+ }
+ // state
+ if (getJobStatus() == JobStatus.STOPPED) {
+ jobInfo.add("CANCELLED");
+ } else {
+ jobInfo.add(getJobStatus().name());
+ }
+
+ // progress
+ String progress =
Env.getCurrentProgressManager().getProgressInfo(String.valueOf(getJobId()));
+ switch (getJobStatus()) {
+ case RUNNING:
+ if (isPending()) {
+ jobInfo.add("ETL:0%; LOAD:0%");
+ } else {
+ jobInfo.add("ETL:100%; LOAD:" + progress + "%");
+ }
+ break;
+ case FINISHED:
+ jobInfo.add("ETL:100%; LOAD:100%");
+ break;
+ case STOPPED:
+ default:
+ jobInfo.add("ETL:N/A; LOAD:N/A");
+ break;
+ }
+ // type
+ jobInfo.add(loadType.name());
+
+ // etl info
+ if (loadStatistic.getCounters().size() == 0) {
+ jobInfo.add(FeConstants.null_string);
+ } else {
+ jobInfo.add(Joiner.on(";
").withKeyValueSeparator("=").join(loadStatistic.getCounters()));
+ }
+
+ // task info
+ jobInfo.add("cluster:" + getResourceName() + "; timeout(s):" +
getTimeout()
+ + "; max_filter_ratio:" + getMaxFilterRatio() + ";
priority:" + getPriority());
+ // error msg
+ if (failMsg == null) {
+ jobInfo.add(FeConstants.null_string);
+ } else {
+ jobInfo.add("type:" + failMsg.getCancelType() + "; msg:" +
failMsg.getMsg());
+ }
+
+ // create time
+ jobInfo.add(TimeUtils.longToTimeString(getCreateTimeMs()));
+ // etl start time
+ jobInfo.add(TimeUtils.longToTimeString(getStartTimeMs()));
+ // etl end time
+ jobInfo.add(TimeUtils.longToTimeString(getStartTimeMs()));
+ // load start time
+ jobInfo.add(TimeUtils.longToTimeString(getStartTimeMs()));
+ // load end time
+ jobInfo.add(TimeUtils.longToTimeString(getFinishTimeMs()));
+ // tracking urls
+ List<String> trackingUrl = idToTasks.values().stream()
+ .map(task -> {
+ if (StringUtils.isNotEmpty(task.getTrackingUrl())) {
+ return task.getTrackingUrl();
+ } else {
+ return FeConstants.null_string;
+ }
+ })
+ .collect(Collectors.toList());
+ if (trackingUrl.isEmpty()) {
+ jobInfo.add(FeConstants.null_string);
+ } else {
+ jobInfo.add(trackingUrl.toString());
+ }
+ // job details
+ jobInfo.add(loadStatistic.toJson());
+ // transaction id
+ jobInfo.add(String.valueOf(0));
+ // error tablets
+ jobInfo.add(errorTabletsToJson());
+ // user, some load job may not have user info
+ if (getCreateUser() == null || getCreateUser().getQualifiedUser()
== null) {
+ jobInfo.add(FeConstants.null_string);
+ } else {
+ jobInfo.add(getCreateUser().getQualifiedUser());
+ }
+ // comment
+ jobInfo.add(getComment());
+ return jobInfo;
+ } catch (DdlException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private String getPriority() {
+ return properties.getOrDefault(LoadStmt.PRIORITY,
Priority.NORMAL.name());
+ }
+
+ public double getMaxFilterRatio() {
+ return
Double.parseDouble(properties.getOrDefault(LoadStmt.MAX_FILTER_RATIO_PROPERTY,
"0.0"));
+ }
+
+ public long getTimeout() {
+ if (properties.containsKey(LoadStmt.TIMEOUT_PROPERTY)) {
+ return Long.parseLong(properties.get(LoadStmt.TIMEOUT_PROPERTY));
+ }
+ return Config.broker_load_default_timeout_second;
+ }
+
+
+ public static InsertJob readFields(DataInput in) throws IOException {
+ String jsonJob = Text.readString(in);
+ InsertJob job = GsonUtils.GSON.fromJson(jsonJob, InsertJob.class);
+ job.setRunningTasks(new ArrayList<>());
+ return job;
}
@Override
@@ -211,19 +473,129 @@ public class InsertJob extends AbstractJob<InsertTask,
Map> {
Text.writeString(out, GsonUtils.GSON.toJson(this));
}
- private static final ShowResultSetMetaData TASK_META_DATA =
- ShowResultSetMetaData.builder()
- .addColumn(new Column("TaskId",
ScalarType.createVarchar(20)))
- .addColumn(new Column("Label",
ScalarType.createVarchar(20)))
- .addColumn(new Column("Status",
ScalarType.createVarchar(20)))
- .addColumn(new Column("EtlInfo",
ScalarType.createVarchar(20)))
- .addColumn(new Column("TaskInfo",
ScalarType.createVarchar(20)))
- .addColumn(new Column("ErrorMsg",
ScalarType.createVarchar(20)))
+ public String errorTabletsToJson() {
+ Map<Long, String> map = new HashMap<>();
+ errorTabletInfos.stream().limit(Config.max_error_tablet_of_broker_load)
+ .forEach(p -> map.put(p.getTabletId(), p.getMsg()));
+ Gson gson = new GsonBuilder().disableHtmlEscaping().create();
+ return gson.toJson(map);
+ }
- .addColumn(new Column("CreateTimeMs",
ScalarType.createVarchar(20)))
- .addColumn(new Column("FinishTimeMs",
ScalarType.createVarchar(20)))
- .addColumn(new Column("TrackingUrl",
ScalarType.createVarchar(20)))
- .addColumn(new Column("LoadStatistic",
ScalarType.createVarchar(20)))
- .addColumn(new Column("User",
ScalarType.createVarchar(20)))
- .build();
+ public void updateLoadingStatus(Long beId, TUniqueId loadId, TUniqueId
fragmentId, long scannedRows,
+ long scannedBytes, boolean isDone) {
+ loadStatistic.updateLoadProgress(beId, loadId, fragmentId,
scannedRows, scannedBytes, isDone);
+ progress = (int) ((double) finishedTaskIds.size() / idToTasks.size() *
100);
+ if (progress == 100) {
+ progress = 99;
+ }
+ }
+
+ private void checkAuth(String command) throws DdlException {
+ if (authorizationInfo == null) {
+ // use the old method to check priv
+ checkAuthWithoutAuthInfo(command);
+ return;
+ }
+ if
(!Env.getCurrentEnv().getAccessManager().checkPrivByAuthInfo(ConnectContext.get(),
authorizationInfo,
+ PrivPredicate.LOAD)) {
+
ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
+ Privilege.LOAD_PRIV);
+ }
+ }
+
+ /**
+ * This method is compatible with old load jobs without authorization info.
+ * If the db or table name cannot be found by id, it throws NOT_EXISTS_ERROR.
+ *
+ * @throws DdlException
+ */
+ private void checkAuthWithoutAuthInfo(String command) throws DdlException {
+ Database db =
Env.getCurrentInternalCatalog().getDbOrDdlException(dbId);
+ // check auth
+ if (tableNames == null || tableNames.isEmpty()) {
+ // forward compatibility
+ if
(!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(),
db.getFullName(),
+ PrivPredicate.LOAD)) {
+
ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
+ Privilege.LOAD_PRIV);
+ }
+ } else {
+ for (String tblName : tableNames) {
+ if
(!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(),
db.getFullName(),
+ tblName, PrivPredicate.LOAD)) {
+
ErrorReport.reportDdlException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR,
+ command,
+ ConnectContext.get().getQualifiedUser(),
+ ConnectContext.get().getRemoteIP(),
db.getFullName() + ": " + tblName);
+ }
+ }
+ }
+ }
+
+ public void unprotectReadEndOperation(InsertJob replayLog) {
+ setJobStatus(replayLog.getJobStatus());
+ progress = replayLog.getProgress();
+ setStartTimeMs(replayLog.getStartTimeMs());
+ setFinishTimeMs(replayLog.getFinishTimeMs());
+ failMsg = replayLog.getFailMsg();
+ }
+
+ public String getResourceName() {
+ // TODO: get tvf param from tvf relation
+ return "N/A";
+ }
+
+ public boolean isRunning() {
+ return getJobStatus() != JobStatus.FINISHED;
+ }
+
+ public boolean isPending() {
+ return getJobStatus() != JobStatus.FINISHED;
+ }
+
+ public boolean isCancelled() {
+ return getJobStatus() == JobStatus.STOPPED;
+ }
+
+ @Override
+ public void onRegister() throws JobException {
+ try {
+ if (StringUtils.isNotEmpty(labelName)) {
+ Env.getCurrentEnv().getLabelProcessor().addJob(this);
+ }
+ } catch (LabelAlreadyUsedException e) {
+ throw new JobException(e);
+ }
+ }
+
+ @Override
+ public void onUnRegister() throws JobException {
+ // TODO: need to record cancelled jobs in order to show cancelled jobs
+ // Env.getCurrentEnv().getLabelProcessor().removeJob(getDbId(),
getLabelName());
+ }
+
+ @Override
+ public void onReplayCreate() throws JobException {
+ JobExecutionConfiguration jobConfig = new JobExecutionConfiguration();
+ jobConfig.setExecuteType(JobExecuteType.INSTANT);
+ setJobConfig(jobConfig);
+ onRegister();
+ checkJobParams();
+ log.info(new LogBuilder(LogKey.LOAD_JOB, getJobId()).add("msg",
"replay create load job").build());
+ }
+
+ @Override
+ public void onReplayEnd(AbstractJob<?, Map<Object, Object>> replayJob)
throws JobException {
+ if (!(replayJob instanceof InsertJob)) {
+ return;
+ }
+ InsertJob insertJob = (InsertJob) replayJob;
+ unprotectReadEndOperation(insertJob);
+ log.info(new LogBuilder(LogKey.LOAD_JOB,
+ insertJob.getJobId()).add("operation", insertJob).add("msg",
"replay end load job").build());
+ }
+
+ public int getProgress() {
+ return progress;
+ }
}
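The getTimeout, getMaxFilterRatio, and getPriority helpers above read job
properties by the keys defined on LoadStmt and fall back to defaults. In a
load statement those properties would be supplied roughly like this (values
are illustrative, data description and storage clauses elided):

    LOAD LABEL test_db.label3
    ( ... )
    WITH S3 ( ... )
    PROPERTIES
    (
        "timeout" = "3600",
        "max_filter_ratio" = "0.1",
        "priority" = "NORMAL"
    );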
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
index 7e988950267..e85e7a1b027 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
@@ -25,7 +25,8 @@ import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.task.AbstractTask;
-import org.apache.doris.load.loadv2.LoadJob;
+import org.apache.doris.load.FailMsg;
+import org.apache.doris.load.loadv2.LoadStatistic;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
import org.apache.doris.qe.ConnectContext;
@@ -34,12 +35,12 @@ import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TRow;
import org.apache.doris.thrift.TUniqueId;
-import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.log4j.Log4j2;
+import org.apache.commons.lang3.StringUtils;
import java.util.Optional;
import java.util.UUID;
@@ -53,8 +54,6 @@ public class InsertTask extends AbstractTask {
new Column("JobId", ScalarType.createStringType()),
new Column("Label", ScalarType.createStringType()),
new Column("Status", ScalarType.createStringType()),
- new Column("EtlInfo", ScalarType.createStringType()),
- new Column("TaskInfo", ScalarType.createStringType()),
new Column("ErrorMsg", ScalarType.createStringType()),
new Column("CreateTimeMs", ScalarType.createStringType()),
new Column("FinishTimeMs", ScalarType.createStringType()),
@@ -73,30 +72,69 @@ public class InsertTask extends AbstractTask {
}
private String labelName;
-
private InsertIntoTableCommand command;
-
private StmtExecutor stmtExecutor;
-
private ConnectContext ctx;
-
private String sql;
-
private String currentDb;
-
private UserIdentity userIdentity;
-
+ private LoadStatistic loadStatistic;
private AtomicBoolean isCanceled = new AtomicBoolean(false);
-
private AtomicBoolean isFinished = new AtomicBoolean(false);
-
private static final String LABEL_SPLITTER = "_";
+ private FailMsg failMsg;
+ @Getter
+ private String trackingUrl;
@Getter
@Setter
- private LoadJob loadJob;
+ private InsertJob jobInfo;
+ private TaskType taskType = TaskType.PENDING;
+ private MergeType mergeType = MergeType.APPEND;
+
+ /**
+ * task merge type
+ */
+ enum MergeType {
+ MERGE,
+ APPEND,
+ DELETE
+ }
+
+ /**
+ * task type
+ */
+ enum TaskType {
+ UNKNOWN, // this is only for ISSUE #2354
+ PENDING,
+ LOADING,
+ FINISHED,
+ FAILED,
+ CANCELLED
+ }
+
+ public InsertTask(InsertIntoTableCommand insertInto,
+ ConnectContext ctx, StmtExecutor executor, LoadStatistic
statistic) {
+ this(insertInto.getLabelName().get(), insertInto, ctx, executor,
statistic);
+ }
+
+ public InsertTask(String labelName, String currentDb, String sql,
UserIdentity userIdentity) {
+ this.labelName = labelName;
+ this.sql = sql;
+ this.currentDb = currentDb;
+ this.userIdentity = userIdentity;
+ }
+ public InsertTask(String labelName, InsertIntoTableCommand insertInto,
+ ConnectContext ctx, StmtExecutor executor,
LoadStatistic statistic) {
+ this.labelName = labelName;
+ this.command = insertInto;
+ this.userIdentity = ctx.getCurrentUserIdentity();
+ this.ctx = ctx;
+ this.stmtExecutor = executor;
+ this.loadStatistic = statistic;
+ }
@Override
public void before() throws JobException {
@@ -109,15 +147,19 @@ public class InsertTask extends AbstractTask {
ctx.setCurrentUserIdentity(userIdentity);
ctx.getState().reset();
ctx.setThreadLocalInfo();
- ctx.setDatabase(currentDb);
+ if (StringUtils.isNotEmpty(currentDb)) {
+ ctx.setDatabase(currentDb);
+ }
TUniqueId queryId = generateQueryId(UUID.randomUUID().toString());
ctx.getSessionVariable().enableFallbackToOriginalPlanner = false;
stmtExecutor = new StmtExecutor(ctx, (String) null);
ctx.setQueryId(queryId);
- NereidsParser parser = new NereidsParser();
- this.command = (InsertIntoTableCommand) parser.parseSingle(sql);
- this.command.setLabelName(Optional.of(getJobId() + LABEL_SPLITTER +
getTaskId()));
- this.command.setJobId(getTaskId());
+ if (StringUtils.isNotEmpty(sql)) {
+ NereidsParser parser = new NereidsParser();
+ this.command = (InsertIntoTableCommand) parser.parseSingle(sql);
+ this.command.setLabelName(Optional.of(getJobId() + LABEL_SPLITTER
+ getTaskId()));
+ this.command.setJobId(getTaskId());
+ }
super.before();
@@ -128,14 +170,6 @@ public class InsertTask extends AbstractTask {
return new TUniqueId(taskId.getMostSignificantBits(),
taskId.getLeastSignificantBits());
}
- public InsertTask(String labelName, String currentDb, String sql,
UserIdentity userIdentity) {
- this.labelName = labelName;
- this.sql = sql;
- this.currentDb = currentDb;
- this.userIdentity = userIdentity;
-
- }
-
@Override
public void run() throws JobException {
try {
@@ -143,7 +177,7 @@ public class InsertTask extends AbstractTask {
log.info("task has been canceled, task id is {}", getTaskId());
return;
}
- command.run(ctx, stmtExecutor);
+ command.runWithUpdateInfo(ctx, stmtExecutor, loadStatistic);
} catch (Exception e) {
log.warn("execute insert task error, job id is {}, task id is
{},sql is {}", getJobId(),
getTaskId(), sql, e);
@@ -178,42 +212,28 @@ public class InsertTask extends AbstractTask {
@Override
public TRow getTvfInfo() {
TRow trow = new TRow();
- if (loadJob == null) {
+ if (jobInfo == null) {
// if the task has not started, the load job is null; return the pending task show info
return getPendingTaskTVFInfo();
}
- trow.addToColumnValue(new
TCell().setStringVal(String.valueOf(loadJob.getId())));
+ trow.addToColumnValue(new
TCell().setStringVal(String.valueOf(jobInfo.getJobId())));
trow.addToColumnValue(new
TCell().setStringVal(String.valueOf(getJobId())));
- trow.addToColumnValue(new TCell().setStringVal(loadJob.getLabel()));
- trow.addToColumnValue(new
TCell().setStringVal(loadJob.getState().name()));
- // etl info
- String etlInfo = FeConstants.null_string;
- if (!loadJob.getLoadingStatus().getCounters().isEmpty()) {
- etlInfo = Joiner.on(";
").withKeyValueSeparator("=").join(loadJob.getLoadingStatus().getCounters());
- }
- trow.addToColumnValue(new TCell().setStringVal(etlInfo));
-
- // task info
- String taskInfo = "cluster:" + loadJob.getResourceName() + ";
timeout(s):" + loadJob.getTimeout()
- + "; max_filter_ratio:" + loadJob.getMaxFilterRatio() + ";
priority:" + loadJob.getPriority();
- trow.addToColumnValue(new TCell().setStringVal(taskInfo));
-
+ trow.addToColumnValue(new TCell().setStringVal(labelName));
+ trow.addToColumnValue(new
TCell().setStringVal(jobInfo.getJobStatus().name()));
// err msg
String errMsg = FeConstants.null_string;
- if (loadJob.getFailMsg() != null) {
- errMsg = "type:" + loadJob.getFailMsg().getCancelType() + "; msg:"
+ loadJob.getFailMsg().getMsg();
+ if (failMsg != null) {
+ errMsg = "type:" + failMsg.getCancelType() + "; msg:" +
failMsg.getMsg();
}
trow.addToColumnValue(new TCell().setStringVal(errMsg));
-
// create time
- trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(loadJob.getCreateTimestamp())));
-
+ trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getCreateTimeMs())));
// load end time
- trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(loadJob.getFinishTimestamp())));
+ trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getFinishTimeMs())));
// tracking url
- trow.addToColumnValue(new
TCell().setStringVal(loadJob.getLoadingStatus().getTrackingUrl()));
- trow.addToColumnValue(new
TCell().setStringVal(loadJob.getLoadStatistic().toJson()));
- trow.addToColumnValue(new
TCell().setStringVal(loadJob.getUserInfo().getQualifiedUser()));
+ trow.addToColumnValue(new TCell().setStringVal(trackingUrl));
+ trow.addToColumnValue(new
TCell().setStringVal(loadStatistic.toJson()));
+ trow.addToColumnValue(new
TCell().setStringVal(userIdentity.getQualifiedUser()));
return trow;
}
@@ -225,8 +245,6 @@ public class InsertTask extends AbstractTask {
trow.addToColumnValue(new TCell().setStringVal(getJobId() +
LABEL_SPLITTER + getTaskId()));
trow.addToColumnValue(new TCell().setStringVal(getStatus().name()));
trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
- trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
- trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs())));
trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java
index c500e693295..669a2806a54 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java
@@ -125,7 +125,7 @@ public class MTMVJob extends AbstractJob<MTMVTask,
MTMVTaskContext> {
task.setTaskType(taskType);
ArrayList<MTMVTask> tasks = new ArrayList<>();
tasks.add(task);
- super.initTasks(tasks);
+ super.initTasks(tasks, taskType);
return tasks;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
index 776af152c69..814d6b773ad 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
@@ -17,7 +17,15 @@
package org.apache.doris.job.manager;
+import org.apache.doris.analysis.CancelLoadStmt;
+import org.apache.doris.analysis.CompoundPredicate;
+import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.CaseSensibility;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.PatternMatcher;
+import org.apache.doris.common.PatternMatcherWrapper;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
@@ -26,45 +34,86 @@ import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.InsertJob;
import org.apache.doris.job.scheduler.JobScheduler;
import org.apache.doris.job.task.AbstractTask;
+import org.apache.doris.load.loadv2.JobState;
+import com.google.common.collect.Lists;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.StringUtils;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
@Log4j2
public class JobManager<T extends AbstractJob<?, C>, C> implements Writable {
-
private final ConcurrentHashMap<Long, T> jobMap = new
ConcurrentHashMap<>(32);
- private JobScheduler jobScheduler;
+ private JobScheduler<T, C> jobScheduler;
+
+ // lock for job
+ // the lock is private and must be used after acquiring the db lock
+ private final ReentrantReadWriteLock lock = new
ReentrantReadWriteLock(true);
+
+ private void readLock() {
+ lock.readLock().lock();
+ }
+
+ private void readUnlock() {
+ lock.readLock().unlock();
+ }
+
+ private void writeLock() {
+ lock.writeLock().lock();
+ }
+
+ private void writeUnlock() {
+ lock.writeLock().unlock();
+ }
public void start() {
- jobScheduler = new JobScheduler(jobMap);
+ jobScheduler = new JobScheduler<T, C>(jobMap);
jobScheduler.start();
}
+
+ /**
+ * get running job
+ *
+ * @param jobId id
+ * @return running job
+ */
+ public T getJob(long jobId) {
+ return jobMap.get(jobId);
+ }
+
public void registerJob(T job) throws JobException {
- job.checkJobParams();
- checkJobNameExist(job.getJobName());
- if (jobMap.get(job.getJobId()) != null) {
- throw new JobException("job id exist, jobId:" + job.getJobId());
+ writeLock();
+ try {
+ job.onRegister();
+ job.checkJobParams();
+ checkJobNameExist(job.getJobName());
+ if (jobMap.get(job.getJobId()) != null) {
+ throw new JobException("job id exist, jobId:" +
job.getJobId());
+ }
+ jobMap.put(job.getJobId(), job);
+ //check whether it needs to be scheduled
+ jobScheduler.scheduleOneJob(job);
+ job.logCreateOperation();
+ } finally {
+ writeUnlock();
}
- Env.getCurrentEnv().getEditLog().logCreateJob(job);
- jobMap.put(job.getJobId(), job);
- //check its need to scheduler
- jobScheduler.scheduleOneJob(job);
}
-
private void checkJobNameExist(String jobName) throws JobException {
if (jobMap.values().stream().anyMatch(a ->
a.getJobName().equals(jobName))) {
throw new JobException("job name exist, jobName:" + jobName);
@@ -72,11 +121,17 @@ public class JobManager<T extends AbstractJob<?, C>, C>
implements Writable {
}
public void unregisterJob(Long jobId) throws JobException {
- checkJobExist(jobId);
- jobMap.get(jobId).setJobStatus(JobStatus.STOPPED);
- jobMap.get(jobId).cancelAllTasks();
- Env.getCurrentEnv().getEditLog().logDeleteJob(jobMap.get(jobId));
- jobMap.remove(jobId);
+ writeLock();
+ try {
+ checkJobExist(jobId);
+ jobMap.get(jobId).setJobStatus(JobStatus.STOPPED);
+ jobMap.get(jobId).cancelAllTasks();
+ jobMap.get(jobId).logFinalOperation();
+ jobMap.get(jobId).onUnRegister();
+ jobMap.remove(jobId);
+ } finally {
+ writeUnlock();
+ }
}
public void unregisterJob(String jobName) throws JobException {
@@ -95,7 +150,7 @@ public class JobManager<T extends AbstractJob<?, C>, C>
implements Writable {
public void alterJobStatus(Long jobId, JobStatus status) throws
JobException {
checkJobExist(jobId);
jobMap.get(jobId).updateJobStatus(status);
- Env.getCurrentEnv().getEditLog().logUpdateJob(jobMap.get(jobId));
+ jobMap.get(jobId).logUpdateOperation();
}
public void alterJobStatus(String jobName, JobStatus jobStatus) throws
JobException {
@@ -169,13 +224,12 @@ public class JobManager<T extends AbstractJob<?, C>, C>
implements Writable {
jobScheduler.schedulerInstantJob(jobMap.get(jobId), TaskType.MANUAL,
context);
}
- public void replayCreateJob(T job) {
+ public void replayCreateJob(T job) throws JobException {
if (jobMap.containsKey(job.getJobId())) {
return;
}
jobMap.putIfAbsent(job.getJobId(), job);
- log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId())
- .add("msg", "replay create scheduler job").build());
+ job.onReplayCreate();
}
/**
@@ -187,13 +241,12 @@ public class JobManager<T extends AbstractJob<?, C>, C>
implements Writable {
.add("msg", "replay update scheduler job").build());
}
- public void replayDeleteJob(T job) {
- if (null == jobMap.get(job.getJobId())) {
+ public void replayEndJob(T replayJob) throws JobException {
+ T job = jobMap.get(replayJob.getJobId());
+ if (null == job) {
return;
}
- jobMap.remove(job.getJobId());
- log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId())
- .add("msg", "replay delete scheduler job").build());
+ job.onReplayEnd(replayJob);
}
public void cancelTaskById(String jobName, Long taskId) throws
JobException {
@@ -236,4 +289,135 @@ public class JobManager<T extends AbstractJob<?, C>, C>
implements Writable {
return jobMap.get(jobId);
}
+
+ /**
+ * get load info by db
+ *
+ * @param dbId db id
+ * @param dbName db name
+ * @param labelValue label name
+ * @param accurateMatch accurate match
+ * @param jobState state
+ * @return load infos
+ * @throws AnalysisException ex
+ */
+ public List<List<Comparable>> getLoadJobInfosByDb(long dbId, String dbName,
+ String labelValue,
+ boolean accurateMatch,
+ JobState jobState)
throws AnalysisException {
+ LinkedList<List<Comparable>> loadJobInfos = new LinkedList<>();
+ if (!Env.getCurrentEnv().getLabelProcessor().existJobs(dbId)) {
+ return loadJobInfos;
+ }
+ readLock();
+ try {
+ List<InsertJob> loadJobList =
Env.getCurrentEnv().getLabelProcessor()
+ .filterJobs(dbId, labelValue, accurateMatch);
+ // check state
+ for (InsertJob loadJob : loadJobList) {
+ try {
+ if (jobState != null && !validState(jobState, loadJob)) {
+ continue;
+ }
+ // add load job info, converting the String list to a Comparable list
+ loadJobInfos.add(new ArrayList<>(loadJob.getShowInfo()));
+ } catch (RuntimeException e) {
+ // ignore this load job
+ log.warn("get load job info failed. job id: {}",
loadJob.getJobId(), e);
+ }
+ }
+ return loadJobInfos;
+ } finally {
+ readUnlock();
+ }
+ }
+
+ private static boolean validState(JobState jobState, InsertJob loadJob) {
+ JobStatus status = loadJob.getJobStatus();
+ switch (status) {
+ case RUNNING:
+ return jobState == JobState.PENDING || jobState == JobState.ETL
+ || jobState == JobState.LOADING || jobState ==
JobState.COMMITTED;
+ case STOPPED:
+ return jobState == JobState.CANCELLED;
+ case FINISHED:
+ return jobState == JobState.FINISHED;
+ default:
+ return false;
+ }
+ }
+
+ public void cancelLoadJob(CancelLoadStmt cs)
+ throws JobException, AnalysisException, DdlException {
+ String dbName = cs.getDbName();
+ String label = cs.getLabel();
+ String state = cs.getState();
+ CompoundPredicate.Operator operator = cs.getOperator();
+ Database db =
Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
+ // List of load jobs waiting to be cancelled
+ List<InsertJob> unfinishedLoadJob;
+ readLock();
+ try {
+ List<InsertJob> loadJobs =
Env.getCurrentEnv().getLabelProcessor().getJobs(db);
+ List<InsertJob> matchLoadJobs = Lists.newArrayList();
+ addNeedCancelLoadJob(label, state, operator, loadJobs,
matchLoadJobs);
+ if (matchLoadJobs.isEmpty()) {
+ throw new JobException("Load job does not exist");
+ }
+ // check state here
+ unfinishedLoadJob =
+ matchLoadJobs.stream().filter(InsertJob::isRunning)
+ .collect(Collectors.toList());
+ if (unfinishedLoadJob.isEmpty()) {
+ throw new JobException("There is no uncompleted job");
+ }
+ } finally {
+ readUnlock();
+ }
+ for (InsertJob loadJob : unfinishedLoadJob) {
+ try {
+ unregisterJob(loadJob.getJobId());
+ } catch (JobException e) {
+ log.warn("Fail to cancel job, its label: {}",
loadJob.getLabelName());
+ }
+ }
+ }
+
+    private static void addNeedCancelLoadJob(String label, String state,
+                                             CompoundPredicate.Operator operator, List<InsertJob> loadJobs,
+                                             List<InsertJob> matchLoadJobs)
+            throws AnalysisException {
+        PatternMatcher matcher = PatternMatcherWrapper.createMysqlPattern(label,
+                CaseSensibility.LABEL.getCaseSensibility());
+        matchLoadJobs.addAll(
+                loadJobs.stream()
+                        .filter(job -> !job.isCancelled())
+                        .filter(job -> {
+                            if (operator != null) {
+                                // compound
+                                boolean labelFilter = label.contains("%") ? matcher.match(job.getLabelName())
+                                        : job.getLabelName().equalsIgnoreCase(label);
+                                boolean stateFilter = job.getJobStatus().name().equalsIgnoreCase(state);
+                                return CompoundPredicate.Operator.AND.equals(operator) ? labelFilter && stateFilter
+                                        : labelFilter || stateFilter;
+                            }
+                            if (StringUtils.isNotEmpty(label)) {
+                                return label.contains("%") ? matcher.match(job.getLabelName())
+                                        : job.getLabelName().equalsIgnoreCase(label);
+                            }
+                            if (StringUtils.isNotEmpty(state)) {
+                                return job.getJobStatus().name().equalsIgnoreCase(state);
+                            }
+                            return false;
+                        }).collect(Collectors.toList())
+        );
+    }
+    // public void updateJobProgress(Long jobId, Long beId, TUniqueId loadId, TUniqueId fragmentId,
+    //         long scannedRows, long scannedBytes, boolean isDone) {
+    //     AbstractJob job = jobMap.get(jobId);
+    //     if (job != null) {
+    //         job.updateLoadingStatus(beId, loadId, fragmentId, scannedRows, scannedBytes, isDone);
+    //     }
+    // }
}
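For readers tracing the CANCEL LOAD path above, the label/state matching can be
exercised in isolation. A minimal self-contained sketch (Job is a hypothetical
stand-in for InsertJob, and the MySQL LIKE wildcard is approximated with a regex
instead of PatternMatcherWrapper):

    import java.util.function.Predicate;
    import java.util.regex.Pattern;

    final class CancelMatchSketch {
        enum Op { AND, OR }

        static final class Job {
            final String label;
            final String status;

            Job(String label, String status) {
                this.label = label;
                this.status = status;
            }
        }

        // Mirrors addNeedCancelLoadJob: a compound predicate when an operator
        // is present, otherwise whichever single filter (label or state) is set.
        static Predicate<Job> cancelFilter(String label, String state, Op op) {
            Predicate<Job> byLabel = job -> label.contains("%")
                    ? Pattern.compile(label.replace("%", ".*"), Pattern.CASE_INSENSITIVE)
                            .matcher(job.label).matches()
                    : job.label.equalsIgnoreCase(label);
            Predicate<Job> byState = job -> job.status.equalsIgnoreCase(state);
            if (op != null) {
                return op == Op.AND ? byLabel.and(byState) : byLabel.or(byState);
            }
            if (label != null && !label.isEmpty()) {
                return byLabel;
            }
            if (state != null && !state.isEmpty()) {
                return byState;
            }
            return job -> false;
        }
    }

With Op.AND, label 'load_2023%' and state 'RUNNING' select exactly the running
jobs whose labels start with load_2023, which is the set handed to unregisterJob
above.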
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
index 7327183e95e..71f6ff1c4f7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
@@ -27,6 +27,7 @@ import org.apache.doris.job.exception.JobException;
import com.google.gson.annotations.SerializedName;
import lombok.Data;
import lombok.extern.log4j.Log4j2;
+import org.apache.commons.lang3.RandomUtils;
@Data
@Log4j2
@@ -52,6 +53,15 @@ public abstract class AbstractTask implements Task {
@SerializedName(value = "emg")
private String errMsg;
+ public AbstractTask() {
+ taskId = getNextTaskId();
+ }
+
+ private static long getNextTaskId() {
+ // do not use Env.getNextId(), just generate id without logging
+ return System.nanoTime() + RandomUtils.nextInt();
+ }
+
@Override
public void onFail(String msg) throws JobException {
status = TaskStatus.FAILED;
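The new no-arg constructor above assigns the task id eagerly. A minimal sketch
of the same scheme, with ThreadLocalRandom standing in for RandomUtils: ids only
need to be unique within one FE process and are never persisted, so the cheap,
lock-free combination below is enough and skips the edit-log write that
Env.getNextId() would incur.

    import java.util.concurrent.ThreadLocalRandom;

    final class TaskIdSketch {
        // nanosecond clock plus a random offset; a rare collision is tolerable
        // because the id is only used to index in-memory task maps
        static long nextTaskId() {
            return System.nanoTime() + ThreadLocalRandom.current().nextInt();
        }
    }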
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
index 996c3ccbb32..fc2f6fca9c5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
@@ -73,6 +73,7 @@ import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.scheduler.exception.JobException;
+import org.apache.doris.scheduler.executor.TransientTaskExecutor;
import org.apache.doris.thrift.TNetworkAddress;
import com.google.common.base.Preconditions;
@@ -198,7 +199,7 @@ public class ExportJob implements Writable {
private List<ExportTaskExecutor> jobExecutorList;
-    private ConcurrentHashMap<Long, ExportTaskExecutor> taskIdToExecutor = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<Long, TransientTaskExecutor> taskIdToExecutor = new ConcurrentHashMap<>();
private Integer finishedTaskCount = 0;
private List<List<OutfileInfo>> allOutfileInfo = Lists.newArrayList();
@@ -380,6 +381,10 @@ public class ExportJob implements Writable {
return statementBase;
}
+ public List<? extends TransientTaskExecutor> getTaskExecutors() {
+ return jobExecutorList;
+ }
+
private void generateExportJobExecutor() {
jobExecutorList = Lists.newArrayList();
for (List<StatementBase> selectStmts : selectStmtListPerParallel) {
@@ -607,7 +612,7 @@ public class ExportJob implements Writable {
// we need cancel all task
taskIdToExecutor.keySet().forEach(id -> {
try {
-                Env.getCurrentEnv().getExportTaskRegister().cancelTask(id);
+                Env.getCurrentEnv().getTransientTaskManager().cancelMemoryTask(id);
} catch (JobException e) {
LOG.warn("cancel export task {} exception: {}", id, e);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
index ae7a175b896..90be7f9f10e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
@@ -59,23 +59,26 @@ import java.util.stream.Collectors;
public class ExportMgr {
private static final Logger LOG = LogManager.getLogger(ExportJob.class);
-
- // lock for export job
- // lock is private and must use after db lock
- private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
-
    private Map<Long, ExportJob> exportIdToJob = Maps.newHashMap(); // exportJobId to exportJob
    // dbid -> <label -> job>
    private Map<Long, Map<String, Long>> dbTolabelToExportJobId = Maps.newHashMap();
+    // lock for export job
+    // lock is private and must be taken after the db lock
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
public ExportMgr() {
}
- public void readLock() {
+ public List<ExportJob> getJobs() {
+ return Lists.newArrayList(exportIdToJob.values());
+ }
+
+ private void readLock() {
lock.readLock().lock();
}
- public void readUnlock() {
+ private void readUnlock() {
lock.readLock().unlock();
}
@@ -87,10 +90,6 @@ public class ExportMgr {
lock.writeLock().unlock();
}
- public List<ExportJob> getJobs() {
- return Lists.newArrayList(exportIdToJob.values());
- }
-
public void addExportJobAndRegisterTask(ExportJob job) throws Exception {
long jobId = Env.getCurrentEnv().getNextId();
job.setId(jobId);
@@ -101,9 +100,8 @@ public class ExportMgr {
throw new LabelAlreadyUsedException(job.getLabel());
}
unprotectAddJob(job);
-        job.getJobExecutorList().forEach(executor -> {
-            Long taskId = Env.getCurrentEnv().getExportTaskRegister().registerTask(executor);
-            executor.setTaskId(taskId);
+        job.getTaskExecutors().forEach(executor -> {
+            Long taskId = Env.getCurrentEnv().getTransientTaskManager().addMemoryTask(executor);
             job.getTaskIdToExecutor().put(taskId, executor);
         });
Env.getCurrentEnv().getEditLog().logExportCreate(job);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java
index 6960efc7e67..26e873afd80 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java
@@ -66,12 +66,18 @@ public class ExportTaskExecutor implements TransientTaskExecutor {
     private AtomicBoolean isFinished;
     ExportTaskExecutor(List<StatementBase> selectStmtLists, ExportJob exportJob) {
+ this.taskId = UUID.randomUUID().getMostSignificantBits();
this.selectStmtLists = selectStmtLists;
this.exportJob = exportJob;
this.isCanceled = new AtomicBoolean(false);
this.isFinished = new AtomicBoolean(false);
}
+ @Override
+ public Long getId() {
+ return taskId;
+ }
+
@Override
public void execute() throws JobException {
if (isCanceled.get()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index 6e5d56aa55e..073f47a66cb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -37,7 +37,6 @@ import org.apache.doris.common.QuotaExceedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
-import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.common.util.TimeUtils;
@@ -61,11 +60,9 @@ import org.apache.doris.transaction.TransactionException;
import org.apache.doris.transaction.TransactionState;
import com.google.common.base.Joiner;
-import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.google.common.collect.Table;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.annotations.SerializedName;
@@ -138,108 +135,6 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
     protected String comment = "";
-    public static class LoadStatistic {
-        // number of rows processed on BE, this number will be updated periodically by query report.
-        // A load job may has several load tasks(queries), and each task has several fragments.
-        // each fragment will report independently.
-        // load task id -> fragment id -> rows count
-        private Table<TUniqueId, TUniqueId, Long> counterTbl = HashBasedTable.create();
-
-        // load task id -> fragment id -> load bytes
-        private Table<TUniqueId, TUniqueId, Long> loadBytes = HashBasedTable.create();
-
-        // load task id -> unfinished backend id list
-        private Map<TUniqueId, List<Long>> unfinishedBackendIds = Maps.newHashMap();
- // load task id -> all backend id list
- private Map<TUniqueId, List<Long>> allBackendIds = Maps.newHashMap();
-
- // number of file to be loaded
- public int fileNum = 0;
- public long totalFileSizeB = 0;
-
- // init the statistic of specified load task
-        public synchronized void initLoad(TUniqueId loadId, Set<TUniqueId> fragmentIds, List<Long> relatedBackendIds) {
- counterTbl.rowMap().remove(loadId);
- for (TUniqueId fragId : fragmentIds) {
- counterTbl.put(loadId, fragId, 0L);
- }
- loadBytes.rowMap().remove(loadId);
- for (TUniqueId fragId : fragmentIds) {
- loadBytes.put(loadId, fragId, 0L);
- }
- allBackendIds.put(loadId, relatedBackendIds);
-            // need to get a copy of relatedBackendIds, so that when we modify the "relatedBackendIds" in
-            // allBackendIds, the list in unfinishedBackendIds will not be changed.
-            unfinishedBackendIds.put(loadId, Lists.newArrayList(relatedBackendIds));
- }
-
- public synchronized void removeLoad(TUniqueId loadId) {
- counterTbl.rowMap().remove(loadId);
- loadBytes.rowMap().remove(loadId);
- unfinishedBackendIds.remove(loadId);
- allBackendIds.remove(loadId);
- }
-
-        public synchronized void updateLoadProgress(long backendId, TUniqueId loadId, TUniqueId fragmentId,
-                                                    long rows, long bytes, boolean isDone) {
- if (counterTbl.contains(loadId, fragmentId)) {
- counterTbl.put(loadId, fragmentId, rows);
- }
-
- if (loadBytes.contains(loadId, fragmentId)) {
- loadBytes.put(loadId, fragmentId, bytes);
- }
- if (isDone && unfinishedBackendIds.containsKey(loadId)) {
- unfinishedBackendIds.get(loadId).remove(backendId);
- }
- }
-
- public synchronized long getScannedRows() {
- long total = 0;
- for (long rows : counterTbl.values()) {
- total += rows;
- }
- return total;
- }
-
- public synchronized long getLoadBytes() {
- long total = 0;
- for (long bytes : loadBytes.values()) {
- total += bytes;
- }
- return total;
- }
-
- public synchronized String toJson() {
- long total = 0;
- for (long rows : counterTbl.values()) {
- total += rows;
- }
- long totalBytes = 0;
- for (long bytes : loadBytes.values()) {
- totalBytes += bytes;
- }
-
- Map<String, Object> details = Maps.newHashMap();
- details.put("ScannedRows", total);
- details.put("LoadBytes", totalBytes);
- details.put("FileNumber", fileNum);
- details.put("FileSize", totalFileSizeB);
- details.put("TaskNumber", counterTbl.rowMap().size());
- details.put("Unfinished backends",
getPrintableMap(unfinishedBackendIds));
- details.put("All backends", getPrintableMap(allBackendIds));
- Gson gson = new Gson();
- return gson.toJson(details);
- }
-
-        private Map<String, List<Long>> getPrintableMap(Map<TUniqueId, List<Long>> map) {
- Map<String, List<Long>> newMap = Maps.newHashMap();
- for (Map.Entry<TUniqueId, List<Long>> entry : map.entrySet()) {
-                newMap.put(DebugUtil.printId(entry.getKey()), entry.getValue());
- }
- return newMap;
- }
- }
public LoadJob(EtlJobType jobType) {
this.jobType = jobType;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 1fb83ae762d..72bc61239cf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -278,7 +278,7 @@ public class LoadManager implements Writable {
     public void cancelLoadJob(CancelLoadStmt stmt) throws DdlException, AnalysisException {
         Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(stmt.getDbName());
// List of load jobs waiting to be cancelled
- List<LoadJob> uncompletedLoadJob = Lists.newArrayList();
+ List<LoadJob> unfinishedLoadJob;
readLock();
try {
             Map<String, List<LoadJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(db.getId());
@@ -293,15 +293,15 @@ public class LoadManager implements Writable {
throw new DdlException("Load job does not exist");
}
// check state here
-            uncompletedLoadJob =
+            unfinishedLoadJob =
                     matchLoadJobs.stream().filter(entity -> !entity.isTxnDone()).collect(Collectors.toList());
- if (uncompletedLoadJob.isEmpty()) {
+ if (unfinishedLoadJob.isEmpty()) {
throw new DdlException("There is no uncompleted job");
}
} finally {
readUnlock();
}
- for (LoadJob loadJob : uncompletedLoadJob) {
+ for (LoadJob loadJob : unfinishedLoadJob) {
try {
                 loadJob.cancelJob(new FailMsg(FailMsg.CancelType.USER_CANCEL, "user cancel"));
} catch (DdlException e) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadStatistic.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadStatistic.java
new file mode 100644
index 00000000000..0c65aa27851
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadStatistic.java
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2;
+
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.thrift.TUniqueId;
+
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Table;
+import com.google.gson.Gson;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class LoadStatistic {
+    // number of rows processed on BE, this number will be updated periodically by query report.
+    // A load job may have several load tasks (queries), and each task has several fragments.
+    // Each fragment reports independently.
+    // load task id -> fragment id -> rows count
+    private Table<TUniqueId, TUniqueId, Long> counterTbl = HashBasedTable.create();
+
+    // load task id -> fragment id -> load bytes
+    private Table<TUniqueId, TUniqueId, Long> loadBytes = HashBasedTable.create();
+
+    // load task id -> unfinished backend id list
+    private Map<TUniqueId, List<Long>> unfinishedBackendIds = Maps.newHashMap();
+    // load task id -> all backend id list
+    private Map<TUniqueId, List<Long>> allBackendIds = Maps.newHashMap();
+
+    private Map<String, String> counters = new HashMap<>();
+
+    // number of files to be loaded
+    public int fileNum = 0;
+    public long totalFileSizeB = 0;
+
+    // init the statistic of specified load task
+    public synchronized void initLoad(TUniqueId loadId, Set<TUniqueId> fragmentIds, List<Long> relatedBackendIds) {
+        counterTbl.rowMap().remove(loadId);
+        for (TUniqueId fragId : fragmentIds) {
+            counterTbl.put(loadId, fragId, 0L);
+        }
+        loadBytes.rowMap().remove(loadId);
+        for (TUniqueId fragId : fragmentIds) {
+            loadBytes.put(loadId, fragId, 0L);
+        }
+        allBackendIds.put(loadId, relatedBackendIds);
+        // need to get a copy of relatedBackendIds, so that when we modify the "relatedBackendIds" in
+        // allBackendIds, the list in unfinishedBackendIds will not be changed.
+        unfinishedBackendIds.put(loadId, Lists.newArrayList(relatedBackendIds));
+    }
+
+ public synchronized void removeLoad(TUniqueId loadId) {
+ counterTbl.rowMap().remove(loadId);
+ loadBytes.rowMap().remove(loadId);
+ unfinishedBackendIds.remove(loadId);
+ allBackendIds.remove(loadId);
+ }
+
+    public synchronized void updateLoadProgress(long backendId, TUniqueId loadId, TUniqueId fragmentId,
+                                                long rows, long bytes, boolean isDone) {
+ if (counterTbl.contains(loadId, fragmentId)) {
+ counterTbl.put(loadId, fragmentId, rows);
+ }
+
+ if (loadBytes.contains(loadId, fragmentId)) {
+ loadBytes.put(loadId, fragmentId, bytes);
+ }
+ if (isDone && unfinishedBackendIds.containsKey(loadId)) {
+ unfinishedBackendIds.get(loadId).remove(backendId);
+ }
+ }
+
+ public synchronized long getScannedRows() {
+ long total = 0;
+ for (long rows : counterTbl.values()) {
+ total += rows;
+ }
+ return total;
+ }
+
+ public synchronized long getLoadBytes() {
+ long total = 0;
+ for (long bytes : loadBytes.values()) {
+ total += bytes;
+ }
+ return total;
+ }
+
+ public Map<String, String> getCounters() {
+ // TODO: add extra statistics to counters
+ return counters;
+ }
+
+ public synchronized String toJson() {
+ long total = 0;
+ for (long rows : counterTbl.values()) {
+ total += rows;
+ }
+ long totalBytes = 0;
+ for (long bytes : loadBytes.values()) {
+ totalBytes += bytes;
+ }
+
+ Map<String, Object> details = Maps.newHashMap();
+ details.put("ScannedRows", total);
+ details.put("LoadBytes", totalBytes);
+ details.put("FileNumber", fileNum);
+ details.put("FileSize", totalFileSizeB);
+ details.put("TaskNumber", counterTbl.rowMap().size());
+ details.put("Unfinished backends",
getPrintableMap(unfinishedBackendIds));
+ details.put("All backends", getPrintableMap(allBackendIds));
+ Gson gson = new Gson();
+ return gson.toJson(details);
+ }
+
+    private Map<String, List<Long>> getPrintableMap(Map<TUniqueId, List<Long>> map) {
+ Map<String, List<Long>> newMap = Maps.newHashMap();
+ for (Map.Entry<TUniqueId, List<Long>> entry : map.entrySet()) {
+ newMap.put(DebugUtil.printId(entry.getKey()), entry.getValue());
+ }
+ return newMap;
+ }
+}
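A hypothetical walk-through of the LoadStatistic lifecycle (the ids, backend
number, and the two-argument TUniqueId constructor are assumptions for
illustration; Sets here is Guava's com.google.common.collect.Sets, alongside
the Lists helper already imported above):

    LoadStatistic stat = new LoadStatistic();
    TUniqueId loadId = new TUniqueId(1L, 1L);
    TUniqueId fragmentId = new TUniqueId(1L, 2L);

    // register one load task with a single fragment on backend 10001
    stat.initLoad(loadId, Sets.newHashSet(fragmentId), Lists.newArrayList(10001L));

    // a BE report overwrites (does not accumulate) the per-fragment counters;
    // isDone=true removes the backend from the unfinished set
    stat.updateLoadProgress(10001L, loadId, fragmentId, 1000L, 65536L, true);

    stat.getScannedRows(); // 1000
    stat.toJson();         // {"ScannedRows":1000,"LoadBytes":65536,...}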
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/load/LabelProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/load/LabelProcessor.java
new file mode 100644
index 00000000000..88454eaecdc
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/load/LabelProcessor.java
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.jobs.load;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.CaseSensibility;
+import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.PatternMatcher;
+import org.apache.doris.common.PatternMatcherWrapper;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.InsertJob;
+
+import com.google.common.base.Strings;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+/**
+ * label manager
+ */
+public class LabelProcessor {
+    private final Map<Long, Map<String, List<InsertJob>>> dbIdToLabelToLoadJobs = new ConcurrentHashMap<>();
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+ private void readLock() {
+ lock.readLock().lock();
+ }
+
+ private void readUnlock() {
+ lock.readLock().unlock();
+ }
+
+ private void writeLock() {
+ lock.writeLock().lock();
+ }
+
+ private void writeUnlock() {
+ lock.writeLock().unlock();
+ }
+
+    /**
+     * Get all labeled jobs of a database.
+     * @param db database
+     * @return jobs
+     * @throws JobException if the database has no labeled jobs
+     */
+    public List<InsertJob> getJobs(Database db) throws JobException {
+        readLock();
+        try {
+            Map<String, List<InsertJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(db.getId());
+            if (labelToLoadJobs == null) {
+                throw new JobException("Load job does not exist");
+            }
+            return labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
+        } finally {
+            readUnlock();
+        }
+    }
+
+ /**
+ * add job with label
+ *
+ * @param job job with label
+ * @throws LabelAlreadyUsedException e
+ */
+ public void addJob(InsertJob job) throws LabelAlreadyUsedException {
+ writeLock();
+ try {
+ Map<String, List<InsertJob>> labelToLoadJobs;
+ if (!dbIdToLabelToLoadJobs.containsKey(job.getDbId())) {
+ labelToLoadJobs = new ConcurrentHashMap<>();
+ dbIdToLabelToLoadJobs.put(job.getDbId(), labelToLoadJobs);
+ }
+ labelToLoadJobs = dbIdToLabelToLoadJobs.get(job.getDbId());
+ if (labelToLoadJobs.containsKey(job.getLabelName())) {
+ throw new LabelAlreadyUsedException(job.getLabelName());
+ } else {
+ labelToLoadJobs.put(job.getLabelName(), new ArrayList<>());
+ }
+ labelToLoadJobs.get(job.getLabelName()).add(job);
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ /**
+ * support remove label job
+ * @param dbId db id
+ * @param labelName label name
+ */
+ public void removeJob(long dbId, String labelName) {
+ writeLock();
+ try {
+ if (dbIdToLabelToLoadJobs.containsKey(dbId)) {
+ dbIdToLabelToLoadJobs.get(dbId).remove(labelName);
+ }
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ public void cleanOldLabels() throws JobException {
+ // TODO: remain this method to implement label cleaner
+ }
+
+    /**
+     * Filter jobs by label, either by exact lookup or by MySQL LIKE pattern.
+     * @param dbId dbId
+     * @param labelValue label value, may be null or empty to match all labels
+     * @param accurateMatch if true, look the label up directly instead of pattern matching
+     * @return jobs with matching labels
+     */
+    public List<InsertJob> filterJobs(long dbId, String labelValue, boolean accurateMatch)
+            throws AnalysisException {
+ List<InsertJob> loadJobList = new ArrayList<>();
+ readLock();
+ try {
+            Map<String, List<InsertJob>> labelToLoadJobs = this.dbIdToLabelToLoadJobs.get(dbId);
+            if (Strings.isNullOrEmpty(labelValue)) {
+                loadJobList.addAll(
+                        labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList()));
+ } else {
+ // check label value
+ if (accurateMatch) {
+ if (!labelToLoadJobs.containsKey(labelValue)) {
+ return new ArrayList<>();
+ }
+ loadJobList.addAll(labelToLoadJobs.get(labelValue));
+ } else {
+ // non-accurate match
+                    PatternMatcher matcher = PatternMatcherWrapper.createMysqlPattern(labelValue,
+                            CaseSensibility.LABEL.getCaseSensibility());
+                    for (Map.Entry<String, List<InsertJob>> entry : labelToLoadJobs.entrySet()) {
+ if (matcher.match(entry.getKey())) {
+ loadJobList.addAll(entry.getValue());
+ }
+ }
+ }
+ }
+ } finally {
+ readUnlock();
+ }
+ return loadJobList;
+ }
+
+ /**
+ * check jobs in database
+ * @param dbId dbId
+ * @return has jobs
+ */
+ public boolean existJobs(long dbId) {
+ readLock();
+ try {
+ return dbIdToLabelToLoadJobs.containsKey(dbId);
+ } finally {
+ readUnlock();
+ }
+ }
+}
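Hypothetical usage of LabelProcessor from the SHOW LOAD and CANCEL LOAD paths
(dbId and the insertJob instance are made up; filterJobs may throw
AnalysisException for a bad pattern):

    LabelProcessor labels = Env.getCurrentEnv().getLabelProcessor();
    labels.addJob(insertJob); // throws LabelAlreadyUsedException on a duplicate label

    // SHOW LOAD ... WHERE LABEL = 'my_label': direct map lookup
    List<InsertJob> exact = labels.filterJobs(dbId, "my_label", true);

    // SHOW LOAD ... WHERE LABEL LIKE 'my_%': MySQL pattern over all labels
    List<InsertJob> fuzzy = labels.filterJobs(dbId, "my_%", false);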
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index 0d24a8b9377..95ae97192c9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -945,7 +945,8 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
             LoadTask.MergeType mergeType = ddc.mergeType() == null ? LoadTask.MergeType.APPEND
                     : LoadTask.MergeType.valueOf(ddc.mergeType().getText());
-            Optional<String> fileFormat = ddc.format == null ? Optional.empty() : Optional.of(ddc.format.getText());
+            Optional<String> fileFormat = ddc.format == null ? Optional.empty()
+                    : Optional.of(visitIdentifierOrStringLiteral(ddc.format));
             Optional<String> separator = ddc.separator == null ? Optional.empty() : Optional.of(ddc.separator.getText()
                     .substring(1, ddc.separator.getText().length() - 1));
             Optional<String> comma = ddc.comma == null ? Optional.empty() : Optional.of(ddc.comma.getText()
@@ -970,7 +971,7 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
         }
         String labelName = ctx.lableName.getText();
         Map<String, String> properties = visitPropertyItemList(ctx.properties);
-        String commentSpec = ctx.commentSpec() == null ? "" : ctx.commentSpec().STRING_LITERAL().getText();
+        String commentSpec = ctx.commentSpec() == null ? "''" : ctx.commentSpec().STRING_LITERAL().getText();
         String comment =
                 LogicalPlanBuilderAssistant.escapeBackSlash(commentSpec.substring(1, commentSpec.length() - 1));
         return new LoadCommand(labelName, dataDescriptions.build(), bulkDesc, properties, comment);
@@ -1044,6 +1045,15 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
         }
     }
+    @Override
+    public String visitIdentifierOrStringLiteral(DorisParser.IdentifierOrStringLiteralContext ctx) {
+        if (ctx.STRING_LITERAL() != null) {
+            return ctx.STRING_LITERAL().getText().substring(1, ctx.STRING_LITERAL().getText().length() - 1);
+        } else {
+            return ctx.identifier().getText();
+        }
+    }
+
@Override
public UserIdentity visitUserIdentify(UserIdentifyContext ctx) {
String user = visitIdentifierOrText(ctx.user);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java
index 000fd477b31..90fb9532585 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java
@@ -318,11 +318,14 @@ public class InsertExecutor {
if (0 != jobId) {
etlJobType = EtlJobType.INSERT_JOB;
}
-            ctx.getEnv().getLoadManager()
-                    .recordFinishedLoadJob(labelName, txnId, database.getFullName(),
-                            table.getId(),
-                            etlJobType, createAt, throwable == null ? "" : throwable.getMessage(),
-                            coordinator.getTrackingUrl(), userIdentity, jobId);
+            if (!Config.enable_nereids_load) {
+                // just record for loadv2 here
+                ctx.getEnv().getLoadManager()
+                        .recordFinishedLoadJob(labelName, txnId, database.getFullName(),
+                                table.getId(),
+                                etlJobType, createAt, throwable == null ? "" : throwable.getMessage(),
+                                coordinator.getTrackingUrl(), userIdentity, jobId);
+            }
} catch (MetaNotFoundException e) {
LOG.warn("Record info of insert load with error {}",
e.getMessage(), e);
errMsg = "Record info of insert load with error " + e.getMessage();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
index 506637d189f..11a7926e5dd 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
@@ -27,6 +27,7 @@ import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.ProfileManager.ProfileType;
+import org.apache.doris.load.loadv2.LoadStatistic;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.exceptions.AnalysisException;
@@ -95,6 +96,10 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
this.allowAutoPartition = true;
}
+ public Optional<String> getLabelName() {
+ return labelName;
+ }
+
public void setLabelName(Optional<String> labelName) {
this.labelName = labelName;
}
@@ -109,6 +114,16 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
     @Override
     public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
+        runInternal(ctx, executor);
+    }
+
+    public void runWithUpdateInfo(ConnectContext ctx, StmtExecutor executor,
+                                  LoadStatistic loadStatistic) throws Exception {
+        // TODO: add coordinator statistic
+        runInternal(ctx, executor);
+    }
+
+    private void runInternal(ConnectContext ctx, StmtExecutor executor) throws Exception {
if (!ctx.getSessionVariable().isEnableNereidsDML()) {
try {
ctx.getSessionVariable().enableFallbackToOriginalPlannerOnce();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
index ed4133ffd08..ce61a3bbb3f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
@@ -18,16 +18,20 @@
package org.apache.doris.nereids.trees.plans.commands;
import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.NereidsException;
-import org.apache.doris.common.UserException;
import org.apache.doris.common.profile.Profile;
import org.apache.doris.common.util.FileFormatConstants;
import org.apache.doris.common.util.FileFormatUtils;
import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.job.base.JobExecuteType;
+import org.apache.doris.job.base.JobExecutionConfiguration;
+import org.apache.doris.job.extensions.insert.InsertJob;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.nereids.analyzer.UnboundAlias;
import org.apache.doris.nereids.analyzer.UnboundSlot;
@@ -53,7 +57,6 @@ import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.qe.QueryStateException;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.tablefunction.HdfsTableValuedFunction;
import org.apache.doris.tablefunction.S3TableValuedFunction;
@@ -83,10 +86,11 @@ public class LoadCommand extends Command implements ForwardWithSync {
private final String labelName;
private final BulkStorageDesc bulkStorageDesc;
+ private final Set<String> sinkTableNames = new HashSet<>();
private final List<BulkLoadDataDesc> sourceInfos;
private final Map<String, String> properties;
private final String comment;
- private final List<LogicalPlan> plans = new ArrayList<>();
+ private List<InsertIntoTableCommand> plans = new ArrayList<>();
private Profile profile;
/**
@@ -119,15 +123,19 @@ public class LoadCommand extends Command implements ForwardWithSync {
     @Override
     public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
-        // TODO: begin txn form multi insert sql
-        /* this.profile = new Profile("Query", ctx.getSessionVariable().enableProfile);
-        profile.getSummaryProfile().setQueryBeginTime();
-        for (BulkLoadDataDesc dataDesc : sourceInfos) {
-            plans.add(new InsertIntoTableCommand(completeQueryPlan(ctx, dataDesc), Optional.of(labelName), false));
-        }
-        profile.getSummaryProfile().setQueryPlanFinishTime();
-        * executeInsertStmtPlan(ctx, executor, plans); */
-        throw new AnalysisException("Fallback to legacy planner temporary.");
+        if (!Config.enable_nereids_load) {
+            throw new AnalysisException("Fallback to legacy planner temporary.");
+        }
+        this.profile = new Profile("Query", ctx.getSessionVariable().enableProfile);
+        profile.getSummaryProfile().setQueryBeginTime();
+        if (sourceInfos.size() == 1) {
+            plans = ImmutableList.of(new InsertIntoTableCommand(completeQueryPlan(ctx, sourceInfos.get(0)),
+                    Optional.of(labelName)));
+        } else {
+            throw new AnalysisException("Multi insert into statements are unsupported.");
+        }
+        profile.getSummaryProfile().setQueryPlanFinishTime();
+        submitInsertStmtPlan(ctx, executor, plans);
     }
     private LogicalPlan completeQueryPlan(ConnectContext ctx, BulkLoadDataDesc dataDesc)
@@ -151,6 +159,7 @@ public class LoadCommand extends Command implements ForwardWithSync {
         boolean scanAllTvfCol = (tvfProjects.get(0) instanceof UnboundStar);
         OlapTable olapTable = getOlapTable(ctx, dataDesc);
+        sinkTableNames.add(olapTable.getName());
         List<Column> olapSchema = olapTable.getBaseSchema();
         // map column index to mapping expr
         Map<String, Expression> mappingExpressions = dataDesc.getColumnMappings();
@@ -471,20 +480,14 @@ public class LoadCommand extends Command implements ForwardWithSync {
         return tvfProperties;
     }
-    private void executeInsertStmtPlan(ConnectContext ctx, StmtExecutor executor, List<InsertIntoTableCommand> plans) {
+    private void submitInsertStmtPlan(ConnectContext ctx, StmtExecutor executor, List<InsertIntoTableCommand> plans) {
         try {
-            for (LogicalPlan logicalPlan : plans) {
-                ((Command) logicalPlan).run(ctx, executor);
-            }
-        } catch (QueryStateException e) {
-            ctx.setState(e.getQueryState());
-            throw new NereidsException("Command process failed", new AnalysisException(e.getMessage(), e));
-        } catch (UserException e) {
-            // Return message to info client what happened.
-            ctx.getState().setError(e.getMysqlErrorCode(), e.getMessage());
-            throw new NereidsException("Command process failed", new AnalysisException(e.getMessage(), e));
+            JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration();
+            jobExecutionConfiguration.setExecuteType(JobExecuteType.INSTANT);
+            InsertJob jobExecutor = new InsertJob(ctx, executor, labelName, plans,
+                    sinkTableNames, properties, comment, jobExecutionConfiguration);
+            Env.getCurrentEnv().getJobManager().registerJob(jobExecutor);
         } catch (Exception e) {
-            // Maybe our bug
             ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, e.getMessage());
             throw new NereidsException("Command process failed.", new AnalysisException(e.getMessage(), e));
         }
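Taken together, the command now rewrites a broker LOAD into an INSERT over a
table-valued function and submits it as a one-shot job instead of executing it
inline. A sketch of the hand-off; the SQL in the comment is illustrative, and
the Java mirrors submitInsertStmtPlan above:

    //   LOAD LABEL db1.label1 (DATA INFILE("s3://bucket/x.csv") INTO TABLE t1 ...)
    //   ==> INSERT INTO t1 SELECT * FROM s3("uri" = "s3://bucket/x.csv", ...)
    JobExecutionConfiguration conf = new JobExecutionConfiguration();
    conf.setExecuteType(JobExecuteType.INSTANT); // one-shot, scheduled immediately
    InsertJob job = new InsertJob(ctx, executor, labelName, plans,
            sinkTableNames, properties, comment, conf);
    Env.getCurrentEnv().getJobManager().registerJob(job); // logs creation, then schedules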
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index ea8292712af..f78f417e66f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -65,7 +65,6 @@ import org.apache.doris.load.ExportJob;
import org.apache.doris.load.ExportJobState;
import org.apache.doris.load.ExportJobStateTransfer;
import org.apache.doris.load.ExportMgr;
-import org.apache.doris.load.LoadJob;
import org.apache.doris.load.StreamLoadRecordMgr.FetchStreamLoadRecord;
import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo;
import org.apache.doris.load.loadv2.LoadJobFinalOperation;
@@ -673,7 +672,7 @@ public class EditLog {
}
case OperationType.OP_DELETE_SCHEDULER_JOB: {
AbstractJob job = (AbstractJob) journal.getData();
- Env.getCurrentEnv().getJobManager().replayDeleteJob(job);
+ Env.getCurrentEnv().getJobManager().replayEndJob(job);
break;
}
/*case OperationType.OP_CREATE_SCHEDULER_TASK: {
@@ -1347,30 +1346,6 @@ public class EditLog {
logEdit(OperationType.OP_RECOVER_TABLE, info);
}
- public void logLoadStart(LoadJob job) {
- logEdit(OperationType.OP_LOAD_START, job);
- }
-
- public void logLoadEtl(LoadJob job) {
- logEdit(OperationType.OP_LOAD_ETL, job);
- }
-
- public void logLoadLoading(LoadJob job) {
- logEdit(OperationType.OP_LOAD_LOADING, job);
- }
-
- public void logLoadQuorum(LoadJob job) {
- logEdit(OperationType.OP_LOAD_QUORUM, job);
- }
-
- public void logLoadCancel(LoadJob job) {
- logEdit(OperationType.OP_LOAD_CANCEL, job);
- }
-
- public void logLoadDone(LoadJob job) {
- logEdit(OperationType.OP_LOAD_DONE, job);
- }
-
public void logDropRollup(DropInfo info) {
logEdit(OperationType.OP_DROP_ROLLUP, info);
}
@@ -1644,7 +1619,7 @@ public class EditLog {
logEdit(OperationType.OP_UPDATE_SCHEDULER_JOB, job);
}
- public void logDeleteJob(AbstractJob job) {
+ public void logEndJob(AbstractJob job) {
logEdit(OperationType.OP_DELETE_SCHEDULER_JOB, job);
}
@@ -1985,6 +1960,7 @@ public class EditLog {
public void logAlterMTMV(AlterMTMV log) {
logEdit(OperationType.OP_ALTER_MTMV, log);
+
}
public String getNotReadyReason() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
index 47682cd74f0..51dc1eb7319 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -174,7 +174,10 @@ public class DdlExecutor {
} else if (ddlStmt instanceof CancelExportStmt) {
env.getExportMgr().cancelExportJob((CancelExportStmt) ddlStmt);
        } else if (ddlStmt instanceof CancelLoadStmt) {
-            env.getLoadManager().cancelLoadJob((CancelLoadStmt) ddlStmt);
+            CancelLoadStmt cs = (CancelLoadStmt) ddlStmt;
+            // cancel in both the new job manager and the legacy load manager
+            env.getJobManager().cancelLoadJob(cs);
+            env.getLoadManager().cancelLoadJob(cs);
        } else if (ddlStmt instanceof CreateRoutineLoadStmt) {
            env.getRoutineLoadManager().createRoutineLoadJob((CreateRoutineLoadStmt) ddlStmt);
} else if (ddlStmt instanceof PauseRoutineLoadStmt) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index b7ee282d248..0a895276909 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -183,6 +183,7 @@ import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.MaxComputeExternalCatalog;
+import org.apache.doris.job.manager.JobManager;
import org.apache.doris.load.DeleteHandler;
import org.apache.doris.load.ExportJobState;
import org.apache.doris.load.ExportMgr;
@@ -1169,32 +1170,36 @@ public class ShowExecutor {
Env env = ctx.getEnv();
        DatabaseIf db = ctx.getCurrentCatalog().getDbOrAnalysisException(showStmt.getDbName());
        long dbId = db.getId();
-
+        List<List<Comparable>> loadInfos;
        // combine the List<LoadInfo> of load(v1) and loadManager(v2)
        Load load = env.getLoadInstance();
-        List<List<Comparable>> loadInfos = load.getLoadJobInfosByDb(dbId, db.getFullName(), showStmt.getLabelValue(),
+        loadInfos = load.getLoadJobInfosByDb(dbId, db.getFullName(), showStmt.getLabelValue(),
                showStmt.isAccurateMatch(), showStmt.getStates());
        Set<String> statesValue = showStmt.getStates() == null ? null : showStmt.getStates().stream()
                .map(entity -> entity.name())
                .collect(Collectors.toSet());
        loadInfos.addAll(env.getLoadManager()
                .getLoadJobInfosByDb(dbId, showStmt.getLabelValue(), showStmt.isAccurateMatch(), statesValue));
+        // add the nereids load info
+        JobManager loadMgr = env.getJobManager();
+        loadInfos.addAll(loadMgr.getLoadJobInfosByDb(dbId, db.getFullName(), showStmt.getLabelValue(),
+                showStmt.isAccurateMatch(), showStmt.getStateV2()));
// order the result of List<LoadInfo> by orderByPairs in show stmt
List<OrderByPair> orderByPairs = showStmt.getOrderByPairs();
- ListComparator<List<Comparable>> comparator = null;
+ ListComparator<List<Comparable>> comparator;
        if (orderByPairs != null) {
            OrderByPair[] orderByPairArr = new OrderByPair[orderByPairs.size()];
-            comparator = new ListComparator<List<Comparable>>(orderByPairs.toArray(orderByPairArr));
+            comparator = new ListComparator<>(orderByPairs.toArray(orderByPairArr));
} else {
// sort by id asc
- comparator = new ListComparator<List<Comparable>>(0);
+ comparator = new ListComparator<>(0);
}
Collections.sort(loadInfos, comparator);
List<List<String>> rows = Lists.newArrayList();
for (List<Comparable> loadInfo : loadInfos) {
- List<String> oneInfo = new ArrayList<String>(loadInfo.size());
+ List<String> oneInfo = new ArrayList<>(loadInfo.size());
// replace QUORUM_FINISHED -> FINISHED
            if (loadInfo.get(LoadProcDir.STATE_INDEX).equals(JobState.QUORUM_FINISHED.name())) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/TransientTaskExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/TransientTaskExecutor.java
index f297753bbd8..f4205a6f4bb 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/TransientTaskExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/TransientTaskExecutor.java
@@ -34,5 +34,7 @@ public interface TransientTaskExecutor {
* Cancel the memory task.
*/
void cancel() throws JobException;
+
+ Long getId();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java
index 9602b19ca2a..5f94fb5d998 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java
@@ -23,7 +23,6 @@ import org.apache.doris.scheduler.executor.TransientTaskExecutor;
import lombok.Setter;
-import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
public class TransientTaskManager {
@@ -51,8 +50,8 @@ public class TransientTaskManager {
return taskExecutorMap.get(taskId);
}
- public Long registerMemoryTask(TransientTaskExecutor executor) {
- Long taskId = UUID.randomUUID().getMostSignificantBits();
+ public Long addMemoryTask(TransientTaskExecutor executor) {
+ Long taskId = executor.getId();
taskExecutorMap.put(taskId, executor);
disruptor.tryPublishTask(taskId);
return taskId;
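With addMemoryTask, an executor owns its id instead of receiving one from the
manager (ExportTaskExecutor now generates it in its constructor). A minimal
sketch of the new contract; NoopTaskExecutor is hypothetical:

    import java.util.UUID;

    final class NoopTaskExecutor implements TransientTaskExecutor {
        private final Long taskId = UUID.randomUUID().getMostSignificantBits();

        @Override
        public Long getId() {
            return taskId;
        }

        @Override
        public void execute() throws JobException {
            // transient work goes here
        }

        @Override
        public void cancel() throws JobException {
            // best-effort cancellation
        }
    }

    // registration publishes the executor under its own id
    Long taskId = Env.getCurrentEnv().getTransientTaskManager().addMemoryTask(new NoopTaskExecutor());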
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/ExportTaskRegister.java
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/ExportTaskRegister.java
index 46d89c96ebc..0241f57fea0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/ExportTaskRegister.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/ExportTaskRegister.java
@@ -30,7 +30,7 @@ public class ExportTaskRegister implements TransientTaskRegister {
@Override
public Long registerTask(TransientTaskExecutor executor) {
- return transientTaskManager.registerMemoryTask(executor);
+ return transientTaskManager.addMemoryTask(executor);
}
@Override
diff --git a/regression-test/suites/job_p0/test_base_insert_job.groovy
b/regression-test/suites/job_p0/test_base_insert_job.groovy
index a6f151141b9..b41bf50738b 100644
--- a/regression-test/suites/job_p0/test_base_insert_job.groovy
+++ b/regression-test/suites/job_p0/test_base_insert_job.groovy
@@ -111,7 +111,7 @@ suite("test_base_insert_job") {
"""
Thread.sleep(25000)
-    def onceJob = sql """select id,ExecuteSql from jobs("type"="insert") where Name='${jobName}'"""
+    def onceJob = sql """ select id,ExecuteSql from jobs("type"="insert") where Name like '%${jobName}%' and ExecuteType='ONE_TIME' """
assert onceJob.size() == 1
def onceJobId= onceJob.get(0).get(0);
def onceJobSql= onceJob.get(0).get(1);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]