TyrantLucifer commented on code in PR #2800: URL: https://github.com/apache/incubator-seatunnel/pull/2800#discussion_r979241227
########## seatunnel-connectors-v2/README.md: ########## @@ -1,48 +1,101 @@ # Purpose -This article introduces the new interface and the new code structure on account of the newly designed API for Connectors in Apache SeaTunnel. This helps developers with quick overview regarding API, translation layer improvement, and development of new Connector. + +This article introduces the new interface and the new code structure on account of the newly designed API for Connectors Review Comment: Please link proposal here like the Chinese document. ########## seatunnel-connectors-v2/README.zh.md: ########## @@ -1,64 +1,127 @@ ## 目的 -Because SeaTunnel design new API for connectors, 所以通过这篇文章来介绍新的接口以及新的代码结构,方便开发者快速的帮助新API和翻译层完善,以及开发出新的Connecotor. + +因为SeaTunnel 为connectors设计了新的API,所以通过这篇文章来介绍新的接口以及新的代码结构,方便开发者快速的帮助新API和翻译层完善,以及开发出新的Connector. +详细设计请查看该[提议](https://github.com/apache/incubator-seatunnel/issues/1608) 。 + ## 代码结构 -现阶段所有相关代码保存在`api-draft`分支上。 + 为了和老的代码分开,方便现阶段的并行开发,以及降低merge的难度。我们为新的执行流程定义了新的模块 + ### Example -我们已经在`seatunnel-examples`中准备好了新版本的可本地执行Example程序,直接调用`seatunnel-flink-connector-v2-example`或`seatunnel-spark-connector-v2-example`中的`SeaTunnelApiExample`即可。这也是本地开发Connector经常会用到的调试方式。 -对应的配置文件保存在同模块的`resources/examples`文件夹下,和以前一样。 + +我们已经在`seatunnel-examples` +准备了两个本地可执行的案例程序,其中一个是`seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelApiExample.java` +,它运行在flink引擎上。另外一个是`seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiExample.java` +,它运行在spark引擎上。你可以通过调试这些例子帮你更好的理解程序运行逻辑。使用的配置文件保存在`resources/examples`文件夹里。如果你想增加自己的connectors,你需要按照下面的步骤。 + +1. 在`seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml`添加connector依赖的groupId, artifactId 和 + version.(或者当你想在spark引擎运行时在`seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml`添加依赖) +2. 如果你的connector存在scope为test或者provided的依赖则在`seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml`( + 或者在`seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml`)修改为compile。 +3. 参考`SeaTunnelApiExample`开发自己的案例程序。 + ### 启动类 -和老的启动类分开,我们创建了两个新的启动类工程,分别是`seatunnel-core/seatunnel-flink-starter`和`seatunnel-core/seatunnel-spark-starter`. 可以在这里找到如何将配置文件解析为可以执行的Flink/Spark流程。 + +和老的启动类分开,我们创建了两个新的启动类工程,分别是`seatunnel-core/seatunnel-flink-starter`和`seatunnel-core/seatunnel-spark-starter`. +可以在这里找到如何将配置文件解析为可以执行的Flink/Spark流程。 + ### SeaTunnel API + 新建了一个`seatunnel-api`(不是`seatunnel-apis`)模块,用于存放SeaTunnel API定义的新接口, 开发者通过对这些接口进行实现,就可以完成支持多引擎的SeaTunnel Connector + ### 翻译层 -我们通过适配不同引擎的接口,实现SeaTunnel API和Engine API的转换,从而达到翻译的效果,让我们的SeaTunnel Connector支持多个不同引擎的运行。 -对应代码地址为`seatunnel-translation`,该模块有对应的翻译层实现。感兴趣可以查看代码,帮助我们完善当前代码。 + +我们通过适配不同引擎的接口,实现SeaTunnel API和Engine API的转换,从而达到翻译的效果,让我们的SeaTunnel Connector支持多个不同引擎的运行。 对应代码地址为`seatunnel-translation` +,该模块有对应的翻译层实现。感兴趣可以查看代码,帮助我们完善当前代码。 + ## API 介绍 + `SeaTunnel 当前版本的API设计借鉴了Flink的设计理念` + ### Source + #### SeaTunnelSource.java -- SeaTunnel的Source采用流批一体的设计,通过`getBoundedness`来决定当前Source是流Source还是批Source,所以可以通过动态配置的方式(参考default方法)来指定一个Source既可以为流,也可以为批。 + +- SeaTunnel的Source采用流批一体的设计,通过`getBoundedness` + 来决定当前Source是流Source还是批Source,所以可以通过动态配置的方式(参考default方法)来指定一个Source既可以为流,也可以为批。 - `getRowTypeInfo`来得到数据的schema,connector可以选择硬编码来实现固定的schema,或者运行用户通过config配置来自定义schema,推荐后者。 - SeaTunnelSource是执行在driver端的类,通过该类,来获取SourceReader,SplitEnumerator等对象以及序列化器。 - 目前SeaTunnelSource支持的生产的数据类型必须是SeaTunnelRow类型。 + #### SourceSplitEnumerator.java + 通过该枚举器来获取数据读取的分片(SourceSplit)情况,不同的分片可能会分配给不同的SourceReader来读取数据。包含几个关键方法: + - `run`用于执行产生SourceSplit并调用`SourceSplitEnumerator.Context.assignSplit`来将分片分发给SourceReader。 - `addSplitsBack`用于处理SourceReader异常导致SourceSplit无法正常处理或者重启时,需要SourceSplitEnumerator对这些Split进行重新分发。 -- `registerReader`处理一些在run运行了之后才注册上的SourceReader,如果这个时候还没有分发下去的SourceSplit,就可以分发给这些新的Reader(对,你大多数时候需要在SourceSplitEnumerator里面维护你的SourceSplit分发情况) -- `handleSplitRequest`如果有些Reader主动向SourceSplitEnumerator请求SourceSplit,那么可以通过该方法调用`SourceSplitEnumerator.Context.assignSplit`来向对应的Reader发送分片。 -- `snapshotState`用于流处理定时返回需要保存的当前状态,如果有状态恢复时,会调用`SeaTunnelSource.restoreEnumerator`来构造SourceSplitEnumerator,将保存的状态恢复给SourceSplitEnumerator。 +- `registerReader` + 处理一些在run运行了之后才注册上的SourceReader,如果这个时候还没有分发下去的SourceSplit,就可以分发给这些新的Reader(对,你大多数时候需要在SourceSplitEnumerator里面维护你的SourceSplit分发情况) +- `handleSplitRequest` + 如果有些Reader主动向SourceSplitEnumerator请求SourceSplit,那么可以通过该方法调用`SourceSplitEnumerator.Context.assignSplit`来向对应的Reader发送分片。 +- `snapshotState`用于流处理定时返回需要保存的当前状态,如果有状态恢复时,会调用`SeaTunnelSource.restoreEnumerator` + 来构造SourceSplitEnumerator,将保存的状态恢复给SourceSplitEnumerator。 - `notifyCheckpointComplete`用于状态保存成功后的后续处理,可以用于将状态或者标记存入第三方存储。 + #### SourceSplit.java + 用于保存分片的接口,不同的分片需要定义不同的splitId,可以通过实现这个接口,保存分片需要保存的数据,比如kafka的partition和topic,hbase的columnfamily等信息,用于SourceReader来确定应该读取全部数据的哪一部分。 + #### SourceReader.java + 直接和数据源进行交互的接口,通过实现该接口完成从数据源读取数据的动作。 -- `pollNext`便是Reader的核心,通过这个接口,实现读取数据源的数据然后返回给SeaTunnel的流程。每当准备将数据传递给SeaTunnel时,就可以调用参数中的`Collector.collect`方法,可以无限次的调用该方法完成数据的大量读取。但是现阶段支持的数据格式只能是`SeaTunnelRow`。因为我们的Source是流批一体的,所以批模式的时候Connector要自己决定什么时候结束数据读取,比如批处理一次读取100条数据,读取完成后需要在`pollNext`中调用`SourceReader.Context.signalNoMoreElement`通知SeaTunnel没有数据读取了,那么就可以利用这100条数据进行批处理。流处理没有这个要求,那么大多数流批一体的SourceReader都会出现如下代码: + +- `pollNext`便是Reader的核心,通过这个接口,实现读取数据源的数据然后返回给SeaTunnel的流程。每当准备将数据传递给SeaTunnel时,就可以调用参数中的`Collector.collect` + 方法,可以无限次的调用该方法完成数据的大量读取。但是现阶段支持的数据格式只能是`SeaTunnelRow` + 。因为我们的Source是流批一体的,所以批模式的时候Connector要自己决定什么时候结束数据读取,比如批处理一次读取100条数据,读取完成后需要在`pollNext` + 中调用`SourceReader.Context.signalNoMoreElement` + 通知SeaTunnel没有数据读取了,那么就可以利用这100条数据进行批处理。流处理没有这个要求,那么大多数流批一体的SourceReader都会出现如下代码: + ```java -if (Boundedness.BOUNDED.equals(context.getBoundedness())) { +if(Boundedness.BOUNDED.equals(context.getBoundedness())){ Review Comment: if (Boundedness.BOUNDED.equals(context.getBoundedness())) { -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
