EmmyMiao87 commented on a change in pull request #3013: add broker load internal doc
URL: https://github.com/apache/incubator-doris/pull/3013#discussion_r385017292
########## File path: docs/documentation/cn/internal/broker_load.md ##########
@@ -0,0 +1,1014 @@

# Doris Broker Load Implementation

## Background

Doris supports several load methods, of which Broker load is one of the most commonly used: it loads files from a distributed storage system (HDFS, BOS, etc.) into Doris. Broker load is suitable when:

- The source data sits in a distributed storage system that a Broker can access, such as HDFS.

- The data volume is on the order of 20 GB.

## Glossary

* FE: Frontend, the frontend node of Doris (also known as Palo). It is mainly responsible for receiving and returning client requests, metadata and cluster management, query plan generation, and so on. For the Doris architecture, see the [Doris architecture overview](http://doris.incubator.apache.org/).
* BE: Backend, the backend node of Doris. It is mainly responsible for data storage and management, query plan execution, and so on.
* Broker: see the [Broker documentation](http://doris.incubator.apache.org/documentation/cn/administrator-guide/broker.html).

## How It Works

In a Broker load, the user only needs to provide the data for the base table; Doris performs the following on the user's behalf:

- Automatically generates the data for each rollup table from the base table data and loads it into the corresponding rollup.
- Implements negative load (only for SUM-type value columns of the aggregate model).
- Extracts column values from the file path.
- Performs function computation, including strftime, now, hll_hash, md5, and so on.
- Guarantees that the entire load process is atomic.

For the syntax and usage of Broker load, see the [Broker load manual](http://doris.incubator.apache.org/documentation/cn/administrator-guide/load-data/broker-load-manual.html).

### Load Workflow

```
     | 1. user create broker load
     v
+----+----+
|         |
|   FE    |
|         |
+----+----+
     |
     | 2. BE etl and load the data
     +--------------------------+
     |            |             |
 +---v---+    +---v---+    +---v---+
 |       |    |       |    |       |
 |  BE   |    |  BE   |    |  BE   |
 |       |    |       |    |       |
 +---^---+    +---^---+    +---^---+
     |            |            |
     |            |            | 3. pull data from broker
 +---+---+    +---+---+    +---+---+
 |       |    |       |    |       |
 |Broker |    |Broker |    |Broker |
 |       |    |       |    |       |
 +---^---+    +---^---+    +---^---+
     |            |            |
+----------------------------------+
|       HDFS/BOS/AFS cluster       |
+----------------------------------+
```

The overall load process is roughly as follows:

- The user sends the request to the FE; after syntax and semantic analysis, a BrokerLoadJob is generated.
- The BrokerLoadJob is scheduled by the LoadJob scheduler, which generates a BrokerLoadPendingTask.
- The BrokerLoadPendingTask lists the source files and builds the file list for each partition.
- One LoadLoadingTask is generated per partition to perform the load.
- Each LoadLoadingTask produces a distributed load execution plan that runs on the backend BEs: read the source files, apply the ETL transformations, and write to the corresponding tablets.

The key steps are as follows:

#### Processing on the FE

1. Syntax and semantic analysis

```
      User Query
          |
          | mysql protocol
          v
  +-------+-------+
  |               |
  |   QeService   |
  |               |
  +-------+-------+
          |
          v
  +-------+-------+
  |               |
  |  MysqlServer  |
  |               |
  +-------+-------+
          |
          v
  +---------+---------+
  |                   |
  | ConnectScheduler  |
  |                   |
  +---------+---------+
          |
          v
  +---------+---------+
  |                   |
  | ConnectProcessor  |
  |                   |
  +---------+---------+
          |
          v
  +-------+-------+
  |               |
  | StmtExecutor  |
  |               |
  +-------+-------+
```

The diagram above shows the processing path a query goes through for syntax and semantic analysis after it reaches Doris. MysqlServer is Doris's implementation of a MySQL-protocol server and receives the user's MySQL query requests; after scheduling by ConnectScheduler, a request is handled by ConnectProcessor and finally parsed and analyzed by StmtExecutor.

2. Load job execution

```
+-------+-------+
|    PENDING    |-----------------|
+-------+-------+                 |
        | BrokerLoadPendingTask   |
        v                         |
+-------+-------+                 |
|    LOADING    |-----------------|
+-------+-------+                 |
        | LoadLoadingTask         |
        v                         |
+-------+-------+                 |
|   COMMITTED   |-----------------|
+-------+-------+                 |
        |                         |
        v                         v
+-------+-------+         +-------+-------+
|   FINISHED    |         |   CANCELLED   |
+-------+-------+         +-------+-------+
        |                         ^
        |-------------------------|
```

After syntax and semantic analysis in StmtExecutor, a user's Broker load request produces a LoadStmt; DdlExecutor then generates a BrokerLoadJob from the LoadStmt:

```java
if (ddlStmt instanceof LoadStmt) {
    LoadStmt loadStmt = (LoadStmt) ddlStmt;
    EtlJobType jobType;
    if (loadStmt.getBrokerDesc() != null) {
        jobType = EtlJobType.BROKER;
    } else {
        if (Config.disable_hadoop_load) {
            throw new DdlException("Load job by hadoop cluster is disabled."
                    + " Try using broker load. See 'help broker load;'");
        }
        jobType = EtlJobType.HADOOP;
    }
    if (loadStmt.getVersion().equals(Load.VERSION) || jobType == EtlJobType.HADOOP) {
        catalog.getLoadManager().createLoadJobV1FromStmt(loadStmt, jobType, System.currentTimeMillis());
    } else {
        catalog.getLoadManager().createLoadJobFromStmt(loadStmt, origStmt);
    }
}
```
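As the next paragraph explains, the resulting job is driven as a state machine. Purely as a mental model, the following minimal Java sketch mirrors the transitions in the state diagram above; `SketchBrokerLoadJob`, `runPendingTask`, and `runLoadingTasks` are hypothetical names, not the actual Doris classes:

```java
// Hypothetical sketch only -- not the actual Doris classes. It mirrors the
// state diagram above: each state runs its task, then the job advances
// PENDING -> LOADING -> COMMITTED -> FINISHED, or jumps to CANCELLED on error.
enum JobState { PENDING, LOADING, COMMITTED, FINISHED, CANCELLED }

class SketchBrokerLoadJob {
    private JobState state = JobState.PENDING;

    void step() {
        try {
            switch (state) {
                case PENDING:
                    // begin a transaction, then list and size the source files
                    // (the job of BrokerLoadPendingTask, described below)
                    runPendingTask();
                    state = JobState.LOADING;
                    break;
                case LOADING:
                    // run one LoadLoadingTask per partition, then commit the txn
                    runLoadingTasks();
                    state = JobState.COMMITTED;
                    break;
                case COMMITTED:
                    // the committed transaction becomes visible
                    state = JobState.FINISHED;
                    break;
                default:
                    break; // FINISHED and CANCELLED are terminal states
            }
        } catch (Exception e) {
            state = JobState.CANCELLED; // abort the transaction on any failure
        }
    }

    void runPendingTask() { /* begin txn + BrokerLoadPendingTask */ }
    void runLoadingTasks() { /* one LoadLoadingTask per partition */ }

    JobState getState() { return state; }
}
```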
BrokerLoadJob executes as a state machine. The initial state of the job is PENDING, and in the PENDING state two things happen:

1. The global transaction manager is called to begin a transaction and obtain a txn id.
2. A BrokerLoadPendingTask is created. Its main job is to check that the file paths supplied by the user are valid and to collect the number and size of the files, organizing this information by partition and storing it in a Map (see the data structure BrokerPendingTaskAttachment).

The key code is as follows (BrokerLoadPendingTask.java):

```java
private void getAllFileStatus() throws UserException {
    long start = System.currentTimeMillis();
    long totalFileSize = 0;
    int totalFileNum = 0;
    for (Map.Entry<FileGroupAggKey, List<BrokerFileGroup>> entry : aggKeyToBrokerFileGroups.entrySet()) {
        FileGroupAggKey aggKey = entry.getKey();
        List<BrokerFileGroup> fileGroups = entry.getValue();

        List<List<TBrokerFileStatus>> fileStatusList = Lists.newArrayList();
        long tableTotalFileSize = 0;
        int tableTotalFileNum = 0;
        int groupNum = 0;
        for (BrokerFileGroup fileGroup : fileGroups) {
            long groupFileSize = 0;
            List<TBrokerFileStatus> fileStatuses = Lists.newArrayList();
            for (String path : fileGroup.getFilePaths()) {
                BrokerUtil.parseBrokerFile(path, brokerDesc, fileStatuses);
            }
            fileStatusList.add(fileStatuses);
            for (TBrokerFileStatus fstatus : fileStatuses) {
                groupFileSize += fstatus.getSize();
                if (LOG.isDebugEnabled()) {
                    LOG.debug(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId())
                            .add("file_status", fstatus).build());
                }
            }
            tableTotalFileSize += groupFileSize;
            tableTotalFileNum += fileStatuses.size();
            LOG.info("get {} files in file group {} for table {}. size: {}. job: {}",
                    fileStatuses.size(), groupNum, entry.getKey(), groupFileSize, callback.getCallbackId());
            groupNum++;
        }

        totalFileSize += tableTotalFileSize;
        totalFileNum += tableTotalFileNum;
        ((BrokerPendingTaskAttachment) attachment).addFileStatus(aggKey, fileStatusList);
        LOG.info("get {} files to be loaded. total size: {}. cost: {} ms, job: {}",
                tableTotalFileNum, tableTotalFileSize, (System.currentTimeMillis() - start), callback.getCallbackId());
    }

    ((BrokerLoadJob) callback).setLoadFileInfo(totalFileNum, totalFileSize);
}
```

The call that checks the user's file paths is `BrokerUtil.parseBrokerFile(path, brokerDesc, fileStatuses);`.

After the BrokerLoadPendingTask completes, the BrokerLoadJob moves to the LOADING state. In the LOADING phase, one LoadLoadingTask is created for each partition specified by the load. The main job of this task is to generate the load query plan and distribute it to the backend BE nodes for execution.

##### LoadLoadingTask

LoadLoadingTask is the **most important step** of the load. As described above, it is responsible for generating the load plan and for distributing and executing it (relying on Coordinator, the execution framework for query plans). The generated plan (built by LoadingTaskPlanner) looks like this:

```
Fragment:
 +------------------------------+
 |    +----------+---------+    |
 |    |   BrokerScanNode   |    |
 |    +----------+---------+    |
 |               |              |
 |               v              |
 |    +----------+---------+    |
 |    |   OlapTableSink    |    |
 |    +---------+----------+    |
 +------------------------------+
```

BrokerScanNode does the following work:

- Column mapping

  Maps the columns of the source files to the columns of the table, and supports expressions over columns (the SET feature).
- Negative load

  For SUM-type value columns, supports loading negative data.
- Predicate filtering
- The column-from-path mechanism (a sketch follows this list)

  Extracts column values from the file path.
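To illustrate the column-from-path idea, here is a small self-contained Java sketch. It assumes path segments follow the common `key=value` partition-directory convention; `ColumnsFromPathSketch` and `extract` are hypothetical names, not the Doris implementation:

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch, not the Doris implementation: pull column values out of
// partition-style path segments such as .../city=beijing/date=2020-01-01/file.csv
public class ColumnsFromPathSketch {
    static Map<String, String> extract(String filePath) {
        Map<String, String> columns = new TreeMap<>(); // sorted for stable output
        for (String segment : filePath.split("/")) {
            int eq = segment.indexOf('=');
            // keep only segments of the form key=value
            if (eq > 0 && eq < segment.length() - 1) {
                columns.put(segment.substring(0, eq), segment.substring(eq + 1));
            }
        }
        return columns;
    }

    public static void main(String[] args) {
        // prints {city=beijing, date=2020-01-01}
        System.out.println(extract("hdfs://nn:8020/warehouse/city=beijing/date=2020-01-01/data.csv"));
    }
}
```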
The main logic is as follows:

Review comment:
   The code logic below doesn't seem to match the Broker scan node?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
