hailin0 commented on code in PR #8375:
URL: https://github.com/apache/seatunnel/pull/8375#discussion_r1896736716
##########
seatunnel-connectors-v2/README.md:
##########
@@ -16,216 +16,248 @@ development at the current stage, and reduces the
difficulty of merging.
- ../`seatunnel-translation`
translation layer for the connector-v2
- ../`seatunnel-transform-v2`
transform v2 connector implementation
- ../seatunnel-e2e/`seatunnel-connector-v2-e2e`
connector v2 e2e code
-- ../seatunnel-examples/`seatunnel-flink-connector-v2-example`
seatunnel connector-v2 example use flink local running instance
-- ../seatunnel-examples/`seatunnel-spark-connector-v2-example`
seatunnel connector-v2 example use spark local running instance
+- ../seatunnel-examples/`seatunnel-engine-examples`
seatunnel connector-v2 example using a Zeta local running instance
+- ../seatunnel-examples/`seatunnel-flink-connector-v2-example`
seatunnel connector-v2 example using a Flink local running instance
+- ../seatunnel-examples/`seatunnel-spark-connector-v2-example`
seatunnel connector-v2 example using a Spark local running instance
-### **Example**
+### Example
-We have prepared two new version of the locally executable example program in
`seatunnel-examples`,one
-is
`seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelApiExample.java`
-, it runs in the Flink engine. Another one
-is
`seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiExample.java`
-, it runs in the Spark engine. This is also the debugging method that is often
used in the local development of
-Connector. You can debug these examples, which will help you better understand
the running logic of the program. The
-configuration files used in example are saved in the "resources/examples"
folder. If you want to add examples for your
-own connectors, you need to follow the steps below.
+We have prepared three locally executable example programs in
`seatunnel-examples`:
+-
`seatunnel-examples/seatunnel-engine-examples/src/main/java/org/apache/seatunnel/example/engine/SeaTunnelEngineLocalExample.java`,
which runs on the Zeta engine
+-
`seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelApiExample.java`,
which runs on the Flink engine
+-
`seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiExample.java`,
which runs on the Spark engine
-1. Add the groupId, artifactId and version of the connector to be tested to
- `seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml`(or add it
to
- `seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml` when you
want to runs it in Spark engine) as a
- dependency.
-2. Find the dependency in your connector pom file which scope is test or
provided and then add them to
- seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml(or add it to
- seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml) file and
modify the scope to compile.
-3. Add the task configuration file under resources/examples.
-4. Configure the file in the `SeaTunnelApiExample` main method.
-5. Just run the main method.
+You can debug these examples to help you better understand the running logic
of the program. The configuration files used are saved in the
`resources/examples` folder.
+If you want to add your own connectors, you need to follow the steps below.
-### **Create new seatunnel v2 connector**
+To add a new connector to the example using the Zeta engine, follow these
steps:
+1. Add the connector dependency's `groupId`, `artifactId`, and `version` to
`seatunnel-examples/seatunnel-engine-examples/pom.xml` (or to
`seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml` or
`seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml` if you want
to run it on the Flink or Spark engine, respectively).
+2. If there are dependencies in your connector with `scope` set to `test` or
`provided`, add these dependencies to
`seatunnel-examples/seatunnel-engine-examples/pom.xml` and change the `scope`
to `compile`.
+3. Add the task configuration file under `resources/examples`.
+4. Configure the file path in the `SeaTunnelEngineLocalExample.java` main
method.
+5. Run the main method.
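For step 3 above, a minimal task configuration file under `resources/examples` could look like the following sketch, which uses the built-in FakeSource and Console connectors (the field names and values are illustrative):

```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  # Generates 16 fake rows matching the declared schema
  FakeSource {
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}

sink {
  # Prints every row to stdout, convenient for local debugging
  Console {}
}
```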
-1.Create a new module under the `seatunnel-connectors-v2` directory and name
it connector - {connector name}.
+### Create New SeaTunnel V2 Connector
-2.The pom file can refer to the pom file of the existing connector, and add
the current sub model to the pom file of the parent model
+1. Create a new module under the `seatunnel-connectors-v2` directory and name
it connector-{ConnectorName}.
+2. The `pom.xml` file can refer to the `pom.xml` file of the existing
connector, and add the current sub-module to `seatunnel-connectors-v2/pom.xml`.
+3. Create two packages corresponding to source and sink
-3.Create two packages corresponding to source and sink
+   package org.apache.seatunnel.connectors.seatunnel.{ConnectorName}.source
+   package org.apache.seatunnel.connectors.seatunnel.{ConnectorName}.sink
- package org.apache.seatunnel.connectors.seatunnel.{connector name}}.source
- package org.apache.seatunnel.connectors.seatunnel.{connector name}}.sink
+4. Add the connector info to the `plugin-mapping.properties` file in the seatunnel root path.
-4.add connector info to plugin-mapping.properties file in seatunnel root path.
+5. Add the connector dependency to `seatunnel-dist/pom.xml`, so that the connector jar can be found in the binary package.
-5.add connector dependency to seatunnel-dist/pom.xml, so the connector jar can
be find in binary package.
+6. There are several classes that must be implemented on the source side, namely {ConnectorName}Source, {ConnectorName}SourceFactory, and {ConnectorName}SourceReader; on the sink side they are {ConnectorName}Sink, {ConnectorName}SinkFactory, and {ConnectorName}SinkWriter. Please refer to other connectors for details.
-6.There are several classes that must be implemented on the source side,
namely {ConnectorName}Source, {ConnectorName}SourceFactory,
{ConnectorName}SourceReader; There are several classes that must be implemented
on the sink side, namely {ConnectorName}Sink, {ConnectorName}SinkFactory,
{ConnectorName}SinkWriter Please refer to other connectors for details
+7. {ConnectorName}SourceFactory and {ConnectorName}SinkFactory need to be annotated with `@AutoService(Factory.class)` on the class, and in addition to the required methods, an additional `createSource` method needs to be overridden on the source side and an additional `createSink` method on the sink side.
-7.{ConnectorName}SourceFactory and {ConnectorName}SinkFactory needs to be
annotated with the **@AutoService (Factory.class)** annotation on the class
name, and in addition to the required methods, source side an additional
**creatSource** method needs to be rewritten and sink side an additional
**creatSink** method needs to be rewritten
+8. {ConnectorName}Source needs to override the `getProducedCatalogTables`
method; {ConnectorName}Sink needs to override the `getWriteCatalogTable` method
-8.{ConnectorName}Source needs to override the **getProducedCatalogTables**
method; {ConnectorName}Sink needs to override the **getWriteCatalogTable**
method
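Step 4 above refers to `plugin-mapping.properties`; each entry maps the plugin name used in config files to the connector module's jar. This sketch follows the pattern of the existing JDBC connector (your own connector would add analogous lines):

```properties
seatunnel.source.Jdbc = connector-jdbc
seatunnel.sink.Jdbc = connector-jdbc
```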
+### Startup Class
-### **Startup Class**
+We have created three starter projects: `seatunnel-core/seatunnel-starter`,
`seatunnel-core/seatunnel-flink-starter`, and
`seatunnel-core/seatunnel-spark-starter`.
+Here you can find how to parse configuration files into executable
Zeta/Flink/Spark processes.
-Aside from the old startup class, we have created two new startup modules,
-namely ``seatunnel-core/seatunnel-flink-starter`` and
``seatunnel-core/seatunnel-spark-starter``. You can find out how
-to parse the configuration file into an executable Flink/Spark process here.
+### SeaTunnel API
-### **SeaTunnel API**
+The `seatunnel-api` module is used to store the new interfaces defined by the
SeaTunnel API. By implementing these interfaces, developers can create
SeaTunnel Connectors that support multiple engines.
-A new ``seatunnel-api`` (not ``seatunnel-apis``) module has been created to
store the new interfaces defined by the
-SeaTunnel API. By implementing these interfaces, developers can complete the
SeaTunnel Connector that supports multiple
-engines.
-
-### **Translation Layer**
+### Translation Layer
We realize the conversion between SeaTunnel API and Engine API by adapting the
interfaces of different engines, so as to
achieve the effect of translation, and let our SeaTunnel Connector support the
operation of multiple different engines.
-The corresponding code address, ``seatunnel-translation``, this module has the
corresponding translation layer
+The corresponding code is in `seatunnel-translation`; this module contains the translation layer
implementation. If you are interested, you can view the code and help us
improve the current code.
-## **API introduction**
+## API introduction
The API design of the current version of SeaTunnel draws on the design concept
of Flink.
-### **Source**
+### Source
+
+#### TableSourceFactory.java
+
+- Used to create a factory class for Source, through which Source instances
are created using the `createSource` method.
+- `factoryIdentifier` is used to identify the name of the current Factory,
which is also configured in the configuration file to distinguish different
connectors.
+- `OptionRule` is used to define the parameters supported by the current
connector. This method can be used to define the logic of the parameters, such
as which parameters are required, which are optional, which are mutually
exclusive, etc.
Review Comment:
```suggestion
- `optionRule` is used to define the parameters supported by the current
connector. This method can be used to define the logic of the parameters, such
as which parameters are required, which are optional, which are mutually
exclusive, etc.
```
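The required/optional/mutually-exclusive parameter logic that `optionRule` expresses can be sketched in plain Java. The class below is a hypothetical stand-in to illustrate the idea, not the real SeaTunnel `OptionRule` API:

```java
import java.util.*;

// Sketch of required/optional/mutually-exclusive option checking,
// loosely mirroring what a connector's optionRule() declares.
public class OptionRuleSketch {
    final Set<String> required;
    final Set<String> optional;
    final List<Set<String>> exclusiveGroups;

    OptionRuleSketch(Set<String> required, Set<String> optional, List<Set<String>> exclusiveGroups) {
        this.required = required;
        this.optional = optional;
        this.exclusiveGroups = exclusiveGroups;
    }

    // True when all required keys are present, no unknown keys appear,
    // and at most one key from each mutually-exclusive group is used.
    public boolean validate(Map<String, String> config) {
        if (!config.keySet().containsAll(required)) return false;
        Set<String> known = new HashSet<>(required);
        known.addAll(optional);
        exclusiveGroups.forEach(known::addAll);
        if (!known.containsAll(config.keySet())) return false;
        for (Set<String> group : exclusiveGroups) {
            long used = group.stream().filter(config::containsKey).count();
            if (used > 1) return false;
        }
        return true;
    }

    // Hypothetical rule: "url" required, "parallelism" optional,
    // "user" and "token" mutually exclusive.
    public static boolean demo(Map<String, String> config) {
        OptionRuleSketch rule = new OptionRuleSketch(
                Set.of("url"),
                Set.of("parallelism"),
                List.of(Set.of("user", "token")));
        return rule.validate(config);
    }
}
```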
##########
seatunnel-connectors-v2/README.zh.md:
##########
@@ -124,57 +148,91 @@ if (Boundedness.BOUNDED.equals(context.getBoundedness()))
{
- `snapshotState` is used in stream processing to periodically return the current state that needs to be saved, that is, the split information (SeaTunnel stores the split information and state together to achieve dynamic assignment).
-`notifyCheckpointComplete` and `notifyCheckpointAborted`, as the names suggest, are callbacks for the different states of a checkpoint.
+##### SourceReader.Context
+
+Context is the context of the SourceReader, through which the Reader interacts with SeaTunnel. It contains several key methods:
+
+- `getIndexOfSubtask` is used to get the subTask index of the current Reader.
+- `getBoundedness` is used to get the Boundedness of the current Reader, i.e. whether it is streaming or batch.
+- `signalNoMoreElement` is used to notify SeaTunnel that there is no more data to read.
+- `sendSplitRequest` is used to request splits from the SourceSplitEnumerator, so the Reader can proactively ask for splits when it has none.
+- `sendSourceEventToEnumerator` is used to send events to the SourceSplitEnumerator.
+- `getMetricsContext` is used to get the MetricsContext of the current task, for recording metrics.
+- `getEventListener` is used to get the EventListener of the current task, for sending events to SeaTunnel.
+
### Sink
+#### TableSinkFactory.java
+
+- The factory class used to create the Sink: Sink instances are created through it via the `createSink` method.
+- `factoryIdentifier` is used to identify the name of the current Factory, which is also the name configured in the configuration file to distinguish different connectors.
+- OptionRule is used to define the parameters supported by the current connector. This method can define the parameter logic, such as which parameters are required, which are optional, which are mutually exclusive, etc. SeaTunnel will use OptionRule to verify whether the user's configuration is valid. Please refer to the Option section below.
Review Comment:
```suggestion
- `optionRule` is used to define the parameters supported by the current connector. This method can define the parameter logic, such as which parameters are required, which are optional, which are mutually exclusive, etc. SeaTunnel will use OptionRule to verify whether the user's configuration is valid. Please refer to the Option section below.
```
##########
seatunnel-connectors-v2/README.zh.md:
##########
@@ -72,18 +74,26 @@ To decouple from the compute engines, SeaTunnel designed a new connector API, through
### Source
+#### TableSourceFactory.java
+
+- The factory class used to create the Source: Source instances are created through it via the `createSource` method.
+- `factoryIdentifier` is used to identify the name of the current Factory, which is also the name configured in the configuration file to distinguish different connectors.
+- OptionRule is used to define the parameters supported by the current connector. This method can define the parameter logic, such as which parameters are required, which are optional, which are mutually exclusive, etc. SeaTunnel will use OptionRule to verify whether the user's configuration is valid. Please refer to the Option section below.
Review Comment:
```suggestion
- `optionRule` is used to define the parameters supported by the current connector. This method can define the parameter logic, such as which parameters are required, which are optional, which are mutually exclusive, etc. SeaTunnel will use OptionRule to verify whether the user's configuration is valid. Please refer to the Option section below.
```
##########
seatunnel-connectors-v2/README.md:
##########
@@ -16,216 +16,248 @@ development at the current stage, and reduces the
difficulty of merging.
+- `OptionRule` is used to define the parameters supported by the current
connector. This method can be used to define the logic of the parameters, such
as which parameters are required, which are optional, which are mutually
exclusive, etc.
+ SeaTunnel will use `OptionRule` to verify the validity of the user's
configuration. Please refer to the `Option` below.
+- Make sure to add the `@AutoService(Factory.class)` annotation to
`TableSourceFactory`.
-#### **SeaTunnelSource.java**
+#### SeaTunnelSource.java
-- The Source of SeaTunnel adopts the design of stream-batch integration,
``getBoundedness`` which determines whether the
+- The Source of SeaTunnel adopts a unified stream-batch design; `getBoundedness` determines whether the
current Source is a stream Source or a batch Source, so you can specify a
Source by dynamic configuration (refer to
the default method), which can be either a stream or a batch.
-- ``getRowTypeInfo`` To get the schema of the data, the connector can choose
to hard-code to implement a fixed schema,
- or run the user to customize the schema through config configuration. The
latter is recommended.
+- `getProducedCatalogTables` is used to get the schema of the data. The connector can either hard-code a fixed schema or build a custom schema from user-defined configuration.
+  The latter is recommended.
- SeaTunnelSource is a class executed on the driver side, through which
objects such as SourceReader, SplitEnumerator
and serializers are obtained.
- Currently, the data type supported by SeaTunnelSource must be SeaTunnelRow.
-#### **SourceSplitEnumerator.java**
+#### SourceSplitEnumerator.java
Use this enumerator to get the data read shard (SourceSplit) situation,
different shards may be assigned to different
SourceReaders to read data. Contains several key methods:
-- ``run``: Used to perform a spawn SourceSplit and call
``SourceSplitEnumerator.Context.assignSplit``: to distribute the
+- The `open` method is used to initialize the SourceSplitEnumerator. In this
method, you can initialize resources such as database connections or states.
+- `run`: Used to generate SourceSplits and call `SourceSplitEnumerator.Context.assignSplit` to distribute the
shards to the SourceReader.
-- ``addSplitsBackSourceSplitEnumerator``: is required to redistribute these
Splits when SourceSplit cannot be processed
+- `addSplitsBack`: The SourceSplitEnumerator is required to redistribute these Splits when they cannot be processed
normally or restarted due to the exception of SourceReader.
-- ``registerReaderProcess``: some SourceReaders that are registered after the
run is run. If there is no SourceSplit
+- `registerReader`: Processes SourceReaders that register after `run` has already started. If there is no SourceSplit
distributed at this time, it can be distributed to these new readers (yes,
you need to maintain your SourceSplit
distribution in SourceSplitEnumerator most of the time).
-- ``handleSplitRequest``: If some Readers actively request SourceSplit from
SourceSplitEnumerator, this method can be
+- `handleSplitRequest`: If some Readers actively request SourceSplit from
SourceSplitEnumerator, this method can be
called `SourceSplitEnumerator.Context.assignSplit` to send splits to the corresponding Reader.
-- ``snapshotState``: It is used for stream processing to periodically return
the current state that needs to be saved.
+- `snapshotState`: It is used for stream processing to periodically return the
current state that needs to be saved.
If there is a state restoration, `SeaTunnelSource.restoreEnumerator` will be called to construct a
SourceSplitEnumerator and restore the saved state to it.
-- ``notifyCheckpointComplete``: It is used for subsequent processing after the
state is successfully saved, and can be
+- `notifyCheckpointComplete`: It is used for subsequent processing after the
state is successfully saved, and can be
used to store the state or mark in third-party storage.
+- `handleSourceEvent` is used to handle events from the `SourceReader`. You
can customize events, such as changes in the state of the `SourceReader`.
+- `close` is used to close the `SourceSplitEnumerator` and release resources.
+
+#### SourceSplitEnumerator.Context
+
+The `SourceSplitEnumerator.Context` is the context for the
`SourceSplitEnumerator`, which interacts with SeaTunnel. It includes several
key methods:
-#### **SourceSplit.java**
+- `currentParallelism`: Used to get the current task's parallelism.
+- `registeredReaders`: Used to get the list of currently registered
`SourceReader`.
+- `assignSplit`: Used to assign splits to `SourceReader`.
+- `signalNoMoreSplits`: Used to notify a `SourceReader` that there are no more
splits.
+- `sendEventToSourceReader`: Used to send events to `SourceReader`.
+- `getMetricsContext`: Used to get the current task's `MetricsContext` for
recording metrics.
+- `getEventListener`: Used to get the current task's `EventListener` for
sending events to SeaTunnel.
+
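The split bookkeeping behind `assignSplit` is often just a round-robin over the reader indexes given by `currentParallelism`. This plain-Java sketch (hypothetical names, not the real enumerator interface) shows the idea:

```java
import java.util.*;

// Sketch: distribute split ids across reader subtask indexes round-robin,
// the way a SourceSplitEnumerator might call context.assignSplit(...).
public class RoundRobinAssigner {
    public static Map<Integer, List<String>> assign(List<String> splitIds, int parallelism) {
        Map<Integer, List<String>> assignment = new HashMap<>();
        for (int i = 0; i < parallelism; i++) {
            assignment.put(i, new ArrayList<>());
        }
        // Split i goes to reader (i mod parallelism)
        for (int i = 0; i < splitIds.size(); i++) {
            assignment.get(i % parallelism).add(splitIds.get(i));
        }
        return assignment;
    }
}
```

With 5 splits and parallelism 2, reader 0 receives splits 0, 2, 4 and reader 1 receives splits 1, 3.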
+#### SourceSplit.java
The interface used to save shards. Different shards need to define different
splitIds. You can implement this interface
to save the data that shards need to save, such as kafka's partition and
topic, hbase's columnfamily and other
information, which are used by SourceReader to determine which part of the
total data should be read.
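As described above, a split mainly needs a stable `splitId` plus whatever information locates its slice of the data. A Kafka-style sketch in plain Java (a hypothetical class, not the real `SourceSplit` interface):

```java
// Sketch of a split carrying topic/partition plus a derived splitId,
// analogous to implementing SourceSplit#splitId().
public class TopicPartitionSplit {
    final String topic;
    final int partition;

    public TopicPartitionSplit(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    // Different splits must produce different splitIds.
    public String splitId() {
        return topic + "-" + partition;
    }
}
```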
-#### **SourceReader.java**
+#### SourceReader.java
The interface that directly interacts with the data source, and the action of
reading data from the data source is
completed by implementing this interface.
-- ``pollNext``: It is the core of Reader. Through this interface, the process
of reading the data of the data source and
+- `pollNext`: It is the core of Reader. Through this interface, the process of
reading the data of the data source and
returning it to SeaTunnel is realized. Whenever you are ready to pass data
to SeaTunnel, you can call
- the ``Collector.collect`` method in the parameter, which can be called an
infinite number of times to complete a large
- amount of data reading. But the data format supported at this stage can only
be ``SeaTunnelRow``. Because our Source
+ the `Collector.collect` method in the parameter, which can be called an
infinite number of times to complete a large
+ amount of data reading. But the data format supported at this stage can only
be `SeaTunnelRow`. Because our Source
is a stream-batch integration, the Connector has to decide when to end data
reading in batch mode. For example, a
- batch reads 100 pieces of data at a time. After the reading is completed, it
needs ``pollNext`` to call in
- to ``SourceReader.Context.signalNoMoreElementnotify`` SeaTunnel that there
is no data to read . , then you can use
+  batch reads 100 pieces of data at a time. After the reading is completed, `pollNext` needs to call
+  `SourceReader.Context.signalNoMoreElement` to notify SeaTunnel that there is no more data to read; then you can use
these 100 pieces of data for batch processing. Stream processing does not
have this requirement, so most SourceReaders
with integrated stream batches will have the following code:
-```java
+```java
if(Boundedness.BOUNDED.equals(context.getBoundedness())){
// signal to the source that we have reached the end of the data.
context.signalNoMoreElement();
break;
}
-```
+```
It means that SeaTunnel will be notified only in batch mode.
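The snippet above can be exercised end-to-end with a small plain-Java simulation; `Boundedness`, `Context`, and `signalNoMoreElement` here are hypothetical stand-ins for the real SeaTunnel types:

```java
import java.util.*;

// Simulates a bounded reader: emit all rows, then signal "no more elements"
// only when running in BOUNDED (batch) mode.
public class BoundedReaderSketch {
    public enum Boundedness { BOUNDED, UNBOUNDED }

    public static class Context {
        public final Boundedness boundedness;
        public boolean noMoreElementSignalled = false;
        public Context(Boundedness b) { this.boundedness = b; }
        public Boundedness getBoundedness() { return boundedness; }
        public void signalNoMoreElement() { noMoreElementSignalled = true; }
    }

    // Reads every row from `source` into `collected`; in batch mode it then
    // tells the framework the stream is finished.
    public static void pollAll(List<String> source, List<String> collected, Context context) {
        for (String row : source) {
            collected.add(row); // Collector.collect(...) in the real API
        }
        if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
            context.signalNoMoreElement();
        }
    }
}
```

In BOUNDED mode the signal fires after the last row; in UNBOUNDED mode it never does, matching the "notified only in batch mode" behavior described above.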
-- ``addSplits``: Used by the framework to assign SourceSplit to different
SourceReaders, SourceReader should save the
+- `addSplits`: Used by the framework to assign SourceSplit to different
SourceReaders, SourceReader should save the
obtained shards, and then read the corresponding shard data in `pollNext`. There may be times when the Reader does
not read shards (maybe a SourceSplit has not been generated yet, or the current Reader is simply not allocated one);
at this time, corresponding handling should be done in `pollNext`, such as continuing to wait.
-- ``handleNoMoreSplits``: When triggered, it indicates that there are no more
shards, and the Connector Source is
+- `handleNoMoreSplits`: When triggered, it indicates that there are no more
shards, and the Connector Source is
expected to respond accordingly if needed.
-- ``snapshotStateIt``: is used for stream processing to periodically return
the current state that needs to be saved,
+- `snapshotState`: It is used for stream processing to periodically return the
current state that needs to be saved,
that is, the fragmentation information (SeaTunnel saves the fragmentation
information and state together to achieve
dynamic allocation).
-- ``notifyCheckpointComplete``: Like ``notifyCheckpointAborted`` the name, it
is a callback for different states of
+- `notifyCheckpointComplete` and `notifyCheckpointAborted`: As the names suggest, they are callbacks for different states of the
checkpoint.
-### **Sink**
+#### SourceReader.Context
+
+The `SourceReader.Context` is the context for the `SourceReader`, which
interacts with SeaTunnel. It includes several key methods:
+
+- `getIndexOfSubtask`: Used to get the current Reader's subTask index.
+- `getBoundedness`: Used to get the current Reader's Boundedness, whether it
is stream or batch.
+- `signalNoMoreElement`: Used to notify SeaTunnel that there are no more
elements to read.
+- `sendSplitRequest`: Used to request splits from the `SourceSplitEnumerator`
when the Reader has no splits.
+- `sendSourceEventToEnumerator`: Used to send events to the
`SourceSplitEnumerator`.
+- `getMetricsContext`: Used to get the current task's `MetricsContext` for
recording metrics.
+- `getEventListener`: Used to get the current task's `EventListener` for
sending events to SeaTunnel.
+
+### Sink
-#### **SeaTunnelSink.java**
+#### TableSinkFactory.java
-It is used to define the way to write data to the destination, and obtain
instances such as ``SinkWriter``
-and ``SinkCommitter`` through this interface. An important feature of the sink
side is the processing of distributed
-transactions. SeaTunnel defines two different Committers: ``SinkCommitter``
used to process transactions for different
-subTasks ``SinkAggregatedCommitter``. Process transaction results for all
nodes. Different Connector Sinks can be
-selected according to component properties, whether to implement only
``SinkCommitter`` or ``SinkAggregatedCommitter``,
+- Used to create a factory class for the Sink, through which Sink instances
are created using the `createSink` method.
+- `factoryIdentifier` is used to identify the name of the current Factory,
which is also configured in the configuration file to distinguish different
connectors.
+- `OptionRule` is used to define the parameters supported by the current
connector. You can use this method to define the logic of the parameters, such
as which parameters are required, which parameters are optional, which
parameters are mutually exclusive, etc. SeaTunnel will use `OptionRule` to
verify the validity of the user's configuration. Please refer to the Option
below.
Review Comment:
```suggestion
- `optionRule` is used to define the parameters supported by the current
connector. You can use this method to define the logic of the parameters, such
as which parameters are required, which parameters are optional, which
parameters are mutually exclusive, etc. SeaTunnel will use `OptionRule` to
verify the validity of the user's configuration. Please refer to the Option
below.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]