This is an automated email from the ASF dual-hosted git repository.
liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e8793bb9fc [Feature][Doc] Add RocketMq connector (#5361)
e8793bb9fc is described below
commit e8793bb9fc08d9728ca28e56d47092fc980c2d30
Author: ZhilinLi <[email protected]>
AuthorDate: Tue Oct 10 15:14:58 2023 +0800
[Feature][Doc] Add RocketMq connector (#5361)
* [Feature][doc][Connector-V2][RocketMq] Add RocketMq connector
* [Feature][doc][Connector-V2][RocketMq] Add RocketMq connector
* [Feature][doc][Connector-V2][RocketMq] Add RocketMq connector
* fix
* add case
---
docs/en/connector-v2/sink/RocketMQ.md | 197 ++++++++++++++++++-----
docs/en/connector-v2/source/RocketMQ.md | 271 ++++++++++++++++++++------------
2 files changed, 333 insertions(+), 135 deletions(-)
diff --git a/docs/en/connector-v2/sink/RocketMQ.md
b/docs/en/connector-v2/sink/RocketMQ.md
index 7031920214..3726f76c63 100644
--- a/docs/en/connector-v2/sink/RocketMQ.md
+++ b/docs/en/connector-v2/sink/RocketMQ.md
@@ -1,44 +1,42 @@
# RocketMQ
> RocketMQ sink connector
->
- ## Description
-Write Rows to a Apache RocketMQ topic.
+## Support Apache RocketMQ Version
-## Key features
+- 4.9.0 (Or a newer version, for reference)
-- [x] [exactly-once](../../concept/connector-v2-features.md)
+## Support These Engines
-By default, we will use 2pc to guarantee the message is sent to RocketMQ exactly once.
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
-## Options
-
-| name | type | required | default value |
-|----------------------|---------|----------|--------------------------|
-| topic | string | yes | - |
-| name.srv.addr | string | yes | - |
-| acl.enabled | Boolean | no | false |
-| access.key | String | no | |
-| secret.key | String | no | |
-| producer.group | String | no | SeaTunnel-producer-Group |
-| semantic | string | no | NON |
-| partition.key.fields | array | no | - |
-| format | String | no | json |
-| field.delimiter | String | no | , |
-| common-options | config | no | - |
-
-### topic [string]
+## Key features
-`RocketMQ topic` name.
+- [x] [exactly-once](../../concept/connector-v2-features.md)
-### name.srv.addr [string]
+By default, we will use 2pc to guarantee the message is sent to RocketMQ exactly once.
-`RocketMQ` name server cluster address.
+## Description
-### semantic [string]
+Write rows to an Apache RocketMQ topic.
-Semantics that can be chosen EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON.
+## Sink Options
+
+| Name                 | Type    | Required | Default                  | Description                                                                                                                                                          |
+|----------------------|---------|----------|--------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| topic                | string  | yes      | -                        | `RocketMQ topic` name.                                                                                                                                               |
+| name.srv.addr        | string  | yes      | -                        | `RocketMQ` name server cluster address.                                                                                                                              |
+| acl.enabled          | Boolean | no       | false                    | If true, access control is enabled, and access key and secret key need to be configured.                                                                             |
+| access.key           | String  | no       |                          | When ACL_ENABLED is true, access key cannot be empty.                                                                                                                |
+| secret.key           | String  | no       |                          | When ACL_ENABLED is true, secret key cannot be empty.                                                                                                                |
+| producer.group       | String  | no       | SeaTunnel-producer-Group | `RocketMQ` producer group id.                                                                                                                                        |
+| semantic             | string  | no       | NON                      | Semantics that can be chosen EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON.                                                                                            |
+| partition.key.fields | array   | no       | -                        | Configure which fields are used as the key of the RocketMQ message.                                                                                                  |
+| format               | String  | no       | json                     | Data format. The default format is json. Optional text format. The default field separator is ",". If you customize the delimiter, add the "field.delimiter" option. |
+| field.delimiter      | String  | no       | ,                        | Customize the field delimiter for data format.                                                                                                                       |
+| producer.send.sync   | Boolean | no       | false                    | If true, the message will be sent synchronously.                                                                                                                     |
+| common-options       | config  | no       | -                        | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.                                                                 |
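+
+With the text format, the row fields are joined with `field.delimiter` instead of being encoded as json. A minimal sketch (the topic name and delimiter are illustrative):
+
+```hocon
+sink {
+  Rocketmq {
+    name.srv.addr = "localhost:9876"
+    topic = "test_topic"
+    # text format: fields are joined with field.delimiter instead of json encoding
+    format = "text"
+    field.delimiter = ","
+  }
+}
+```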
### partition.key.fields [array]
@@ -55,27 +53,150 @@ Upstream data is the following:
If name is set as the key, then the hash value of the name column will determine which partition the message is sent to.
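+
+For example, keying messages on the `name` column can be sketched as (the topic name is illustrative):
+
+```hocon
+sink {
+  Rocketmq {
+    name.srv.addr = "localhost:9876"
+    topic = "test_topic"
+    # rows with the same name value hash to the same partition
+    partition.key.fields = ["name"]
+  }
+}
+```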
-### format
+## Task Example
+
+### Fake to Rocketmq Simple
+
+> The data is randomly generated and asynchronously sent to the test topic
+
+```hocon
+env {
+ execution.parallelism = 1
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ }
+}
+
+transform {
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/category/transform
+}
+
+sink {
+ Rocketmq {
+ name.srv.addr = "localhost:9876"
+ topic = "test_topic"
+ }
+}
+
+```
-Data format. The default format is json. Optional text format. The default field separator is ",".
-If you customize the delimiter, add the "field_delimiter" option.
+### Rocketmq To Rocketmq Simple
-### field_delimiter
+> Consume data from Rocketmq and write it back to Rocketmq; the hash of the c_int field determines which partition each message is written to. This is the default asynchronous way to write.
-Customize the field delimiter for data format.
+```hocon
+env {
+ execution.parallelism = 1
+}
-### common options [config]
+source {
+ Rocketmq {
+ name.srv.addr = "localhost:9876"
+ topics = "test_topic"
+ result_table_name = "rocketmq_table"
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ }
+}
-Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
+sink {
+ Rocketmq {
+ name.srv.addr = "localhost:9876"
+ topic = "test_topic_sink"
+ partition.key.fields = ["c_int"]
+ }
+}
+```
+
+### Timestamp consumption write Simple
-## Examples
+> This streaming job starts consuming from a specified timestamp; at intervals the program rediscovers newly added partitions and consumes them, then writes the data to another topic.
```hocon
+
+env {
+ execution.parallelism = 1
+ job.mode = "STREAMING"
+}
+
+source {
+ Rocketmq {
+ name.srv.addr = "localhost:9876"
+ topics = "test_topic"
+ result_table_name = "rocketmq_table"
+    start.mode = "CONSUME_FROM_TIMESTAMP"
+    start.mode.timestamp = "1694508382000"
+    partition.discovery.interval.millis = "1000"
+    batch.size = "400"
+    consumer.group = "test_topic_group"
+    format = "json"
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ }
+}
+
+transform {
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/category/transform
+}
sink {
Rocketmq {
name.srv.addr = "localhost:9876"
- topic = "test-topic-003"
- partition.key.fields = ["name"]
+ topic = "test_topic"
+ partition.key.fields = ["c_int"]
+ producer.send.sync = true
}
}
```
diff --git a/docs/en/connector-v2/source/RocketMQ.md
b/docs/en/connector-v2/source/RocketMQ.md
index fd209ce70b..1a3f00f436 100644
--- a/docs/en/connector-v2/source/RocketMQ.md
+++ b/docs/en/connector-v2/source/RocketMQ.md
@@ -2,9 +2,15 @@
> RocketMQ source connector
-## Description
+## Support Apache RocketMQ Version
-Source connector for Apache RocketMQ.
+- 4.9.0 (Or a newer version, for reference)
+
+## Support These Engines
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
## Key features
@@ -15,127 +21,198 @@ Source connector for Apache RocketMQ.
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
-## Options
-
-| name                                | type    | required | default value              |
-|-------------------------------------|---------|----------|----------------------------|
-| topics                              | String  | yes      | -                          |
-| name.srv.addr                       | String  | yes      | -                          |
-| acl.enabled                         | Boolean | no       | false                      |
-| access.key                          | String  | no       |                            |
-| secret.key                          | String  | no       |                            |
-| batch.size                          | int     | no       | 100                        |
-| consumer.group                      | String  | no       | SeaTunnel-Consumer-Group   |
-| commit.on.checkpoint                | Boolean | no       | true                       |
-| schema                              |         | no       | -                          |
-| format                              | String  | no       | json                       |
-| field.delimiter                     | String  | no       | ,                          |
-| start.mode                          | String  | no       | CONSUME_FROM_GROUP_OFFSETS |
-| start.mode.offsets                  |         | no       |                            |
-| start.mode.timestamp                | Long    | no       |                            |
-| partition.discovery.interval.millis | long    | no       | -1                         |
-| common-options                      | config  | no       | -                          |
-
-### topics [string]
-
-`RocketMQ topic` name. If there are multiple `topics`, use `,` to split, for example: `"tpc1,tpc2"`.
-
-### name.srv.addr [string]
-
-`RocketMQ` name server cluster address.
-
-### consumer.group [string]
-
-`RocketMQ consumer group id`, used to distinguish different consumer groups.
-
-### acl.enabled [boolean]
-
-If true, access control is enabled, and access key and secret key need to be configured.
-
-### access.key [string]
-
-When ACL_ENABLED is true, access key cannot be empty.
-
-### secret.key [string]
-
-When ACL_ENABLED is true, secret key cannot be empty.
-
-### batch.size [int]
-
-`RocketMQ` consumer pull batch size
-
-### commit.on.checkpoint [boolean]
-
-If true the consumer's offset will be periodically committed in the background.
-
-## partition.discovery.interval.millis [long]
+## Description
-The interval for dynamically discovering topics and partitions.
+Source connector for Apache RocketMQ.
-### schema
+## Source Options
+
+| Name                                | Type    | Required | Default                    | Description                                                                                                                                                                                                          |
+|-------------------------------------|---------|----------|----------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| topics                              | String  | yes      | -                          | `RocketMQ topic` name. If there are multiple `topics`, use `,` to split, for example: `"tpc1,tpc2"`.                                                                                                                 |
+| name.srv.addr                       | String  | yes      | -                          | `RocketMQ` name server cluster address.                                                                                                                                                                              |
+| acl.enabled                         | Boolean | no       | false                      | If true, access control is enabled, and access key and secret key need to be configured.                                                                                                                             |
+| access.key                          | String  | no       |                            | When ACL_ENABLED is true, access key cannot be empty.                                                                                                                                                                |
+| secret.key                          | String  | no       |                            | When ACL_ENABLED is true, secret key cannot be empty.                                                                                                                                                                |
+| batch.size                          | int     | no       | 100                        | `RocketMQ` consumer pull batch size.                                                                                                                                                                                 |
+| consumer.group                      | String  | no       | SeaTunnel-Consumer-Group   | `RocketMQ consumer group id`, used to distinguish different consumer groups.                                                                                                                                         |
+| commit.on.checkpoint                | Boolean | no       | true                       | If true, the consumer's offset will be periodically committed in the background.                                                                                                                                     |
+| schema                              |         | no       | -                          | The structure of the data, including field names and field types.                                                                                                                                                    |
+| format                              | String  | no       | json                       | Data format. The default format is json. Optional text format. The default field separator is ",". If you customize the delimiter, add the "field.delimiter" option.                                                 |
+| field.delimiter                     | String  | no       | ,                          | Customize the field delimiter for data format.                                                                                                                                                                       |
+| start.mode                          | String  | no       | CONSUME_FROM_GROUP_OFFSETS | The initial consumption pattern of consumers. There are several types: [CONSUME_FROM_LAST_OFFSET],[CONSUME_FROM_FIRST_OFFSET],[CONSUME_FROM_GROUP_OFFSETS],[CONSUME_FROM_TIMESTAMP],[CONSUME_FROM_SPECIFIC_OFFSETS]. |
+| start.mode.offsets                  |         | no       |                            | The offset required for consumption mode to be "CONSUME_FROM_SPECIFIC_OFFSETS".                                                                                                                                      |
+| start.mode.timestamp                | Long    | no       |                            | The time required for consumption mode to be "CONSUME_FROM_TIMESTAMP".                                                                                                                                               |
+| partition.discovery.interval.millis | long    | no       | -1                         | The interval for dynamically discovering topics and partitions.                                                                                                                                                      |
+| common-options                      | config  | no       | -                          | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details.                                                                                                             |
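+
+When `acl.enabled` is true, both keys must be configured. A minimal source sketch (the credential values are placeholders):
+
+```hocon
+source {
+  Rocketmq {
+    name.srv.addr = "localhost:9876"
+    topics = "test_topic"
+    acl.enabled = true
+    # placeholder credentials; both must be non-empty when acl.enabled is true
+    access.key = "your-access-key"
+    secret.key = "your-secret-key"
+  }
+}
+```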
+
+### start.mode.offsets
-The structure of the data, including field names and field types.
+The offset required for consumption mode to be "CONSUME_FROM_SPECIFIC_OFFSETS".
-## format
+for example:
-Data format. The default format is json. Optional text format. The default field separator is ", ".
-If you customize the delimiter, add the "field.delimiter" option.
+```hocon
+start.mode.offsets = {
+ topic1-0 = 70
+ topic1-1 = 10
+ topic1-2 = 10
+}
+```
-## field.delimiter
+## Task Example
-Customize the field delimiter for data format.
+### Simple:
-## start.mode
+> The consumer reads Rocketmq data and prints it to the console.
-The initial consumption pattern of consumers,there are several types:
-[CONSUME_FROM_LAST_OFFSET],[CONSUME_FROM_FIRST_OFFSET],[CONSUME_FROM_GROUP_OFFSETS],[CONSUME_FROM_TIMESTAMP]
-,[CONSUME_FROM_SPECIFIC_OFFSETS]
+```hocon
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
-## start.mode.timestamp
+source {
+ Rocketmq {
+ name.srv.addr = "rocketmq-e2e:9876"
+ topics = "test_topic_json"
+ result_table_name = "rocketmq_table"
+ schema = {
+ fields {
+ id = bigint
+ c_map = "map<string, smallint>"
+ c_array = "array<tinyint>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(2, 1)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ }
+}
-The time required for consumption mode to be "CONSUME_FROM_TIMESTAMP".
+transform {
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/category/transform
+}
-## start.mode.offsets
+sink {
+ Console {
+ }
+}
+```
-The offset required for consumption mode to be "CONSUME_FROM_SPECIFIC_OFFSETS".
+### Specified format consumption Simple:
-for example:
+> The topic data is consumed and parsed in json format, 400 records are pulled per batch, and consumption starts from the earliest offset.
```hocon
-start.mode.offsets = {
- topic1-0 = 70
- topic1-1 = 10
- topic1-2 = 10
- }
-```
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
-### common-options [config]
+source {
+ Rocketmq {
+ name.srv.addr = "localhost:9876"
+ topics = "test_topic"
+ result_table_name = "rocketmq_table"
+ start.mode = "CONSUME_FROM_FIRST_OFFSET"
+ batch.size = "400"
+ consumer.group = "test_topic_group"
+ format = "json"
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ }
+}
-Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details.
+transform {
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/category/transform
+}
+sink {
+ Console {
+ }
+}
+```
-## Example
+### Specified timestamp Simple:
-### Simple
+> Consumption starts from a specified timestamp, and every 1000 milliseconds the job dynamically discovers newly added partitions and pulls from them.
```hocon
+env {
+ execution.parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
source {
Rocketmq {
name.srv.addr = "localhost:9876"
- topics = "test-topic-002"
- consumer.group = "consumer-group"
- parallelism = 2
- batch.size = 20
+ topics = "test_topic"
+ partition.discovery.interval.millis = "1000"
+    start.mode = "CONSUME_FROM_TIMESTAMP"
+    start.mode.timestamp = "1694508382000"
+    consumer.group = "test_topic_group"
+    format = "json"
schema = {
- fields {
- age = int
- name = string
- }
- }
- start.mode = "CONSUME_FROM_SPECIFIC_OFFSETS"
- start.mode.offsets = {
- test-topic-002-0 = 20
- }
-
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ }
+}
+
+transform {
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/category/transform
+}
+
+sink {
+ Console {
}
}
```