This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 85615468b7 [Feature][Connector-V2] Support multi-table read for RocketMQ source (#10619)
85615468b7 is described below
commit 85615468b7ba2f71cdb3841b09a435d293ff0dd2
Author: xiaosiyuan <[email protected]>
AuthorDate: Mon Mar 23 16:07:19 2026 +0800
[Feature][Connector-V2] Support multi-table read for RocketMQ source (#10619)
---
docs/en/connectors/source/RocketMQ.md | 50 +++-
docs/zh/connectors/source/RocketMQ.md | 50 +++-
.../seatunnel/rocketmq/source/RocketMqSource.java | 146 +---------
.../rocketmq/source/RocketMqSourceConfig.java | 307 +++++++++++++++++++++
.../rocketmq/source/RocketMqSourceFactory.java | 12 +-
.../rocketmq/source/RocketMqSourceReader.java | 34 ++-
.../source/RocketMqSourceSplitEnumerator.java | 151 ++++++----
.../RocketMqTableIdDeserializationSchema.java | 97 +++++++
.../rocketmq/source/TopicTableConfig.java | 37 +++
.../rocketmq/source/RocketMqSourceConfigTest.java | 139 ++++++++++
.../source/RocketMqSourceSplitEnumeratorTest.java | 210 ++++++++++++++
.../e2e/connector/rocketmq/RocketMqIT.java | 44 +++
.../rocketmq_multi_source_to_assert.conf | 138 +++++++++
13 files changed, 1201 insertions(+), 214 deletions(-)
diff --git a/docs/en/connectors/source/RocketMQ.md b/docs/en/connectors/source/RocketMQ.md
index 7116ca8882..ac29e67104 100644
--- a/docs/en/connectors/source/RocketMQ.md
+++ b/docs/en/connectors/source/RocketMQ.md
@@ -31,7 +31,9 @@ Source connector for Apache RocketMQ.
| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
-| topics | String | yes | - | `RocketMQ topic` name. If there are multiple `topics`, use `,` to split, for example: `"tpc1,tpc2"`. |
+| topics | String | no | - | `RocketMQ topic` name. If there are multiple `topics`, use `,` to split, for example: `"tpc1,tpc2"`. You can configure only one of `topics` and `tables_configs` at the same time. |
+| tables_configs | List | no | - | Multi-table mode config list. Each item configures one table and supports: `topics`, `format`, `schema`, `tags`, `start.mode`, `start.mode.timestamp`, `start.mode.offsets`, `ignore_parse_errors`. You can configure only one of `topics` and `tables_configs` at the same time. |
+| table_list | List | no | - | Deprecated, use `tables_configs` instead. |
| name.srv.addr | String | yes | - | `RocketMQ` name server cluster address. |
| tags | String | no | - | `RocketMQ tag` name. If there are multiple `tags`, use `,` to split, for example: `"tag1,tag2"`. |
| acl.enabled | Boolean | no | false | If true, access control is enabled, and access key and secret key need to be configured. |
@@ -281,6 +283,52 @@ sink {
}
```
+### Multiple RocketMQ Source
+
+> Read from multiple topics with different schemas. Use `tables_configs` to configure each topic independently.
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Rocketmq {
+ name.srv.addr = "localhost:9876"
+ consumer.group = "multi_table_group"
+ start.mode = "CONSUME_FROM_FIRST_OFFSET"
+ tables_configs = [
+ {
+ topics = "topic_1"
+ format = "json"
+ schema = {
+ fields {
+ id = int
+ name = string
+ }
+ }
+ },
+ {
+ topics = "topic_2"
+ format = "json"
+ schema = {
+ fields {
+ id = int
+ description = string
+ weight = double
+ }
+ }
+ }
+ ]
+ }
+}
+
+sink {
+ Console {}
+}
+```
+
## Changelog
<ChangeLog />
diff --git a/docs/zh/connectors/source/RocketMQ.md b/docs/zh/connectors/source/RocketMQ.md
index e70bce81c0..0ea86b2230 100644
--- a/docs/zh/connectors/source/RocketMQ.md
+++ b/docs/zh/connectors/source/RocketMQ.md
@@ -31,7 +31,9 @@ Source connector for Apache RocketMQ.
| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
-| topics | String | yes | - | RocketMQ topic name. If there are multiple topics, separate them with `,`, for example: `"tpc1,tpc2"`. |
+| topics | String | no | - | RocketMQ topic name. If there are multiple topics, separate them with `,`, for example: `"tpc1,tpc2"`. Only one of `topics` and `tables_configs` may be configured at a time. |
+| tables_configs | List | no | - | Multi-table mode config list. Each item configures one table and supports: `topics`, `format`, `schema`, `tags`, `start.mode`, `start.mode.timestamp`, `start.mode.offsets`, `ignore_parse_errors`. Only one of `topics` and `tables_configs` may be configured at a time. |
+| table_list | List | no | - | Deprecated; use `tables_configs` instead. |
| name.srv.addr | String | yes | - | RocketMQ name server cluster address. |
| tags | String | no | - | RocketMQ tag name. If there are multiple tags, separate them with `,`, for example: `"tag1,tag2"`. |
| acl.enabled | Boolean | no | false | If true, access control is enabled, and the access key and secret key must be configured. |
@@ -165,6 +167,52 @@ sink {
}
```
+### Multi-Table Read
+
+> Read data with different schemas from different topics, using `tables_configs` to configure each topic independently.
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Rocketmq {
+ name.srv.addr = "localhost:9876"
+ consumer.group = "multi_table_group"
+ start.mode = "CONSUME_FROM_FIRST_OFFSET"
+ tables_configs = [
+ {
+ topics = "topic_1"
+ format = "json"
+ schema = {
+ fields {
+ id = int
+ name = string
+ }
+ }
+ },
+ {
+ topics = "topic_2"
+ format = "json"
+ schema = {
+ fields {
+ id = int
+ description = string
+ weight = double
+ }
+ }
+ }
+ ]
+ }
+}
+
+sink {
+ Console {}
+}
+```
+
## Changelog
<ChangeLog />
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSource.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSource.java
index 8b3cca4963..4fcb93f64d 100644
--- a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSource.java
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSource.java
@@ -19,124 +19,27 @@ package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.constants.JobMode;
-import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqBaseConfiguration;
-import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.SchemaFormat;
-import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.StartMode;
-import org.apache.seatunnel.connectors.seatunnel.rocketmq.config.RocketMqSourceOptions;
-import org.apache.seatunnel.format.json.JsonDeserializationSchema;
-import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
-import org.apache.seatunnel.format.text.TextDeserializationSchema;
-import org.apache.rocketmq.common.message.MessageQueue;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
/** RocketMq source */
public class RocketMqSource
        implements SeaTunnelSource<SeaTunnelRow, RocketMqSourceSplit, RocketMqSourceState>,
                SupportParallelism {
-    private final ReadonlyConfig pluginConfig;
-    private final CatalogTable catalogTable;
-    private final ConsumerMetadata metadata;
-    private DeserializationSchema<SeaTunnelRow> deserializationSchema;
+    private final RocketMqSourceConfig sourceConfig;
    private JobContext jobContext;
    public RocketMqSource(ReadonlyConfig pluginConfig) {
-        this.pluginConfig = pluginConfig;
-        // check config
-        this.metadata = new ConsumerMetadata();
-        this.metadata.setTopics(
-                Arrays.asList(
-                        pluginConfig
-                                .get(RocketMqSourceOptions.TOPICS)
-                                .split(RocketMqSourceOptions.DEFAULT_FIELD_DELIMITER)));
-
-        String tags = pluginConfig.get(RocketMqSourceOptions.TAGS);
-        if (tags != null && !tags.trim().isEmpty()) {
-            this.metadata.setTags(
-                    Arrays.stream(tags.split(RocketMqSourceOptions.DEFAULT_FIELD_DELIMITER))
-                            .map(String::trim)
-                            .filter(tag -> !tag.isEmpty())
-                            .distinct()
-                            .collect(Collectors.toList()));
-        } else {
-            this.metadata.setTags(Collections.emptyList());
-        }
-
-        RocketMqBaseConfiguration.Builder baseConfigBuilder =
-                RocketMqBaseConfiguration.newBuilder()
-                        .consumer()
-                        .namesrvAddr(pluginConfig.get(RocketMqSourceOptions.NAME_SRV_ADDR));
-        if (pluginConfig.getOptional(RocketMqSourceOptions.ACCESS_KEY).isPresent()) {
-            baseConfigBuilder.accessKey(pluginConfig.get(RocketMqSourceOptions.ACCESS_KEY));
-        }
-        if (pluginConfig.getOptional(RocketMqSourceOptions.SECRET_KEY).isPresent()) {
-            baseConfigBuilder.secretKey(pluginConfig.get(RocketMqSourceOptions.SECRET_KEY));
-        }
-        baseConfigBuilder.aclEnable(pluginConfig.get(RocketMqSourceOptions.ACL_ENABLED));
-        baseConfigBuilder.groupId(pluginConfig.get(RocketMqSourceOptions.CONSUMER_GROUP));
-        baseConfigBuilder.batchSize(pluginConfig.get(RocketMqSourceOptions.BATCH_SIZE));
-
-        baseConfigBuilder.pollTimeoutMillis(
-                pluginConfig.get(RocketMqSourceOptions.POLL_TIMEOUT_MILLIS));
-
-        this.metadata.setBaseConfig(baseConfigBuilder.build());
-
-        this.metadata.setEnabledCommitCheckpoint(
-                pluginConfig.get(RocketMqSourceOptions.COMMIT_ON_CHECKPOINT));
-
-        StartMode startMode = pluginConfig.get(RocketMqSourceOptions.START_MODE);
-        switch (startMode) {
-            case CONSUME_FROM_TIMESTAMP:
-                long startOffsetsTimestamp =
-                        pluginConfig.get(RocketMqSourceOptions.START_MODE_TIMESTAMP);
-                long currentTimestamp = System.currentTimeMillis();
-                if (startOffsetsTimestamp < 0 || startOffsetsTimestamp > currentTimestamp) {
-                    throw new IllegalArgumentException(
-                            "The offsets timestamp value is smaller than 0 or smaller"
-                                    + " than the current time");
-                }
-                this.metadata.setStartOffsetsTimestamp(startOffsetsTimestamp);
-                break;
-            case CONSUME_FROM_SPECIFIC_OFFSETS:
-                Map<String, Long> offsetConfigMap =
-                        pluginConfig.get(RocketMqSourceOptions.START_MODE_OFFSETS);
-                Map<MessageQueue, Long> specificStartOffsets = new HashMap<>();
-                offsetConfigMap.forEach(
-                        (k, v) -> {
-                            int splitIndex = k.lastIndexOf("-");
-                            String topic = k.substring(0, splitIndex);
-                            String partition = k.substring(splitIndex + 1);
-                            MessageQueue messageQueue =
-                                    new MessageQueue(topic, null, Integer.parseInt(partition));
-                            specificStartOffsets.put(messageQueue, v);
-                        });
-                this.metadata.setSpecificStartOffsets(specificStartOffsets);
-                break;
-            default:
-                break;
-        }
-        this.metadata.setStartMode(startMode);
-        this.catalogTable = CatalogTableUtil.buildWithConfig(pluginConfig);
-        // set deserialization
-        setDeserialization(pluginConfig);
+        this.sourceConfig = new RocketMqSourceConfig(pluginConfig);
    }
@Override
@@ -146,7 +49,7 @@ public class RocketMqSource
@Override
public List<CatalogTable> getProducedCatalogTables() {
- return Collections.singletonList(catalogTable);
+ return sourceConfig.getCatalogTables();
}
@Override
@@ -164,16 +67,18 @@ public class RocketMqSource
    @Override
    public SourceReader<SeaTunnelRow, RocketMqSourceSplit> createReader(
            SourceReader.Context readerContext) throws Exception {
-        return new RocketMqSourceReader(this.metadata, deserializationSchema, readerContext);
+        return new RocketMqSourceReader(
+                sourceConfig.getMetadata(), sourceConfig.getTopicConfigs(), readerContext);
    }
    @Override
    public SourceSplitEnumerator<RocketMqSourceSplit, RocketMqSourceState> createEnumerator(
            SourceSplitEnumerator.Context<RocketMqSourceSplit> context) throws Exception {
        return new RocketMqSourceSplitEnumerator(
-                this.metadata,
+                sourceConfig.getMetadata(),
+                sourceConfig.getTopicConfigs(),
                context,
-                pluginConfig.get(RocketMqSourceOptions.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS));
+                sourceConfig.getDiscoveryIntervalMillis());
    }
    @Override
@@ -182,38 +87,9 @@ public class RocketMqSource
            RocketMqSourceState sourceState)
            throws Exception {
        return new RocketMqSourceSplitEnumerator(
-                this.metadata,
+                sourceConfig.getMetadata(),
+                sourceConfig.getTopicConfigs(),
                context,
-                pluginConfig.get(RocketMqSourceOptions.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS));
-    }
-
-    private void setDeserialization(ReadonlyConfig config) {
-        if (config.getOptional(RocketMqSourceOptions.SCHEMA).isPresent()) {
-            SchemaFormat format = config.get(RocketMqSourceOptions.FORMAT);
-            boolean ignoreParseErrors = config.get(RocketMqSourceOptions.IGNORE_PARSE_ERRORS);
-            switch (format) {
-                case JSON:
-                    deserializationSchema =
-                            new JsonDeserializationSchema(catalogTable, false, ignoreParseErrors);
-                    break;
-                case TEXT:
-                    deserializationSchema =
-                            TextDeserializationSchema.builder()
-                                    .seaTunnelRowType(catalogTable.getSeaTunnelRowType())
-                                    .delimiter(config.get(RocketMqSourceOptions.FIELD_DELIMITER))
-                                    .build();
-                    break;
-                default:
-                    throw new SeaTunnelJsonFormatException(
-                            CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
-                            "Unsupported format: " + format);
-            }
-        } else {
-            this.deserializationSchema =
-                    TextDeserializationSchema.builder()
-                            .seaTunnelRowType(catalogTable.getSeaTunnelRowType())
-                            .delimiter(String.valueOf('\002'))
-                            .build();
-        }
+                sourceConfig.getDiscoveryIntervalMillis());
    }
}
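Both the removed constructor logic and the new `RocketMqSourceConfig` validate `CONSUME_FROM_TIMESTAMP` the same way: the configured timestamp must lie in `[0, now]`. A minimal standalone sketch of that check (class name hypothetical, not part of the commit):

```java
public class StartTimestampCheck {

    // Valid CONSUME_FROM_TIMESTAMP values lie in [0, now]; anything negative
    // or in the future is rejected, mirroring the connector's validation.
    static void validate(long startTimestamp, long now) {
        if (startTimestamp < 0 || startTimestamp > now) {
            throw new IllegalArgumentException(
                    "The offsets timestamp value is smaller than 0 or larger than the current time");
        }
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        validate(now - 1000, now); // one second in the past: accepted
        try {
            validate(now + 60_000, now); // one minute in the future: rejected
        } catch (IllegalArgumentException expected) {
            System.out.println("rejected future timestamp");
        }
    }
}
```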
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceConfig.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceConfig.java
new file mode 100644
index 0000000000..e94d5d13da
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceConfig.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqBaseConfiguration;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.SchemaFormat;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.StartMode;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.config.RocketMqSourceOptions;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+import org.apache.seatunnel.format.text.TextDeserializationSchema;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import lombok.Getter;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** RocketMQ source configuration, supports both single-table and multi-table modes. */
+public class RocketMqSourceConfig implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ @Getter private final ConsumerMetadata metadata;
+ @Getter private final Map<String, TopicTableConfig> topicConfigs;
+ @Getter private final long discoveryIntervalMillis;
+
+    public RocketMqSourceConfig(ReadonlyConfig readonlyConfig) {
+        this.topicConfigs = new LinkedHashMap<>();
+        this.discoveryIntervalMillis =
+                readonlyConfig.get(RocketMqSourceOptions.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS);
+
+        List<Map<String, Object>> tableConfigList = null;
+        if (readonlyConfig.getOptional(RocketMqSourceOptions.TABLE_CONFIGS).isPresent()) {
+            tableConfigList = readonlyConfig.get(RocketMqSourceOptions.TABLE_CONFIGS);
+        } else if (readonlyConfig.getOptional(RocketMqSourceOptions.TABLE_LIST).isPresent()) {
+            tableConfigList = readonlyConfig.get(RocketMqSourceOptions.TABLE_LIST);
+        }
+
+        if (tableConfigList != null) {
+            // Multi-table mode
+            this.metadata = buildConsumerMetadata(readonlyConfig, Collections.emptyList());
+            List<String> allTopics = new ArrayList<>();
+            for (Map<String, Object> tableConfig : tableConfigList) {
+                parseTableConfig(ReadonlyConfig.fromMap(tableConfig), allTopics);
+            }
+            this.metadata.setTopics(allTopics);
+        } else {
+            // Single-table mode (backward compatible)
+            List<String> topics =
+                    Arrays.stream(
+                                    readonlyConfig
+                                            .get(RocketMqSourceOptions.TOPICS)
+                                            .split(RocketMqSourceOptions.DEFAULT_FIELD_DELIMITER))
+                            .map(String::trim)
+                            .filter(t -> !t.isEmpty())
+                            .collect(Collectors.toList());
+            this.metadata = buildConsumerMetadata(readonlyConfig, topics);
+            CatalogTable catalogTable = CatalogTableUtil.buildWithConfig(readonlyConfig);
+            DeserializationSchema<SeaTunnelRow> deserializationSchema =
+                    buildDeserialization(readonlyConfig, catalogTable);
+            List<String> tags = parseTags(readonlyConfig);
+            for (String topic : topics) {
+                TopicTableConfig config = new TopicTableConfig();
+                config.setCatalogTable(catalogTable);
+                config.setDeserializationSchema(deserializationSchema);
+                config.setTags(tags);
+                topicConfigs.put(topic, config);
+            }
+        }
+    }
+
+    private void parseTableConfig(ReadonlyConfig tableConfig, List<String> allTopics) {
+        String topicsStr = tableConfig.get(RocketMqSourceOptions.TOPICS);
+        if (topicsStr == null || topicsStr.trim().isEmpty()) {
+            throw new IllegalArgumentException(
+                    "'topics' must be configured in each tables_configs entry, but got: "
+                            + tableConfig);
+        }
+        List<String> topics =
+                Arrays.stream(topicsStr.split(RocketMqSourceOptions.DEFAULT_FIELD_DELIMITER))
+                        .map(String::trim)
+                        .filter(t -> !t.isEmpty())
+                        .collect(Collectors.toList());
+
+        CatalogTable catalogTable = CatalogTableUtil.buildWithConfig(tableConfig);
+        if (TablePath.DEFAULT.equals(catalogTable.getTablePath())) {
+            catalogTable =
+                    CatalogTable.of(
+                            TableIdentifier.of("", TablePath.of(null, null, topics.get(0))),
+                            catalogTable.getTableSchema(),
+                            catalogTable.getOptions(),
+                            catalogTable.getPartitionKeys(),
+                            catalogTable.getComment());
+        }
+        DeserializationSchema<SeaTunnelRow> deserializationSchema =
+                buildDeserialization(tableConfig, catalogTable);
+        List<String> tags = parseTags(tableConfig);
+        StartMode startMode =
+                tableConfig.getOptional(RocketMqSourceOptions.START_MODE).orElse(null);
+        Long startTimestamp = null;
+
+        if (startMode != null) {
+            switch (startMode) {
+                case CONSUME_FROM_TIMESTAMP:
+                    startTimestamp = tableConfig.get(RocketMqSourceOptions.START_MODE_TIMESTAMP);
+                    if (startTimestamp == null) {
+                        throw new IllegalArgumentException(
+                                "When 'start.mode' is set to 'CONSUME_FROM_TIMESTAMP' in tables_configs, "
+                                        + "'start.mode.timestamp' must also be specified in the same table config entry. "
+                                        + "Topics: "
+                                        + topicsStr);
+                    }
+                    long currentTimestamp = System.currentTimeMillis();
+                    if (startTimestamp < 0 || startTimestamp > currentTimestamp) {
+                        throw new IllegalArgumentException(
+                                "The offsets timestamp value is smaller than 0 or larger"
+                                        + " than the current time");
+                    }
+                    break;
+                case CONSUME_FROM_SPECIFIC_OFFSETS:
+                    Map<String, Long> offsetConfigMap =
+                            tableConfig.get(RocketMqSourceOptions.START_MODE_OFFSETS);
+                    if (offsetConfigMap == null || offsetConfigMap.isEmpty()) {
+                        throw new IllegalArgumentException(
+                                "When 'start.mode' is set to 'CONSUME_FROM_SPECIFIC_OFFSETS' in tables_configs, "
+                                        + "'start.mode.offsets' must also be specified in the same table config entry. "
+                                        + "Topics: "
+                                        + topicsStr);
+                    }
+                    Map<MessageQueue, Long> specificOffsets = metadata.getSpecificStartOffsets();
+                    if (specificOffsets == null) {
+                        specificOffsets = new HashMap<>();
+                        metadata.setSpecificStartOffsets(specificOffsets);
+                    }
+                    for (Map.Entry<String, Long> entry : offsetConfigMap.entrySet()) {
+                        int splitIndex = entry.getKey().lastIndexOf("-");
+                        String topicName = entry.getKey().substring(0, splitIndex);
+                        int queueId = Integer.parseInt(entry.getKey().substring(splitIndex + 1));
+                        specificOffsets.put(
+                                new MessageQueue(topicName, null, queueId), entry.getValue());
+                    }
+                    break;
+                default:
+                    break;
+            }
+        }
+
+        for (String topic : topics) {
+            TopicTableConfig config = new TopicTableConfig();
+            config.setCatalogTable(catalogTable);
+            config.setDeserializationSchema(deserializationSchema);
+            config.setTags(tags);
+            config.setStartMode(startMode);
+            config.setStartTimestamp(startTimestamp);
+            topicConfigs.put(topic, config);
+            allTopics.add(topic);
+        }
+    }
+
+    private List<String> parseTags(ReadonlyConfig config) {
+        String tags = config.get(RocketMqSourceOptions.TAGS);
+        if (tags != null && !tags.trim().isEmpty()) {
+            return Arrays.stream(tags.split(RocketMqSourceOptions.DEFAULT_FIELD_DELIMITER))
+                    .map(String::trim)
+                    .filter(tag -> !tag.isEmpty())
+                    .distinct()
+                    .collect(Collectors.toList());
+        }
+        return Collections.emptyList();
+    }
+
+    private ConsumerMetadata buildConsumerMetadata(
+            ReadonlyConfig readonlyConfig, List<String> topics) {
+        ConsumerMetadata consumerMetadata = new ConsumerMetadata();
+        consumerMetadata.setTopics(topics);
+        consumerMetadata.setTags(Collections.emptyList());
+
+        RocketMqBaseConfiguration.Builder baseConfigBuilder =
+                RocketMqBaseConfiguration.newBuilder()
+                        .consumer()
+                        .namesrvAddr(readonlyConfig.get(RocketMqSourceOptions.NAME_SRV_ADDR));
+        if (readonlyConfig.getOptional(RocketMqSourceOptions.ACCESS_KEY).isPresent()) {
+            baseConfigBuilder.accessKey(readonlyConfig.get(RocketMqSourceOptions.ACCESS_KEY));
+        }
+        if (readonlyConfig.getOptional(RocketMqSourceOptions.SECRET_KEY).isPresent()) {
+            baseConfigBuilder.secretKey(readonlyConfig.get(RocketMqSourceOptions.SECRET_KEY));
+        }
+        baseConfigBuilder.aclEnable(readonlyConfig.get(RocketMqSourceOptions.ACL_ENABLED));
+        baseConfigBuilder.groupId(readonlyConfig.get(RocketMqSourceOptions.CONSUMER_GROUP));
+        baseConfigBuilder.batchSize(readonlyConfig.get(RocketMqSourceOptions.BATCH_SIZE));
+        baseConfigBuilder.pollTimeoutMillis(
+                readonlyConfig.get(RocketMqSourceOptions.POLL_TIMEOUT_MILLIS));
+
+        consumerMetadata.setBaseConfig(baseConfigBuilder.build());
+        consumerMetadata.setEnabledCommitCheckpoint(
+                readonlyConfig.get(RocketMqSourceOptions.COMMIT_ON_CHECKPOINT));
+
+        StartMode startMode = readonlyConfig.get(RocketMqSourceOptions.START_MODE);
+        switch (startMode) {
+            case CONSUME_FROM_TIMESTAMP:
+                long startOffsetsTimestamp =
+                        readonlyConfig.get(RocketMqSourceOptions.START_MODE_TIMESTAMP);
+                long currentTimestamp = System.currentTimeMillis();
+                if (startOffsetsTimestamp < 0 || startOffsetsTimestamp > currentTimestamp) {
+                    throw new IllegalArgumentException(
+                            "The offsets timestamp value is smaller than 0 or larger"
+                                    + " than the current time");
+                }
+                consumerMetadata.setStartOffsetsTimestamp(startOffsetsTimestamp);
+                break;
+            case CONSUME_FROM_SPECIFIC_OFFSETS:
+                Map<String, Long> offsetConfigMap =
+                        readonlyConfig.get(RocketMqSourceOptions.START_MODE_OFFSETS);
+                Map<MessageQueue, Long> specificStartOffsets = new HashMap<>();
+                offsetConfigMap.forEach(
+                        (k, v) -> {
+                            int splitIndex = k.lastIndexOf("-");
+                            String topic = k.substring(0, splitIndex);
+                            String partition = k.substring(splitIndex + 1);
+                            MessageQueue messageQueue =
+                                    new MessageQueue(topic, null, Integer.parseInt(partition));
+                            specificStartOffsets.put(messageQueue, v);
+                        });
+                consumerMetadata.setSpecificStartOffsets(specificStartOffsets);
+                break;
+            default:
+                break;
+        }
+        consumerMetadata.setStartMode(startMode);
+
+        return consumerMetadata;
+    }
+
+    private DeserializationSchema<SeaTunnelRow> buildDeserialization(
+            ReadonlyConfig config, CatalogTable catalogTable) {
+        DeserializationSchema<SeaTunnelRow> schema;
+        if (config.getOptional(RocketMqSourceOptions.SCHEMA).isPresent()) {
+            SchemaFormat format = config.get(RocketMqSourceOptions.FORMAT);
+            boolean ignoreParseErrors = config.get(RocketMqSourceOptions.IGNORE_PARSE_ERRORS);
+            switch (format) {
+                case JSON:
+                    schema = new JsonDeserializationSchema(catalogTable, false, ignoreParseErrors);
+                    break;
+                case TEXT:
+                    schema =
+                            TextDeserializationSchema.builder()
+                                    .seaTunnelRowType(catalogTable.getSeaTunnelRowType())
+                                    .delimiter(config.get(RocketMqSourceOptions.FIELD_DELIMITER))
+                                    .setCatalogTable(catalogTable)
+                                    .build();
+                    break;
+                default:
+                    throw new SeaTunnelJsonFormatException(
+                            CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
+                            "Unsupported format: " + format);
+            }
+        } else {
+            schema =
+                    TextDeserializationSchema.builder()
+                            .seaTunnelRowType(catalogTable.getSeaTunnelRowType())
+                            .delimiter(String.valueOf('\002'))
+                            .setCatalogTable(catalogTable)
+                            .build();
+        }
+        String tableId = catalogTable.getTablePath().toString();
+        return new RocketMqTableIdDeserializationSchema(schema, tableId);
+    }
+
+ public List<CatalogTable> getCatalogTables() {
+ return topicConfigs.values().stream()
+ .map(TopicTableConfig::getCatalogTable)
+ .distinct()
+ .collect(Collectors.toList());
+ }
+}
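RocketMqSourceConfig keeps the pre-existing convention for `start.mode.offsets` keys: each key has the form `<topic>-<queueId>`, split at the *last* `-` so topic names that themselves contain hyphens still parse correctly. A standalone sketch of that parsing (class name hypothetical, not part of the commit):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class OffsetKeyParser {

    // Splits an offsets key of the form "<topic>-<queueId>" at the LAST '-',
    // so a topic name such as "order-events" is kept intact.
    static Map.Entry<String, Integer> parse(String key) {
        int splitIndex = key.lastIndexOf("-");
        String topic = key.substring(0, splitIndex);
        int queueId = Integer.parseInt(key.substring(splitIndex + 1));
        return new SimpleEntry<>(topic, queueId);
    }

    public static void main(String[] args) {
        Map.Entry<String, Integer> e = parse("order-events-3");
        System.out.println(e.getKey() + " / " + e.getValue()); // order-events / 3
    }
}
```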
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceFactory.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceFactory.java
index a6de8153f5..67ace88c08 100644
--- a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceFactory.java
@@ -42,7 +42,11 @@ public class RocketMqSourceFactory implements TableSourceFactory {
    @Override
    public OptionRule optionRule() {
        return OptionRule.builder()
-                .required(RocketMqSourceOptions.TOPICS, RocketMqSourceOptions.NAME_SRV_ADDR)
+                .required(RocketMqSourceOptions.NAME_SRV_ADDR)
+                .exclusive(
+                        RocketMqSourceOptions.TOPICS,
+                        RocketMqSourceOptions.TABLE_CONFIGS,
+                        RocketMqSourceOptions.TABLE_LIST)
                .optional(
                        RocketMqSourceOptions.FORMAT,
                        RocketMqSourceOptions.TAGS,
@@ -52,7 +56,8 @@ public class RocketMqSourceFactory implements TableSourceFactory {
                        RocketMqSourceOptions.SCHEMA,
                        RocketMqSourceOptions.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
                        RocketMqSourceOptions.POLL_TIMEOUT_MILLIS,
-                        RocketMqSourceOptions.BATCH_SIZE)
+                        RocketMqSourceOptions.BATCH_SIZE,
+                        RocketMqSourceOptions.IGNORE_PARSE_ERRORS)
                .conditional(
                        RocketMqSourceOptions.START_MODE,
                        StartMode.CONSUME_FROM_TIMESTAMP,
@@ -60,8 +65,7 @@ public class RocketMqSourceFactory implements TableSourceFactory {
                .conditional(
                        RocketMqSourceOptions.START_MODE,
                        StartMode.CONSUME_FROM_SPECIFIC_OFFSETS,
-                        RocketMqSourceOptions.START_MODE_OFFSETS,
-                        RocketMqSourceOptions.IGNORE_PARSE_ERRORS)
+                        RocketMqSourceOptions.START_MODE_OFFSETS)
                .build();
    }
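The factory change above replaces a hard `required(TOPICS, ...)` with an exclusive group over `topics`, `tables_configs`, and `table_list`. A minimal sketch of exclusive-group semantics, assuming the "exactly one must be set" reading (helper name hypothetical, not SeaTunnel API):

```java
import java.util.Arrays;
import java.util.Objects;

public class ExclusiveCheck {

    // Returns true when exactly one of the given option values is non-null,
    // modelling the exactly-one semantics of an exclusive option group.
    static boolean exactlyOneSet(Object... values) {
        long setCount = Arrays.stream(values).filter(Objects::nonNull).count();
        return setCount == 1;
    }

    public static void main(String[] args) {
        String topics = "tpc1,tpc2";     // single-table mode configured
        Object tablesConfigs = null;     // multi-table mode not configured
        Object tableList = null;         // deprecated option not configured
        System.out.println(exactlyOneSet(topics, tablesConfigs, tableList)); // true
    }
}
```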
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
index 65a1400361..062b82394a 100644
--- a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
@@ -19,7 +19,6 @@ package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
import org.apache.seatunnel.shade.com.google.common.collect.Maps;
-import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
@@ -56,7 +55,7 @@ public class RocketMqSourceReader implements SourceReader<SeaTunnelRow, RocketMq
    private final Map<Long, Map<MessageQueue, Long>> checkpointOffsets;
    private final Map<MessageQueue, RocketMqConsumerThread> consumerThreads;
    private final ExecutorService executorService;
-    private final DeserializationSchema<SeaTunnelRow> deserializationSchema;
+    private final Map<String, TopicTableConfig> topicConfigs;
    private final LinkedBlockingQueue<RocketMqSourceSplit> pendingPartitionsQueue;
@@ -64,12 +63,12 @@ public class RocketMqSourceReader implements SourceReader<SeaTunnelRow, RocketMq
    public RocketMqSourceReader(
            ConsumerMetadata metadata,
-            DeserializationSchema<SeaTunnelRow> deserializationSchema,
+            Map<String, TopicTableConfig> topicConfigs,
            Context context) {
        this.metadata = metadata;
        this.context = context;
        this.sourceSplits = new HashSet<>();
-        this.deserializationSchema = deserializationSchema;
+        this.topicConfigs = topicConfigs;
        this.consumerThreads = new ConcurrentHashMap<>();
        this.checkpointOffsets = new ConcurrentHashMap<>();
        this.executorService =
@@ -146,18 +145,25 @@ public class RocketMqSourceReader implements SourceReader<SeaTunnelRow, RocketMq
                                            .collect(Collectors.toList());
                            long lastOffset = -1;
                            for (MessageExt record : messages) {
-                                // Check if the tags are specified and match the
-                                // record's tag
+                                TopicTableConfig topicConfig =
+                                        topicConfigs.get(record.getTopic());
+                                if (topicConfig == null) {
+                                    throw new RocketMqConnectorException(
+                                            RocketMqConnectorErrorCode.CONSUME_DATA_FAILED,
+                                            "No config found for topic: " + record.getTopic());
+                                }
+                                List<String> tags = topicConfig.getTags();
                                boolean shouldProcess =
-                                        metadata.getTags() == null
-                                                || metadata.getTags().isEmpty()
-                                                || metadata.getTags().contains(record.getTags());
+                                        tags.isEmpty() || tags.contains(record.getTags());
                                if (shouldProcess) {
-                                    deserializationSchema.deserialize(record.getBody(), output);
+                                    topicConfig
+                                            .getDeserializationSchema()
+                                            .deserialize(record.getBody(), output);
                                    lastOffset = record.getQueueOffset();
                                }
                                if (Boundedness.BOUNDED.equals(
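In the reader hunk above, tags are now resolved per topic instead of from the shared consumer metadata, but the filtering rule itself is unchanged: an empty tag list accepts every record, otherwise the record's tag must be among the configured ones. A standalone sketch (class name hypothetical, not part of the commit):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class TagFilter {

    // A record passes when the table declares no tags, or when the record's
    // tag is one of the declared tags -- the rule the reader applies per topic.
    static boolean shouldProcess(List<String> configuredTags, String recordTag) {
        return configuredTags.isEmpty() || configuredTags.contains(recordTag);
    }

    public static void main(String[] args) {
        System.out.println(shouldProcess(Collections.emptyList(), "tagA"));        // true
        System.out.println(shouldProcess(Arrays.asList("tag1", "tag2"), "tagA"));  // false
    }
}
```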
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java
index 18d60dbba9..91fa6e8a07 100644
--- a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.shade.com.google.common.collect.Sets;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqAdminUtil;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.StartMode;
 import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorException;
@@ -38,6 +39,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -54,6 +56,7 @@ public class RocketMqSourceSplitEnumerator
private static final long DEFAULT_DISCOVERY_INTERVAL_MILLIS = 60 * 1000;
private final Map<MessageQueue, RocketMqSourceSplit> assignedSplit;
private final ConsumerMetadata metadata;
+ private final Map<String, TopicTableConfig> topicConfigs;
private final Context<RocketMqSourceSplit> context;
private final Map<MessageQueue, RocketMqSourceSplit> pendingSplit;
private ScheduledExecutorService executor;
@@ -63,8 +66,11 @@ public class RocketMqSourceSplitEnumerator
private long discoveryIntervalMillis;
     public RocketMqSourceSplitEnumerator(
-            ConsumerMetadata metadata, SourceSplitEnumerator.Context<RocketMqSourceSplit> context) {
+            ConsumerMetadata metadata,
+            Map<String, TopicTableConfig> topicConfigs,
+            SourceSplitEnumerator.Context<RocketMqSourceSplit> context) {
         this.metadata = metadata;
+        this.topicConfigs = topicConfigs;
         this.context = context;
         this.assignedSplit = new HashMap<>();
         this.pendingSplit = new HashMap<>();
@@ -75,9 +81,10 @@ public class RocketMqSourceSplitEnumerator
public RocketMqSourceSplitEnumerator(
ConsumerMetadata metadata,
+ Map<String, TopicTableConfig> topicConfigs,
SourceSplitEnumerator.Context<RocketMqSourceSplit> context,
long discoveryIntervalMillis) {
- this(metadata, context);
+ this(metadata, topicConfigs, context);
this.discoveryIntervalMillis = discoveryIntervalMillis;
}
@@ -242,68 +249,94 @@ public class RocketMqSourceSplitEnumerator
return sourceSplits;
}
-    private void setPartitionStartOffset() throws MQClientException {
-        Collection<MessageQueue> topicPartitions = pendingSplit.keySet();
-        Map<MessageQueue, Long> topicPartitionOffsets = null;
-        switch (metadata.getStartMode()) {
-            case CONSUME_FROM_FIRST_OFFSET:
-                topicPartitionOffsets =
-                        listOffsets(topicPartitions, ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-                break;
-            case CONSUME_FROM_LAST_OFFSET:
-                topicPartitionOffsets =
-                        listOffsets(topicPartitions, ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
-                break;
-            case CONSUME_FROM_TIMESTAMP:
-                topicPartitionOffsets =
-                        listOffsets(topicPartitions, ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
-                break;
-            case CONSUME_FROM_GROUP_OFFSETS:
-                topicPartitionOffsets = listConsumerGroupOffsets(topicPartitions);
-                if (topicPartitionOffsets.isEmpty()) {
-                    topicPartitionOffsets =
-                            listOffsets(
-                                    topicPartitions, ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-                }
-                break;
-            case CONSUME_FROM_SPECIFIC_OFFSETS:
-                topicPartitionOffsets = metadata.getSpecificStartOffsets();
-                // Fill in broker name
-                setMessageQueueBroker(topicPartitions, topicPartitionOffsets);
-                break;
-            default:
-                throw new RocketMqConnectorException(
-                        RocketMqConnectorErrorCode.UNSUPPORTED_START_MODE_ERROR,
-                        metadata.getStartMode().name());
+    private StartMode getEffectiveStartMode(String topic) {
+        TopicTableConfig config = topicConfigs.get(topic);
+        if (config != null && config.getStartMode() != null) {
+            return config.getStartMode();
         }
-        topicPartitionOffsets
-                .entrySet()
-                .forEach(
-                        entry -> {
-                            if (pendingSplit.containsKey(entry.getKey())) {
-                                pendingSplit.get(entry.getKey()).setStartOffset(entry.getValue());
-                            }
-                        });
+        return metadata.getStartMode();
     }

-    private void setMessageQueueBroker(
-            Collection<MessageQueue> topicPartitions,
-            Map<MessageQueue, Long> topicPartitionOffsets) {
-        Map<String, String> flatTopicPartitions =
-                topicPartitions.stream()
-                        .collect(
-                                Collectors.toMap(
-                                        messageQueue ->
-                                                messageQueue.getTopic()
-                                                        + "-"
-                                                        + messageQueue.getQueueId(),
-                                        MessageQueue::getBrokerName));
-        for (MessageQueue messageQueue : topicPartitionOffsets.keySet()) {
-            String key = messageQueue.getTopic() + "-" + messageQueue.getQueueId();
-            if (flatTopicPartitions.containsKey(key)) {
-                messageQueue.setBrokerName(flatTopicPartitions.get(key));
+    private void setPartitionStartOffset() throws MQClientException {
+        // Group pending partitions by their effective start mode (per-topic override or global).
+        Map<StartMode, List<MessageQueue>> partitionsByMode = new LinkedHashMap<>();
+        for (MessageQueue mq : pendingSplit.keySet()) {
+            StartMode effectiveMode = getEffectiveStartMode(mq.getTopic());
+            partitionsByMode.computeIfAbsent(effectiveMode, k -> new ArrayList<>()).add(mq);
+        }
+
+        Map<MessageQueue, Long> topicPartitionOffsets = new HashMap<>();
+        for (Map.Entry<StartMode, List<MessageQueue>> entry : partitionsByMode.entrySet()) {
+            StartMode startMode = entry.getKey();
+            List<MessageQueue> queues = entry.getValue();
+            switch (startMode) {
+                case CONSUME_FROM_FIRST_OFFSET:
+                    topicPartitionOffsets.putAll(
+                            listOffsets(queues, ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET));
+                    break;
+                case CONSUME_FROM_LAST_OFFSET:
+                    topicPartitionOffsets.putAll(
+                            listOffsets(queues, ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET));
+                    break;
+                case CONSUME_FROM_TIMESTAMP:
+                    // Group by effective timestamp (per-topic or global).
+                    Map<Long, List<MessageQueue>> queuesByTimestamp = new LinkedHashMap<>();
+                    for (MessageQueue mq : queues) {
+                        TopicTableConfig config = topicConfigs.get(mq.getTopic());
+                        Long ts =
+                                (config != null && config.getStartTimestamp() != null)
+                                        ? config.getStartTimestamp()
+                                        : metadata.getStartOffsetsTimestamp();
+                        queuesByTimestamp.computeIfAbsent(ts, k -> new ArrayList<>()).add(mq);
+                    }
+                    for (Map.Entry<Long, List<MessageQueue>> tsEntry :
+                            queuesByTimestamp.entrySet()) {
+                        topicPartitionOffsets.putAll(
+                                RocketMqAdminUtil.searchOffsetsByTimestamp(
+                                        metadata.getBaseConfig(),
+                                        tsEntry.getValue(),
+                                        tsEntry.getKey()));
+                    }
+                    break;
+                case CONSUME_FROM_GROUP_OFFSETS:
+                    Map<MessageQueue, Long> groupOffsets = listConsumerGroupOffsets(queues);
+                    if (groupOffsets.isEmpty()) {
+                        topicPartitionOffsets.putAll(
+                                listOffsets(queues, ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET));
+                    } else {
+                        topicPartitionOffsets.putAll(groupOffsets);
+                    }
+                    break;
+                case CONSUME_FROM_SPECIFIC_OFFSETS:
+                    Map<MessageQueue, Long> specificOffsets = metadata.getSpecificStartOffsets();
+                    if (specificOffsets != null) {
+                        Map<String, Long> offsetByKey = new HashMap<>();
+                        for (Map.Entry<MessageQueue, Long> e : specificOffsets.entrySet()) {
+                            offsetByKey.put(
+                                    e.getKey().getTopic() + "-" + e.getKey().getQueueId(),
+                                    e.getValue());
+                        }
+                        for (MessageQueue mq : queues) {
+                            Long offset = offsetByKey.get(mq.getTopic() + "-" + mq.getQueueId());
+                            if (offset != null) {
+                                topicPartitionOffsets.put(mq, offset);
+                            }
+                        }
+                    }
+                    break;
+                default:
+                    throw new RocketMqConnectorException(
+                            RocketMqConnectorErrorCode.UNSUPPORTED_START_MODE_ERROR,
+                            startMode.name());
             }
         }
+
+        topicPartitionOffsets.forEach(
+                (mq, offset) -> {
+                    if (pendingSplit.containsKey(mq)) {
+                        pendingSplit.get(mq).setStartOffset(offset);
+                    }
+                });
     }
private Map<MessageQueue, Long> listOffsets(
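The enumerator change above boils down to one idea: instead of resolving all partitions with a single global start mode, partitions are first bucketed by their *effective* start mode (per-topic override if present, otherwise the global default), and one offset lookup is issued per bucket. A minimal standalone sketch of that grouping, with simplified names that are illustrative stand-ins and not the connector's API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StartModeGrouping {
    public enum StartMode { FIRST_OFFSET, LAST_OFFSET, TIMESTAMP }

    // Effective mode: a per-topic override wins, otherwise the global default applies.
    public static StartMode effectiveMode(
            String topic, Map<String, StartMode> overrides, StartMode global) {
        return overrides.getOrDefault(topic, global);
    }

    // Bucket topics by effective start mode, preserving insertion order so
    // lookups happen in a deterministic sequence.
    public static Map<StartMode, List<String>> group(
            List<String> topics, Map<String, StartMode> overrides, StartMode global) {
        Map<StartMode, List<String>> byMode = new LinkedHashMap<>();
        for (String topic : topics) {
            byMode.computeIfAbsent(effectiveMode(topic, overrides, global), k -> new ArrayList<>())
                    .add(topic);
        }
        return byMode;
    }

    public static void main(String[] args) {
        Map<String, StartMode> overrides = new HashMap<>();
        overrides.put("topic_a", StartMode.FIRST_OFFSET); // per-topic override
        Map<StartMode, List<String>> byMode =
                group(Arrays.asList("topic_a", "topic_b"), overrides, StartMode.LAST_OFFSET);
        System.out.println(byMode.get(StartMode.FIRST_OFFSET)); // [topic_a]
        System.out.println(byMode.get(StartMode.LAST_OFFSET)); // [topic_b]
    }
}
```

The one-lookup-per-bucket shape keeps the number of admin calls proportional to the number of distinct start modes rather than the number of partitions.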
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqTableIdDeserializationSchema.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqTableIdDeserializationSchema.java
new file mode 100644
index 0000000000..22d3e9dcab
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqTableIdDeserializationSchema.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import java.io.IOException;
+
+public class RocketMqTableIdDeserializationSchema implements DeserializationSchema<SeaTunnelRow> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final DeserializationSchema<SeaTunnelRow> delegate;
+ private final String tableId;
+
+ public RocketMqTableIdDeserializationSchema(
+ DeserializationSchema<SeaTunnelRow> delegate, String tableId) {
+ this.delegate = delegate;
+ this.tableId = tableId;
+ }
+
+ @Override
+ public SeaTunnelRow deserialize(byte[] message) throws IOException {
+ SeaTunnelRow row = delegate.deserialize(message);
+ if (row != null) {
+ row.setTableId(tableId);
+ }
+ return row;
+ }
+
+ @Override
+    public void deserialize(byte[] message, Collector<SeaTunnelRow> out) throws IOException {
+ delegate.deserialize(
+ message,
+ new Collector<SeaTunnelRow>() {
+ @Override
+ public void collect(SeaTunnelRow record) {
+ record.setTableId(tableId);
+ out.collect(record);
+ }
+
+ @Override
+ public void markSchemaChangeBeforeCheckpoint() {
+ out.markSchemaChangeBeforeCheckpoint();
+ }
+
+ @Override
+ public void collect(SchemaChangeEvent event) {
+ out.collect(event);
+ }
+
+ @Override
+ public void markSchemaChangeAfterCheckpoint() {
+ out.markSchemaChangeAfterCheckpoint();
+ }
+
+ @Override
+ public Object getCheckpointLock() {
+ return out.getCheckpointLock();
+ }
+
+ @Override
+ public boolean isEmptyThisPollNext() {
+ return out.isEmptyThisPollNext();
+ }
+
+ @Override
+ public void resetEmptyThisPollNext() {
+ out.resetEmptyThisPollNext();
+ }
+ });
+ }
+
+ @Override
+ public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+ return delegate.getProducedType();
+ }
+}
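The new class above is a plain decorator: it delegates deserialization to the wrapped schema and stamps a table id on every resulting row so downstream multi-table sinks can route it. The same pattern in miniature, with `Row` and `Deserializer` as simplified stand-ins rather than SeaTunnel's actual types:

```java
public class TableIdDecorator {
    // Simplified stand-in for SeaTunnelRow.
    public static class Row {
        public String tableId;
        public final String payload;

        public Row(String payload) {
            this.payload = payload;
        }
    }

    // Simplified stand-in for DeserializationSchema<SeaTunnelRow>.
    public interface Deserializer {
        Row deserialize(byte[] message);
    }

    // Decorator: delegate first, then stamp the configured table id on the result.
    public static Deserializer withTableId(Deserializer delegate, String tableId) {
        return message -> {
            Row row = delegate.deserialize(message);
            if (row != null) {
                row.tableId = tableId;
            }
            return row;
        };
    }

    public static void main(String[] args) {
        Deserializer base = message -> new Row(new String(message));
        Deserializer stamped = withTableId(base, "topic_a");
        Row row = stamped.deserialize("hello".getBytes());
        System.out.println(row.tableId + ":" + row.payload); // topic_a:hello
    }
}
```

Because the wrapper only touches the table id, any existing format (JSON, text, ...) can be reused unchanged per topic.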
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/TopicTableConfig.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/TopicTableConfig.java
new file mode 100644
index 0000000000..e0c77d61ad
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/TopicTableConfig.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.StartMode;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+public class TopicTableConfig implements Serializable {
+ private CatalogTable catalogTable;
+ private DeserializationSchema<SeaTunnelRow> deserializationSchema;
+ private List<String> tags;
+ private StartMode startMode;
+ private Long startTimestamp;
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/test/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceConfigTest.java b/seatunnel-connectors-v2/connector-rocketmq/src/test/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceConfigTest.java
new file mode 100644
index 0000000000..612fe74ec4
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/test/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceConfigTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.StartMode;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+class RocketMqSourceConfigTest {
+
+ private static Map<String, Object> baseConfig() {
+ Map<String, Object> config = new HashMap<>();
+ config.put("name.srv.addr", "localhost:9876");
+ return config;
+ }
+
+ private static Map<String, Object> schemaOf(String... nameTypePairs) {
+ Map<String, Object> fields = new LinkedHashMap<>();
+ for (int i = 0; i < nameTypePairs.length; i += 2) {
+ fields.put(nameTypePairs[i], nameTypePairs[i + 1]);
+ }
+ Map<String, Object> schema = new HashMap<>();
+ schema.put("fields", fields);
+ return schema;
+ }
+
+    private static Map<String, Object> tableEntry(String topics, String... nameTypePairs) {
+ Map<String, Object> entry = new HashMap<>();
+ entry.put("topics", topics);
+ entry.put("format", "json");
+ entry.put("schema", schemaOf(nameTypePairs));
+ return entry;
+ }
+
+ @Test
+ void testMultiTableMode_tablesConfigs_tableName() {
+ Map<String, Object> config = baseConfig();
+
+ // topic_a: no explicit table name, should fall back to topic name
+ Map<String, Object> entry1 = tableEntry("topic_a", "id", "bigint");
+
+ // topic_b: explicit table name
+ Map<String, Object> schemaWithTable = schemaOf("id", "bigint");
+ schemaWithTable.put("table", "my_custom_table");
+ Map<String, Object> entry2 = new HashMap<>();
+ entry2.put("topics", "topic_b");
+ entry2.put("format", "json");
+ entry2.put("schema", schemaWithTable);
+
+ config.put("tables_configs", Arrays.asList(entry1, entry2));
+
+ RocketMqSourceConfig sourceConfig =
+ new RocketMqSourceConfig(ReadonlyConfig.fromMap(config));
+
+        CatalogTable tableA = sourceConfig.getTopicConfigs().get("topic_a").getCatalogTable();
+        CatalogTable tableB = sourceConfig.getTopicConfigs().get("topic_b").getCatalogTable();
+ assertEquals("topic_a", tableA.getTablePath().toString());
+ assertEquals("my_custom_table", tableB.getTablePath().toString());
+ }
+
+ @Test
+ void testMultiTableMode_perTableStartMode_consumeFromTimestamp() {
+ Map<String, Object> config = baseConfig();
+
+ long ts = System.currentTimeMillis() - 60_000;
+
+ Map<String, Object> entry = tableEntry("topic_ts", "id", "bigint");
+ entry.put("start.mode", "CONSUME_FROM_TIMESTAMP");
+ entry.put("start.mode.timestamp", ts);
+
+ List<Map<String, Object>> tablesConfigs = new ArrayList<>();
+ tablesConfigs.add(entry);
+ config.put("tables_configs", tablesConfigs);
+
+ RocketMqSourceConfig sourceConfig =
+ new RocketMqSourceConfig(ReadonlyConfig.fromMap(config));
+
+        TopicTableConfig topicCfg = sourceConfig.getTopicConfigs().get("topic_ts");
+        assertEquals(StartMode.CONSUME_FROM_TIMESTAMP, topicCfg.getStartMode());
+ assertEquals(ts, topicCfg.getStartTimestamp());
+ }
+
+ @Test
+ void testMultiTableMode_specificOffsets_mergedAcrossTables() {
+ Map<String, Object> config = baseConfig();
+
+ Map<String, Long> offsets1 = new HashMap<>();
+ offsets1.put("topic_a-0", 100L);
+ offsets1.put("topic_a-1", 200L);
+ Map<String, Object> entry1 = tableEntry("topic_a", "id", "bigint");
+ entry1.put("start.mode", "CONSUME_FROM_SPECIFIC_OFFSETS");
+ entry1.put("start.mode.offsets", offsets1);
+
+ Map<String, Long> offsets2 = new HashMap<>();
+ offsets2.put("topic_b-0", 50L);
+ Map<String, Object> entry2 = tableEntry("topic_b", "id", "bigint");
+ entry2.put("start.mode", "CONSUME_FROM_SPECIFIC_OFFSETS");
+ entry2.put("start.mode.offsets", offsets2);
+
+ List<Map<String, Object>> tablesConfigs = new ArrayList<>();
+ tablesConfigs.add(entry1);
+ tablesConfigs.add(entry2);
+ config.put("tables_configs", tablesConfigs);
+
+ RocketMqSourceConfig sourceConfig =
+ new RocketMqSourceConfig(ReadonlyConfig.fromMap(config));
+
+        // Specific offsets from both tables should be merged into the same map in metadata
+        assertNotNull(sourceConfig.getMetadata().getSpecificStartOffsets());
+        assertEquals(3, sourceConfig.getMetadata().getSpecificStartOffsets().size());
+ }
+}
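Alongside per-topic start modes, the reader change earlier in this commit filters records by per-topic tags: an empty tag list accepts everything, otherwise the record's tag must appear in the list. That predicate is small enough to state on its own (a standalone sketch, not the connector's class):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class TagFilter {
    // Empty tag list accepts everything; otherwise the record's tag must be listed.
    public static boolean shouldProcess(List<String> configuredTags, String recordTag) {
        return configuredTags.isEmpty() || configuredTags.contains(recordTag);
    }

    public static void main(String[] args) {
        System.out.println(shouldProcess(Collections.emptyList(), "any")); // true
        System.out.println(shouldProcess(Arrays.asList("tag_b"), "tag_b")); // true
        System.out.println(shouldProcess(Arrays.asList("tag_b"), "other_tag")); // false
    }
}
```

This is exactly why the e2e test below expects only the 3 `tag_b` messages from `test_topic_multi_b` while the 4 `other_tag` messages are dropped.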
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/test/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumeratorTest.java b/seatunnel-connectors-v2/connector-rocketmq/src/test/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumeratorTest.java
new file mode 100644
index 0000000000..e10a70399d
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/test/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumeratorTest.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqAdminUtil;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.StartMode;
+
+import org.apache.rocketmq.common.admin.TopicOffset;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+import java.util.AbstractMap;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+
+class RocketMqSourceSplitEnumeratorTest {
+
+    @Test
+    void testRun_usesPerTopicTimestampOverrides() throws Exception {
+        ConsumerMetadata metadata = new ConsumerMetadata();
+        metadata.setTopics(Arrays.asList("topic_a", "topic_b"));
+        metadata.setStartMode(StartMode.CONSUME_FROM_TIMESTAMP);
+        metadata.setStartOffsetsTimestamp(1_000L);
+
+        TopicTableConfig topicAConfig = new TopicTableConfig();
+        topicAConfig.setStartMode(StartMode.CONSUME_FROM_TIMESTAMP);
+        topicAConfig.setStartTimestamp(2_000L);
+
+        Map<String, TopicTableConfig> topicConfigs = new HashMap<>();
+        topicConfigs.put("topic_a", topicAConfig);
+
+        MessageQueue topicAQueue = new MessageQueue("topic_a", "broker-a", 0);
+        MessageQueue topicBQueue = new MessageQueue("topic_b", "broker-b", 0);
+
+        SourceSplitEnumerator.Context<RocketMqSourceSplit> context = mockContext();
+
+        try (MockedStatic<RocketMqAdminUtil> mockedAdmin =
+                Mockito.mockStatic(RocketMqAdminUtil.class)) {
+            mockedAdmin
+                    .when(() -> RocketMqAdminUtil.offsetTopics(any(), eq(metadata.getTopics())))
+                    .thenReturn(
+                            Collections.singletonList(
+                                    topicOffsets(
+                                            queueOffset(topicAQueue, 0L, 20L),
+                                            queueOffset(topicBQueue, 0L, 10L))));
+            mockedAdmin
+                    .when(
+                            () ->
+                                    RocketMqAdminUtil.searchOffsetsByTimestamp(
+                                            any(),
+                                            eq(Collections.singletonList(topicAQueue)),
+                                            eq(2_000L)))
+                    .thenReturn(Collections.singletonMap(topicAQueue, 12L));
+            mockedAdmin
+                    .when(
+                            () ->
+                                    RocketMqAdminUtil.searchOffsetsByTimestamp(
+                                            any(),
+                                            eq(Collections.singletonList(topicBQueue)),
+                                            eq(1_000L)))
+                    .thenReturn(Collections.singletonMap(topicBQueue, 7L));
+
+            RocketMqSourceSplitEnumerator enumerator =
+                    new RocketMqSourceSplitEnumerator(metadata, topicConfigs, context, -1L);
+
+            enumerator.run();
+        }
+
+        Map<String, RocketMqSourceSplit> splitsByTopic = captureAssignedSplits(context);
+        Assertions.assertEquals(12L, splitsByTopic.get("topic_a").getStartOffset());
+        Assertions.assertEquals(7L, splitsByTopic.get("topic_b").getStartOffset());
+    }
+
+    @Test
+    void testRun_fallsBackToFirstOffsetWhenGroupOffsetsAreMissing() throws Exception {
+        ConsumerMetadata metadata = new ConsumerMetadata();
+        metadata.setTopics(Collections.singletonList("topic_group"));
+        metadata.setStartMode(StartMode.CONSUME_FROM_GROUP_OFFSETS);
+
+        MessageQueue messageQueue = new MessageQueue("topic_group", "broker-group", 0);
+
+        SourceSplitEnumerator.Context<RocketMqSourceSplit> context = mockContext();
+
+        try (MockedStatic<RocketMqAdminUtil> mockedAdmin =
+                Mockito.mockStatic(RocketMqAdminUtil.class)) {
+            mockedAdmin
+                    .when(() -> RocketMqAdminUtil.offsetTopics(any(), eq(metadata.getTopics())))
+                    .thenReturn(
+                            Collections.singletonList(
+                                    topicOffsets(queueOffset(messageQueue, 5L, 18L))));
+            mockedAdmin
+                    .when(
+                            () ->
+                                    RocketMqAdminUtil.currentOffsets(
+                                            any(),
+                                            eq(metadata.getTopics()),
+                                            eq(Collections.singleton(messageQueue))))
+                    .thenReturn(Collections.emptyMap());
+            mockedAdmin
+                    .when(() -> RocketMqAdminUtil.flatOffsetTopics(any(), eq(metadata.getTopics())))
+                    .thenReturn(topicOffsets(queueOffset(messageQueue, 5L, 18L)));
+
+            RocketMqSourceSplitEnumerator enumerator =
+                    new RocketMqSourceSplitEnumerator(
+                            metadata, Collections.emptyMap(), context, -1L);
+
+            enumerator.run();
+        }
+
+        Map<String, RocketMqSourceSplit> splitsByTopic = captureAssignedSplits(context);
+        Assertions.assertEquals(5L, splitsByTopic.get("topic_group").getStartOffset());
+    }
+
+    @Test
+    void testRun_matchesSpecificOffsetsByTopicAndQueueId() throws Exception {
+        ConsumerMetadata metadata = new ConsumerMetadata();
+        metadata.setTopics(Collections.singletonList("topic_specific"));
+        metadata.setStartMode(StartMode.CONSUME_FROM_SPECIFIC_OFFSETS);
+        metadata.setSpecificStartOffsets(
+                Collections.singletonMap(new MessageQueue("topic_specific", null, 0), 33L));
+
+        MessageQueue messageQueue = new MessageQueue("topic_specific", "broker-specific", 0);
+
+        SourceSplitEnumerator.Context<RocketMqSourceSplit> context = mockContext();
+
+        try (MockedStatic<RocketMqAdminUtil> mockedAdmin =
+                Mockito.mockStatic(RocketMqAdminUtil.class)) {
+            mockedAdmin
+                    .when(() -> RocketMqAdminUtil.offsetTopics(any(), eq(metadata.getTopics())))
+                    .thenReturn(
+                            Collections.singletonList(
+                                    topicOffsets(queueOffset(messageQueue, 0L, 50L))));
+
+            RocketMqSourceSplitEnumerator enumerator =
+                    new RocketMqSourceSplitEnumerator(
+                            metadata, Collections.emptyMap(), context, -1L);
+
+            enumerator.run();
+        }
+
+        Map<String, RocketMqSourceSplit> splitsByTopic = captureAssignedSplits(context);
+        Assertions.assertEquals(33L, splitsByTopic.get("topic_specific").getStartOffset());
+    }
+
+    private SourceSplitEnumerator.Context<RocketMqSourceSplit> mockContext() {
+        SourceSplitEnumerator.Context<RocketMqSourceSplit> context =
+                Mockito.mock(SourceSplitEnumerator.Context.class);
+        Mockito.when(context.currentParallelism()).thenReturn(1);
+        return context;
+    }
+
+    private Map<String, RocketMqSourceSplit> captureAssignedSplits(
+            SourceSplitEnumerator.Context<RocketMqSourceSplit> context) {
+        ArgumentCaptor<List> splitsCaptor = ArgumentCaptor.forClass(List.class);
+        Mockito.verify(context).assignSplit(eq(0), splitsCaptor.capture());
+
+        List<RocketMqSourceSplit> splits = splitsCaptor.getValue();
+        Assertions.assertEquals(1, splitsCaptor.getAllValues().size());
+        return splits.stream()
+                .collect(
+                        Collectors.toMap(
+                                split -> split.getMessageQueue().getTopic(), split -> split));
+    }
+
+    private Map<MessageQueue, TopicOffset> topicOffsets(
+            Map.Entry<MessageQueue, TopicOffset>... entries) {
+        Map<MessageQueue, TopicOffset> offsets = new LinkedHashMap<>();
+        for (Map.Entry<MessageQueue, TopicOffset> entry : entries) {
+            offsets.put(entry.getKey(), entry.getValue());
+        }
+        return offsets;
+    }
+
+    private Map.Entry<MessageQueue, TopicOffset> queueOffset(
+            MessageQueue messageQueue, long minOffset, long maxOffset) {
+        TopicOffset topicOffset = new TopicOffset();
+        topicOffset.setMinOffset(minOffset);
+        topicOffset.setMaxOffset(maxOffset);
+        return new AbstractMap.SimpleImmutableEntry<>(messageQueue, topicOffset);
+    }
+}
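The specific-offsets test above exercises a subtle point: configured offsets may lack a broker name, so the enumerator matches them to live queues by a `topic + "-" + queueId` key rather than by full `MessageQueue` equality. That matching step, sketched on its own with simplified names that are not the connector's API:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SpecificOffsetMatching {
    // Key used to match a configured offset to a live queue: broker name is omitted.
    public static String key(String topic, int queueId) {
        return topic + "-" + queueId;
    }

    // Resolve each live queue against the flattened configured offsets;
    // queues without a configured offset are simply left unresolved.
    public static Map<String, Long> resolve(
            Map<String, Long> configuredByKey, List<String> liveQueueKeys) {
        Map<String, Long> resolved = new LinkedHashMap<>();
        for (String liveKey : liveQueueKeys) {
            Long offset = configuredByKey.get(liveKey);
            if (offset != null) {
                resolved.put(liveKey, offset);
            }
        }
        return resolved;
    }

    public static void main(String[] args) {
        Map<String, Long> configured = new HashMap<>();
        configured.put(key("topic_specific", 0), 33L);
        Map<String, Long> resolved =
                resolve(
                        configured,
                        Arrays.asList(key("topic_specific", 0), key("topic_specific", 1)));
        System.out.println(resolved); // {topic_specific-0=33}
    }
}
```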
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java
index e59237e73d..1023abd529 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java
@@ -298,6 +298,50 @@ public class RocketMqIT extends TestSuiteBase implements TestResource {
         Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
     }
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason = "The multi-catalog does not currently support the Spark or Flink engines")
+    @TestTemplate
+    public void testSourceRocketMqMultiTableToAssert(TestContainer container)
+            throws IOException, InterruptedException {
+        String topicA = "test_topic_multi_a";
+        String topicB = "test_topic_multi_b";
+
+        // topicA: 5 messages without tag (ids 0-4).
+        // The conf sets global start.mode=CONSUME_FROM_LAST_OFFSET but overrides topicA to
+        // CONSUME_FROM_FIRST_OFFSET, so all 5 pre-written messages must be consumed.
+        DefaultSeaTunnelRowSerializer serializerA =
+                new DefaultSeaTunnelRowSerializer(
+                        topicA, null, SEATUNNEL_ROW_TYPE, DEFAULT_FORMAT, DEFAULT_FIELD_DELIMITER);
+        generateTestData(serializerA::serializeRow, topicA, 0, 5);
+
+        // topicB: 3 messages with "tag_b" (ids 100-102) + 4 messages with "other_tag" (ids 103-106).
+        // The conf overrides topicB to CONSUME_FROM_FIRST_OFFSET and filters by tags="tag_b",
+        // so exactly 3 messages must be consumed; other_tag messages are dropped.
+        DefaultSeaTunnelRowSerializer serializerB =
+                new DefaultSeaTunnelRowSerializer(
+                        topicB,
+                        "tag_b",
+                        SEATUNNEL_ROW_TYPE,
+                        DEFAULT_FORMAT,
+                        DEFAULT_FIELD_DELIMITER);
+        DefaultSeaTunnelRowSerializer serializerBOther =
+                new DefaultSeaTunnelRowSerializer(
+                        topicB,
+                        "other_tag",
+                        SEATUNNEL_ROW_TYPE,
+                        DEFAULT_FORMAT,
+                        DEFAULT_FIELD_DELIMITER);
+        generateTestData(serializerB::serializeRow, topicB, 100, 103);
+        generateTestData(serializerBOther::serializeRow, topicB, 103, 107);
+
+        Container.ExecResult execResult =
+                container.executeJob("/multiTableIT/rocketmq_multi_source_to_assert.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+    }
+
@TestTemplate
public void testRocketMqLatestToConsole(TestContainer container)
throws IOException, InterruptedException {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/multiTableIT/rocketmq_multi_source_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/multiTableIT/rocketmq_multi_source_to_assert.conf
new file mode 100644
index 0000000000..f018b404a2
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/multiTableIT/rocketmq_multi_source_to_assert.conf
@@ -0,0 +1,138 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Rocketmq {
+ name.srv.addr = "rocketmq-e2e:9876"
+ start.mode = "CONSUME_FROM_LAST_OFFSET"
+ tables_configs = [
+ {
+ topics = "test_topic_multi_a"
+ start.mode = "CONSUME_FROM_FIRST_OFFSET"
+ format = json
+ schema = {
+ fields {
+ id = bigint
+ c_map = "map<string, smallint>"
+ c_array = "array<tinyint>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(2, 1)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ },
+ {
+ topics = "test_topic_multi_b"
+ start.mode = "CONSUME_FROM_FIRST_OFFSET"
+ tags = "tag_b"
+ format = json
+ schema = {
+ table = "rocketmq_multi_custom"
+ fields {
+ id = bigint
+ c_map = "map<string, smallint>"
+ c_array = "array<tinyint>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(2, 1)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ }
+ ]
+ }
+}
+
+transform {
+}
+
+sink {
+ Assert {
+ rules = {
+ tables_configs = [
+ {
+ table_path = "test_topic_multi_a"
+ row_rules = [
+ {rule_type = MIN_ROW, rule_value = 5},
+ {rule_type = MAX_ROW, rule_value = 5}
+ ]
+ field_rules = [
+ {
+ field_name = id
+ field_type = bigint
+ field_value = [
+ {rule_type = NOT_NULL},
+ {rule_type = MIN, rule_value = 0},
+ {rule_type = MAX, rule_value = 4}
+ ]
+ },
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [{rule_type = NOT_NULL}]
+ }
+ ]
+ },
+ {
+ table_path = "rocketmq_multi_custom"
+ row_rules = [
+ {rule_type = MIN_ROW, rule_value = 3},
+ {rule_type = MAX_ROW, rule_value = 3}
+ ]
+ field_rules = [
+ {
+ field_name = id
+ field_type = bigint
+ field_value = [
+ {rule_type = NOT_NULL},
+ {rule_type = MIN, rule_value = 100},
+ {rule_type = MAX, rule_value = 102}
+ ]
+ },
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [{rule_type = NOT_NULL}]
+ }
+ ]
+ }
+ ]
+ }
+ }
+}