This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 37612d9ed2 [Improve][Doc] Update connector v2 contribute guide (#8375)
37612d9ed2 is described below
commit 37612d9ed241a0a0a45de6c3c73ee685b27615be
Author: Jia Fan <[email protected]>
AuthorDate: Fri Dec 27 23:27:34 2024 +0800
[Improve][Doc] Update connector v2 contribute guide (#8375)
Co-authored-by: hailin0 <[email protected]>
---
seatunnel-connectors-v2/README.md | 271 ++++++++++++++++++++---------------
seatunnel-connectors-v2/README.zh.md | 122 +++++++++++-----
2 files changed, 245 insertions(+), 148 deletions(-)
diff --git a/seatunnel-connectors-v2/README.md
b/seatunnel-connectors-v2/README.md
index e22e804cd9..c26924dcfa 100644
--- a/seatunnel-connectors-v2/README.md
+++ b/seatunnel-connectors-v2/README.md
@@ -5,7 +5,7 @@ in Apache SeaTunnel. This helps developers quickly understand
API and transforma
hand, it can guide contributors how to use the new API to develop new
connectors.See
this [issue](https://github.com/apache/seatunnel/issues/1608) for details.
-## **Code Structure**
+## Code Structure
In order to separate from the old code, we have defined new modules for
execution flow. This facilitates parallel
development at the current stage, and reduces the difficulty of merging.
@@ -16,216 +16,248 @@ development at the current stage, and reduces the
difficulty of merging.
- ../`seatunnel-translation`
translation layer for the connector-v2
- ../`seatunnel-transform-v2`
transform v2 connector implementation
- ../seatunnel-e2e/`seatunnel-connector-v2-e2e`
connector v2 e2e code
-- ../seatunnel-examples/`seatunnel-flink-connector-v2-example`
seatunnel connector-v2 example use flink local running instance
-- ../seatunnel-examples/`seatunnel-spark-connector-v2-example`
seatunnel connector-v2 example use spark local running instance
+- ../seatunnel-examples/`seatunnel-engine-examples`
seatunnel connector-v2 example use Zeta local running instance
+- ../seatunnel-examples/`seatunnel-flink-connector-v2-example`
seatunnel connector-v2 example use Flink local running instance
+- ../seatunnel-examples/`seatunnel-spark-connector-v2-example`
seatunnel connector-v2 example use Spark local running instance
-### **Example**
+### Example
-We have prepared two new version of the locally executable example program in
`seatunnel-examples`,one
-is
`seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelApiExample.java`
-, it runs in the Flink engine. Another one
-is
`seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiExample.java`
-, it runs in the Spark engine. This is also the debugging method that is often
used in the local development of
-Connector. You can debug these examples, which will help you better understand
the running logic of the program. The
-configuration files used in example are saved in the "resources/examples"
folder. If you want to add examples for your
-own connectors, you need to follow the steps below.
+We have prepared three locally executable example programs in
`seatunnel-examples`:
+-
`seatunnel-examples/seatunnel-engine-examples/src/main/java/org/apache/seatunnel/example/engine/SeaTunnelEngineLocalExample.java`,
which runs on the Zeta engine
+-
`seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelApiExample.java`,
which runs on the Flink engine
+-
`seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiExample.java`,
which runs on the Spark engine
-1. Add the groupId, artifactId and version of the connector to be tested to
- `seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml`(or add it
to
- `seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml` when you
want to runs it in Spark engine) as a
- dependency.
-2. Find the dependency in your connector pom file which scope is test or
provided and then add them to
- seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml(or add it to
- seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml) file and
modify the scope to compile.
-3. Add the task configuration file under resources/examples.
-4. Configure the file in the `SeaTunnelApiExample` main method.
-5. Just run the main method.
+You can debug these examples to help you better understand the running logic
of the program. The configuration files used are saved in the
`resources/examples` folder.
+If you want to add your own connectors, you need to follow the steps below.
-### **Create new seatunnel v2 connector**
+To add a new connector to the example using the Zeta engine, follow these
steps:
+1. Add the connector dependency's `groupId`, `artifactId`, and `version` to
`seatunnel-examples/seatunnel-engine-examples/pom.xml` (or to
`seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml` or
`seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml` if you want
to run it on the Flink or Spark engine, respectively).
+2. If there are dependencies in your connector with `scope` set to `test` or
`provided`, add these dependencies to
`seatunnel-examples/seatunnel-engine-examples/pom.xml` and change the `scope`
to `compile`.
+3. Add the task configuration file under `resources/examples`.
+4. Configure the file path in the `SeaTunnelEngineLocalExample.java` main
method.
+5. Run the main method.
-1.Create a new module under the `seatunnel-connectors-v2` directory and name
it connector - {connector name}.
+### Create New SeaTunnel V2 Connector
-2.The pom file can refer to the pom file of the existing connector, and add
the current sub model to the pom file of the parent model
+1. Create a new module under the `seatunnel-connectors-v2` directory and name
it connector-{ConnectorName}.
+2. The `pom.xml` file can refer to the `pom.xml` file of the existing
connector, and add the current sub-module to `seatunnel-connectors-v2/pom.xml`.
+3. Create two packages corresponding to source and sink
-3.Create two packages corresponding to source and sink
+ package org.apache.seatunnel.connectors.seatunnel.{ConnectorName}.source
+ package org.apache.seatunnel.connectors.seatunnel.{ConnectorName}.sink
- package org.apache.seatunnel.connectors.seatunnel.{connector name}}.source
- package org.apache.seatunnel.connectors.seatunnel.{connector name}}.sink
+4. Add the connector info to the `plugin-mapping.properties` file in the SeaTunnel root path.
-4.add connector info to plugin-mapping.properties file in seatunnel root path.
+5. Add the connector dependency to `seatunnel-dist/pom.xml` so that the connector jar can be found in the binary package.
-5.add connector dependency to seatunnel-dist/pom.xml, so the connector jar can
be find in binary package.
+6. There are several classes that must be implemented on the source side, namely {ConnectorName}Source, {ConnectorName}SourceFactory, and {ConnectorName}SourceReader; there are several classes that must be implemented on the sink side, namely {ConnectorName}Sink, {ConnectorName}SinkFactory, and {ConnectorName}SinkWriter. Please refer to other connectors for details.
-6.There are several classes that must be implemented on the source side,
namely {ConnectorName}Source, {ConnectorName}SourceFactory,
{ConnectorName}SourceReader; There are several classes that must be implemented
on the sink side, namely {ConnectorName}Sink, {ConnectorName}SinkFactory,
{ConnectorName}SinkWriter Please refer to other connectors for details
+7. {ConnectorName}SourceFactory and {ConnectorName}SinkFactory need to be annotated with `@AutoService(Factory.class)` on the class. In addition to the required methods, the source factory needs to override an additional `createSource` method and the sink factory needs to override an additional `createSink` method.
-7.{ConnectorName}SourceFactory and {ConnectorName}SinkFactory needs to be
annotated with the **@AutoService (Factory.class)** annotation on the class
name, and in addition to the required methods, source side an additional
**creatSource** method needs to be rewritten and sink side an additional
**creatSink** method needs to be rewritten
+8. {ConnectorName}Source needs to override the `getProducedCatalogTables` method; {ConnectorName}Sink needs to override the `getWriteCatalogTable` method.
-8.{ConnectorName}Source needs to override the **getProducedCatalogTables**
method; {ConnectorName}Sink needs to override the **getWriteCatalogTable**
method
+### Startup Class
-### **Startup Class**
+We have created three starter projects: `seatunnel-core/seatunnel-starter`,
`seatunnel-core/seatunnel-flink-starter`, and
`seatunnel-core/seatunnel-spark-starter`.
+Here you can find out how configuration files are parsed into executable Zeta/Flink/Spark processes.
-Aside from the old startup class, we have created two new startup modules,
-namely ``seatunnel-core/seatunnel-flink-starter`` and
``seatunnel-core/seatunnel-spark-starter``. You can find out how
-to parse the configuration file into an executable Flink/Spark process here.
+### SeaTunnel API
-### **SeaTunnel API**
+The `seatunnel-api` module is used to store the new interfaces defined by the
SeaTunnel API. By implementing these interfaces, developers can create
SeaTunnel Connectors that support multiple engines.
-A new ``seatunnel-api`` (not ``seatunnel-apis``) module has been created to
store the new interfaces defined by the
-SeaTunnel API. By implementing these interfaces, developers can complete the
SeaTunnel Connector that supports multiple
-engines.
-
-### **Translation Layer**
+### Translation Layer
We realize the conversion between SeaTunnel API and Engine API by adapting the
interfaces of different engines, so as to
achieve the effect of translation, and let our SeaTunnel Connector support the
operation of multiple different engines.
-The corresponding code address, ``seatunnel-translation``, this module has the
corresponding translation layer
+The `seatunnel-translation` module contains the corresponding translation layer implementation. If you are interested, you can view the code and help us improve it.
-## **API introduction**
+## API Introduction
The API design of the current version of SeaTunnel draws on the design concept
of Flink.
-### **Source**
+### Source
+
+#### TableSourceFactory.java
+
+- Used to create a factory class for Source, through which Source instances
are created using the `createSource` method.
+- `factoryIdentifier` is used to identify the name of the current Factory,
which is also configured in the configuration file to distinguish different
connectors.
+- `optionRule` is used to define the parameters supported by the current
connector. This method can be used to define the logic of the parameters, such
as which parameters are required, which are optional, which are mutually
exclusive, etc.
+ SeaTunnel will use `OptionRule` to verify the validity of the user's
configuration. Please refer to the `Option` below.
+- Make sure to add the `@AutoService(Factory.class)` annotation to
`TableSourceFactory`.
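
The factory contract described above can be sketched as follows. Note this is a simplified illustration: the interfaces below are hypothetical stand-ins for the real `seatunnel-api` types, and `MySource` is an invented connector name.

```java
import java.util.List;

public class SourceFactorySketch {

    interface SeaTunnelSource { /* elided */ }

    // Simplified stand-in for the real TableSourceFactory interface.
    interface TableSourceFactory {
        String factoryIdentifier();     // name users put in the config file
        List<String> requiredOptions(); // stands in for the real optionRule()
        SeaTunnelSource createSource();
    }

    // In a real connector this class would be annotated with
    // @AutoService(Factory.class) so SeaTunnel can discover it via SPI.
    static class MySourceFactory implements TableSourceFactory {
        @Override public String factoryIdentifier() { return "MySource"; }
        @Override public List<String> requiredOptions() { return List.of("url", "table"); }
        @Override public SeaTunnelSource createSource() { return new SeaTunnelSource() {}; }
    }

    public static void main(String[] args) {
        TableSourceFactory factory = new MySourceFactory();
        // The identifier is matched against the connector name in the config file.
        System.out.println(factory.factoryIdentifier()); // MySource
    }
}
```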
-#### **SeaTunnelSource.java**
+#### SeaTunnelSource.java
-- The Source of SeaTunnel adopts the design of stream-batch integration,
``getBoundedness`` which determines whether the
+- The Source of SeaTunnel adopts a unified stream-batch design. `getBoundedness` determines whether the current Source is a stream Source or a batch Source, so you can specify via dynamic configuration (refer to the default method) whether a Source runs as a stream or a batch.
-- ``getRowTypeInfo`` To get the schema of the data, the connector can choose
to hard-code to implement a fixed schema,
- or run the user to customize the schema through config configuration. The
latter is recommended.
+- `getProducedCatalogTables` is used to get the schema of the data. The connector can choose to hard-code a fixed schema or support a custom schema through user-defined configuration. The latter is recommended.
- SeaTunnelSource is a class executed on the driver side, through which
objects such as SourceReader, SplitEnumerator
and serializers are obtained.
- Currently, the data type supported by SeaTunnelSource must be SeaTunnelRow.
-#### **SourceSplitEnumerator.java**
+#### SourceSplitEnumerator.java
Use this enumerator to manage the read splits (SourceSplit); different splits may be assigned to different SourceReaders to read data. It contains several key methods:
-- ``run``: Used to perform a spawn SourceSplit and call
``SourceSplitEnumerator.Context.assignSplit``: to distribute the
+- The `open` method is used to initialize the SourceSplitEnumerator. In this
method, you can initialize resources such as database connections or states.
+- `run`: Used to generate SourceSplits and call `SourceSplitEnumerator.Context.assignSplit` to distribute the splits to the SourceReaders.
-- ``addSplitsBackSourceSplitEnumerator``: is required to redistribute these
Splits when SourceSplit cannot be processed
+- `addSplitsBack`: The SourceSplitEnumerator is required to redistribute these splits when a SourceSplit cannot be processed normally, or when a SourceReader restarts due to an exception.
-- ``registerReaderProcess``: some SourceReaders that are registered after the
run is run. If there is no SourceSplit
+- `registerReader`: Process SourceReaders that register after `run` has already started. If there are SourceSplits that have not been distributed yet, they can be distributed to these new readers (yes, you need to maintain your SourceSplit distribution in the SourceSplitEnumerator most of the time).
-- ``handleSplitRequest``: If some Readers actively request SourceSplit from
SourceSplitEnumerator, this method can be
+- `handleSplitRequest`: If some Readers actively request SourceSplits from the SourceSplitEnumerator, this method can call `SourceSplitEnumerator.Context.assignSplit` to send splits to the corresponding Readers.
-- ``snapshotState``: It is used for stream processing to periodically return
the current state that needs to be saved.
+- `snapshotState`: Used in stream processing to periodically return the current state that needs to be saved. If there is a state restoration, `SeaTunnelSource.restoreEnumerator` will be called to construct a SourceSplitEnumerator and restore the saved state to it.
-- ``notifyCheckpointComplete``: It is used for subsequent processing after the
state is successfully saved, and can be
+- `notifyCheckpointComplete`: Used for subsequent processing after the state is successfully saved, for example to store the state or a marker in third-party storage.
+- `handleSourceEvent` is used to handle events from the `SourceReader`. You
can customize events, such as changes in the state of the `SourceReader`.
+- `close` is used to close the `SourceSplitEnumerator` and release resources.
+
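The split-distribution bookkeeping that `run`/`registerReader` perform together can be sketched as below. This is illustrative stand-in logic (round-robin assignment with invented types), not the real enumerator API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EnumeratorSketch {

    // Stand-in for a SourceSplit: only the splitId matters here.
    record Split(String splitId) {}

    // Round-robin pending splits across the registered reader subtask ids;
    // the enumerator itself owns this assignment state most of the time.
    static Map<Integer, List<Split>> assign(List<Split> pending, List<Integer> readerIds) {
        Map<Integer, List<Split>> assignment = new HashMap<>();
        for (int readerId : readerIds) assignment.put(readerId, new ArrayList<>());
        for (int i = 0; i < pending.size(); i++) {
            int readerId = readerIds.get(i % readerIds.size());
            assignment.get(readerId).add(pending.get(i));
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<Split> splits = List.of(new Split("p0"), new Split("p1"), new Split("p2"));
        Map<Integer, List<Split>> a = assign(splits, List.of(0, 1));
        System.out.println(a.get(0).size()); // reader 0 receives p0 and p2
        System.out.println(a.get(1).size()); // reader 1 receives p1
    }
}
```

In a real connector, each assignment would be pushed out via `SourceSplitEnumerator.Context.assignSplit`.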
+#### SourceSplitEnumerator.Context
+
+The `SourceSplitEnumerator.Context` is the context for the
`SourceSplitEnumerator`, which interacts with SeaTunnel. It includes several
key methods:
-#### **SourceSplit.java**
+- `currentParallelism`: Used to get the current task's parallelism.
+- `registeredReaders`: Used to get the list of currently registered
`SourceReader`.
+- `assignSplit`: Used to assign splits to `SourceReader`.
+- `signalNoMoreSplits`: Used to notify a `SourceReader` that there are no more
splits.
+- `sendEventToSourceReader`: Used to send events to `SourceReader`.
+- `getMetricsContext`: Used to get the current task's `MetricsContext` for
recording metrics.
+- `getEventListener`: Used to get the current task's `EventListener` for
sending events to SeaTunnel.
+
+#### SourceSplit.java
The interface used to describe splits. Different splits need to define different splitIds. You can implement this interface to save the data each split needs, such as Kafka's partition and topic, or HBase's column family, which the SourceReader uses to determine which part of the total data should be read.
-#### **SourceReader.java**
+#### SourceReader.java
The interface that directly interacts with the data source, and the action of
reading data from the data source is
completed by implementing this interface.
-- ``pollNext``: It is the core of Reader. Through this interface, the process
of reading the data of the data source and
+- `pollNext`: It is the core of Reader. Through this interface, the process of
reading the data of the data source and
returning it to SeaTunnel is realized. Whenever you are ready to pass data
to SeaTunnel, you can call
- the ``Collector.collect`` method in the parameter, which can be called an
infinite number of times to complete a large
- amount of data reading. But the data format supported at this stage can only
be ``SeaTunnelRow``. Because our Source
+ the `Collector.collect` method in the parameter, which can be called an
infinite number of times to complete a large
+ amount of data reading. But the data format supported at this stage can only
be `SeaTunnelRow`. Because our Source
is a stream-batch integration, the Connector has to decide when to end data
reading in batch mode. For example, a
- batch reads 100 pieces of data at a time. After the reading is completed, it
needs ``pollNext`` to call in
- to ``SourceReader.Context.signalNoMoreElementnotify`` SeaTunnel that there
is no data to read . , then you can use
+ batch reads 100 records at a time. After the reading is completed, `pollNext` needs to call `SourceReader.Context.signalNoMoreElement` to notify SeaTunnel that there is no more data to read, and then these 100 records can be used for batch processing. Stream processing does not have this requirement, so most SourceReaders with unified stream-batch support will contain the following code:
-```java
+```java
if(Boundedness.BOUNDED.equals(context.getBoundedness())){
// signal to the source that we have reached the end of the data.
context.signalNoMoreElement();
break;
}
-```
+```
It means that SeaTunnel will be notified only in batch mode.
-- ``addSplits``: Used by the framework to assign SourceSplit to different
SourceReaders, SourceReader should save the
+- `addSplits`: Used by the framework to assign SourceSplits to different SourceReaders. The SourceReader should save the obtained splits and then read the corresponding split data in `pollNext`. There may be times when the Reader has no splits (perhaps the SourceSplit has not been generated yet, or the current Reader simply was not allocated any); at this time, `pollNext` should handle it accordingly, for example by continuing to wait.
-- ``handleNoMoreSplits``: When triggered, it indicates that there are no more
shards, and the Connector Source is
+- `handleNoMoreSplits`: When triggered, it indicates that there are no more splits, and the Connector Source can optionally react accordingly.
-- ``snapshotStateIt``: is used for stream processing to periodically return
the current state that needs to be saved,
+- `snapshotState`: It is used for stream processing to periodically return the current state that needs to be saved, that is, the split information (SeaTunnel saves the split information and the state together to achieve dynamic allocation).
-- ``notifyCheckpointComplete``: Like ``notifyCheckpointAborted`` the name, it
is a callback for different states of
+- `notifyCheckpointComplete` and `notifyCheckpointAborted`: As the names suggest, they are callbacks for different checkpoint states.
-### **Sink**
+#### SourceReader.Context
+
+The `SourceReader.Context` is the context for the `SourceReader`, which
interacts with SeaTunnel. It includes several key methods:
+
+- `getIndexOfSubtask`: Used to get the current Reader's subTask index.
+- `getBoundedness`: Used to get the current Reader's Boundedness, whether it
is stream or batch.
+- `signalNoMoreElement`: Used to notify SeaTunnel that there are no more
elements to read.
+- `sendSplitRequest`: Used to request splits from the `SourceSplitEnumerator`
when the Reader has no splits.
+- `sendSourceEventToEnumerator`: Used to send events to the
`SourceSplitEnumerator`.
+- `getMetricsContext`: Used to get the current task's `MetricsContext` for
recording metrics.
+- `getEventListener`: Used to get the current task's `EventListener` for
sending events to SeaTunnel.
+
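The bounded-mode `pollNext` pattern described above can be sketched end to end as follows. The `Context` and `Collector` types here are simplified stand-ins for the real `seatunnel-api` interfaces, and the synthetic rows are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

public class ReaderSketch {

    enum Boundedness { BOUNDED, UNBOUNDED }

    interface Collector<T> { void collect(T row); }

    // Stand-in for SourceReader.Context with just the two methods used here.
    static class Context {
        final Boundedness boundedness;
        boolean noMoreElementSignalled = false;
        Context(Boundedness b) { this.boundedness = b; }
        Boundedness getBoundedness() { return boundedness; }
        void signalNoMoreElement() { noMoreElementSignalled = true; }
    }

    // Reads `batch` synthetic rows, then ends the job in batch mode only.
    static void pollNext(Context context, Collector<String> output, int batch) {
        for (int i = 0; i < batch; i++) {
            output.collect("row-" + i);
        }
        if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
            // signal to the source that we have reached the end of the data
            context.signalNoMoreElement();
        }
    }

    public static void main(String[] args) {
        Context ctx = new Context(Boundedness.BOUNDED);
        List<String> rows = new ArrayList<>();
        pollNext(ctx, rows::add, 3);
        System.out.println(rows.size());                // 3
        System.out.println(ctx.noMoreElementSignalled); // true
    }
}
```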
+### Sink
-#### **SeaTunnelSink.java**
+#### TableSinkFactory.java
-It is used to define the way to write data to the destination, and obtain
instances such as ``SinkWriter``
-and ``SinkCommitter`` through this interface. An important feature of the sink
side is the processing of distributed
-transactions. SeaTunnel defines two different Committers: ``SinkCommitter``
used to process transactions for different
-subTasks ``SinkAggregatedCommitter``. Process transaction results for all
nodes. Different Connector Sinks can be
-selected according to component properties, whether to implement only
``SinkCommitter`` or ``SinkAggregatedCommitter``,
+- Used to create a factory class for the Sink, through which Sink instances
are created using the `createSink` method.
+- `factoryIdentifier` is used to identify the name of the current Factory,
which is also configured in the configuration file to distinguish different
connectors.
+- `optionRule` is used to define the parameters supported by the current
connector. You can use this method to define the logic of the parameters, such
as which parameters are required, which parameters are optional, which
parameters are mutually exclusive, etc. SeaTunnel will use `OptionRule` to
verify the validity of the user's configuration. Please refer to the Option
below.
+- Make sure to add the `@AutoService(Factory.class)` annotation to the
`TableSinkFactory` class.
+
+#### SeaTunnelSink.java
+
+It is used to define the way data is written to the destination, and to obtain instances such as `SinkWriter` and `SinkCommitter` through this interface. An important feature of the sink side is the handling of distributed transactions. SeaTunnel defines two different Committers: `SinkCommitter`, which processes transactions for individual subTasks, and `SinkAggregatedCommitter`, which processes the transaction results of all nodes on a single node. According to the properties of the underlying component, different Connector Sinks can choose to implement only `SinkCommitter`, only `SinkAggregatedCommitter`, or both.
-#### **SinkWriter.java**
+- `createWriter` is used to create a `SinkWriter` instance. The `SinkWriter`
is an interface that interacts with the data source, allowing data to be
written to the data source through this interface.
+- `restoreWriter` is used to restore the `SinkWriter` to its previous state
during state recovery. This method is called when the task is restored.
+- `getWriteCatalogTable` is used to get the SeaTunnel `CatalogTable` corresponding to the table written by the Sink. SeaTunnel will handle metrics-related logic based on this `CatalogTable`.
+
+#### SinkWriter.java
It is used to directly interact with the output source, and provide the data
obtained by SeaTunnel through the data
source to the Writer for data writing.
-- ``write``: Responsible for transferring data to ``SinkWriter``, you can
choose to write it directly, or write it after
- buffering a certain amount of data. Currently, only the data type is
supported ``SeaTunnelRow``.
-- ``prepareCommit``: Executed before commit, you can write data directly here,
or you can implement phase one in 2pc,
- and then implement phase two in ``SinkCommitter`` or
``SinkAggregatedCommitter``. What this method returns is the
- commit information, which will be provided ``SinkCommitter`` and
``SinkAggregatedCommitter`` used for the next stage
+- `write`: Responsible for transferring data to the `SinkWriter`; you can choose to write it directly or buffer a certain amount of data before writing. Currently, only the `SeaTunnelRow` data type is supported.
+- `prepareCommit`: Executed before commit. You can write data directly here, or you can implement phase one of 2PC here and then implement phase two in `SinkCommitter` or `SinkAggregatedCommitter`. This method returns the commit information, which will be provided to `SinkCommitter` and `SinkAggregatedCommitter` for the next stage of transaction processing.
+- `snapshotState` is used to periodically return the current state to be saved
during stream processing. If there is a state recovery,
`SeaTunnelSink.restoreWriter` will be called to construct the `SinkWriter` and
restore the saved state to the `SinkWriter`.
+- `abortPrepare` is executed when `prepareCommit` fails, used to roll back the
operations of `prepareCommit`.
+- `close` is used to close the `SinkWriter` and release resources.
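The buffering `write`/`prepareCommit` pattern described above can be sketched as follows. The types are simplified stand-ins for the real API, and the in-memory "flush" stands in for an actual write to external storage.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class SinkWriterSketch {

    // Stand-in for the commit info handed to the committer after phase one.
    record CommitInfo(int flushedRows) {}

    static class BufferingWriter {
        private final List<String> buffer = new ArrayList<>();
        private int totalFlushed = 0;

        // Buffer rows instead of writing each one immediately.
        void write(String row) {
            buffer.add(row);
        }

        // Phase one of 2PC: flush the buffer and return info for the committer.
        Optional<CommitInfo> prepareCommit() {
            int flushed = buffer.size();
            totalFlushed += flushed; // stands in for the real flush to storage
            buffer.clear();
            return Optional.of(new CommitInfo(flushed));
        }
    }

    public static void main(String[] args) {
        BufferingWriter writer = new BufferingWriter();
        writer.write("a");
        writer.write("b");
        System.out.println(writer.prepareCommit().get().flushedRows()); // 2
    }
}
```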
-#### **SinkCommitter.java**
+##### SinkWriter.Context
-It is used to process ``SinkWriter.prepareCommit`` the returned data
information, including transaction information that
-needs to be submitted.
+The `Context` is the context for the `SinkWriter`, which interacts with
SeaTunnel. It includes several key methods:
-#### **SinkAggregatedCommitter.java**
+- `getIndexOfSubtask`: Used to get the current Writer's subTask index.
+- `getNumberOfParallelSubtasks`: Used to get the current task's parallelism.
+- `getMetricsContext`: Used to get the current task's `MetricsContext` for
recording metrics.
+- `getEventListener`: Used to get the current task's `EventListener` for
sending events to SeaTunnel.
-It is used to process ``SinkWriter.prepareCommit`` the returned data
information, including transaction information that
-needs to be submitted, etc., but it will be processed together on a single
node, which can avoid the problem of
-inconsistency of the state caused by the failure of the second part of the
stage.
+#### SinkCommitter.java
-- ``combine``: Used ``SinkWriter.prepareCommit`` to aggregate the returned
transaction information, and then generate
- aggregated transaction information.
+Used to process the data information returned by `SinkWriter.prepareCommit`,
including the transaction information that needs to be submitted. Unlike
`SinkAggregatedCommitter`, `SinkCommitter` is executed on each node. We
recommend using `SinkAggregatedCommitter`.
-#### **Implement SinkCommitter or SinkAggregatedCommitter?**
+- `commit`: Used to submit the transaction information returned by
`SinkWriter.prepareCommit`. If it fails, idempotency must be implemented to
ensure that the engine retry can work normally.
+- `abort`: Used to roll back the operations of `SinkWriter.prepareCommit`. If
it fails, idempotency must be implemented to ensure that the engine retry can
work normally.
-In the current version, it is recommended to implement
``SinkAggregatedCommitter`` as the first choice, which can
-provide strong consistency guarantee in Flink/Spark. At the same time, commit
should be idempotent, and save engine
-retry can work normally.
+#### SinkAggregatedCommitter.java
-### TableSourceFactory and TableSinkFactory
+Used to process the data information returned by `SinkWriter.prepareCommit`, including the transaction information that needs to be submitted. However, it is processed together on a single node, which avoids state inconsistency caused by failures in the second stage.
-In order to automatically create the Source Connector and Sink Connector and
Transform Connector, we need the connector to return the parameters needed to
create them and the verification rules for each parameter. For Source Connector
and Sink Connector, we define `TableSourceFactory` and `TableSinkFactory`
-supported by the current connector and the required parameters. We define
TableSourceFactory and TableSinkFactory,
-It is recommended to put it in the same directory as the implementation class
of SeaTunnelSource or SeaTunnelSink for easy searching.
+- `init`: Used to initialize the `SinkAggregatedCommitter`. You can initialize
some resources for the connector here, such as connecting to a database or
initializing some states.
+- `restoreCommit`: Used to restore the `SinkAggregatedCommitter` to its
previous state during state recovery. This method is called when the task is
restored, and we should retry committing the unfinished transactions in this
method.
+- `commit`: Used to submit the transaction information returned by
`SinkWriter.prepareCommit`. If it fails, idempotency must be implemented to
ensure that the engine retry can work normally.
+- `combine`: Used to aggregate the transaction information returned by
`SinkWriter.prepareCommit` and then generate aggregated transaction information.
+- `abort`: Used to roll back the operations of `SinkWriter.prepareCommit`. If
it fails, idempotency must be implemented to ensure that the engine retry can
work normally.
+- `close`: Used to close the `SinkAggregatedCommitter` and release resources.
-- `factoryIdentifier` is used to indicate the name of the current Factory.
This value should be the same as the
- value returned by `getPluginName`, so that if Factory is used to create
Source/Sink in the future,
- A seamless switch can be achieved.
-- `createSink` and `createSource` are the methods for creating Source and Sink
respectively.
-- `optionRule` returns the parameter logic, which is used to indicate which
parameters of our connector are supported,
- which parameters are required, which parameters are optional, and which
parameters are exclusive, which parameters are bundledRequired.
- This method will be used when we visually create the connector logic, and it
will also be used to generate a complete parameter
- object according to the parameters configured by the user, and then the
connector developer does not need to judge whether the parameters
- exist one by one in the Config, and use it directly That's it.
- You can refer to existing implementations, such as
`org.apache.seatunnel.connectors.seatunnel.elasticsearch.source.ElasticsearchSourceFactory`.
- There is support for configuring Schema for many Sources, so a common Option
is used.
- If you need a schema, you can refer to
`org.apache.seatunnel.api.table.catalog.CatalogTableUtil.SCHEMA`.
+#### Implement SinkCommitter or SinkAggregatedCommitter?
-Don't forget to add `@AutoService(Factory.class)` to the class. This Factory
is the parent class of TableSourceFactory and TableSinkFactory.
+In the current version, it is recommended to implement `SinkAggregatedCommitter` as the first choice, which can provide a strong consistency guarantee in Flink/Spark. At the same time, commit should be idempotent so that engine retries can work normally.
-### **Options**
+### Options
When we implement TableSourceFactory and TableSinkFactory, the corresponding
Option will be created.
Each Option corresponds to a configuration, but different configurations will
have different types.
@@ -238,7 +270,14 @@ In most cases, the default is empty. `description` is used
to indicate the descr
This parameter is optional. It is recommended to be consistent with the
documentation. For specific examples,
please refer to
`org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertSinkFactory`.
-## **Result**
+In `TableSourceFactory` and `TableSinkFactory`, the `optionRule` method
returns the parameter logic,
+which defines which parameters are supported by our connector, which
parameters are required, which parameters are optional,
+which parameters are mutually exclusive, and which parameters are bundled
required. This method will be used when we visually create the connector logic,
+and it will also be used to generate a complete parameter object based on the
user's configured parameters, so that connector developers do not need to check
each parameter in the config individually and can use it directly.
+You can refer to existing implementations, such as
+`org.apache.seatunnel.connectors.seatunnel.elasticsearch.source.ElasticsearchSourceFactory`.
+Many sources support schema configuration through a common option; if you need
+a schema, refer to `org.apache.seatunnel.api.table.catalog.CatalogTableUtil.SCHEMA`.
+
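As an illustration of the kind of rule checking `optionRule` drives, here is a minimal, self-contained sketch. The class and method names are hypothetical; the real builder lives in the seatunnel-api `OptionRule` class and differs in detail.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Simplified sketch of OptionRule-style validation (not the real seatunnel-api classes). */
class ToyOptionRule {
    final Set<String> required;
    final Set<String> optional;
    final List<Set<String>> exclusive; // at most one option of each group may appear
    final List<Set<String>> bundled;   // all-or-none groups (bundledRequired)

    ToyOptionRule(Set<String> required, Set<String> optional,
                  List<Set<String>> exclusive, List<Set<String>> bundled) {
        this.required = required;
        this.optional = optional;
        this.exclusive = exclusive;
        this.bundled = bundled;
    }

    /** Returns a list of violations; an empty list means the config is valid. */
    List<String> validate(Map<String, String> config) {
        List<String> errors = new ArrayList<>();
        for (String key : required) {
            if (!config.containsKey(key)) errors.add("missing required option: " + key);
        }
        for (Set<String> group : exclusive) {
            if (group.stream().filter(config::containsKey).count() > 1) {
                errors.add("options are mutually exclusive: " + group);
            }
        }
        for (Set<String> group : bundled) {
            long present = group.stream().filter(config::containsKey).count();
            if (present != 0 && present != group.size()) {
                errors.add("options must be set together: " + group);
            }
        }
        for (String key : config.keySet()) {
            if (!required.contains(key) && !optional.contains(key)
                    && exclusive.stream().noneMatch(g -> g.contains(key))
                    && bundled.stream().noneMatch(g -> g.contains(key))) {
                errors.add("unknown option: " + key);
            }
        }
        return errors;
    }
}
```

With rules declared this way, the framework can both reject an invalid user config up front and know the complete parameter set a connector accepts.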
+## Implementation
-All Connector implementations should be under the ``seatunnel-connectors-v2``,
and the examples that can be referred to
+All Connector implementations should be under the `seatunnel-connectors-v2`,
and the examples that can be referred to
at this stage are under this module.
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/README.zh.md
b/seatunnel-connectors-v2/README.zh.md
index b1ed13020f..741ad29b93 100644
--- a/seatunnel-connectors-v2/README.zh.md
+++ b/seatunnel-connectors-v2/README.zh.md
@@ -13,29 +13,31 @@ To decouple from compute engines, SeaTunnel designed a new connector API; through
- ../`seatunnel-translation`                                    translation layer for connector-v2
- ../`seatunnel-transform-v2`                                   transform-v2 implementation
- ../seatunnel-e2e/`seatunnel-connector-v2-e2e`                 connector-v2 end-to-end tests
+- ../seatunnel-examples/`seatunnel-engine-examples`             seatunnel connector-v2 example running locally on the Zeta engine
- ../seatunnel-examples/`seatunnel-flink-connector-v2-example`  seatunnel connector-v2 example running locally on Flink
- ../seatunnel-examples/`seatunnel-spark-connector-v2-example`  seatunnel connector-v2 example running locally on Spark
### Example
-We have prepared two locally runnable example programs in `seatunnel-examples`. One is
-`seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelApiExample.java`,
-which runs on the Flink engine. The other is
-`seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiExample.java`,
-which runs on the Spark engine. You can debug these examples to better understand how the program runs. The configuration files used are stored in the `resources/examples` folder. If you want to add your own connectors, follow the steps below.
+We have prepared three locally runnable example programs in `seatunnel-examples`:
+- `seatunnel-examples/seatunnel-engine-examples/src/main/java/org/apache/seatunnel/example/engine/SeaTunnelEngineLocalExample.java`, which runs on the Zeta engine
+- `seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelApiExample.java`, which runs on the Flink engine
+- `seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiExample.java`, which runs on the Spark engine
+You can debug these examples to better understand how the program runs. The configuration files used are stored in the `resources/examples` folder. If you want to add your own connectors, follow the steps below.
-1. Add the groupId, artifactId and version of the connector dependency to `seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml`
-   (or, if you want to run on the Spark engine, add the dependency to `seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml`).
-2. If your connector has dependencies with scope test or provided, add them to seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
-   (or seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml) and change the scope to compile.
-3. Add the task configuration file under resources/examples.
-4. Set the configuration file in the `SeaTunnelApiExample` main method.
-5. Run the main method.
+Taking the Zeta engine as an example, you can add a new connector to the examples with the following steps:
+1. Add the groupId, artifactId and version of the connector dependency to `seatunnel-examples/seatunnel-engine-examples/pom.xml`
+   (or, if you want to run on the Flink or Spark engine, add the dependency to `seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml`
+   or `seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml`; the following steps apply likewise).
+2. If your connector has dependencies with scope test or provided, add them to `seatunnel-examples/seatunnel-engine-examples/pom.xml` and change the scope to compile.
+3. Add the task configuration file under resources/examples.
+4. Set the configuration file path in the `SeaTunnelEngineLocalExample.java` main method.
+5. Run the main method.
-### Create a new seatunnel v2 connector
+### Create a new SeaTunnel V2 Connector
1. Create a new module under the `seatunnel-connectors-v2` directory, named connector-{connector name}.
-2. The pom file can reference the pom files of existing connectors; add the new child module to the parent module's pom file.
+2. The pom file can reference the pom files of existing connectors; add the new child module to `seatunnel-connectors-v2/pom.xml`.
3. Create two packages corresponding to source and sink
@@ -54,12 +56,12 @@ To decouple from compute engines, SeaTunnel designed a new connector API; through
### Starter classes
-Separately from the old starters, we created two new starter projects: `seatunnel-core/seatunnel-flink-starter` and `seatunnel-core/seatunnel-spark-starter`.
-There you can find how a configuration file is parsed into an executable Flink/Spark pipeline.
+We created three starter projects: `seatunnel-core/seatunnel-starter`, `seatunnel-core/seatunnel-flink-starter` and `seatunnel-core/seatunnel-spark-starter`.
+There you can find how a configuration file is parsed into an executable Zeta/Flink/Spark pipeline.
### SeaTunnel API
-A new `seatunnel-api` (not `seatunnel-apis`) module was created to hold the new interfaces defined by the SeaTunnel API; by implementing these interfaces, developers can build SeaTunnel Connectors that support multiple engines
+The `seatunnel-api` module holds the new interfaces defined by the SeaTunnel API; by implementing these interfaces, developers can build SeaTunnel Connectors that support multiple engines
### Translation layer
@@ -72,11 +74,18 @@ To decouple from compute engines, SeaTunnel designed a new connector API; through
### Source
+#### TableSourceFactory.java
+
+- The factory class used to create Sources; Source instances are created through its `createSource` method.
+- `factoryIdentifier` identifies the name of the Factory, which is also the name used in the configuration file to distinguish different connectors.
+- `optionRule` defines the parameters supported by the connector: which are required, which are optional, which are mutually exclusive, and so on. SeaTunnel validates the user's configuration against the OptionRule. See the Option section below.
+- Make sure to add the `@AutoService(Factory.class)` annotation to `TableSourceFactory`.
+
#### SeaTunnelSource.java
- SeaTunnel's Source uses a unified streaming/batch design; `getBoundedness` determines whether the Source is a streaming or batch Source, so a Source can be declared as both streaming and batch via dynamic configuration (see the default method).
-- `getRowTypeInfo` obtains the data schema; a connector can hard-code a fixed schema or let the user customize the schema through the config; the latter is recommended.
+- `getProducedCatalogTables` obtains the data schema; a connector can hard-code a fixed schema or let the user customize the schema through the config; the latter is recommended.
- SeaTunnelSource runs on the driver side; it is used to obtain objects such as SourceReader and SplitEnumerator, as well as serializers.
- Currently the data type produced by SeaTunnelSource must be SeaTunnelRow.
@@ -84,6 +93,7 @@ To decouple from compute engines, SeaTunnel designed a new connector API; through
The enumerator obtains the splits (SourceSplit) for reading data; different splits may be assigned to different SourceReaders. It contains several key methods:
+- `open` initializes the SourceSplitEnumerator; connector resources, such as database connections and initial state, can be set up here.
- `run` produces SourceSplits and calls `SourceSplitEnumerator.Context.assignSplit` to distribute them to SourceReaders.
- `addSplitsBack` handles splits that could not be processed because a SourceReader failed or restarted; the SourceSplitEnumerator must redistribute these splits.
- `registerReader`
@@ -93,6 +103,20 @@ To decouple from compute engines, SeaTunnel designed a new connector API; through
- `snapshotState` periodically returns the current state to be saved during stream processing; when state is restored, `SeaTunnelSource.restoreEnumerator` is called to construct a SourceSplitEnumerator and restore the saved state into it.
- `notifyCheckpointComplete` is called after the state is saved successfully; it can be used to store the state or a marker in third-party storage.
+- `handleSourceEvent` handles events from the SourceReader; custom events, such as SourceReader state changes, can be defined.
+- `close` closes the SourceSplitEnumerator and releases resources.
+
+##### SourceSplitEnumerator.Context
+
+Context is the context of the SourceSplitEnumerator; it is used to interact with SeaTunnel and contains several key methods:
+
+- `currentParallelism` gets the parallelism of the current job.
+- `registeredReaders` gets the list of currently registered SourceReaders.
+- `assignSplit` distributes splits to SourceReaders.
+- `signalNoMoreSplits` notifies a Reader that there are no more splits.
+- `sendEventToSourceReader` sends an event to a SourceReader.
+- `getMetricsContext` gets the MetricsContext of the current job, used to record metrics.
+- `getEventListener` gets the EventListener of the current job, used to send events to SeaTunnel.
#### SourceSplit.java
@@ -124,14 +148,37 @@ if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
- `snapshotState` periodically returns the current state to be saved during stream processing, i.e. the split information (SeaTunnel stores split information and state together to implement dynamic assignment).
- `notifyCheckpointComplete` and `notifyCheckpointAborted` are, as the names suggest, callbacks for the different checkpoint outcomes.
+##### SourceReader.Context
+
+Context is the context of the SourceReader; it is used to interact with SeaTunnel and contains several key methods:
+
+- `getIndexOfSubtask` gets the subtask index of the current Reader.
+- `getBoundedness` gets the Boundedness of the current Reader, streaming or batch.
+- `signalNoMoreElement` notifies SeaTunnel that there is no more data to read.
+- `sendSplitRequest` requests splits from the SourceSplitEnumerator; used by a Reader to proactively ask for splits when it has none.
+- `sendSourceEventToEnumerator` sends an event to the SourceSplitEnumerator.
+- `getMetricsContext` gets the MetricsContext of the current job, used to record metrics.
+- `getEventListener` gets the EventListener of the current job, used to send events to SeaTunnel.
+
### Sink
+#### TableSinkFactory.java
+
+- The factory class used to create Sinks; Sink instances are created through its `createSink` method.
+- `factoryIdentifier` identifies the name of the Factory, which is also the name used in the configuration file to distinguish different connectors.
+- `optionRule` defines the parameters supported by the connector: which are required, which are optional, which are mutually exclusive, and so on. SeaTunnel validates the user's configuration against the OptionRule. See the Option section below.
+- Make sure to add the `@AutoService(Factory.class)` annotation to `TableSinkFactory`.
+
#### SeaTunnelSink.java
Defines how data is written to the target; this interface is used to obtain instances such as SinkWriter and SinkCommitter. An important feature of the Sink side is distributed transaction handling. SeaTunnel defines two kinds of Committer: `SinkCommitter`
handles transactions per subTask, each subTask handling its own transaction; after they succeed, `SinkAggregatedCommitter` processes the transaction results of all nodes in a single thread. Depending on the component's properties, a Connector
Sink can choose to implement only `SinkCommitter` or only `SinkAggregatedCommitter`, or both.
+- `createWriter` creates a SinkWriter instance; SinkWriter is the interface that interacts with the target and writes data to it.
+- `restoreWriter` restores a SinkWriter to its previous state when state is restored; it is called on job recovery.
+- `getWriteCatalogTable` gets the SeaTunnel CatalogTable corresponding to the table the Sink writes to; SeaTunnel uses this CatalogTable for metrics-related logic.
+
#### SinkWriter.java
Interacts directly with the output target; the data SeaTunnel obtains from the source is handed to the Writer for writing.
@@ -139,35 +186,41 @@ Depending on component properties, a Connector Sink can choose to implement only `SinkCommitter` or `
- `write` takes data into the SinkWriter; it may write directly or buffer data and write once a certain amount accumulates. Currently the only supported data type is `SeaTunnelRow`.
- `prepareCommit` is executed before commit; you can write data directly here, or implement phase one of 2PC here and phase two in `SinkCommitter` or `SinkAggregatedCommitter`. This method returns the commit information, which is passed to `SinkCommitter` and `SinkAggregatedCommitter` for the next phase of transaction processing.
+- `snapshotState` periodically returns the current state to be saved during stream processing; when state is restored, `SeaTunnelSink.restoreWriter` is called to construct a SinkWriter and restore the saved state into it.
+- `abortPrepare` is executed when prepareCommit fails; it rolls back the prepareCommit operation.
+- `close` closes the SinkWriter and releases resources.
+
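The `write` / `prepareCommit` flow described above can be sketched with a toy buffered writer. The names are hypothetical, and a plain string list stands in for a real staging area; the actual `SinkWriter` interface is generic over row, commit-info, and state types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/**
 * Toy buffered SinkWriter (hypothetical, not the real interface).
 * Stage 1 of 2PC: flush buffered rows to a "staging area" and hand back
 * commit info; stage 2 (a committer) would publish the staged data.
 */
class ToyBufferedWriter {
    private final List<String> buffer = new ArrayList<>();
    private final List<String> stagingArea; // stands in for a staging dir / open transaction
    private int txnCounter = 0;

    ToyBufferedWriter(List<String> stagingArea) {
        this.stagingArea = stagingArea;
    }

    /** Buffer the row instead of writing it to the target immediately. */
    void write(String row) {
        buffer.add(row);
    }

    /** Flush the buffer to staging and return commit info for the committer, if any. */
    Optional<String> prepareCommit() {
        if (buffer.isEmpty()) {
            return Optional.empty();
        }
        String txnId = "txn-" + (txnCounter++);
        stagingArea.addAll(buffer); // "stage" the buffered rows
        buffer.clear();
        return Optional.of(txnId);
    }
}
```

The returned transaction id is what a `SinkCommitter` / `SinkAggregatedCommitter` would later receive to finish (or abort) the write.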
+##### SinkWriter.Context
+
+Context is the context of the SinkWriter; it is used to interact with SeaTunnel and contains several key methods:
+
+- `getIndexOfSubtask` gets the subtask index of the current Writer.
+- `getNumberOfParallelSubtasks` gets the parallelism of the current job.
+- `getMetricsContext` gets the MetricsContext of the current job, used to record metrics.
+- `getEventListener` gets the EventListener of the current job, used to send events to SeaTunnel.
#### SinkCommitter.java
-Handles the information returned by `SinkWriter.prepareCommit`, including the transaction information to be committed.
+Handles the information returned by `SinkWriter.prepareCommit`, including the transaction information to be committed. Unlike `SinkAggregatedCommitter`, `SinkCommitter` runs on every node; we recommend `SinkAggregatedCommitter` instead.
+
+- `commit` commits the transaction information returned by `SinkWriter.prepareCommit`; if it fails, it must be idempotent so that engine retries work correctly.
+- `abort` rolls back the `SinkWriter.prepareCommit` operation; if it fails, it must be idempotent so that engine retries work correctly.
#### SinkAggregatedCommitter.java
Handles the information returned by `SinkWriter.prepareCommit`, including the transaction information to be committed, but processes it together on a single node, which avoids the inconsistent state caused by partial failures in phase two.
+- `init` initializes the `SinkAggregatedCommitter`; connector resources, such as database connections and initial state, can be set up here.
+- `restoreCommit` restores the `SinkAggregatedCommitter` to its previous state when state is restored; it is called on job recovery, and in this method we should retry committing the transactions that did not complete last time.
+- `commit` commits the transaction information returned by `SinkWriter.prepareCommit`; if it fails, it must be idempotent so that engine retries work correctly.
- `combine` aggregates the transaction information returned by `SinkWriter.prepareCommit` into aggregated transaction information.
+- `abort` rolls back the `SinkWriter.prepareCommit` operation; if it fails, it must be idempotent so that engine retries work correctly.
+- `close` closes the `SinkAggregatedCommitter` and releases resources.
#### Should I implement SinkCommitter or SinkAggregatedCommitter?
In the current version, implementing SinkAggregatedCommitter is recommended as the first choice; it can provide a strong consistency guarantee in Flink/Spark. At the same time, commit should be idempotent, so that engine retries work correctly.
-### TableSourceFactory and TableSinkFactory
-
-To create Sources and Sinks automatically, connectors need to declare and return the list of parameters required to create them and the validation rules for each parameter. For this, we defined TableSourceFactory and TableSinkFactory;
-it is recommended to place them in the same directory as the SeaTunnelSource or SeaTunnelSink implementation class for easy discovery.
-
-- `factoryIdentifier` states the name of the Factory; this value should be the same as the value returned by `getPluginName`, so that later switching to the Factory for creating Sources/Sinks is seamless.
-- `createSink` and `createSource` are the methods for creating Sinks and Sources respectively.
-- `optionRule` returns the parameter rules, declaring which parameters the connector supports, which are required, which are optional, which are mutually exclusive, and which are bundled (bundledRequired).
-  This method is used when creating connectors visually, and also to generate a complete parameter object from the user's configuration, so connector developers do not need to check each parameter in the Config individually and can use them directly.
-  You can refer to existing implementations, such as `org.apache.seatunnel.connectors.seatunnel.elasticsearch.source.ElasticsearchSourceFactory`. Many Sources support schema configuration, so a common Option is used;
-  if you need a Schema you can reference `org.apache.seatunnel.api.table.catalog.CatalogTableUtil.SCHEMA`.
-
-Don't forget to add `@AutoService(Factory.class)` to the class. This Factory is the parent class of TableSourceFactory and TableSinkFactory.
-
### Option
When we implement TableSourceFactory and TableSinkFactory, the corresponding Options are created. Each Option corresponds to one configuration item, but different configurations have different types; plain types can be created by calling the corresponding method directly.
@@ -175,6 +228,11 @@ Depending on component properties, a Connector Sink can choose to implement only `SinkCommitter` or `
`OptionMark` has two parameters. `name` declares the parameter name corresponding to the field; if it is empty, we convert the Java lower camel case to snake_case by default, e.g. `myUserPassword` -> `my_user_password`.
In most cases leaving it empty is fine. `description` is the description of the parameter; it is optional, and it is recommended to keep it consistent with the documentation. For a concrete example, see `org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertSinkFactory`.
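The default name derivation mentioned above (lower camel case to snake_case) amounts to roughly the following; this is an illustrative sketch, not the actual seatunnel-api implementation.

```java
/** Sketch of the camelCase -> snake_case derivation used when `name` is left empty. */
class ToyOptionNames {
    static String toSnakeCase(String camel) {
        StringBuilder sb = new StringBuilder();
        for (char c : camel.toCharArray()) {
            if (Character.isUpperCase(c)) {
                // each upper-case letter starts a new underscore-separated word
                sb.append('_').append(Character.toLowerCase(c));
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }
}
```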
+The `optionRule` method in TableSourceFactory and TableSinkFactory returns the parameter rules, declaring which parameters the connector supports, which are required, which are optional, which are mutually exclusive, and which are bundled (bundledRequired).
+This method is used when creating connectors visually, and also to generate a complete parameter object from the user's configuration, so connector developers do not need to check each parameter in the Config individually and can use them directly.
+You can refer to existing implementations, such as `org.apache.seatunnel.connectors.seatunnel.elasticsearch.source.ElasticsearchSourceFactory`. Many Sources support schema configuration, so a common Option is used;
+if you need a Schema you can reference `org.apache.seatunnel.api.table.catalog.CatalogTableUtil.SCHEMA`.
+
## Implementation
At this stage, all connector implementations and reference examples are under seatunnel-connectors-v2; you can consult them as needed.
\ No newline at end of file