This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 676c2ae1995 [refactor](cloud) refactor copy into make logic clear (#34181)
676c2ae1995 is described below

commit 676c2ae1995aa6e926b8258fa0e0239cd27fe854
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Sun Apr 28 15:01:37 2024 +0800

    [refactor](cloud) refactor copy into make logic clear (#34181)
---
 .../java/org/apache/doris/catalog/EnvFactory.java  |   6 -
 .../org/apache/doris/cloud/catalog/CloudEnv.java   |   5 +-
 .../doris/cloud/catalog/CloudEnvFactory.java       |   7 +-
 .../load}/CleanCopyJobScheduler.java               |   2 +-
 .../loadv2 => cloud/load}/CleanCopyJobTask.java    |   2 +-
 .../doris/cloud/load/CloudBrokerLoadJob.java       |  18 +-
 .../org/apache/doris/cloud/load/CloudCopyJob.java  | 106 -----------
 .../apache/doris/cloud/load/CloudLoadManager.java  | 212 ++++++++++++++++++++-
 .../doris/{load/loadv2 => cloud/load}/CopyJob.java |  10 +-
 .../loadv2 => cloud/load}/CopyLoadPendingTask.java |   6 +-
 .../org/apache/doris/common/proc/LoadProcDir.java  |  11 +-
 .../apache/doris/load/loadv2/BrokerLoadJob.java    |  94 +--------
 .../doris/load/loadv2/BrokerLoadPendingTask.java   |   2 +-
 .../org/apache/doris/load/loadv2/BulkLoadJob.java  |  40 +---
 .../java/org/apache/doris/load/loadv2/LoadJob.java |   5 +-
 .../apache/doris/load/loadv2/LoadLoadingTask.java  |  63 ------
 .../org/apache/doris/load/loadv2/LoadManager.java  | 131 +------------
 .../main/java/org/apache/doris/qe/DdlExecutor.java |   5 +-
 .../java/org/apache/doris/qe/ShowExecutor.java     |  23 ++-
 .../{load/loadv2 => cloud/load}/CopyJobTest.java   |   3 +-
 .../load}/CopyLoadPendingTaskTest.java             |   2 +-
 21 files changed, 298 insertions(+), 455 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
index 98b79d66a1c..a0be94b6834 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
@@ -29,7 +29,6 @@ import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.common.util.PropertyAnalyzer;
 import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.load.loadv2.BrokerLoadJob;
-import org.apache.doris.load.loadv2.CleanCopyJobScheduler;
 import org.apache.doris.load.loadv2.LoadJobScheduler;
 import org.apache.doris.load.loadv2.LoadManager;
 import org.apache.doris.load.routineload.RoutineLoadManager;
@@ -159,11 +158,6 @@ public class EnvFactory {
         return new LoadManager(loadJobScheduler);
     }
 
-    public LoadManager createLoadManager(LoadJobScheduler loadJobScheduler,
-                                        CleanCopyJobScheduler cleanCopyJobScheduler) {
-        return new LoadManager(loadJobScheduler, cleanCopyJobScheduler);
-    }
-
     public MasterDaemon createTabletStatMgr() {
         return new TabletStatMgr();
     }
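
This hunk, together with the CloudEnv and CloudEnvFactory hunks below, moves the two-argument createLoadManager overload out of the shared EnvFactory and into the cloud-only factory, so the base class no longer depends on CleanCopyJobScheduler. A minimal, self-contained sketch of that shape, using hypothetical stand-in types rather than the actual Doris classes:

    // Hypothetical stand-in types (not the Doris classes) illustrating the factory split.
    class Scheduler { }
    class CopyCleaner { }

    class BaseLoadManager {
        BaseLoadManager(Scheduler scheduler) { }
    }

    class CloudAwareLoadManager extends BaseLoadManager {
        CloudAwareLoadManager(Scheduler scheduler, CopyCleaner cleaner) {
            super(scheduler);
        }
    }

    class BaseFactory {
        // The only overload left in the base factory.
        public BaseLoadManager createLoadManager(Scheduler scheduler) {
            return new BaseLoadManager(scheduler);
        }
    }

    class CloudFactory extends BaseFactory {
        // Cloud-only overload; callers that need it hold (or cast to) a CloudFactory.
        public BaseLoadManager createLoadManager(Scheduler scheduler, CopyCleaner cleaner) {
            return new CloudAwareLoadManager(scheduler, cleaner);
        }
    }

The cost of keeping the overload out of the base class is the explicit (CloudEnvFactory) cast in CloudEnv.java below, which presumes that a cloud deployment installs CloudEnvFactory as the factory singleton.
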
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
index 8b14a733957..e52ea292932 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
@@ -27,6 +27,7 @@ import org.apache.doris.cloud.CacheHotspotManager;
 import org.apache.doris.cloud.CloudWarmUpJob;
 import org.apache.doris.cloud.CloudWarmUpJob.JobState;
 import org.apache.doris.cloud.datasource.CloudInternalCatalog;
+import org.apache.doris.cloud.load.CleanCopyJobScheduler;
 import org.apache.doris.cloud.persist.UpdateCloudReplicaInfo;
 import org.apache.doris.cloud.proto.Cloud;
 import org.apache.doris.cloud.proto.Cloud.NodeInfoPB;
@@ -41,7 +42,6 @@ import org.apache.doris.common.util.HttpURLUtil;
 import org.apache.doris.common.util.NetUtils;
 import org.apache.doris.ha.FrontendNodeType;
 import org.apache.doris.httpv2.meta.MetaBaseAction;
-import org.apache.doris.load.loadv2.CleanCopyJobScheduler;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.persist.Storage;
 import org.apache.doris.qe.ConnectContext;
@@ -80,7 +80,8 @@ public class CloudEnv extends Env {
     public CloudEnv(boolean isCheckpointCatalog) {
         super(isCheckpointCatalog);
         this.cleanCopyJobScheduler = new CleanCopyJobScheduler();
-        this.loadManager = EnvFactory.getInstance().createLoadManager(loadJobScheduler, cleanCopyJobScheduler);
+        this.loadManager = ((CloudEnvFactory) EnvFactory.getInstance())
+                                    .createLoadManager(loadJobScheduler, cleanCopyJobScheduler);
         this.cloudClusterCheck = new CloudClusterChecker((CloudSystemInfoService) systemInfo);
         this.cloudInstanceStatusChecker = new CloudInstanceStatusChecker((CloudSystemInfoService) systemInfo);
         this.cloudTabletRebalancer = new CloudTabletRebalancer((CloudSystemInfoService) systemInfo);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java
index 5d05928b540..32992307a8b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnvFactory.java
@@ -33,6 +33,7 @@ import org.apache.doris.catalog.ReplicaAllocation;
 import org.apache.doris.catalog.Tablet;
 import org.apache.doris.cloud.common.util.CloudPropertyAnalyzer;
 import org.apache.doris.cloud.datasource.CloudInternalCatalog;
+import org.apache.doris.cloud.load.CleanCopyJobScheduler;
 import org.apache.doris.cloud.load.CloudBrokerLoadJob;
 import org.apache.doris.cloud.load.CloudLoadManager;
 import org.apache.doris.cloud.load.CloudRoutineLoadManager;
@@ -178,9 +179,9 @@ public class CloudEnvFactory extends EnvFactory {
         return new CloudRoutineLoadManager();
     }
 
-    @Override
-    public LoadManager createLoadManager(LoadJobScheduler loadJobScheduler) {
-        return new CloudLoadManager(loadJobScheduler);
+    public LoadManager createLoadManager(LoadJobScheduler loadJobScheduler,
+                                        CleanCopyJobScheduler cleanCopyJobScheduler) {
+        return new CloudLoadManager(loadJobScheduler, cleanCopyJobScheduler);
     }
 
     public MasterDaemon createTabletStatMgr() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/CleanCopyJobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CleanCopyJobScheduler.java
similarity index 98%
rename from fe/fe-core/src/main/java/org/apache/doris/load/loadv2/CleanCopyJobScheduler.java
rename to fe/fe-core/src/main/java/org/apache/doris/cloud/load/CleanCopyJobScheduler.java
index 758c899e082..610d99716d4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/CleanCopyJobScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CleanCopyJobScheduler.java
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.load.loadv2;
+package org.apache.doris.cloud.load;
 
 import org.apache.doris.common.Config;
 import org.apache.doris.common.util.MasterDaemon;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/CleanCopyJobTask.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CleanCopyJobTask.java
similarity index 98%
rename from fe/fe-core/src/main/java/org/apache/doris/load/loadv2/CleanCopyJobTask.java
rename to fe/fe-core/src/main/java/org/apache/doris/cloud/load/CleanCopyJobTask.java
index 245c4e19f26..3b6bd75d3b9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/CleanCopyJobTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CleanCopyJobTask.java
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.load.loadv2;
+package org.apache.doris.cloud.load;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.cloud.datasource.CloudInternalCatalog;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java
index 814e6020362..04560df0fd8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java
@@ -30,6 +30,7 @@ import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
 import org.apache.doris.load.BrokerFileGroup;
 import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey;
+import org.apache.doris.load.EtlJobType;
 import org.apache.doris.load.FailMsg;
 import org.apache.doris.load.FailMsg.CancelType;
 import org.apache.doris.load.loadv2.BrokerLoadJob;
@@ -62,14 +63,29 @@ public class CloudBrokerLoadJob extends BrokerLoadJob {
     protected static final String CLOUD_CLUSTER_ID = "clusterId";
     protected String cloudClusterId;
 
-    private int retryTimes = 3;
+    protected int retryTimes = 3;
 
     public CloudBrokerLoadJob() {
     }
 
+    public CloudBrokerLoadJob(EtlJobType type) {
+        super(type);
+    }
+
+    public CloudBrokerLoadJob(EtlJobType type, long dbId, String label, BrokerDesc brokerDesc,
+            OriginStatement originStmt, UserIdentity userInfo)
+            throws MetaNotFoundException {
+        super(type, dbId, label, brokerDesc, originStmt, userInfo);
+        setCloudClusterId();
+    }
+
     public CloudBrokerLoadJob(long dbId, String label, BrokerDesc brokerDesc, OriginStatement originStmt,
             UserIdentity userInfo) throws MetaNotFoundException {
         super(dbId, label, brokerDesc, originStmt, userInfo);
+        setCloudClusterId();
+    }
+
+    private void setCloudClusterId() throws MetaNotFoundException {
         ConnectContext context = ConnectContext.get();
         if (context != null) {
             String clusterName = context.getCloudCluster();
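
The added constructors above funnel the shared cluster-id lookup into a single private setCloudClusterId() helper instead of repeating it per constructor. A small sketch of that constructor-delegation pattern, with hypothetical names in place of the Doris types:

    // Hypothetical sketch: overloaded constructors sharing one private initializer.
    class CloudJobSketch {
        private final String clusterId;

        CloudJobSketch(String label) {
            this.clusterId = resolveClusterId();
        }

        CloudJobSketch(String type, String label) {
            // type-specific setup would happen here before the shared step
            this.clusterId = resolveClusterId();
        }

        // One place to resolve and validate the cluster id for every constructor.
        private static String resolveClusterId() {
            String name = System.getenv("CLOUD_CLUSTER"); // stand-in for the ConnectContext lookup
            if (name == null || name.isEmpty()) {
                throw new IllegalStateException("cluster name is empty");
            }
            return name;
        }
    }
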
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudCopyJob.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudCopyJob.java
deleted file mode 100644
index c3302901072..00000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudCopyJob.java
+++ /dev/null
@@ -1,106 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.cloud.load;
-
-import org.apache.doris.cloud.proto.Cloud.StagePB;
-import org.apache.doris.cloud.storage.RemoteBase.ObjectInfo;
-import org.apache.doris.common.DdlException;
-import org.apache.doris.common.io.Text;
-import org.apache.doris.load.EtlJobType;
-import org.apache.doris.load.FailMsg;
-import org.apache.doris.load.loadv2.BrokerLoadJob;
-import org.apache.doris.load.loadv2.JobState;
-
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-import lombok.Getter;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class CloudCopyJob extends BrokerLoadJob {
-    private static final Logger LOG = LogManager.getLogger(CloudCopyJob.class);
-    private static final String TABLE_NAME_KEY = "TableName";
-    private static final String USER_NAME_KEY = "UserName";
-
-    @Getter
-    private String stageId;
-    @Getter
-    private StagePB.StageType stageType;
-    @Getter
-    private String stagePrefix;
-    @Getter
-    private long sizeLimit;
-    @Getter
-    private String pattern;
-    @Getter
-    private ObjectInfo objectInfo;
-    @Getter
-    private String copyId;
-    @Getter
-    private boolean forceCopy;
-    private String loadFilePaths = "";
-    private Map<String, String> properties = new HashMap<>();
-    private volatile boolean abortedCopy = false;
-    private boolean isReplay = false;
-    private List<String> loadFiles = null;
-
-    public CloudCopyJob() {
-        super(EtlJobType.COPY);
-    }
-
-    @Override
-    public void cancelJob(FailMsg failMsg) throws DdlException {
-        super.cancelJob(failMsg);
-        loadFiles = null;
-        abortedCopy = true;
-    }
-
-    public void setAbortedCopy(boolean abortedCopy) {
-        this.abortedCopy = abortedCopy;
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-        super.write(out);
-        Text.writeString(out, copyId);
-        Text.writeString(out, loadFilePaths);
-        Gson gson = new Gson();
-        Text.writeString(out, properties == null ? "" : gson.toJson(properties));
-    }
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-        super.readFields(in);
-        copyId = Text.readString(in);
-        loadFilePaths = Text.readString(in);
-        String property = Text.readString(in);
-        properties = property.isEmpty() ? new HashMap<>()
-                : (new Gson().fromJson(property, new TypeToken<Map<String, String>>() {
-                }.getType()));
-
-        // FIXME: COPY JOB is not supported yet.
-        state = JobState.CANCELLED;
-    }
-}
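
The deleted CloudCopyJob persisted its properties map by writing a Gson JSON string in write() and parsing it back in readFields(). For reference, that round-trip looks like this as a standalone sketch (real Gson API, a plain map standing in for the job state, and an example key that is purely illustrative):

    import com.google.gson.Gson;
    import com.google.gson.reflect.TypeToken;

    import java.util.HashMap;
    import java.util.Map;

    public class GsonMapRoundTrip {
        public static void main(String[] args) {
            Map<String, String> properties = new HashMap<>();
            properties.put("example.key", "value");   // illustrative key only

            Gson gson = new Gson();
            // Write side: an empty string stands in for an absent map, as in the deleted code.
            String json = properties == null ? "" : gson.toJson(properties);

            // Read side: restore an empty map when nothing was written.
            Map<String, String> restored = json.isEmpty()
                    ? new HashMap<>()
                    : gson.fromJson(json, new TypeToken<Map<String, String>>() { }.getType());

            System.out.println(restored);
        }
    }
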
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java
index 5caa2108c59..f4543c6b066 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java
@@ -17,19 +17,52 @@
 
 package org.apache.doris.cloud.load;
 
+import org.apache.doris.analysis.CopyStmt;
 import org.apache.doris.analysis.InsertStmt;
 import org.apache.doris.analysis.LoadStmt;
+import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.cloud.system.CloudSystemInfoService;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.CaseSensibility;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.PatternMatcher;
+import org.apache.doris.common.PatternMatcherWrapper;
 import org.apache.doris.common.UserException;
+import org.apache.doris.load.EtlJobType;
+import org.apache.doris.load.loadv2.BrokerLoadJob;
+import org.apache.doris.load.loadv2.JobState;
+import org.apache.doris.load.loadv2.LoadJob;
 import org.apache.doris.load.loadv2.LoadJobScheduler;
 import org.apache.doris.load.loadv2.LoadManager;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 public class CloudLoadManager extends LoadManager {
+    private static final Logger LOG = LogManager.getLogger(CloudLoadManager.class);
+    private CleanCopyJobScheduler cleanCopyJobScheduler;
 
-    public CloudLoadManager(LoadJobScheduler loadJobScheduler) {
+    public CloudLoadManager(LoadJobScheduler loadJobScheduler, CleanCopyJobScheduler cleanCopyJobScheduler) {
         super(loadJobScheduler);
+        this.cleanCopyJobScheduler = cleanCopyJobScheduler;
     }
 
     @Override
@@ -46,5 +79,182 @@ public class CloudLoadManager extends LoadManager {
         return super.createLoadJobFromStmt(stmt);
     }
 
+    public LoadJob createLoadJobFromStmt(CopyStmt stmt) throws DdlException {
+        Database database = super.checkDb(stmt.getDbName());
+        long dbId = database.getId();
+        BrokerLoadJob loadJob = null;
+        ((CloudSystemInfoService) Env.getCurrentSystemInfo()).waitForAutoStartCurrentCluster();
+
+        writeLock();
+        try {
+            long unfinishedCopyJobNum = unprotectedGetUnfinishedCopyJobNum();
+            if (unfinishedCopyJobNum >= Config.cluster_max_waiting_copy_jobs) {
+                throw new DdlException(
+                        "There are more than " + unfinishedCopyJobNum + " unfinished copy jobs, please retry later.");
+            }
+            loadJob = new CopyJob(dbId, stmt.getLabel().getLabelName(), ConnectContext.get().queryId(),
+                    stmt.getBrokerDesc(), stmt.getOrigStmt(), stmt.getUserInfo(), stmt.getStageId(),
+                    stmt.getStageType(), stmt.getStagePrefix(), stmt.getSizeLimit(), stmt.getPattern(),
+                    stmt.getObjectInfo(), stmt.isForce(), stmt.getUserName());
+            loadJob.setJobProperties(stmt.getProperties());
+            loadJob.checkAndSetDataSourceInfo(database, stmt.getDataDescriptions());
+            loadJob.setTimeout(ConnectContext.get().getExecTimeout());
+            createLoadJob(loadJob);
+        } catch (MetaNotFoundException e) {
+            throw new DdlException(e.getMessage());
+        } finally {
+            super.writeUnlock();
+        }
+        Env.getCurrentEnv().getEditLog().logCreateLoadJob(loadJob);
+
+        // The job must be submitted after edit log.
+        // It guarantees that load job has not been changed before edit log.
+        loadJobScheduler.submitJob(loadJob);
+        return loadJob;
+    }
+
+    public void createCleanCopyJobTask(CleanCopyJobTask task) throws DdlException {
+        cleanCopyJobScheduler.submitJob(task);
+    }
+
+    private long unprotectedGetUnfinishedCopyJobNum() {
+        return idToLoadJob.values().stream()
+                .filter(j -> (j.getState() != JobState.FINISHED && j.getState() != JobState.CANCELLED))
+                .filter(j -> j instanceof CopyJob).count();
+    }
+
+    /**
+     * This method will return the jobs info which can meet the condition of input param.
+     *
+     * @param dbId          used to filter jobs which belong to this db
+     * @param labelValue    used to filter jobs which's label is or like labelValue.
+     * @param accurateMatch true: filter jobs which's label is labelValue. false: filter jobs which's label like itself.
+     * @param statesValue   used to filter jobs which's state within the statesValue set.
+     * @param jobTypes      used to filter jobs which's type within the jobTypes set.
+     * @param copyIdValue        used to filter jobs which's copyId is or like copyIdValue.
+     * @param copyIdAccurateMatch  true: filter jobs which's copyId is copyIdValue.
+     *                             false: filter jobs which's copyId like itself.
+     * @return The result is the list of jobInfo.
+     *         JobInfo is a list which includes the comparable object: jobId, label, state etc.
+     *         The result is unordered.
+     */
+    public List<List<Comparable>> getLoadJobInfosByDb(long dbId, String labelValue, boolean accurateMatch,
+            Set<String> statesValue, Set<EtlJobType> jobTypes, String copyIdValue, boolean copyIdAccurateMatch,
+            String tableNameValue, boolean tableNameAccurateMatch, String fileValue, boolean fileAccurateMatch)
+            throws AnalysisException {
+        LinkedList<List<Comparable>> loadJobInfos = new LinkedList<List<Comparable>>();
+        if (!dbIdToLabelToLoadJobs.containsKey(dbId)) {
+            return loadJobInfos;
+        }
+
+        if (jobTypes == null || jobTypes.isEmpty()) {
+            jobTypes = new HashSet<>();
+            jobTypes.addAll(EnumSet.allOf(EtlJobType.class));
+        }
+
+        Set<JobState> states = Sets.newHashSet();
+        if (statesValue == null || statesValue.size() == 0) {
+            states.addAll(EnumSet.allOf(JobState.class));
+        } else {
+            for (String stateValue : statesValue) {
+                try {
+                    states.add(JobState.valueOf(stateValue));
+                } catch (IllegalArgumentException e) {
+                    // ignore this state
+                }
+            }
+        }
+
+        readLock();
+        try {
+            Map<String, List<LoadJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(dbId);
+            List<LoadJob> loadJobList = Lists.newArrayList();
+            if (Strings.isNullOrEmpty(labelValue)) {
+                loadJobList.addAll(
+                        labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList()));
+            } else {
+                // check label value
+                if (accurateMatch) {
+                    if (!labelToLoadJobs.containsKey(labelValue)) {
+                        return loadJobInfos;
+                    }
+                    loadJobList.addAll(labelToLoadJobs.get(labelValue));
+                } else {
+                    // non-accurate match
+                    PatternMatcher matcher =
+                            PatternMatcherWrapper.createMysqlPattern(labelValue,
+                                    CaseSensibility.LABEL.getCaseSensibility());
+                    for (Map.Entry<String, List<LoadJob>> entry : labelToLoadJobs.entrySet()) {
+                        if (matcher.match(entry.getKey())) {
+                            loadJobList.addAll(entry.getValue());
+                        }
+                    }
+                }
+            }
+
+            List<LoadJob> loadJobList2 = new ArrayList<>();
+            // check state
+            for (LoadJob loadJob : loadJobList) {
+                if (!states.contains(loadJob.getState())) {
+                    continue;
+                }
+                if (!jobTypes.contains(loadJob.getJobType())) {
+                    continue;
+                }
+                loadJobList2.add(loadJob);
+            }
+            loadJobList2 = filterCopyJob(loadJobList2, copyIdValue, copyIdAccurateMatch, c -> c.getCopyId());
+            loadJobList2 = filterCopyJob(loadJobList2, tableNameValue, tableNameAccurateMatch, c -> c.getTableName());
+            loadJobList2 = filterCopyJob(loadJobList2, fileValue, fileAccurateMatch, c -> c.getFiles());
+            for (LoadJob loadJob : loadJobList2) {
+                try {
+                    if (!states.contains(loadJob.getState())) {
+                        continue;
+                    }
+                    // check auth
+                    try {
+                        checkJobAuth(loadJob.getDb().getCatalog().getName(), loadJob.getDb().getName(),
+                                loadJob.getTableNames());
+                    } catch (AnalysisException e) {
+                        continue;
+                    }
+                    // add load job info
+                    loadJobInfos.add(loadJob.getShowInfo());
+                } catch (RuntimeException | DdlException | MetaNotFoundException e) {
+                    // ignore this load job
+                    LOG.warn("get load job info failed. job id: {}", loadJob.getId(), e);
+                }
+            }
+            return loadJobInfos;
+        } finally {
+            readUnlock();
+        }
+    }
+
+    private List<LoadJob> filterCopyJob(List<LoadJob> loadJobList, String value, boolean accurateMatch,
+            Function<CopyJob, String> func) throws AnalysisException {
+        if (Strings.isNullOrEmpty(value)) {
+            return loadJobList;
+        }
+        List<LoadJob> loadJobList2 = Lists.newArrayList();
+        for (LoadJob loadJob : loadJobList) {
+            if (loadJob.getJobType() != EtlJobType.COPY) {
+                continue;
+            }
+            CopyJob copyJob = (CopyJob) loadJob;
+            if (accurateMatch) {
+                if (func.apply(copyJob).equalsIgnoreCase(value)) {
+                    loadJobList2.add(copyJob);
+                }
+            } else {
+                // non-accurate match
+                PatternMatcher matcher = PatternMatcherWrapper.createMysqlPattern(value, false);
+                if (matcher.match(func.apply(copyJob))) {
+                    loadJobList2.add(copyJob);
+                }
+            }
+        }
+        return loadJobList2;
+    }
 }
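
The filterCopyJob helper added above applies one exact-or-pattern match to different CopyJob fields (copy id, table name, files) by accepting a Function<CopyJob, String> accessor. The same idea as a self-contained sketch, with simplified types and a plain regex standing in for the PatternMatcherWrapper MySQL-style matcher:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Function;
    import java.util.regex.Pattern;

    public class FieldFilterSketch {
        record Job(String copyId, String tableName) { }

        // One filter routine reused for any string field, selected by the accessor.
        static List<Job> filterByField(List<Job> jobs, String value, boolean accurate,
                                       Function<Job, String> field) {
            if (value == null || value.isEmpty()) {
                return jobs;
            }
            // Stand-in for the MySQL LIKE matcher: treat % as a wildcard, ignore case.
            Pattern like = Pattern.compile(value.replace("%", ".*"), Pattern.CASE_INSENSITIVE);
            List<Job> out = new ArrayList<>();
            for (Job job : jobs) {
                String v = field.apply(job);
                if (accurate ? v.equalsIgnoreCase(value) : like.matcher(v).matches()) {
                    out.add(job);
                }
            }
            return out;
        }

        public static void main(String[] args) {
            List<Job> jobs = List.of(new Job("copy_1", "orders"), new Job("copy_2", "users"));
            System.out.println(filterByField(jobs, "copy_%", false, Job::copyId));   // both jobs
            System.out.println(filterByField(jobs, "orders", true, Job::tableName)); // first job only
        }
    }
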
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/CopyJob.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CopyJob.java
similarity index 96%
rename from fe/fe-core/src/main/java/org/apache/doris/load/loadv2/CopyJob.java
rename to fe/fe-core/src/main/java/org/apache/doris/cloud/load/CopyJob.java
index 637a182ec07..35574561a52 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/CopyJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CopyJob.java
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.load.loadv2;
+package org.apache.doris.cloud.load;
 
 import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.analysis.CopyStmt;
@@ -40,6 +40,8 @@ import org.apache.doris.load.BrokerFileGroup;
 import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey;
 import org.apache.doris.load.EtlJobType;
 import org.apache.doris.load.FailMsg;
+import org.apache.doris.load.loadv2.LoadJobFinalOperation;
+import org.apache.doris.load.loadv2.LoadTask;
 import org.apache.doris.qe.OriginStatement;
 import org.apache.doris.thrift.TBrokerFileStatus;
 import org.apache.doris.thrift.TUniqueId;
@@ -61,7 +63,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.stream.Collectors;
 
-public class CopyJob extends BrokerLoadJob {
+public class CopyJob extends CloudBrokerLoadJob {
     private static final Logger LOG = LogManager.getLogger(CopyJob.class);
     private static final String TABLE_NAME_KEY = "TableName";
     private static final String USER_NAME_KEY = "UserName";
@@ -108,7 +110,7 @@ public class CopyJob extends BrokerLoadJob {
     }
 
     @Override
-    protected void checkAndSetDataSourceInfo(Database db, List<DataDescription> dataDescriptions) throws DdlException {
+    public void checkAndSetDataSourceInfo(Database db, List<DataDescription> dataDescriptions) throws DdlException {
         super.checkAndSetDataSourceInfo(db, dataDescriptions);
         // now, copy into only support one table
         for (DataDescription dataDescription : dataDescriptions) {
@@ -138,7 +140,7 @@ public class CopyJob extends BrokerLoadJob {
                     && !isForceCopy()) {
                 CleanCopyJobTask copyJobCleanTask = new CleanCopyJobTask(objectInfo, stageId, stageType, tableId,
                         copyId, loadFiles);
-                Env.getCurrentEnv().getLoadManager().createCleanCopyJobTask(copyJobCleanTask);
+                ((CloudLoadManager) Env.getCurrentEnv().getLoadManager()).createCleanCopyJobTask(copyJobCleanTask);
             }
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/CopyLoadPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CopyLoadPendingTask.java
similarity index 98%
rename from fe/fe-core/src/main/java/org/apache/doris/load/loadv2/CopyLoadPendingTask.java
rename to fe/fe-core/src/main/java/org/apache/doris/cloud/load/CopyLoadPendingTask.java
index bd27c67e6e5..0ab6a9ce907 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/CopyLoadPendingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CopyLoadPendingTask.java
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.load.loadv2;
+package org.apache.doris.cloud.load;
 
 import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.analysis.StorageBackend;
@@ -38,6 +38,8 @@ import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
 import org.apache.doris.load.BrokerFileGroup;
 import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey;
+import org.apache.doris.load.loadv2.BrokerLoadPendingTask;
+import org.apache.doris.load.loadv2.BrokerPendingTaskAttachment;
 import org.apache.doris.thrift.TBrokerFileStatus;
 
 import com.google.common.collect.Lists;
@@ -72,7 +74,7 @@ public class CopyLoadPendingTask extends BrokerLoadPendingTask {
     }
 
     @Override
-    void executeTask() throws UserException {
+    public void executeTask() throws UserException {
         super.executeTask(); // get all files and begin txn
         if (!isBeginCopyDone) {
             beginCopy((BrokerPendingTaskAttachment) attachment);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java
index e6cd4108ee8..8201c0176e0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadProcDir.java
@@ -18,7 +18,9 @@
 package org.apache.doris.common.proc;
 
 import org.apache.doris.catalog.Database;
+import org.apache.doris.cloud.load.CloudLoadManager;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
 import org.apache.doris.load.loadv2.LoadManager;
 
 import com.google.common.collect.ImmutableList;
@@ -66,8 +68,13 @@ public class LoadProcDir implements ProcDirInterface {
         if (db == null) {
             loadJobInfos = loadManager.getAllLoadJobInfos();
         } else {
-            loadJobInfos = loadManager.getLoadJobInfosByDb(db.getId(), null, false,
-                null, null, null, false, null, false, null, false);
+            if (!Config.isCloudMode()) {
+                loadJobInfos = loadManager.getLoadJobInfosByDb(db.getId(), null, false, null);
+            } else {
+                loadJobInfos = ((CloudLoadManager) loadManager)
+                        .getLoadJobInfosByDb(db.getId(), null, false,
+                        null, null, null, false, null, false, null, false);
+            }
         }
 
         int counter = 0;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index 60dfce0edb6..fd70e03caa7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -58,7 +58,6 @@ import org.apache.doris.transaction.TransactionState.TxnCoordinator;
 import org.apache.doris.transaction.TransactionState.TxnSourceType;
 
 import com.google.common.base.Joiner;
-import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -224,16 +223,8 @@ public class BrokerLoadJob extends BulkLoadJob {
 
         UUID uuid = UUID.randomUUID();
         TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
-        if (Config.isNotCloudMode()) {
-            task.init(loadId, attachment.getFileStatusByTable(aggKey),
-                    attachment.getFileNumByTable(aggKey), getUserInfo());
-        } else {
-            if (Strings.isNullOrEmpty(clusterId)) {
-                throw new UserException("can not get a valid cluster");
-            }
-            task.init(loadId, attachment.getFileStatusByTable(aggKey),
-                    attachment.getFileNumByTable(aggKey), getUserInfo(), clusterId);
-        }
+        task.init(loadId, attachment.getFileStatusByTable(aggKey),
+                attachment.getFileNumByTable(aggKey), getUserInfo());
         task.settWorkloadGroups(tWorkloadGroups);
         return task;
     }
@@ -464,87 +455,6 @@ public class BrokerLoadJob extends BulkLoadJob {
         jobProfile = null;
     }
 
-    @Override
-    public void onTaskFailed(long taskId, FailMsg failMsg) {
-        if (!Config.isCloudMode() || Strings.isNullOrEmpty(this.clusterId)) {
-            super.onTaskFailed(taskId, failMsg);
-            return;
-        }
-        try {
-            writeLock();
-            if (isTxnDone()) {
-                LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
-                        .add("label", label)
-                        .add("transactionId", transactionId)
-                        .add("state", state)
-                        .add("error_msg", "this task will be ignored when job is: " + state)
-                        .build());
-                return;
-            }
-            LOG.info(new LogBuilder(LogKey.LOAD_JOB, id)
-                    .add("label", label)
-                    .add("transactionId", transactionId)
-                    .add("state", state)
-                    .add("retryTimes", retryTimes)
-                    .add("failMsg", failMsg.getMsg())
-                    .build());
-
-            this.retryTimes--;
-            if (this.retryTimes <= 0) {
-                boolean abortTxn = this.transactionId > 0 ? true : false;
-                unprotectedExecuteCancel(failMsg, abortTxn);
-                logFinalOperation();
-                return;
-            } else {
-                unprotectedExecuteRetry(failMsg);
-            }
-        } finally {
-            writeUnlock();
-        }
-
-        boolean allTaskDone = false;
-        while (!allTaskDone) {
-            try {
-                writeLock();
-                // check if all task has been done
-                // unprotectedExecuteRetry() will cancel all running task
-                allTaskDone = true;
-                for (Map.Entry<Long, LoadTask> entry : idToTasks.entrySet()) {
-                    if (entry.getKey() != taskId && !entry.getValue().isDone()) {
-                        LOG.info("LoadTask({}) has not been done", entry.getKey());
-                        allTaskDone = false;
-                    }
-                }
-            } finally {
-                writeUnlock();
-            }
-            if (!allTaskDone) {
-                try {
-                    Thread.sleep(1000);
-                    continue;
-                } catch (InterruptedException e) {
-                    LOG.warn("", e);
-                }
-            }
-        }
-
-        try {
-            writeLock();
-            this.state = JobState.PENDING;
-            this.idToTasks.clear();
-            this.failMsg = null;
-            this.finishedTaskIds.clear();
-            Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(this);
-            LoadTask task = createPendingTask();
-            // retry default backoff 60 seconds, because `be restart` is slow
-            task.setStartTimeMs(System.currentTimeMillis() + 60 * 1000);
-            idToTasks.put(task.getSignature(), task);
-            Env.getCurrentEnv().getPendingLoadTaskScheduler().submit(task);
-        } finally {
-            writeUnlock();
-        }
-    }
-
     @Override
     public String getResourceName() {
         StorageBackend.StorageType storageType = brokerDesc.getStorageType();
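
The onTaskFailed override deleted above implemented cloud-only retry handling: decrement a retry budget, cancel the job once it reaches zero, otherwise wait for in-flight tasks and resubmit the pending task after a fixed backoff. Reduced to its control flow, the pattern looks like this standalone sketch (hypothetical task interface, not the Doris scheduler):

    import java.util.concurrent.TimeUnit;

    public class BoundedRetrySketch {
        interface Task { boolean runOnce(); }

        // Retry a task up to maxRetries times, backing off a fixed delay between attempts.
        static boolean runWithRetries(Task task, int maxRetries, long backoffMs) throws InterruptedException {
            int retriesLeft = maxRetries;
            while (true) {
                if (task.runOnce()) {
                    return true;                 // finished
                }
                if (--retriesLeft <= 0) {
                    return false;                // budget exhausted: the caller cancels the job
                }
                TimeUnit.MILLISECONDS.sleep(backoffMs); // generous backoff, since restarts are slow
            }
        }

        public static void main(String[] args) throws InterruptedException {
            int[] attempts = {0};
            boolean ok = runWithRetries(() -> ++attempts[0] >= 2, 3, 10);
            System.out.println(ok + " after " + attempts[0] + " attempts");
        }
    }
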
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java
index e1303f3c7a4..e41cd4aee2a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java
@@ -55,7 +55,7 @@ public class BrokerLoadPendingTask extends LoadTask {
     }
 
     @Override
-    void executeTask() throws UserException {
+    public void executeTask() throws UserException {
         LOG.info("begin to execute broker pending task. job: {}", callback.getCallbackId());
         getAllFileStatus();
         ((BrokerLoadJob) callback).beginTxn();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
index d9b1b1fbcd8..72b3a96c4eb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
@@ -32,8 +32,6 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.EnvFactory;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.TableIf;
-import org.apache.doris.cloud.system.CloudSystemInfoService;
-import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.MetaNotFoundException;
@@ -97,12 +95,6 @@ public abstract class BulkLoadJob extends LoadJob {
     // we persist these sessionVariables due to the session is not available when replaying the job.
     protected Map<String, String> sessionVariables = Maps.newHashMap();
 
-    private static final String CLUSTER_ID = "clusterId";
-    protected String clusterId;
-
-    // retry 3 times is enough
-    protected int retryTimes = 3;
-
     public BulkLoadJob(EtlJobType jobType) {
         super(jobType);
     }
@@ -114,36 +106,9 @@ public abstract class BulkLoadJob extends LoadJob {
         this.authorizationInfo = gatherAuthInfo();
         this.userInfo = userInfo;
 
-        if (Config.isCloudMode()) {
-            ConnectContext context = ConnectContext.get();
-            if (context != null) {
-                String clusterName = context.getCloudCluster();
-                if (Strings.isNullOrEmpty(clusterName)) {
-                    LOG.warn("cluster name is empty");
-                    throw new MetaNotFoundException("cluster name is empty");
-                }
-
-                this.clusterId = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
-                                    .getCloudClusterIdByName(clusterName);
-                if (!Strings.isNullOrEmpty(context.getSessionVariable().getCloudCluster())) {
-                    clusterName = context.getSessionVariable().getCloudCluster();
-                    this.clusterId =
-                            ((CloudSystemInfoService) Env.getCurrentSystemInfo())
-                            .getCloudClusterIdByName(clusterName);
-                }
-                if (Strings.isNullOrEmpty(this.clusterId)) {
-                    LOG.warn("cluster id is empty, cluster name {}", clusterName);
-                    throw new MetaNotFoundException("cluster id is empty, cluster name: " + clusterName);
-                }
-            }
-        }
-
         if (ConnectContext.get() != null) {
             SessionVariable var = ConnectContext.get().getSessionVariable();
             sessionVariables.put(SessionVariable.SQL_MODE, Long.toString(var.getSqlMode()));
-            if (Config.isCloudMode()) {
-                sessionVariables.put(CLUSTER_ID, clusterId);
-            }
         } else {
             sessionVariables.put(SessionVariable.SQL_MODE, String.valueOf(SqlModeHelper.MODE_DEFAULT));
         }
@@ -184,7 +149,7 @@ public abstract class BulkLoadJob extends LoadJob {
         }
     }
 
-    protected void checkAndSetDataSourceInfo(Database db, List<DataDescription> dataDescriptions) throws DdlException {
+    public void checkAndSetDataSourceInfo(Database db, List<DataDescription> dataDescriptions) throws DdlException {
         // check data source info
         db.readLock();
         try {
@@ -305,9 +270,6 @@ public abstract class BulkLoadJob extends LoadJob {
         fileGroupAggInfo = new BrokerFileGroupAggInfo();
         SqlParser parser = new SqlParser(new SqlScanner(new StringReader(originStmt.originStmt),
                 Long.valueOf(sessionVariables.get(SessionVariable.SQL_MODE))));
-        if (Config.isCloudMode()) {
-            clusterId = sessionVariables.get("clusterId");
-        }
         try {
             Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbId);
             analyzeStmt(SqlParserUtils.getStmt(parser, originStmt.idx), db);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index 16363d72c03..65befadb13a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -23,6 +23,7 @@ import org.apache.doris.catalog.AuthorizationInfo;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.EnvFactory;
+import org.apache.doris.cloud.load.CopyJob;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
@@ -248,7 +249,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
      *
      * @return
      */
-    abstract Set<String> getTableNames() throws MetaNotFoundException;
+    public abstract Set<String> getTableNames() throws MetaNotFoundException;
 
     // return true if the corresponding transaction is done(COMMITTED, FINISHED, CANCELLED, RETRY)
     public boolean isTxnDone() {
@@ -1156,7 +1157,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
         return (long) jobProperties.get(LoadStmt.TIMEOUT_PROPERTY);
     }
 
-    protected void setTimeout(long timeout) {
+    public void setTimeout(long timeout) {
         jobProperties.put(LoadStmt.TIMEOUT_PROPERTY, timeout);
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index 543c3ac7805..af83fead692 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -20,10 +20,8 @@ package org.apache.doris.load.loadv2;
 import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.EnvFactory;
 import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.cloud.system.CloudSystemInfoService;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.Status;
@@ -34,7 +32,6 @@ import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
 import org.apache.doris.load.BrokerFileGroup;
 import org.apache.doris.load.FailMsg;
-import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.Coordinator;
 import org.apache.doris.qe.QeProcessorImpl;
 import org.apache.doris.thrift.TBrokerFileStatus;
@@ -44,7 +41,6 @@ import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.ErrorTabletInfo;
 import org.apache.doris.transaction.TabletCommitInfo;
 
-import com.google.common.base.Strings;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -84,7 +80,6 @@ public class LoadLoadingTask extends LoadTask {
 
     private Profile jobProfile;
     private long beginTime;
-    private String clusterId;
 
     private List<TPipelineWorkloadGroup> tWorkloadGroups = null;
 
@@ -127,43 +122,6 @@ public class LoadLoadingTask extends LoadTask {
         planner.plan(loadId, fileStatusList, fileNum);
     }
 
-    public void init(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusList,
-            int fileNum, UserIdentity userInfo, String clusterId) throws UserException {
-        this.loadId = loadId;
-        planner = new LoadingTaskPlanner(callback.getCallbackId(), txnId, db.getId(), table, brokerDesc, fileGroups,
-                strictMode, isPartialUpdate, timezone, this.timeoutS, this.loadParallelism, this.sendBatchParallelism,
-                this.useNewLoadScanNode, userInfo, singleTabletLoadPerSink, enableMemTableOnSinkNode);
-        boolean needCleanCtx = false;
-        try {
-            if (Config.isCloudMode()) {
-                String clusterName = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
-                                        .getClusterNameByClusterId(clusterId);
-                if (Strings.isNullOrEmpty(clusterName)) {
-                    String errMsg = "cluster name is empty, cluster id is " + clusterId;
-                    LOG.warn(errMsg);
-                    throw new UserException(errMsg);
-                }
-
-                if (ConnectContext.get() == null) {
-                    ConnectContext ctx = new ConnectContext();
-                    ctx.setThreadLocalInfo();
-                    ctx.setCloudCluster(clusterName);
-                    needCleanCtx = true;
-                } else {
-                    ConnectContext.get().setCloudCluster(clusterName);
-                }
-            }
-            planner.plan(loadId, fileStatusList, fileNum);
-            this.clusterId = clusterId;
-        } catch (Exception e) {
-            throw e;
-        } finally {
-            if (Config.isCloudMode() && needCleanCtx) {
-                ConnectContext.remove();
-            }
-        }
-    }
-
     public TUniqueId getLoadId() {
         return loadId;
     }
@@ -172,23 +130,6 @@ public class LoadLoadingTask extends LoadTask {
     protected void executeTask() throws Exception {
         LOG.info("begin to execute loading task. load id: {} job id: {}. db: {}, tbl: {}. left retry: {}",
                 DebugUtil.printId(loadId), callback.getCallbackId(), db.getFullName(), table.getName(), retryTime);
-        boolean needCleanCtx = false;
-        if (Config.isCloudMode()) {
-            String clusterName = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
-                                    .getClusterNameByClusterId(this.clusterId);
-            if (Strings.isNullOrEmpty(clusterName)) {
-                throw new Exception("cluster is empty");
-            }
-
-            if (ConnectContext.get() == null) {
-                ConnectContext ctx = new ConnectContext();
-                ctx.setThreadLocalInfo();
-                ctx.setCloudCluster(clusterName);
-                needCleanCtx = true;
-            } else {
-                ConnectContext.get().setCloudCluster(clusterName);
-            }
-        }
 
         retryTime--;
         beginTime = System.currentTimeMillis();
@@ -197,10 +138,6 @@ public class LoadLoadingTask extends LoadTask {
             return;
         }
         executeOnce();
-
-        if (Config.isCloudMode() && needCleanCtx) {
-            ConnectContext.remove();
-        }
     }
 
     protected void executeOnce() throws Exception {
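
The blocks removed from init() and executeTask() above both followed the same pattern: install a ConnectContext in a thread-local if none was present, remember whether this code created it, and remove it again afterwards so the value does not leak to other work on the thread. The general shape, as a standalone sketch with a plain ThreadLocal instead of ConnectContext (cleanup here is wrapped in a finally for robustness):

    public class ThreadLocalGuardSketch {
        private static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

        static void runWithCluster(String clusterName, Runnable body) {
            boolean createdHere = (CONTEXT.get() == null);  // only remove what this call created
            CONTEXT.set(clusterName);                       // either way, point it at this cluster
            try {
                body.run();
            } finally {
                if (createdHere) {
                    CONTEXT.remove();                       // avoid leaking the value into pooled threads
                }
            }
        }

        public static void main(String[] args) {
            runWithCluster("cluster_a", () -> System.out.println("cluster = " + CONTEXT.get()));
            System.out.println("after: " + CONTEXT.get());  // null again
        }
    }
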
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index d76097fb7ca..ecde827b9ed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -21,13 +21,11 @@ import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.analysis.CancelLoadStmt;
 import org.apache.doris.analysis.CleanLabelStmt;
 import org.apache.doris.analysis.CompoundPredicate.Operator;
-import org.apache.doris.analysis.CopyStmt;
 import org.apache.doris.analysis.InsertStmt;
 import org.apache.doris.analysis.LoadStmt;
 import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
-import org.apache.doris.cloud.system.CloudSystemInfoService;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.CaseSensibility;
 import org.apache.doris.common.Config;
@@ -78,7 +76,6 @@ import java.util.Comparator;
 import java.util.Deque;
 import java.util.EnumSet;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -87,7 +84,6 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
@@ -101,10 +97,9 @@ import java.util.stream.Collectors;
 public class LoadManager implements Writable {
     private static final Logger LOG = LogManager.getLogger(LoadManager.class);
 
-    private Map<Long, LoadJob> idToLoadJob = Maps.newConcurrentMap();
-    private Map<Long, Map<String, List<LoadJob>>> dbIdToLabelToLoadJobs = Maps.newConcurrentMap();
-    private LoadJobScheduler loadJobScheduler;
-    private CleanCopyJobScheduler cleanCopyJobScheduler;
+    protected Map<Long, LoadJob> idToLoadJob = Maps.newConcurrentMap();
+    protected Map<Long, Map<String, List<LoadJob>>> dbIdToLabelToLoadJobs = Maps.newConcurrentMap();
+    protected LoadJobScheduler loadJobScheduler;
 
     private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     private MysqlLoadManager mysqlLoadManager;
@@ -116,11 +111,6 @@ public class LoadManager implements Writable {
         this.mysqlLoadManager = new MysqlLoadManager(tokenManager);
     }
 
-    public LoadManager(LoadJobScheduler loadJobScheduler, CleanCopyJobScheduler cleanCopyJobScheduler) {
-        this(loadJobScheduler);
-        this.cleanCopyJobScheduler = cleanCopyJobScheduler;
-    }
-
     public void start() {
         tokenManager.start();
         mysqlLoadManager.start();
@@ -172,62 +162,11 @@ public class LoadManager implements Writable {
         return loadJob.getId();
     }
 
-    public LoadJob createLoadJobFromStmt(CopyStmt stmt) throws DdlException {
-        Database database = checkDb(stmt.getDbName());
-        long dbId = database.getId();
-        BrokerLoadJob loadJob = null;
-        if (Config.isCloudMode()) {
-            ConnectContext context = ConnectContext.get();
-            if (context != null) {
-                String cloudCluster = context.getCloudCluster();
-                if (!Strings.isNullOrEmpty(cloudCluster)) {
-                    ((CloudSystemInfoService) Env.getCurrentSystemInfo()).waitForAutoStart(cloudCluster);
-                }
-            }
-        }
-        writeLock();
-        try {
-            long unfinishedCopyJobNum = unprotectedGetUnfinishedCopyJobNum();
-            if (unfinishedCopyJobNum >= Config.cluster_max_waiting_copy_jobs) {
-                throw new DdlException(
-                        "There are more than " + unfinishedCopyJobNum + " unfinished copy jobs, please retry later.");
-            }
-            loadJob = new CopyJob(dbId, stmt.getLabel().getLabelName(), ConnectContext.get().queryId(),
-                    stmt.getBrokerDesc(), stmt.getOrigStmt(), stmt.getUserInfo(), stmt.getStageId(),
-                    stmt.getStageType(), stmt.getStagePrefix(), stmt.getSizeLimit(), stmt.getPattern(),
-                    stmt.getObjectInfo(), stmt.isForce(), stmt.getUserName());
-            loadJob.setJobProperties(stmt.getProperties());
-            loadJob.checkAndSetDataSourceInfo(database, stmt.getDataDescriptions());
-            loadJob.setTimeout(ConnectContext.get().getExecTimeout());
-            createLoadJob(loadJob);
-        } catch (MetaNotFoundException e) {
-            throw new DdlException(e.getMessage());
-        } finally {
-            writeUnlock();
-        }
-        Env.getCurrentEnv().getEditLog().logCreateLoadJob(loadJob);
-
-        // The job must be submitted after edit log.
-        // It guarantees that load job has not been changed before edit log.
-        loadJobScheduler.submitJob(loadJob);
-        return loadJob;
-    }
-
-    public void createCleanCopyJobTask(CleanCopyJobTask task) throws DdlException {
-        cleanCopyJobScheduler.submitJob(task);
-    }
-
     private long unprotectedGetUnfinishedJobNum() {
         return idToLoadJob.values().stream()
                 .filter(j -> (j.getState() != JobState.FINISHED && j.getState() != JobState.CANCELLED)).count();
     }
 
-    private long unprotectedGetUnfinishedCopyJobNum() {
-        return idToLoadJob.values().stream()
-                .filter(j -> (j.getState() != JobState.FINISHED && j.getState() != JobState.CANCELLED))
-                .filter(j -> j instanceof CopyJob).count();
-    }
-
     /**
      * MultiLoadMgr use.
      **/
@@ -257,7 +196,7 @@ public class LoadManager implements Writable {
     }
 
     // add load job and also add to callback factory
-    private void createLoadJob(LoadJob loadJob) {
+    protected void createLoadJob(LoadJob loadJob) {
         if (loadJob.isExpired(System.currentTimeMillis())) {
             // This can happen in replay logic.
             return;
@@ -631,28 +570,17 @@ public class LoadManager implements Writable {
      * @param labelValue    used to filter jobs which's label is or like labelValue.
      * @param accurateMatch true: filter jobs which's label is labelValue. false: filter jobs which's label like itself.
      * @param statesValue   used to filter jobs which's state within the statesValue set.
-     * @param jobTypes      used to filter jobs which's type within the jobTypes set.
-     * @param copyIdValue        used to filter jobs which's copyId is or like copyIdValue.
-     * @param copyIdAccurateMatch  true: filter jobs which's copyId is copyIdValue.
-     *                             false: filter jobs which's copyId like itself.
      * @return The result is the list of jobInfo.
      *         JobInfo is a list which includes the comparable object: jobId, label, state etc.
      *         The result is unordered.
      */
     public List<List<Comparable>> getLoadJobInfosByDb(long dbId, String labelValue, boolean accurateMatch,
-            Set<String> statesValue, Set<EtlJobType> jobTypes, String copyIdValue, boolean copyIdAccurateMatch,
-            String tableNameValue, boolean tableNameAccurateMatch, String fileValue, boolean fileAccurateMatch)
-            throws AnalysisException {
+                                                      Set<String> statesValue) throws AnalysisException {
         LinkedList<List<Comparable>> loadJobInfos = new LinkedList<List<Comparable>>();
         if (!dbIdToLabelToLoadJobs.containsKey(dbId)) {
             return loadJobInfos;
         }
 
-        if (jobTypes == null || jobTypes.isEmpty()) {
-            jobTypes = new HashSet<>();
-            jobTypes.addAll(EnumSet.allOf(EtlJobType.class));
-        }
-
         Set<JobState> states = Sets.newHashSet();
         if (statesValue == null || statesValue.size() == 0) {
             states.addAll(EnumSet.allOf(JobState.class));
@@ -693,21 +621,8 @@ public class LoadManager implements Writable {
                 }
             }
 
-            List<LoadJob> loadJobList2 = new ArrayList<>();
             // check state
             for (LoadJob loadJob : loadJobList) {
-                if (!states.contains(loadJob.getState())) {
-                    continue;
-                }
-                if (!jobTypes.contains(loadJob.jobType)) {
-                    continue;
-                }
-                loadJobList2.add(loadJob);
-            }
-            loadJobList2 = filterCopyJob(loadJobList2, copyIdValue, copyIdAccurateMatch, c -> c.getCopyId());
-            loadJobList2 = filterCopyJob(loadJobList2, tableNameValue, tableNameAccurateMatch, c -> c.getTableName());
-            loadJobList2 = filterCopyJob(loadJobList2, fileValue, fileAccurateMatch, c -> c.getFiles());
-            for (LoadJob loadJob : loadJobList2) {
                 try {
                     if (!states.contains(loadJob.getState())) {
                         continue;
@@ -732,32 +647,6 @@ public class LoadManager implements Writable {
         }
     }
 
-    private List<LoadJob> filterCopyJob(List<LoadJob> loadJobList, String value, boolean accurateMatch,
-            Function<CopyJob, String> func) throws AnalysisException {
-        if (Strings.isNullOrEmpty(value)) {
-            return loadJobList;
-        }
-        List<LoadJob> loadJobList2 = Lists.newArrayList();
-        for (LoadJob loadJob : loadJobList) {
-            if (loadJob.getJobType() != EtlJobType.COPY) {
-                continue;
-            }
-            CopyJob copyJob = (CopyJob) loadJob;
-            if (accurateMatch) {
-                if (func.apply(copyJob).equalsIgnoreCase(value)) {
-                    loadJobList2.add(copyJob);
-                }
-            } else {
-                // non-accurate match
-                PatternMatcher matcher = PatternMatcherWrapper.createMysqlPattern(value, false);
-                if (matcher.match(func.apply(copyJob))) {
-                    loadJobList2.add(copyJob);
-                }
-            }
-        }
-        return loadJobList2;
-    }
-
     public void checkJobAuth(String ctlName, String dbName, Set<String> tableNames) throws AnalysisException {
         if (tableNames.isEmpty()) {
             if (!Env.getCurrentEnv().getAccessManager()
@@ -875,7 +764,7 @@ public class LoadManager implements Writable {
         }
     }
 
-    private Database checkDb(String dbName) throws DdlException {
+    protected Database checkDb(String dbName) throws DdlException {
         return Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
     }
 
@@ -993,19 +882,19 @@ public class LoadManager implements Writable {
                 counter, dbId, label, isReplay);
     }
 
-    private void readLock() {
+    protected void readLock() {
         lock.readLock().lock();
     }
 
-    private void readUnlock() {
+    protected void readUnlock() {
         lock.readLock().unlock();
     }
 
-    private void writeLock() {
+    protected void writeLock() {
         lock.writeLock().lock();
     }
 
-    private void writeUnlock() {
+    protected void writeUnlock() {
         lock.writeLock().unlock();
     }
 
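The LoadManager hunks above relax createLoadJob, checkDb, and the read/write lock helpers from private to protected so a cloud-side subclass can reuse the base class's job bookkeeping and locking instead of duplicating it. As a rough illustration only (this is not the CloudLoadManager added by this commit; the class and method names below are hypothetical), a subclass built on those hooks could look like:

    // Hypothetical sketch, not part of this commit: the newly protected hooks
    // (readLock/readUnlock, createLoadJob, checkDb) are what make this kind of
    // extension possible from a subclass in another package.
    package org.apache.doris.cloud.load;

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.doris.load.loadv2.LoadJob;
    import org.apache.doris.load.loadv2.LoadJobScheduler;
    import org.apache.doris.load.loadv2.LoadManager;

    public class ExampleCloudLoadManager extends LoadManager {
        public ExampleCloudLoadManager(LoadJobScheduler loadJobScheduler) {
            super(loadJobScheduler);
        }

        // Example-only helper: cloud-specific bookkeeping runs under the base
        // class's read lock, which is reachable here because readLock() and
        // readUnlock() are now protected.
        public List<LoadJob> collectCopyJobs() {
            List<LoadJob> result = new ArrayList<>();
            readLock();
            try {
                // copy-into specific collection/filtering would go here
            } finally {
                readUnlock();
            }
            return result;
        }
    }
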
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
index 49735521044..bdc662dccec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -131,12 +131,13 @@ import org.apache.doris.analysis.UnsetDefaultStorageVaultStmt;
 import org.apache.doris.catalog.EncryptKeyHelper;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.cloud.catalog.CloudEnv;
+import org.apache.doris.cloud.load.CloudLoadManager;
+import org.apache.doris.cloud.load.CopyJob;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.util.ProfileManager;
 import org.apache.doris.load.EtlStatus;
 import org.apache.doris.load.FailMsg;
-import org.apache.doris.load.loadv2.CopyJob;
 import org.apache.doris.load.loadv2.JobState;
 import org.apache.doris.load.loadv2.LoadJob;
 import org.apache.doris.load.sync.SyncJobManager;
@@ -445,7 +446,7 @@ public class DdlExecutor {
     }
 
     private static void executeCopyStmt(Env env, CopyStmt copyStmt) throws Exception {
-        CopyJob job = (CopyJob) env.getLoadManager().createLoadJobFromStmt(copyStmt);
+        CopyJob job = (CopyJob) (((CloudLoadManager) env.getLoadManager()).createLoadJobFromStmt(copyStmt));
         if (!copyStmt.isAsync()) {
             // wait for execute finished
             waitJobCompleted(job);
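
Because COPY INTO is a cloud-only code path, the DdlExecutor hunk above routes the statement explicitly through CloudLoadManager rather than the base LoadManager. A hedged sketch of that dispatch, wrapped in a hypothetical helper class for readability (only the cast and the createLoadJobFromStmt call mirror the diff; the guard, class name, and the CopyStmt import path are assumptions):

    import org.apache.doris.analysis.CopyStmt;
    import org.apache.doris.catalog.Env;
    import org.apache.doris.cloud.load.CloudLoadManager;
    import org.apache.doris.cloud.load.CopyJob;
    import org.apache.doris.load.loadv2.LoadManager;

    // Illustrative only; not the committed DdlExecutor code.
    final class CopyStmtDispatchExample {
        static CopyJob submit(Env env, CopyStmt copyStmt) throws Exception {
            LoadManager mgr = env.getLoadManager();
            if (!(mgr instanceof CloudLoadManager)) {
                // COPY INTO is only wired up when the FE runs in cloud mode.
                throw new IllegalStateException("COPY INTO requires cloud mode");
            }
            return (CopyJob) ((CloudLoadManager) mgr).createLoadJobFromStmt(copyStmt);
        }
    }
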
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index e6fbef29889..8bcd7ab15f4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -152,6 +152,7 @@ import org.apache.doris.catalog.View;
 import org.apache.doris.clone.DynamicPartitionScheduler;
 import org.apache.doris.cloud.catalog.CloudEnv;
 import org.apache.doris.cloud.datasource.CloudInternalCatalog;
+import org.apache.doris.cloud.load.CloudLoadManager;
 import org.apache.doris.cloud.proto.Cloud;
 import org.apache.doris.cloud.rpc.MetaServiceProxy;
 import org.apache.doris.cloud.system.CloudSystemInfoService;
@@ -1331,11 +1332,17 @@ public class ShowExecutor {
         Set<String> statesValue = showStmt.getStates() == null ? null : showStmt.getStates().stream()
                 .map(entity -> entity.name())
                 .collect(Collectors.toSet());
-        loadInfos.addAll(env.getLoadManager().getLoadJobInfosByDb(dbId, showStmt.getLabelValue(),
+        if (!Config.isCloudMode()) {
+            loadInfos.addAll(env.getLoadManager()
+                    .getLoadJobInfosByDb(dbId, showStmt.getLabelValue(), showStmt.isAccurateMatch(), statesValue));
+        } else {
+            loadInfos.addAll(((CloudLoadManager) env.getLoadManager())
+                    .getLoadJobInfosByDb(dbId, showStmt.getLabelValue(),
                         showStmt.isAccurateMatch(), statesValue, jobTypes, showStmt.getCopyIdValue(),
                         showStmt.isCopyIdAccurateMatch(), showStmt.getTableNameValue(),
                         showStmt.isTableNameAccurateMatch(),
                         showStmt.getFileValue(), showStmt.isFileAccurateMatch()));
+        }
         // add the nerieds load info
         JobManager loadMgr = env.getJobManager();
         loadInfos.addAll(loadMgr.getLoadJobInfosByDb(dbId, db.getFullName(), showStmt.getLabelValue(),
@@ -1526,9 +1533,17 @@ public class ShowExecutor {
             throws AnalysisException {
         LoadManager loadManager = Env.getCurrentEnv().getLoadManager();
         if (showWarningsStmt.isFindByLabel()) {
-            List<List<Comparable>> loadJobInfosByDb = loadManager.getLoadJobInfosByDb(db.getId(),
-                    showWarningsStmt.getLabel(),
-                    true, null, null, null, false, null, false, null, false);
+            List<List<Comparable>> loadJobInfosByDb;
+            if (!Config.isCloudMode()) {
+                loadJobInfosByDb = loadManager.getLoadJobInfosByDb(db.getId(),
+                        showWarningsStmt.getLabel(),
+                        true, null);
+            } else {
+                loadJobInfosByDb = ((CloudLoadManager) loadManager)
+                        .getLoadJobInfosByDb(db.getId(),
+                        showWarningsStmt.getLabel(),
+                        true, null, null, null, false, null, false, null, false);
+            }
             if (CollectionUtils.isEmpty(loadJobInfosByDb)) {
                 return null;
             }
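
After this refactor ShowExecutor chooses between two overloads of getLoadJobInfosByDb based on Config.isCloudMode(): the trimmed base overload for label/state filtering, and the CloudLoadManager overload that keeps the copy-into filters removed from LoadManager. For reference, the two call shapes reconstructed from the hunks above, gathered into a hypothetical interface (the EtlJobType and AnalysisException import paths are assumed):

    import java.util.List;
    import java.util.Set;

    import org.apache.doris.common.AnalysisException;
    import org.apache.doris.load.EtlJobType;

    // Reference sketch only; these declarations are not a real Doris type.
    interface LoadJobInfoLookupShapes {
        // Base LoadManager overload (non-cloud mode): label/state filtering only.
        List<List<Comparable>> getLoadJobInfosByDb(long dbId, String labelValue, boolean accurateMatch,
                Set<String> statesValue) throws AnalysisException;

        // CloudLoadManager overload (cloud mode): keeps the copy-into filters
        // (job type, copy id, table name, file) dropped from the base class.
        List<List<Comparable>> getLoadJobInfosByDb(long dbId, String labelValue, boolean accurateMatch,
                Set<String> statesValue, Set<EtlJobType> jobTypes, String copyIdValue, boolean copyIdAccurateMatch,
                String tableNameValue, boolean tableNameAccurateMatch, String fileValue, boolean fileAccurateMatch)
                throws AnalysisException;
    }
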
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/CopyJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/cloud/load/CopyJobTest.java
similarity index 98%
rename from fe/fe-core/src/test/java/org/apache/doris/load/loadv2/CopyJobTest.java
rename to fe/fe-core/src/test/java/org/apache/doris/cloud/load/CopyJobTest.java
index a2a880ba847..ccaeecd4a8f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/CopyJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/cloud/load/CopyJobTest.java
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.load.loadv2;
+package org.apache.doris.cloud.load;
 
 import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.analysis.UserIdentity;
@@ -31,6 +31,7 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.Pair;
+import org.apache.doris.load.loadv2.LoadJob;
 import org.apache.doris.meta.MetaContext;
 import org.apache.doris.qe.OriginStatement;
 import org.apache.doris.thrift.TUniqueId;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/CopyLoadPendingTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/cloud/load/CopyLoadPendingTaskTest.java
similarity index 99%
rename from fe/fe-core/src/test/java/org/apache/doris/load/loadv2/CopyLoadPendingTaskTest.java
rename to fe/fe-core/src/test/java/org/apache/doris/cloud/load/CopyLoadPendingTaskTest.java
index 1beab75f8e8..577d485f4e7 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/CopyLoadPendingTaskTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/cloud/load/CopyLoadPendingTaskTest.java
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.load.loadv2;
+package org.apache.doris.cloud.load;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.cloud.datasource.CloudInternalCatalog;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
