This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
The following commit(s) were added to refs/heads/main by this push:
new 94ec710 [ISSUES #129]feat(doc) add design documentation directory
(#130)
94ec710 is described below
commit 94ec7100e66b437c2d8e96b288ceee0d495fd508
Author: Ni Ze <[email protected]>
AuthorDate: Tue Feb 22 17:46:18 2022 +0800
[ISSUES #129]feat(doc) add design documentation directory (#130)
* feat(doc) add design documentation directory
* feat(doc) add design documentation
---
...225\264\344\275\223\346\236\266\346\236\204.md" | 33 ++++++++++
.../2.\346\236\204\345\273\272DataStream.md" | 73 +++++++++++++++++++++
.../3.\345\220\257\345\212\250DataStream.md" | 53 +++++++++++++++
...265\201\350\275\254\350\277\207\347\250\213.md" | 63 ++++++++++++++++++
...256\227\345\255\220\350\247\243\346\236\220.md" | 55 ++++++++++++++++
...256\236\347\216\260\345\256\271\351\224\231.md" | 0
"docs/images/Pipeline\347\261\273\345\233\276.png" | Bin 0 -> 44207 bytes
docs/images/window.png | Bin 0 -> 241692 bytes
...75\223\346\236\266\346\236\204\345\233\276.png" | Bin 0 -> 60493 bytes
...00\273\344\275\223\350\277\207\347\250\213.png" | Bin 0 -> 44252 bytes
.../\346\211\251\345\256\271\345\211\215.png" | Bin 0 -> 56733 bytes
...12\266\346\200\201\347\256\227\345\255\220.png" | Bin 0 -> 35766 bytes
"docs/images/\347\212\266\346\200\201.png" | Bin 0 -> 47527 bytes
"docs/images/\347\274\251\345\256\271.png" | Bin 0 -> 51087 bytes
.../streams/examples/source/FileSourceExample.java | 2 +-
15 files changed, 278 insertions(+), 1 deletion(-)
diff --git
"a/docs/design/1.RocketMQ-streams\346\225\264\344\275\223\346\236\266\346\236\204.md"
"b/docs/design/1.RocketMQ-streams\346\225\264\344\275\223\346\236\266\346\236\204.md"
new file mode 100644
index 0000000..8e564a8
--- /dev/null
+++
"b/docs/design/1.RocketMQ-streams\346\225\264\344\275\223\346\236\266\346\236\204.md"
@@ -0,0 +1,33 @@
+### 总体架构
+
+
+
+数据从RocketMQ中被RocketMQ-streams消费,经过处理最终被写回到RocketMQ。
+如果流处理任务中含有算子groupBy,则需要将数据按照Key进行分组,将分组数据写入shuffle topic。后续算子从
+shuffle topic消费。如果还涉及count之类有状态算子,那么计算时需要读写状态,在窗口触发之后将计算结果写出。
+
+
+### 任务并行度模型
+
+
+
+计算实例实质上是依赖了Rocket-streams SDK的client,因此,计算实例消费的MQ依赖RocketMQ rebalance分配,
+计算实例总个数也不能大于消费总MQ个数,否则将有部分计算实例处于等待状态,消费不到数据。
+
+一个计算实例可以消费多个MQ,一个实例内也只有一张计算拓扑图。
+
+### 状态
+
+
+对于有状态算子,他的状态本地依赖RocksDB加速读取,远程依赖Mysql做持久化。允许流计算任务时,可以只依赖本地存储
+RocksDB, 只需要将setLocalStorageOnly设置成true即可。这种情况下可能存在状态丢失。
+
+
+
+### 扩缩容
+
+
+
+当计算实例从3个缩容到2个,借助于RocketMQ的rebalance,MQ会在计算实例之间重新分配。
+Instance1上消费的MQ2和MQ3被分配到Instance2和Instance3上,这两个MQ的状态数据也需要迁移到Instance2
+和Instance3上,这也暗示,状态数据是根据源数据分片保存的;扩容则是刚好相反的过程。
diff --git "a/docs/design/2.\346\236\204\345\273\272DataStream.md"
"b/docs/design/2.\346\236\204\345\273\272DataStream.md"
new file mode 100644
index 0000000..af12e27
--- /dev/null
+++ "b/docs/design/2.\346\236\204\345\273\272DataStream.md"
@@ -0,0 +1,73 @@
+DataStreamSource中有一个PipelineBuilder,在后续构建过程中,这个PipelineBuilder会一直向后流传,
+将构建过程中产生的source、stage添加进来;最后在start的时候,真正利用PipelineBuilder构建出拓扑图。
+
+### source类型
+ - 设置source的namespace、configureName;
+ - 将source保存到PipelineBuilder中;
+ - 将source作为source节点保存到PipelineBuilder中的ChainPipeline中;
+
+### ChainStage类型
+
+所有的其他运算,包括map,filter,script,window都会先构建出ChainStage,然后以ChainStage的身份进入
+PipelineBuilder,参加后续构建。
+
+在DataStream中一个典型的添加新算子,过程如下所示:
+```java
+
+public DataStream script(String script) {
+ //将用户定义的cript转化成ChainStage
+ // ChainStage<?> stage = this.mainPipelineBuilder.createStage(new
ScriptOperator(script));
+ //将ChainStage添加到PipelineBuilder中,构建拓扑。
+ this.mainPipelineBuilder.setTopologyStages(currentChainStage, stage);
+ //将PipelineBuilder构建成DataStream,向后传递,后续还可以用该PipelineBuilder构建拓扑
+ return new DataStream(this.mainPipelineBuilder,
this.otherPipelineBuilders, stage);
+}
+
+```
+
+### 创建ChainStage
+
+PipelineBuilder创建,创建过程中会设置label,并将这个ChainStage添加到PipelineBuilder持有ChainPipeline中
+
+- 把ChainStage添加到pipeline中
+ 在构建过程中,所有的添加算子都使用一个共同的PipelineBuilder实例,PipelineBuilder结构如图所示,他持有
+ 一个ChainPipeline实例,ChainPipeline实例中含有一个ISource和多个stages,还有一个label与stage的映射关系,
+ 以及用于寻找下个stage的label。
+ 
+在createStage过程中,将chainStage加入到Pipeline中。
+
+在setTopologyStages 过程中将label加入到Pipeline中;
+
+### 设置拓扑
+```java
+public void setTopologyStages(ChainStage currentChainStage, List<ChainStage>
nextStages) {
+ if (isBreak) {
+ return;
+ }
+ if (nextStages == null) {
+ return;
+ }
+ List<String> lableNames = new ArrayList<>();
+ for (ChainStage stage : nextStages) {
+ lableNames.add(stage.getLabel());
+ }
+
+ if (currentChainStage == null) {
+ this.pipeline.setChannelNextStageLabel(lableNames);
+ } else {
+ currentChainStage.setNextStageLabels(lableNames);
+ for (ChainStage stage : nextStages) {
+ stage.getPrevStageLabels().add(currentChainStage.getLabel());
+ }
+ }
+ }
+```
+
+如果是首个ChainStage,则设置下一跳的label;如果不是首个,需要将下个stage的label设置进入当前stage。
+同时,下个stage也需要设置前一个stage的label标签。形成双向链表的结构。
+
+
+
+
+
+
diff --git "a/docs/design/3.\345\220\257\345\212\250DataStream.md"
"b/docs/design/3.\345\220\257\345\212\250DataStream.md"
new file mode 100644
index 0000000..91f91f2
--- /dev/null
+++ "b/docs/design/3.\345\220\257\345\212\250DataStream.md"
@@ -0,0 +1,53 @@
+### Start流程
+
+流式计算在运行时可以拉起多个相同实例进行扩容,所以不能直接启动上述已经构建好的拓扑图,需要将上述构建好的拓扑
+图保存起来,需要扩容时,直接拿出算子的副本,实例化启动即可。
+
+### 统一管理点
+
+- 加载统一管理点IConfigurableService;
+
+ 三种方式存储:Memory, db, file
+
+-
PipelineBuilder的build方法,将构建构成中保存起来的IConfigurable,source和statge都是IConfigurable,
+ 保存到IConfigurableService中;
+
+- IConfigurableService的refreshConfigurable方法;
+
+ 1.主要做的事可以概括:从统一管理点加载出组件,赋值,init,在调用后置方法doProcessAfterRefreshConfigurable。
+
+ 2.ChainPipeline的后置方法比较特殊,会调用pipeline中各个组件的后置方法,如果这个组件是普通UDFChainStage,
+ 那么将会反序列化,实例成StageBuilder。如果是WindowChainStage,会讲用户数据接收的window实例化出来。
+
+ 3.从IConfigurable中加载实例副本出来;
+
+ 4.将实例副本赋值;
+
+ 5.初始化实例副本,实例都是AbstractConfigurable的继承类,调用他的的init方法。比如在初始化rocketmqSource
+ 的时候,就会在此时调用init方法,先于启动方法调用;
+
+ 6.调用IConfigurable的doProcessAfterRefreshConfigurable方法,目前只有ChainPipeline会调用,
+ (典型的是ChainPipeline),会在此方法中构建label与stage映射的stageMap;设置source;再调用
+ ChainPipeline中各个stage的doProcessAfterRefreshConfigurable方法;
+
+ 7.这里ChainPipeline的stage都是UDFChainStage类似。UDFChainStage的
+
doProcessAfterRefreshConfigurable方法会将之前序列化好的StageBuilder反序列化,成为StageBuilder实例。
+
+ 8.如果这个stage是window类型的WindowChainStage,ChainPipeline调用各个stage的
+ doProcessAfterRefreshConfigurable。这里会将用于数据接收的window实例化赋值;
+
+ 9.OutputChainStage此时会从统一管理点IConfigurableService查询出sink实例,并赋值给自己sink字段;
+
+
+### ChainPipeline的启动
+```java
+pipeline.startChannel();
+```
+
+将ChainPipeline作为整个数据接收的入口,并启动source;
+
+当source有数据进来时,ChainPipeline将会收到数据;具体方法是ChainPipeline的doMessageInner方法;
+
+该方法将数据封装承AbstractContext后,向后传递;
+
+
diff --git
"a/docs/design/4.\346\225\260\346\215\256\347\232\204\346\265\201\350\275\254\350\277\207\347\250\213.md"
"b/docs/design/4.\346\225\260\346\215\256\347\232\204\346\265\201\350\275\254\350\277\207\347\250\213.md"
new file mode 100644
index 0000000..1eb791c
--- /dev/null
+++
"b/docs/design/4.\346\225\260\346\215\256\347\232\204\346\265\201\350\275\254\350\277\207\347\250\213.md"
@@ -0,0 +1,63 @@
+###总体过程
+
+
+
+数据流转整体过程如图所示,黑色箭头线是数据流,橙色为控制流。数据的整体流向是从source中接收到,经过
+AbstractSource判断是否发出系统消息,在进入ChainPipeline,ChainPipeline根据之间构建好的处理拓扑图,使用
+深度优先策略找出下一个处理节点stage,交给Pipeline。Pipeline发现如果是系统消息则对stage执行特殊的控制逻辑,
+如果不是,则用stage来处理具体数据。
+
+### 无window算子执行流程
+- source从RocketMQ中消费数据,进入RocketMQSource的父类AbstractSource;
+- AbstractSource启动控制流,判断是否数据来自新分片,如果是,首先向下游传递一条NewSplitMessage消息,等待系
+ 统消息处理完成返回后,才能继续处理该数据。
+- NewSplitMessage进入Pipeline,如果是系统消息,stage执行该类系统消息对应的控制操作。如果不是系统消息则用
+stage处理数据;
+- Pipeline执行完成后,返回到ChainPipeline,选择下一个stage继续执行;
+- 遍历stage直到结束。
+
+### 含有window算子执行流程
+
+
+
+- 数据流和控制流在上述流程一致,即先进入source,然后由AbstractSource判断是否发出发出系统消息,再进入
+ ChainPipeline按照已经构建好的拓扑图执行。
+- 不同的是,如果是window算子,那么这条数据在执行具体计算之前需要先按照groupBy分组,在执行算子,例如count。
+分组操作需要借助于shuffle topic完成,即写入shuffle topic之前先按照groupBy的值,计算数据写入目的
+ MessageQueue,相同groupBy值的数据将被写入一个MessageQueue中。这样shuffle数据被读取时,
+ groupBy值相同的数据总会被一个client处理,达到按照groupBy分组处理的效果。
+
+- ShuffleChannel会自动订阅、消费shuffle topic。数据会经过shuffle并在ShuffleChannel中再次被消费到。
+- 判断是否是系统消息,如果是,执行该种类系统消息对应的控制流操作。
+- 如果不是系统消息,触发window中算子计算,比如算子是count,就对某个key出现的次数加1;count算子用到的状
+ 态会在接收到NewSplitMessage类型系统消息时提前加载好。计算结束后的状态保存到RocksDB或者mysql中。
+
+- window到时间后,将计算结果输出到下游stage继续计算,并清理RocksDB、Mysql中对应的状态。
+
+
+### 系统消息
+
+#### NewSplitMessage
+当发现数据来自新分片(MessageQueue)时,由AbstractSource产生并向下游拓扑传递。
+
+作用于window算子,使其提前加载该分片对应的状态数据到内存,使得状态数据对该分片数据进行计算时,能使用
+到对应的状态,得出正确的结果。
+
+#### CheckPointMessage
+
+##### 产生时机:
+- 消费分片移除时;
+- RocketMQ-streams向broker提交消费offset时;
+- 处理完一批次消息后;
+
+##### 作用
+- 作用于各个缓存,例如将数据写入shuffle topic之前的WindowCache,使缓存中数据写出到下游。
+- 作用于sink,将sink中缓存而未写出的数据写出;
+- 将有状态算子的状态flush到存储;
+
+#### RemoveSplitMessage
+比较RocketMQ client触发rebalance前后消费的分片,如果某个分片不在被消费,需要将该分片移除,在移除该分配时发出
+RemoveSplitMessage类型消息。
+
+作用于window算子,将RocksDB中状态清除;
+
diff --git
"a/docs/design/5.Window\347\256\227\345\255\220\350\247\243\346\236\220.md"
"b/docs/design/5.Window\347\256\227\345\255\220\350\247\243\346\236\220.md"
new file mode 100644
index 0000000..4e8be74
--- /dev/null
+++ "b/docs/design/5.Window\347\256\227\345\255\220\350\247\243\346\236\220.md"
@@ -0,0 +1,55 @@
+### window算子初始化
+window的实例化和初始化时机,与普通无状态算子一样,在构建DataStream阶段以stage形式加入pipeline。在启动
+DataStream阶段完成window的初始化。
+
+
+
+- 给window初始化WindowStorage用户状态存储;
+
+ WindowStorage包括localStorage存储和remoteStorage存储;localStorage使用RocksDB,
+ remoteStorage使用mysql;
+
+- 向window添加一个WindowCache的匿名实例,用于存储写入shuffle topic之前数据;
+- 向window添加SQLCache,作为写入Mysql之前的缓存;
+- 向window添加ShuffleChannel,作为写出shuffle和接收来自shufffle topic数据的通道;
+
+
+### ShuffleChannel写出shuffle数据
+AbstractShuffleWindow的doMessage方法,将数据写入shuffleChannel
+```java
+public AbstractContext<IMessage> doMessage(IMessage message, AbstractContext
context) {
+ shuffleChannel.startChannel();
+ return super.doMessage(message, context);
+}
+```
+
+- shuffleChannel.startChannel
+启动shuffleChannel中的consumer,从shuffletopic中消费数据;如果有消费到数据,将由
+ shuffleChannel的doMessage处理。
+
+- AbstractWindow.doMessage方法
+
+对于一条消息来说,window 首先需要检查是否有窗口实例,如果没有则创建。如果窗口实例已经超过最大的watermark,
+数据丢弃,否则进行消息积累 消息会先经历batchAdd 然后flush加入到windowCache中;windowCache定时触发,加入到
+shuffleMsgCache中,shuffleMsgCache中定时发出,用shuffleMsgCache中的producer写出到rocketmq。
+
+### ShuffleChannel接收到shuffle数据
+ShuffleChannel#doMessage方法;
+
+将shuffle消息加入到shuffleCache中
+
+最终进入ShuffleCache#batchInsert中
+
+WindowOperator#shuffleCalculate中
+
+实际窗口计算:WindowValue#calculate
+
+计算后并不会马上触发窗口,窗口需要定时出发
+
+### window触发
+
WindowFireSource#startSource启动定时任务,1s检查一次窗口是否触发WindowFireSource#fireWindowInstance
+WindowOperator#fireWindowInstance
+
+windowFireSource.executeMessage
+
+windowFireSource.executeMessage这个方法里面会执行pipeline的下个节点
\ No newline at end of file
diff --git
"a/docs/design/6.RocketMQ-streams\345\246\202\344\275\225\345\256\236\347\216\260\345\256\271\351\224\231.md"
"b/docs/design/6.RocketMQ-streams\345\246\202\344\275\225\345\256\236\347\216\260\345\256\271\351\224\231.md"
new file mode 100644
index 0000000..e69de29
diff --git "a/docs/images/Pipeline\347\261\273\345\233\276.png"
"b/docs/images/Pipeline\347\261\273\345\233\276.png"
new file mode 100644
index 0000000..dafe81a
Binary files /dev/null and "b/docs/images/Pipeline\347\261\273\345\233\276.png"
differ
diff --git a/docs/images/window.png b/docs/images/window.png
new file mode 100644
index 0000000..30ba894
Binary files /dev/null and b/docs/images/window.png differ
diff --git
"a/docs/images/\346\200\273\344\275\223\346\236\266\346\236\204\345\233\276.png"
"b/docs/images/\346\200\273\344\275\223\346\236\266\346\236\204\345\233\276.png"
new file mode 100644
index 0000000..5eba9ce
Binary files /dev/null and
"b/docs/images/\346\200\273\344\275\223\346\236\266\346\236\204\345\233\276.png"
differ
diff --git "a/docs/images/\346\200\273\344\275\223\350\277\207\347\250\213.png"
"b/docs/images/\346\200\273\344\275\223\350\277\207\347\250\213.png"
new file mode 100644
index 0000000..7a68947
Binary files /dev/null and
"b/docs/images/\346\200\273\344\275\223\350\277\207\347\250\213.png" differ
diff --git "a/docs/images/\346\211\251\345\256\271\345\211\215.png"
"b/docs/images/\346\211\251\345\256\271\345\211\215.png"
new file mode 100644
index 0000000..5232b76
Binary files /dev/null and
"b/docs/images/\346\211\251\345\256\271\345\211\215.png" differ
diff --git
"a/docs/images/\346\234\211\347\212\266\346\200\201\347\256\227\345\255\220.png"
"b/docs/images/\346\234\211\347\212\266\346\200\201\347\256\227\345\255\220.png"
new file mode 100644
index 0000000..a9ec479
Binary files /dev/null and
"b/docs/images/\346\234\211\347\212\266\346\200\201\347\256\227\345\255\220.png"
differ
diff --git "a/docs/images/\347\212\266\346\200\201.png"
"b/docs/images/\347\212\266\346\200\201.png"
new file mode 100644
index 0000000..e2fd9b2
Binary files /dev/null and "b/docs/images/\347\212\266\346\200\201.png" differ
diff --git "a/docs/images/\347\274\251\345\256\271.png"
"b/docs/images/\347\274\251\345\256\271.png"
new file mode 100644
index 0000000..05dcee4
Binary files /dev/null and "b/docs/images/\347\274\251\345\256\271.png" differ
diff --git
a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/source/FileSourceExample.java
b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/source/FileSourceExample.java
index ecaf0bc..b51ea2e 100644
---
a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/source/FileSourceExample.java
+++
b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/source/FileSourceExample.java
@@ -22,7 +22,7 @@ import
org.apache.rocketmq.streams.client.source.DataStreamSource;
public class FileSourceExample {
public static void main(String[] args) {
DataStreamSource source = StreamBuilder.dataStream("namespace",
"pipeline");
- source.fromFile("/Users/junjie.cheng/jobs/access.log", false)
+ source.fromFile("data.txt", false)
.map(message -> message)
.toPrint(1)
.start();