This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 77b756f  [Refactor] Remove redundant code of mini load and insert 
(#4966)
77b756f is described below

commit 77b756fb87ce42986271d67080fb0cacadc8b903
Author: EmmyMiao87 <[email protected]>
AuthorDate: Wed Feb 3 22:19:20 2021 +0800

    [Refactor] Remove redundant code of mini load and insert (#4966)
    
    The content deleted by this PR includes mini load and insert in the old 
framework.
    The task and scheduling logic of old framework has been deleted.
---
 .../java/org/apache/doris/catalog/Catalog.java     |   7 -
 .../src/main/java/org/apache/doris/load/Load.java  |  43 ++--
 .../java/org/apache/doris/load/LoadChecker.java    |  12 --
 .../java/org/apache/doris/planner/CsvScanNode.java | 208 ------------------
 .../apache/doris/planner/DistributedPlanner.java   |  13 +-
 .../org/apache/doris/planner/OlapRewriteNode.java  | 126 -----------
 .../org/apache/doris/task/InsertLoadEtlTask.java   |  38 ----
 .../org/apache/doris/task/MiniLoadEtlTask.java     | 198 -----------------
 .../org/apache/doris/task/MiniLoadPendingTask.java | 216 -------------------
 .../org/apache/doris/task/PullLoadEtlTask.java     | 139 ------------
 .../java/org/apache/doris/task/PullLoadJob.java    | 110 ----------
 .../java/org/apache/doris/task/PullLoadJobMgr.java | 200 -----------------
 .../java/org/apache/doris/task/PullLoadTask.java   | 237 ---------------------
 .../org/apache/doris/task/PullLoadTaskPlanner.java | 160 --------------
 14 files changed, 20 insertions(+), 1687 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index 8715704..e9b643f 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -206,7 +206,6 @@ import org.apache.doris.task.AgentTaskExecutor;
 import org.apache.doris.task.AgentTaskQueue;
 import org.apache.doris.task.CreateReplicaTask;
 import org.apache.doris.task.MasterTaskExecutor;
-import org.apache.doris.task.PullLoadJobMgr;
 import org.apache.doris.thrift.TStorageFormat;
 import org.apache.doris.thrift.TStorageMedium;
 import org.apache.doris.thrift.TStorageType;
@@ -367,7 +366,6 @@ public class Catalog {
 
     private MetaReplayState metaReplayState;
 
-    private PullLoadJobMgr pullLoadJobMgr;
     private BrokerMgr brokerMgr;
     private ResourceMgr resourceMgr;
 
@@ -520,7 +518,6 @@ public class Catalog {
 
         this.isDefaultClusterCreated = false;
 
-        this.pullLoadJobMgr = new PullLoadJobMgr(!isCheckpointCatalog);
         this.brokerMgr = new BrokerMgr();
         this.resourceMgr = new ResourceMgr();
 
@@ -589,10 +586,6 @@ public class Catalog {
         return SingletonHolder.INSTANCE;
     }
 
-    public PullLoadJobMgr getPullLoadJobMgr() {
-        return pullLoadJobMgr;
-    }
-
     public BrokerMgr getBrokerMgr() {
         return brokerMgr;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
index 9c4104f..315ba25 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
@@ -595,7 +595,7 @@ public class Load {
      * This is only used for hadoop load
      */
     public static void checkAndCreateSource(Database db, DataDescription 
dataDescription,
-            Map<Long, Map<Long, List<Source>>> tableToPartitionSources, 
EtlJobType jobType) throws DdlException {
+                                            Map<Long, Map<Long, List<Source>>> 
tableToPartitionSources, EtlJobType jobType) throws DdlException {
         Source source = new Source(dataDescription.getFilePaths());
         long tableId = -1;
         Set<Long> sourcePartitionIds = Sets.newHashSet();
@@ -779,7 +779,7 @@ public class Load {
                     Pair<String, List<String>> function = entry.getValue();
                     try {
                         
DataDescription.validateMappingFunction(function.first, function.second, 
columnNameMap,
-                                                                mappingColumn, 
dataDescription.isHadoopLoad());
+                                mappingColumn, dataDescription.isHadoopLoad());
                     } catch (AnalysisException e) {
                         throw new DdlException(e.getMessage());
                     }
@@ -883,7 +883,7 @@ public class Load {
                      * (A, B, C) SET (__doris_shadow_B = B)
                      */
                     ImportColumnDesc importColumnDesc = new 
ImportColumnDesc(column.getName(),
-                                                                             
new SlotRef(null, originCol));
+                            new SlotRef(null, originCol));
                     shadowColumnDescs.add(importColumnDesc);
                 }
             } else {
@@ -918,7 +918,7 @@ public class Load {
                                    Map<String, SlotDescriptor> slotDescByName, 
TBrokerScanRangeParams params) throws UserException {
         rewriteColumns(columnExprs);
         initColumns(tbl, columnExprs, columnToHadoopFunction, exprsByName, 
analyzer,
-                    srcTupleDesc, slotDescByName, params, true);
+                srcTupleDesc, slotDescByName, params, true);
     }
 
     /*
@@ -946,7 +946,7 @@ public class Load {
         }
         // check whether the OlapTable has sequenceCol
         boolean hasSequenceCol = false;
-        if (tbl instanceof OlapTable && ((OlapTable)tbl).hasSequenceCol()) {
+        if (tbl instanceof OlapTable && ((OlapTable) tbl).hasSequenceCol()) {
             hasSequenceCol = true;
         }
 
@@ -1771,12 +1771,7 @@ public class Load {
             clearJob(job, srcState);
         }
 
-        if (job.getBrokerDesc() != null) {
-            if (srcState == JobState.ETL) {
-                // Cancel job id
-                
Catalog.getCurrentCatalog().getPullLoadJobMgr().cancelJob(job.getId());
-            }
-        }
+        Preconditions.checkState(job.getBrokerDesc() == null);
         LOG.info("cancel load job success. job: {}", job);
         return true;
     }
@@ -1925,14 +1920,14 @@ public class Load {
                 if (tableNames.isEmpty()) {
                     // forward compatibility
                     if 
(!Catalog.getCurrentCatalog().getAuth().checkDbPriv(ConnectContext.get(), 
dbName,
-                                                                           
PrivPredicate.LOAD)) {
+                            PrivPredicate.LOAD)) {
                         continue;
                     }
                 } else {
                     boolean auth = true;
                     for (String tblName : tableNames) {
                         if 
(!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), 
dbName,
-                                                                               
 tblName, PrivPredicate.LOAD)) {
+                                tblName, PrivPredicate.LOAD)) {
                             auth = false;
                             break;
                         }
@@ -2004,8 +1999,8 @@ public class Load {
 
                 // task info
                 jobInfo.add("cluster:" + loadJob.getHadoopCluster()
-                                    + "; timeout(s):" + 
loadJob.getTimeoutSecond()
-                                    + "; max_filter_ratio:" + 
loadJob.getMaxFilterRatio());
+                        + "; timeout(s):" + loadJob.getTimeoutSecond()
+                        + "; max_filter_ratio:" + loadJob.getMaxFilterRatio());
 
                 // error msg
                 if (loadJob.getState() == JobState.CANCELLED) {
@@ -2330,7 +2325,7 @@ public class Load {
                         continue;
                     }
                     replica.updateVersionInfo(info.getVersion(), 
info.getVersionHash(),
-                                              info.getDataSize(), 
info.getRowCount());
+                            info.getDataSize(), info.getRowCount());
                 }
             }
 
@@ -2349,7 +2344,7 @@ public class Load {
                             continue;
                         }
                         updatePartitionVersion(partition, 
partitionLoadInfo.getVersion(),
-                                               
partitionLoadInfo.getVersionHash(), jobId);
+                                partitionLoadInfo.getVersionHash(), jobId);
 
                         // update table row count
                         for (MaterializedIndex materializedIndex : 
partition.getMaterializedIndices(IndexExtState.ALL)) {
@@ -2446,7 +2441,7 @@ public class Load {
                         continue;
                     }
                     replica.updateVersionInfo(info.getVersion(), 
info.getVersionHash(),
-                                              info.getDataSize(), 
info.getRowCount());
+                            info.getDataSize(), info.getRowCount());
                 }
             }
         } else {
@@ -2652,7 +2647,7 @@ public class Load {
                 // hdfs://host:port/outputPath/dbId/loadLabel/
                 DppConfig dppConfig = job.getHadoopDppConfig();
                 String outputPath = 
DppScheduler.getEtlOutputPath(dppConfig.getFsDefaultName(),
-                                                                  
dppConfig.getOutputPath(), job.getDbId(), job.getLabel(), "");
+                        dppConfig.getOutputPath(), job.getDbId(), 
job.getLabel(), "");
                 try {
                     dppScheduler.deleteEtlOutputPath(outputPath);
                 } catch (Exception e) {
@@ -2696,7 +2691,7 @@ public class Load {
     }
 
     public boolean updateLoadJobState(LoadJob job, JobState destState, 
CancelType cancelType, String msg,
-            List<String> failedMsg) {
+                                      List<String> failedMsg) {
         boolean result = true;
         JobState srcState = null;
 
@@ -2855,7 +2850,7 @@ public class Load {
                 }
 
                 updatePartitionVersion(partition, 
partitionLoadInfo.getVersion(),
-                                       partitionLoadInfo.getVersionHash(), 
jobId);
+                        partitionLoadInfo.getVersionHash(), jobId);
 
                 for (MaterializedIndex materializedIndex : 
partition.getMaterializedIndices(IndexExtState.ALL)) {
                     long tableRowCount = 0L;
@@ -2889,7 +2884,7 @@ public class Load {
         long partitionId = partition.getId();
         partition.updateVisibleVersionAndVersionHash(version, versionHash);
         LOG.info("update partition version success. version: {}, version hash: 
{}, job id: {}, partition id: {}",
-                 version, versionHash, jobId, partitionId);
+                version, versionHash, jobId, partitionId);
     }
 
     private boolean processCancelled(LoadJob job, CancelType cancelType, 
String msg, List<String> failedMsg) {
@@ -2961,8 +2956,8 @@ public class Load {
         if (srcState == JobState.LOADING || srcState == 
JobState.QUORUM_FINISHED) {
             for (PushTask pushTask : job.getPushTasks()) {
                 AgentTaskQueue.removePushTask(pushTask.getBackendId(), 
pushTask.getSignature(),
-                                              pushTask.getVersion(), 
pushTask.getVersionHash(),
-                                              pushTask.getPushType(), 
pushTask.getTaskType());
+                        pushTask.getVersion(), pushTask.getVersionHash(),
+                        pushTask.getPushType(), pushTask.getTaskType());
             }
         }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java
index 2cbcd10..0b8d847 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java
@@ -39,11 +39,8 @@ import org.apache.doris.task.AgentTaskExecutor;
 import org.apache.doris.task.AgentTaskQueue;
 import org.apache.doris.task.HadoopLoadEtlTask;
 import org.apache.doris.task.HadoopLoadPendingTask;
-import org.apache.doris.task.InsertLoadEtlTask;
 import org.apache.doris.task.MasterTask;
 import org.apache.doris.task.MasterTaskExecutor;
-import org.apache.doris.task.MiniLoadEtlTask;
-import org.apache.doris.task.MiniLoadPendingTask;
 import org.apache.doris.task.PushTask;
 import org.apache.doris.thrift.TPriority;
 import org.apache.doris.thrift.TPushType;
@@ -172,9 +169,6 @@ public class LoadChecker extends MasterDaemon {
                     case HADOOP:
                         task = new HadoopLoadPendingTask(job);
                         break;
-                    case MINI:
-                        task = new MiniLoadPendingTask(job);
-                        break;
                     default:
                         LOG.warn("unknown etl job type. type: {}", 
etlJobType.name());
                         break;
@@ -200,12 +194,6 @@ public class LoadChecker extends MasterDaemon {
                     case HADOOP:
                         task = new HadoopLoadEtlTask(job);
                         break;
-                    case MINI:
-                        task = new MiniLoadEtlTask(job);
-                        break;
-                    case INSERT:
-                        task = new InsertLoadEtlTask(job);
-                        break;
                     default:
                         LOG.warn("unknown etl job type. type: {}", 
etlJobType.name());
                         break;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/CsvScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/CsvScanNode.java
deleted file mode 100644
index 04aab41..0000000
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/CsvScanNode.java
+++ /dev/null
@@ -1,208 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.planner;
-
-import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.TupleDescriptor;
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.common.Pair;
-import org.apache.doris.common.UserException;
-import org.apache.doris.load.LoadJob;
-import org.apache.doris.load.PartitionLoadInfo;
-import org.apache.doris.load.Source;
-import org.apache.doris.load.TableLoadInfo;
-import org.apache.doris.thrift.TColumnType;
-import org.apache.doris.thrift.TCsvScanNode;
-import org.apache.doris.thrift.TMiniLoadEtlFunction;
-import org.apache.doris.thrift.TPlanNode;
-import org.apache.doris.thrift.TPlanNodeType;
-import org.apache.doris.thrift.TScanRangeLocations;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.base.MoreObjects.ToStringHelper;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-public class CsvScanNode extends ScanNode {
-    private static final Logger LOG = LogManager.getLogger(CsvScanNode.class);
-
-    private final OlapTable table;
-    private final LoadJob job;
-
-    private List<String> filePaths = Lists.newArrayList();
-
-    private String columnSeparator;
-    private String lineDelimiter;
-
-    private List<String> columns = Lists.newArrayList();
-    private List<String> unspecifiedColumns = Lists.newArrayList();
-    private List<String> defaultValues = Lists.newArrayList();
-
-    private Map<String, TColumnType> columnTypeMapping = Maps.newHashMap();
-    private Map<String, TMiniLoadEtlFunction> columnToFunction = 
Maps.newHashMap();
-
-    private double maxFilterRatio = 0.0;
-
-    public CsvScanNode(PlanNodeId id, TupleDescriptor desc, OlapTable table, 
LoadJob job) {
-        super(id, desc, "Scan CSV");
-        this.table = table;
-        this.job = job;
-    }
-
-    @Override
-    protected void toThrift(TPlanNode msg) {
-        msg.node_type = TPlanNodeType.CSV_SCAN_NODE;
-        msg.csv_scan_node = new TCsvScanNode(desc.getId().asInt(), filePaths);
-
-        if (!Strings.isNullOrEmpty(columnSeparator)) {
-            msg.csv_scan_node.setColumnSeparator(columnSeparator);
-        }
-        if (!Strings.isNullOrEmpty(lineDelimiter)) {
-            msg.csv_scan_node.setLineDelimiter(lineDelimiter);
-        }
-
-        if (!columns.isEmpty()) {
-            msg.csv_scan_node.setColumns(columns);
-        }
-        if (!unspecifiedColumns.isEmpty()) {
-            msg.csv_scan_node.setUnspecifiedColumns(unspecifiedColumns);
-        }
-        if (!defaultValues.isEmpty()) {
-            msg.csv_scan_node.setDefaultValues(defaultValues);
-        }
-
-        if (!columnToFunction.isEmpty()) {
-            msg.csv_scan_node.setColumnFunctionMapping(columnToFunction);
-        }
-        msg.csv_scan_node.setColumnTypeMapping(columnTypeMapping);
-        msg.csv_scan_node.setMaxFilterRatio(maxFilterRatio);
-        msg.csv_scan_node.setColumnSeparator(columnSeparator);
-    }
-
-    @Override
-    public void finalize(Analyzer analyzer) throws UserException {
-        // get file paths
-        // file paths in different partitions are same in mini load
-        TableLoadInfo tableLoadInfo = job.getTableLoadInfo(table.getId());
-        Collection<PartitionLoadInfo> partitionLoadInfos = 
tableLoadInfo.getIdToPartitionLoadInfo().values();
-        Preconditions.checkState(!partitionLoadInfos.isEmpty());
-        PartitionLoadInfo partitionLoadInfo = (PartitionLoadInfo) 
partitionLoadInfos.toArray()[0];
-        List<Source> sources = partitionLoadInfo.getSources();
-        Preconditions.checkState(sources.size() == 1);
-        Source source = sources.get(0);
-        filePaths = source.getFileUrls();
-
-        // column separator
-        columnSeparator = source.getColumnSeparator();
-
-        // line delimiter
-        lineDelimiter = source.getLineDelimiter();
-
-        // construct columns (specified & unspecified) and default-values
-        List<String> columnNames = Lists.newArrayList();
-        for (Column column : table.getBaseSchema()) {
-            columnNames.add(column.getName());
-        }
-        columns = source.getColumnNames();
-        if (columns.isEmpty()) {
-            columns = columnNames;
-        }
-        for (String columnName : columns) {
-            if (!columnNames.contains(columnName)) {
-                LOG.info("Column [{}] is not exist in table schema, will be 
ignored.", columnName);
-            }
-        }
-        for (String columnName : columnNames) {
-            Column column = table.getColumn(columnName);
-            columnTypeMapping.put(columnName, 
column.getOriginType().toColumnTypeThrift());
-
-            if (columns.contains(columnName)) {
-                continue;
-            }
-            unspecifiedColumns.add(columnName);
-            String defaultValue = column.getDefaultValue();
-            if (defaultValue == null && false == column.isAllowNull()) {
-                throw new UserException(
-                        "Column [" + columnName + "] should be specified. "
-                                + "only columns have default values can be 
omitted");
-            }
-            if (true == column.isAllowNull() && null == defaultValue) {
-                defaultValues.add("\\N");
-            } else {
-                defaultValues.add(defaultValue);
-            }
-        }
-
-        Map<String, Pair<String, List<String>>> functions = 
source.getColumnToFunction();
-        for (String key : functions.keySet()) {
-            final Pair<String, List<String>> pair = functions.get(key);
-            TMiniLoadEtlFunction function = new TMiniLoadEtlFunction();
-            int paramColumnIndex = -1; 
-            for (String str : pair.second) {
-                boolean find = false;
-                for (int i = 0; i < columns.size(); i++) {
-                    if (str.equals(columns.get(i))) {
-                        paramColumnIndex = i;
-                        find = true;
-                        break;
-                    }   
-                }   
-                if (find) {
-                    function.setFunctionName(pair.first);
-                    function.setParamColumnIndex(paramColumnIndex);
-                    columnToFunction.put(key, function); 
-                    break;
-                }   
-            }   
-        } 
-        // max filter ratio
-        // TODO: remove!!
-        maxFilterRatio = job.getMaxFilterRatio();
-    }
-
-    @Override
-    protected String debugString() {
-        ToStringHelper helper = MoreObjects.toStringHelper(this);
-        return helper.addValue(super.debugString()).toString();
-    }
-
-    /**
-     * like Mysql, We query Meta to get request's data location
-     * extra result info will pass to backend ScanNode
-     */
-    @Override
-    public List<TScanRangeLocations> getScanRangeLocations(long 
maxScanRangeLength) {
-        return null;
-    }
-
-    @Override
-    public int getNumInstances() {
-        return 1;
-    }
-}
-
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
index 16caf11..213cce2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
@@ -205,9 +205,7 @@ public class DistributedPlanner {
                     childFragments.get(0));
         } else if (root instanceof SelectNode) {
             result = createSelectNodeFragment((SelectNode) root, 
childFragments);
-        } else if (root instanceof OlapRewriteNode) {
-            result = createOlapRewriteNodeFragment((OlapRewriteNode) root, 
childFragments);
-        } else if (root instanceof SetOperationNode) {
+        }  else if (root instanceof SetOperationNode) {
             result = createSetOperationNodeFragment((SetOperationNode) root, 
childFragments, fragments);
         } else if (root instanceof MergeNode) {
             result = createMergeNodeFragment((MergeNode) root, childFragments, 
fragments);
@@ -838,15 +836,6 @@ public class DistributedPlanner {
         return childFragment;
     }
 
-    private PlanFragment createOlapRewriteNodeFragment(
-            OlapRewriteNode olapRewriteNode, ArrayList<PlanFragment> 
childFragments) {
-        Preconditions.checkState(olapRewriteNode.getChildren().size() == 
childFragments.size());
-        PlanFragment childFragment = childFragments.get(0);
-        olapRewriteNode.setChild(0, childFragment.getPlanRoot());
-        childFragment.setPlanRoot(olapRewriteNode);
-        return childFragment;
-    }
-
     /**
      * Replace node's child at index childIdx with an ExchangeNode that 
receives its input from childFragment.
      */
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapRewriteNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapRewriteNode.java
deleted file mode 100644
index 95dc1b2..0000000
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapRewriteNode.java
+++ /dev/null
@@ -1,126 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.planner;
-
-import com.google.common.collect.Lists;
-import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.Expr;
-import org.apache.doris.analysis.ExprSubstitutionMap;
-import org.apache.doris.analysis.InsertStmt;
-import org.apache.doris.analysis.TupleDescriptor;
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Table;
-import org.apache.doris.common.UserException;
-import org.apache.doris.thrift.TExplainLevel;
-import org.apache.doris.thrift.TOlapRewriteNode;
-import org.apache.doris.thrift.TPlanNode;
-import org.apache.doris.thrift.TPlanNodeType;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.List;
-
-// Used to convert column to valid OLAP table
-public class OlapRewriteNode extends PlanNode {
-    private static final Logger LOG = 
LogManager.getLogger(OlapRewriteNode.class);
-
-    private InsertStmt insertStmt;
-
-    private Table table;
-    private TupleDescriptor tupleDescriptor;
-    private List<Expr> newResultExprs;
-
-    public OlapRewriteNode(PlanNodeId id, PlanNode child, InsertStmt 
insertStmt) {
-        super(id, insertStmt.getOlapTuple().getId().asList(), "OLAP REWRITE 
NODE");
-        addChild(child);
-
-        this.table = insertStmt.getTargetTable();
-        this.tupleDescriptor = insertStmt.getOlapTuple();
-        this.insertStmt = insertStmt;
-    }
-
-    public OlapRewriteNode(PlanNodeId id, PlanNode child,
-                           Table table,
-                           TupleDescriptor tupleDescriptor,
-                           List<Expr> slotRefs) {
-        super(id, child.getTupleIds(), "OLAP REWRITE NODE");
-        addChild(child);
-        this.table = table;
-        this.tupleDescriptor = tupleDescriptor;
-        this.newResultExprs = slotRefs;
-    }
-
-    @Override
-    public void init(Analyzer analyzer) throws UserException {
-        assignConjuncts(analyzer);
-      
-        // Set smap to the combined children's smaps and apply that to all 
conjuncts_.
-        createDefaultSmap(analyzer);
-
-        computeStats(analyzer);
-        // assignedConjuncts = analyzr.getAssignedConjuncts();
-
-        if (insertStmt != null) {
-            ExprSubstitutionMap combinedChildSmap = getCombinedChildSmap();
-            newResultExprs = Lists.newArrayList();
-            for (Expr expr : insertStmt.getResultExprs()) {
-                newResultExprs.add(expr.clone(combinedChildSmap));
-            }
-        }
-    }
-
-    @Override
-    protected void toThrift(TPlanNode msg) {
-        msg.node_type = TPlanNodeType.OLAP_REWRITE_NODE;
-        TOlapRewriteNode tnode = new TOlapRewriteNode();
-        for (Column column : table.getBaseSchema()) {
-            
tnode.addToColumnTypes(column.getOriginType().toColumnTypeThrift());
-        }
-        for (Expr expr : newResultExprs) {
-            tnode.addToColumns(expr.treeToThrift());
-        }
-        tnode.setOutputTupleId(tupleDescriptor.getId().asInt());
-        msg.setOlapRewriteNode(tnode);
-    }
-
-    @Override
-    public void computeStats(Analyzer analyzer) {
-        super.computeStats(analyzer);
-        long cardinality = getChild(0).cardinality;
-        double selectivity = computeSelectivity();
-        if (cardinality < 0 || selectivity < 0) {
-            this.cardinality = -1;
-        } else {
-            this.cardinality = Math.round(cardinality * selectivity);
-        }
-    }
-
-    @Override
-    protected String getNodeExplainString(String prefix, TExplainLevel 
detailLevel) {
-        StringBuilder output = new StringBuilder();
-        if (!conjuncts.isEmpty()) {
-            output.append(prefix + "predicates: " + 
getExplainString(conjuncts) + "\n");
-        }
-        return output.toString();
-    }
-
-    @Override
-    public int getNumInstances() {
-        return children.get(0).getNumInstances();
-    }
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/task/InsertLoadEtlTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/InsertLoadEtlTask.java
deleted file mode 100644
index 8a85a9c..0000000
--- a/fe/fe-core/src/main/java/org/apache/doris/task/InsertLoadEtlTask.java
+++ /dev/null
@@ -1,38 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.task;
-
-import org.apache.doris.common.LoadException;
-import org.apache.doris.load.LoadJob;
-
-public class InsertLoadEtlTask extends MiniLoadEtlTask {
-
-    public InsertLoadEtlTask(LoadJob job) {
-        super(job);
-    }
-
-    @Override
-    protected boolean updateJobEtlStatus() {
-        return true;
-    }
-
-    @Override
-    protected void processEtlRunning() throws LoadException {
-        throw new LoadException("not implement");
-    }
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/task/MiniLoadEtlTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/MiniLoadEtlTask.java
deleted file mode 100644
index 9d17432..0000000
--- a/fe/fe-core/src/main/java/org/apache/doris/task/MiniLoadEtlTask.java
+++ /dev/null
@@ -1,198 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.task;
-
-import org.apache.doris.catalog.Catalog;
-import org.apache.doris.common.LoadException;
-import org.apache.doris.common.Pair;
-import org.apache.doris.load.MiniEtlTaskInfo;
-import org.apache.doris.load.EtlStatus;
-import org.apache.doris.load.LoadJob;
-import org.apache.doris.system.Backend;
-import org.apache.doris.thrift.TMiniLoadEtlStatusResult;
-import org.apache.doris.thrift.TEtlState;
-import org.apache.doris.thrift.TStatus;
-import org.apache.doris.thrift.TStatusCode;
-
-import com.google.common.collect.Maps;
-
-import org.apache.logging.log4j.Logger;
-import org.apache.logging.log4j.LogManager;
-
-import java.util.Map;
-
-public class MiniLoadEtlTask extends LoadEtlTask {
-    private static final Logger LOG = 
LogManager.getLogger(MiniLoadEtlTask.class);
-
-    public MiniLoadEtlTask(LoadJob job) {
-        super(job);
-    }
-
-    @Override
-    protected boolean updateJobEtlStatus() {
-        // update etl tasks status
-        if (job.miniNeedGetTaskStatus()) {
-            LOG.debug("get mini etl task status actively. job: {}", job);
-            for (MiniEtlTaskInfo taskInfo : job.getMiniEtlTasks().values()) {
-                TEtlState etlState = taskInfo.getTaskStatus().getState();
-                if (etlState == TEtlState.RUNNING) {
-                    updateEtlTaskStatus(taskInfo);
-                }
-            }
-        }
-
-        // update etl job status
-        updateEtlJobStatus();
-        return true;
-    }
-     
-    private boolean updateEtlTaskStatus(MiniEtlTaskInfo taskInfo) {
-        // get etl status
-        TMiniLoadEtlStatusResult result = 
getMiniLoadEtlStatus(taskInfo.getBackendId(), taskInfo.getId());
-        LOG.info("mini load etl status: {}, job: {}", result, job);
-        if (result == null) {
-            return false;
-        }
-        TStatus tStatus = result.getStatus();
-        if (tStatus.getStatusCode() != TStatusCode.OK) {
-            LOG.warn("get buck load etl status fail. msg: {}, job: {}", 
tStatus.getErrorMsgs(), job);
-            return false;
-        }
-        
-        // update etl task status
-        EtlStatus taskStatus = taskInfo.getTaskStatus();
-        if (taskStatus.setState(result.getEtlState())) {
-            if (result.isSetCounters()) {
-                taskStatus.setCounters(result.getCounters());
-            }
-            if (result.isSetTrackingUrl()) {
-                taskStatus.setTrackingUrl(result.getTrackingUrl());
-            }
-            if (result.isSetFileMap()) {
-                taskStatus.setFileMap(result.getFileMap());
-            }       
-        }
-
-        return true;
-    }
-
-    private TMiniLoadEtlStatusResult getMiniLoadEtlStatus(long backendId, long 
taskId) {
-        TMiniLoadEtlStatusResult result = null; 
-        Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendId);
-        if (backend == null || !backend.isAlive()) {
-            String failMsg = "backend is null or is not alive";
-            LOG.error(failMsg);
-            return result;
-        }
-
-        AgentClient client = new AgentClient(backend.getHost(), 
backend.getBePort());
-        return client.getEtlStatus(job.getId(), taskId);
-    }   
-
-    private boolean updateEtlJobStatus() {
-        boolean hasCancelledTask = false;
-        boolean hasRunningTask = false;
-        long normalNum = 0;
-        long abnormalNum = 0;
-        Map<String, Long> fileMap = Maps.newHashMap();
-        String trackingUrl = EtlStatus.DEFAULT_TRACKING_URL;
-
-        EtlStatus etlJobStatus = job.getEtlJobStatus();
-        for (MiniEtlTaskInfo taskInfo : job.getMiniEtlTasks().values()) {
-            EtlStatus taskStatus = taskInfo.getTaskStatus();
-            switch (taskStatus.getState()) {
-                case RUNNING:
-                    hasRunningTask = true;
-                    break;
-                case CANCELLED:
-                    hasCancelledTask = true;
-                    break;
-                case FINISHED:
-                    // counters and file list
-                    Map<String, String> counters = taskStatus.getCounters();
-                    if (counters.containsKey(DPP_NORMAL_ALL)) {
-                        normalNum += 
Long.parseLong(counters.get(DPP_NORMAL_ALL));
-                    }
-                    if (counters.containsKey(DPP_ABNORMAL_ALL)) {
-                        abnormalNum += 
Long.parseLong(counters.get(DPP_ABNORMAL_ALL));
-                    }
-                    fileMap.putAll(taskStatus.getFileMap());
-                    if 
(!taskStatus.getTrackingUrl().equals(EtlStatus.DEFAULT_TRACKING_URL)) {
-                        trackingUrl = taskStatus.getTrackingUrl();
-                    }
-                    break;
-                default:
-                    break;
-            }
-        }
-
-        if (hasCancelledTask) {
-            etlJobStatus.setState(TEtlState.CANCELLED);
-        } else if (hasRunningTask) {
-            etlJobStatus.setState(TEtlState.RUNNING);
-        } else {
-            etlJobStatus.setState(TEtlState.FINISHED);
-            Map<String, String> counters = Maps.newHashMap();
-            counters.put(DPP_NORMAL_ALL, String.valueOf(normalNum));
-            counters.put(DPP_ABNORMAL_ALL, String.valueOf(abnormalNum));
-            etlJobStatus.setCounters(counters);
-            etlJobStatus.setFileMap(fileMap);
-            etlJobStatus.setTrackingUrl(trackingUrl);
-        }
-        
-        return true;
-    }
-    
-    @Override
-    protected void processEtlRunning() throws LoadException {
-        // update mini etl job progress
-        int finishedTaskNum = 0;
-        Map<Long, MiniEtlTaskInfo> idToEtlTask = job.getMiniEtlTasks();
-        for (MiniEtlTaskInfo taskInfo : idToEtlTask.values()) {
-            EtlStatus taskStatus = taskInfo.getTaskStatus();
-            if (taskStatus.getState() == TEtlState.FINISHED) {
-                ++finishedTaskNum;
-            }
-        }
-
-        int progress = (int) (finishedTaskNum * 100 / idToEtlTask.size());
-        if (progress >= 100) {
-            // set progress to 100 when status is FINISHED
-            progress = 99;
-        }
-
-        job.setProgress(progress);
-    }
-   
-    @Override
-    protected Map<String, Pair<String, Long>> getFilePathMap() throws 
LoadException {
-        Map<String, Long> fileMap = job.getEtlJobStatus().getFileMap();
-        if (fileMap == null) {
-            throw new LoadException("get etl files error");
-        }
-
-        Map<String, Pair<String, Long>> filePathMap = Maps.newHashMap();
-        for (Map.Entry<String, Long> entry : fileMap.entrySet()) {
-            String partitionIndexBucket = 
getPartitionIndexBucketString(entry.getKey());
-            // http://host:8000/data/dir/file
-            filePathMap.put(partitionIndexBucket, Pair.create(entry.getKey(), 
entry.getValue()));
-        }
-
-        return filePathMap;
-    }
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/task/MiniLoadPendingTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/MiniLoadPendingTask.java
deleted file mode 100644
index f35226b..0000000
--- a/fe/fe-core/src/main/java/org/apache/doris/task/MiniLoadPendingTask.java
+++ /dev/null
@@ -1,216 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.task;
-
-import org.apache.doris.analysis.DescriptorTable;
-import org.apache.doris.analysis.SlotDescriptor;
-import org.apache.doris.analysis.TupleDescriptor;
-import org.apache.doris.catalog.Catalog;
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.common.LoadException;
-import org.apache.doris.common.Pair;
-import org.apache.doris.common.UserException;
-import org.apache.doris.load.EtlSubmitResult;
-import org.apache.doris.load.LoadErrorHub;
-import org.apache.doris.load.LoadJob;
-import org.apache.doris.load.MiniEtlTaskInfo;
-import org.apache.doris.load.TableLoadInfo;
-import org.apache.doris.planner.CsvScanNode;
-import org.apache.doris.planner.DataPartition;
-import org.apache.doris.planner.DataSplitSink;
-import org.apache.doris.planner.PlanFragment;
-import org.apache.doris.planner.PlanFragmentId;
-import org.apache.doris.planner.PlanNodeId;
-import org.apache.doris.planner.ScanNode;
-import org.apache.doris.system.Backend;
-import org.apache.doris.thrift.PaloInternalServiceVersion;
-import org.apache.doris.thrift.TAgentResult;
-import org.apache.doris.thrift.TAgentServiceVersion;
-import org.apache.doris.thrift.TExecPlanFragmentParams;
-import org.apache.doris.thrift.TLoadErrorHubInfo;
-import org.apache.doris.thrift.TMiniLoadEtlTaskRequest;
-import org.apache.doris.thrift.TPlanFragmentExecParams;
-import org.apache.doris.thrift.TQueryOptions;
-import org.apache.doris.thrift.TQueryType;
-import org.apache.doris.thrift.TStatus;
-import org.apache.doris.thrift.TStatusCode;
-import org.apache.doris.thrift.TUniqueId;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.List;
-import java.util.Map;
-
-public class MiniLoadPendingTask extends LoadPendingTask {
-    private static final Logger LOG = 
LogManager.getLogger(MiniLoadPendingTask.class);
-
-    // descriptor used to register all column and table need
-    private final DescriptorTable desc;
-
-    // destination Db and table get from request
-    // Data will load to this table
-    private OlapTable destTable;
-
-    // dest desc
-    private TupleDescriptor destTupleDesc;
-
-    private DataSplitSink tableSink;
-
-    private List<Pair<Long, TMiniLoadEtlTaskRequest>> requests;
-
-    public MiniLoadPendingTask(LoadJob job) {
-        super(job);
-        this.desc = new DescriptorTable();
-    }
-
-    @Override
-    protected void createEtlRequest() throws Exception {
-        requests = Lists.newArrayList();
-        for (MiniEtlTaskInfo taskInfo : job.getMiniEtlTasks().values()) {
-            long taskId = taskInfo.getId();
-            long backendId = taskInfo.getBackendId();
-            long tableId = taskInfo.getTableId();
-
-            // All the following operation will process when destTable's read 
lock are hold.
-            destTable = (OlapTable) db.getTable(tableId);
-            if (destTable == null) {
-                throw new LoadException("table does not exist. id: " + 
tableId);
-            }
-
-            destTable.readLock();
-            try {
-                registerToDesc();
-                tableSink = new DataSplitSink(destTable, destTupleDesc);
-
-                // add schema hash to table load info
-                TableLoadInfo tableLoadInfo = 
job.getTableLoadInfo(destTable.getId());
-                for (Map.Entry<Long, Integer> entry : 
destTable.getIndexIdToSchemaHash().entrySet()) {
-                    tableLoadInfo.addIndexSchemaHash(entry.getKey(), 
entry.getValue());
-                }
-                requests.add(new Pair<Long, 
TMiniLoadEtlTaskRequest>(backendId, createRequest(taskId)));
-            } finally {
-                destTable.readUnlock();
-            }
-        }
-    }
-
-    @Override
-    protected EtlSubmitResult submitEtlJob(int retry) {
-        LOG.info("begin submit mini load etl job: {}", job);
-
-        for (Pair<Long, TMiniLoadEtlTaskRequest> pair : requests) {
-            long backendId = pair.first;
-            TMiniLoadEtlTaskRequest request = pair.second;
-
-            Backend backend = 
Catalog.getCurrentSystemInfo().getBackend(backendId);
-            if 
(!Catalog.getCurrentSystemInfo().checkBackendAvailable(backendId)) {
-                String failMsg = "backend is null or is not alive";
-                LOG.error(failMsg);
-                TStatus tStatus = new TStatus(TStatusCode.CANCELLED);
-                tStatus.setErrorMsgs(Lists.newArrayList(failMsg));
-                return new EtlSubmitResult(tStatus, null);
-            }
-
-            AgentClient client = new AgentClient(backend.getHost(), 
backend.getBePort());
-            TAgentResult submitResult = client.submitEtlTask(request);
-            if (submitResult.getStatus().getStatusCode() != TStatusCode.OK) {
-                return new EtlSubmitResult(submitResult.getStatus(), null);
-            }
-        }
-
-        return new EtlSubmitResult(new TStatus(TStatusCode.OK), null);
-    }
-
-    private void registerToDesc() {
-        destTupleDesc = desc.createTupleDescriptor();
-        destTupleDesc.setTable(destTable);
-        // Lock database and get its schema hash??
-        // Make sure that import job has its corresponding schema
-        for (Column col : destTable.getBaseSchema()) {
-            SlotDescriptor slot = desc.addSlotDescriptor(destTupleDesc);
-            // All this slot is needed
-            slot.setIsMaterialized(true);
-            slot.setColumn(col);
-            if (true == col.isAllowNull()) {
-                slot.setIsNullable(true);
-            } else {
-                slot.setIsNullable(false);
-            }
-        }
-    }
-
-    private TMiniLoadEtlTaskRequest createRequest(long taskId) throws 
LoadException {
-        ScanNode csvScanNode = new CsvScanNode(new PlanNodeId(0), 
destTupleDesc, destTable, job);
-        desc.computeMemLayout();
-        try {
-            csvScanNode.finalize(null);
-        } catch (UserException e) {
-            LOG.warn("csvScanNode finalize failed[err={}]", e);
-            throw new LoadException("CSV scan finalize failed.", e);
-        }
-        PlanFragment fragment = new PlanFragment(new PlanFragmentId(0), 
csvScanNode, DataPartition.UNPARTITIONED);
-        fragment.setSink(tableSink);
-
-        try {
-            fragment.finalize(null, false);
-        } catch (Exception e) {
-            LOG.info("fragment finalize failed.e = {}", e);
-            throw new LoadException("Fragment finalize failed.", e);
-        }
-
-        TMiniLoadEtlTaskRequest request = new TMiniLoadEtlTaskRequest();
-        request.setProtocolVersion(TAgentServiceVersion.V1);
-        TExecPlanFragmentParams params = new TExecPlanFragmentParams();
-        params.setProtocolVersion(PaloInternalServiceVersion.V1);
-        params.setFragment(fragment.toThrift());
-        params.setDescTbl(desc.toThrift());
-        params.setImportLabel(job.getLabel());
-        params.setDbName(db.getFullName());
-        params.setLoadJobId(job.getId());
-
-        LoadErrorHub.Param param = load.getLoadErrorHubInfo();
-        if (param != null) {
-            TLoadErrorHubInfo info = param.toThrift();
-            if (info != null) {
-                params.setLoadErrorHubInfo(info);
-            }
-        }
-
-        TPlanFragmentExecParams execParams = new TPlanFragmentExecParams();
-        // Only use fragment id
-        TUniqueId uniqueId = new TUniqueId(job.getId(), taskId);
-        execParams.setQueryId(new TUniqueId(uniqueId));
-        execParams.setFragmentInstanceId(uniqueId);
-        execParams.per_node_scan_ranges = Maps.newHashMap();
-        execParams.per_exch_num_senders = Maps.newHashMap();
-        execParams.destinations = Lists.newArrayList();
-        params.setParams(execParams);
-        TQueryOptions queryOptions = new TQueryOptions();
-        queryOptions.setQueryType(TQueryType.LOAD);
-        params.setQueryOptions(queryOptions);
-        request.setParams(params);
-        return request;
-    }
-
-}
-
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadEtlTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadEtlTask.java
deleted file mode 100644
index 154134c..0000000
--- a/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadEtlTask.java
+++ /dev/null
@@ -1,139 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.task;
-
-import org.apache.doris.catalog.Catalog;
-import org.apache.doris.common.LoadException;
-import org.apache.doris.common.Pair;
-import org.apache.doris.load.EtlStatus;
-import org.apache.doris.load.LoadJob;
-import org.apache.doris.thrift.TEtlState;
-
-import com.google.common.collect.Maps;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.Map;
-
-// Used to process pull load etl task
-@Deprecated
-public class PullLoadEtlTask extends LoadEtlTask {
-    private static final Logger LOG = 
LogManager.getLogger(PullLoadEtlTask.class);
-    private PullLoadJobMgr mgr;
-
-    public PullLoadEtlTask(LoadJob job) {
-        super(job);
-        mgr = Catalog.getCurrentCatalog().getPullLoadJobMgr();
-    }
-
-    @Override
-    protected String getErrorMsg() {
-        String errMsg = null;
-        PullLoadJob pullLoadJob = mgr.getJob(job.getId());
-        if (pullLoadJob != null) {
-            PullLoadTask failureTask = pullLoadJob.getFailureTask();
-            if (failureTask != null) {
-                if (failureTask.getExecuteStatus() != null) {
-                    errMsg = "Broker etl failed: " + 
failureTask.getExecuteStatus().getErrorMsg();
-                }
-            }
-        }
-        return errMsg != null ? errMsg : super.getErrorMsg();
-    }
-
-    @Override
-    protected boolean updateJobEtlStatus() {
-        PullLoadJob pullLoadJob = mgr.getJob(job.getId());
-        EtlStatus etlStatus = job.getEtlJobStatus();
-        if (pullLoadJob == null) {
-            LOG.warn("pullLoadJob is null. JobId is {}", job.getId());
-            return false;
-        }
-        switch (pullLoadJob.getState()) {
-            case CANCELED:
-            case FAILED:
-                etlStatus.setState(TEtlState.CANCELLED);
-                break;
-            case FINISHED:
-                updateFinishInfo(pullLoadJob);
-                etlStatus.setState(TEtlState.FINISHED);
-                break;
-            case RUNNING:
-                etlStatus.setState(TEtlState.RUNNING);
-                break;
-            default:
-                etlStatus.setState(TEtlState.UNKNOWN);
-                break;
-        }
-        return true;
-    }
-
-    private void updateFinishInfo(PullLoadJob pullLoadJob) {
-        Map<String, Long> fileMap = Maps.newHashMap();
-        long numRowsNormal = 0;
-        long numRowsAbnormal = 0;
-        String trackingUrl = null;
-        for (PullLoadTask task : pullLoadJob.tasks) {
-            fileMap.putAll(task.getFileMap());
-
-            String value = task.getCounters().get(DPP_NORMAL_ALL);
-            if (value != null) {
-                numRowsNormal += Long.valueOf(value);
-            }
-            value = task.getCounters().get(DPP_ABNORMAL_ALL);
-            if (value != null) {
-                numRowsAbnormal += Long.valueOf(value);
-            }
-            if (trackingUrl == null && task.getTrackingUrl() != null) {
-                trackingUrl = task.getTrackingUrl();
-            }
-        }
-        Map<String, String> counters = Maps.newHashMap();
-        counters.put(DPP_NORMAL_ALL, "" + numRowsNormal);
-        counters.put(DPP_ABNORMAL_ALL, "" + numRowsAbnormal);
-
-        EtlStatus etlJobStatus = job.getEtlJobStatus();
-        etlJobStatus.setFileMap(fileMap);
-        etlJobStatus.setCounters(counters);
-        if (trackingUrl != null) {
-            etlJobStatus.setTrackingUrl(trackingUrl);
-        }
-    }
-
-    @Override
-    protected void processEtlRunning() throws LoadException {
-    }
-
-    @Override
-    protected Map<String, Pair<String, Long>> getFilePathMap() throws 
LoadException {
-        Map<String, Long> fileMap = job.getEtlJobStatus().getFileMap();
-        if (fileMap == null) {
-            throw new LoadException("get etl files error");
-        }
-
-        Map<String, Pair<String, Long>> filePathMap = Maps.newHashMap();
-        for (Map.Entry<String, Long> entry : fileMap.entrySet()) {
-            String partitionIndexBucket = 
getPartitionIndexBucketString(entry.getKey());
-            // http://host:8000/data/dir/file
-            filePathMap.put(partitionIndexBucket, Pair.create(entry.getKey(), 
entry.getValue()));
-        }
-
-        return filePathMap;
-    }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadJob.java
deleted file mode 100644
index fa2e520..0000000
--- a/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadJob.java
+++ /dev/null
@@ -1,110 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.task;
-
-import org.apache.doris.load.LoadJob;
-
-import com.google.common.collect.Sets;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.List;
-import java.util.Set;
-
-// One pull load job
-@Deprecated
-public class PullLoadJob {
-    private static final Logger LOG = LogManager.getLogger(PullLoadTask.class);
-
-    public enum State {
-        RUNNING,
-        FINISHED,
-        CANCELED,
-        FAILED,
-        UNKNOWN;
-
-        public boolean isRunning() {
-            return this == RUNNING;
-        }
-    }
-
-    // Input params
-    public final LoadJob job;
-    public final List<PullLoadTask> tasks;
-
-    public Set<Integer> finishedTask;
-
-    // Used to
-    public volatile State state;
-    // Only used when this job has failed.
-    public PullLoadTask failureTask;
-
-    public PullLoadJob(LoadJob job, List<PullLoadTask> tasks) {
-        this.job = job;
-        this.tasks = tasks;
-        finishedTask = Sets.newHashSet();
-        state = State.RUNNING;
-    }
-
-    public long getId() {
-        return job.getId();
-    }
-
-    public synchronized State getState() {
-        return state;
-    }
-
-    public synchronized boolean isRunning() {
-        return state.isRunning();
-    }
-
-    public synchronized void cancel() {
-        state = State.CANCELED;
-        for (PullLoadTask task : tasks) {
-            task.cancel();
-        }
-    }
-
-    public PullLoadTask getFailureTask() {
-        return failureTask;
-    }
-
-    public synchronized void onTaskFinished(PullLoadTask task) {
-        int taskId = task.taskId;
-        if (!state.isRunning()) {
-            LOG.info("Ignore task info after this job has been stable. 
taskId={}:{}", task.jobId, taskId);
-            return;
-        }
-        if (!finishedTask.add(taskId)) {
-            LOG.info("Receive duplicate task information. taskId={}:{}", 
task.jobId, taskId);
-        }
-        if (finishedTask.size() == tasks.size()) {
-            state = State.FINISHED;
-        }
-    }
-
-    public synchronized void onTaskFailed(PullLoadTask task) {
-        if (!state.isRunning()) {
-            LOG.info("Ignore task info after this job has been stable. 
taskId={}:{}", task.jobId, task.taskId);
-            return;
-        }
-        state = State.FAILED;
-        failureTask = task;
-    }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadJobMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadJobMgr.java
deleted file mode 100644
index 50a8068..0000000
--- a/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadJobMgr.java
+++ /dev/null
@@ -1,200 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.task;
-
-import org.apache.doris.common.Status;
-import org.apache.doris.common.ThreadPoolManager;
-import org.apache.doris.common.UserException;
-import org.apache.doris.thrift.TStatusCode;
-
-import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.locks.ReentrantLock;
-
-@Deprecated
-public class PullLoadJobMgr {
-    private static final Logger LOG = 
LogManager.getLogger(PullLoadJobMgr.class);
-
-    // Lock protect
-    private final ReentrantLock lock = new ReentrantLock();
-    private final Map<Long, PullLoadJob> idToJobs = Maps.newHashMap();
-
-    private final BlockingQueue<PullLoadTask> pendingTasks = 
Queues.newLinkedBlockingQueue();
-    private ExecutorService executorService;
-
-    private int concurrency = 10;
-
-    public PullLoadJobMgr(boolean needRegisterMetric) {
-        executorService = 
ThreadPoolManager.newDaemonCacheThreadPool(concurrency, "pull-load-job-mgr", 
needRegisterMetric);
-    }
-
-    /**
-     * Start Task manager to work.
-     * First it will ask all backends to collect task status
-     * After collected, this will start scheduler to work.
-     */
-    public void start() {
-        for (int i = 0; i < concurrency; ++i) {
-            executorService.submit(new TaskExecutor());
-        }
-    }
-
-    /**
-     * Submit a load job.
-     * This is called when a job turn from pending to ETL.
-     * Or this can be called before 'start' called.
-     */
-    public void submit(PullLoadJob job) {
-        lock.lock();
-        try {
-            if (idToJobs.containsKey(job.getId())) {
-                // Same job id contains
-                return;
-            }
-            idToJobs.put(job.getId(), job);
-            for (PullLoadTask task : job.tasks) {
-                pendingTasks.add(task);
-            }
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    public PullLoadJob.State getJobState(long jobId) {
-        lock.lock();
-        try {
-            PullLoadJob ctx = idToJobs.get(jobId);
-            if (ctx == null) {
-                return PullLoadJob.State.UNKNOWN;
-            }
-            return ctx.state;
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    // NOTE:
-    //  Must call this after job's state is not running
-    //  Otherwise, the returned object will be process concurrently,
-    //  this could be lead to an invalid situation
-    public PullLoadJob getJob(long jobId) {
-        lock.lock();
-        try {
-            return idToJobs.get(jobId);
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    // Cancel one job, remove all its tasks
-    public void cancelJob(long jobId) {
-        PullLoadJob job = null;
-        lock.lock();
-        try {
-            job = idToJobs.remove(jobId);
-        } finally {
-            lock.unlock();
-        }
-        // Cancel job out of lock guard
-        if (job != null) {
-            job.cancel();
-        }
-    }
-
-    // Only used when master replay job
-    public void remove(long jobId) {
-        lock.lock();
-        try {
-            idToJobs.remove(jobId);
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    public boolean isFailureCanRetry(Status status) {
-        return true;
-    }
-
-    public class TaskExecutor implements Runnable {
-
-        private void processOneTask(PullLoadTask task, PullLoadJob job) throws 
UserException {
-            int retryTime = 3;
-            for (int i = 0; i < retryTime; ++i) {
-                if (!job.isRunning()) {
-                    throw new UserException("Job has been cancelled.");
-                }
-                task.executeOnce();
-                if (task.isFinished()) {
-                    return;
-                } else {
-                    boolean needRetry = 
isFailureCanRetry(task.getExecuteStatus());
-                    if (!needRetry) {
-                        break;
-                    }
-                    try {
-                        Thread.sleep(5000);
-                    } catch (InterruptedException e) {
-
-                    }
-                }
-            }
-        }
-
-        @Override
-        public void run() {
-            while (true) {
-                PullLoadTask task;
-                PullLoadJob job;
-                try {
-                    task = pendingTasks.take();
-                    if (task == null) {
-                        continue;
-                    }
-                    job = getJob(task.jobId);
-                    if (job == null || !job.isRunning()) {
-                        LOG.info("Job is not running now. taskId={}:{}", 
task.jobId, task.taskId);
-                        continue;
-                    }
-                } catch (InterruptedException e) {
-                    LOG.info("Interrupted when take task.");
-                    continue;
-                }
-                try {
-                    processOneTask(task, job);
-                    if (task.isFinished()) {
-                        job.onTaskFinished(task);
-                    } else {
-                        job.onTaskFailed(task);
-                    }
-                } catch (Throwable e) {
-                    LOG.warn("Process one pull load task exception. job id: 
{}, task id: {}",
-                            task.jobId, task.taskId, e);
-                    task.onFailed(null, new Status(TStatusCode.INTERNAL_ERROR, 
e.getMessage()));
-                    job.onTaskFailed(task);
-                }
-            }
-        }
-    }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadTask.java
deleted file mode 100644
index 5a44f22..0000000
--- a/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadTask.java
+++ /dev/null
@@ -1,237 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.task;
-
-import org.apache.doris.analysis.BrokerDesc;
-import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.common.Config;
-import org.apache.doris.common.Status;
-import org.apache.doris.common.UserException;
-import org.apache.doris.common.util.DebugUtil;
-import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.load.BrokerFileGroup;
-import org.apache.doris.qe.Coordinator;
-import org.apache.doris.qe.QeProcessorImpl;
-import org.apache.doris.thrift.TBrokerFileStatus;
-import org.apache.doris.thrift.TQueryType;
-import org.apache.doris.thrift.TStatusCode;
-import org.apache.doris.thrift.TUniqueId;
-
-import com.google.common.collect.Maps;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-// A pull load task is used to process one table of this pull load job.
-@Deprecated
-public class PullLoadTask {
-    private static final Logger LOG = LogManager.getLogger(PullLoadTask.class);
-    // Input parameter
-    public final long jobId;
-    public final int taskId;
-    public final Database db;
-    public final OlapTable table;
-    public final BrokerDesc brokerDesc;
-    public final List<BrokerFileGroup> fileGroups;
-    public final long jobDeadlineMs;
-
-    private PullLoadTaskPlanner planner;
-
-    // Useful things after executed
-    private Map<String, Long> fileMap;
-    private String trackingUrl;
-    private Map<String, String> counters;
-    private final long execMemLimit;
-
-    // Runtime variables
-    private enum State {
-        RUNNING,
-        FINISHED,
-        FAILED,
-        CANCELLED,
-    }
-
-    private TUniqueId queryId;
-    private Coordinator curCoordinator;
-    private State executeState = State.RUNNING;
-    private Status executeStatus;
-    private Thread curThread;
-
-    public PullLoadTask(
-            long jobId, int taskId,
-            Database db, OlapTable table,
-            BrokerDesc brokerDesc, List<BrokerFileGroup> fileGroups,
-            long jobDeadlineMs, long execMemLimit) {
-        this.jobId = jobId;
-        this.taskId = taskId;
-        this.db = db;
-        this.table = table;
-        this.brokerDesc = brokerDesc;
-        this.fileGroups = fileGroups;
-        this.jobDeadlineMs = jobDeadlineMs;
-        this.execMemLimit = execMemLimit;
-    }
-
-    public void init(List<List<TBrokerFileStatus>> fileStatusList, int 
fileNum) throws UserException {
-        planner = new PullLoadTaskPlanner(this);
-        planner.plan(fileStatusList, fileNum);
-    }
-
-    public Map<String, Long> getFileMap() {
-        return fileMap;
-    }
-
-    public String getTrackingUrl() {
-        return trackingUrl;
-    }
-
-    public Map<String, String> getCounters() {
-        return counters;
-    }
-
-    private long getLeftTimeMs() {
-        if (jobDeadlineMs <= 0) {
-            return Config.broker_load_default_timeout_second * 1000;
-        }
-        return jobDeadlineMs - System.currentTimeMillis();
-    }
-
-    public synchronized void cancel() {
-        if (curCoordinator != null) {
-            curCoordinator.cancel();
-        }
-    }
-
-    public synchronized boolean isFinished() {
-        return executeState == State.FINISHED;
-    }
-
-    public Status getExecuteStatus() {
-        return executeStatus;
-    }
-
-    public synchronized void onCancelled(String reason) {
-        if (executeState == State.RUNNING) {
-            executeState = State.CANCELLED;
-            executeStatus = Status.CANCELLED;
-            LOG.info("cancel one pull load task({}). task id: {}, query id: 
{}, job id: {}",
-                    reason, taskId, 
DebugUtil.printId(curCoordinator.getQueryId()), jobId);
-        }
-    }
-
-    public synchronized void onFinished(Map<String, Long> fileMap,
-                                        Map<String, String> counters,
-                                        String trackingUrl) {
-        if (executeState == State.RUNNING) {
-            executeState = State.FINISHED;
-
-            executeStatus = Status.OK;
-            this.fileMap = fileMap;
-            this.counters = counters;
-            this.trackingUrl = trackingUrl;
-            LOG.info("finished one pull load task. task id: {}, query id: {}, 
job id: {}",
-                    taskId, DebugUtil.printId(curCoordinator.getQueryId()), 
jobId);
-        }
-    }
-
-    public synchronized void onFailed(TUniqueId id, Status failStatus) {
-        if (executeState == State.RUNNING) {
-            if (id != null && !queryId.equals(id)) {
-                return;
-            }
-            executeState = State.FAILED;
-            executeStatus = failStatus;
-            LOG.info("failed one pull load task({}). task id: {}, query id: 
{}, job id: {}",
-                    failStatus.getErrorMsg(), taskId, id != null ? 
DebugUtil.printId(id) : "NaN", jobId);
-        }
-    }
-
-    private void actualExecute() {
-        int waitSecond = (int) (getLeftTimeMs() / 1000);
-        if (waitSecond <= 0) {
-            onCancelled("waiting timeout");
-            return;
-        }
-
-        // TODO(zc): to refine coordinator
-        try {
-            curCoordinator.exec();
-        } catch (Exception e) {
-            LOG.warn("pull load task exec failed", e);
-            onFailed(queryId, new Status(TStatusCode.INTERNAL_ERROR, 
"Coordinator execute failed: " + e.getMessage()));
-            return;
-        }
-
-        if (curCoordinator.join(waitSecond)) {
-            Status status = curCoordinator.getExecStatus();
-            if (status.ok()) {
-                Map<String, Long> resultFileMap = Maps.newHashMap();
-                for (String file : curCoordinator.getDeltaUrls()) {
-                    resultFileMap.put(file, -1L);
-                }
-                onFinished(resultFileMap, curCoordinator.getLoadCounters(), 
curCoordinator.getTrackingUrl());
-            } else {
-                onFailed(queryId, status);
-            }
-        } else {
-            onCancelled("execution timeout");
-        }
-    }
-
-    public void executeOnce() throws UserException {
-        synchronized (this) {
-            if (curThread != null) {
-                throw new UserException("Task already executing.");
-            }
-            curThread = Thread.currentThread();
-            executeState = State.RUNNING;
-            executeStatus = Status.OK;
-
-            // New one query id,
-            UUID uuid = UUID.randomUUID();
-            queryId = new TUniqueId(uuid.getMostSignificantBits(), 
uuid.getLeastSignificantBits());
-            curCoordinator = new Coordinator(jobId, queryId, 
planner.getDescTable(),
-                    planner.getFragments(), planner.getScanNodes(), 
db.getClusterName(), TimeUtils.DEFAULT_TIME_ZONE);
-            curCoordinator.setQueryType(TQueryType.LOAD);
-            curCoordinator.setExecMemoryLimit(execMemLimit);
-            curCoordinator.setTimeout((int) (getLeftTimeMs() / 1000));
-        }
-
-        boolean needUnregister = false;
-        try {
-            QeProcessorImpl.INSTANCE.registerQuery(queryId, curCoordinator);
-            actualExecute();
-            needUnregister = true;
-        } catch (UserException e) {
-            onFailed(queryId, new Status(TStatusCode.INTERNAL_ERROR, 
e.getMessage()));
-        } finally {
-            if (needUnregister) {
-                QeProcessorImpl.INSTANCE.unregisterQuery(queryId);
-            }
-            synchronized (this) {
-                curThread = null;
-                curCoordinator = null;
-            }
-        }
-    }
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadTaskPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadTaskPlanner.java
deleted file mode 100644
index 251ee2d..0000000
--- a/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadTaskPlanner.java
+++ /dev/null
@@ -1,160 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.task;
-
-import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.DescriptorTable;
-import org.apache.doris.analysis.Expr;
-import org.apache.doris.analysis.SlotDescriptor;
-import org.apache.doris.analysis.SlotRef;
-import org.apache.doris.analysis.TupleDescriptor;
-import org.apache.doris.catalog.Catalog;
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.NotImplementedException;
-import org.apache.doris.common.UserException;
-import org.apache.doris.planner.BrokerScanNode;
-import org.apache.doris.planner.DataPartition;
-import org.apache.doris.planner.DataSplitSink;
-import org.apache.doris.planner.ExchangeNode;
-import org.apache.doris.planner.OlapRewriteNode;
-import org.apache.doris.planner.PlanFragment;
-import org.apache.doris.planner.PlanFragmentId;
-import org.apache.doris.planner.PlanNodeId;
-import org.apache.doris.planner.ScanNode;
-import org.apache.doris.thrift.TBrokerFileStatus;
-
-import com.google.common.collect.Lists;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.Collections;
-import java.util.List;
-
-// Planner used to generate a plan for pull load ETL work
-@Deprecated
-public class PullLoadTaskPlanner {
-    private static final Logger LOG = 
LogManager.getLogger(PullLoadTaskPlanner.class);
-
-    // Input param
-    private final PullLoadTask task;
-
-    // Something useful
-    private final Analyzer analyzer;
-    private final DescriptorTable descTable;
-
-    // Output param
-    private final List<PlanFragment> fragments;
-    private final List<ScanNode> scanNodes;
-
-    private int nextNodeId = 0;
-
-    public PullLoadTaskPlanner(PullLoadTask task) {
-        this.task = task;
-        this.analyzer = new Analyzer(Catalog.getCurrentCatalog(), null);
-        this.descTable = analyzer.getDescTbl();
-        this.fragments = Lists.newArrayList();
-        this.scanNodes = Lists.newArrayList();
-    }
-
-    // NOTE: DB lock need hold when call this function.
-    public void plan(List<List<TBrokerFileStatus>> fileStatusesList, int 
filesAdded) throws UserException {
-        // Tuple descriptor used for all nodes in plan.
-        OlapTable table = task.table;
-
-        // Generate tuple descriptor
-        List<Expr> slotRefs = Lists.newArrayList();
-        TupleDescriptor tupleDesc = descTable.createTupleDescriptor();
-        for (Column col : table.getFullSchema()) {
-            SlotDescriptor slotDesc = descTable.addSlotDescriptor(tupleDesc);
-            slotDesc.setIsMaterialized(true);
-            slotDesc.setColumn(col);
-            if (col.isAllowNull()) {
-                slotDesc.setIsNullable(true);
-            } else {
-                slotDesc.setIsNullable(false);
-            }
-            slotRefs.add(new SlotRef(slotDesc));
-        }
-
-        // Generate plan tree
-        // 1. first Scan node
-        BrokerScanNode scanNode = new BrokerScanNode(new 
PlanNodeId(nextNodeId++), tupleDesc, "BrokerScanNode",
-                                                     fileStatusesList, 
filesAdded);
-        scanNode.setLoadInfo(table, task.brokerDesc, task.fileGroups);
-        scanNode.init(analyzer);
-        scanNodes.add(scanNode);
-
-        // equal node
-        OlapRewriteNode rewriteNode = new OlapRewriteNode(
-                new PlanNodeId(nextNodeId++), scanNode, table, tupleDesc, 
slotRefs);
-        rewriteNode.init(analyzer);
-
-        descTable.computeMemLayout();
-        rewriteNode.finalize(analyzer);
-
-        PlanFragment scanFragment = new PlanFragment(new PlanFragmentId(0), 
rewriteNode, DataPartition.RANDOM);
-        scanNode.setFragmentId(scanFragment.getFragmentId());
-        scanNode.setFragment(scanFragment);
-        fragments.add(scanFragment);
-
-        // exchange node
-        ExchangeNode exchangeNode = new ExchangeNode(new 
PlanNodeId(nextNodeId++), rewriteNode, false);
-        exchangeNode.init(analyzer);
-
-        // Create data sink
-        DataSplitSink splitSink = null;
-        try {
-            splitSink = new DataSplitSink(table, tupleDesc);
-        } catch (AnalysisException e) {
-            LOG.info("New DataSplitSink failed.{}", e);
-            throw new UserException(e.getMessage());
-        }
-        PlanFragment sinkFragment = new PlanFragment(new PlanFragmentId(1), 
exchangeNode, splitSink.getOutputPartition());
-        scanFragment.setDestination(exchangeNode);
-        scanFragment.setOutputPartition(splitSink.getOutputPartition());
-        sinkFragment.setSink(splitSink);
-
-        fragments.add(sinkFragment);
-
-        // Get partition
-        for (PlanFragment fragment : fragments) {
-            try {
-                fragment.finalize(analyzer, false);
-            } catch (NotImplementedException e) {
-                LOG.info("Fragment finalize failed.{}", e);
-                throw new UserException("Fragment finalize failed.");
-            }
-        }
-        Collections.reverse(fragments);
-    }
-
-    public DescriptorTable getDescTable() {
-        return descTable;
-    }
-
-    public List<PlanFragment> getFragments() {
-        return fragments;
-    }
-
-    public List<ScanNode> getScanNodes() {
-        return scanNodes;
-    }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to