EmmyMiao87 commented on a change in pull request #3013: add broker load internal doc
URL: https://github.com/apache/incubator-doris/pull/3013#discussion_r385010338
 
 

 ##########
 File path: docs/documentation/cn/internal/broker_load.md
 ##########
 @@ -0,0 +1,1014 @@
+# Doris Broker Load: An Implementation Walkthrough
+
+## Background
+
+Doris supports several ways to load data. Broker load is the most commonly used one: it loads files from distributed storage systems (HDFS, BOS, etc.) into Doris. Broker load is suitable when:
+
+- The source data lives in a distributed storage system that a Broker can access, such as HDFS.
+
+- The data volume is on the order of 20 GB.
+
+## Glossary
+
+* FE: Frontend, the frontend node of Palo (Doris). It mainly receives and answers client requests, manages metadata and the cluster, and generates query plans. For an architecture diagram, see the [Doris architecture introduction](http://doris.incubator.apache.org/)
+* BE: Backend, the backend node of Palo (Doris). It mainly handles data storage and management, query plan execution, and so on.
+* Broker: see the [Broker documentation](http://doris.incubator.apache.org/documentation/cn/administrator-guide/broker.html)
+
+## Implementation
+
+For a Broker load, the user only needs to supply one copy of the base table's data; Doris then does the following on the user's behalf:
+
+- Doris automatically derives each rollup table's data from the base table's data and loads it into the corresponding rollup
+- Negative load (only for SUM-type value columns of the aggregate model)
+- Extracting columns from the file path
+- Function computation, including strftime, now, hll_hash, md5, etc.
+- Guaranteeing atomicity of the whole load process
+
+For the syntax and usage of Broker load, see the [Broker load manual](http://doris.incubator.apache.org/documentation/cn/administrator-guide/load-data/broker-load-manual.html)
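
The negative-load feature mentioned above can be illustrated with a toy example. This is hypothetical demo code, not Doris internals: it only shows why re-importing a batch with negated values cancels it out under SUM aggregation.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of an aggregate-model table whose value column uses SUM.
public class NegativeLoadDemo {
    // key column -> SUM-aggregated value column
    public static final Map<String, Long> table = new HashMap<>();

    // A normal load merges rows into the table by summing the value column.
    public static void load(String key, long value) {
        table.merge(key, value, Long::sum);
    }

    // A "negative" load imports the same rows with negated values,
    // which cancels the earlier batch under SUM aggregation.
    public static void negativeLoad(String key, long value) {
        load(key, -value);
    }

    public static void main(String[] args) {
        load("user_1", 100L);        // first batch
        load("user_1", 25L);         // second batch, assume it was wrong
        negativeLoad("user_1", 25L); // cancel the second batch
        System.out.println(table.get("user_1")); // 100
    }
}
```

This is why negative load only makes sense for SUM-type value columns: for other aggregations (REPLACE, MAX, MIN) importing negated rows would not restore the previous state.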
+
+### Load Flow
+
+```
+                 +
+                 | 1. user create broker load
+                 v
+            +----+----+
+            |         |
+            |   FE    |
+            |         |
+            +----+----+
+                 |
+                 | 2. BE etl and load the data
+    +--------------------------+
+    |            |             |
++---v---+     +--v----+    +---v---+
+|       |     |       |    |       |
+|  BE   |     |  BE   |    |   BE  |
+|       |     |       |    |       |
++---^---+     +---^---+    +---^---+
+    |             |            |
+    |             |            | 3. pull data from broker
++---+---+     +---+---+    +---+---+
+|       |     |       |    |       |
+|Broker |     |Broker |    |Broker |
+|       |     |       |    |       |
++---^---+     +---^---+    +---^---+
+    |             |            | 
++----------------------------------+
+|       HDFS/BOS/AFS cluster       |
++----------------------------------+
+```
+
+The overall load process is roughly:
+
+- The user sends the request to the FE, which performs syntax and semantic analysis and then generates a BrokerLoadJob
+- The BrokerLoadJob is scheduled by the LoadJob scheduler, producing a BrokerLoadPendingTask
+- The BrokerLoadPendingTask lists the source files and builds a file list per partition
+- One LoadLoadingTask is generated per partition to carry out the load
+- Each LoadLoadingTask generates a distributed load execution plan that runs on the backend BEs: reading the source files, performing the ETL transformation, and writing to the corresponding tablets.
+
+The key steps are as follows:
+
+#### Processing in the FE
+1. Syntax and semantic analysis
+
+```
+             User Query
+                 +
+                 | mysql protocol
+                 v
+         +-------+-------+
+         |               |
+         |   QeService   |
+         |               |
+         +-------+-------+
+                 |
+                 v
+         +-------+-------+
+         |               |
+         |  MysqlServer  |
+         |               |
+         +-------+-------+
+                 |
+                 v
+       +---------+---------+
+       |                   |
+       |  ConnectScheduler |
+       |                   |
+       +---------+---------+
+                 |
+                 v
+       +---------+---------+
+       |                   |
+       |  ConnectProcessor |
+       |                   |
+       +---------+---------+
+                 |
+                 v
+         +-------+-------+
+         |               |
+         | StmtExecutor  |
+         |               |
+         +-------+-------+
+```
+The flow above is what a query goes through for syntax and semantic analysis once it reaches Doris. In Doris, MysqlServer is a server implementing the MySQL protocol; it receives the user's MySQL query requests, which are scheduled by the ConnectScheduler, handled by a ConnectProcessor, and finally parsed and analyzed by the StmtExecutor.
+
+2. Load job execution
+
+```
+         +-------+-------+
+         |    PENDING    |-----------------|
+         +-------+-------+                 |
+                 | BrokerLoadPendingTask   |
+                 v                         |
+         +-------+-------+                 |
+         |    LOADING    |-----------------|
+         +-------+-------+                 |
+                 | LoadLoadingTask         |
+                 v                         |
+         +-------+-------+                 |
+         |  COMMITTED    |-----------------|
+         +-------+-------+                 |
+                 |                         |
+                 v                         v
+         +-------+-------+         +-------+-------+
+         |   FINISHED    |         |   CANCELLED   |
+         +-------+-------+         +-------+-------+
+                 |                         Λ
+                 |-------------------------|
+```
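
The transitions in the diagram above can be sketched as a small state machine. The class and method names here are hypothetical illustration code, not the actual Doris classes:

```java
// Minimal sketch of the BrokerLoadJob state transitions (hypothetical code).
public class LoadJobStateMachine {
    public enum State { PENDING, LOADING, COMMITTED, FINISHED, CANCELLED }

    private State state = State.PENDING;

    public State state() { return state; }

    // Happy path: PENDING -> LOADING -> COMMITTED -> FINISHED.
    public void advance() {
        switch (state) {
            case PENDING:   state = State.LOADING;   break; // pending task handled
            case LOADING:   state = State.COMMITTED; break; // loading tasks done
            case COMMITTED: state = State.FINISHED;  break; // transaction visible
            default: throw new IllegalStateException("terminal state: " + state);
        }
    }

    // A failure in any non-terminal state moves the job to CANCELLED.
    public void cancel() {
        if (state == State.FINISHED || state == State.CANCELLED) {
            throw new IllegalStateException("already terminal: " + state);
        }
        state = State.CANCELLED;
    }

    public static void main(String[] args) {
        LoadJobStateMachine job = new LoadJobStateMachine();
        job.advance(); // PENDING -> LOADING
        job.advance(); // LOADING -> COMMITTED
        job.advance(); // COMMITTED -> FINISHED
        System.out.println(job.state()); // FINISHED
    }
}
```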
+
+A user-initiated Broker load request, after syntax and semantic analysis in the StmtExecutor, produces a LoadStmt; the DdlExecutor then generates a BrokerLoadJob from that LoadStmt.
+
+```java
+        if (ddlStmt instanceof LoadStmt) {
+            LoadStmt loadStmt = (LoadStmt) ddlStmt;
+            EtlJobType jobType;
+            if (loadStmt.getBrokerDesc() != null) {
+                jobType = EtlJobType.BROKER;
+            } else {
+                if (Config.disable_hadoop_load) {
+                    throw new DdlException("Load job by hadoop cluster is disabled."
+                            + " Try using broker load. See 'help broker load;'");
+                }
+                jobType = EtlJobType.HADOOP;
+            }
+            if (loadStmt.getVersion().equals(Load.VERSION) || jobType == EtlJobType.HADOOP) {
+                catalog.getLoadManager().createLoadJobV1FromStmt(loadStmt, jobType, System.currentTimeMillis());
+            } else {
+                catalog.getLoadManager().createLoadJobFromStmt(loadStmt, origStmt);
+            }
+        }
+```
+
+A BrokerLoadJob runs as a state machine. The job starts in the PENDING state, in which it does two things:
+
+1. Calls the global transaction manager to begin a transaction and obtain a txn id.
+2. Creates a BrokerLoadPendingTask. This task's main job is to check that the file paths supplied by the user are valid, collect the number and total size of the files, organize that information by partition, and store it in a Map (see the data structure BrokerPendingTaskAttachment).
+
+The key code is as follows (BrokerLoadPendingTask.java):
+
+```java
+    private void getAllFileStatus() throws UserException {
+        long start = System.currentTimeMillis();
+        long totalFileSize = 0;
+        int totalFileNum = 0;
+        for (Map.Entry<FileGroupAggKey, List<BrokerFileGroup>> entry : aggKeyToBrokerFileGroups.entrySet()) {
+            FileGroupAggKey aggKey = entry.getKey();
+            List<BrokerFileGroup> fileGroups = entry.getValue();
+
+            List<List<TBrokerFileStatus>> fileStatusList = Lists.newArrayList();
+            long tableTotalFileSize = 0;
+            int tableTotalFileNum = 0;
+            int groupNum = 0;
+            for (BrokerFileGroup fileGroup : fileGroups) {
+                long groupFileSize = 0;
+                List<TBrokerFileStatus> fileStatuses = Lists.newArrayList();
+                for (String path : fileGroup.getFilePaths()) {
+                    BrokerUtil.parseBrokerFile(path, brokerDesc, fileStatuses);
+                }
+                fileStatusList.add(fileStatuses);
+                for (TBrokerFileStatus fstatus : fileStatuses) {
+                    groupFileSize += fstatus.getSize();
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId())
+                                .add("file_status", fstatus).build());
+                    }
+                }
+                tableTotalFileSize += groupFileSize;
+                tableTotalFileNum += fileStatuses.size();
+                LOG.info("get {} files in file group {} for table {}. size: {}. job: {}",
+                        fileStatuses.size(), groupNum, entry.getKey(), groupFileSize, callback.getCallbackId());
+                groupNum++;
+            }
+
+            totalFileSize += tableTotalFileSize;
+            totalFileNum += tableTotalFileNum;
+            ((BrokerPendingTaskAttachment) attachment).addFileStatus(aggKey, fileStatusList);
+            LOG.info("get {} files to be loaded. total size: {}. cost: {} ms, job: {}",
+                    tableTotalFileNum, tableTotalFileSize, (System.currentTimeMillis() - start), callback.getCallbackId());
+        }
+
+        ((BrokerLoadJob) callback).setLoadFileInfo(totalFileNum, totalFileSize);
+    }
+```
+
+Here, the code that inspects the user's file paths is: `BrokerUtil.parseBrokerFile(path, brokerDesc, fileStatuses);`
+
+After the BrokerLoadPendingTask finishes, the BrokerLoadJob transitions to the LOADING state. In the LOADING stage, the main work is to create one LoadLoadingTask per partition covered by the load. Each task's core job is to generate the load query plan and dispatch it to the backend BE nodes for execution.
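
The per-partition fan-out described above can be sketched roughly as follows. The names here (LoadingTaskPlanner, LoadingTask, plan) are hypothetical stand-ins for illustration, not the real Doris API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Rough sketch of the LOADING step: one loading task per partition.
public class LoadingTaskPlanner {
    // Stand-in for LoadLoadingTask: carries the files of a single partition.
    public static class LoadingTask {
        public final String partition;
        public final List<String> files;
        public LoadingTask(String partition, List<String> files) {
            this.partition = partition;
            this.files = files;
        }
    }

    // Create one task per partition; each task would later build its own
    // distributed plan and be dispatched to the BEs.
    public static List<LoadingTask> plan(Map<String, List<String>> filesByPartition) {
        List<LoadingTask> tasks = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : filesByPartition.entrySet()) {
            tasks.add(new LoadingTask(e.getKey(), e.getValue()));
        }
        return tasks;
    }

    public static void main(String[] args) {
        Map<String, List<String>> files = Map.of(
                "p20200101", List.of("hdfs://nn/path/a.csv"),
                "p20200102", List.of("hdfs://nn/path/b.csv", "hdfs://nn/path/c.csv"));
        System.out.println(plan(files).size()); // 2
    }
}
```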
 
 Review comment:
  Not after it finishes executing: the job becomes LOADING as soon as the pending task has been generated and added to the queue.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
