This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 37612d9ed2 [Improve][Doc] Update connector v2 contribute guide (#8375)
37612d9ed2 is described below

commit 37612d9ed241a0a0a45de6c3c73ee685b27615be
Author: Jia Fan <[email protected]>
AuthorDate: Fri Dec 27 23:27:34 2024 +0800

    [Improve][Doc] Update connector v2 contribute guide (#8375)
    
    Co-authored-by: hailin0 <[email protected]>
---
 seatunnel-connectors-v2/README.md    | 271 ++++++++++++++++++++---------------
 seatunnel-connectors-v2/README.zh.md | 122 +++++++++++-----
 2 files changed, 245 insertions(+), 148 deletions(-)
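
Illustration of the sink-side flow described in the README change below (`SinkWriter.prepareCommit`, then `SinkAggregatedCommitter.combine` and an idempotent `commit`). This is a minimal sketch and not part of the commit: `SketchWriter`, `SketchAggregatedCommitter`, and the `tx-<subtask>-<checkpoint>` id format are hypothetical stand-ins, not the real SeaTunnel interfaces or their signatures.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in types for illustration; these are NOT the real
// SeaTunnel SinkWriter / SinkAggregatedCommitter interfaces.
class SketchWriter {
    private final int subtask;
    private int checkpoint = 0;
    private final List<String> buffer = new ArrayList<>();

    SketchWriter(int subtask) {
        this.subtask = subtask;
    }

    // Buffer a row until the next prepareCommit call.
    void write(String row) {
        buffer.add(row);
    }

    // Phase one: stage the buffered rows and return commit info (a transaction id).
    String prepareCommit() {
        checkpoint++;
        buffer.clear();
        return "tx-" + subtask + "-" + checkpoint;
    }
}

class SketchAggregatedCommitter {
    private final Set<String> committed = new LinkedHashSet<>();

    // Merge the commit info from all writers into one aggregated unit.
    List<String> combine(List<String> commitInfos) {
        return new ArrayList<>(commitInfos);
    }

    // Phase two: idempotent, so an engine retry does not commit a transaction twice.
    // Returns how many transactions were newly committed by this call.
    int commit(List<String> aggregated) {
        int newlyCommitted = 0;
        for (String tx : aggregated) {
            if (committed.add(tx)) {
                newlyCommitted++;
            }
        }
        return newlyCommitted;
    }

    Set<String> committed() {
        return committed;
    }
}

class TwoPhaseCommitSketch {
    public static void main(String[] args) {
        SketchWriter w0 = new SketchWriter(0);
        SketchWriter w1 = new SketchWriter(1);
        w0.write("a");
        w1.write("b");

        List<String> infos = new ArrayList<>();
        infos.add(w0.prepareCommit());
        infos.add(w1.prepareCommit());

        SketchAggregatedCommitter committer = new SketchAggregatedCommitter();
        List<String> aggregated = committer.combine(infos);
        System.out.println(committer.commit(aggregated)); // first attempt
        System.out.println(committer.commit(aggregated)); // simulated engine retry
    }
}
```

The retry commits nothing new because already-committed transaction ids are skipped, which is the idempotency property the README asks `commit` to provide.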

diff --git a/seatunnel-connectors-v2/README.md b/seatunnel-connectors-v2/README.md
index e22e804cd9..c26924dcfa 100644
--- a/seatunnel-connectors-v2/README.md
+++ b/seatunnel-connectors-v2/README.md
@@ -5,7 +5,7 @@ in Apache SeaTunnel. This helps developers quickly understand API and transforma
 hand, it can guide contributors how to use the new API to develop new connectors.See
 this [issue](https://github.com/apache/seatunnel/issues/1608) for details.
 
-## **Code Structure**
+## Code Structure
 
 In order to separate from the old code, we have defined new modules for execution flow. This facilitates parallel
 development at the current stage, and reduces the difficulty of merging.
@@ -16,216 +16,248 @@ development at the current stage, and reduces the difficulty of merging.
 - ../`seatunnel-translation`                                          translation layer for the connector-v2
 - ../`seatunnel-transform-v2`                                         transform v2 connector implementation
 - ../seatunnel-e2e/`seatunnel-connector-v2-e2e`                       connector v2 e2e code
-- ../seatunnel-examples/`seatunnel-flink-connector-v2-example`        seatunnel connector-v2 example use flink local running instance
-- ../seatunnel-examples/`seatunnel-spark-connector-v2-example`        seatunnel connector-v2 example use spark local running instance
+- ../seatunnel-examples/`seatunnel-engine-examples`                   seatunnel connector-v2 example use Zeta local running instance
+- ../seatunnel-examples/`seatunnel-flink-connector-v2-example`        seatunnel connector-v2 example use Flink local running instance
+- ../seatunnel-examples/`seatunnel-spark-connector-v2-example`        seatunnel connector-v2 example use Spark local running instance
 
-### **Example**
+### Example
 
-We have prepared two new version of the locally executable example program in 
`seatunnel-examples`,one
-is 
`seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelApiExample.java`
-, it runs in the Flink engine. Another one
-is 
`seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiExample.java`
-, it runs in the Spark engine. This is also the debugging method that is often 
used in the local development of
-Connector. You can debug these examples, which will help you better understand 
the running logic of the program. The
-configuration files used in example are saved in the "resources/examples" 
folder. If you want to add examples for your
-own connectors, you need to follow the steps below.
+We have prepared three locally executable example programs in `seatunnel-examples`:
+- `seatunnel-examples/seatunnel-engine-examples/src/main/java/org/apache/seatunnel/example/engine/SeaTunnelEngineLocalExample.java`, which runs on the Zeta engine
+- `seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelApiExample.java`, which runs on the Flink engine
+- `seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiExample.java`, which runs on the Spark engine
 
-1. Add the groupId, artifactId and version of the connector to be tested to
-   `seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml`(or add it 
to
-   `seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml` when you 
want to runs it in Spark engine) as a
-   dependency.
-2. Find the dependency in your connector pom file which scope is test or 
provided and then add them to
-   seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml(or add it to
-   seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml) file and 
modify the scope to compile.
-3. Add the task configuration file under resources/examples.
-4. Configure the file in the `SeaTunnelApiExample` main method.
-5. Just run the main method.
+You can debug these examples to help you better understand the running logic of the program. The configuration files used are saved in the `resources/examples` folder.
+If you want to add your own connectors, you need to follow the steps below.
 
-### **Create new seatunnel v2 connector**
+To add a new connector to the example using the Zeta engine, follow these steps:
+1. Add the connector dependency's `groupId`, `artifactId`, and `version` to `seatunnel-examples/seatunnel-engine-examples/pom.xml` (or to `seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml` or `seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml` if you want to run it on the Flink or Spark engine, respectively).
+2. If there are dependencies in your connector with `scope` set to `test` or `provided`, add these dependencies to `seatunnel-examples/seatunnel-engine-examples/pom.xml` and change the `scope` to `compile`.
+3. Add the task configuration file under `resources/examples`.
+4. Configure the file path in the `SeaTunnelEngineLocalExample.java` main method.
+5. Run the main method.
 
-1.Create a new module under the `seatunnel-connectors-v2` directory and name 
it connector - {connector name}.
+### Create New SeaTunnel V2 Connector
 
-2.The pom file can refer to the pom file of the existing connector, and add 
the current sub model to the pom file of the parent model
+1. Create a new module under the `seatunnel-connectors-v2` directory and name it connector-{ConnectorName}.
+2. Model the module's `pom.xml` on the `pom.xml` of an existing connector, and add the new sub-module to `seatunnel-connectors-v2/pom.xml`.
+3. Create two packages corresponding to source and sink:
 
-3.Create two packages corresponding to source and sink
+    package org.apache.seatunnel.connectors.seatunnel.{ConnectorName}.source
+    package org.apache.seatunnel.connectors.seatunnel.{ConnectorName}.sink
 
-    package org.apache.seatunnel.connectors.seatunnel.{connector name}}.source
-    package org.apache.seatunnel.connectors.seatunnel.{connector name}}.sink
+4. Add the connector info to the `plugin-mapping.properties` file in the SeaTunnel root path.
 
-4.add connector info to plugin-mapping.properties file in seatunnel root path.
+5. Add the connector dependency to `seatunnel-dist/pom.xml`, so the connector jar can be found in the binary package.
 
-5.add connector dependency to seatunnel-dist/pom.xml, so the connector jar can 
be find in binary package.
+6. There are several classes that must be implemented on the source side, namely {ConnectorName}Source, {ConnectorName}SourceFactory, {ConnectorName}SourceReader; there are several classes that must be implemented on the sink side, namely {ConnectorName}Sink, {ConnectorName}SinkFactory, {ConnectorName}SinkWriter. Please refer to other connectors for details.
 
-6.There are several classes that must be implemented on the source side, 
namely {ConnectorName}Source, {ConnectorName}SourceFactory, 
{ConnectorName}SourceReader; There are several classes that must be implemented 
on the sink side, namely {ConnectorName}Sink, {ConnectorName}SinkFactory, 
{ConnectorName}SinkWriter Please refer to other connectors for details
+7. {ConnectorName}SourceFactory and {ConnectorName}SinkFactory need to be annotated with `@AutoService(Factory.class)` on the class. In addition to the required methods, the source side must also override the `createSource` method and the sink side must also override the `createSink` method.
 
-7.{ConnectorName}SourceFactory and {ConnectorName}SinkFactory needs to be 
annotated with the **@AutoService (Factory.class)** annotation on the class 
name, and in addition to the required methods, source side an additional 
**creatSource** method needs to be rewritten and sink side an additional 
**creatSink** method needs to be rewritten
+8. {ConnectorName}Source needs to override the `getProducedCatalogTables` method; {ConnectorName}Sink needs to override the `getWriteCatalogTable` method.
 
-8.{ConnectorName}Source needs to override the **getProducedCatalogTables** 
method; {ConnectorName}Sink needs to override the **getWriteCatalogTable** 
method
+### Startup Class
 
-### **Startup Class**
+We have created three starter projects: `seatunnel-core/seatunnel-starter`, `seatunnel-core/seatunnel-flink-starter`, and `seatunnel-core/seatunnel-spark-starter`.
+Here you can find how to parse configuration files into executable Zeta/Flink/Spark processes.
 
-Aside from the old startup class, we have created two new startup modules,
-namely ``seatunnel-core/seatunnel-flink-starter`` and 
``seatunnel-core/seatunnel-spark-starter``. You can find out how
-to parse the configuration file into an executable Flink/Spark process here.
+### SeaTunnel API
 
-### **SeaTunnel API**
+The `seatunnel-api` module is used to store the new interfaces defined by the SeaTunnel API. By implementing these interfaces, developers can create SeaTunnel Connectors that support multiple engines.
 
-A new ``seatunnel-api`` (not ``seatunnel-apis``) module has been created to 
store the new interfaces defined by the
-SeaTunnel API. By implementing these interfaces, developers can complete the 
SeaTunnel Connector that supports multiple
-engines.
-
-### **Translation Layer**
+### Translation Layer
 
 We realize the conversion between SeaTunnel API and Engine API by adapting the 
interfaces of different engines, so as to
 achieve the effect of translation, and let our SeaTunnel Connector support the 
operation of multiple different engines.
-The corresponding code address, ``seatunnel-translation``, this module has the 
corresponding translation layer
+The corresponding code address, `seatunnel-translation`, this module has the corresponding translation layer
 implementation. If you are interested, you can view the code and help us 
improve the current code.
 
-## **API introduction**
+## API introduction
 
 The API design of the current version of SeaTunnel draws on the design concept 
of Flink.
 
-### **Source**
+### Source
+
+#### TableSourceFactory.java
+
+- Used to create a factory class for Source, through which Source instances are created using the `createSource` method.
+- `factoryIdentifier` is used to identify the name of the current Factory, which is also configured in the configuration file to distinguish different connectors.
+- `optionRule` is used to define the parameters supported by the current connector. This method can be used to define the logic of the parameters, such as which parameters are required, which are optional, which are mutually exclusive, etc.
+  SeaTunnel will use `OptionRule` to verify the validity of the user's configuration. Please refer to the `Option` below.
+- Make sure to add the `@AutoService(Factory.class)` annotation to `TableSourceFactory`.
 
-#### **SeaTunnelSource.java**
+#### SeaTunnelSource.java
 
-- The Source of SeaTunnel adopts the design of stream-batch integration, 
``getBoundedness`` which determines whether the
+- The Source of SeaTunnel adopts the design of stream-batch integration, `getBoundedness` which determines whether the
   current Source is a stream Source or a batch Source, so you can specify a 
Source by dynamic configuration (refer to
   the default method), which can be either a stream or a batch.
-- ``getRowTypeInfo`` To get the schema of the data, the connector can choose 
to hard-code to implement a fixed schema,
-  or run the user to customize the schema through config configuration. The 
latter is recommended.
+- `getProducedCatalogTables` is used to get the schema of the data. The connector can choose to hard-code to implement a fixed schema or implement a custom schema through user-defined configuration.
+  The latter is recommended.
 - SeaTunnelSource is a class executed on the driver side, through which 
objects such as SourceReader, SplitEnumerator
   and serializers are obtained.
 - Currently, the data type supported by SeaTunnelSource must be SeaTunnelRow.
 
-#### **SourceSplitEnumerator.java**
+#### SourceSplitEnumerator.java
 
 Use this enumerator to get the data read shard (SourceSplit) situation, 
different shards may be assigned to different
 SourceReaders to read data. Contains several key methods:
 
-- ``run``: Used to perform a spawn SourceSplit and call 
``SourceSplitEnumerator.Context.assignSplit``: to distribute the
+- The `open` method is used to initialize the SourceSplitEnumerator. In this method, you can initialize resources such as database connections or states.
+- `run`: Used to perform a spawn SourceSplit and call `SourceSplitEnumerator.Context.assignSplit`: to distribute the
   shards to the SourceReader.
-- ``addSplitsBackSourceSplitEnumerator``: is required to redistribute these 
Splits when SourceSplit cannot be processed
+- `addSplitsBackSourceSplitEnumerator`: is required to redistribute these Splits when SourceSplit cannot be processed
   normally or restarted due to the exception of SourceReader.
-- ``registerReaderProcess``: some SourceReaders that are registered after the 
run is run. If there is no SourceSplit
+- `registerReaderProcess`: some SourceReaders that are registered after the run is run. If there is no SourceSplit
   distributed at this time, it can be distributed to these new readers (yes, 
you need to maintain your SourceSplit
   distribution in SourceSplitEnumerator most of the time).
-- ``handleSplitRequest``: If some Readers actively request SourceSplit from 
SourceSplitEnumerator, this method can be
+- `handleSplitRequest`: If some Readers actively request SourceSplit from SourceSplitEnumerator, this method can be
   called SourceSplitEnumerator.Context.assignSplit to sends shards to the 
corresponding Reader.
-- ``snapshotState``: It is used for stream processing to periodically return 
the current state that needs to be saved.
+- `snapshotState`: It is used for stream processing to periodically return the current state that needs to be saved.
   If there is a state restoration, it will be called 
SeaTunnelSource.restoreEnumerator to constructs a
   SourceSplitEnumerator and restore the saved state to the 
SourceSplitEnumerator.
-- ``notifyCheckpointComplete``: It is used for subsequent processing after the 
state is successfully saved, and can be
+- `notifyCheckpointComplete`: It is used for subsequent processing after the state is successfully saved, and can be
   used to store the state or mark in third-party storage.
+- `handleSourceEvent` is used to handle events from the `SourceReader`. You can customize events, such as changes in the state of the `SourceReader`.
+- `close` is used to close the `SourceSplitEnumerator` and release resources.
+
+#### SourceSplitEnumerator.Context
+
+The `SourceSplitEnumerator.Context` is the context for the `SourceSplitEnumerator`, which interacts with SeaTunnel. It includes several key methods:
 
-#### **SourceSplit.java**
+- `currentParallelism`: Used to get the current task's parallelism.
+- `registeredReaders`: Used to get the list of currently registered `SourceReader`.
+- `assignSplit`: Used to assign splits to `SourceReader`.
+- `signalNoMoreSplits`: Used to notify a `SourceReader` that there are no more splits.
+- `sendEventToSourceReader`: Used to send events to `SourceReader`.
+- `getMetricsContext`: Used to get the current task's `MetricsContext` for recording metrics.
+- `getEventListener`: Used to get the current task's `EventListener` for sending events to SeaTunnel.
+
+#### SourceSplit.java
 
 The interface used to save shards. Different shards need to define different 
splitIds. You can implement this interface
 to save the data that shards need to save, such as kafka's partition and 
topic, hbase's columnfamily and other
 information, which are used by SourceReader to determine Which part of the 
total data should be read.
 
-#### **SourceReader.java**
+#### SourceReader.java
 
 The interface that directly interacts with the data source, and the action of 
reading data from the data source is
 completed by implementing this interface.
 
-- ``pollNext``: It is the core of Reader. Through this interface, the process 
of reading the data of the data source and
+- `pollNext`: It is the core of Reader. Through this interface, the process of reading the data of the data source and
   returning it to SeaTunnel is realized. Whenever you are ready to pass data 
to SeaTunnel, you can call
-  the ``Collector.collect`` method in the parameter, which can be called an 
infinite number of times to complete a large
-  amount of data reading. But the data format supported at this stage can only 
be ``SeaTunnelRow``. Because our Source
+  the `Collector.collect` method in the parameter, which can be called an infinite number of times to complete a large
+  amount of data reading. But the data format supported at this stage can only be `SeaTunnelRow`. Because our Source
   is a stream-batch integration, the Connector has to decide when to end data 
reading in batch mode. For example, a
-  batch reads 100 pieces of data at a time. After the reading is completed, it 
needs ``pollNext`` to call in
-  to ``SourceReader.Context.signalNoMoreElementnotify`` SeaTunnel that there 
is no data to read . , then you can use
+  batch reads 100 pieces of data at a time. After the reading is completed, `pollNext` needs to call
+  `SourceReader.Context.signalNoMoreElement` to notify SeaTunnel that there is no more data to read; then you can use
   these 100 pieces of data for batch processing. Stream processing does not 
have this requirement, so most SourceReaders
   with integrated stream batches will have the following code:
 
-```java
+``java
 if(Boundedness.BOUNDED.equals(context.getBoundedness())){
     // signal to the source that we have reached the end of the data.
     context.signalNoMoreElement();
     break;
     }
-```
+``
 
 It means that SeaTunnel will be notified only in batch mode.
 
-- ``addSplits``:  Used by the framework to assign SourceSplit to different 
SourceReaders, SourceReader should save the
+- `addSplits`:  Used by the framework to assign SourceSplit to different SourceReaders, SourceReader should save the
   obtained shards, and then pollNextread the corresponding shard data in it, 
but there may be times when the Reader does
   not read shards (maybe SourceSplit has not been generated or The current 
Reader is indeed not allocated), at this
   time, pollNextcorresponding processing should be made, such as continuing to 
wait.
-- ``handleNoMoreSplits``: When triggered, it indicates that there are no more 
shards, and the Connector Source is
+- `handleNoMoreSplits`: When triggered, it indicates that there are no more shards, and the Connector Source is
   required to optionally make corresponding feedback
-- ``snapshotStateIt``: is used for stream processing to periodically return 
the current state that needs to be saved,
+- `snapshotState`: It is used for stream processing to periodically return the current state that needs to be saved,
   that is, the fragmentation information (SeaTunnel saves the fragmentation 
information and state together to achieve
   dynamic allocation).
-- ``notifyCheckpointComplete``: Like ``notifyCheckpointAborted`` the name, it 
is a callback for different states of
+- `notifyCheckpointComplete`: As the name suggests, like `notifyCheckpointAborted`, it is a callback for different states of
   checkpoint.
 
-### **Sink**
+#### SourceReader.Context
+
+The `SourceReader.Context` is the context for the `SourceReader`, which interacts with SeaTunnel. It includes several key methods:
+
+- `getIndexOfSubtask`: Used to get the current Reader's subTask index.
+- `getBoundedness`: Used to get the current Reader's Boundedness, whether it is stream or batch.
+- `signalNoMoreElement`: Used to notify SeaTunnel that there are no more elements to read.
+- `sendSplitRequest`: Used to request splits from the `SourceSplitEnumerator` when the Reader has no splits.
+- `sendSourceEventToEnumerator`: Used to send events to the `SourceSplitEnumerator`.
+- `getMetricsContext`: Used to get the current task's `MetricsContext` for recording metrics.
+- `getEventListener`: Used to get the current task's `EventListener` for sending events to SeaTunnel.
+
+### Sink
 
-#### **SeaTunnelSink.java**
+#### TableSinkFactory.java
 
-It is used to define the way to write data to the destination, and obtain 
instances such as ``SinkWriter``
-and ``SinkCommitter`` through this interface. An important feature of the sink 
side is the processing of distributed
-transactions. SeaTunnel defines two different Committers: ``SinkCommitter`` 
used to process transactions for different
-subTasks ``SinkAggregatedCommitter``. Process transaction results for all 
nodes. Different Connector Sinks can be
-selected according to component properties, whether to implement only 
``SinkCommitter`` or ``SinkAggregatedCommitter``,
+- Used to create a factory class for the Sink, through which Sink instances are created using the `createSink` method.
+- `factoryIdentifier` is used to identify the name of the current Factory, which is also configured in the configuration file to distinguish different connectors.
+- `optionRule` is used to define the parameters supported by the current connector. You can use this method to define the logic of the parameters, such as which parameters are required, which parameters are optional, which parameters are mutually exclusive, etc. SeaTunnel will use `OptionRule` to verify the validity of the user's configuration. Please refer to the Option below.
+- Make sure to add the `@AutoService(Factory.class)` annotation to the `TableSinkFactory` class.
+
+#### SeaTunnelSink.java
+
+It is used to define the way to write data to the destination, and obtain instances such as `SinkWriter`
+and `SinkCommitter` through this interface. An important feature of the sink side is the processing of distributed
+transactions. SeaTunnel defines two different Committers: `SinkCommitter`, used to process transactions for different
+subTasks, and `SinkAggregatedCommitter`, used to process transaction results for all nodes. Different Connector Sinks can
+choose, according to component properties, whether to implement only `SinkCommitter` or `SinkAggregatedCommitter`,
 or both.
 
-#### **SinkWriter.java**
+- `createWriter` is used to create a `SinkWriter` instance. The `SinkWriter` is an interface that interacts with the data source, allowing data to be written to the data source through this interface.
+- `restoreWriter` is used to restore the `SinkWriter` to its previous state during state recovery. This method is called when the task is restored.
+- `getWriteCatalogTable` is used to get the `SeaTunnel CatalogTable` corresponding to the table written by the `Sink`. SeaTunnel will handle metrics-related logic based on this `CatalogTable`.
+
+#### SinkWriter.java
 
 It is used to directly interact with the output source, and provide the data 
obtained by SeaTunnel through the data
 source to the Writer for data writing.
 
-- ``write``: Responsible for transferring data to ``SinkWriter``, you can 
choose to write it directly, or write it after
-  buffering a certain amount of data. Currently, only the data type is 
supported ``SeaTunnelRow``.
-- ``prepareCommit``: Executed before commit, you can write data directly here, 
or you can implement phase one in 2pc,
-  and then implement phase two in ``SinkCommitter`` or 
``SinkAggregatedCommitter``. What this method returns is the
-  commit information, which will be provided ``SinkCommitter`` and 
``SinkAggregatedCommitter`` used for the next stage
+- `write`: Responsible for transferring data to `SinkWriter`, you can choose to write it directly, or write it after
+  buffering a certain amount of data. Currently, the only supported data type is `SeaTunnelRow`.
+- `prepareCommit`: Executed before commit, you can write data directly here, or you can implement phase one in 2pc,
+  and then implement phase two in `SinkCommitter` or `SinkAggregatedCommitter`. What this method returns is the
+  commit information, which will be provided to `SinkCommitter` and `SinkAggregatedCommitter` for the next stage
   of transaction processing.
+- `snapshotState` is used to periodically return the current state to be saved during stream processing. If there is a state recovery, `SeaTunnelSink.restoreWriter` will be called to construct the `SinkWriter` and restore the saved state to the `SinkWriter`.
+- `abortPrepare` is executed when `prepareCommit` fails, used to roll back the operations of `prepareCommit`.
+- `close` is used to close the `SinkWriter` and release resources.
 
-#### **SinkCommitter.java**
+#### SinkWriter.Context
 
-It is used to process ``SinkWriter.prepareCommit`` the returned data 
information, including transaction information that
-needs to be submitted.
+The `Context` is the context for the `SinkWriter`, which interacts with SeaTunnel. It includes several key methods:
 
-#### **SinkAggregatedCommitter.java**
+- `getIndexOfSubtask`: Used to get the current Writer's subTask index.
+- `getNumberOfParallelSubtasks`: Used to get the current task's parallelism.
+- `getMetricsContext`: Used to get the current task's `MetricsContext` for recording metrics.
+- `getEventListener`: Used to get the current task's `EventListener` for sending events to SeaTunnel.
 
-It is used to process ``SinkWriter.prepareCommit`` the returned data 
information, including transaction information that
-needs to be submitted, etc., but it will be processed together on a single 
node, which can avoid the problem of
-inconsistency of the state caused by the failure of the second part of the 
stage.
+#### SinkCommitter.java
 
-- ``combine``: Used ``SinkWriter.prepareCommit`` to aggregate the returned 
transaction information, and then generate
-  aggregated transaction information.
+Used to process the data information returned by `SinkWriter.prepareCommit`, including the transaction information that needs to be submitted. Unlike `SinkAggregatedCommitter`, `SinkCommitter` is executed on each node. We recommend using `SinkAggregatedCommitter`.
 
-#### **Implement SinkCommitter or SinkAggregatedCommitter?**
+- `commit`: Used to submit the transaction information returned by `SinkWriter.prepareCommit`. If it fails, idempotency must be implemented to ensure that the engine retry can work normally.
+- `abort`: Used to roll back the operations of `SinkWriter.prepareCommit`. If it fails, idempotency must be implemented to ensure that the engine retry can work normally.
 
-In the current version, it is recommended to implement 
``SinkAggregatedCommitter`` as the first choice, which can
-provide strong consistency guarantee in Flink/Spark. At the same time, commit 
should be idempotent, and save engine
-retry can work normally.
+#### SinkAggregatedCommitter.java
 
-### TableSourceFactory and TableSinkFactory
+Used to process the data information returned by `SinkWriter.prepareCommit`, including the transaction information that needs to be submitted. However, it will be processed together on a single node, which can avoid the problem of inconsistency caused by the failure of the second part of the stage.
 
-In order to automatically create the Source Connector and Sink Connector and 
Transform Connector, we need the connector to return the parameters needed to 
create them and the verification rules for each parameter. For Source Connector 
and Sink Connector, we define `TableSourceFactory` and `TableSinkFactory`
-supported by the current connector and the required parameters. We define 
TableSourceFactory and TableSinkFactory,
-It is recommended to put it in the same directory as the implementation class 
of SeaTunnelSource or SeaTunnelSink for easy searching.
+- `init`: Used to initialize the `SinkAggregatedCommitter`. You can initialize some resources for the connector here, such as connecting to a database or initializing some states.
+- `restoreCommit`: Used to restore the `SinkAggregatedCommitter` to its previous state during state recovery. This method is called when the task is restored, and we should retry committing the unfinished transactions in this method.
+- `commit`: Used to submit the transaction information returned by `SinkWriter.prepareCommit`. If it fails, idempotency must be implemented to ensure that the engine retry can work normally.
+- `combine`: Used to aggregate the transaction information returned by `SinkWriter.prepareCommit` and then generate aggregated transaction information.
+- `abort`: Used to roll back the operations of `SinkWriter.prepareCommit`. If it fails, idempotency must be implemented to ensure that the engine retry can work normally.
+- `close`: Used to close the `SinkAggregatedCommitter` and release resources.
 
-- `factoryIdentifier` is used to indicate the name of the current Factory. 
This value should be the same as the
-  value returned by `getPluginName`, so that if Factory is used to create 
Source/Sink in the future,
-  A seamless switch can be achieved.
-- `createSink` and `createSource` are the methods for creating Source and Sink 
respectively.
-- `optionRule` returns the parameter logic, which is used to indicate which 
parameters of our connector are supported,
-  which parameters are required, which parameters are optional, and which 
parameters are exclusive, which parameters are bundledRequired.
-  This method will be used when we visually create the connector logic, and it 
will also be used to generate a complete parameter
-  object according to the parameters configured by the user, and then the 
connector developer does not need to judge whether the parameters
-  exist one by one in the Config, and use it directly That's it.
-  You can refer to existing implementations, such as 
`org.apache.seatunnel.connectors.seatunnel.elasticsearch.source.ElasticsearchSourceFactory`.
-  There is support for configuring Schema for many Sources, so a common Option 
is used.
-  If you need a schema, you can refer to 
`org.apache.seatunnel.api.table.catalog.CatalogTableUtil.SCHEMA`.
+#### Implement SinkCommitter or SinkAggregatedCommitter?
 
-Don't forget to add `@AutoService(Factory.class)` to the class. This Factory 
is the parent class of TableSourceFactory and TableSinkFactory.
+In the current version, it is recommended to implement `SinkAggregatedCommitter` as the first choice, which can
+provide strong consistency guarantee in Flink/Spark. At the same time, commit should be idempotent, so that engine
+retry can work normally.
 
-### **Options**
+### Options
 
 When we implement TableSourceFactory and TableSinkFactory, the corresponding 
Option will be created.
 Each Option corresponds to a configuration, but different configurations will 
have different types.
@@ -238,7 +270,14 @@ In most cases, the default is empty. `description` is used 
to indicate the descr
 This parameter is optional. It is recommended to be consistent with the 
documentation. For specific examples,
 please refer to 
`org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertSinkFactory`.
 
-## **Result**
+In `TableSourceFactory` and `TableSinkFactory`, the `optionRule` method returns the parameter logic,
+which defines which parameters are supported by our connector, which parameters are required, which parameters are optional,
+which parameters are mutually exclusive, and which parameters are bundled required. This method will be used when we visually create the connector logic,
+and it will also be used to generate a complete parameter object based on the user's configured parameters, so that connector developers do not need to check each parameter in the config individually and can use it directly.
+You can refer to existing implementations, such as `org.apache.seatunnel.connectors.seatunnel.elasticsearch.source.ElasticsearchSourceFactory`. For many sources that support schema configuration, a common option is used, and if a schema is needed,
+you can refer to `org.apache.seatunnel.api.table.catalog.CatalogTableUtil.SCHEMA`.
+
+## Implement
 
-All Connector implementations should be under the ``seatunnel-connectors-v2``, and the examples that can be referred to
+All Connector implementations should be under the `seatunnel-connectors-v2`, and the examples that can be referred to
 at this stage are under this module.
\ No newline at end of file
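
Editorial sketch (not part of the commit): the `optionRule` paragraph added to README.md above describes required, optional, mutually exclusive and bundled parameters. The following self-contained Java sketch illustrates that idea only. `OptionRuleSketch`, its builder-style methods, the option names and the exact semantics chosen here (at most one option of an exclusive group, all or none of a bundled group) are assumptions made for illustration; they are not SeaTunnel's `OptionRule` API, for which `ElasticsearchSourceFactory` remains the reference.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for the idea behind optionRule; NOT SeaTunnel's OptionRule class. */
public class OptionRuleSketch {
    private final List<String> required = new ArrayList<>();
    private final Set<String> known = new HashSet<>();
    private final List<List<String>> exclusive = new ArrayList<>();
    private final List<List<String>> bundled = new ArrayList<>();

    public OptionRuleSketch required(String... keys) {
        required.addAll(Arrays.asList(keys));
        known.addAll(Arrays.asList(keys));
        return this;
    }

    public OptionRuleSketch optional(String... keys) {
        known.addAll(Arrays.asList(keys));
        return this;
    }

    /** Assumption of this sketch: at most one option of the group may be set. */
    public OptionRuleSketch exclusive(String... keys) {
        exclusive.add(Arrays.asList(keys));
        known.addAll(Arrays.asList(keys));
        return this;
    }

    /** Assumption of this sketch: the options of the group are set together or not at all. */
    public OptionRuleSketch bundled(String... keys) {
        bundled.add(Arrays.asList(keys));
        known.addAll(Arrays.asList(keys));
        return this;
    }

    /** Returns human-readable violations; an empty list means the config passes. */
    public List<String> validate(Map<String, String> config) {
        List<String> errors = new ArrayList<>();
        for (String key : required) {
            if (!config.containsKey(key)) {
                errors.add("missing required option: " + key);
            }
        }
        for (List<String> group : exclusive) {
            if (countPresent(group, config) > 1) {
                errors.add("only one of " + group + " may be set");
            }
        }
        for (List<String> group : bundled) {
            long present = countPresent(group, config);
            if (present != 0 && present != group.size()) {
                errors.add("options " + group + " must be set together");
            }
        }
        for (String key : config.keySet()) {
            if (!known.contains(key)) {
                errors.add("unsupported option: " + key);
            }
        }
        return errors;
    }

    private static long countPresent(List<String> group, Map<String, String> config) {
        return group.stream().filter(config::containsKey).count();
    }

    /** Small helper that builds a config map from alternating key/value arguments. */
    public static Map<String, String> config(String... keyValues) {
        Map<String, String> map = new HashMap<>();
        for (int i = 0; i + 1 < keyValues.length; i += 2) {
            map.put(keyValues[i], keyValues[i + 1]);
        }
        return map;
    }

    public static void main(String[] args) {
        OptionRuleSketch rule = new OptionRuleSketch()
                .required("hosts")
                .optional("index")
                .exclusive("username", "api_key")
                .bundled("tls_cert", "tls_key");
        System.out.println(rule.validate(config("hosts", "localhost:9200")));
        System.out.println(rule.validate(config("username", "u", "api_key", "k")));
    }
}
```

Declaring the rules once and validating centrally is what lets a connector read options without checking each key by hand, which is the benefit the README text attributes to `optionRule`.
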
diff --git a/seatunnel-connectors-v2/README.zh.md b/seatunnel-connectors-v2/README.zh.md
index b1ed13020f..741ad29b93 100644
--- a/seatunnel-connectors-v2/README.zh.md
+++ b/seatunnel-connectors-v2/README.zh.md
@@ -13,29 +13,31 @@ To decouple from compute engines, SeaTunnel designed a new connector API, through
 - ../`seatunnel-translation`                                           translation layer for connector-v2
 - ../`seatunnel-transform-v2`                                          transform-v2 implementation
 - ../seatunnel-e2e/`seatunnel-connector-v2-e2e`                        connector-v2 end-to-end tests
+- ../seatunnel-examples/`seatunnel-engine-examples`                    seatunnel connector-v2 example running locally on the Zeta engine
 - ../seatunnel-examples/`seatunnel-flink-connector-v2-example`         seatunnel connector-v2 example running locally on Flink
 - ../seatunnel-examples/`seatunnel-spark-connector-v2-example`         seatunnel connector-v2 example running locally on Spark
 
 ### Example
 
-In `seatunnel-examples`
-we have prepared two locally executable example programs. One is `seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelApiExample.java`
-, which runs on the Flink engine. The other is `seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiExample.java`
-, which runs on the Spark engine. You can debug these examples to better understand how the program runs. The configuration files used are stored in the `resources/examples` folder. If you want to add your own connectors, follow the steps below.
+We have prepared three locally executable example programs in `seatunnel-examples`:
+- `seatunnel-examples/seatunnel-engine-examples/src/main/java/org/apache/seatunnel/example/engine/SeaTunnelEngineLocalExample.java`, which runs on the Zeta engine
+- `seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelApiExample.java`, which runs on the Flink engine
+- `seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/java/org/apache/seatunnel/example/spark/v2/SeaTunnelApiExample.java`, which runs on the Spark engine
+You can debug these examples to better understand how the program runs. The configuration files used are stored in the `resources/examples` folder. If you want to add your own connectors, follow the steps below.
 
-1. Add the connector dependency's groupId, artifactId and
-   version to `seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml` (or, to run on the Spark engine, add the dependency to `seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml`).
-2. If your connector has dependencies with scope test or provided, add them to seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml (
-   or to seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml) and change the scope to compile.
-3. Add the job configuration file under resources/examples.
-4. Configure the file in the `SeaTunnelApiExample` main method.
-5. Run the main method.
+Taking the Zeta engine as an example, you can add a new connector to the example with the following steps:
+1. Add the connector dependency's groupId, artifactId and
+   version to `seatunnel-examples/seatunnel-engine-examples/pom.xml` (or, to run on the Flink or Spark engine, add the dependency to `seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml` or `seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml`; the same applies to the steps below).
+2. If your connector has dependencies with scope test or provided, add them to `seatunnel-examples/seatunnel-engine-examples/pom.xml` and change the scope to compile.
+3. Add the job configuration file under resources/examples.
+4. Set the configuration file path in the main method of `SeaTunnelEngineLocalExample.java`.
+5. Run the main method.
 
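-### Create a new seatunnel v2 connector
+### Create a new SeaTunnel V2 Connector
 
 1. Create a new module under the `seatunnel-connectors-v2` directory, named connector-{connector name}.
 
-2. The pom file can follow the pom files of existing connectors; add the current sub-module to the parent module's pom file.
+2. The pom file can follow the pom files of existing connectors; add the current sub-module to `seatunnel-connectors-v2/pom.xml`.
 
 3. Create two packages, one for source and one for sink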
 
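@@ -54,12 +56,12 @@ To decouple from compute engines, SeaTunnel designed a new connector API, through
 
 ### Startup Class
 
-Separately from the old startup classes, we created two new startup projects: `seatunnel-core/seatunnel-flink-starter` and `seatunnel-core/seatunnel-spark-starter`.
-Here you can find how a configuration file is parsed into an executable Flink/Spark job.
+We created three startup projects: `seatunnel-core/seatunnel-starter`, `seatunnel-core/seatunnel-flink-starter` and `seatunnel-core/seatunnel-spark-starter`.
+Here you can find how a configuration file is parsed into an executable Zeta/Flink/Spark job.
 
 ### SeaTunnel API
 
-A new `seatunnel-api` (not `seatunnel-apis`) module was created to hold the new interfaces defined by the SeaTunnel API. By implementing these interfaces, developers can build a SeaTunnel Connector that supports multiple engines
+The `seatunnel-api` module holds the new interfaces defined by the SeaTunnel API. By implementing these interfaces, developers can build a SeaTunnel Connector that supports multiple engines
 
 ### Translation Layer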
 
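@@ -72,11 +74,18 @@ To decouple from compute engines, SeaTunnel designed a new connector API, through
 
 ### Source
 
+#### TableSourceFactory.java
+
+- A factory class for creating the Source; Source instances are created through its `createSource` method.
+- `factoryIdentifier` identifies the name of the current Factory. It is also the name used in the configuration file to distinguish different connectors.
+- `optionRule` defines the parameters supported by the current connector. Use this method to define the parameter logic, such as which parameters are required, which are optional, which are mutually exclusive, and so on. SeaTunnel uses the OptionRule to check whether the user's configuration is valid. See Option below.
+- Make sure to add the `@AutoService(Factory.class)` annotation to your `TableSourceFactory`.
+
 #### SeaTunnelSource.java
 
 - SeaTunnel's Source uses a unified stream and batch design; `getBoundedness`
   determines whether the current Source is a streaming Source or a batch Source, so a Source can be made either streaming or batch through dynamic configuration (see the default method).
-- `getRowTypeInfo` returns the schema of the data. A connector can hard-code a fixed schema, or allow users to define a custom schema through config; the latter is recommended.
+- `getProducedCatalogTables` returns the schema of the data. A connector can hard-code a fixed schema, or support a custom schema defined through user config; the latter is recommended.
 - SeaTunnelSource runs on the driver side; through it you obtain objects such as the SourceReader and SplitEnumerator, as well as the serializers.
 - Currently, the data type produced by SeaTunnelSource must be SeaTunnelRow.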
 
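@@ -84,6 +93,7 @@ To decouple from compute engines, SeaTunnel designed a new connector API, through
 
 This enumerator is used to obtain the data-read splits (SourceSplit). Different splits may be assigned to different SourceReaders. It contains several key methods:
 
+- `open` initializes the SourceSplitEnumerator; connector resources can be initialized here, such as connecting to a database or initializing state.
 - `run` generates SourceSplits and calls `SourceSplitEnumerator.Context.assignSplit` to distribute the splits to SourceReaders.
 - `addSplitsBack` handles the case where a SourceReader failure prevents SourceSplits from being processed normally, or a restart occurs; the SourceSplitEnumerator then needs to redistribute these splits.
 - `registerReader`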
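@@ -93,6 +103,20 @@ To decouple from compute engines, SeaTunnel designed a new connector API, through
 - `snapshotState` is used in stream processing to periodically return the current state to be saved. When state is restored, `SeaTunnelSource.restoreEnumerator` is called
   to construct the SourceSplitEnumerator and restore the saved state to it.
 - `notifyCheckpointComplete` handles follow-up work after the state has been saved successfully; it can be used to store state or markers in third-party storage.
+- `handleSourceEvent` handles events from the SourceReader; custom events can be defined, such as SourceReader state changes.
+- `close` closes the SourceSplitEnumerator and releases resources.
+
+##### SourceSplitEnumerator.Context
+
+Context is the context of the SourceSplitEnumerator, used to interact with SeaTunnel. It contains several key methods:
+
+- `currentParallelism` returns the parallelism of the current job.
+- `registeredReaders` returns the list of currently registered SourceReaders.
+- `assignSplit` distributes splits to SourceReaders.
+- `signalNoMoreSplits` notifies a Reader that there are no more splits.
+- `sendEventToSourceReader` sends an event to a SourceReader.
+- `getMetricsContext` returns the MetricsContext of the current job, used to record metrics.
+- `getEventListener` returns the EventListener of the current job, used to send events to SeaTunnel.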
 
 #### SourceSplit.java
 
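@@ -124,14 +148,37 @@ if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
 - `snapshotState` is used in stream processing to periodically return the current state to be saved, that is, the split information (SeaTunnel stores split information together with state to enable dynamic assignment).
 - `notifyCheckpointComplete` and `notifyCheckpointAborted`, as their names suggest, are callbacks for the different checkpoint outcomes.
 
+##### SourceReader.Context
+
+Context is the context of the SourceReader, used to interact with SeaTunnel. It contains several key methods:
+
+- `getIndexOfSubtask` returns the subTask index of the current Reader.
+- `getBoundedness` returns the Boundedness of the current Reader: stream or batch.
+- `signalNoMoreElement` notifies SeaTunnel that there is no more data to read.
+- `sendSplitRequest` requests splits from the SourceSplitEnumerator; a Reader uses it to proactively request splits when it has none.
+- `sendSourceEventToEnumerator` sends an event to the SourceSplitEnumerator.
+- `getMetricsContext` returns the MetricsContext of the current job, used to record metrics.
+- `getEventListener` returns the EventListener of the current job, used to send events to SeaTunnel.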
+
 ### Sink
 
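+#### TableSinkFactory.java
+
+- A factory class for creating the Sink; Sink instances are created through its `createSink` method.
+- `factoryIdentifier` identifies the name of the current Factory. It is also the name used in the configuration file to distinguish different connectors.
+- `optionRule` defines the parameters supported by the current connector. Use this method to define the parameter logic, such as which parameters are required, which are optional, which are mutually exclusive, and so on. SeaTunnel uses the OptionRule to check whether the user's configuration is valid. See Option below.
+- Make sure to add the `@AutoService(Factory.class)` annotation to your `TableSinkFactory`.
+
 #### SeaTunnelSink.java
 
 Defines how data is written to the target; this interface provides instances such as the SinkWriter and SinkCommitter. An important feature on the Sink side is the handling of distributed transactions. SeaTunnel defines two different Committers: `SinkCommitter`
 handles transactions per subTask, with each subTask processing its own transaction; after they succeed, `SinkAggregatedCommitter` processes the transaction results of all nodes in a single thread. Different Connector
 Sinks can choose, according to the properties of the component, whether to implement only `SinkCommitter` or `SinkAggregatedCommitter`, or both.
 
+- `createWriter` creates a SinkWriter instance. The SinkWriter is the interface that interacts with the data source; data is written to the data source through it.
+- `restoreWriter` restores the SinkWriter to its previous state during state recovery; it is called when the job is restored.
+- `getWriteCatalogTable` returns the SeaTunnel CatalogTable corresponding to the table the Sink writes to; SeaTunnel uses this CatalogTable for metrics-related logic.
+
 #### SinkWriter.java
 
 Interacts directly with the output target, handing the data that SeaTunnel obtained from the data source to the Writer for writing.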
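@@ -139,35 +186,41 @@ Sinks can choose, according to the properties of the component, whether to implement only `SinkCommitter` or `
 - `write` passes data to the SinkWriter, which may write it directly or buffer a certain amount before writing. Currently only the `SeaTunnelRow` data type is supported.
 - `prepareCommit` runs before commit. You can write data directly here, or implement phase one of 2PC and then implement phase two in `SinkCommitter` or `SinkAggregatedCommitter`.
   This method returns the commit information, which is passed to `SinkCommitter` and `SinkAggregatedCommitter` for the next phase of transaction processing.
+- `snapshotState` is used in stream processing to periodically return the current state to be saved. When state is restored, `SeaTunnelSink.restoreWriter` is called to construct the SinkWriter and restore the saved state to it.
+- `abortPrepare` runs when prepareCommit fails and rolls back the work done by prepareCommit.
+- `close` closes the SinkWriter and releases resources.
+
+##### SinkWriter.Context
+
+Context is the context of the SinkWriter, used to interact with SeaTunnel. It contains several key methods:
+
+- `getIndexOfSubtask` returns the subTask index of the current Writer.
+- `getNumberOfParallelSubtasks` returns the parallelism of the current job.
+- `getMetricsContext` returns the MetricsContext of the current job, used to record metrics.
+- `getEventListener` returns the EventListener of the current job, used to send events to SeaTunnel.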
 
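 #### SinkCommitter.java
 
-Handles the data returned by `SinkWriter.prepareCommit`, including the transaction information to be committed.
+Handles the data returned by `SinkWriter.prepareCommit`, including the transaction information to be committed. Unlike `SinkAggregatedCommitter`, `SinkCommitter` runs on every node; we recommend using `SinkAggregatedCommitter`.
+
+- `commit` commits the transaction information returned by `SinkWriter.prepareCommit`. It must be idempotent so that engine retries work correctly after a failure.
+- `abort` rolls back the work done by `SinkWriter.prepareCommit`. It must be idempotent so that engine retries work correctly after a failure.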
 
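 #### SinkAggregatedCommitter.java
 
 Handles the data returned by `SinkWriter.prepareCommit`, including the transaction information to be committed, but processes it all together on a single node, which avoids inconsistent state caused by a partial failure in phase two.
 
+- `init` initializes the `SinkAggregatedCommitter`; connector resources can be initialized here, such as connecting to a database or initializing state.
+- `restoreCommit` restores the `SinkAggregatedCommitter` to its previous state during state recovery. It is called when the job is restored, and this method should retry committing the transactions left unfinished last time.
+- `commit` commits the transaction information returned by `SinkWriter.prepareCommit`. It must be idempotent so that engine retries work correctly after a failure.
 - `combine` aggregates the transaction information returned by `SinkWriter.prepareCommit` and produces the aggregated transaction information.
+- `abort` rolls back the work done by `SinkWriter.prepareCommit`. It must be idempotent so that engine retries work correctly after a failure.
+- `close` closes the `SinkAggregatedCommitter` and releases resources.
 
 #### Should I implement SinkCommitter or SinkAggregatedCommitter?
 
 In the current version, implementing SinkAggregatedCommitter is recommended as the first choice; it provides a stronger consistency guarantee in Flink/Spark. commit should also be idempotent so that engine retries work correctly.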
 
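-### TableSourceFactory and TableSinkFactory
-
-To create a Source or Sink automatically, the connector must be able to declare and return the list of parameters needed to create it and the validation rule for each parameter. To achieve this, we defined TableSourceFactory and TableSinkFactory.
-We recommend placing them in the same directory as the SeaTunnelSource or SeaTunnelSink implementation class so that they are easy to find.
-
-- `factoryIdentifier` indicates the name of the current Factory. This value should match the value returned by `getPluginName`, so that a later switch to creating the Source/Sink through the Factory is seamless.
-- `createSink` and `createSource` are the methods for creating Source and Sink respectively.
-- `optionRule` returns the parameter logic, indicating which parameters our connector supports, which are required, which are optional, which are mutually exclusive (exclusive), and which are bundled (bundledRequired).
-  This method is used when we visually create connector logic, and also to generate a complete parameter object from the parameters configured by the user, so connector developers do not need to check one by one whether each parameter exists in the Config and can use it directly.
-  You can refer to existing implementations, such as `org.apache.seatunnel.connectors.seatunnel.elasticsearch.source.ElasticsearchSourceFactory`. Many Sources support configuring a Schema, so a common Option is used;
-  if you need a Schema, you can reference `org.apache.seatunnel.api.table.catalog.CatalogTableUtil.SCHEMA`.
-
-Don't forget to add `@AutoService(Factory.class)` to the class. This Factory is the parent class of TableSourceFactory and TableSinkFactory.
-
 ### Option
 
 When we implement TableSourceFactory and TableSinkFactory, the corresponding Options are created. Each Option corresponds to one configuration, but different configurations have different types. Ordinary types can be created by calling the corresponding method directly.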
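@@ -175,6 +228,11 @@ Sinks can choose, according to the properties of the component, whether to implement only `SinkCommitter` or `
 `OptionMark` has two parameters. `name` declares the parameter name for the field; if it is empty, the Java lowerCamelCase name is converted to underscore form by default, for example `myUserPassword`->`my_user_password`.
 In most cases the default (empty) is fine. `description` describes the current parameter; it is optional and should be kept consistent with the documentation. For a concrete example, see `org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertSinkFactory`.
 
+In TableSourceFactory and TableSinkFactory, `optionRule` returns the parameter logic, indicating which parameters our connector supports, which are required, which are optional, which are mutually exclusive (exclusive), and which are bundled (bundledRequired).
+This method is used when we visually create connector logic, and also to generate a complete parameter object from the parameters configured by the user, so connector developers do not need to check one by one whether each parameter exists in the Config and can use it directly.
+You can refer to existing implementations, such as `org.apache.seatunnel.connectors.seatunnel.elasticsearch.source.ElasticsearchSourceFactory`. Many Sources support configuring a Schema, so a common Option is used;
+if you need a Schema, you can reference `org.apache.seatunnel.api.table.catalog.CatalogTableUtil.SCHEMA`.
+
 ## Implement
 
 At this stage, all connector implementations and reference examples are under seatunnel-connectors-v2; users can consult them as needed.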
\ No newline at end of file
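
Editorial sketch (not part of the commit): the committer sections in the diff above ask for `combine` to aggregate the per-writer commit information and for `commit` to be idempotent so that an engine retry is harmless. The Java sketch below shows one way those two requirements can fit together. `IdempotentCommitSketch`, `FakeTarget`, the string transaction ids and the "return the failed ids" convention are all assumptions made for illustration; this is not SeaTunnel's `SinkAggregatedCommitter` interface.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class IdempotentCommitSketch {

    /** Hypothetical external system that remembers which transaction ids it has applied. */
    public static class FakeTarget {
        private final Set<String> applied = new HashSet<>();
        private int effectiveWrites = 0;

        public void apply(String txId) {
            // Set.add returns false for an id seen before, so a replayed commit changes nothing.
            if (applied.add(txId)) {
                effectiveWrites++;
            }
        }

        public int effectiveWrites() {
            return effectiveWrites;
        }
    }

    /** combine: flatten the commit info produced by each writer into one aggregated list. */
    public static List<String> combine(List<List<String>> perWriterCommitInfo) {
        List<String> aggregated = new ArrayList<>();
        for (List<String> info : perWriterCommitInfo) {
            aggregated.addAll(info);
        }
        return aggregated;
    }

    /** commit: apply every transaction and return the ids that failed (assumed retry convention). */
    public static List<String> commit(FakeTarget target, List<String> aggregated) {
        List<String> failed = new ArrayList<>();
        for (String txId : aggregated) {
            try {
                target.apply(txId);
            } catch (RuntimeException e) {
                failed.add(txId);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        FakeTarget target = new FakeTarget();
        List<String> aggregated = combine(Arrays.asList(
                Arrays.asList("tx-0-1"),
                Arrays.asList("tx-1-1", "tx-1-2")));
        commit(target, aggregated);
        commit(target, aggregated); // simulated engine retry of the same commit
        System.out.println(target.effectiveWrites()); // prints 3
    }
}
```

The retry leaves the count at three because the target deduplicates by transaction id; a real connector would need an equivalent property in the external system, such as a transactional id or an upsert key.
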
