This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a63fa88  [Spark load][Fe 6/6] Fe process etl and loading state job 
(#3717)
a63fa88 is described below

commit a63fa882947345b7b97643f6346ff9360e210c37
Author: wyb <[email protected]>
AuthorDate: Sun Jun 21 22:17:03 2020 +0800

    [Spark load][Fe 6/6] Fe process etl and loading state job (#3717)
    
    1. Fe checks the status of etl job regularly
    1.1 If status is RUNNING, update etl job progress
    1.2 If status is CANCELLED, cancel load job
    1.3 If status is FINISHED, get the etl output file paths, update job state 
to LOADING and log job update info
    
    2. Fe sends PushTask to Be and commits transaction after all push tasks 
execute successfully
    
    #3433
---
 .../java/org/apache/doris/catalog/Catalog.java     |   8 +
 .../apache/doris/common/DataQualityException.java  |  33 ++
 .../main/java/org/apache/doris/load/EtlStatus.java |  52 +-
 .../apache/doris/load/loadv2/LoadEtlChecker.java   |  47 ++
 .../doris/load/loadv2/LoadLoadingChecker.java      |  47 ++
 .../org/apache/doris/load/loadv2/LoadManager.java  |  41 ++
 .../doris/load/loadv2/SparkEtlJobHandler.java      | 172 ++++++-
 .../org/apache/doris/load/loadv2/SparkLoadJob.java | 536 ++++++++++++++++++++-
 .../java/org/apache/doris/master/MasterImpl.java   |  17 +
 .../main/java/org/apache/doris/task/PushTask.java  |  25 +
 .../doris/transaction/TxnCommitAttachment.java     |   3 +
 .../doris/load/loadv2/SparkEtlJobHandlerTest.java  | 285 +++++++++++
 .../apache/doris/load/loadv2/SparkLoadJobTest.java | 490 +++++++++++++++++++
 13 files changed, 1746 insertions(+), 10 deletions(-)

diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java 
b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
index 3f149e1..2754f10 100755
--- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -146,7 +146,9 @@ import org.apache.doris.load.LoadChecker;
 import org.apache.doris.load.LoadErrorHub;
 import org.apache.doris.load.LoadJob;
 import org.apache.doris.load.LoadJob.JobState;
+import org.apache.doris.load.loadv2.LoadEtlChecker;
 import org.apache.doris.load.loadv2.LoadJobScheduler;
+import org.apache.doris.load.loadv2.LoadLoadingChecker;
 import org.apache.doris.load.loadv2.LoadManager;
 import org.apache.doris.load.loadv2.LoadTimeoutChecker;
 import org.apache.doris.load.routineload.RoutineLoadManager;
@@ -383,6 +385,8 @@ public class Catalog {
     private LoadJobScheduler loadJobScheduler;
 
     private LoadTimeoutChecker loadTimeoutChecker;
+    private LoadEtlChecker loadEtlChecker;
+    private LoadLoadingChecker loadLoadingChecker;
 
     private RoutineLoadScheduler routineLoadScheduler;
 
@@ -519,6 +523,8 @@ public class Catalog {
         this.loadJobScheduler = new LoadJobScheduler();
         this.loadManager = new LoadManager(loadJobScheduler);
         this.loadTimeoutChecker = new LoadTimeoutChecker(loadManager);
+        this.loadEtlChecker = new LoadEtlChecker(loadManager);
+        this.loadLoadingChecker = new LoadLoadingChecker(loadManager);
         this.routineLoadScheduler = new 
RoutineLoadScheduler(routineLoadManager);
         this.routineLoadTaskScheduler = new 
RoutineLoadTaskScheduler(routineLoadManager);
 
@@ -1224,6 +1230,8 @@ public class Catalog {
         loadManager.prepareJobs();
         loadJobScheduler.start();
         loadTimeoutChecker.start();
+        loadEtlChecker.start();
+        loadLoadingChecker.start();
         // Export checker
         ExportChecker.init(Config.export_checker_interval_second * 1000L);
         ExportChecker.startAll();
diff --git a/fe/src/main/java/org/apache/doris/common/DataQualityException.java 
b/fe/src/main/java/org/apache/doris/common/DataQualityException.java
new file mode 100644
index 0000000..02f9d9d
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/common/DataQualityException.java
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common;
+
+/**
+ * Exception for data quality check failed
+ */
+public class DataQualityException extends LoadException {
+    public static final String QUALITY_FAIL_MSG = "quality not good enough to 
cancel";
+
+    public DataQualityException(String msg) {
+        super(msg);
+    }
+
+    public DataQualityException(String msg, Throwable e) {
+        super(msg, e);
+    }
+}
diff --git a/fe/src/main/java/org/apache/doris/load/EtlStatus.java 
b/fe/src/main/java/org/apache/doris/load/EtlStatus.java
index cdb3552..bafebc4 100644
--- a/fe/src/main/java/org/apache/doris/load/EtlStatus.java
+++ b/fe/src/main/java/org/apache/doris/load/EtlStatus.java
@@ -20,6 +20,7 @@ package org.apache.doris.load;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
+//import org.apache.doris.load.loadv2.dpp.DppResult;
 import org.apache.doris.thrift.TEtlState;
 
 import com.google.common.base.Strings;
@@ -41,12 +42,21 @@ public class EtlStatus implements Writable {
     // not persist
     private Map<String, Long> fileMap;
 
+    // for spark not persist
+    // 0 - 100
+    private int progress;
+    private String failMsg;
+    //private DppResult dppResult;
+
     public EtlStatus() {
         this.state = TEtlState.RUNNING;
         this.trackingUrl = DEFAULT_TRACKING_URL;
         this.stats = Maps.newHashMap();
         this.counters = Maps.newHashMap();
         this.fileMap = Maps.newHashMap();
+        this.progress = 0;
+        this.failMsg = "";
+        //this.dppResult = null;
     }
 
     public TEtlState getState() {
@@ -102,16 +112,54 @@ public class EtlStatus implements Writable {
         this.fileMap.putAll(fileMap);
     }
 
+    public int getProgress() {
+        return progress;
+    }
+
+    public void setProgress(int progress) {
+        this.progress = progress;
+    }
+
+    public String getFailMsg() {
+        return failMsg;
+    }
+
+    public void setFailMsg(String failMsg) {
+        this.failMsg = failMsg;
+    }
+
+    // TODO(wyb): spark-load
+    /*
+    public DppResult getDppResult() {
+        return dppResult;
+    }
+
+    public void setDppResult(DppResult dppResult) {
+        this.dppResult = dppResult;
+    }
+    */
+
     public void reset() {
         this.stats.clear();
         this.counters.clear();
         this.fileMap.clear();
+        this.progress = 0;
+        this.failMsg = "";
+        //this.dppResult = null;
     }
 
     @Override
     public String toString() {
-        return "EtlTaskStatus [state=" + state + ", trackingUrl=" + 
trackingUrl + ", stats=" + stats + ", counters="
-                + counters + "]";
+        return "EtlStatus{" +
+                "state=" + state +
+                ", trackingUrl='" + trackingUrl + '\'' +
+                ", stats=" + stats +
+                ", counters=" + counters +
+                ", fileMap=" + fileMap +
+                ", progress=" + progress +
+                ", failMsg='" + failMsg + '\'' +
+                //", dppResult='" + dppResult + '\'' +
+                '}';
     }
 
     public void write(DataOutput out) throws IOException {
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadEtlChecker.java 
b/fe/src/main/java/org/apache/doris/load/loadv2/LoadEtlChecker.java
new file mode 100644
index 0000000..c72896c
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadEtlChecker.java
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2;
+
+import org.apache.doris.common.Config;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * LoadEtlChecker will update etl status for jobs that have etl state.
+ * Now only for SparkLoadJob
+ */
+public class LoadEtlChecker extends MasterDaemon {
+    private static final Logger LOG = 
LogManager.getLogger(LoadEtlChecker.class);
+
+    private LoadManager loadManager;
+
+    public LoadEtlChecker(LoadManager loadManager) {
+        super("Load etl checker", Config.load_checker_interval_second * 1000);
+        this.loadManager = loadManager;
+    }
+
+    @Override
+    protected void runAfterCatalogReady() {
+        try {
+            loadManager.processEtlStateJobs();
+        } catch (Throwable e) {
+            LOG.warn("Failed to process one round of LoadEtlChecker with error 
message {}", e.getMessage(), e);
+        }
+    }
+}
diff --git 
a/fe/src/main/java/org/apache/doris/load/loadv2/LoadLoadingChecker.java 
b/fe/src/main/java/org/apache/doris/load/loadv2/LoadLoadingChecker.java
new file mode 100644
index 0000000..e9d8683
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadLoadingChecker.java
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2;
+
+import org.apache.doris.common.Config;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * LoadLoadingChecker will update loading status for jobs that loading by 
PushTask.
+ * Now only for SparkLoadJob
+ */
+public class LoadLoadingChecker extends MasterDaemon {
+    private static final Logger LOG = 
LogManager.getLogger(LoadLoadingChecker.class);
+
+    private LoadManager loadManager;
+
+    public LoadLoadingChecker(LoadManager loadManager) {
+        super("Load loading checker", Config.load_checker_interval_second * 
1000);
+        this.loadManager = loadManager;
+    }
+
+    @Override
+    protected void runAfterCatalogReady() {
+        try {
+            loadManager.processLoadingStateJobs();
+        } catch (Throwable e) {
+            LOG.warn("Failed to process one round of LoadLoadingChecker with 
error message {}", e.getMessage(), e);
+        }
+    }
+}
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java 
b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 5dd2d05..c3f25e1 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -17,7 +17,9 @@
 
 package org.apache.doris.load.loadv2;
 
+import static org.apache.doris.load.FailMsg.CancelType.ETL_RUN_FAIL;
 import static org.apache.doris.load.FailMsg.CancelType.LOAD_RUN_FAIL;
+import static org.apache.doris.common.DataQualityException.QUALITY_FAIL_MSG;
 
 import org.apache.doris.analysis.CancelLoadStmt;
 import org.apache.doris.analysis.LoadStmt;
@@ -26,6 +28,7 @@ import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.DataQualityException;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.DuplicatedRequestException;
 import org.apache.doris.common.LabelAlreadyUsedException;
@@ -392,6 +395,40 @@ public class LoadManager implements Writable{
         idToLoadJob.values().stream().forEach(entity -> 
entity.processTimeout());
     }
 
+    // only for those jobs which have etl state, like SparkLoadJob
+    public void processEtlStateJobs() {
+        idToLoadJob.values().stream().filter(job -> (job.jobType == 
EtlJobType.SPARK && job.state == JobState.ETL))
+                .forEach(job -> {
+                    try {
+                        ((SparkLoadJob) job).updateEtlStatus();
+                    } catch (DataQualityException e) {
+                        LOG.info("update load job etl status failed. job id: 
{}", job.getId(), e);
+                        job.cancelJobWithoutCheck(new 
FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED, QUALITY_FAIL_MSG),
+                                                  true, true);
+                    } catch (UserException e) {
+                        LOG.warn("update load job etl status failed. job id: 
{}", job.getId(), e);
+                        job.cancelJobWithoutCheck(new FailMsg(ETL_RUN_FAIL, 
e.getMessage()), true, true);
+                    } catch (Exception e) {
+                        LOG.warn("update load job etl status failed. job id: 
{}", job.getId(), e);
+                    }
+                });
+    }
+
+    // only for those jobs which load by PushTask
+    public void processLoadingStateJobs() {
+        idToLoadJob.values().stream().filter(job -> (job.jobType == 
EtlJobType.SPARK && job.state == JobState.LOADING))
+                .forEach(job -> {
+                    try {
+                        ((SparkLoadJob) job).updateLoadingStatus();
+                    } catch (UserException e) {
+                        LOG.warn("update load job loading status failed. job 
id: {}", job.getId(), e);
+                        job.cancelJobWithoutCheck(new FailMsg(LOAD_RUN_FAIL, 
e.getMessage()), true, true);
+                    } catch (Exception e) {
+                        LOG.warn("update load job loading status failed. job 
id: {}", job.getId(), e);
+                    }
+                });
+    }
+
     /**
      * This method will return the jobs info which can meet the condition of 
input param.
      * @param dbId used to filter jobs which belong to this db
@@ -487,6 +524,10 @@ public class LoadManager implements Writable{
         }
     }
 
+    public LoadJob getLoadJob(long jobId) {
+        return idToLoadJob.get(jobId);
+    }
+
     public void prepareJobs() {
         analyzeLoadJobs();
         submitJobs();
diff --git 
a/fe/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java 
b/fe/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
index d135b23..84013c4 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
@@ -21,11 +21,24 @@ import org.apache.doris.PaloFe;
 import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.catalog.SparkResource;
 import org.apache.doris.common.LoadException;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.BrokerUtil;
+import org.apache.doris.load.EtlStatus;
+//import org.apache.doris.load.loadv2.dpp.DppResult;
 import org.apache.doris.load.loadv2.etl.EtlJobConfig;
+import org.apache.doris.thrift.TBrokerFileStatus;
 import org.apache.doris.thrift.TEtlState;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.spark.launcher.SparkAppHandle;
@@ -33,8 +46,16 @@ import org.apache.spark.launcher.SparkAppHandle.Listener;
 import org.apache.spark.launcher.SparkAppHandle.State;
 import org.apache.spark.launcher.SparkLauncher;
 
+//import com.google.common.base.Strings;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+//import com.google.gson.Gson;
+//import com.google.gson.JsonSyntaxException;
+
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -91,7 +112,7 @@ public class SparkEtlJobHandler {
         launcher.setMaster(resource.getMaster())
                 .setDeployMode(resource.getDeployMode().name().toLowerCase())
                 .setAppResource(appResourceHdfsPath)
-                               // TODO(wyb): spark-load
+                // TODO(wyb): spark-load
                 // replace with getCanonicalName later
                 //.setMainClass(SparkEtlJob.class.getCanonicalName())
                 .setMainClass(MAIN_CLASS)
@@ -148,6 +169,120 @@ public class SparkEtlJobHandler {
         attachment.setHandle(handle);
     }
 
+    public EtlStatus getEtlJobStatus(SparkAppHandle handle, String appId, long 
loadJobId, String etlOutputPath,
+                                     SparkResource resource, BrokerDesc 
brokerDesc) {
+        EtlStatus status = new EtlStatus();
+
+        if (resource.isYarnMaster()) {
+            // state from yarn
+            Preconditions.checkState(appId != null && !appId.isEmpty());
+            YarnClient client = startYarnClient(resource);
+            try {
+                ApplicationReport report = 
client.getApplicationReport(ConverterUtils.toApplicationId(appId));
+                LOG.info("yarn application -status {}. load job id: {}, 
result: {}", appId, loadJobId, report);
+
+                YarnApplicationState state = report.getYarnApplicationState();
+                FinalApplicationStatus faStatus = 
report.getFinalApplicationStatus();
+                status.setState(fromYarnState(state, faStatus));
+                if (status.getState() == TEtlState.CANCELLED) {
+                    if (state == YarnApplicationState.FINISHED) {
+                        status.setFailMsg("spark app state: " + 
faStatus.toString());
+                    } else {
+                        status.setFailMsg("yarn app state: " + 
state.toString());
+                    }
+                }
+                status.setTrackingUrl(report.getTrackingUrl());
+                status.setProgress((int) (report.getProgress() * 100));
+            } catch (ApplicationNotFoundException e) {
+                LOG.warn("spark app not found. spark app id: {}, load job id: 
{}", appId, loadJobId, e);
+                status.setState(TEtlState.CANCELLED);
+                status.setFailMsg(e.getMessage());
+            } catch (YarnException | IOException e) {
+                LOG.warn("yarn application status failed. spark app id: {}, 
load job id: {}", appId, loadJobId, e);
+            } finally {
+                stopYarnClient(client);
+            }
+        } else {
+            // state from handle
+            if (handle == null) {
+                status.setFailMsg("spark app handle is null");
+                status.setState(TEtlState.CANCELLED);
+                return status;
+            }
+
+            State state = handle.getState();
+            status.setState(fromSparkState(state));
+            if (status.getState() == TEtlState.CANCELLED) {
+                status.setFailMsg("spark app state: " + state.toString());
+            }
+            LOG.info("spark app id: {}, load job id: {}, app state: {}", 
appId, loadJobId, state);
+        }
+
+        if (status.getState() == TEtlState.FINISHED || status.getState() == 
TEtlState.CANCELLED) {
+            // get dpp result
+            // TODO(wyb): spark-load
+            /*
+            String dppResultFilePath = 
EtlJobConfig.getDppResultFilePath(etlOutputPath);
+            try {
+                byte[] data = BrokerUtil.readFile(dppResultFilePath, 
brokerDesc);
+                String dppResultStr = new String(data, "UTF-8");
+                DppResult dppResult = new Gson().fromJson(dppResultStr, 
DppResult.class);
+                status.setDppResult(dppResult);
+                if (status.getState() == TEtlState.CANCELLED && 
!Strings.isNullOrEmpty(dppResult.failedReason)) {
+                    status.setFailMsg(dppResult.failedReason);
+                }
+            } catch (UserException | JsonSyntaxException | 
UnsupportedEncodingException e) {
+                LOG.warn("read broker file failed. path: {}", 
dppResultFilePath, e);
+            }
+            */
+        }
+
+        return status;
+    }
+
+    public void killEtlJob(SparkAppHandle handle, String appId, long 
loadJobId, SparkResource resource) {
+        if (resource.isYarnMaster()) {
+            Preconditions.checkNotNull(appId);
+            YarnClient client = startYarnClient(resource);
+            try {
+                try {
+                    
client.killApplication(ConverterUtils.toApplicationId(appId));
+                    LOG.info("yarn application -kill {}", appId);
+                } catch (YarnException | IOException e) {
+                    LOG.warn("yarn application kill failed. app id: {}, load 
job id: {}", appId, loadJobId, e);
+                }
+            } finally {
+                stopYarnClient(client);
+            }
+        } else {
+            if (handle != null) {
+                handle.stop();
+            }
+        }
+    }
+
+    public Map<String, Long> getEtlFilePaths(String outputPath, BrokerDesc 
brokerDesc) throws Exception {
+        Map<String, Long> filePathToSize = Maps.newHashMap();
+
+        List<TBrokerFileStatus> fileStatuses = Lists.newArrayList();
+        String etlFilePaths = outputPath + "/*";
+        try {
+            BrokerUtil.parseFile(etlFilePaths, brokerDesc, fileStatuses);
+        } catch (UserException e) {
+            throw new Exception(e);
+        }
+
+        for (TBrokerFileStatus fstatus : fileStatuses) {
+            if (fstatus.isDir) {
+                continue;
+            }
+            filePathToSize.put(fstatus.getPath(), fstatus.getSize());
+        }
+        LOG.debug("get spark etl file paths. files map: {}", filePathToSize);
+
+        return filePathToSize;
+    }
+
     public void deleteEtlOutputPath(String outputPath, BrokerDesc brokerDesc) {
         try {
             BrokerUtil.deletePath(outputPath, brokerDesc);
@@ -157,6 +292,41 @@ public class SparkEtlJobHandler {
         }
     }
 
+    private YarnClient startYarnClient(SparkResource resource) {
+        YarnClient client = YarnClient.createYarnClient();
+        Configuration conf = new YarnConfiguration();
+        // set yarn.resourcemanager.address
+        Pair<String, String> pair = 
resource.getYarnResourcemanagerAddressPair();
+        conf.set(pair.first, pair.second);
+        client.init(conf);
+        client.start();
+        return client;
+    }
+
+    private void stopYarnClient(YarnClient client) {
+        client.stop();
+    }
+
+    private TEtlState fromYarnState(YarnApplicationState state, 
FinalApplicationStatus faStatus) {
+        switch (state) {
+            case FINISHED:
+                if (faStatus == FinalApplicationStatus.SUCCEEDED) {
+                    // finish and success
+                    return TEtlState.FINISHED;
+                } else {
+                    // finish but fail
+                    return TEtlState.CANCELLED;
+                }
+            case FAILED:
+            case KILLED:
+                // not finish
+                return TEtlState.CANCELLED;
+            default:
+                // ACCEPTED NEW NEW_SAVING RUNNING SUBMITTED
+                return TEtlState.RUNNING;
+        }
+    }
+
     private TEtlState fromSparkState(State state) {
         switch (state) {
             case FINISHED:
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java 
b/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index 8f74235..715b6af 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -17,29 +17,70 @@
 
 package org.apache.doris.load.loadv2;
 
+import static org.apache.doris.common.DataQualityException.QUALITY_FAIL_MSG;
+
 import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.CastExpr;
+import org.apache.doris.analysis.DescriptorTable;
 import org.apache.doris.analysis.ResourceDesc;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Resource;
+import org.apache.doris.catalog.FsBroker;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.SparkResource;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.DataQualityException;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.DuplicatedRequestException;
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
 import org.apache.doris.load.EtlJobType;
+import org.apache.doris.load.EtlStatus;
 import org.apache.doris.load.FailMsg;
+//import org.apache.doris.load.loadv2.dpp.DppResult;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig;
 import org.apache.doris.qe.OriginStatement;
 import org.apache.doris.service.FrontendOptions;
+import org.apache.doris.system.Backend;
+import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTaskExecutor;
 import org.apache.doris.task.AgentTaskQueue;
 import org.apache.doris.task.PushTask;
+import org.apache.doris.thrift.TBrokerRangeDesc;
+import org.apache.doris.thrift.TBrokerScanRange;
+import org.apache.doris.thrift.TBrokerScanRangeParams;
+import org.apache.doris.thrift.TDescriptorTable;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPriority;
+import org.apache.doris.thrift.TPushType;
+//import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.BeginTransactionException;
+import org.apache.doris.transaction.TabletCommitInfo;
+import org.apache.doris.transaction.TabletQuorumFailedException;
+import org.apache.doris.transaction.TransactionState;
 import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
 import org.apache.doris.transaction.TransactionState.TxnCoordinator;
 import org.apache.doris.transaction.TransactionState.TxnSourceType;
@@ -58,6 +99,7 @@ import com.google.gson.annotations.SerializedName;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -92,7 +134,7 @@ public class SparkLoadJob extends BulkLoadJob {
     private long quorumFinishTimestamp = -1;
     // below for push task
     private Map<Long, Set<Long>> tableToLoadPartitions = Maps.newHashMap();
-    //private Map<Long, PushBrokerReaderParams> indexToPushBrokerReaderParams 
= Maps.newHashMap();
+    private Map<Long, PushBrokerReaderParams> indexToPushBrokerReaderParams = 
Maps.newHashMap();
     private Map<Long, Integer> indexToSchemaHash = Maps.newHashMap();
     private Map<Long, Map<Long, PushTask>> tabletToSentReplicaPushTask = 
Maps.newHashMap();
     private Set<Long> finishedReplicas = Sets.newHashSet();
@@ -211,6 +253,377 @@ public class SparkLoadJob extends BulkLoadJob {
         LOG.info("update to {} state success. job id: {}", state, id);
     }
 
+    private boolean checkState(JobState expectState) {
+        readLock();
+        try {
+            if (state == expectState) {
+                return true;
+            }
+            return false;
+        } finally {
+            readUnlock();
+        }
+    }
+
    /**
     * Checks the status of the etl job regularly:
     * 1. RUNNING: update etl job progress
     * 2. CANCELLED: cancel load job (signalled by throwing LoadException to the caller)
     * 3. FINISHED: get the etl output file paths, update job state to LOADING and log job update info
     *
     * Sends push tasks if the job state changed to LOADING.
     */
    public void updateEtlStatus() throws Exception {
        if (!checkState(JobState.ETL)) {
            return;
        }

        // get etl status; done before taking the write lock because it may contact
        // the spark app / broker
        SparkEtlJobHandler handler = new SparkEtlJobHandler();
        EtlStatus status = handler.getEtlJobStatus(sparkAppHandle, appId, id, etlOutputPath, sparkResource, brokerDesc);
        writeLock();
        try {
            switch (status.getState()) {
                case RUNNING:
                    unprotectedUpdateEtlStatusInternal(status);
                    break;
                case FINISHED:
                    unprotectedProcessEtlFinish(status, handler);
                    break;
                case CANCELLED:
                    throw new LoadException("spark etl job failed. msg: " + status.getFailMsg());
                default:
                    LOG.warn("unknown etl state: {}", status.getState().name());
                    break;
            }
        } finally {
            writeUnlock();
        }

        // state flips to LOADING inside unprotectedProcessEtlFinish; re-check after
        // releasing the write lock, since submitPushTasks acquires db + job locks itself
        if (checkState(JobState.LOADING)) {
            // create and send push tasks
            submitPushTasks();
        }
    }
+
    /**
     * Copies the etl progress from {@code etlStatus} into this job.
     * Named "unprotected": callers must already hold the job write lock.
     */
    private void unprotectedUpdateEtlStatusInternal(EtlStatus etlStatus) {
        loadingStatus = etlStatus;
        progress = etlStatus.getProgress();
        if (!sparkResource.isYarnMaster()) {
            // no yarn tracking url available; expose the spark app id instead
            loadingStatus.setTrackingUrl(appId);
        }

        // TODO(wyb): spark-load
        /*
        DppResult dppResult = etlStatus.getDppResult();
        if (dppResult != null) {
            // update load statistic and counters when spark etl job finished
            // fe gets these infos from spark dpp, so we use dummy load id and dummy backend id here
            loadStatistic.fileNum = (int) dppResult.fileNumber;
            loadStatistic.totalFileSizeB = dppResult.fileSize;
            TUniqueId dummyId = new TUniqueId(0, 0);
            long dummyBackendId = -1L;
            loadStatistic.initLoad(dummyId, Sets.newHashSet(dummyId), Lists.newArrayList(dummyBackendId));
            loadStatistic.updateLoadProgress(dummyBackendId, dummyId, dummyId, dppResult.scannedRows, true);

            Map<String, String> counters = loadingStatus.getCounters();
            counters.put(DPP_NORMAL_ALL, String.valueOf(dppResult.normalRows));
            counters.put(DPP_ABNORMAL_ALL, String.valueOf(dppResult.abnormalRows));
            counters.put(UNSELECTED_ROWS, String.valueOf(dppResult.unselectRows));
        }
        */
    }
+
    /**
     * Handles a FINISHED etl job: refreshes status, verifies data quality,
     * records the etl output files, moves the job to LOADING and logs the change.
     * Callers must hold the job write lock.
     *
     * @throws DataQualityException if the data quality check fails
     */
    private void unprotectedProcessEtlFinish(EtlStatus etlStatus, SparkEtlJobHandler handler) throws Exception {
        unprotectedUpdateEtlStatusInternal(etlStatus);
        // checkDataQuality
        if (!checkDataQuality()) {
            throw new DataQualityException(QUALITY_FAIL_MSG);
        }

        // get etl output files and update loading state
        unprotectedUpdateToLoadingState(etlStatus, handler.getEtlFilePaths(etlOutputPath, brokerDesc));
        // log loading state
        unprotectedLogUpdateStateInfo();
        // prepare loading infos
        unprotectedPrepareLoadingInfos();
    }
+
+    private void unprotectedUpdateToLoadingState(EtlStatus etlStatus, 
Map<String, Long> filePathToSize) throws LoadException {
+        try {
+            for (Map.Entry<String, Long> entry : filePathToSize.entrySet()) {
+                String filePath = entry.getKey();
+                if (!filePath.endsWith(EtlJobConfig.ETL_OUTPUT_FILE_FORMAT)) {
+                    continue;
+                }
+                String tabletMetaStr = EtlJobConfig.getTabletMetaStr(filePath);
+                tabletMetaToFileInfo.put(tabletMetaStr, Pair.create(filePath, 
entry.getValue()));
+            }
+
+            loadingStatus = etlStatus;
+            progress = 0;
+            unprotectedUpdateState(JobState.LOADING);
+            LOG.info("update to {} state success. job id: {}", state, id);
+        } catch (Exception e) {
+            LOG.warn("update to {} state failed. job id: {}", state, id, e);
+            throw new LoadException(e.getMessage(), e);
+        }
+    }
+
+    private void unprotectedPrepareLoadingInfos() {
+        for (String tabletMetaStr : tabletMetaToFileInfo.keySet()) {
+            String[] fileNameArr = tabletMetaStr.split("\\.");
+            // tableId.partitionId.indexId.bucket.schemaHash
+            Preconditions.checkState(fileNameArr.length == 5);
+            long tableId = Long.parseLong(fileNameArr[0]);
+            long partitionId = Long.parseLong(fileNameArr[1]);
+            long indexId = Long.parseLong(fileNameArr[2]);
+            int schemaHash = Integer.parseInt(fileNameArr[4]);
+
+            if (!tableToLoadPartitions.containsKey(tableId)) {
+                tableToLoadPartitions.put(tableId, Sets.newHashSet());
+            }
+            tableToLoadPartitions.get(tableId).add(partitionId);
+
+            indexToSchemaHash.put(indexId, schemaHash);
+        }
+    }
+
+    private PushBrokerReaderParams getPushBrokerReaderParams(OlapTable table, 
long indexId) throws UserException {
+        if (!indexToPushBrokerReaderParams.containsKey(indexId)) {
+            PushBrokerReaderParams pushBrokerReaderParams = new 
PushBrokerReaderParams();
+            pushBrokerReaderParams.init(table.getSchemaByIndexId(indexId), 
brokerDesc);
+            indexToPushBrokerReaderParams.put(indexId, pushBrokerReaderParams);
+        }
+        return indexToPushBrokerReaderParams.get(indexId);
+    }
+
    /**
     * Creates a push task for every (tablet, replica) that has not been sent one
     * yet and submits them to the agent task executor as one batch. Also refreshes
     * {@code quorumTablets} / {@code fullTablets} from replicas already reported
     * finished.
     *
     * Lock order: db read lock first, then the job write lock.
     *
     * @return the ids of all tablets involved in this load
     * @throws MetaNotFoundException if the database has been dropped
     * @throws LoadException if none of the recorded partitions still exists
     */
    private Set<Long> submitPushTasks() throws UserException {
        // check db exist
        Database db = null;
        try {
            db = getDb();
        } catch (MetaNotFoundException e) {
            String errMsg = new LogBuilder(LogKey.LOAD_JOB, id)
                    .add("database_id", dbId)
                    .add("label", label)
                    .add("error_msg", "db has been deleted when job is loading")
                    .build();
            throw new MetaNotFoundException(errMsg);
        }

        AgentBatchTask batchTask = new AgentBatchTask();
        boolean hasLoadPartitions = false;
        Set<Long> totalTablets = Sets.newHashSet();
        db.readLock();
        try {
            writeLock();
            try {
                for (Map.Entry<Long, Set<Long>> entry : tableToLoadPartitions.entrySet()) {
                    long tableId = entry.getKey();
                    OlapTable table = (OlapTable) db.getTable(tableId);
                    if (table == null) {
                        // table dropped since etl started; skip its partitions
                        LOG.warn("table does not exist. id: {}", tableId);
                        continue;
                    }

                    Set<Long> partitionIds = entry.getValue();
                    for (long partitionId : partitionIds) {
                        Partition partition = table.getPartition(partitionId);
                        if (partition == null) {
                            LOG.warn("partition does not exist. id: {}", partitionId);
                            continue;
                        }

                        hasLoadPartitions = true;
                        // majority of replicas must succeed for a tablet to reach quorum
                        int quorumReplicaNum = table.getPartitionInfo().getReplicationNum(partitionId) / 2 + 1;

                        List<MaterializedIndex> indexes = partition.getMaterializedIndices(IndexExtState.ALL);
                        for (MaterializedIndex index : indexes) {
                            long indexId = index.getId();
                            int schemaHash = indexToSchemaHash.get(indexId);

                            // bucket number follows tablet iteration order; must match
                            // the bucket encoded in the etl output file names
                            int bucket = 0;
                            for (Tablet tablet : index.getTablets()) {
                                long tabletId = tablet.getId();
                                totalTablets.add(tabletId);
                                // key format: tableId.partitionId.indexId.bucket.schemaHash
                                String tabletMetaStr = String.format("%d.%d.%d.%d.%d", tableId, partitionId,
                                                                     indexId, bucket++, schemaHash);
                                Set<Long> tabletAllReplicas = Sets.newHashSet();
                                Set<Long> tabletFinishedReplicas = Sets.newHashSet();
                                for (Replica replica : tablet.getReplicas()) {
                                    long replicaId = replica.getId();
                                    tabletAllReplicas.add(replicaId);
                                    // only send a push task once per replica
                                    if (!tabletToSentReplicaPushTask.containsKey(tabletId)
                                            || !tabletToSentReplicaPushTask.get(tabletId).containsKey(replicaId)) {
                                        long backendId = replica.getBackendId();
                                        long taskSignature = Catalog.getCurrentGlobalTransactionMgr()
                                                .getTransactionIDGenerator().getNextTransactionId();

                                        PushBrokerReaderParams params = getPushBrokerReaderParams(table, indexId);
                                        // deep copy TBrokerScanRange because filePath and fileSize will be updated
                                        // in different tablet push task
                                        TBrokerScanRange tBrokerScanRange = new TBrokerScanRange(params.tBrokerScanRange);
                                        // update filePath fileSize; empty path/-1 size means
                                        // no etl output exists for this tablet
                                        TBrokerRangeDesc tBrokerRangeDesc = tBrokerScanRange.getRanges().get(0);
                                        tBrokerRangeDesc.setPath("");
                                        tBrokerRangeDesc.setFile_size(-1);
                                        if (tabletMetaToFileInfo.containsKey(tabletMetaStr)) {
                                            Pair<String, Long> fileInfo = tabletMetaToFileInfo.get(tabletMetaStr);
                                            tBrokerRangeDesc.setPath(fileInfo.first);
                                            tBrokerRangeDesc.setFile_size(fileInfo.second);
                                        }

                                        // update broker address: pick a broker co-located with
                                        // the replica's backend host
                                        Backend backend = Catalog.getCurrentCatalog().getCurrentSystemInfo()
                                                .getBackend(backendId);
                                        FsBroker fsBroker = Catalog.getCurrentCatalog().getBrokerMgr().getBroker(
                                                brokerDesc.getName(), backend.getHost());
                                        tBrokerScanRange.getBroker_addresses().add(
                                                new TNetworkAddress(fsBroker.ip, fsBroker.port));

                                        LOG.debug("push task for replica {}, broker {}:{}, backendId {}, filePath {}, fileSize {}" ,
                                                  replicaId, fsBroker.ip, fsBroker.port, backendId, tBrokerRangeDesc.path,
                                                  tBrokerRangeDesc.file_size);

                                        PushTask pushTask = new PushTask(backendId, dbId, tableId, partitionId,
                                                                         indexId, tabletId, replicaId, schemaHash,
                                                                         0, id, TPushType.LOAD_V2,
                                                                         TPriority.NORMAL, transactionId, taskSignature,
                                                                         tBrokerScanRange, params.tDescriptorTable);
                                        if (AgentTaskQueue.addTask(pushTask)) {
                                            batchTask.addTask(pushTask);
                                            if (!tabletToSentReplicaPushTask.containsKey(tabletId)) {
                                                tabletToSentReplicaPushTask.put(tabletId, Maps.newHashMap());
                                            }
                                            tabletToSentReplicaPushTask.get(tabletId).put(replicaId, pushTask);
                                        }
                                    }

                                    // a replica counts as finished only if its push succeeded
                                    // and it has no failed version
                                    if (finishedReplicas.contains(replicaId) && replica.getLastFailedVersion() < 0) {
                                        tabletFinishedReplicas.add(replicaId);
                                    }
                                }

                                if (tabletAllReplicas.size() == 0) {
                                    LOG.error("invalid situation. tablet is empty. id: {}", tabletId);
                                }

                                // check tablet push states
                                if (tabletFinishedReplicas.size() >= quorumReplicaNum) {
                                    quorumTablets.add(tabletId);
                                    if (tabletFinishedReplicas.size() == tabletAllReplicas.size()) {
                                        fullTablets.add(tabletId);
                                    }
                                }
                            }
                        }
                    }
                }

                if (batchTask.getTaskNum() > 0) {
                    AgentTaskExecutor.submit(batchTask);
                }

                if (!hasLoadPartitions) {
                    String errMsg = new LogBuilder(LogKey.LOAD_JOB, id)
                            .add("database_id", dbId)
                            .add("label", label)
                            .add("error_msg", "all partitions have no load data")
                            .build();
                    throw new LoadException(errMsg);
                }

                return totalTablets;
            } finally {
                writeUnlock();
            }
        } finally {
            db.readUnlock();
        }
    }
+
+    public void addFinishedReplica(long replicaId, long tabletId, long 
backendId) {
+        writeLock();
+        try {
+            if (finishedReplicas.add(replicaId)) {
+                commitInfos.add(new TabletCommitInfo(tabletId, backendId));
+                // set replica push task null
+                Map<Long, PushTask> sentReplicaPushTask = 
tabletToSentReplicaPushTask.get(tabletId);
+                if (sentReplicaPushTask != null) {
+                    if (sentReplicaPushTask.containsKey(replicaId)) {
+                        sentReplicaPushTask.put(replicaId, null);
+                    }
+                }
+            }
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    /**
+     * 1. Sends push tasks to Be
+     * 2. Commit transaction after all push tasks execute successfully
+     */
+    public void updateLoadingStatus() throws UserException {
+        if (!checkState(JobState.LOADING)) {
+            return;
+        }
+
+        // submit push tasks
+        Set<Long> totalTablets = submitPushTasks();
+
+        // update status
+        boolean canCommitJob = false;
+        writeLock();
+        try {
+            // loading progress
+            // 100: txn status is visible and load has been finished
+            progress = fullTablets.size() * 100 / totalTablets.size();
+            if (progress == 100) {
+                progress = 99;
+            }
+
+            // quorum finish ts
+            if (quorumFinishTimestamp < 0 && 
quorumTablets.containsAll(totalTablets)) {
+                quorumFinishTimestamp = System.currentTimeMillis();
+            }
+
+            // if all replicas are finished or stay in quorum finished for 
long time, try to commit it.
+            long stragglerTimeout = Config.load_straggler_wait_second * 1000;
+            if ((quorumFinishTimestamp > 0 && System.currentTimeMillis() - 
quorumFinishTimestamp > stragglerTimeout)
+                    || fullTablets.containsAll(totalTablets)) {
+                canCommitJob = true;
+            }
+        } finally {
+            writeUnlock();
+        }
+
+        // try commit transaction
+        if (canCommitJob) {
+            tryCommitJob();
+        }
+    }
+
    /**
     * Attempts to commit the load transaction under the db write lock, attaching
     * the final job state so it survives in the transaction log.
     */
    private void tryCommitJob() throws UserException {
        LOG.info(new LogBuilder(LogKey.LOAD_JOB, id)
                         .add("txn_id", transactionId)
                         .add("msg", "Load job try to commit txn")
                         .build());
        Database db = getDb();
        db.writeLock();
        try {
            Catalog.getCurrentGlobalTransactionMgr().commitTransaction(
                    dbId, transactionId, commitInfos,
                    new LoadJobFinalOperation(id, loadingStatus, progress, loadStartTimestamp,
                                              finishTimestamp, state, failMsg));
        } catch (TabletQuorumFailedException e) {
            // deliberately swallowed: quorum not reached yet, retry in next loop
        } finally {
            db.writeUnlock();
        }
    }
+
     /**
      * load job already cancelled or finished, clear job below:
      * 1. kill etl job and delete etl files
@@ -224,8 +637,7 @@ public class SparkLoadJob extends BulkLoadJob {
         if (state == JobState.CANCELLED) {
             if ((!Strings.isNullOrEmpty(appId) && 
sparkResource.isYarnMaster()) || sparkAppHandle != null) {
                 try {
-                    // TODO(wyb): spark-load
-                    //handler.killEtlJob(sparkAppHandle, appId, id, 
sparkResource);
+                    handler.killEtlJob(sparkAppHandle, appId, id, 
sparkResource);
                 } catch (Exception e) {
                     LOG.warn("kill etl job failed. id: {}, state: {}", id, 
state, e);
                 }
@@ -257,7 +669,7 @@ public class SparkLoadJob extends BulkLoadJob {
             sparkAppHandle = null;
             resourceDesc = null;
             tableToLoadPartitions.clear();
-            //indexToPushBrokerReaderParams.clear();
+            indexToPushBrokerReaderParams.clear();
             indexToSchemaHash.clear();
             tabletToSentReplicaPushTask.clear();
             finishedReplicas.clear();
@@ -269,6 +681,12 @@ public class SparkLoadJob extends BulkLoadJob {
     }
 
    /**
     * Transaction callback fired once the txn becomes visible: after the parent
     * handling, clear the job's etl artifacts and in-memory push bookkeeping
     * (see clearJob).
     */
    @Override
    public void afterVisible(TransactionState txnState, boolean txnOperated) {
        super.afterVisible(txnState, txnOperated);
        clearJob();
    }
+
+    @Override
     public void cancelJobWithoutCheck(FailMsg failMsg, boolean abortTxn, 
boolean needLog) {
         super.cancelJobWithoutCheck(failMsg, abortTxn, needLog);
         clearJob();
@@ -343,8 +761,7 @@ public class SparkLoadJob extends BulkLoadJob {
                 // nothing to do
                 break;
             case LOADING:
-                // TODO(wyb): spark-load
-                //unprotectedPrepareLoadingInfos();
+                unprotectedPrepareLoadingInfos();
                 break;
             default:
                 LOG.warn("replay update load job state info failed. error: 
wrong state. job id: {}, state: {}",
@@ -392,4 +809,109 @@ public class SparkLoadJob extends BulkLoadJob {
             return tabletMetaToFileInfo;
         }
     }
-}
+
    /**
     * Params for the Be push broker reader:
     * 1. TBrokerScanRange: file path and size, broker address, transform exprs
     * 2. TDescriptorTable: src and dest SlotDescriptors, src and dest TupleDescriptors
     *
     * These params are sent to Be through the push task. One instance is built
     * per index (see getPushBrokerReaderParams) and its scan range is deep-copied
     * per tablet, since path/size/broker address differ per replica.
     */
    private static class PushBrokerReaderParams {
        // template scan range; per-tablet copies get path/size/broker filled in
        TBrokerScanRange tBrokerScanRange;
        // descriptor table shared by all tablets of the index
        TDescriptorTable tDescriptorTable;

        public PushBrokerReaderParams() {
            this.tBrokerScanRange = new TBrokerScanRange();
            this.tDescriptorTable = null;
        }

        /**
         * Builds the scan range and descriptor table from the index schema.
         *
         * @param columns schema of the materialized index being pushed
         * @param brokerDesc broker name and access properties
         */
        public void init(List<Column> columns, BrokerDesc brokerDesc) throws UserException {
            // Generate tuple descriptor
            DescriptorTable descTable = new DescriptorTable();
            TupleDescriptor destTupleDesc = descTable.createTupleDescriptor();
            // use index schema to fill the descriptor table
            for (Column column : columns) {
                SlotDescriptor destSlotDesc = descTable.addSlotDescriptor(destTupleDesc);
                destSlotDesc.setIsMaterialized(true);
                destSlotDesc.setColumn(column);
                if (column.isAllowNull()) {
                    destSlotDesc.setIsNullable(true);
                } else {
                    destSlotDesc.setIsNullable(false);
                }
            }
            initTBrokerScanRange(descTable, destTupleDesc, columns, brokerDesc);
            initTDescriptorTable(descTable);

        }

        /**
         * Builds the scan range: a VARCHAR source tuple mirroring the schema,
         * per-slot cast exprs from source to destination, and one parquet range
         * descriptor whose path/size are filled in later per tablet.
         */
        private void initTBrokerScanRange(DescriptorTable descTable, TupleDescriptor destTupleDesc,
                                          List<Column> columns, BrokerDesc brokerDesc) throws AnalysisException {
            // scan range params
            TBrokerScanRangeParams params = new TBrokerScanRangeParams();
            params.setStrict_mode(false);
            params.setProperties(brokerDesc.getProperties());
            // all source slots are nullable VARCHAR; real types come from the casts below
            TupleDescriptor srcTupleDesc = descTable.createTupleDescriptor();
            Map<String, SlotDescriptor> srcSlotDescByName = Maps.newHashMap();
            for (Column column : columns) {
                SlotDescriptor srcSlotDesc = descTable.addSlotDescriptor(srcTupleDesc);
                srcSlotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR));
                srcSlotDesc.setIsMaterialized(true);
                srcSlotDesc.setIsNullable(true);
                srcSlotDesc.setColumn(new Column(column.getName(), PrimitiveType.VARCHAR));
                params.addToSrc_slot_ids(srcSlotDesc.getId().asInt());
                srcSlotDescByName.put(column.getName(), srcSlotDesc);
            }

            // map each materialized dest slot to its source slot and cast expr
            Map<Integer, Integer> destSidToSrcSidWithoutTrans = Maps.newHashMap();
            for (SlotDescriptor destSlotDesc : destTupleDesc.getSlots()) {
                if (!destSlotDesc.isMaterialized()) {
                    continue;
                }

                SlotDescriptor srcSlotDesc = srcSlotDescByName.get(destSlotDesc.getColumn().getName());
                destSidToSrcSidWithoutTrans.put(destSlotDesc.getId().asInt(), srcSlotDesc.getId().asInt());
                Expr expr = new SlotRef(srcSlotDesc);
                expr = castToSlot(destSlotDesc, expr);
                params.putToExpr_of_dest_slot(destSlotDesc.getId().asInt(), expr.treeToThrift());
            }
            params.setDest_sid_to_src_sid_without_trans(destSidToSrcSidWithoutTrans);
            params.setSrc_tuple_id(srcTupleDesc.getId().asInt());
            params.setDest_tuple_id(destTupleDesc.getId().asInt());
            tBrokerScanRange.setParams(params);

            // broker address updated for each replica
            tBrokerScanRange.setBroker_addresses(Lists.newArrayList());

            // broker range desc
            TBrokerRangeDesc tBrokerRangeDesc = new TBrokerRangeDesc();
            tBrokerRangeDesc.setFile_type(TFileType.FILE_BROKER);
            tBrokerRangeDesc.setFormat_type(TFileFormatType.FORMAT_PARQUET);
            tBrokerRangeDesc.setSplittable(false);
            tBrokerRangeDesc.setStart_offset(0);
            tBrokerRangeDesc.setSize(-1);
            // path and file size updated for each replica
            tBrokerScanRange.setRanges(Lists.newArrayList(tBrokerRangeDesc));
        }

        /**
         * Wraps {@code expr} in the cast(s) needed to produce the slot's type;
         * returns the expr unchanged when types already match.
         */
        private Expr castToSlot(SlotDescriptor slotDesc, Expr expr) throws AnalysisException {
            PrimitiveType dstType = slotDesc.getType().getPrimitiveType();
            PrimitiveType srcType = expr.getType().getPrimitiveType();
            if (dstType == PrimitiveType.BOOLEAN && srcType == PrimitiveType.VARCHAR) {
                // there is no cast VARCHAR to BOOLEAN function
                // so we cast VARCHAR to TINYINT first, then cast TINYINT to BOOLEAN
                return new CastExpr(Type.BOOLEAN, new CastExpr(Type.TINYINT, expr));
            }
            if (dstType != srcType) {
                return expr.castTo(slotDesc.getType());
            }
            return expr;
        }

        // finalize slot offsets, then snapshot the descriptor table as thrift
        private void initTDescriptorTable(DescriptorTable descTable) {
            descTable.computeMemLayout();
            tDescriptorTable = descTable.toThrift();
        }
    }
+}
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/doris/master/MasterImpl.java 
b/fe/src/main/java/org/apache/doris/master/MasterImpl.java
index 9d403cd..46a90ed 100644
--- a/fe/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -37,6 +37,7 @@ import org.apache.doris.catalog.TabletMeta;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.load.AsyncDeleteJob;
 import org.apache.doris.load.LoadJob;
+import org.apache.doris.load.loadv2.SparkLoadJob;
 import org.apache.doris.persist.ReplicaPersistInfo;
 import org.apache.doris.system.Backend;
 import org.apache.doris.task.AgentTask;
@@ -385,6 +386,22 @@ public class MasterImpl {
                         pushTask.countDownLatch(backendId, pushTabletId);
                     }
                 }
+            } else if (pushTask.getPushType() == TPushType.LOAD_V2) {
+                long loadJobId = pushTask.getLoadJobId();
+                org.apache.doris.load.loadv2.LoadJob job = 
Catalog.getCurrentCatalog().getLoadManager().getLoadJob(loadJobId);
+                if (job == null) {
+                    throw new MetaNotFoundException("cannot find load job, 
job[" + loadJobId + "]");
+                }
+                for (int i = 0; i < tabletMetaList.size(); i++) {
+                    TabletMeta tabletMeta = tabletMetaList.get(i);
+                    checkReplica(finishTabletInfos.get(i), tabletMeta);
+                    long tabletId = tabletIds.get(i);
+                    Replica replica = findRelatedReplica(olapTable, partition, 
backendId, tabletId, tabletMeta.getIndexId());
+                    // if the replica is under schema change, could not find 
the replica with aim schema hash
+                    if (replica != null) {
+                        ((SparkLoadJob) 
job).addFinishedReplica(replica.getId(), pushTabletId, backendId);
+                    }
+                }
             }
             
             AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, 
signature);
diff --git a/fe/src/main/java/org/apache/doris/task/PushTask.java 
b/fe/src/main/java/org/apache/doris/task/PushTask.java
index 10998fc..1d35130 100644
--- a/fe/src/main/java/org/apache/doris/task/PushTask.java
+++ b/fe/src/main/java/org/apache/doris/task/PushTask.java
@@ -24,7 +24,9 @@ import org.apache.doris.analysis.LiteralExpr;
 import org.apache.doris.analysis.Predicate;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.common.MarkedCountDownLatch;
+import org.apache.doris.thrift.TBrokerScanRange;
 import org.apache.doris.thrift.TCondition;
+import org.apache.doris.thrift.TDescriptorTable;
 import org.apache.doris.thrift.TPriority;
 import org.apache.doris.thrift.TPushReq;
 import org.apache.doris.thrift.TPushType;
@@ -62,6 +64,10 @@ public class PushTask extends AgentTask {
     
     private long transactionId;
     private boolean isSchemaChanging;
+
+    // for load v2 (spark load)
+    private TBrokerScanRange tBrokerScanRange;
+    private TDescriptorTable tDescriptorTable;
     
     public PushTask(TResourceInfo resourceInfo, long backendId, long dbId, 
long tableId, long partitionId,
                     long indexId, long tabletId, long replicaId, int 
schemaHash, long version, long versionHash, 
@@ -85,6 +91,8 @@ public class PushTask extends AgentTask {
         this.isSyncDelete = true;
         this.asyncDeleteJobId = -1;
         this.transactionId = transactionId;
+        this.tBrokerScanRange = null;
+        this.tDescriptorTable = null;
     }
 
     public PushTask(TResourceInfo resourceInfo, long backendId, long dbId, 
long tableId, long partitionId,
@@ -97,6 +105,19 @@ public class PushTask extends AgentTask {
              priority, TTaskType.PUSH, -1, tableId);
     }
 
    /**
     * Constructor for load v2 (SparkLoadJob) push tasks: carries the broker scan
     * range and descriptor table that Be's push broker reader needs.
     *
     * Delegates to the main constructor with placeholder values for fields this
     * path does not use (version -1, versionHash 0, no file path/size, no
     * delete conditions — NOTE(review): confirm against the target constructor's
     * parameter list, which is partially out of view here).
     */
    public PushTask(long backendId, long dbId, long tableId, long partitionId, long indexId, long tabletId,
                    long replicaId, int schemaHash, int timeoutSecond, long loadJobId, TPushType pushType,
                    TPriority priority, long transactionId, long signature,
                    TBrokerScanRange tBrokerScanRange, TDescriptorTable tDescriptorTable) {
        this(null, backendId, dbId, tableId, partitionId, indexId,
             tabletId, replicaId, schemaHash, -1, 0, null,
             0, timeoutSecond, loadJobId, pushType, null, false,
             priority, TTaskType.REALTIME_PUSH, transactionId, signature);
        this.tBrokerScanRange = tBrokerScanRange;
        this.tDescriptorTable = tDescriptorTable;
    }
+
     public TPushReq toThrift() {
         TPushReq request = new TPushReq(tabletId, schemaHash, version, 
versionHash, timeoutSecond, pushType);
         if (taskType == TTaskType.REALTIME_PUSH) {
@@ -145,6 +166,10 @@ public class PushTask extends AgentTask {
                 }
                 request.setDelete_conditions(tConditions);
                 break;
+            case LOAD_V2:
+                request.setBroker_scan_range(tBrokerScanRange);
+                request.setDesc_tbl(tDescriptorTable);
+                break;
             default:
                 LOG.warn("unknown push type. type: " + pushType.name());
                 break;
diff --git 
a/fe/src/main/java/org/apache/doris/transaction/TxnCommitAttachment.java 
b/fe/src/main/java/org/apache/doris/transaction/TxnCommitAttachment.java
index a59487f..c167c1a 100644
--- a/fe/src/main/java/org/apache/doris/transaction/TxnCommitAttachment.java
+++ b/fe/src/main/java/org/apache/doris/transaction/TxnCommitAttachment.java
@@ -66,6 +66,9 @@ public abstract class TxnCommitAttachment implements Writable 
{
             attachment = new LoadJobFinalOperation();
         } else if (type == LoadJobSourceType.BACKEND_STREAMING) {
             attachment = new MiniLoadTxnCommitAttachment();
+        } else if (type == LoadJobSourceType.FRONTEND) {
+            // spark load
+            attachment = new LoadJobFinalOperation();
         } else {
             throw new IOException("Unknown load job source type: " + 
type.name());
         }
diff --git 
a/fe/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java 
b/fe/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java
new file mode 100644
index 0000000..d288610
--- /dev/null
+++ b/fe/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java
@@ -0,0 +1,285 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.catalog.BrokerMgr;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.FsBroker;
+import org.apache.doris.catalog.SparkResource;
+import org.apache.doris.common.GenericPool;
+import org.apache.doris.common.LoadException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.BrokerUtil;
+import org.apache.doris.load.EtlStatus;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig;
+import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TBrokerListPathRequest;
+import org.apache.doris.thrift.TBrokerListResponse;
+import org.apache.doris.thrift.TBrokerOperationStatus;
+import org.apache.doris.thrift.TBrokerOperationStatusCode;
+import org.apache.doris.thrift.TEtlState;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPaloBrokerService;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mock;
+import mockit.Mocked;
+import mockit.MockUp;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.spark.launcher.SparkAppHandle;
+import org.apache.spark.launcher.SparkAppHandle.State;
+import org.apache.spark.launcher.SparkLauncher;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public class SparkEtlJobHandlerTest {
+    private long loadJobId;
+    private String label;
+    private String resourceName;
+    private String broker;
+    private long pendingTaskId;
+    private String appId;
+    private String etlOutputPath;
+    private String trackingUrl;
+
+    @Before
+    public void setUp() {
+        loadJobId = 0L;
+        label = "label0";
+        resourceName = "spark0";
+        broker = "broker0";
+        pendingTaskId = 3L;
+        appId = "application_15888888888_0088";
+        etlOutputPath = "hdfs://127.0.0.1:10000/tmp/doris/100/label/101";
+        trackingUrl = "http://127.0.0.1:8080/proxy/application_1586619723848_0088/";
+    }
+
+    @Test
+    public void testSubmitEtlJob(@Mocked BrokerUtil brokerUtil, @Mocked 
SparkLauncher launcher,
+                                 @Injectable SparkAppHandle handle) throws 
IOException, LoadException {
+        new Expectations() {
+            {
+                launcher.startApplication((SparkAppHandle.Listener) any);
+                result = handle;
+                handle.getAppId();
+                returns(null, null, appId);
+                handle.getState();
+                returns(State.CONNECTED, State.SUBMITTED, State.RUNNING);
+            }
+        };
+
+        EtlJobConfig etlJobConfig = new EtlJobConfig(Maps.newHashMap(), 
etlOutputPath, label, null);
+        SparkResource resource = new SparkResource(resourceName);
+        Map<String, String> sparkConfigs = resource.getSparkConfigs();
+        sparkConfigs.put("spark.master", "yarn");
+        sparkConfigs.put("spark.submit.deployMode", "cluster");
+        sparkConfigs.put("spark.hadoop.yarn.resourcemanager.address", 
"127.0.0.1:9999");
+        BrokerDesc brokerDesc = new BrokerDesc(broker, Maps.newHashMap());
+        SparkPendingTaskAttachment attachment = new 
SparkPendingTaskAttachment(pendingTaskId);
+        SparkEtlJobHandler handler = new SparkEtlJobHandler();
+        handler.submitEtlJob(loadJobId, label, etlJobConfig, resource, 
brokerDesc, attachment);
+
+        // check submit etl job success
+        Assert.assertEquals(appId, attachment.getAppId());
+        Assert.assertEquals(handle, attachment.getHandle());
+    }
+
+    @Test(expected = LoadException.class)
+    public void testSubmitEtlJobFailed(@Mocked BrokerUtil brokerUtil, @Mocked 
SparkLauncher launcher,
+                                       @Injectable SparkAppHandle handle) 
throws IOException, LoadException {
+        new Expectations() {
+            {
+                launcher.startApplication((SparkAppHandle.Listener) any);
+                result = handle;
+                handle.getAppId();
+                result = null;
+                handle.getState();
+                returns(State.CONNECTED, State.SUBMITTED, State.FAILED);
+            }
+        };
+
+        EtlJobConfig etlJobConfig = new EtlJobConfig(Maps.newHashMap(), 
etlOutputPath, label, null);
+        SparkResource resource = new SparkResource(resourceName);
+        Map<String, String> sparkConfigs = resource.getSparkConfigs();
+        sparkConfigs.put("spark.master", "yarn");
+        sparkConfigs.put("spark.submit.deployMode", "cluster");
+        sparkConfigs.put("spark.hadoop.yarn.resourcemanager.address", 
"127.0.0.1:9999");
+        BrokerDesc brokerDesc = new BrokerDesc(broker, Maps.newHashMap());
+        SparkPendingTaskAttachment attachment = new 
SparkPendingTaskAttachment(pendingTaskId);
+        SparkEtlJobHandler handler = new SparkEtlJobHandler();
+        handler.submitEtlJob(loadJobId, label, etlJobConfig, resource, 
brokerDesc, attachment);
+    }
+
+    @Test
+    public void testGetEtlJobStatus(@Mocked BrokerUtil brokerUtil, @Mocked 
YarnClient client,
+                                    @Injectable ApplicationReport report)
+            throws IOException, YarnException, UserException {
+        new Expectations() {
+            {
+                YarnClient.createYarnClient();
+                result = client;
+                client.getApplicationReport((ApplicationId) any);
+                result = report;
+                report.getYarnApplicationState();
+                returns(YarnApplicationState.RUNNING, 
YarnApplicationState.FINISHED, YarnApplicationState.FINISHED);
+                report.getFinalApplicationStatus();
+                returns(FinalApplicationStatus.UNDEFINED, 
FinalApplicationStatus.FAILED, FinalApplicationStatus.SUCCEEDED);
+                report.getTrackingUrl();
+                result = trackingUrl;
+                report.getProgress();
+                returns(0.5f, 1f, 1f);
+                // TODO(wyb): spark-load
+                //BrokerUtil.readFile(anyString, (BrokerDesc) any);
+                //result = "{'normal_rows': 10, 'abnormal_rows': 0, 
'failed_reason': 'etl job failed'}";
+            }
+        };
+
+        SparkResource resource = new SparkResource(resourceName);
+        Map<String, String> sparkConfigs = resource.getSparkConfigs();
+        sparkConfigs.put("spark.master", "yarn");
+        sparkConfigs.put("spark.submit.deployMode", "cluster");
+        sparkConfigs.put("spark.hadoop.yarn.resourcemanager.address", 
"127.0.0.1:9999");
+        BrokerDesc brokerDesc = new BrokerDesc(broker, Maps.newHashMap());
+        SparkEtlJobHandler handler = new SparkEtlJobHandler();
+
+        // running
+        EtlStatus status = handler.getEtlJobStatus(null, appId, loadJobId, 
etlOutputPath, resource, brokerDesc);
+        Assert.assertEquals(TEtlState.RUNNING, status.getState());
+        Assert.assertEquals(50, status.getProgress());
+
+        // yarn finished and spark failed
+        status = handler.getEtlJobStatus(null, appId, loadJobId, 
etlOutputPath, resource, brokerDesc);
+        Assert.assertEquals(TEtlState.CANCELLED, status.getState());
+        Assert.assertEquals(100, status.getProgress());
+        // TODO(wyb): spark-load
+        //Assert.assertEquals("etl job failed", 
status.getDppResult().failedReason);
+
+        // finished
+        status = handler.getEtlJobStatus(null, appId, loadJobId, 
etlOutputPath, resource, brokerDesc);
+        Assert.assertEquals(TEtlState.FINISHED, status.getState());
+        Assert.assertEquals(100, status.getProgress());
+        Assert.assertEquals(trackingUrl, status.getTrackingUrl());
+        // TODO(wyb): spark-load
+        //Assert.assertEquals(10, status.getDppResult().normalRows);
+        //Assert.assertEquals(0, status.getDppResult().abnormalRows);
+    }
+
+    @Test
+    public void testKillEtlJob(@Mocked YarnClient client) throws IOException, 
YarnException {
+        new Expectations() {
+            {
+                YarnClient.createYarnClient();
+                result = client;
+                client.killApplication((ApplicationId) any);
+                times = 1;
+            }
+        };
+
+        SparkResource resource = new SparkResource(resourceName);
+        Map<String, String> sparkConfigs = resource.getSparkConfigs();
+        sparkConfigs.put("spark.master", "yarn");
+        sparkConfigs.put("spark.submit.deployMode", "cluster");
+        sparkConfigs.put("spark.hadoop.yarn.resourcemanager.address", 
"127.0.0.1:9999");
+        SparkEtlJobHandler handler = new SparkEtlJobHandler();
+        try {
+            handler.killEtlJob(null, appId, loadJobId, resource);
+        } catch (Exception e) {
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testGetEtlFilePaths(@Mocked TPaloBrokerService.Client client, 
@Mocked Catalog catalog,
+                                    @Injectable BrokerMgr brokerMgr) throws 
Exception {
+        // list response
+        TBrokerListResponse response = new TBrokerListResponse();
+        TBrokerOperationStatus status = new TBrokerOperationStatus();
+        status.statusCode = TBrokerOperationStatusCode.OK;
+        response.opStatus = status;
+        List<TBrokerFileStatus> files = Lists.newArrayList();
+        String filePath = 
"hdfs://127.0.0.1:10000/doris/jobs/1/label6/9/label6.10.11.12.0.666666.parquet";
+        files.add(new TBrokerFileStatus(filePath, false, 10, false));
+        response.files = files;
+
+        FsBroker fsBroker = new FsBroker("127.0.0.1", 99999);
+
+        new MockUp<GenericPool<TPaloBrokerService.Client>>() {
+            @Mock
+            public TPaloBrokerService.Client borrowObject(TNetworkAddress 
address) throws Exception {
+                return client;
+            }
+
+            @Mock
+            public void returnObject(TNetworkAddress address, 
TPaloBrokerService.Client object) {
+            }
+
+            @Mock
+            public void invalidateObject(TNetworkAddress address, 
TPaloBrokerService.Client object) {
+            }
+        };
+
+        new Expectations() {
+            {
+                client.listPath((TBrokerListPathRequest) any);
+                result = response;
+                catalog.getBrokerMgr();
+                result = brokerMgr;
+                brokerMgr.getBroker(anyString, anyString);
+                result = fsBroker;
+            }
+        };
+
+        BrokerDesc brokerDesc = new BrokerDesc(broker, Maps.newHashMap());
+        SparkEtlJobHandler handler = new SparkEtlJobHandler();
+        Map<String, Long> filePathToSize = 
handler.getEtlFilePaths(etlOutputPath, brokerDesc);
+        Assert.assertTrue(filePathToSize.containsKey(filePath));
+        Assert.assertEquals(10, (long) filePathToSize.get(filePath));
+    }
+
+    @Test
+    public void testDeleteEtlOutputPath(@Mocked BrokerUtil brokerUtil) throws 
UserException {
+        new Expectations() {
+            {
+                BrokerUtil.deletePath(etlOutputPath, (BrokerDesc) any);
+                times = 1;
+            }
+        };
+
+        BrokerDesc brokerDesc = new BrokerDesc(broker, Maps.newHashMap());
+        SparkEtlJobHandler handler = new SparkEtlJobHandler();
+        try {
+            handler.deleteEtlOutputPath(etlOutputPath, brokerDesc);
+        } catch (Exception e) {
+            Assert.fail(e.getMessage());
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/fe/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java 
b/fe/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
new file mode 100644
index 0000000..671de5a
--- /dev/null
+++ b/fe/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
@@ -0,0 +1,490 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.DataDescription;
+import org.apache.doris.analysis.ResourceDesc;
+import org.apache.doris.analysis.LabelName;
+import org.apache.doris.analysis.LoadStmt;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.ResourceMgr;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.PartitionInfo;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.RangePartitionInfo;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.SparkResource;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DataQualityException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.LoadException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.load.EtlJobType;
+import org.apache.doris.load.EtlStatus;
+import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo;
+import org.apache.doris.load.loadv2.SparkLoadJob.SparkLoadJobStateUpdateInfo;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig;
+import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTaskExecutor;
+import org.apache.doris.task.MasterTaskExecutor;
+import org.apache.doris.task.PushTask;
+import org.apache.doris.thrift.TEtlState;
+import org.apache.doris.transaction.GlobalTransactionMgr;
+import org.apache.doris.transaction.TabletCommitInfo;
+import org.apache.doris.transaction.TransactionState;
+import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mocked;
+import org.apache.spark.launcher.SparkAppHandle;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class SparkLoadJobTest {
+    private long dbId;
+    private String dbName;
+    private String tableName;
+    private String label;
+    private String resourceName;
+    private String broker;
+    private long transactionId;
+    private long pendingTaskId;
+    private String appId;
+    private String etlOutputPath;
+    private long tableId;
+    private long partitionId;
+    private long indexId;
+    private long tabletId;
+    private long replicaId;
+    private long backendId;
+    private int schemaHash;
+
+    @Before
+    public void setUp() {
+        dbId = 1L;
+        dbName = "database0";
+        tableName = "table0";
+        label = "label0";
+        resourceName = "spark0";
+        broker = "broker0";
+        transactionId = 2L;
+        pendingTaskId = 3L;
+        appId = "application_15888888888_0088";
+        etlOutputPath = "hdfs://127.0.0.1:10000/tmp/doris/100/label/101";
+        tableId = 10L;
+        partitionId = 11L;
+        indexId = 12L;
+        tabletId = 13L;
+        replicaId = 14L;
+        backendId = 15L;
+        schemaHash = 146886;
+    }
+
+    @Test
+    public void testCreateFromLoadStmt(@Mocked Catalog catalog, @Injectable 
LoadStmt loadStmt,
+                                       @Injectable DataDescription 
dataDescription, @Injectable LabelName labelName,
+                                       @Injectable Database db, @Injectable 
OlapTable olapTable,
+                                       @Injectable ResourceMgr resourceMgr) {
+        List<DataDescription> dataDescriptionList = Lists.newArrayList();
+        dataDescriptionList.add(dataDescription);
+        Map<String, String> resourceProperties = Maps.newHashMap();
+        resourceProperties.put("spark.executor.memory", "1g");
+        resourceProperties.put("broker", broker);
+        resourceProperties.put("broker.username", "user0");
+        resourceProperties.put("broker.password", "password0");
+        ResourceDesc resourceDesc = new ResourceDesc(resourceName, 
resourceProperties);
+        Map<String, String> jobProperties = Maps.newHashMap();
+        SparkResource resource = new SparkResource(resourceName);
+
+        new Expectations() {
+            {
+                catalog.getDb(dbName);
+                result = db;
+                catalog.getResourceMgr();
+                result = resourceMgr;
+                db.getTable(tableName);
+                result = olapTable;
+                db.getId();
+                result = dbId;
+                loadStmt.getLabel();
+                result = labelName;
+                loadStmt.getDataDescriptions();
+                result = dataDescriptionList;
+                loadStmt.getResourceDesc();
+                result = resourceDesc;
+                loadStmt.getProperties();
+                result = jobProperties;
+                loadStmt.getEtlJobType();
+                result = EtlJobType.SPARK;
+                labelName.getDbName();
+                result = dbName;
+                labelName.getLabelName();
+                result = label;
+                dataDescription.getTableName();
+                result = tableName;
+                dataDescription.getPartitionNames();
+                result = null;
+                resourceMgr.getResource(resourceName);
+                result = resource;
+            }
+        };
+
+        try {
+            Assert.assertTrue(resource.getSparkConfigs().isEmpty());
+            resourceDesc.analyze();
+            BulkLoadJob bulkLoadJob = BulkLoadJob.fromLoadStmt(loadStmt);
+            SparkLoadJob sparkLoadJob = (SparkLoadJob) bulkLoadJob;
+            // check member
+            Assert.assertEquals(dbId, bulkLoadJob.dbId);
+            Assert.assertEquals(label, bulkLoadJob.label);
+            Assert.assertEquals(JobState.PENDING, bulkLoadJob.getState());
+            Assert.assertEquals(EtlJobType.SPARK, bulkLoadJob.getJobType());
+            Assert.assertEquals(resourceName, sparkLoadJob.getResourceName());
+            Assert.assertEquals(-1L, sparkLoadJob.getEtlStartTimestamp());
+
+            // check update spark resource properties
+            Assert.assertEquals(broker, bulkLoadJob.brokerDesc.getName());
+            Assert.assertEquals("user0", 
bulkLoadJob.brokerDesc.getProperties().get("username"));
+            Assert.assertEquals("password0", 
bulkLoadJob.brokerDesc.getProperties().get("password"));
+            SparkResource sparkResource = 
Deencapsulation.getField(sparkLoadJob, "sparkResource");
+            
Assert.assertTrue(sparkResource.getSparkConfigs().containsKey("spark.executor.memory"));
+            Assert.assertEquals("1g", 
sparkResource.getSparkConfigs().get("spark.executor.memory"));
+        } catch (DdlException | AnalysisException e) {
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testExecute(@Mocked Catalog catalog, @Mocked 
SparkLoadPendingTask pendingTask,
+                            @Injectable String originStmt, @Injectable 
GlobalTransactionMgr transactionMgr,
+                            @Injectable MasterTaskExecutor executor) throws 
Exception {
+        new Expectations() {
+            {
+                Catalog.getCurrentGlobalTransactionMgr();
+                result = transactionMgr;
+                transactionMgr.beginTransaction(dbId, Lists.newArrayList(), 
label, null,
+                                                
(TransactionState.TxnCoordinator) any, LoadJobSourceType.FRONTEND,
+                                                anyLong, anyLong);
+                result = transactionId;
+                pendingTask.init();
+                pendingTask.getSignature();
+                result = pendingTaskId;
+                catalog.getLoadTaskScheduler();
+                result = executor;
+                executor.submit((SparkLoadPendingTask) any);
+                result = true;
+            }
+        };
+
+        ResourceDesc resourceDesc = new ResourceDesc(resourceName, 
Maps.newHashMap());
+        SparkLoadJob job = new SparkLoadJob(dbId, label, resourceDesc, new 
OriginStatement(originStmt, 0));
+        job.execute();
+
+        // check transaction id and id to tasks
+        Assert.assertEquals(transactionId, job.getTransactionId());
+        Assert.assertTrue(job.idToTasks.containsKey(pendingTaskId));
+    }
+
+    @Test
+    public void testOnPendingTaskFinished(@Mocked Catalog catalog, @Injectable 
String originStmt) throws MetaNotFoundException {
+        ResourceDesc resourceDesc = new ResourceDesc(resourceName, 
Maps.newHashMap());
+        SparkLoadJob job = new SparkLoadJob(dbId, label, resourceDesc, new 
OriginStatement(originStmt, 0));
+        SparkPendingTaskAttachment attachment = new 
SparkPendingTaskAttachment(pendingTaskId);
+        attachment.setAppId(appId);
+        attachment.setOutputPath(etlOutputPath);
+        job.onTaskFinished(attachment);
+
+        // check pending task finish
+        Assert.assertTrue(job.finishedTaskIds.contains(pendingTaskId));
+        Assert.assertEquals(appId, Deencapsulation.getField(job, "appId"));
+        Assert.assertEquals(etlOutputPath, Deencapsulation.getField(job, 
"etlOutputPath"));
+        Assert.assertEquals(JobState.ETL, job.getState());
+    }
+
+    private SparkLoadJob getEtlStateJob(String originStmt) throws 
MetaNotFoundException {
+        SparkResource resource = new SparkResource(resourceName);
+        Map<String, String> sparkConfigs = resource.getSparkConfigs();
+        sparkConfigs.put("spark.master", "yarn");
+        sparkConfigs.put("spark.submit.deployMode", "cluster");
+        sparkConfigs.put("spark.hadoop.yarn.resourcemanager.address", 
"127.0.0.1:9999");
+        SparkLoadJob job = new SparkLoadJob(dbId, label, null, new 
OriginStatement(originStmt, 0));
+        job.state = JobState.ETL;
+        job.maxFilterRatio = 0.15;
+        job.transactionId = transactionId;
+        Deencapsulation.setField(job, "appId", appId);
+        Deencapsulation.setField(job, "etlOutputPath", etlOutputPath);
+        Deencapsulation.setField(job, "sparkResource", resource);
+        BrokerDesc brokerDesc = new BrokerDesc(broker, Maps.newHashMap());
+        job.brokerDesc = brokerDesc;
+        return job;
+    }
+
+    @Test
+    public void testUpdateEtlStatusRunning(@Mocked Catalog catalog, 
@Injectable String originStmt,
+                                           @Mocked SparkEtlJobHandler handler) 
throws Exception {
+        String trackingUrl = "http://127.0.0.1:8080/proxy/application_1586619723848_0088/";
+        int progress = 66;
+        EtlStatus status = new EtlStatus();
+        status.setState(TEtlState.RUNNING);
+        status.setTrackingUrl(trackingUrl);
+        status.setProgress(progress);
+
+        new Expectations() {
+            {
+                handler.getEtlJobStatus((SparkAppHandle) any, appId, anyLong, 
etlOutputPath,
+                                        (SparkResource) any, (BrokerDesc) any);
+                result = status;
+            }
+        };
+
+        SparkLoadJob job = getEtlStateJob(originStmt);
+        job.updateEtlStatus();
+
+        // check update etl running
+        Assert.assertEquals(JobState.ETL, job.getState());
+        Assert.assertEquals(progress, job.progress);
+        Assert.assertEquals(trackingUrl, job.loadingStatus.getTrackingUrl());
+    }
+
+    @Test(expected = LoadException.class)
+    public void testUpdateEtlStatusCancelled(@Mocked Catalog catalog, 
@Injectable String originStmt,
+                                             @Mocked SparkEtlJobHandler 
handler) throws Exception {
+        EtlStatus status = new EtlStatus();
+        status.setState(TEtlState.CANCELLED);
+
+        new Expectations() {
+            {
+                handler.getEtlJobStatus((SparkAppHandle) any, appId, anyLong, 
etlOutputPath,
+                                        (SparkResource) any, (BrokerDesc) any);
+                result = status;
+            }
+        };
+
+        SparkLoadJob job = getEtlStateJob(originStmt);
+        job.updateEtlStatus();
+    }
+
+    @Test(expected = DataQualityException.class)
+    public void testUpdateEtlStatusFinishedQualityFailed(@Mocked Catalog 
catalog, @Injectable String originStmt,
+                                                         @Mocked 
SparkEtlJobHandler handler) throws Exception {
+        EtlStatus status = new EtlStatus();
+        status.setState(TEtlState.FINISHED);
+        status.getCounters().put("dpp.norm.ALL", "8");
+        status.getCounters().put("dpp.abnorm.ALL", "2");
+
+        new Expectations() {
+            {
+                handler.getEtlJobStatus((SparkAppHandle) any, appId, anyLong, 
etlOutputPath,
+                                        (SparkResource) any, (BrokerDesc) any);
+                result = status;
+            }
+        };
+
+        SparkLoadJob job = getEtlStateJob(originStmt);
+        job.updateEtlStatus();
+    }
+
    @Test
    public void testUpdateEtlStatusFinishedAndCommitTransaction(
            @Mocked Catalog catalog, @Injectable String originStmt,
            @Mocked SparkEtlJobHandler handler, @Mocked AgentTaskExecutor executor,
            @Injectable Database db, @Injectable OlapTable table, @Injectable Partition partition,
            @Injectable MaterializedIndex index, @Injectable Tablet tablet, @Injectable Replica replica,
            @Injectable GlobalTransactionMgr transactionMgr) throws Exception {
        // Happy path: etl job FINISHED with acceptable quality (1 abnormal vs 9 normal),
        // so the job should move to LOADING, send push tasks, and — once every
        // replica reports back — update its loading progress.

        // Etl result reported by the (mocked) Spark etl job handler.
        EtlStatus status = new EtlStatus();
        status.setState(TEtlState.FINISHED);
        status.getCounters().put("dpp.norm.ALL", "9");
        status.getCounters().put("dpp.abnorm.ALL", "1");

        // One etl output file; its name encodes tablet meta as
        // <tableId>.<partitionId>.<indexId>.0.<schemaHash>.
        Map<String, Long> filePathToSize = Maps.newHashMap();
        String filePath = String.format("hdfs://127.0.0.1:10000/doris/jobs/1/label6/9/V1.label6.%d.%d.%d.0.%d.parquet",
                                        tableId, partitionId, indexId, schemaHash);
        long fileSize = 6L;
        filePathToSize.put(filePath, fileSize);
        PartitionInfo partitionInfo = new RangePartitionInfo();
        partitionInfo.addPartition(partitionId, null, (short) 1, false);

        // Stub the whole catalog path down to a single replica, plus the
        // transaction manager the job will commit through.
        new Expectations() {
            {
                handler.getEtlJobStatus((SparkAppHandle) any, appId, anyLong, etlOutputPath,
                                        (SparkResource) any, (BrokerDesc) any);
                result = status;
                handler.getEtlFilePaths(etlOutputPath, (BrokerDesc) any);
                result = filePathToSize;
                catalog.getDb(dbId);
                result = db;
                db.getTable(tableId);
                result = table;
                table.getPartition(partitionId);
                result = partition;
                table.getPartitionInfo();
                result = partitionInfo;
                table.getSchemaByIndexId(Long.valueOf(12));
                result = Lists.newArrayList(new Column("k1", PrimitiveType.VARCHAR));
                partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
                result = Lists.newArrayList(index);
                index.getId();
                result = indexId;
                index.getTablets();
                result = Lists.newArrayList(tablet);
                tablet.getId();
                result = tabletId;
                tablet.getReplicas();
                result = Lists.newArrayList(replica);
                replica.getId();
                result = replicaId;
                replica.getBackendId();
                result = backendId;
                replica.getLastFailedVersion();
                result = -1;
                AgentTaskExecutor.submit((AgentBatchTask) any);
                Catalog.getCurrentGlobalTransactionMgr();
                result = transactionMgr;
                transactionMgr.commitTransaction(dbId, transactionId, (List<TabletCommitInfo>) any,
                                                 (LoadJobFinalOperation) any);
            }
        };

        SparkLoadJob job = getEtlStateJob(originStmt);
        job.updateEtlStatus();

        // check update etl finished: job switched to LOADING with progress reset,
        // and the private bookkeeping maps were populated from the etl output.
        Assert.assertEquals(JobState.LOADING, job.getState());
        Assert.assertEquals(0, job.progress);
        Map<String, Pair<String, Long>> tabletMetaToFileInfo = Deencapsulation.getField(job, "tabletMetaToFileInfo");
        Assert.assertEquals(1, tabletMetaToFileInfo.size());
        String tabletMetaStr = EtlJobConfig.getTabletMetaStr(filePath);
        Assert.assertTrue(tabletMetaToFileInfo.containsKey(tabletMetaStr));
        Pair<String, Long> fileInfo = tabletMetaToFileInfo.get(tabletMetaStr);
        Assert.assertEquals(filePath, fileInfo.first);
        Assert.assertEquals(fileSize, (long) fileInfo.second);
        // A push task must have been recorded for our single replica.
        Map<Long, Map<Long, PushTask>> tabletToSentReplicaPushTask
                = Deencapsulation.getField(job, "tabletToSentReplicaPushTask");
        Assert.assertTrue(tabletToSentReplicaPushTask.containsKey(tabletId));
        Assert.assertTrue(tabletToSentReplicaPushTask.get(tabletId).containsKey(replicaId));
        Map<Long, Set<Long>> tableToLoadPartitions = Deencapsulation.getField(job, "tableToLoadPartitions");
        Assert.assertTrue(tableToLoadPartitions.containsKey(tableId));
        Assert.assertTrue(tableToLoadPartitions.get(tableId).contains(partitionId));
        Map<Long, Integer> indexToSchemaHash = Deencapsulation.getField(job, "indexToSchemaHash");
        Assert.assertTrue(indexToSchemaHash.containsKey(indexId));
        Assert.assertEquals(schemaHash, (long) indexToSchemaHash.get(indexId));

        // finish push task: report the replica done and recompute loading status.
        // Progress reaches 99 (not 100) here — presumably it is capped until the
        // transaction becomes visible; TODO confirm against SparkLoadJob.
        job.addFinishedReplica(replicaId, tabletId, backendId);
        job.updateLoadingStatus();
        Assert.assertEquals(99, job.progress);
        Set<Long> fullTablets = Deencapsulation.getField(job, "fullTablets");
        Assert.assertTrue(fullTablets.contains(tabletId));
    }
+
+    @Test
+    public void testStateUpdateInfoPersist() throws IOException {
+        String fileName = "./testStateUpdateInfoPersistFile";
+        File file = new File(fileName);
+
+        // etl state
+        long id = 1L;
+        JobState state = JobState.ETL;
+        long etlStartTimestamp = 1592366666L;
+        long loadStartTimestamp = -1;
+        Map<String, Pair<String, Long>> tabletMetaToFileInfo = 
Maps.newHashMap();
+
+        if (file.exists()) {
+            file.delete();
+        }
+        file.createNewFile();
+        DataOutputStream out = new DataOutputStream(new 
FileOutputStream(file));
+        SparkLoadJobStateUpdateInfo info = new SparkLoadJobStateUpdateInfo(
+                id, state, transactionId, etlStartTimestamp, appId, 
etlOutputPath,
+                loadStartTimestamp, tabletMetaToFileInfo);
+        info.write(out);
+        out.flush();
+        out.close();
+
+        DataInputStream in = new DataInputStream(new FileInputStream(file));
+        SparkLoadJobStateUpdateInfo replayedInfo = 
(SparkLoadJobStateUpdateInfo) LoadJobStateUpdateInfo.read(in);
+        Assert.assertEquals(id, replayedInfo.getJobId());
+        Assert.assertEquals(state, replayedInfo.getState());
+        Assert.assertEquals(transactionId, replayedInfo.getTransactionId());
+        Assert.assertEquals(loadStartTimestamp, 
replayedInfo.getLoadStartTimestamp());
+        Assert.assertEquals(etlStartTimestamp, 
replayedInfo.getEtlStartTimestamp());
+        Assert.assertEquals(appId, replayedInfo.getAppId());
+        Assert.assertEquals(etlOutputPath, replayedInfo.getEtlOutputPath());
+        Assert.assertTrue(replayedInfo.getTabletMetaToFileInfo().isEmpty());
+        in.close();
+
+        // loading state
+        state = JobState.LOADING;
+        loadStartTimestamp = 1592388888L;
+        String tabletMeta = String.format("%d.%d.%d.0.%d", tableId, 
partitionId, indexId, schemaHash);
+        String filePath = 
String.format("hdfs://127.0.0.1:10000/doris/jobs/1/label6/9/V1.label6.%d.%d.%d.0.%d.parquet",
+                                        tableId, partitionId, indexId, 
schemaHash);
+        long fileSize = 6L;
+        tabletMetaToFileInfo.put(tabletMeta, Pair.create(filePath, fileSize));
+
+        if (file.exists()) {
+            file.delete();
+        }
+        file.createNewFile();
+        out = new DataOutputStream(new FileOutputStream(file));
+        info = new SparkLoadJobStateUpdateInfo(id, state, transactionId, 
etlStartTimestamp, appId, etlOutputPath,
+                                               loadStartTimestamp, 
tabletMetaToFileInfo);
+        info.write(out);
+        out.flush();
+        out.close();
+
+        in = new DataInputStream(new FileInputStream(file));
+        replayedInfo = (SparkLoadJobStateUpdateInfo) 
LoadJobStateUpdateInfo.read(in);
+        Assert.assertEquals(state, replayedInfo.getState());
+        Assert.assertEquals(loadStartTimestamp, 
replayedInfo.getLoadStartTimestamp());
+        Map<String, Pair<String, Long>> replayedTabletMetaToFileInfo = 
replayedInfo.getTabletMetaToFileInfo();
+        Assert.assertEquals(1, replayedTabletMetaToFileInfo.size());
+        
Assert.assertTrue(replayedTabletMetaToFileInfo.containsKey(tabletMeta));
+        Pair<String, Long> replayedFileInfo = 
replayedTabletMetaToFileInfo.get(tabletMeta);
+        Assert.assertEquals(filePath, replayedFileInfo.first);
+        Assert.assertEquals(fileSize, (long) replayedFileInfo.second);
+        in.close();
+
+        // delete file
+        if (file.exists()) {
+            file.delete();
+        }
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to