This is an automated email from the ASF dual-hosted git repository.

liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e8793bb9fc [Feature][Doc] Add RocketMq connector (#5361)
e8793bb9fc is described below

commit e8793bb9fc08d9728ca28e56d47092fc980c2d30
Author: ZhilinLi <[email protected]>
AuthorDate: Tue Oct 10 15:14:58 2023 +0800

    [Feature][Doc] Add RocketMq connector (#5361)
    
    * [Feature][doc][Connector-V2][RocketMq] Add RocketMq connector
    
    * [Feature][doc][Connector-V2][RocketMq] Add RocketMq connector
    
    * [Feature][doc][Connector-V2][RocketMq] Add RocketMq connector
    
    * fix
    
    * add case
---
 docs/en/connector-v2/sink/RocketMQ.md   | 197 ++++++++++++++++++-----
 docs/en/connector-v2/source/RocketMQ.md | 271 ++++++++++++++++++++------------
 2 files changed, 333 insertions(+), 135 deletions(-)

diff --git a/docs/en/connector-v2/sink/RocketMQ.md b/docs/en/connector-v2/sink/RocketMQ.md
index 7031920214..3726f76c63 100644
--- a/docs/en/connector-v2/sink/RocketMQ.md
+++ b/docs/en/connector-v2/sink/RocketMQ.md
@@ -1,44 +1,42 @@
 # RocketMQ
 
 > RocketMQ sink connector
->
-  ## Description
 
-Write Rows to a Apache RocketMQ topic.
+## Support Apache RocketMQ Version
 
-## Key features
+- 4.9.0 (or a newer version, for reference)
 
-- [x] [exactly-once](../../concept/connector-v2-features.md)
+## Support These Engines
 
-By default, we will use 2pc to guarantee the message is sent to RocketMQ exactly once.
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
 
-## Options
-
-|         name         |  type   | required |      default value       |
-|----------------------|---------|----------|--------------------------|
-| topic                | string  | yes      | -                        |
-| name.srv.addr        | string  | yes      | -                        |
-| acl.enabled          | Boolean | no       | false                    |
-| access.key           | String  | no       |                          |
-| secret.key           | String  | no       |                          |
-| producer.group       | String  | no       | SeaTunnel-producer-Group |
-| semantic             | string  | no       | NON                      |
-| partition.key.fields | array   | no       | -                        |
-| format               | String  | no       | json                     |
-| field.delimiter      | String  | no       | ,                        |
-| common-options       | config  | no       | -                        |
-
-### topic [string]
+## Key features
 
-`RocketMQ topic` name.
+- [x] [exactly-once](../../concept/connector-v2-features.md)
 
-### name.srv.addr [string]
+By default, two-phase commit (2PC) is used to guarantee that each message is sent to RocketMQ exactly once.
 
-`RocketMQ` name server cluster address.
+## Description
 
-### semantic [string]
+Write rows to an Apache RocketMQ topic.
 
-Semantics that can be chosen EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON.
+## Sink Options
+
+|         Name         |  Type   | Required |         Default          | Description |
+|----------------------|---------|----------|--------------------------|-------------|
+| topic                | string  | yes      | -                        | `RocketMQ topic` name. |
+| name.srv.addr        | string  | yes      | -                        | `RocketMQ` name server cluster address. |
+| acl.enabled          | Boolean | no       | false                    | If true, access control is enabled, and access key and secret key need to be configured. |
+| access.key           | String  | no       |                          | When ACL_ENABLED is true, access key cannot be empty. |
+| secret.key           | String  | no       |                          | When ACL_ENABLED is true, secret key cannot be empty. |
+| producer.group       | String  | no       | SeaTunnel-producer-Group | `RocketMQ` producer group name. |
+| partition.key.fields | array   | no       | -                        | The fields used as the message key; the hash of the key determines which partition a message is sent to. |
+| format               | String  | no       | json                     | Data format. The default format is json. Optional text format. The default field separator is ",". If you customize the delimiter, add the "field_delimiter" option. |
+| field.delimiter      | String  | no       | ,                        | Customize the field delimiter for data format. |
+| producer.send.sync   | Boolean | no       | false                    | If true, the message will be sent synchronously. |
+| common-options       | config  | no       | -                        | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details. |
 
 ### partition.key.fields [array]
 
@@ -55,27 +53,150 @@ Upstream data is the following:
 
 If name is set as the key, then the hash value of the name column will determine which partition the message is sent to.
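+
+For example, a sink keyed on the `name` column could be configured as follows (a sketch; the topic and server address are placeholders):
+
+```hocon
+sink {
+  Rocketmq {
+    name.srv.addr = "localhost:9876"
+    topic = "test_topic"
+    partition.key.fields = ["name"]
+  }
+}
+```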
 
-### format
+## Task Example
+
+### Fake to Rocketmq Simple
+
+> The data is randomly generated and asynchronously sent to the test topic
+
+```hocon
+env {
+  execution.parallelism = 1
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+  }
+}
+
+transform {
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/category/transform
+}
+
+sink {
+  Rocketmq {
+    name.srv.addr = "localhost:9876"
+    topic = "test_topic"
+  }
+}
+
+```
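+
+If ACL is enabled on the broker, the sink additionally needs credentials. A minimal sketch (the access key and secret key below are placeholders):
+
+```hocon
+sink {
+  Rocketmq {
+    name.srv.addr = "localhost:9876"
+    topic = "test_topic"
+    acl.enabled = true
+    access.key = "your_access_key"
+    secret.key = "your_secret_key"
+  }
+}
+```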
 
-Data format. The default format is json. Optional text format. The default field separator is ",".
-If you customize the delimiter, add the "field_delimiter" option.
+### Rocketmq To Rocketmq Simple
 
-### field_delimiter
+> Consume from RocketMQ and write back to RocketMQ. The hash of the c_int field determines which partition each message is written to; writes are asynchronous by default.
 
-Customize the field delimiter for data format.
+```hocon
+env {
+  execution.parallelism = 1
+}
 
-### common options [config]
+source {
+  Rocketmq {
+    name.srv.addr = "localhost:9876"
+    topics = "test_topic"
+    result_table_name = "rocketmq_table"
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+  }
+}
 
-Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
+sink {
+  Rocketmq {
+    name.srv.addr = "localhost:9876"
+    topic = "test_topic_sink"
+    partition.key.fields = ["c_int"]
+  }
+}
+```
+
+### Timestamp consumption write Simple
 
-## Examples
+> A streaming job that consumes from a specified timestamp. When new partitions are added, the program periodically discovers and consumes them, writing the data to another topic.
 
 ```hocon
+
+env {
+  execution.parallelism = 1
+  job.mode = "STREAMING"
+}
+
+source {
+  Rocketmq {
+    name.srv.addr = "localhost:9876"
+    topics = "test_topic"
+    result_table_name = "rocketmq_table"
+    start.mode = "CONSUME_FROM_TIMESTAMP"
+    start.mode.timestamp = "1694508382000"
+    batch.size = "400"
+    consumer.group = "test_topic_group"
+    format = "json"
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+  }
+}
+
+transform {
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/category/transform
+}
 sink {
   Rocketmq {
     name.srv.addr = "localhost:9876"
-    topic = "test-topic-003"
-    partition.key.fields = ["name"]
+    topic = "test_topic"
+    partition.key.fields = ["c_int"]
+    producer.send.sync = true
   }
 }
 ```
diff --git a/docs/en/connector-v2/source/RocketMQ.md b/docs/en/connector-v2/source/RocketMQ.md
index fd209ce70b..1a3f00f436 100644
--- a/docs/en/connector-v2/source/RocketMQ.md
+++ b/docs/en/connector-v2/source/RocketMQ.md
@@ -2,9 +2,15 @@
 
 > RocketMQ source connector
 
-## Description
+## Support Apache RocketMQ Version
 
-Source connector for Apache RocketMQ.
+- 4.9.0 (or a newer version, for reference)
+
+## Support These Engines
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
 
 ## Key features
 
@@ -15,127 +21,198 @@ Source connector for Apache RocketMQ.
 - [x] [parallelism](../../concept/connector-v2-features.md)
 - [ ] [support user-defined split](../../concept/connector-v2-features.md)
 
-## Options
-
-|                name                 |  type   | required |       default value        |
-|-------------------------------------|---------|----------|----------------------------|
-| topics                              | String  | yes      | -                          |
-| name.srv.addr                       | String  | yes      | -                          |
-| acl.enabled                         | Boolean | no       | false                      |
-| access.key                          | String  | no       |                            |
-| secret.key                          | String  | no       |                            |
-| batch.size                          | int     | no       | 100                        |
-| consumer.group                      | String  | no       | SeaTunnel-Consumer-Group   |
-| commit.on.checkpoint                | Boolean | no       | true                       |
-| schema                              |         | no       | -                          |
-| format                              | String  | no       | json                       |
-| field.delimiter                     | String  | no       | ,                          |
-| start.mode                          | String  | no       | CONSUME_FROM_GROUP_OFFSETS |
-| start.mode.offsets                  |         | no       |                            |
-| start.mode.timestamp                | Long    | no       |                            |
-| partition.discovery.interval.millis | long    | no       | -1                         |
-| common-options                      | config  | no       | -                          |
-
-### topics [string]
-
-`RocketMQ topic` name. If there are multiple `topics`, use `,` to split, for example: `"tpc1,tpc2"`.
-
-### name.srv.addr [string]
-
-`RocketMQ` name server cluster address.
-
-### consumer.group [string]
-
-`RocketMQ consumer group id`, used to distinguish different consumer groups.
-
-### acl.enabled [boolean]
-
-If true, access control is enabled, and access key and secret key need to be configured.
-
-### access.key [string]
-
-When ACL_ENABLED is true, access key cannot be empty.
-
-### secret.key [string]
-
-When ACL_ENABLED is true, secret key cannot be empty.
-
-### batch.size [int]
-
-`RocketMQ` consumer pull batch size
-
-### commit.on.checkpoint [boolean]
-
-If true the consumer's offset will be periodically committed in the background.
-
-## partition.discovery.interval.millis [long]
+## Description
 
-The interval for dynamically discovering topics and partitions.
+Source connector for Apache RocketMQ.
 
-### schema
+## Source Options
+
+|                Name                 |  Type   | Required |          Default           | Description |
+|-------------------------------------|---------|----------|----------------------------|-------------|
+| topics                              | String  | yes      | -                          | `RocketMQ topic` name. If there are multiple `topics`, use `,` to split, for example: `"tpc1,tpc2"`. |
+| name.srv.addr                       | String  | yes      | -                          | `RocketMQ` name server cluster address. |
+| acl.enabled                         | Boolean | no       | false                      | If true, access control is enabled, and access key and secret key need to be configured. |
+| access.key                          | String  | no       |                            | When ACL_ENABLED is true, access key cannot be empty. |
+| secret.key                          | String  | no       |                            | When ACL_ENABLED is true, secret key cannot be empty. |
+| batch.size                          | int     | no       | 100                        | `RocketMQ` consumer pull batch size. |
+| consumer.group                      | String  | no       | SeaTunnel-Consumer-Group   | `RocketMQ consumer group id`, used to distinguish different consumer groups. |
+| commit.on.checkpoint                | Boolean | no       | true                       | If true, the consumer's offset will be periodically committed in the background. |
+| schema                              |         | no       | -                          | The structure of the data, including field names and field types. |
+| format                              | String  | no       | json                       | Data format. The default format is json. Optional text format. The default field separator is ",". If you customize the delimiter, add the "field.delimiter" option. |
+| field.delimiter                     | String  | no       | ,                          | Customize the field delimiter for data format. |
+| start.mode                          | String  | no       | CONSUME_FROM_GROUP_OFFSETS | The initial consumption pattern of consumers. There are several types: [CONSUME_FROM_LAST_OFFSET],[CONSUME_FROM_FIRST_OFFSET],[CONSUME_FROM_GROUP_OFFSETS],[CONSUME_FROM_TIMESTAMP],[CONSUME_FROM_SPECIFIC_OFFSETS]. |
+| start.mode.offsets                  |         | no       |                            | The offset required for consumption mode to be "CONSUME_FROM_SPECIFIC_OFFSETS". |
+| start.mode.timestamp                | Long    | no       |                            | The time required for consumption mode to be "CONSUME_FROM_TIMESTAMP". |
+| partition.discovery.interval.millis | long    | no       | -1                         | The interval for dynamically discovering topics and partitions. |
+| common-options                      | config  | no       | -                          | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. |
+
+### start.mode.offsets
 
-The structure of the data, including field names and field types.
+The offset required for consumption mode to be "CONSUME_FROM_SPECIFIC_OFFSETS".
 
-## format
+for example:
 
-Data format. The default format is json. Optional text format. The default field separator is ", ".
-If you customize the delimiter, add the "field.delimiter" option.
+```hocon
+start.mode.offsets = {
+  topic1-0 = 70
+  topic1-1 = 10
+  topic1-2 = 10
+}
+```
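+
+A source using this mode ties the offsets map to `start.mode` (a sketch; the topic, group, and offsets are illustrative):
+
+```hocon
+source {
+  Rocketmq {
+    name.srv.addr = "localhost:9876"
+    topics = "topic1"
+    consumer.group = "consumer-group"
+    start.mode = "CONSUME_FROM_SPECIFIC_OFFSETS"
+    start.mode.offsets = {
+      topic1-0 = 70
+      topic1-1 = 10
+      topic1-2 = 10
+    }
+  }
+}
+```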
 
-## field.delimiter
+## Task Example
 
-Customize the field delimiter for data format.
+### Simple:
 
-## start.mode
+> The consumer reads RocketMQ data and prints it to the console.
 
-The initial consumption pattern of consumers,there are several types:
-[CONSUME_FROM_LAST_OFFSET],[CONSUME_FROM_FIRST_OFFSET],[CONSUME_FROM_GROUP_OFFSETS],[CONSUME_FROM_TIMESTAMP]
-,[CONSUME_FROM_SPECIFIC_OFFSETS]
+```hocon
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
 
-## start.mode.timestamp
+source {
+  Rocketmq {
+    name.srv.addr = "rocketmq-e2e:9876"
+    topics = "test_topic_json"
+    result_table_name = "rocketmq_table"
+    schema = {
+      fields {
+        id = bigint
+        c_map = "map<string, smallint>"
+        c_array = "array<tinyint>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(2, 1)"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+  }
+}
 
-The time required for consumption mode to be "CONSUME_FROM_TIMESTAMP".
+transform {
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/category/transform
+}
 
-## start.mode.offsets
+sink {
+  Console {
+  }
+}
+```
 
-The offset required for consumption mode to be "CONSUME_FROM_SPECIFIC_OFFSETS".
+### Specified format consumption Simple:
 
-for example:
+> Consume the topic data in json format, pulling 400 records per batch, starting from the first offset.
 
 ```hocon
-start.mode.offsets = {
-         topic1-0 = 70
-         topic1-1 = 10
-         topic1-2 = 10
-      }
-```
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
 
-### common-options [config]
+source {
+  Rocketmq {
+    name.srv.addr = "localhost:9876"
+    topics = "test_topic"
+    result_table_name = "rocketmq_table"
+    start.mode = "CONSUME_FROM_FIRST_OFFSET"
+    batch.size = "400"
+    consumer.group = "test_topic_group"
+    format = "json"
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+  }
+}
 
-Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details.
+transform {
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/category/transform
+}
+sink {
+  Console {
+  }
+}
+```
 
-## Example
+### Specified timestamp Simple:
 
-### Simple
+> Consumption starts from a specified timestamp, and new partitions are discovered dynamically every 1000 milliseconds.
 
 ```hocon
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
 source {
   Rocketmq {
     name.srv.addr = "localhost:9876"
-    topics = "test-topic-002"
-    consumer.group = "consumer-group"
-    parallelism = 2
-    batch.size = 20
+    topics = "test_topic"
+    partition.discovery.interval.millis = "1000"
+    start.mode = "CONSUME_FROM_TIMESTAMP"
+    start.mode.timestamp = "1694508382000"
+    consumer.group = "test_topic_group"
+    format = "json"
     schema = {
-       fields {
-            age = int
-            name = string
-       }
-     }
-    start.mode = "CONSUME_FROM_SPECIFIC_OFFSETS"
-    start.mode.offsets = {
-                test-topic-002-0 = 20
-             }
-            
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+  }
+}
+
+transform {
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/category/transform
+}
+
+sink {
+  Console {
   }
 }
 ```
