imay commented on a change in pull request #3717:
URL: https://github.com/apache/incubator-doris/pull/3717#discussion_r432835053



##########
File path: fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
##########
@@ -0,0 +1,700 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2;
+
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.CastExpr;
+import org.apache.doris.analysis.DescriptorTable;
+import org.apache.doris.analysis.ResourceDesc;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.FsBroker;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.SparkResource;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.LoadException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.LogBuilder;
+import org.apache.doris.common.util.LogKey;
+import org.apache.doris.load.EtlJobType;
+import org.apache.doris.load.EtlStatus;
+import org.apache.doris.load.FailMsg;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanNode;
+import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.system.Backend;
+import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTaskExecutor;
+import org.apache.doris.task.AgentTaskQueue;
+import org.apache.doris.task.PushTask;
+import org.apache.doris.thrift.TBrokerRangeDesc;
+import org.apache.doris.thrift.TBrokerScanRange;
+import org.apache.doris.thrift.TBrokerScanRangeParams;
+import org.apache.doris.thrift.TDescriptorTable;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPlanNode;
+import org.apache.doris.thrift.TPriority;
+import org.apache.doris.thrift.TPushType;
+import org.apache.doris.thrift.TScanRangeLocations;
+import org.apache.doris.transaction.TabletCommitInfo;
+import org.apache.doris.transaction.TabletQuorumFailedException;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.spark.launcher.SparkAppHandle;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.gson.annotations.SerializedName;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * There are 4 steps in SparkLoadJob:
+ * Step1: SparkLoadPendingTask will be created by unprotectedExecuteJob method 
and submit spark etl job.
+ * Step2: LoadEtlChecker will check spark etl job status periodly and send 
push tasks to be when spark etl job is finished.
+ * Step3: LoadLoadingChecker will check loading status periodly and commit 
transaction when push tasks are finished.
+ * Step4: PublishVersionDaemon will send publish version tasks to be and 
finish transaction.
+ */
+public class SparkLoadJob extends BulkLoadJob {
+    private static final Logger LOG = LogManager.getLogger(SparkLoadJob.class);
+
+    // for global dict
+    public static final String BITMAP_DATA_PROPERTY = "bitmap_data";
+
+    // --- members below need persist ---
+    // create from resourceDesc when job created
+    private SparkResource sparkResource;
+    // members below updated when job state changed to etl
+    private long etlStartTimestamp = -1;
+    // for spark yarn
+    private String appId = "";
+    // spark job outputPath
+    private String etlOutputPath = "";
+    // members below updated when job state changed to loading
+    // { tableId.partitionId.indexId.bucket.schemaHash -> (etlFilePath, 
etlFileSize) }
+    private Map<String, Pair<String, Long>> tabletMetaToFileInfo = 
Maps.newHashMap();
+
+    // --- members below not persist ---
+    // temporary use
+    // one SparkLoadJob has only one table to load
+    // hivedb.table for global dict
+    private String hiveTableName = "";
+    private ResourceDesc resourceDesc;
+    // for spark standalone
+    private SparkAppHandle sparkAppHandle;
+    // for straggler wait long time to commit transaction
+    private long quorumFinishTimestamp = -1;
+    // below for push task
+    private Map<Long, Set<Long>> tableToLoadPartitions = Maps.newHashMap();
+    private Map<Long, PushBrokerScannerParams> indexToPushBrokerReaderParams = 
Maps.newHashMap();
+    private Map<Long, Integer> indexToSchemaHash = Maps.newHashMap();
+    private Map<Long, Map<Long, PushTask>> tabletToSentReplicaPushTask = 
Maps.newHashMap();
+    private Set<Long> finishedReplicas = Sets.newHashSet();
+    private Set<Long> quorumTablets = Sets.newHashSet();
+    private Set<Long> fullTablets = Sets.newHashSet();
+
+    private static class PushBrokerScannerParams {
+        TBrokerScanRange tBrokerScanRange;
+        TDescriptorTable tDescriptorTable;
+
+        public void init(List<Column> columns, BrokerDesc brokerDesc) throws 
UserException {
+            Analyzer analyzer = new Analyzer(null, null);
+            // Generate tuple descriptor
+            DescriptorTable descTable = analyzer.getDescTbl();
+            TupleDescriptor destTupleDesc = descTable.createTupleDescriptor();
+            // use index schema to fill the descriptor table
+            for (Column column : columns) {
+                SlotDescriptor destSlotDesc = 
descTable.addSlotDescriptor(destTupleDesc);
+                destSlotDesc.setIsMaterialized(true);
+                destSlotDesc.setColumn(column);
+                if (column.isAllowNull()) {
+                    destSlotDesc.setIsNullable(true);
+                } else {
+                    destSlotDesc.setIsNullable(false);
+                }
+            }
+            // Push broker scan node
+            PushBrokerScanNode scanNode = new 
PushBrokerScanNode(destTupleDesc);

Review comment:
       Why is this ScanNode needed?
   Seems that this scanNode will be destruct after this function.

##########
File path: fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
##########
@@ -0,0 +1,700 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2;
+
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.CastExpr;
+import org.apache.doris.analysis.DescriptorTable;
+import org.apache.doris.analysis.ResourceDesc;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.FsBroker;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.SparkResource;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.LoadException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.LogBuilder;
+import org.apache.doris.common.util.LogKey;
+import org.apache.doris.load.EtlJobType;
+import org.apache.doris.load.EtlStatus;
+import org.apache.doris.load.FailMsg;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanNode;
+import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.system.Backend;
+import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTaskExecutor;
+import org.apache.doris.task.AgentTaskQueue;
+import org.apache.doris.task.PushTask;
+import org.apache.doris.thrift.TBrokerRangeDesc;
+import org.apache.doris.thrift.TBrokerScanRange;
+import org.apache.doris.thrift.TBrokerScanRangeParams;
+import org.apache.doris.thrift.TDescriptorTable;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPlanNode;
+import org.apache.doris.thrift.TPriority;
+import org.apache.doris.thrift.TPushType;
+import org.apache.doris.thrift.TScanRangeLocations;
+import org.apache.doris.transaction.TabletCommitInfo;
+import org.apache.doris.transaction.TabletQuorumFailedException;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.spark.launcher.SparkAppHandle;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.gson.annotations.SerializedName;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * There are 4 steps in SparkLoadJob:
+ * Step1: SparkLoadPendingTask will be created by unprotectedExecuteJob method 
and submit spark etl job.
+ * Step2: LoadEtlChecker will check spark etl job status periodly and send 
push tasks to be when spark etl job is finished.
+ * Step3: LoadLoadingChecker will check loading status periodly and commit 
transaction when push tasks are finished.
+ * Step4: PublishVersionDaemon will send publish version tasks to be and 
finish transaction.
+ */
+public class SparkLoadJob extends BulkLoadJob {
+    private static final Logger LOG = LogManager.getLogger(SparkLoadJob.class);
+
+    // for global dict
+    public static final String BITMAP_DATA_PROPERTY = "bitmap_data";
+
+    // --- members below need persist ---
+    // create from resourceDesc when job created
+    private SparkResource sparkResource;
+    // members below updated when job state changed to etl
+    private long etlStartTimestamp = -1;
+    // for spark yarn
+    private String appId = "";
+    // spark job outputPath
+    private String etlOutputPath = "";
+    // members below updated when job state changed to loading
+    // { tableId.partitionId.indexId.bucket.schemaHash -> (etlFilePath, 
etlFileSize) }
+    private Map<String, Pair<String, Long>> tabletMetaToFileInfo = 
Maps.newHashMap();
+
+    // --- members below not persist ---
+    // temporary use
+    // one SparkLoadJob has only one table to load
+    // hivedb.table for global dict
+    private String hiveTableName = "";
+    private ResourceDesc resourceDesc;
+    // for spark standalone
+    private SparkAppHandle sparkAppHandle;

Review comment:
       When is this assigned?

##########
File path: fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
##########
@@ -0,0 +1,700 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2;
+
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.CastExpr;
+import org.apache.doris.analysis.DescriptorTable;
+import org.apache.doris.analysis.ResourceDesc;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.FsBroker;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.SparkResource;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.LoadException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.LogBuilder;
+import org.apache.doris.common.util.LogKey;
+import org.apache.doris.load.EtlJobType;
+import org.apache.doris.load.EtlStatus;
+import org.apache.doris.load.FailMsg;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanNode;
+import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.system.Backend;
+import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTaskExecutor;
+import org.apache.doris.task.AgentTaskQueue;
+import org.apache.doris.task.PushTask;
+import org.apache.doris.thrift.TBrokerRangeDesc;
+import org.apache.doris.thrift.TBrokerScanRange;
+import org.apache.doris.thrift.TBrokerScanRangeParams;
+import org.apache.doris.thrift.TDescriptorTable;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPlanNode;
+import org.apache.doris.thrift.TPriority;
+import org.apache.doris.thrift.TPushType;
+import org.apache.doris.thrift.TScanRangeLocations;
+import org.apache.doris.transaction.TabletCommitInfo;
+import org.apache.doris.transaction.TabletQuorumFailedException;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.spark.launcher.SparkAppHandle;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.gson.annotations.SerializedName;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * There are 4 steps in SparkLoadJob:
+ * Step1: SparkLoadPendingTask will be created by unprotectedExecuteJob method 
and submit spark etl job.
+ * Step2: LoadEtlChecker will check spark etl job status periodly and send 
push tasks to be when spark etl job is finished.
+ * Step3: LoadLoadingChecker will check loading status periodly and commit 
transaction when push tasks are finished.
+ * Step4: PublishVersionDaemon will send publish version tasks to be and 
finish transaction.
+ */
+public class SparkLoadJob extends BulkLoadJob {
+    private static final Logger LOG = LogManager.getLogger(SparkLoadJob.class);
+
+    // for global dict
+    public static final String BITMAP_DATA_PROPERTY = "bitmap_data";
+
+    // --- members below need persist ---
+    // create from resourceDesc when job created
+    private SparkResource sparkResource;
+    // members below updated when job state changed to etl
+    private long etlStartTimestamp = -1;
+    // for spark yarn
+    private String appId = "";
+    // spark job outputPath
+    private String etlOutputPath = "";
+    // members below updated when job state changed to loading
+    // { tableId.partitionId.indexId.bucket.schemaHash -> (etlFilePath, 
etlFileSize) }
+    private Map<String, Pair<String, Long>> tabletMetaToFileInfo = 
Maps.newHashMap();
+
+    // --- members below not persist ---
+    // temporary use
+    // one SparkLoadJob has only one table to load
+    // hivedb.table for global dict
+    private String hiveTableName = "";
+    private ResourceDesc resourceDesc;
+    // for spark standalone
+    private SparkAppHandle sparkAppHandle;
+    // for straggler wait long time to commit transaction
+    private long quorumFinishTimestamp = -1;
+    // below for push task
+    private Map<Long, Set<Long>> tableToLoadPartitions = Maps.newHashMap();
+    private Map<Long, PushBrokerScannerParams> indexToPushBrokerReaderParams = 
Maps.newHashMap();
+    private Map<Long, Integer> indexToSchemaHash = Maps.newHashMap();
+    private Map<Long, Map<Long, PushTask>> tabletToSentReplicaPushTask = 
Maps.newHashMap();
+    private Set<Long> finishedReplicas = Sets.newHashSet();
+    private Set<Long> quorumTablets = Sets.newHashSet();
+    private Set<Long> fullTablets = Sets.newHashSet();
+
+    private static class PushBrokerScannerParams {
+        TBrokerScanRange tBrokerScanRange;
+        TDescriptorTable tDescriptorTable;
+
+        public void init(List<Column> columns, BrokerDesc brokerDesc) throws 
UserException {
+            Analyzer analyzer = new Analyzer(null, null);
+            // Generate tuple descriptor
+            DescriptorTable descTable = analyzer.getDescTbl();
+            TupleDescriptor destTupleDesc = descTable.createTupleDescriptor();
+            // use index schema to fill the descriptor table
+            for (Column column : columns) {
+                SlotDescriptor destSlotDesc = 
descTable.addSlotDescriptor(destTupleDesc);
+                destSlotDesc.setIsMaterialized(true);
+                destSlotDesc.setColumn(column);
+                if (column.isAllowNull()) {
+                    destSlotDesc.setIsNullable(true);
+                } else {
+                    destSlotDesc.setIsNullable(false);
+                }
+            }
+            // Push broker scan node
+            PushBrokerScanNode scanNode = new 
PushBrokerScanNode(destTupleDesc);
+            scanNode.setLoadInfo(columns, brokerDesc);
+            scanNode.init(analyzer);
+            tBrokerScanRange = scanNode.getTBrokerScanRange();
+
+            // descTable
+            descTable.computeMemLayout();
+            tDescriptorTable = descTable.toThrift();
+        }
+    }
+
+    private static class PushBrokerScanNode extends ScanNode {
+        private TBrokerScanRange tBrokerScanRange;
+        private List<Column> columns;
+        private BrokerDesc brokerDesc;
+
+        public PushBrokerScanNode(TupleDescriptor destTupleDesc) {
+            super(new PlanNodeId(0), destTupleDesc, "PushBrokerScanNode");
+            this.tBrokerScanRange = new TBrokerScanRange();
+        }
+
+        public void setLoadInfo(List<Column> columns, BrokerDesc brokerDesc) {
+            this.columns = columns;
+            this.brokerDesc = brokerDesc;
+        }
+
+        public void init(Analyzer analyzer) throws UserException {
+            super.init(analyzer);
+
+            // scan range params
+            TBrokerScanRangeParams params = new TBrokerScanRangeParams();
+            params.setStrict_mode(false);
+            params.setProperties(brokerDesc.getProperties());
+            TupleDescriptor srcTupleDesc = 
analyzer.getDescTbl().createTupleDescriptor();
+            Map<String, SlotDescriptor> srcSlotDescByName = Maps.newHashMap();
+            for (Column column : columns) {
+                SlotDescriptor srcSlotDesc = 
analyzer.getDescTbl().addSlotDescriptor(srcTupleDesc);
+                
srcSlotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR));
+                srcSlotDesc.setIsMaterialized(true);
+                srcSlotDesc.setIsNullable(true);
+                srcSlotDesc.setColumn(new Column(column.getName(), 
PrimitiveType.VARCHAR));
+                params.addToSrc_slot_ids(srcSlotDesc.getId().asInt());
+                srcSlotDescByName.put(column.getName(), srcSlotDesc);
+            }
+
+            TupleDescriptor destTupleDesc = desc;
+            Map<Integer, Integer> destSidToSrcSidWithoutTrans = 
Maps.newHashMap();
+            for (SlotDescriptor destSlotDesc : destTupleDesc.getSlots()) {
+                if (!destSlotDesc.isMaterialized()) {
+                    continue;
+                }
+
+                SlotDescriptor srcSlotDesc = 
srcSlotDescByName.get(destSlotDesc.getColumn().getName());
+                destSidToSrcSidWithoutTrans.put(destSlotDesc.getId().asInt(), 
srcSlotDesc.getId().asInt());
+                Expr expr = new SlotRef(srcSlotDesc);
+                if (destSlotDesc.getType().getPrimitiveType() == 
PrimitiveType.BOOLEAN) {
+                    // there is no cast string to boolean function
+                    // so we cast string to tinyint first, then cast tinyint 
to boolean
+                    expr = new CastExpr(Type.BOOLEAN, new 
CastExpr(Type.TINYINT, expr));
+                } else {
+                    expr = castToSlot(destSlotDesc, expr);
+                }
+                params.putToExpr_of_dest_slot(destSlotDesc.getId().asInt(), 
expr.treeToThrift());
+            }
+            
params.setDest_sid_to_src_sid_without_trans(destSidToSrcSidWithoutTrans);
+            params.setSrc_tuple_id(srcTupleDesc.getId().asInt());
+            params.setDest_tuple_id(destTupleDesc.getId().asInt());
+            tBrokerScanRange.setParams(params);
+
+            // broker address updated for each replica
+            tBrokerScanRange.setBroker_addresses(Lists.newArrayList());
+
+            // broker range desc
+            TBrokerRangeDesc tBrokerRangeDesc = new TBrokerRangeDesc();
+            tBrokerRangeDesc.setFile_type(TFileType.FILE_BROKER);
+            tBrokerRangeDesc.setFormat_type(TFileFormatType.FORMAT_PARQUET);
+            tBrokerRangeDesc.setSplittable(false);
+            tBrokerRangeDesc.setStart_offset(0);
+            tBrokerRangeDesc.setSize(-1);
+            // path and file size updated for each replica
+            tBrokerScanRange.setRanges(Lists.newArrayList(tBrokerRangeDesc));
+        }
+
+        public TBrokerScanRange getTBrokerScanRange() {
+            return tBrokerScanRange;
+        }
+
+        @Override
+        public List<TScanRangeLocations> getScanRangeLocations(long 
maxScanRangeLength) {
+            return null;
+        }
+
+        @Override
+        protected void toThrift(TPlanNode msg) {}
+    }
+
+    // only for log replay
+    public SparkLoadJob() {
+        super();
+        jobType = EtlJobType.SPARK;
+    }
+
+    public SparkLoadJob(long dbId, String label, ResourceDesc resourceDesc, 
OriginStatement originStmt)
+            throws MetaNotFoundException {
+        super(dbId, label, originStmt);
+        this.resourceDesc = resourceDesc;
+        timeoutSecond = Config.spark_load_default_timeout_second;
+        jobType = EtlJobType.SPARK;
+    }
+
+    private boolean checkState(JobState expectState) {
+        readLock();
+        try {
+            if (state == expectState) {
+                return true;
+            }
+            return false;
+        } finally {
+            readUnlock();
+        }
+    }
+
+    public void updateEtlStatus() throws Exception {
+        if (!checkState(JobState.ETL)) {
+            return;
+        }
+
+        // get etl status
+        SparkEtlJobHandler handler = new SparkEtlJobHandler();
+        EtlStatus status = handler.getEtlJobStatus(sparkAppHandle, appId, id, 
etlOutputPath, sparkResource, brokerDesc);
+        switch (status.getState()) {
+            case RUNNING:
+                updateEtlStatusInternal(status);
+                break;
+            case FINISHED:
+                processEtlFinish(status, handler);
+                break;
+            case CANCELLED:
+                throw new LoadException("spark etl job failed. msg: " + 
status.getFailMsg());
+            default:
+                LOG.warn("unknown etl state: {}", status.getState().name());
+                break;
+        }
+    }
+
+    private void updateEtlStatusInternal(EtlStatus etlStatus) {
+        writeLock();
+        try {
+            loadingStatus = etlStatus;
+            progress = etlStatus.getProgress();
+            if (!sparkResource.isYarnMaster()) {
+                loadingStatus.setTrackingUrl(appId);
+            }
+
+            // TODO(wyb): spark-load
+            /*
+            DppResult dppResult = etlStatus.getDppResult();
+            if (dppResult != null) {
+                // update load statistic and counters when spark etl job 
finished
+                // fe gets these infos from spark dpp, so we use dummy load id 
and dummy backend id here
+                loadStatistic.fileNum = (int) dppResult.fileNumber;
+                loadStatistic.totalFileSizeB = dppResult.fileSize;
+                TUniqueId dummyId = new TUniqueId(0, 0);
+                long dummyBackendId = -1L;
+                loadStatistic.initLoad(dummyId, Sets.newHashSet(dummyId), 
Lists.newArrayList(dummyBackendId));
+                loadStatistic.updateLoadProgress(dummyBackendId, dummyId, 
dummyId, dppResult.scannedRows, true);
+
+                Map<String, String> counters = loadingStatus.getCounters();
+                counters.put(DPP_NORMAL_ALL, 
String.valueOf(dppResult.normalRows));
+                counters.put(DPP_ABNORMAL_ALL, 
String.valueOf(dppResult.abnormalRows));
+                counters.put(UNSELECTED_ROWS, 
String.valueOf(dppResult.unselectRows));
+            }
+            */
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    private void processEtlFinish(EtlStatus etlStatus, SparkEtlJobHandler 
handler) throws Exception {
+        updateEtlStatusInternal(etlStatus);
+        // checkDataQuality
+        if (!checkDataQuality()) {
+            cancelJobWithoutCheck(new 
FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED, QUALITY_FAIL_MSG),
+                                  true, true);
+            return;
+        }
+
+        // get etl output files and update loading state
+        updateToLoadingState(etlStatus, handler.getEtlFilePaths(etlOutputPath, 
brokerDesc));
+        // log loading state
+        logUpdateStateInfo();
+
+        // create push tasks
+        prepareLoadingInfos();
+        submitPushTasks();
+    }
+
+    private void updateToLoadingState(EtlStatus etlStatus, Map<String, Long> 
filePathToSize) throws LoadException {
+        writeLock();

Review comment:
       why not get lock in the outer public function? In most scenario, there 
is only one writer.
   If you put this lock in a private function, there is a case that multiple 
threads will do proceeEtlFinish concurrently.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to