EmmyMiao87 commented on a change in pull request #3013: add broker load internal doc
URL: https://github.com/apache/incubator-doris/pull/3013#discussion_r385018212
 
 

 ##########
 File path: docs/documentation/cn/internal/broker_load.md
 ##########
 @@ -0,0 +1,1014 @@
+# Doris Broker Load Implementation Analysis
+
+## Background
+
+Doris supports multiple load methods. Broker Load is one of the most commonly used: it loads files stored in distributed storage systems (HDFS, BOS, etc.) into Doris. Broker Load is suitable when:
+
+- The source data resides in a distributed storage system that the Broker can access, such as HDFS.
+
+- The data volume is on the order of 20 GB.
+
+## Terminology
+
+* FE: Frontend, the frontend node of Palo (Doris). It is mainly responsible for receiving and returning client requests, metadata and cluster management, query plan generation, and so on. For the Doris architecture diagram, see the [Doris architecture introduction](http://doris.incubator.apache.org/).
+* BE: Backend, the backend node of Palo (Doris). It is mainly responsible for data storage and management, query plan execution, and so on.
+* Broker: see the [Broker documentation](http://doris.incubator.apache.org/documentation/cn/administrator-guide/broker.html).
+
+## Implementation Principles
+
+In Broker Load, the user only needs to provide one copy of the base table data, and Doris performs the following processing on the user's behalf:
+
+- Doris automatically generates the rollup data from the base table data and loads it into the corresponding rollups
+- Negative load (only for SUM-type value columns in the aggregate model)
+- Extracting column values from the file path (see the illustrative sketch below)
+- Function computation, including strftime, now, hll_hash, md5, etc.
+- Atomicity of the entire load process
+
+For the syntax and usage of Broker Load, see the [Broker Load documentation](http://doris.incubator.apache.org/documentation/cn/administrator-guide/load-data/broker-load-manual.html).
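+
+As an illustration of the "columns from path" feature mentioned above: for a source file at a hypothetical path such as `hdfs://host/data/city=beijing/date=2020-01-01/file.csv`, the `key=value` path segments can supply column values (`city`, `date`). The following minimal, standalone Java sketch only illustrates the idea and is not Doris's actual implementation:
+
+```java
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class ColumnsFromPathSketch {
+    // Extract key=value segments from a file path, e.g. ".../city=beijing/..." -> {city=beijing}.
+    static Map<String, String> extractColumnsFromPath(String filePath) {
+        Map<String, String> columns = new LinkedHashMap<>();
+        for (String segment : filePath.split("/")) {
+            int idx = segment.indexOf('=');
+            if (idx > 0 && idx < segment.length() - 1) {
+                columns.put(segment.substring(0, idx), segment.substring(idx + 1));
+            }
+        }
+        return columns;
+    }
+
+    public static void main(String[] args) {
+        // Hypothetical path; prints {city=beijing, date=2020-01-01}
+        System.out.println(extractColumnsFromPath(
+                "hdfs://host/data/city=beijing/date=2020-01-01/file.csv"));
+    }
+}
+```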
+
+### Load Process
+
+```
+                 +
+                 | 1. user create broker load
+                 v
+            +----+----+
+            |         |
+            |   FE    |
+            |         |
+            +----+----+
+                 |
+                 | 2. BE etl and load the data
+    +--------------------------+
+    |            |             |
++---v---+     +--v----+    +---v---+
+|       |     |       |    |       |
+|  BE   |     |  BE   |    |   BE  |
+|       |     |       |    |       |
++---^---+     +---^---+    +---^---+
+    |             |            |
+    |             |            | 3. pull data from broker
++---+---+     +---+---+    +---+---+
+|       |     |       |    |       |
+|Broker |     |Broker |    |Broker |
+|       |     |       |    |       |
++---^---+     +---^---+    +---^---+
+    |             |            | 
++----------------------------------+
+|       HDFS/BOS/AFS cluster       |
++----------------------------------+
+```
+
+The overall load process is roughly as follows:
+
+- The user sends the request to the FE, which performs syntax and semantic analysis and then generates a BrokerLoadJob
+- The BrokerLoadJob is scheduled by the LoadJob scheduler, which generates a BrokerLoadPendingTask
+- The BrokerLoadPendingTask lists the source files to be loaded and builds the file list for each partition
+- One LoadLoadingTask is generated per partition to perform the load
+- The LoadLoadingTask generates a distributed load execution plan, which is executed on the backend BEs: reading the source files, performing the ETL transformation, and writing to the corresponding tablets
+
+The key steps are as follows:
+
+#### Processing in the FE
+1. Syntax and semantic analysis
+
+```
+                        User Query
+                 +
+                 | mysql protocol
+                 v
+         +-------+-------+
+         |               |
+         |   QeService   |
+         |               |
+         +-------+-------+
+                 |
+                 v
+         +-------+-------+
+         |               |
+         |  MysqlServer  |
+         |               |
+         +-------+-------+
+                 |
+                 v
+       +---------+---------+
+       |                   |
+       |  ConnectScheduler |
+       |                   |
+       +---------+---------+
+                 |
+                 v
+       +---------+---------+
+       |                   |
+       |  ConnectProcessor |
+       |                   |
+       +---------+---------+
+                 |
+                 v
+         +-------+-------+
+         |               |
+         | StmtExecutor  |
+         |               |
+         +-------+-------+
+```
+The flow above is the processing path that a query goes through for syntax and semantic analysis after it is sent to Doris. In Doris, MysqlServer is a server that implements the MySQL protocol and receives the user's MySQL query requests; after being scheduled by the ConnectScheduler, the request is handled by the ConnectProcessor, and finally the StmtExecutor performs syntax and semantic analysis.
+
+2. Load job execution
+
+```
+         +-------+-------+
+         |    PENDING    |-----------------|
+         +-------+-------+                 |
+                 | BrokerLoadPendingTask   |
+                 v                         |
+         +-------+-------+                 |
+         |    LOADING    |-----------------|
+         +-------+-------+                 |
+                 | LoadLoadingTask         |
+                 v                         |
+         +-------+-------+                 |
+         |  COMMITTED    |-----------------|
+         +-------+-------+                 |
+                 |                         |
+                 v                         v
+         +-------+-------+         +-------+-------+
+         |   FINISHED    |         |   CANCELLED   |
+         +-------+-------+         +-------+-------+
+                 |                         ^
+                 |-------------------------|
+```
+
+After the user-initiated Broker Load request passes syntax and semantic analysis in StmtExecutor, a LoadStmt is generated; the DdlExecutor then creates a BrokerLoadJob from the LoadStmt.
+
+```java
+        if (ddlStmt instanceof LoadStmt) {
+            LoadStmt loadStmt = (LoadStmt) ddlStmt;
+            EtlJobType jobType;
+            if (loadStmt.getBrokerDesc() != null) {
+                jobType = EtlJobType.BROKER;
+            } else {
+                if (Config.disable_hadoop_load) {
+                    throw new DdlException("Load job by hadoop cluster is disabled."
+                            + " Try using broker load. See 'help broker load;'");
+                }
+                jobType = EtlJobType.HADOOP;
+            }
+            if (loadStmt.getVersion().equals(Load.VERSION) || jobType == EtlJobType.HADOOP) {
+                catalog.getLoadManager().createLoadJobV1FromStmt(loadStmt, jobType, System.currentTimeMillis());
+            } else {
+                catalog.getLoadManager().createLoadJobFromStmt(loadStmt, origStmt);
+            }
+        }
+```
+
+A BrokerLoadJob is executed as a state machine. The initial state of the job is PENDING, and two things are done in the PENDING state:
+
+1. Call the global transaction manager to begin a transaction and obtain a txn id.
+2. Create a BrokerLoadPendingTask. Its main job is to check whether the file paths provided by the user are correct and to obtain the number and total size of the files; this information is organized by partition and stored in a Map (see the data structure BrokerPendingTaskAttachment).
+The key code is as follows (BrokerLoadPendingTask.java):
+
+```
+private void getAllFileStatus() throws UserException {
+        long start = System.currentTimeMillis();
+        long totalFileSize = 0;
+        int totalFileNum = 0;
+        for (Map.Entry<FileGroupAggKey, List<BrokerFileGroup>> entry : aggKeyToBrokerFileGroups.entrySet()) {
+            FileGroupAggKey aggKey = entry.getKey();
+            List<BrokerFileGroup> fileGroups = entry.getValue();
+
+            List<List<TBrokerFileStatus>> fileStatusList = Lists.newArrayList();
+            long tableTotalFileSize = 0;
+            int tableTotalFileNum = 0;
+            int groupNum = 0;
+            for (BrokerFileGroup fileGroup : fileGroups) {
+                long groupFileSize = 0;
+                List<TBrokerFileStatus> fileStatuses = Lists.newArrayList();
+                for (String path : fileGroup.getFilePaths()) {
+                    BrokerUtil.parseBrokerFile(path, brokerDesc, fileStatuses);
+                }
+                fileStatusList.add(fileStatuses);
+                for (TBrokerFileStatus fstatus : fileStatuses) {
+                    groupFileSize += fstatus.getSize();
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId())
+                                .add("file_status", fstatus).build());
+                    }
+                }
+                tableTotalFileSize += groupFileSize;
+                tableTotalFileNum += fileStatuses.size();
+                LOG.info("get {} files in file group {} for table {}. size: {}. job: {}",
+                        fileStatuses.size(), groupNum, entry.getKey(), groupFileSize, callback.getCallbackId());
+                groupNum++;
+            }
+
+            totalFileSize += tableTotalFileSize;
+            totalFileNum += tableTotalFileNum;
+            ((BrokerPendingTaskAttachment) attachment).addFileStatus(aggKey, fileStatusList);
+            LOG.info("get {} files to be loaded. total size: {}. cost: {} ms, job: {}",
+                    tableTotalFileNum, tableTotalFileSize, (System.currentTimeMillis() - start), callback.getCallbackId());
+        }
+
+        ((BrokerLoadJob) callback).setLoadFileInfo(totalFileNum, totalFileSize);
+    }
+```
+
+The code that checks the user-supplied file paths is: `BrokerUtil.parseBrokerFile(path, brokerDesc, fileStatuses);`
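+
+For intuition, the per-aggregation-key structure that getAllFileStatus() hands to BrokerPendingTaskAttachment.addFileStatus() is a list of file-status lists, one inner list per file group. The following standalone Java sketch (plain JDK collections with hypothetical table and path names, not Doris's actual classes) only illustrates that shape:
+
+```java
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class PendingAttachmentShapeSketch {
+    public static void main(String[] args) {
+        // aggregation key -> one entry per file group -> the file statuses of that group
+        // (in Doris each entry is a TBrokerFileStatus with path, size, etc.; plain strings are used here)
+        Map<String, List<List<String>>> attachment = new LinkedHashMap<>();
+        attachment.put("tableA", Arrays.asList(
+                // file group 1 (hypothetical paths)
+                Arrays.asList("hdfs://host/data/a/part-0", "hdfs://host/data/a/part-1"),
+                // file group 2
+                Arrays.asList("hdfs://host/data/b/part-0")));
+        attachment.forEach((key, groups) -> System.out.println(key + " -> " + groups));
+    }
+}
+```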
+
+After the BrokerLoadPendingTask completes, the BrokerLoadJob transitions to the LOADING state. In the LOADING phase, the main work is to create one LoadLoadingTask per partition, based on the number of partitions specified for the load. The most important job of this task is to generate the load query plan and distribute it to the backend BE nodes for execution.
+
+##### LoadLoadingTask
+
+The LoadLoadingTask is the **most important step** of the load. As mentioned above, it is responsible for generating the load plan and for distributing and executing it (relying on the Coordinator, the query plan execution framework). The generated query plan (produced by the LoadingTaskPlanner) looks like this:
+
+```
+Fragment:
+    +------------------------------+
+    |     +----------+---------+   |
+    |     |   BrokerScanNode   |   |
+    |     +----------+---------+   |
+    |                |             |
+    |                v             |
+    |     +----------+---------+   |
+    |     |   OlapTableSink    |   |
+    |     +----------+---------+   |
+    +------------------------------+
+```
+
+The BrokerScanNode performs the following work:
+
+- Column mapping
+
+       Maps the columns of the source file to the columns of the table, and supports expression computation on columns (the SET feature)
+- Negative load
+
+       For SUM-type value columns, supports loading negative data
+- Conditional filtering
+- The "column from path" mechanism
+
+       Extracts column values from the file path
+
+The main logic is as follows:
+
+```
+DataDescription::analyzeColumns()
+
+       private void analyzeColumns() throws AnalysisException {
+        if ((fileFieldNames == null || fileFieldNames.isEmpty()) && (columnsFromPath != null && !columnsFromPath.isEmpty())) {
+            throw new AnalysisException("Can not specify columns_from_path without column_list");
+        }
+
+        // used to check duplicated column name in COLUMNS and COLUMNS FROM PATH
+        Set<String> columnNames = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
+
+        // merge column exprs from columns, columns from path and columnMappingList
+        // 1. analyze columns
+        if (fileFieldNames != null && !fileFieldNames.isEmpty()) {
+            for (String columnName : fileFieldNames) {
+                if (!columnNames.add(columnName)) {
+                    throw new AnalysisException("Duplicate column: " + columnName);
+                }
+                ImportColumnDesc importColumnDesc = new ImportColumnDesc(columnName, null);
+                parsedColumnExprList.add(importColumnDesc);
+            }
+        }
+
+        // 2. analyze columns from path
+        if (columnsFromPath != null && !columnsFromPath.isEmpty()) {
+            if (isHadoopLoad) {
+                throw new AnalysisException("Hadoop load does not support specifying columns from path");
+            }
+            for (String columnName : columnsFromPath) {
+                if (!columnNames.add(columnName)) {
+                    throw new AnalysisException("Duplicate column: " + columnName);
+                }
+                ImportColumnDesc importColumnDesc = new ImportColumnDesc(columnName, null);
+                parsedColumnExprList.add(importColumnDesc);
+            }
+        }
+
+        // 3: analyze column mapping
+        if (columnMappingList == null || columnMappingList.isEmpty()) {
+            return;
+        }
+
+        // used to check duplicated column name in SET clause
+        Set<String> columnMappingNames = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
+        // Step2: analyze column mapping
+        // the column expr only supports a SlotRef or an eq binary predicate whose child(0) must be a SlotRef.
+        // duplicate column names of the SlotRef are forbidden.
+        for (Expr columnExpr : columnMappingList) {
+            if (!(columnExpr instanceof BinaryPredicate)) {
+                throw new AnalysisException("Mapping function expr only support the column or eq binary predicate. "
+                        + "Expr: " + columnExpr.toSql());
+            }
+            BinaryPredicate predicate = (BinaryPredicate) columnExpr;
+            if (predicate.getOp() != Operator.EQ) {
+                throw new AnalysisException("Mapping function expr only support the column or eq binary predicate. "
+                        + "The mapping operator error, op: " + predicate.getOp());
+            }
+            Expr child0 = predicate.getChild(0);
+            if (!(child0 instanceof SlotRef)) {
+                throw new AnalysisException("Mapping function expr only support the column or eq binary predicate. "
+                        + "The mapping column error. column: " + child0.toSql());
+            }
+            String column = ((SlotRef) child0).getColumnName();
+            if (!columnMappingNames.add(column)) {
+                throw new AnalysisException("Duplicate column mapping: " + column);
+            }
+            // hadoop load only supports the FunctionCallExpr
+            Expr child1 = predicate.getChild(1);
+            if (isHadoopLoad && !(child1 instanceof FunctionCallExpr)) {
+                throw new AnalysisException("Hadoop load only supports the designated function. "
+                        + "The error mapping function is:" + child1.toSql());
+            }
+            ImportColumnDesc importColumnDesc = new ImportColumnDesc(column, child1);
+            parsedColumnExprList.add(importColumnDesc);
+            if (child1 instanceof FunctionCallExpr) {
+                analyzeColumnToHadoopFunction(column, child1);
+            }
+        }
+    }
+
+```
+This function performs:
+
+1. Computation of the mapping between the source columns and the target table columns
+2. Extraction of the columns specified by ColumnsFromPath
+3. Analysis of the columns derived by function computation
+
+The mapping result of each analyzed column is stored in parsedColumnExprList.
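+
+To make the result concrete, consider a hypothetical load with `COLUMNS (k1, k2, tmp_v)`, `COLUMNS FROM PATH (city)` and `SET (v = tmp_v * 10)`. The standalone Java sketch below shows what parsedColumnExprList would conceptually contain; it uses a plain ordered map as a stand-in for the list of ImportColumnDesc entries and writes the mapping expression as a string instead of an Expr tree:
+
+```java
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class ParsedColumnExprListSketch {
+    public static void main(String[] args) {
+        // column name -> mapping expression (null means the value is read directly, not computed),
+        // standing in for the ImportColumnDesc entries collected in parsedColumnExprList
+        Map<String, String> parsedColumnExprList = new LinkedHashMap<>();
+        // 1. columns read directly from the file
+        parsedColumnExprList.put("k1", null);
+        parsedColumnExprList.put("k2", null);
+        parsedColumnExprList.put("tmp_v", null);
+        // 2. the column extracted from the path (also no expression at this stage)
+        parsedColumnExprList.put("city", null);
+        // 3. the column computed in the SET clause, together with its mapping expression
+        parsedColumnExprList.put("v", "tmp_v * 10");
+        parsedColumnExprList.forEach((col, expr) -> System.out.println(col + " = " + expr));
+    }
+}
+```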
+
+```
+DataDescription::analyzeColumnToHadoopFunction()
+
+    private void analyzeColumnToHadoopFunction(String columnName, Expr child1) throws AnalysisException {
+        Preconditions.checkState(child1 instanceof FunctionCallExpr);
+        FunctionCallExpr functionCallExpr = (FunctionCallExpr) child1;
+        String functionName = functionCallExpr.getFnName().getFunction();
+        if (!HADOOP_SUPPORT_FUNCTION_NAMES.contains(functionName.toLowerCase())) {
+            return;
+        }
+        List<Expr> paramExprs = functionCallExpr.getParams().exprs();
+        List<String> args = Lists.newArrayList();
+
+        for (Expr paramExpr : paramExprs) {
+            if (paramExpr instanceof SlotRef) {
+                SlotRef slot = (SlotRef) paramExpr;
+                args.add(slot.getColumnName());
+            } else if (paramExpr instanceof StringLiteral) {
+                StringLiteral literal = (StringLiteral) paramExpr;
+                args.add(literal.getValue());
+            } else if (paramExpr instanceof NullLiteral) {
+                args.add(null);
+            } else {
+                if (isHadoopLoad) {
+                    // hadoop functions only support slot, string and null parameters
+                    throw new AnalysisException("Mapping function args error, arg: " + paramExpr.toSql());
+                }
+            }
+        }
+
+        Pair<String, List<String>> functionPair = new Pair<String, List<String>>(functionName, args);
+        columnToHadoopFunction.put(columnName, functionPair);
+    }
+```
+The code above handles the columns derived by function computation (the column mappings in SET); the final result is the function name and argument list used for each such column.
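+
+For example, for a hypothetical mapping `SET (dt = strftime("%Y-%m-%d", tmp_time))`, the entry stored in columnToHadoopFunction pairs the column `dt` with the function name and its argument list. A minimal standalone Java sketch of that shape (using a JDK Map.Entry in place of Doris's Pair class):
+
+```java
+import java.util.AbstractMap.SimpleEntry;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ColumnToHadoopFunctionSketch {
+    public static void main(String[] args) {
+        // column name -> (function name, argument list), mirroring columnToHadoopFunction
+        Map<String, Map.Entry<String, List<String>>> columnToHadoopFunction = new LinkedHashMap<>();
+        // Hypothetical mapping: SET (dt = strftime("%Y-%m-%d", tmp_time))
+        columnToHadoopFunction.put("dt",
+                new SimpleEntry<String, List<String>>("strftime", Arrays.asList("%Y-%m-%d", "tmp_time")));
+        columnToHadoopFunction.forEach((col, fn) ->
+                System.out.println(col + " -> " + fn.getKey() + fn.getValue()));
+    }
+}
+```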
+
+```
+BrokerScanNode::finalizeParams()
+
+       private void finalizeParams(ParamCreateContext context) throws UserException, AnalysisException {
+        Map<String, SlotDescriptor> slotDescByName = context.slotDescByName;
+        Map<String, Expr> exprMap = context.exprMap;
+        Map<Integer, Integer> destSidToSrcSidWithoutTrans = Maps.newHashMap();
+
+        boolean isNegative = context.fileGroup.isNegative();
+        for (SlotDescriptor destSlotDesc : desc.getSlots()) {
+            if (!destSlotDesc.isMaterialized()) {
+                continue;
+            }
+            Expr expr = null;
+            if (exprMap != null) {
+                expr = exprMap.get(destSlotDesc.getColumn().getName());
+            }
+            if (expr == null) {
+                SlotDescriptor srcSlotDesc = slotDescByName.get(destSlotDesc.getColumn().getName());
+                if (srcSlotDesc != null) {
+                    destSidToSrcSidWithoutTrans.put(destSlotDesc.getId().asInt(), srcSlotDesc.getId().asInt());
+                    // If dest is allow null, we set source to nullable
+                    if (destSlotDesc.getColumn().isAllowNull()) {
+                        srcSlotDesc.setIsNullable(true);
+                    }
+                    expr = new SlotRef(srcSlotDesc);
+                } else {
+                    Column column = destSlotDesc.getColumn();
+                    if (column.getDefaultValue() != null) {
+                        expr = new StringLiteral(destSlotDesc.getColumn().getDefaultValue());
+                    } else {
+                        if (column.isAllowNull()) {
+                            expr = NullLiteral.create(column.getType());
+                        } else {
+                            throw new UserException("Unknown slot ref("
+                                    + destSlotDesc.getColumn().getName() + ") in source file");
+                        }
+                    }
+                }
+            }
+
+            // check hll_hash
+            if (destSlotDesc.getType().getPrimitiveType() == PrimitiveType.HLL) {
+                if (!(expr instanceof FunctionCallExpr)) {
+                    throw new AnalysisException("HLL column must use hll_hash function, like "
+                            + destSlotDesc.getColumn().getName() + "=hll_hash(xxx)");
+                }
+                FunctionCallExpr fn = (FunctionCallExpr) expr;
+                if (!fn.getFnName().getFunction().equalsIgnoreCase("hll_hash")
+                        && !fn.getFnName().getFunction().equalsIgnoreCase("hll_empty")) {
+                    throw new AnalysisException("HLL column must use hll_hash function, like "
+                            + destSlotDesc.getColumn().getName() + "=hll_hash(xxx) or " + destSlotDesc.getColumn().getName() + "=hll_empty()");
+                }
+                expr.setType(Type.HLL);
+            }
+
+            checkBitmapCompatibility(destSlotDesc, expr);
+
+            // analyze negative
+            if (isNegative && destSlotDesc.getColumn().getAggregationType() == AggregateType.SUM) {
+                expr = new ArithmeticExpr(ArithmeticExpr.Operator.MULTIPLY, expr, new IntLiteral(-1));
+                expr.analyze(analyzer);
+            }
+            expr = castToSlot(destSlotDesc, expr);
+            context.params.putToExpr_of_dest_slot(destSlotDesc.getId().asInt(), expr.treeToThrift());
+        }
+        context.params.setDest_sid_to_src_sid_without_trans(destSidToSrcSidWithoutTrans);
+        context.params.setSrc_tuple_id(context.tupleDescriptor.getId().asInt());
+        context.params.setDest_tuple_id(desc.getId().asInt());
+        context.params.setStrict_mode(strictMode);
+        // Need to re-compute the memory layout after setting some slot descriptors to nullable
+        context.tupleDescriptor.computeMemLayout();
+    }
+```
+This function determines which columns come directly from the data source (including the columnsFromPath columns), how target columns are derived from source columns, which columns are computed by functions, the schema of the data source, the schema of the target table, and so on. All of this information is set into TBrokerScanRangeParams, the parameters of the BrokerScanNode; after serialization it is passed to the BEs over RPC, and the BEs perform the actual ETL based on it. The struct is defined as follows:
+
+```
+struct TBrokerScanRangeParams {
+    // column separator
+    1: required byte column_separator;
+    // line delimiter
+    2: required byte line_delimiter;
+
+    // tuple id of the source schema
+    // the corresponding tuple descriptor is looked up via TDescriptorTable
+    3: required Types.TTupleId src_tuple_id
+    // slot ids of the source columns
+    // the corresponding slot descriptors are looked up via TDescriptorTable
+    4: required list<Types.TSlotId> src_slot_ids
+
+    // tuple id of the destination table
+    5: required Types.TTupleId dest_tuple_id
+    // columns computed in the SET clause
+    6: optional map<Types.TSlotId, Exprs.TExpr> expr_of_dest_slot
+
+    // properties needed to access the broker.
+    7: optional map<string, string> properties;
+
+    // If partition_ids is set, data that is not in these partitions will be filtered.
+    8: optional list<i64> partition_ids
+
+    // source columns that map directly to destination columns
+    9: optional map<Types.TSlotId, Types.TSlotId> dest_sid_to_src_sid_without_trans
+    // strictMode is a boolean
+    // if strict mode is true, incorrect data (rows whose cast result is null) will not be loaded
+    10: optional bool strict_mode
+}
+```
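+
+As an illustration of the strict_mode semantics described in the comment above (in strict mode, a row whose non-null source value casts to null is filtered; in non-strict mode the row is kept with a NULL value), here is a minimal standalone Java sketch; the parsing is deliberately simplified and is not Doris's actual BE code:
+
+```java
+import java.util.Arrays;
+import java.util.List;
+
+public class StrictModeSketch {
+    // Try to cast a raw string to an integer column value; return null when the cast fails.
+    static Integer castToInt(String raw) {
+        try {
+            return Integer.valueOf(raw);
+        } catch (NumberFormatException e) {
+            return null;
+        }
+    }
+
+    static void load(List<String> rawColumn, boolean strictMode) {
+        System.out.println("strict_mode = " + strictMode);
+        for (String raw : rawColumn) {
+            Integer value = castToInt(raw);
+            if (strictMode && value == null && raw != null) {
+                // strict mode: the source value was not null but the cast failed -> filter the row
+                System.out.println("  filtered row, raw value = " + raw);
+            } else {
+                // non-strict mode keeps the row and stores NULL for the unparsable value
+                System.out.println("  loaded value = " + value);
+            }
+        }
+    }
+
+    public static void main(String[] args) {
+        List<String> rawColumn = Arrays.asList("1", "abc", "3");  // "abc" cannot be cast to int
+        load(rawColumn, true);   // the "abc" row is filtered
+        load(rawColumn, false);  // the "abc" row is loaded with a NULL value
+    }
+}
+```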
+
+#### Processing in the BE
+
+The BE mainly executes the load execution plan dispatched by the LoadLoadingTask, which relies on Doris's query framework (to be covered in a later article). Here is a brief overview of the query plan execution framework in the BE. A query plan is divided into a logical query plan (Single PlanFragment) and an executable distributed query plan (Distributed PlanFragment); the BE mainly deals with the executable distributed plan. The PlanFragment concept is similar to the stage concept in Spark: it is a plan fragment that can be executed on a single BE by itself. The BE implements the PlanFragment execution framework; the logic lives in plan_fragment_executor.cpp and looks roughly like this:
+```
+Status PlanFragmentExecutor::open() {
+    ...
+    Status status = open_internal();
+    ...
+    return status;
+}
+
+Status PlanFragmentExecutor::open_internal() {
+    ...
+    if (_sink.get() == NULL) {
+        return Status::OK;
+    }
+    // open the data sink
+    RETURN_IF_ERROR(_sink->open(runtime_state()));
+
+    ...
+    while (true) {
+        // fetch data from the ScanNode by calling get_next()
+        RETURN_IF_ERROR(get_next_internal(&batch));
+        ...
+        // send the batch via the DataSink's send() method
+        RETURN_IF_ERROR(_sink->send(runtime_state(), batch));
+    }
+
+    ...
+
+    return Status::OK;
+}
+```
+A PlanFragment contains a tree-shaped execution plan composed of multiple ExecNodes (similar to the RDDs contained in a Spark stage). The SinkNode (the _sink variable above) is similar to an RDD with an action in Spark: it serializes the data, which may later be persisted or sent over the network to the next PlanFragment. get_next_internal() drives the plan-node tree to fetch data, which is then sent through the SinkNode to the next PlanFragment or returned as the result.
 
 Review comment:
   Actually, the sink is not a node; it is just the sink~

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
