This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 676c2ae1995 [refactor](cloud) refactor copy into make logic clear
(#34181)
676c2ae1995 is described below
commit 676c2ae1995aa6e926b8258fa0e0239cd27fe854
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Sun Apr 28 15:01:37 2024 +0800
[refactor](cloud) refactor copy into make logic clear (#34181)
---
.../java/org/apache/doris/catalog/EnvFactory.java | 6 -
.../org/apache/doris/cloud/catalog/CloudEnv.java | 5 +-
.../doris/cloud/catalog/CloudEnvFactory.java | 7 +-
.../load}/CleanCopyJobScheduler.java | 2 +-
.../loadv2 => cloud/load}/CleanCopyJobTask.java | 2 +-
.../doris/cloud/load/CloudBrokerLoadJob.java | 18 +-
.../org/apache/doris/cloud/load/CloudCopyJob.java | 106 -----------
.../apache/doris/cloud/load/CloudLoadManager.java | 212 ++++++++++++++++++++-
.../doris/{load/loadv2 => cloud/load}/CopyJob.java | 10 +-
.../loadv2 => cloud/load}/CopyLoadPendingTask.java | 6 +-
.../org/apache/doris/common/proc/LoadProcDir.java | 11 +-
.../apache/doris/load/loadv2/BrokerLoadJob.java | 94 +--------
.../doris/load/loadv2/BrokerLoadPendingTask.java | 2 +-
.../org/apache/doris/load/loadv2/BulkLoadJob.java | 40 +---
.../java/org/apache/doris/load/loadv2/LoadJob.java | 5 +-
.../apache/doris/load/loadv2/LoadLoadingTask.java | 63 ------
.../org/apache/doris/load/loadv2/LoadManager.java | 131 +------------
.../main/java/org/apache/doris/qe/DdlExecutor.java | 5 +-
.../java/org/apache/doris/qe/ShowExecutor.java | 23 ++-
.../{load/loadv2 => cloud/load}/CopyJobTest.java | 3 +-
.../load}/CopyLoadPendingTaskTest.java | 2 +-
21 files changed, 298 insertions(+), 455 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
index 98b79d66a1c..a0be94b6834 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
@@ -29,7 +29,6 @@ import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.load.loadv2.BrokerLoadJob;
-import org.apache.doris.load.loadv2.CleanCopyJobScheduler;
import org.apache.doris.load.loadv2.LoadJobScheduler;
import org.apache.doris.load.loadv2.LoadManager;
import org.apache.doris.load.routineload.RoutineLoadManager;
@@ -159,11 +158,6 @@ public class EnvFactory {
return new LoadManager(loadJobScheduler);
}
- public LoadManager createLoadManager(LoadJobScheduler loadJobScheduler,
- CleanCopyJobScheduler
cleanCopyJobScheduler) {
- return new LoadManager(loadJobScheduler, cleanCopyJobScheduler);
- }
-
public MasterDaemon createTabletStatMgr() {
return new TabletStatMgr();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
index 8b14a733957..e52ea292932 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
@@ -27,6 +27,7 @@ import org.apache.doris.cloud.CacheHotspotManager;
import org.apache.doris.cloud.CloudWarmUpJob;
import org.apache.doris.cloud.CloudWarmUpJob.JobState;
import org.apache.doris.cloud.datasource.CloudInternalCatalog;
+import org.apache.doris.cloud.load.CleanCopyJobScheduler;
import org.apache.doris.cloud.persist.UpdateCloudReplicaInfo;
import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.cloud.proto.Cloud.NodeInfoPB;
@@ -41,7 +42,6 @@ import org.apache.doris.common.util.HttpURLUtil;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.httpv2.meta.MetaBaseAction;
-import org.apache.doris.load.loadv2.CleanCopyJobScheduler;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.Storage;
import org.apache.doris.qe.ConnectContext;
@@ -80,7 +80,8 @@ public class CloudEnv extends Env {
public CloudEnv(boolean isCheckpointCatalog) {
super(isCheckpointCatalog);
this.cleanCopyJobScheduler = new CleanCopyJobScheduler();
- this.loadManager =
EnvFactory.getInstance().createLoadManager(loadJobScheduler,
cleanCopyJobScheduler);
+ this.loadManager = ((CloudEnvFactory) EnvFactory.getInstance())
+ .createLoadManager(loadJobScheduler,
cleanCopyJobScheduler);
this.cloudClusterCheck = new
CloudClusterChecker((CloudSystemInfoService) systemInfo);
this.cloudInstanceStatusChecker = new
CloudInstanceStatusChecker((CloudSystemInfoService) systemInfo);
this.cloudTabletRebalancer = new
CloudTabletRebalancer((CloudSystemInfoService) systemInfo);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java
index 5d05928b540..32992307a8b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java
@@ -33,6 +33,7 @@ import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.cloud.common.util.CloudPropertyAnalyzer;
import org.apache.doris.cloud.datasource.CloudInternalCatalog;
+import org.apache.doris.cloud.load.CleanCopyJobScheduler;
import org.apache.doris.cloud.load.CloudBrokerLoadJob;
import org.apache.doris.cloud.load.CloudLoadManager;
import org.apache.doris.cloud.load.CloudRoutineLoadManager;
@@ -178,9 +179,9 @@ public class CloudEnvFactory extends EnvFactory {
return new CloudRoutineLoadManager();
}
- @Override
- public LoadManager createLoadManager(LoadJobScheduler loadJobScheduler) {
- return new CloudLoadManager(loadJobScheduler);
+ public LoadManager createLoadManager(LoadJobScheduler loadJobScheduler,
+ CleanCopyJobScheduler
cleanCopyJobScheduler) {
+ return new CloudLoadManager(loadJobScheduler, cleanCopyJobScheduler);
}
public MasterDaemon createTabletStatMgr() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/CleanCopyJobScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CleanCopyJobScheduler.java
similarity index 98%
rename from
fe/fe-core/src/main/java/org/apache/doris/load/loadv2/CleanCopyJobScheduler.java
rename to
fe/fe-core/src/main/java/org/apache/doris/cloud/load/CleanCopyJobScheduler.java
index 758c899e082..610d99716d4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/CleanCopyJobScheduler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CleanCopyJobScheduler.java
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.load.loadv2;
+package org.apache.doris.cloud.load;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.MasterDaemon;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/CleanCopyJobTask.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CleanCopyJobTask.java
similarity index 98%
rename from
fe/fe-core/src/main/java/org/apache/doris/load/loadv2/CleanCopyJobTask.java
rename to
fe/fe-core/src/main/java/org/apache/doris/cloud/load/CleanCopyJobTask.java
index 245c4e19f26..3b6bd75d3b9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/CleanCopyJobTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CleanCopyJobTask.java
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.load.loadv2;
+package org.apache.doris.cloud.load;
import org.apache.doris.catalog.Env;
import org.apache.doris.cloud.datasource.CloudInternalCatalog;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java
index 814e6020362..04560df0fd8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java
@@ -30,6 +30,7 @@ import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey;
+import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.FailMsg;
import org.apache.doris.load.FailMsg.CancelType;
import org.apache.doris.load.loadv2.BrokerLoadJob;
@@ -62,14 +63,29 @@ public class CloudBrokerLoadJob extends BrokerLoadJob {
protected static final String CLOUD_CLUSTER_ID = "clusterId";
protected String cloudClusterId;
- private int retryTimes = 3;
+ protected int retryTimes = 3;
public CloudBrokerLoadJob() {
}
+ public CloudBrokerLoadJob(EtlJobType type) {
+ super(type);
+ }
+
+ public CloudBrokerLoadJob(EtlJobType type, long dbId, String label,
BrokerDesc brokerDesc,
+ OriginStatement originStmt, UserIdentity userInfo)
+ throws MetaNotFoundException {
+ super(type, dbId, label, brokerDesc, originStmt, userInfo);
+ setCloudClusterId();
+ }
+
public CloudBrokerLoadJob(long dbId, String label, BrokerDesc brokerDesc,
OriginStatement originStmt,
UserIdentity userInfo) throws MetaNotFoundException {
super(dbId, label, brokerDesc, originStmt, userInfo);
+ setCloudClusterId();
+ }
+
+ private void setCloudClusterId() throws MetaNotFoundException {
ConnectContext context = ConnectContext.get();
if (context != null) {
String clusterName = context.getCloudCluster();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudCopyJob.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudCopyJob.java
deleted file mode 100644
index c3302901072..00000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudCopyJob.java
+++ /dev/null
@@ -1,106 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.cloud.load;
-
-import org.apache.doris.cloud.proto.Cloud.StagePB;
-import org.apache.doris.cloud.storage.RemoteBase.ObjectInfo;
-import org.apache.doris.common.DdlException;
-import org.apache.doris.common.io.Text;
-import org.apache.doris.load.EtlJobType;
-import org.apache.doris.load.FailMsg;
-import org.apache.doris.load.loadv2.BrokerLoadJob;
-import org.apache.doris.load.loadv2.JobState;
-
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-import lombok.Getter;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class CloudCopyJob extends BrokerLoadJob {
- private static final Logger LOG = LogManager.getLogger(CloudCopyJob.class);
- private static final String TABLE_NAME_KEY = "TableName";
- private static final String USER_NAME_KEY = "UserName";
-
- @Getter
- private String stageId;
- @Getter
- private StagePB.StageType stageType;
- @Getter
- private String stagePrefix;
- @Getter
- private long sizeLimit;
- @Getter
- private String pattern;
- @Getter
- private ObjectInfo objectInfo;
- @Getter
- private String copyId;
- @Getter
- private boolean forceCopy;
- private String loadFilePaths = "";
- private Map<String, String> properties = new HashMap<>();
- private volatile boolean abortedCopy = false;
- private boolean isReplay = false;
- private List<String> loadFiles = null;
-
- public CloudCopyJob() {
- super(EtlJobType.COPY);
- }
-
- @Override
- public void cancelJob(FailMsg failMsg) throws DdlException {
- super.cancelJob(failMsg);
- loadFiles = null;
- abortedCopy = true;
- }
-
- public void setAbortedCopy(boolean abortedCopy) {
- this.abortedCopy = abortedCopy;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- super.write(out);
- Text.writeString(out, copyId);
- Text.writeString(out, loadFilePaths);
- Gson gson = new Gson();
- Text.writeString(out, properties == null ? "" :
gson.toJson(properties));
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- super.readFields(in);
- copyId = Text.readString(in);
- loadFilePaths = Text.readString(in);
- String property = Text.readString(in);
- properties = property.isEmpty() ? new HashMap<>()
- : (new Gson().fromJson(property, new TypeToken<Map<String,
String>>() {
- }.getType()));
-
- // FIXME: COPY JOB is not supported yet.
- state = JobState.CANCELLED;
- }
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java
index 5caa2108c59..f4543c6b066 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java
@@ -17,19 +17,52 @@
package org.apache.doris.cloud.load;
+import org.apache.doris.analysis.CopyStmt;
import org.apache.doris.analysis.InsertStmt;
import org.apache.doris.analysis.LoadStmt;
+import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.cloud.system.CloudSystemInfoService;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.CaseSensibility;
+import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.PatternMatcher;
+import org.apache.doris.common.PatternMatcherWrapper;
import org.apache.doris.common.UserException;
+import org.apache.doris.load.EtlJobType;
+import org.apache.doris.load.loadv2.BrokerLoadJob;
+import org.apache.doris.load.loadv2.JobState;
+import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.load.loadv2.LoadJobScheduler;
import org.apache.doris.load.loadv2.LoadManager;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
public class CloudLoadManager extends LoadManager {
+ private static final Logger LOG =
LogManager.getLogger(CloudLoadManager.class);
+ private CleanCopyJobScheduler cleanCopyJobScheduler;
- public CloudLoadManager(LoadJobScheduler loadJobScheduler) {
+ public CloudLoadManager(LoadJobScheduler loadJobScheduler,
CleanCopyJobScheduler cleanCopyJobScheduler) {
super(loadJobScheduler);
+ this.cleanCopyJobScheduler = cleanCopyJobScheduler;
}
@Override
@@ -46,5 +79,182 @@ public class CloudLoadManager extends LoadManager {
return super.createLoadJobFromStmt(stmt);
}
+ public LoadJob createLoadJobFromStmt(CopyStmt stmt) throws DdlException {
+ Database database = super.checkDb(stmt.getDbName());
+ long dbId = database.getId();
+ BrokerLoadJob loadJob = null;
+ ((CloudSystemInfoService)
Env.getCurrentSystemInfo()).waitForAutoStartCurrentCluster();
+
+ writeLock();
+ try {
+ long unfinishedCopyJobNum = unprotectedGetUnfinishedCopyJobNum();
+ if (unfinishedCopyJobNum >= Config.cluster_max_waiting_copy_jobs) {
+ throw new DdlException(
+ "There are more than " + unfinishedCopyJobNum + "
unfinished copy jobs, please retry later.");
+ }
+ loadJob = new CopyJob(dbId, stmt.getLabel().getLabelName(),
ConnectContext.get().queryId(),
+ stmt.getBrokerDesc(), stmt.getOrigStmt(),
stmt.getUserInfo(), stmt.getStageId(),
+ stmt.getStageType(), stmt.getStagePrefix(),
stmt.getSizeLimit(), stmt.getPattern(),
+ stmt.getObjectInfo(), stmt.isForce(), stmt.getUserName());
+ loadJob.setJobProperties(stmt.getProperties());
+ loadJob.checkAndSetDataSourceInfo(database,
stmt.getDataDescriptions());
+ loadJob.setTimeout(ConnectContext.get().getExecTimeout());
+ createLoadJob(loadJob);
+ } catch (MetaNotFoundException e) {
+ throw new DdlException(e.getMessage());
+ } finally {
+ super.writeUnlock();
+ }
+ Env.getCurrentEnv().getEditLog().logCreateLoadJob(loadJob);
+
+ // The job must be submitted after edit log.
+ // It guarantees that load job has not been changed before edit log.
+ loadJobScheduler.submitJob(loadJob);
+ return loadJob;
+ }
+
+ public void createCleanCopyJobTask(CleanCopyJobTask task) throws
DdlException {
+ cleanCopyJobScheduler.submitJob(task);
+ }
+
+ private long unprotectedGetUnfinishedCopyJobNum() {
+ return idToLoadJob.values().stream()
+ .filter(j -> (j.getState() != JobState.FINISHED &&
j.getState() != JobState.CANCELLED))
+ .filter(j -> j instanceof CopyJob).count();
+ }
+
+ /**
+ * This method will return the jobs info which can meet the condition of
input param.
+ *
+ * @param dbId used to filter jobs which belong to this db
+ * @param labelValue used to filter jobs which's label is or like
labelValue.
+ * @param accurateMatch true: filter jobs which's label is labelValue.
false: filter jobs which's label like itself.
+ * @param statesValue used to filter jobs which's state within the
statesValue set.
+ * @param jobTypes used to filter jobs which's type within the
jobTypes set.
+ * @param copyIdValue used to filter jobs which's copyId is or like
copyIdValue.
+ * @param copyIdAccurateMatch true: filter jobs which's copyId is
copyIdValue.
+ * false: filter jobs which's copyId like
itself.
+ * @return The result is the list of jobInfo.
+ * JobInfo is a list which includes the comparable object: jobId,
label, state etc.
+ * The result is unordered.
+ */
+ public List<List<Comparable>> getLoadJobInfosByDb(long dbId, String
labelValue, boolean accurateMatch,
+ Set<String> statesValue, Set<EtlJobType> jobTypes, String
copyIdValue, boolean copyIdAccurateMatch,
+ String tableNameValue, boolean tableNameAccurateMatch, String
fileValue, boolean fileAccurateMatch)
+ throws AnalysisException {
+ LinkedList<List<Comparable>> loadJobInfos = new
LinkedList<List<Comparable>>();
+ if (!dbIdToLabelToLoadJobs.containsKey(dbId)) {
+ return loadJobInfos;
+ }
+
+ if (jobTypes == null || jobTypes.isEmpty()) {
+ jobTypes = new HashSet<>();
+ jobTypes.addAll(EnumSet.allOf(EtlJobType.class));
+ }
+
+ Set<JobState> states = Sets.newHashSet();
+ if (statesValue == null || statesValue.size() == 0) {
+ states.addAll(EnumSet.allOf(JobState.class));
+ } else {
+ for (String stateValue : statesValue) {
+ try {
+ states.add(JobState.valueOf(stateValue));
+ } catch (IllegalArgumentException e) {
+ // ignore this state
+ }
+ }
+ }
+
+ readLock();
+ try {
+ Map<String, List<LoadJob>> labelToLoadJobs =
dbIdToLabelToLoadJobs.get(dbId);
+ List<LoadJob> loadJobList = Lists.newArrayList();
+ if (Strings.isNullOrEmpty(labelValue)) {
+ loadJobList.addAll(
+
labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList()));
+ } else {
+ // check label value
+ if (accurateMatch) {
+ if (!labelToLoadJobs.containsKey(labelValue)) {
+ return loadJobInfos;
+ }
+ loadJobList.addAll(labelToLoadJobs.get(labelValue));
+ } else {
+ // non-accurate match
+ PatternMatcher matcher =
+
PatternMatcherWrapper.createMysqlPattern(labelValue,
+
CaseSensibility.LABEL.getCaseSensibility());
+ for (Map.Entry<String, List<LoadJob>> entry :
labelToLoadJobs.entrySet()) {
+ if (matcher.match(entry.getKey())) {
+ loadJobList.addAll(entry.getValue());
+ }
+ }
+ }
+ }
+
+ List<LoadJob> loadJobList2 = new ArrayList<>();
+ // check state
+ for (LoadJob loadJob : loadJobList) {
+ if (!states.contains(loadJob.getState())) {
+ continue;
+ }
+ if (!jobTypes.contains(loadJob.getJobType())) {
+ continue;
+ }
+ loadJobList2.add(loadJob);
+ }
+ loadJobList2 = filterCopyJob(loadJobList2, copyIdValue,
copyIdAccurateMatch, c -> c.getCopyId());
+ loadJobList2 = filterCopyJob(loadJobList2, tableNameValue,
tableNameAccurateMatch, c -> c.getTableName());
+ loadJobList2 = filterCopyJob(loadJobList2, fileValue,
fileAccurateMatch, c -> c.getFiles());
+ for (LoadJob loadJob : loadJobList2) {
+ try {
+ if (!states.contains(loadJob.getState())) {
+ continue;
+ }
+ // check auth
+ try {
+ checkJobAuth(loadJob.getDb().getCatalog().getName(),
loadJob.getDb().getName(),
+ loadJob.getTableNames());
+ } catch (AnalysisException e) {
+ continue;
+ }
+ // add load job info
+ loadJobInfos.add(loadJob.getShowInfo());
+ } catch (RuntimeException | DdlException |
MetaNotFoundException e) {
+ // ignore this load job
+ LOG.warn("get load job info failed. job id: {}",
loadJob.getId(), e);
+ }
+ }
+ return loadJobInfos;
+ } finally {
+ readUnlock();
+ }
+ }
+
+ private List<LoadJob> filterCopyJob(List<LoadJob> loadJobList, String
value, boolean accurateMatch,
+ Function<CopyJob, String> func) throws AnalysisException {
+ if (Strings.isNullOrEmpty(value)) {
+ return loadJobList;
+ }
+ List<LoadJob> loadJobList2 = Lists.newArrayList();
+ for (LoadJob loadJob : loadJobList) {
+ if (loadJob.getJobType() != EtlJobType.COPY) {
+ continue;
+ }
+ CopyJob copyJob = (CopyJob) loadJob;
+ if (accurateMatch) {
+ if (func.apply(copyJob).equalsIgnoreCase(value)) {
+ loadJobList2.add(copyJob);
+ }
+ } else {
+ // non-accurate match
+ PatternMatcher matcher =
PatternMatcherWrapper.createMysqlPattern(value, false);
+ if (matcher.match(func.apply(copyJob))) {
+ loadJobList2.add(copyJob);
+ }
+ }
+ }
+ return loadJobList2;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/CopyJob.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CopyJob.java
similarity index 96%
rename from fe/fe-core/src/main/java/org/apache/doris/load/loadv2/CopyJob.java
rename to fe/fe-core/src/main/java/org/apache/doris/cloud/load/CopyJob.java
index 637a182ec07..35574561a52 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/CopyJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CopyJob.java
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.load.loadv2;
+package org.apache.doris.cloud.load;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.CopyStmt;
@@ -40,6 +40,8 @@ import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.FailMsg;
+import org.apache.doris.load.loadv2.LoadJobFinalOperation;
+import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TUniqueId;
@@ -61,7 +63,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
-public class CopyJob extends BrokerLoadJob {
+public class CopyJob extends CloudBrokerLoadJob {
private static final Logger LOG = LogManager.getLogger(CopyJob.class);
private static final String TABLE_NAME_KEY = "TableName";
private static final String USER_NAME_KEY = "UserName";
@@ -108,7 +110,7 @@ public class CopyJob extends BrokerLoadJob {
}
@Override
- protected void checkAndSetDataSourceInfo(Database db,
List<DataDescription> dataDescriptions) throws DdlException {
+ public void checkAndSetDataSourceInfo(Database db, List<DataDescription>
dataDescriptions) throws DdlException {
super.checkAndSetDataSourceInfo(db, dataDescriptions);
// now, copy into only support one table
for (DataDescription dataDescription : dataDescriptions) {
@@ -138,7 +140,7 @@ public class CopyJob extends BrokerLoadJob {
&& !isForceCopy()) {
CleanCopyJobTask copyJobCleanTask = new
CleanCopyJobTask(objectInfo, stageId, stageType, tableId,
copyId, loadFiles);
-
Env.getCurrentEnv().getLoadManager().createCleanCopyJobTask(copyJobCleanTask);
+ ((CloudLoadManager)
Env.getCurrentEnv().getLoadManager()).createCleanCopyJobTask(copyJobCleanTask);
}
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/CopyLoadPendingTask.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CopyLoadPendingTask.java
similarity index 98%
rename from
fe/fe-core/src/main/java/org/apache/doris/load/loadv2/CopyLoadPendingTask.java
rename to
fe/fe-core/src/main/java/org/apache/doris/cloud/load/CopyLoadPendingTask.java
index bd27c67e6e5..0ab6a9ce907 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/CopyLoadPendingTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CopyLoadPendingTask.java
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.load.loadv2;
+package org.apache.doris.cloud.load;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.StorageBackend;
@@ -38,6 +38,8 @@ import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey;
+import org.apache.doris.load.loadv2.BrokerLoadPendingTask;
+import org.apache.doris.load.loadv2.BrokerPendingTaskAttachment;
import org.apache.doris.thrift.TBrokerFileStatus;
import com.google.common.collect.Lists;
@@ -72,7 +74,7 @@ public class CopyLoadPendingTask extends
BrokerLoadPendingTask {
}
@Override
- void executeTask() throws UserException {
+ public void executeTask() throws UserException {
super.executeTask(); // get all files and begin txn
if (!isBeginCopyDone) {
beginCopy((BrokerPendingTaskAttachment) attachment);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java
index e6cd4108ee8..8201c0176e0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java
@@ -18,7 +18,9 @@
package org.apache.doris.common.proc;
import org.apache.doris.catalog.Database;
+import org.apache.doris.cloud.load.CloudLoadManager;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
import org.apache.doris.load.loadv2.LoadManager;
import com.google.common.collect.ImmutableList;
@@ -66,8 +68,13 @@ public class LoadProcDir implements ProcDirInterface {
if (db == null) {
loadJobInfos = loadManager.getAllLoadJobInfos();
} else {
- loadJobInfos = loadManager.getLoadJobInfosByDb(db.getId(), null,
false,
- null, null, null, false, null, false, null, false);
+ if (!Config.isCloudMode()) {
+ loadJobInfos = loadManager.getLoadJobInfosByDb(db.getId(),
null, false, null);
+ } else {
+ loadJobInfos = ((CloudLoadManager) loadManager)
+ .getLoadJobInfosByDb(db.getId(), null, false,
+ null, null, null, false, null, false, null, false);
+ }
}
int counter = 0;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index 60dfce0edb6..fd70e03caa7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -58,7 +58,6 @@ import
org.apache.doris.transaction.TransactionState.TxnCoordinator;
import org.apache.doris.transaction.TransactionState.TxnSourceType;
import com.google.common.base.Joiner;
-import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -224,16 +223,8 @@ public class BrokerLoadJob extends BulkLoadJob {
UUID uuid = UUID.randomUUID();
TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(),
uuid.getLeastSignificantBits());
- if (Config.isNotCloudMode()) {
- task.init(loadId, attachment.getFileStatusByTable(aggKey),
- attachment.getFileNumByTable(aggKey), getUserInfo());
- } else {
- if (Strings.isNullOrEmpty(clusterId)) {
- throw new UserException("can not get a valid cluster");
- }
- task.init(loadId, attachment.getFileStatusByTable(aggKey),
- attachment.getFileNumByTable(aggKey), getUserInfo(),
clusterId);
- }
+ task.init(loadId, attachment.getFileStatusByTable(aggKey),
+ attachment.getFileNumByTable(aggKey), getUserInfo());
task.settWorkloadGroups(tWorkloadGroups);
return task;
}
@@ -464,87 +455,6 @@ public class BrokerLoadJob extends BulkLoadJob {
jobProfile = null;
}
- @Override
- public void onTaskFailed(long taskId, FailMsg failMsg) {
- if (!Config.isCloudMode() || Strings.isNullOrEmpty(this.clusterId)) {
- super.onTaskFailed(taskId, failMsg);
- return;
- }
- try {
- writeLock();
- if (isTxnDone()) {
- LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
- .add("label", label)
- .add("transactionId", transactionId)
- .add("state", state)
- .add("error_msg", "this task will be ignored when job
is: " + state)
- .build());
- return;
- }
- LOG.info(new LogBuilder(LogKey.LOAD_JOB, id)
- .add("label", label)
- .add("transactionId", transactionId)
- .add("state", state)
- .add("retryTimes", retryTimes)
- .add("failMsg", failMsg.getMsg())
- .build());
-
- this.retryTimes--;
- if (this.retryTimes <= 0) {
- boolean abortTxn = this.transactionId > 0 ? true : false;
- unprotectedExecuteCancel(failMsg, abortTxn);
- logFinalOperation();
- return;
- } else {
- unprotectedExecuteRetry(failMsg);
- }
- } finally {
- writeUnlock();
- }
-
- boolean allTaskDone = false;
- while (!allTaskDone) {
- try {
- writeLock();
- // check if all task has been done
- // unprotectedExecuteRetry() will cancel all running task
- allTaskDone = true;
- for (Map.Entry<Long, LoadTask> entry : idToTasks.entrySet()) {
- if (entry.getKey() != taskId &&
!entry.getValue().isDone()) {
- LOG.info("LoadTask({}) has not been done",
entry.getKey());
- allTaskDone = false;
- }
- }
- } finally {
- writeUnlock();
- }
- if (!allTaskDone) {
- try {
- Thread.sleep(1000);
- continue;
- } catch (InterruptedException e) {
- LOG.warn("", e);
- }
- }
- }
-
- try {
- writeLock();
- this.state = JobState.PENDING;
- this.idToTasks.clear();
- this.failMsg = null;
- this.finishedTaskIds.clear();
-
Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(this);
- LoadTask task = createPendingTask();
- // retry default backoff 60 seconds, because `be restart` is slow
- task.setStartTimeMs(System.currentTimeMillis() + 60 * 1000);
- idToTasks.put(task.getSignature(), task);
- Env.getCurrentEnv().getPendingLoadTaskScheduler().submit(task);
- } finally {
- writeUnlock();
- }
- }
-
@Override
public String getResourceName() {
StorageBackend.StorageType storageType = brokerDesc.getStorageType();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java
index e1303f3c7a4..e41cd4aee2a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java
@@ -55,7 +55,7 @@ public class BrokerLoadPendingTask extends LoadTask {
}
@Override
- void executeTask() throws UserException {
+ public void executeTask() throws UserException {
LOG.info("begin to execute broker pending task. job: {}",
callback.getCallbackId());
getAllFileStatus();
((BrokerLoadJob) callback).beginTxn();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
index d9b1b1fbcd8..72b3a96c4eb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
@@ -32,8 +32,6 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.EnvFactory;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
-import org.apache.doris.cloud.system.CloudSystemInfoService;
-import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.MetaNotFoundException;
@@ -97,12 +95,6 @@ public abstract class BulkLoadJob extends LoadJob {
// we persist these sessionVariables due to the session is not available
when replaying the job.
protected Map<String, String> sessionVariables = Maps.newHashMap();
- private static final String CLUSTER_ID = "clusterId";
- protected String clusterId;
-
- // retry 3 times is enough
- protected int retryTimes = 3;
-
public BulkLoadJob(EtlJobType jobType) {
super(jobType);
}
@@ -114,36 +106,9 @@ public abstract class BulkLoadJob extends LoadJob {
this.authorizationInfo = gatherAuthInfo();
this.userInfo = userInfo;
- if (Config.isCloudMode()) {
- ConnectContext context = ConnectContext.get();
- if (context != null) {
- String clusterName = context.getCloudCluster();
- if (Strings.isNullOrEmpty(clusterName)) {
- LOG.warn("cluster name is empty");
- throw new MetaNotFoundException("cluster name is empty");
- }
-
- this.clusterId = ((CloudSystemInfoService)
Env.getCurrentSystemInfo())
- .getCloudClusterIdByName(clusterName);
- if
(!Strings.isNullOrEmpty(context.getSessionVariable().getCloudCluster())) {
- clusterName =
context.getSessionVariable().getCloudCluster();
- this.clusterId =
- ((CloudSystemInfoService)
Env.getCurrentSystemInfo())
- .getCloudClusterIdByName(clusterName);
- }
- if (Strings.isNullOrEmpty(this.clusterId)) {
- LOG.warn("cluster id is empty, cluster name {}",
clusterName);
- throw new MetaNotFoundException("cluster id is empty,
cluster name: " + clusterName);
- }
- }
- }
-
if (ConnectContext.get() != null) {
SessionVariable var = ConnectContext.get().getSessionVariable();
sessionVariables.put(SessionVariable.SQL_MODE,
Long.toString(var.getSqlMode()));
- if (Config.isCloudMode()) {
- sessionVariables.put(CLUSTER_ID, clusterId);
- }
} else {
sessionVariables.put(SessionVariable.SQL_MODE,
String.valueOf(SqlModeHelper.MODE_DEFAULT));
}
@@ -184,7 +149,7 @@ public abstract class BulkLoadJob extends LoadJob {
}
}
- protected void checkAndSetDataSourceInfo(Database db,
List<DataDescription> dataDescriptions) throws DdlException {
+ public void checkAndSetDataSourceInfo(Database db, List<DataDescription>
dataDescriptions) throws DdlException {
// check data source info
db.readLock();
try {
@@ -305,9 +270,6 @@ public abstract class BulkLoadJob extends LoadJob {
fileGroupAggInfo = new BrokerFileGroupAggInfo();
SqlParser parser = new SqlParser(new SqlScanner(new
StringReader(originStmt.originStmt),
Long.valueOf(sessionVariables.get(SessionVariable.SQL_MODE))));
- if (Config.isCloudMode()) {
- clusterId = sessionVariables.get("clusterId");
- }
try {
Database db =
Env.getCurrentInternalCatalog().getDbOrDdlException(dbId);
analyzeStmt(SqlParserUtils.getStmt(parser, originStmt.idx), db);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index 16363d72c03..65befadb13a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -23,6 +23,7 @@ import org.apache.doris.catalog.AuthorizationInfo;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.EnvFactory;
+import org.apache.doris.cloud.load.CopyJob;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
@@ -248,7 +249,7 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
*
* @return
*/
- abstract Set<String> getTableNames() throws MetaNotFoundException;
+ public abstract Set<String> getTableNames() throws MetaNotFoundException;
// return true if the corresponding transaction is done(COMMITTED,
FINISHED, CANCELLED, RETRY)
public boolean isTxnDone() {
@@ -1156,7 +1157,7 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
return (long) jobProperties.get(LoadStmt.TIMEOUT_PROPERTY);
}
- protected void setTimeout(long timeout) {
+ public void setTimeout(long timeout) {
jobProperties.put(LoadStmt.TIMEOUT_PROPERTY, timeout);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index 543c3ac7805..af83fead692 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -20,10 +20,8 @@ package org.apache.doris.load.loadv2;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.EnvFactory;
import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.Config;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.Status;
@@ -34,7 +32,6 @@ import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.load.FailMsg;
-import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.thrift.TBrokerFileStatus;
@@ -44,7 +41,6 @@ import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.ErrorTabletInfo;
import org.apache.doris.transaction.TabletCommitInfo;
-import com.google.common.base.Strings;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -84,7 +80,6 @@ public class LoadLoadingTask extends LoadTask {
private Profile jobProfile;
private long beginTime;
- private String clusterId;
private List<TPipelineWorkloadGroup> tWorkloadGroups = null;
@@ -127,43 +122,6 @@ public class LoadLoadingTask extends LoadTask {
planner.plan(loadId, fileStatusList, fileNum);
}
- public void init(TUniqueId loadId, List<List<TBrokerFileStatus>>
fileStatusList,
- int fileNum, UserIdentity userInfo, String clusterId) throws
UserException {
- this.loadId = loadId;
- planner = new LoadingTaskPlanner(callback.getCallbackId(), txnId,
db.getId(), table, brokerDesc, fileGroups,
- strictMode, isPartialUpdate, timezone, this.timeoutS,
this.loadParallelism, this.sendBatchParallelism,
- this.useNewLoadScanNode, userInfo, singleTabletLoadPerSink,
enableMemTableOnSinkNode);
- boolean needCleanCtx = false;
- try {
- if (Config.isCloudMode()) {
- String clusterName = ((CloudSystemInfoService)
Env.getCurrentSystemInfo())
- .getClusterNameByClusterId(clusterId);
- if (Strings.isNullOrEmpty(clusterName)) {
- String errMsg = "cluster name is empty, cluster id is " +
clusterId;
- LOG.warn(errMsg);
- throw new UserException(errMsg);
- }
-
- if (ConnectContext.get() == null) {
- ConnectContext ctx = new ConnectContext();
- ctx.setThreadLocalInfo();
- ctx.setCloudCluster(clusterName);
- needCleanCtx = true;
- } else {
- ConnectContext.get().setCloudCluster(clusterName);
- }
- }
- planner.plan(loadId, fileStatusList, fileNum);
- this.clusterId = clusterId;
- } catch (Exception e) {
- throw e;
- } finally {
- if (Config.isCloudMode() && needCleanCtx) {
- ConnectContext.remove();
- }
- }
- }
-
public TUniqueId getLoadId() {
return loadId;
}
@@ -172,23 +130,6 @@ public class LoadLoadingTask extends LoadTask {
protected void executeTask() throws Exception {
LOG.info("begin to execute loading task. load id: {} job id: {}. db:
{}, tbl: {}. left retry: {}",
DebugUtil.printId(loadId), callback.getCallbackId(),
db.getFullName(), table.getName(), retryTime);
- boolean needCleanCtx = false;
- if (Config.isCloudMode()) {
- String clusterName = ((CloudSystemInfoService)
Env.getCurrentSystemInfo())
- .getClusterNameByClusterId(this.clusterId);
- if (Strings.isNullOrEmpty(clusterName)) {
- throw new Exception("cluster is empty");
- }
-
- if (ConnectContext.get() == null) {
- ConnectContext ctx = new ConnectContext();
- ctx.setThreadLocalInfo();
- ctx.setCloudCluster(clusterName);
- needCleanCtx = true;
- } else {
- ConnectContext.get().setCloudCluster(clusterName);
- }
- }
retryTime--;
beginTime = System.currentTimeMillis();
@@ -197,10 +138,6 @@ public class LoadLoadingTask extends LoadTask {
return;
}
executeOnce();
-
- if (Config.isCloudMode() && needCleanCtx) {
- ConnectContext.remove();
- }
}
protected void executeOnce() throws Exception {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index d76097fb7ca..ecde827b9ed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -21,13 +21,11 @@ import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.CancelLoadStmt;
import org.apache.doris.analysis.CleanLabelStmt;
import org.apache.doris.analysis.CompoundPredicate.Operator;
-import org.apache.doris.analysis.CopyStmt;
import org.apache.doris.analysis.InsertStmt;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
-import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.CaseSensibility;
import org.apache.doris.common.Config;
@@ -78,7 +76,6 @@ import java.util.Comparator;
import java.util.Deque;
import java.util.EnumSet;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -87,7 +84,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -101,10 +97,9 @@ import java.util.stream.Collectors;
public class LoadManager implements Writable {
private static final Logger LOG = LogManager.getLogger(LoadManager.class);
- private Map<Long, LoadJob> idToLoadJob = Maps.newConcurrentMap();
- private Map<Long, Map<String, List<LoadJob>>> dbIdToLabelToLoadJobs =
Maps.newConcurrentMap();
- private LoadJobScheduler loadJobScheduler;
- private CleanCopyJobScheduler cleanCopyJobScheduler;
+ protected Map<Long, LoadJob> idToLoadJob = Maps.newConcurrentMap();
+ protected Map<Long, Map<String, List<LoadJob>>> dbIdToLabelToLoadJobs =
Maps.newConcurrentMap();
+ protected LoadJobScheduler loadJobScheduler;
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private MysqlLoadManager mysqlLoadManager;
@@ -116,11 +111,6 @@ public class LoadManager implements Writable {
this.mysqlLoadManager = new MysqlLoadManager(tokenManager);
}
- public LoadManager(LoadJobScheduler loadJobScheduler,
CleanCopyJobScheduler cleanCopyJobScheduler) {
- this(loadJobScheduler);
- this.cleanCopyJobScheduler = cleanCopyJobScheduler;
- }
-
public void start() {
tokenManager.start();
mysqlLoadManager.start();
@@ -172,62 +162,11 @@ public class LoadManager implements Writable {
return loadJob.getId();
}
- public LoadJob createLoadJobFromStmt(CopyStmt stmt) throws DdlException {
- Database database = checkDb(stmt.getDbName());
- long dbId = database.getId();
- BrokerLoadJob loadJob = null;
- if (Config.isCloudMode()) {
- ConnectContext context = ConnectContext.get();
- if (context != null) {
- String cloudCluster = context.getCloudCluster();
- if (!Strings.isNullOrEmpty(cloudCluster)) {
- ((CloudSystemInfoService)
Env.getCurrentSystemInfo()).waitForAutoStart(cloudCluster);
- }
- }
- }
- writeLock();
- try {
- long unfinishedCopyJobNum = unprotectedGetUnfinishedCopyJobNum();
- if (unfinishedCopyJobNum >= Config.cluster_max_waiting_copy_jobs) {
- throw new DdlException(
- "There are more than " + unfinishedCopyJobNum + "
unfinished copy jobs, please retry later.");
- }
- loadJob = new CopyJob(dbId, stmt.getLabel().getLabelName(),
ConnectContext.get().queryId(),
- stmt.getBrokerDesc(), stmt.getOrigStmt(),
stmt.getUserInfo(), stmt.getStageId(),
- stmt.getStageType(), stmt.getStagePrefix(),
stmt.getSizeLimit(), stmt.getPattern(),
- stmt.getObjectInfo(), stmt.isForce(), stmt.getUserName());
- loadJob.setJobProperties(stmt.getProperties());
- loadJob.checkAndSetDataSourceInfo(database,
stmt.getDataDescriptions());
- loadJob.setTimeout(ConnectContext.get().getExecTimeout());
- createLoadJob(loadJob);
- } catch (MetaNotFoundException e) {
- throw new DdlException(e.getMessage());
- } finally {
- writeUnlock();
- }
- Env.getCurrentEnv().getEditLog().logCreateLoadJob(loadJob);
-
- // The job must be submitted after edit log.
- // It guarantees that load job has not been changed before edit log.
- loadJobScheduler.submitJob(loadJob);
- return loadJob;
- }
-
- public void createCleanCopyJobTask(CleanCopyJobTask task) throws
DdlException {
- cleanCopyJobScheduler.submitJob(task);
- }
-
private long unprotectedGetUnfinishedJobNum() {
return idToLoadJob.values().stream()
.filter(j -> (j.getState() != JobState.FINISHED &&
j.getState() != JobState.CANCELLED)).count();
}
- private long unprotectedGetUnfinishedCopyJobNum() {
- return idToLoadJob.values().stream()
- .filter(j -> (j.getState() != JobState.FINISHED &&
j.getState() != JobState.CANCELLED))
- .filter(j -> j instanceof CopyJob).count();
- }
-
/**
* MultiLoadMgr use.
**/
@@ -257,7 +196,7 @@ public class LoadManager implements Writable {
}
// add load job and also add to callback factory
- private void createLoadJob(LoadJob loadJob) {
+ protected void createLoadJob(LoadJob loadJob) {
if (loadJob.isExpired(System.currentTimeMillis())) {
// This can happen in replay logic.
return;
@@ -631,28 +570,17 @@ public class LoadManager implements Writable {
* @param labelValue used to filter jobs which's label is or like
labelValue.
* @param accurateMatch true: filter jobs which's label is labelValue.
false: filter jobs which's label like itself.
* @param statesValue used to filter jobs which's state within the
statesValue set.
- * @param jobTypes used to filter jobs which's type within the
jobTypes set.
- * @param copyIdValue used to filter jobs which's copyId is or like
copyIdValue.
- * @param copyIdAccurateMatch true: filter jobs which's copyId is
copyIdValue.
- * false: filter jobs which's copyId like
itself.
* @return The result is the list of jobInfo.
* JobInfo is a list which includes the comparable object: jobId,
label, state etc.
* The result is unordered.
*/
public List<List<Comparable>> getLoadJobInfosByDb(long dbId, String
labelValue, boolean accurateMatch,
- Set<String> statesValue, Set<EtlJobType> jobTypes, String
copyIdValue, boolean copyIdAccurateMatch,
- String tableNameValue, boolean tableNameAccurateMatch, String
fileValue, boolean fileAccurateMatch)
- throws AnalysisException {
+ Set<String> statesValue)
throws AnalysisException {
LinkedList<List<Comparable>> loadJobInfos = new
LinkedList<List<Comparable>>();
if (!dbIdToLabelToLoadJobs.containsKey(dbId)) {
return loadJobInfos;
}
- if (jobTypes == null || jobTypes.isEmpty()) {
- jobTypes = new HashSet<>();
- jobTypes.addAll(EnumSet.allOf(EtlJobType.class));
- }
-
Set<JobState> states = Sets.newHashSet();
if (statesValue == null || statesValue.size() == 0) {
states.addAll(EnumSet.allOf(JobState.class));
@@ -693,21 +621,8 @@ public class LoadManager implements Writable {
}
}
- List<LoadJob> loadJobList2 = new ArrayList<>();
// check state
for (LoadJob loadJob : loadJobList) {
- if (!states.contains(loadJob.getState())) {
- continue;
- }
- if (!jobTypes.contains(loadJob.jobType)) {
- continue;
- }
- loadJobList2.add(loadJob);
- }
- loadJobList2 = filterCopyJob(loadJobList2, copyIdValue,
copyIdAccurateMatch, c -> c.getCopyId());
- loadJobList2 = filterCopyJob(loadJobList2, tableNameValue,
tableNameAccurateMatch, c -> c.getTableName());
- loadJobList2 = filterCopyJob(loadJobList2, fileValue,
fileAccurateMatch, c -> c.getFiles());
- for (LoadJob loadJob : loadJobList2) {
try {
if (!states.contains(loadJob.getState())) {
continue;
@@ -732,32 +647,6 @@ public class LoadManager implements Writable {
}
}
- private List<LoadJob> filterCopyJob(List<LoadJob> loadJobList, String
value, boolean accurateMatch,
- Function<CopyJob, String> func) throws AnalysisException {
- if (Strings.isNullOrEmpty(value)) {
- return loadJobList;
- }
- List<LoadJob> loadJobList2 = Lists.newArrayList();
- for (LoadJob loadJob : loadJobList) {
- if (loadJob.getJobType() != EtlJobType.COPY) {
- continue;
- }
- CopyJob copyJob = (CopyJob) loadJob;
- if (accurateMatch) {
- if (func.apply(copyJob).equalsIgnoreCase(value)) {
- loadJobList2.add(copyJob);
- }
- } else {
- // non-accurate match
- PatternMatcher matcher =
PatternMatcherWrapper.createMysqlPattern(value, false);
- if (matcher.match(func.apply(copyJob))) {
- loadJobList2.add(copyJob);
- }
- }
- }
- return loadJobList2;
- }
-
public void checkJobAuth(String ctlName, String dbName, Set<String>
tableNames) throws AnalysisException {
if (tableNames.isEmpty()) {
if (!Env.getCurrentEnv().getAccessManager()
@@ -875,7 +764,7 @@ public class LoadManager implements Writable {
}
}
- private Database checkDb(String dbName) throws DdlException {
+ protected Database checkDb(String dbName) throws DdlException {
return Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
}
@@ -993,19 +882,19 @@ public class LoadManager implements Writable {
counter, dbId, label, isReplay);
}
- private void readLock() {
+ protected void readLock() {
lock.readLock().lock();
}
- private void readUnlock() {
+ protected void readUnlock() {
lock.readLock().unlock();
}
- private void writeLock() {
+ protected void writeLock() {
lock.writeLock().lock();
}
- private void writeUnlock() {
+ protected void writeUnlock() {
lock.writeLock().unlock();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
index 49735521044..bdc662dccec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -131,12 +131,13 @@ import
org.apache.doris.analysis.UnsetDefaultStorageVaultStmt;
import org.apache.doris.catalog.EncryptKeyHelper;
import org.apache.doris.catalog.Env;
import org.apache.doris.cloud.catalog.CloudEnv;
+import org.apache.doris.cloud.load.CloudLoadManager;
+import org.apache.doris.cloud.load.CopyJob;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.util.ProfileManager;
import org.apache.doris.load.EtlStatus;
import org.apache.doris.load.FailMsg;
-import org.apache.doris.load.loadv2.CopyJob;
import org.apache.doris.load.loadv2.JobState;
import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.load.sync.SyncJobManager;
@@ -445,7 +446,7 @@ public class DdlExecutor {
}
private static void executeCopyStmt(Env env, CopyStmt copyStmt) throws
Exception {
- CopyJob job = (CopyJob)
env.getLoadManager().createLoadJobFromStmt(copyStmt);
+ CopyJob job = (CopyJob) (((CloudLoadManager)
env.getLoadManager()).createLoadJobFromStmt(copyStmt));
if (!copyStmt.isAsync()) {
// wait for execute finished
waitJobCompleted(job);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index e6fbef29889..8bcd7ab15f4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -152,6 +152,7 @@ import org.apache.doris.catalog.View;
import org.apache.doris.clone.DynamicPartitionScheduler;
import org.apache.doris.cloud.catalog.CloudEnv;
import org.apache.doris.cloud.datasource.CloudInternalCatalog;
+import org.apache.doris.cloud.load.CloudLoadManager;
import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.cloud.rpc.MetaServiceProxy;
import org.apache.doris.cloud.system.CloudSystemInfoService;
@@ -1331,11 +1332,17 @@ public class ShowExecutor {
Set<String> statesValue = showStmt.getStates() == null ? null :
showStmt.getStates().stream()
.map(entity -> entity.name())
.collect(Collectors.toSet());
- loadInfos.addAll(env.getLoadManager().getLoadJobInfosByDb(dbId,
showStmt.getLabelValue(),
+ if (!Config.isCloudMode()) {
+ loadInfos.addAll(env.getLoadManager()
+ .getLoadJobInfosByDb(dbId, showStmt.getLabelValue(),
showStmt.isAccurateMatch(), statesValue));
+ } else {
+ loadInfos.addAll(((CloudLoadManager) env.getLoadManager())
+ .getLoadJobInfosByDb(dbId, showStmt.getLabelValue(),
showStmt.isAccurateMatch(), statesValue, jobTypes,
showStmt.getCopyIdValue(),
showStmt.isCopyIdAccurateMatch(),
showStmt.getTableNameValue(),
showStmt.isTableNameAccurateMatch(),
showStmt.getFileValue(),
showStmt.isFileAccurateMatch()));
+ }
// add the nerieds load info
JobManager loadMgr = env.getJobManager();
loadInfos.addAll(loadMgr.getLoadJobInfosByDb(dbId, db.getFullName(),
showStmt.getLabelValue(),
@@ -1526,9 +1533,17 @@ public class ShowExecutor {
throws AnalysisException {
LoadManager loadManager = Env.getCurrentEnv().getLoadManager();
if (showWarningsStmt.isFindByLabel()) {
- List<List<Comparable>> loadJobInfosByDb =
loadManager.getLoadJobInfosByDb(db.getId(),
- showWarningsStmt.getLabel(),
- true, null, null, null, false, null, false, null, false);
+ List<List<Comparable>> loadJobInfosByDb;
+ if (!Config.isCloudMode()) {
+ loadJobInfosByDb = loadManager.getLoadJobInfosByDb(db.getId(),
+ showWarningsStmt.getLabel(),
+ true, null);
+ } else {
+ loadJobInfosByDb = ((CloudLoadManager) loadManager)
+ .getLoadJobInfosByDb(db.getId(),
+ showWarningsStmt.getLabel(),
+ true, null, null, null, false, null, false, null,
false);
+ }
if (CollectionUtils.isEmpty(loadJobInfosByDb)) {
return null;
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/CopyJobTest.java
b/fe/fe-core/src/test/java/org/apache/doris/cloud/load/CopyJobTest.java
similarity index 98%
rename from
fe/fe-core/src/test/java/org/apache/doris/load/loadv2/CopyJobTest.java
rename to fe/fe-core/src/test/java/org/apache/doris/cloud/load/CopyJobTest.java
index a2a880ba847..ccaeecd4a8f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/CopyJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/cloud/load/CopyJobTest.java
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.load.loadv2;
+package org.apache.doris.cloud.load;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.UserIdentity;
@@ -31,6 +31,7 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
+import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.meta.MetaContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.thrift.TUniqueId;
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/CopyLoadPendingTaskTest.java
b/fe/fe-core/src/test/java/org/apache/doris/cloud/load/CopyLoadPendingTaskTest.java
similarity index 99%
rename from
fe/fe-core/src/test/java/org/apache/doris/load/loadv2/CopyLoadPendingTaskTest.java
rename to
fe/fe-core/src/test/java/org/apache/doris/cloud/load/CopyLoadPendingTaskTest.java
index 1beab75f8e8..577d485f4e7 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/CopyLoadPendingTaskTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/cloud/load/CopyLoadPendingTaskTest.java
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.load.loadv2;
+package org.apache.doris.cloud.load;
import org.apache.doris.catalog.Env;
import org.apache.doris.cloud.datasource.CloudInternalCatalog;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]