This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 85615468b7 [Feature][Connector-V2] Support multi-table read for 
RocketMQ source (#10619)
85615468b7 is described below

commit 85615468b7ba2f71cdb3841b09a435d293ff0dd2
Author: xiaosiyuan <[email protected]>
AuthorDate: Mon Mar 23 16:07:19 2026 +0800

    [Feature][Connector-V2] Support multi-table read for RocketMQ source 
(#10619)
---
 docs/en/connectors/source/RocketMQ.md              |  50 +++-
 docs/zh/connectors/source/RocketMQ.md              |  50 +++-
 .../seatunnel/rocketmq/source/RocketMqSource.java  | 146 +---------
 .../rocketmq/source/RocketMqSourceConfig.java      | 307 +++++++++++++++++++++
 .../rocketmq/source/RocketMqSourceFactory.java     |  12 +-
 .../rocketmq/source/RocketMqSourceReader.java      |  34 ++-
 .../source/RocketMqSourceSplitEnumerator.java      | 151 ++++++----
 .../RocketMqTableIdDeserializationSchema.java      |  97 +++++++
 .../rocketmq/source/TopicTableConfig.java          |  37 +++
 .../rocketmq/source/RocketMqSourceConfigTest.java  | 139 ++++++++++
 .../source/RocketMqSourceSplitEnumeratorTest.java  | 210 ++++++++++++++
 .../e2e/connector/rocketmq/RocketMqIT.java         |  44 +++
 .../rocketmq_multi_source_to_assert.conf           | 138 +++++++++
 13 files changed, 1201 insertions(+), 214 deletions(-)

diff --git a/docs/en/connectors/source/RocketMQ.md 
b/docs/en/connectors/source/RocketMQ.md
index 7116ca8882..ac29e67104 100644
--- a/docs/en/connectors/source/RocketMQ.md
+++ b/docs/en/connectors/source/RocketMQ.md
@@ -31,7 +31,9 @@ Source connector for Apache RocketMQ.
 
 | Name                                |  Type   | Required |          Default  
         | Description                                                          
                                                                                
                                                              |
 
|-------------------------------------|---------|----------|----------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| topics                              | String  | yes      | -                 
         | `RocketMQ topic` name. If there are multiple `topics`, use `,` to 
split, for example: `"tpc1,tpc2"`.                                              
                                                                 |
+| topics                              | String  | no       | -                 
         | `RocketMQ topic` name. If there are multiple `topics`, use `,` to 
split, for example: `"tpc1,tpc2"`. You can configure only one of `topics` and 
`tables_configs` at the same time.                                 |
+| tables_configs                      | List    | no       | -                 
         | Multi-table mode config list. Each item configures one table and 
supports: `topics`, `format`, `schema`, `tags`, `start.mode`, 
`start.mode.timestamp`, `start.mode.offsets`, `ignore_parse_errors`. You can 
configure only one of `topics` and `tables_configs` at the same time. |
+| table_list                          | List    | no       | -                 
         | Deprecated, use `tables_configs` instead.                            
                                                                                
                                                              |
 | name.srv.addr                       | String  | yes      | -                 
         | `RocketMQ` name server cluster address.                              
                                                                                
                                                              |
 | tags                                | String  | no       | -                 
         | `RocketMQ tag` name. If there are multiple `tags`, use `,` to split, 
for example: `"tag1,tag2"`.                                                     
                                                              |
 | acl.enabled                         | Boolean | no       | false             
         | If true, access control is enabled, and access key and secret key 
need to be configured.                                                          
                                                                 |
@@ -281,6 +283,52 @@ sink {
 }
 ```
 
+### Multiple RocketMQ Source
+
+> Read from multiple topics with different schemas. Use `tables_configs` to 
configure each topic independently.
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Rocketmq {
+    name.srv.addr = "localhost:9876"
+    consumer.group = "multi_table_group"
+    start.mode = "CONSUME_FROM_FIRST_OFFSET"
+    tables_configs = [
+      {
+        topics = "topic_1"
+        format = "json"
+        schema = {
+          fields {
+            id = int
+            name = string
+          }
+        }
+      },
+      {
+        topics = "topic_2"
+        format = "json"
+        schema = {
+          fields {
+            id = int
+            description = string
+            weight = double
+          }
+        }
+      }
+    ]
+  }
+}
+
+sink {
+  Console {}
+}
+```
+
 ## Changelog
 
 <ChangeLog />
diff --git a/docs/zh/connectors/source/RocketMQ.md 
b/docs/zh/connectors/source/RocketMQ.md
index e70bce81c0..0ea86b2230 100644
--- a/docs/zh/connectors/source/RocketMQ.md
+++ b/docs/zh/connectors/source/RocketMQ.md
@@ -31,7 +31,9 @@ Apache RocketMQ 的源连接器。
 
 | 参数名                                 | 类型      | 必须 | 默认值                     
   | 描述                                                                         
                                                                                
   |
 
|-------------------------------------|---------|----|----------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| topics                              | String  | 是  | -                       
   | RocketMQ 主题名称。如果有多个主题,使用 `,` 分隔,例如:`"tpc1,tpc2"`。                          
                                                                                
   |
+| topics                              | String  | 否  | -                       
   | RocketMQ 主题名称。如果有多个主题,使用 `,` 分隔,例如:`"tpc1,tpc2"`。`topics` 与 
`tables_configs` 同时只能配置一个。                                                      
                 |
+| tables_configs                      | List    | 否  | -                       
   | 
多表模式配置列表。每项配置一张表,支持:`topics`、`format`、`schema`、`tags`、`start.mode`、`start.mode.timestamp`、`start.mode.offsets`、`ignore_parse_errors`。`topics`
 与 `tables_configs` 同时只能配置一个。 |
+| table_list                          | List    | 否  | -                       
   | 已废弃,请使用 `tables_configs` 代替。                                               
                                                                                
   |
 | name.srv.addr                       | String  | 是  | -                       
   | RocketMQ 名称服务器集群地址。                                                        
                                                                                
   |
 | tags                                | String  | 否  | -                       
   | RocketMQ 标签名称。如果有多个标签,使用 `,` 分隔,例如:`"tag1,tag2"`。                          
                                                                                
   |
 | acl.enabled                         | Boolean | 否  | false                   
   | 如果为 true,启用访问控制,需要配置访问密钥和秘密密钥。                                             
                                                                                
   |
@@ -165,6 +167,52 @@ sink {
 }
 ```
 
+### 多表读取
+
+> 从不同 topic 读取不同结构的数据,使用 `tables_configs` 为每个 topic 独立配置。
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Rocketmq {
+    name.srv.addr = "localhost:9876"
+    consumer.group = "multi_table_group"
+    start.mode = "CONSUME_FROM_FIRST_OFFSET"
+    tables_configs = [
+      {
+        topics = "topic_1"
+        format = "json"
+        schema = {
+          fields {
+            id = int
+            name = string
+          }
+        }
+      },
+      {
+        topics = "topic_2"
+        format = "json"
+        schema = {
+          fields {
+            id = int
+            description = string
+            weight = double
+          }
+        }
+      }
+    ]
+  }
+}
+
+sink {
+  Console {}
+}
+```
+
 ## 变更日志
 
 <ChangeLog />
diff --git 
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSource.java
 
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSource.java
index 8b3cca4963..4fcb93f64d 100644
--- 
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSource.java
+++ 
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSource.java
@@ -19,124 +19,27 @@ package 
org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
 
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.api.source.SupportParallelism;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.constants.JobMode;
-import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import 
org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqBaseConfiguration;
-import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.SchemaFormat;
-import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.StartMode;
-import 
org.apache.seatunnel.connectors.seatunnel.rocketmq.config.RocketMqSourceOptions;
-import org.apache.seatunnel.format.json.JsonDeserializationSchema;
-import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
-import org.apache.seatunnel.format.text.TextDeserializationSchema;
 
-import org.apache.rocketmq.common.message.MessageQueue;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
 
 /** RocketMq source */
 public class RocketMqSource
         implements SeaTunnelSource<SeaTunnelRow, RocketMqSourceSplit, 
RocketMqSourceState>,
                 SupportParallelism {
 
-    private final ReadonlyConfig pluginConfig;
-    private final CatalogTable catalogTable;
-    private final ConsumerMetadata metadata;
-    private DeserializationSchema<SeaTunnelRow> deserializationSchema;
+    private final RocketMqSourceConfig sourceConfig;
     private JobContext jobContext;
 
     public RocketMqSource(ReadonlyConfig pluginConfig) {
-        this.pluginConfig = pluginConfig;
-        // check config
-        this.metadata = new ConsumerMetadata();
-        this.metadata.setTopics(
-                Arrays.asList(
-                        pluginConfig
-                                .get(RocketMqSourceOptions.TOPICS)
-                                
.split(RocketMqSourceOptions.DEFAULT_FIELD_DELIMITER)));
-
-        String tags = pluginConfig.get(RocketMqSourceOptions.TAGS);
-        if (tags != null && !tags.trim().isEmpty()) {
-            this.metadata.setTags(
-                    
Arrays.stream(tags.split(RocketMqSourceOptions.DEFAULT_FIELD_DELIMITER))
-                            .map(String::trim)
-                            .filter(tag -> !tag.isEmpty())
-                            .distinct()
-                            .collect(Collectors.toList()));
-        } else {
-            this.metadata.setTags(Collections.emptyList());
-        }
-
-        RocketMqBaseConfiguration.Builder baseConfigBuilder =
-                RocketMqBaseConfiguration.newBuilder()
-                        .consumer()
-                        
.namesrvAddr(pluginConfig.get(RocketMqSourceOptions.NAME_SRV_ADDR));
-        if 
(pluginConfig.getOptional(RocketMqSourceOptions.ACCESS_KEY).isPresent()) {
-            
baseConfigBuilder.accessKey(pluginConfig.get(RocketMqSourceOptions.ACCESS_KEY));
-        }
-        if 
(pluginConfig.getOptional(RocketMqSourceOptions.SECRET_KEY).isPresent()) {
-            
baseConfigBuilder.secretKey(pluginConfig.get(RocketMqSourceOptions.SECRET_KEY));
-        }
-        
baseConfigBuilder.aclEnable(pluginConfig.get(RocketMqSourceOptions.ACL_ENABLED));
-        
baseConfigBuilder.groupId(pluginConfig.get(RocketMqSourceOptions.CONSUMER_GROUP));
-        
baseConfigBuilder.batchSize(pluginConfig.get(RocketMqSourceOptions.BATCH_SIZE));
-
-        baseConfigBuilder.pollTimeoutMillis(
-                pluginConfig.get(RocketMqSourceOptions.POLL_TIMEOUT_MILLIS));
-
-        this.metadata.setBaseConfig(baseConfigBuilder.build());
-
-        this.metadata.setEnabledCommitCheckpoint(
-                pluginConfig.get(RocketMqSourceOptions.COMMIT_ON_CHECKPOINT));
-
-        StartMode startMode = 
pluginConfig.get(RocketMqSourceOptions.START_MODE);
-        switch (startMode) {
-            case CONSUME_FROM_TIMESTAMP:
-                long startOffsetsTimestamp =
-                        
pluginConfig.get(RocketMqSourceOptions.START_MODE_TIMESTAMP);
-                long currentTimestamp = System.currentTimeMillis();
-                if (startOffsetsTimestamp < 0 || startOffsetsTimestamp > 
currentTimestamp) {
-                    throw new IllegalArgumentException(
-                            "The offsets timestamp value is smaller than 0 or 
smaller"
-                                    + " than the current time");
-                }
-                this.metadata.setStartOffsetsTimestamp(startOffsetsTimestamp);
-                break;
-            case CONSUME_FROM_SPECIFIC_OFFSETS:
-                Map<String, Long> offsetConfigMap =
-                        
pluginConfig.get(RocketMqSourceOptions.START_MODE_OFFSETS);
-                Map<MessageQueue, Long> specificStartOffsets = new HashMap<>();
-                offsetConfigMap.forEach(
-                        (k, v) -> {
-                            int splitIndex = k.lastIndexOf("-");
-                            String topic = k.substring(0, splitIndex);
-                            String partition = k.substring(splitIndex + 1);
-                            MessageQueue messageQueue =
-                                    new MessageQueue(topic, null, 
Integer.parseInt(partition));
-                            specificStartOffsets.put(messageQueue, v);
-                        });
-                this.metadata.setSpecificStartOffsets(specificStartOffsets);
-                break;
-            default:
-                break;
-        }
-        this.metadata.setStartMode(startMode);
-        this.catalogTable = CatalogTableUtil.buildWithConfig(pluginConfig);
-        // set deserialization
-        setDeserialization(pluginConfig);
+        this.sourceConfig = new RocketMqSourceConfig(pluginConfig);
     }
 
     @Override
@@ -146,7 +49,7 @@ public class RocketMqSource
 
     @Override
     public List<CatalogTable> getProducedCatalogTables() {
-        return Collections.singletonList(catalogTable);
+        return sourceConfig.getCatalogTables();
     }
 
     @Override
@@ -164,16 +67,18 @@ public class RocketMqSource
     @Override
     public SourceReader<SeaTunnelRow, RocketMqSourceSplit> createReader(
             SourceReader.Context readerContext) throws Exception {
-        return new RocketMqSourceReader(this.metadata, deserializationSchema, 
readerContext);
+        return new RocketMqSourceReader(
+                sourceConfig.getMetadata(), sourceConfig.getTopicConfigs(), 
readerContext);
     }
 
     @Override
     public SourceSplitEnumerator<RocketMqSourceSplit, RocketMqSourceState> 
createEnumerator(
             SourceSplitEnumerator.Context<RocketMqSourceSplit> context) throws 
Exception {
         return new RocketMqSourceSplitEnumerator(
-                this.metadata,
+                sourceConfig.getMetadata(),
+                sourceConfig.getTopicConfigs(),
                 context,
-                
pluginConfig.get(RocketMqSourceOptions.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS));
+                sourceConfig.getDiscoveryIntervalMillis());
     }
 
     @Override
@@ -182,38 +87,9 @@ public class RocketMqSource
             RocketMqSourceState sourceState)
             throws Exception {
         return new RocketMqSourceSplitEnumerator(
-                this.metadata,
+                sourceConfig.getMetadata(),
+                sourceConfig.getTopicConfigs(),
                 context,
-                
pluginConfig.get(RocketMqSourceOptions.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS));
-    }
-
-    private void setDeserialization(ReadonlyConfig config) {
-        if (config.getOptional(RocketMqSourceOptions.SCHEMA).isPresent()) {
-            SchemaFormat format = config.get(RocketMqSourceOptions.FORMAT);
-            boolean ignoreParseErrors = 
config.get(RocketMqSourceOptions.IGNORE_PARSE_ERRORS);
-            switch (format) {
-                case JSON:
-                    deserializationSchema =
-                            new JsonDeserializationSchema(catalogTable, false, 
ignoreParseErrors);
-                    break;
-                case TEXT:
-                    deserializationSchema =
-                            TextDeserializationSchema.builder()
-                                    
.seaTunnelRowType(catalogTable.getSeaTunnelRowType())
-                                    
.delimiter(config.get(RocketMqSourceOptions.FIELD_DELIMITER))
-                                    .build();
-                    break;
-                default:
-                    throw new SeaTunnelJsonFormatException(
-                            CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
-                            "Unsupported format: " + format);
-            }
-        } else {
-            this.deserializationSchema =
-                    TextDeserializationSchema.builder()
-                            
.seaTunnelRowType(catalogTable.getSeaTunnelRowType())
-                            .delimiter(String.valueOf('\002'))
-                            .build();
-        }
+                sourceConfig.getDiscoveryIntervalMillis());
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceConfig.java
 
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceConfig.java
new file mode 100644
index 0000000000..e94d5d13da
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceConfig.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import 
org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqBaseConfiguration;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.SchemaFormat;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.StartMode;
+import 
org.apache.seatunnel.connectors.seatunnel.rocketmq.config.RocketMqSourceOptions;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+import org.apache.seatunnel.format.text.TextDeserializationSchema;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import lombok.Getter;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** RocketMQ source configuration, supports both single-table and multi-table 
modes. */
+public class RocketMqSourceConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    @Getter private final ConsumerMetadata metadata;
+    @Getter private final Map<String, TopicTableConfig> topicConfigs;
+    @Getter private final long discoveryIntervalMillis;
+
+    /**
+     * Builds the source configuration from the plugin options.
+     *
+     * <p>When {@code TABLE_CONFIGS} (or, failing that, {@code TABLE_LIST}) is present the source
+     * runs in multi-table mode: every list entry is parsed by {@link #parseTableConfig} and the
+     * top-level {@code topics} option is not read. Otherwise the source runs in single-table mode
+     * and every topic listed in the top-level {@code topics} option shares one catalog table,
+     * one deserialization schema and one tag list.
+     *
+     * @param readonlyConfig the connector options
+     * @throws IllegalArgumentException if neither a table config list nor a non-blank {@code
+     *     topics} value is configured
+     */
+    public RocketMqSourceConfig(ReadonlyConfig readonlyConfig) {
+        this.topicConfigs = new LinkedHashMap<>();
+        this.discoveryIntervalMillis =
+                readonlyConfig.get(RocketMqSourceOptions.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS);
+
+        // TABLE_CONFIGS wins over TABLE_LIST when both are present.
+        List<Map<String, Object>> tableConfigList = null;
+        if (readonlyConfig.getOptional(RocketMqSourceOptions.TABLE_CONFIGS).isPresent()) {
+            tableConfigList = readonlyConfig.get(RocketMqSourceOptions.TABLE_CONFIGS);
+        } else if (readonlyConfig.getOptional(RocketMqSourceOptions.TABLE_LIST).isPresent()) {
+            tableConfigList = readonlyConfig.get(RocketMqSourceOptions.TABLE_LIST);
+        }
+
+        if (tableConfigList != null) {
+            // Multi-table mode: metadata starts without topics; they are collected per entry.
+            this.metadata = buildConsumerMetadata(readonlyConfig, Collections.emptyList());
+            List<String> allTopics = new ArrayList<>();
+            for (Map<String, Object> tableConfig : tableConfigList) {
+                parseTableConfig(ReadonlyConfig.fromMap(tableConfig), allTopics);
+            }
+            this.metadata.setTopics(allTopics);
+        } else {
+            // Single-table mode (backward compatible)
+            String topicsStr = readonlyConfig.get(RocketMqSourceOptions.TOPICS);
+            // Fail with a clear message instead of a NullPointerException on split(), and
+            // reject blank values the same way parseTableConfig does for table entries.
+            if (topicsStr == null || topicsStr.trim().isEmpty()) {
+                throw new IllegalArgumentException(
+                        "Either 'topics' or 'tables_configs' must be configured for the RocketMQ source");
+            }
+            List<String> topics =
+                    Arrays.stream(topicsStr.split(RocketMqSourceOptions.DEFAULT_FIELD_DELIMITER))
+                            .map(String::trim)
+                            .filter(t -> !t.isEmpty())
+                            .collect(Collectors.toList());
+            this.metadata = buildConsumerMetadata(readonlyConfig, topics);
+            CatalogTable catalogTable = CatalogTableUtil.buildWithConfig(readonlyConfig);
+            DeserializationSchema<SeaTunnelRow> deserializationSchema =
+                    buildDeserialization(readonlyConfig, catalogTable);
+            List<String> tags = parseTags(readonlyConfig);
+            // All topics share the same table, schema and tags in single-table mode.
+            for (String topic : topics) {
+                TopicTableConfig config = new TopicTableConfig();
+                config.setCatalogTable(catalogTable);
+                config.setDeserializationSchema(deserializationSchema);
+                config.setTags(tags);
+                topicConfigs.put(topic, config);
+            }
+        }
+    }
+
+    /**
+     * Parses one multi-table entry and registers a {@link TopicTableConfig} for each topic it
+     * names.
+     *
+     * <p>When the entry's catalog table still has the default table path, the table is re-created
+     * with the entry's first topic as its table name. Offsets given with {@code
+     * CONSUME_FROM_SPECIFIC_OFFSETS} are merged into the shared consumer metadata; their keys must
+     * have the form {@code <topic>-<queueId>}.
+     *
+     * @param tableConfig options of a single table entry
+     * @param allTopics accumulator that receives every topic of this entry, in order
+     * @throws IllegalArgumentException if {@code topics} is missing or names no topic, if a start
+     *     mode lacks its companion option, if the start timestamp is negative or in the future,
+     *     or if an offset key is malformed
+     */
+    private void parseTableConfig(ReadonlyConfig tableConfig, List<String> allTopics) {
+        String topicsStr = tableConfig.get(RocketMqSourceOptions.TOPICS);
+        if (topicsStr == null || topicsStr.trim().isEmpty()) {
+            throw new IllegalArgumentException(
+                    "'topics' must be configured in each tables_configs entry, but got: "
+                            + tableConfig);
+        }
+        List<String> topics =
+                Arrays.stream(topicsStr.split(RocketMqSourceOptions.DEFAULT_FIELD_DELIMITER))
+                        .map(String::trim)
+                        .filter(t -> !t.isEmpty())
+                        .collect(Collectors.toList());
+        // A value made only of delimiters (e.g. ",") is non-blank but names no topic; fail here
+        // instead of with an IndexOutOfBoundsException at topics.get(0) below.
+        if (topics.isEmpty()) {
+            throw new IllegalArgumentException(
+                    "'topics' in a tables_configs entry must contain at least one topic name, but got: "
+                            + topicsStr);
+        }
+
+        CatalogTable catalogTable = CatalogTableUtil.buildWithConfig(tableConfig);
+        if (TablePath.DEFAULT.equals(catalogTable.getTablePath())) {
+            // No explicit table path in the schema: name the table after the first topic.
+            catalogTable =
+                    CatalogTable.of(
+                            TableIdentifier.of("", TablePath.of(null, null, topics.get(0))),
+                            catalogTable.getTableSchema(),
+                            catalogTable.getOptions(),
+                            catalogTable.getPartitionKeys(),
+                            catalogTable.getComment());
+        }
+        DeserializationSchema<SeaTunnelRow> deserializationSchema =
+                buildDeserialization(tableConfig, catalogTable);
+        List<String> tags = parseTags(tableConfig);
+        // A null start mode means the entry does not override the start mode.
+        StartMode startMode =
+                tableConfig.getOptional(RocketMqSourceOptions.START_MODE).orElse(null);
+        Long startTimestamp = null;
+
+        if (startMode != null) {
+            switch (startMode) {
+                case CONSUME_FROM_TIMESTAMP:
+                    startTimestamp = tableConfig.get(RocketMqSourceOptions.START_MODE_TIMESTAMP);
+                    if (startTimestamp == null) {
+                        throw new IllegalArgumentException(
+                                "When 'start.mode' is set to 'CONSUME_FROM_TIMESTAMP' in tables_configs, "
+                                        + "'start.mode.timestamp' must also be specified in the same table config entry. "
+                                        + "Topics: "
+                                        + topicsStr);
+                    }
+                    long currentTimestamp = System.currentTimeMillis();
+                    if (startTimestamp < 0 || startTimestamp > currentTimestamp) {
+                        throw new IllegalArgumentException(
+                                "The offsets timestamp value is smaller than 0 or larger"
+                                        + " than the current time");
+                    }
+                    break;
+                case CONSUME_FROM_SPECIFIC_OFFSETS:
+                    Map<String, Long> offsetConfigMap =
+                            tableConfig.get(RocketMqSourceOptions.START_MODE_OFFSETS);
+                    if (offsetConfigMap == null || offsetConfigMap.isEmpty()) {
+                        throw new IllegalArgumentException(
+                                "When 'start.mode' is set to 'CONSUME_FROM_SPECIFIC_OFFSETS' in tables_configs, "
+                                        + "'start.mode.offsets' must also be specified in the same table config entry. "
+                                        + "Topics: "
+                                        + topicsStr);
+                    }
+                    // Offsets of all entries accumulate in the one map held by the metadata.
+                    Map<MessageQueue, Long> specificOffsets = metadata.getSpecificStartOffsets();
+                    if (specificOffsets == null) {
+                        specificOffsets = new HashMap<>();
+                        metadata.setSpecificStartOffsets(specificOffsets);
+                    }
+                    for (Map.Entry<String, Long> entry : offsetConfigMap.entrySet()) {
+                        String offsetKey = entry.getKey();
+                        // The queue id follows the LAST '-', so topic names may contain '-'.
+                        int splitIndex = offsetKey.lastIndexOf("-");
+                        if (splitIndex <= 0 || splitIndex == offsetKey.length() - 1) {
+                            throw new IllegalArgumentException(
+                                    "Invalid 'start.mode.offsets' key '"
+                                            + offsetKey
+                                            + "', expected format '<topic>-<queueId>'. Topics: "
+                                            + topicsStr);
+                        }
+                        String topicName = offsetKey.substring(0, splitIndex);
+                        int queueId;
+                        try {
+                            queueId = Integer.parseInt(offsetKey.substring(splitIndex + 1));
+                        } catch (NumberFormatException e) {
+                            throw new IllegalArgumentException(
+                                    "Invalid queue id in 'start.mode.offsets' key '"
+                                            + offsetKey
+                                            + "', expected format '<topic>-<queueId>'. Topics: "
+                                            + topicsStr,
+                                    e);
+                        }
+                        specificOffsets.put(
+                                new MessageQueue(topicName, null, queueId), entry.getValue());
+                    }
+                    break;
+                default:
+                    break;
+            }
+        }
+
+        // Every topic of the entry gets its own config object sharing table, schema and tags.
+        for (String topic : topics) {
+            TopicTableConfig config = new TopicTableConfig();
+            config.setCatalogTable(catalogTable);
+            config.setDeserializationSchema(deserializationSchema);
+            config.setTags(tags);
+            config.setStartMode(startMode);
+            config.setStartTimestamp(startTimestamp);
+            topicConfigs.put(topic, config);
+            allTopics.add(topic);
+        }
+    }
+
+    /**
+     * Reads the {@code TAGS} option and splits it into individual tag names.
+     *
+     * @param config options to read the tag string from
+     * @return the trimmed, non-empty tags in first-occurrence order without duplicates; an empty
+     *     list when the option is absent or blank
+     */
+    private List<String> parseTags(ReadonlyConfig config) {
+        String rawTags = config.get(RocketMqSourceOptions.TAGS);
+        if (rawTags == null || rawTags.trim().isEmpty()) {
+            return Collections.emptyList();
+        }
+        List<String> parsedTags = new ArrayList<>();
+        for (String piece : rawTags.split(RocketMqSourceOptions.DEFAULT_FIELD_DELIMITER)) {
+            String tag = piece.trim();
+            // Skip empty fragments and keep only the first occurrence of each tag.
+            if (!tag.isEmpty() && !parsedTags.contains(tag)) {
+                parsedTags.add(tag);
+            }
+        }
+        return parsedTags;
+    }
+
+    /**
+     * Creates the consumer metadata shared by all topics from the top-level options.
+     *
+     * <p>The tag list of the returned metadata is always empty. The start mode and, depending on
+     * it, the start timestamp or the specific start offsets are taken from the top-level options;
+     * offset keys must have the form {@code <topic>-<queueId>}.
+     *
+     * @param readonlyConfig the top-level connector options
+     * @param topics the topics to store in the metadata (may be empty and filled in later)
+     * @return the populated consumer metadata
+     * @throws IllegalArgumentException if the start mode lacks its companion option, if the start
+     *     timestamp is negative or in the future, or if an offset key is malformed
+     */
+    private ConsumerMetadata buildConsumerMetadata(
+            ReadonlyConfig readonlyConfig, List<String> topics) {
+        ConsumerMetadata consumerMetadata = new ConsumerMetadata();
+        consumerMetadata.setTopics(topics);
+        consumerMetadata.setTags(Collections.emptyList());
+
+        RocketMqBaseConfiguration.Builder baseConfigBuilder =
+                RocketMqBaseConfiguration.newBuilder()
+                        .consumer()
+                        .namesrvAddr(readonlyConfig.get(RocketMqSourceOptions.NAME_SRV_ADDR));
+        if (readonlyConfig.getOptional(RocketMqSourceOptions.ACCESS_KEY).isPresent()) {
+            baseConfigBuilder.accessKey(readonlyConfig.get(RocketMqSourceOptions.ACCESS_KEY));
+        }
+        if (readonlyConfig.getOptional(RocketMqSourceOptions.SECRET_KEY).isPresent()) {
+            baseConfigBuilder.secretKey(readonlyConfig.get(RocketMqSourceOptions.SECRET_KEY));
+        }
+        baseConfigBuilder.aclEnable(readonlyConfig.get(RocketMqSourceOptions.ACL_ENABLED));
+        baseConfigBuilder.groupId(readonlyConfig.get(RocketMqSourceOptions.CONSUMER_GROUP));
+        baseConfigBuilder.batchSize(readonlyConfig.get(RocketMqSourceOptions.BATCH_SIZE));
+        baseConfigBuilder.pollTimeoutMillis(
+                readonlyConfig.get(RocketMqSourceOptions.POLL_TIMEOUT_MILLIS));
+
+        consumerMetadata.setBaseConfig(baseConfigBuilder.build());
+        consumerMetadata.setEnabledCommitCheckpoint(
+                readonlyConfig.get(RocketMqSourceOptions.COMMIT_ON_CHECKPOINT));
+
+        StartMode startMode = readonlyConfig.get(RocketMqSourceOptions.START_MODE);
+        switch (startMode) {
+            case CONSUME_FROM_TIMESTAMP:
+                // Keep the boxed value so a missing option is reported explicitly instead of
+                // failing with a NullPointerException during unboxing.
+                Long startOffsetsTimestamp =
+                        readonlyConfig.get(RocketMqSourceOptions.START_MODE_TIMESTAMP);
+                if (startOffsetsTimestamp == null) {
+                    throw new IllegalArgumentException(
+                            "When 'start.mode' is set to 'CONSUME_FROM_TIMESTAMP', "
+                                    + "'start.mode.timestamp' must also be specified");
+                }
+                long currentTimestamp = System.currentTimeMillis();
+                if (startOffsetsTimestamp < 0 || startOffsetsTimestamp > currentTimestamp) {
+                    throw new IllegalArgumentException(
+                            "The offsets timestamp value is smaller than 0 or larger"
+                                    + " than the current time");
+                }
+                consumerMetadata.setStartOffsetsTimestamp(startOffsetsTimestamp);
+                break;
+            case CONSUME_FROM_SPECIFIC_OFFSETS:
+                Map<String, Long> offsetConfigMap =
+                        readonlyConfig.get(RocketMqSourceOptions.START_MODE_OFFSETS);
+                if (offsetConfigMap == null) {
+                    throw new IllegalArgumentException(
+                            "When 'start.mode' is set to 'CONSUME_FROM_SPECIFIC_OFFSETS', "
+                                    + "'start.mode.offsets' must also be specified");
+                }
+                Map<MessageQueue, Long> specificStartOffsets = new HashMap<>();
+                for (Map.Entry<String, Long> entry : offsetConfigMap.entrySet()) {
+                    String offsetKey = entry.getKey();
+                    // The queue id follows the LAST '-', so topic names may contain '-'.
+                    int splitIndex = offsetKey.lastIndexOf("-");
+                    if (splitIndex <= 0 || splitIndex == offsetKey.length() - 1) {
+                        throw new IllegalArgumentException(
+                                "Invalid 'start.mode.offsets' key '"
+                                        + offsetKey
+                                        + "', expected format '<topic>-<queueId>'");
+                    }
+                    String topic = offsetKey.substring(0, splitIndex);
+                    int queueId;
+                    try {
+                        queueId = Integer.parseInt(offsetKey.substring(splitIndex + 1));
+                    } catch (NumberFormatException e) {
+                        throw new IllegalArgumentException(
+                                "Invalid queue id in 'start.mode.offsets' key '"
+                                        + offsetKey
+                                        + "', expected format '<topic>-<queueId>'",
+                                e);
+                    }
+                    specificStartOffsets.put(
+                            new MessageQueue(topic, null, queueId), entry.getValue());
+                }
+                consumerMetadata.setSpecificStartOffsets(specificStartOffsets);
+                break;
+            default:
+                break;
+        }
+        consumerMetadata.setStartMode(startMode);
+
+        return consumerMetadata;
+    }
+
+    /**
+     * Creates the deserializer for one table and wraps it so that every produced row is stamped
+     * with the table's id.
+     *
+     * <p>When a schema is configured the format option decides between JSON and delimited text
+     * (using the configured field delimiter); {@code IGNORE_PARSE_ERRORS} is only passed to the
+     * JSON deserializer. Without a schema, delimited text with the {@code \002} delimiter is used.
+     *
+     * @param config options to read the format settings from
+     * @param catalogTable table whose row type and table path are used
+     * @return a {@link RocketMqTableIdDeserializationSchema} wrapping the chosen deserializer
+     * @throws SeaTunnelJsonFormatException if the configured format is neither JSON nor TEXT
+     */
+    private DeserializationSchema<SeaTunnelRow> buildDeserialization(
+            ReadonlyConfig config, CatalogTable catalogTable) {
+        DeserializationSchema<SeaTunnelRow> schema;
+        if (config.getOptional(RocketMqSourceOptions.SCHEMA).isPresent()) {
+            SchemaFormat format = config.get(RocketMqSourceOptions.FORMAT);
+            boolean ignoreParseErrors = config.get(RocketMqSourceOptions.IGNORE_PARSE_ERRORS);
+            switch (format) {
+                case JSON:
+                    schema = new JsonDeserializationSchema(catalogTable, false, ignoreParseErrors);
+                    break;
+                case TEXT:
+                    schema =
+                            TextDeserializationSchema.builder()
+                                    .seaTunnelRowType(catalogTable.getSeaTunnelRowType())
+                                    .delimiter(config.get(RocketMqSourceOptions.FIELD_DELIMITER))
+                                    .setCatalogTable(catalogTable)
+                                    .build();
+                    break;
+                default:
+                    throw new SeaTunnelJsonFormatException(
+                            CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
+                            "Unsupported format: " + format);
+            }
+        } else {
+            // No schema configured: fall back to text rows split on the \002 control character.
+            schema =
+                    TextDeserializationSchema.builder()
+                            .seaTunnelRowType(catalogTable.getSeaTunnelRowType())
+                            .delimiter(String.valueOf('\002'))
+                            .setCatalogTable(catalogTable)
+                            .build();
+        }
+        // The table path string is the id attached to every row of this table.
+        String tableId = catalogTable.getTablePath().toString();
+        return new RocketMqTableIdDeserializationSchema(schema, tableId);
+    }
+
+    /**
+     * Lists the catalog tables of all configured topics.
+     *
+     * @return the distinct catalog tables, in the iteration order of {@code topicConfigs}
+     */
+    public List<CatalogTable> getCatalogTables() {
+        return topicConfigs.values().stream()
+                .map(topicConfig -> topicConfig.getCatalogTable())
+                .distinct()
+                .collect(Collectors.toList());
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceFactory.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceFactory.java
index a6de8153f5..67ace88c08 100644
--- a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceFactory.java
@@ -42,7 +42,11 @@ public class RocketMqSourceFactory implements TableSourceFactory {
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(RocketMqSourceOptions.TOPICS, 
RocketMqSourceOptions.NAME_SRV_ADDR)
+                .required(RocketMqSourceOptions.NAME_SRV_ADDR)
+                .exclusive(
+                        RocketMqSourceOptions.TOPICS,
+                        RocketMqSourceOptions.TABLE_CONFIGS,
+                        RocketMqSourceOptions.TABLE_LIST)
                 .optional(
                         RocketMqSourceOptions.FORMAT,
                         RocketMqSourceOptions.TAGS,
@@ -52,7 +56,8 @@ public class RocketMqSourceFactory implements 
TableSourceFactory {
                         RocketMqSourceOptions.SCHEMA,
                         
RocketMqSourceOptions.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
                         RocketMqSourceOptions.POLL_TIMEOUT_MILLIS,
-                        RocketMqSourceOptions.BATCH_SIZE)
+                        RocketMqSourceOptions.BATCH_SIZE,
+                        RocketMqSourceOptions.IGNORE_PARSE_ERRORS)
                 .conditional(
                         RocketMqSourceOptions.START_MODE,
                         StartMode.CONSUME_FROM_TIMESTAMP,
@@ -60,8 +65,7 @@ public class RocketMqSourceFactory implements 
TableSourceFactory {
                 .conditional(
                         RocketMqSourceOptions.START_MODE,
                         StartMode.CONSUME_FROM_SPECIFIC_OFFSETS,
-                        RocketMqSourceOptions.START_MODE_OFFSETS,
-                        RocketMqSourceOptions.IGNORE_PARSE_ERRORS)
+                        RocketMqSourceOptions.START_MODE_OFFSETS)
                 .build();
     }
 
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
index 65a1400361..062b82394a 100644
--- a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
@@ -19,7 +19,6 @@ package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
 
 import org.apache.seatunnel.shade.com.google.common.collect.Maps;
 
-import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.source.SourceReader;
@@ -56,7 +55,7 @@ public class RocketMqSourceReader implements 
SourceReader<SeaTunnelRow, RocketMq
     private final Map<Long, Map<MessageQueue, Long>> checkpointOffsets;
     private final Map<MessageQueue, RocketMqConsumerThread> consumerThreads;
     private final ExecutorService executorService;
-    private final DeserializationSchema<SeaTunnelRow> deserializationSchema;
+    private final Map<String, TopicTableConfig> topicConfigs;
 
     private final LinkedBlockingQueue<RocketMqSourceSplit> 
pendingPartitionsQueue;
 
@@ -64,12 +63,12 @@ public class RocketMqSourceReader implements 
SourceReader<SeaTunnelRow, RocketMq
 
     public RocketMqSourceReader(
             ConsumerMetadata metadata,
-            DeserializationSchema<SeaTunnelRow> deserializationSchema,
+            Map<String, TopicTableConfig> topicConfigs,
             Context context) {
         this.metadata = metadata;
         this.context = context;
         this.sourceSplits = new HashSet<>();
-        this.deserializationSchema = deserializationSchema;
+        this.topicConfigs = topicConfigs;
         this.consumerThreads = new ConcurrentHashMap<>();
         this.checkpointOffsets = new ConcurrentHashMap<>();
         this.executorService =
@@ -146,18 +145,25 @@ public class RocketMqSourceReader implements 
SourceReader<SeaTunnelRow, RocketMq
                                                                 
.collect(Collectors.toList());
                                                 long lastOffset = -1;
                                                 for (MessageExt record : 
messages) {
-                                                    // Check if the tags are 
specified and match the
-                                                    // record's tag
+                                                    TopicTableConfig 
topicConfig =
+                                                            
topicConfigs.get(record.getTopic());
+                                                    if (topicConfig == null) {
+                                                        throw new 
RocketMqConnectorException(
+                                                                
RocketMqConnectorErrorCode
+                                                                        
.CONSUME_DATA_FAILED,
+                                                                "No config 
found for topic: "
+                                                                        + 
record.getTopic());
+                                                    }
+                                                    List<String> tags = 
topicConfig.getTags();
                                                     boolean shouldProcess =
-                                                            metadata.getTags() 
== null
-                                                                    || 
metadata.getTags().isEmpty()
-                                                                    || 
metadata.getTags()
-                                                                            
.contains(
-                                                                               
     record
-                                                                               
             .getTags());
+                                                            tags.isEmpty()
+                                                                    || 
tags.contains(
+                                                                            
record.getTags());
                                                     if (shouldProcess) {
-                                                        
deserializationSchema.deserialize(
-                                                                
record.getBody(), output);
+                                                        topicConfig
+                                                                
.getDeserializationSchema()
+                                                                .deserialize(
+                                                                        
record.getBody(), output);
                                                         lastOffset = 
record.getQueueOffset();
                                                     }
                                                     if 
(Boundedness.BOUNDED.equals(
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java
index 18d60dbba9..91fa6e8a07 100644
--- a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.shade.com.google.common.collect.Sets;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.common.config.Common;
 import 
org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqAdminUtil;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.StartMode;
 import 
org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorException;
 
@@ -38,6 +39,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -54,6 +56,7 @@ public class RocketMqSourceSplitEnumerator
     private static final long DEFAULT_DISCOVERY_INTERVAL_MILLIS = 60 * 1000;
     private final Map<MessageQueue, RocketMqSourceSplit> assignedSplit;
     private final ConsumerMetadata metadata;
+    private final Map<String, TopicTableConfig> topicConfigs;
     private final Context<RocketMqSourceSplit> context;
     private final Map<MessageQueue, RocketMqSourceSplit> pendingSplit;
     private ScheduledExecutorService executor;
@@ -63,8 +66,11 @@ public class RocketMqSourceSplitEnumerator
     private long discoveryIntervalMillis;
 
     public RocketMqSourceSplitEnumerator(
-            ConsumerMetadata metadata, 
SourceSplitEnumerator.Context<RocketMqSourceSplit> context) {
+            ConsumerMetadata metadata,
+            Map<String, TopicTableConfig> topicConfigs,
+            SourceSplitEnumerator.Context<RocketMqSourceSplit> context) {
         this.metadata = metadata;
+        this.topicConfigs = topicConfigs;
         this.context = context;
         this.assignedSplit = new HashMap<>();
         this.pendingSplit = new HashMap<>();
@@ -75,9 +81,10 @@ public class RocketMqSourceSplitEnumerator
 
     public RocketMqSourceSplitEnumerator(
             ConsumerMetadata metadata,
+            Map<String, TopicTableConfig> topicConfigs,
             SourceSplitEnumerator.Context<RocketMqSourceSplit> context,
             long discoveryIntervalMillis) {
-        this(metadata, context);
+        this(metadata, topicConfigs, context);
         this.discoveryIntervalMillis = discoveryIntervalMillis;
     }
 
@@ -242,68 +249,94 @@ public class RocketMqSourceSplitEnumerator
         return sourceSplits;
     }
 
-    private void setPartitionStartOffset() throws MQClientException {
-        Collection<MessageQueue> topicPartitions = pendingSplit.keySet();
-        Map<MessageQueue, Long> topicPartitionOffsets = null;
-        switch (metadata.getStartMode()) {
-            case CONSUME_FROM_FIRST_OFFSET:
-                topicPartitionOffsets =
-                        listOffsets(topicPartitions, 
ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-                break;
-            case CONSUME_FROM_LAST_OFFSET:
-                topicPartitionOffsets =
-                        listOffsets(topicPartitions, 
ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
-                break;
-            case CONSUME_FROM_TIMESTAMP:
-                topicPartitionOffsets =
-                        listOffsets(topicPartitions, 
ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
-                break;
-            case CONSUME_FROM_GROUP_OFFSETS:
-                topicPartitionOffsets = 
listConsumerGroupOffsets(topicPartitions);
-                if (topicPartitionOffsets.isEmpty()) {
-                    topicPartitionOffsets =
-                            listOffsets(
-                                    topicPartitions, 
ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-                }
-                break;
-            case CONSUME_FROM_SPECIFIC_OFFSETS:
-                topicPartitionOffsets = metadata.getSpecificStartOffsets();
-                // Fill in broker name
-                setMessageQueueBroker(topicPartitions, topicPartitionOffsets);
-                break;
-            default:
-                throw new RocketMqConnectorException(
-                        
RocketMqConnectorErrorCode.UNSUPPORTED_START_MODE_ERROR,
-                        metadata.getStartMode().name());
+    private StartMode getEffectiveStartMode(String topic) {
+        TopicTableConfig config = topicConfigs.get(topic);
+        if (config != null && config.getStartMode() != null) {
+            return config.getStartMode();
         }
-        topicPartitionOffsets
-                .entrySet()
-                .forEach(
-                        entry -> {
-                            if (pendingSplit.containsKey(entry.getKey())) {
-                                
pendingSplit.get(entry.getKey()).setStartOffset(entry.getValue());
-                            }
-                        });
+        return metadata.getStartMode();
     }
 
-    private void setMessageQueueBroker(
-            Collection<MessageQueue> topicPartitions,
-            Map<MessageQueue, Long> topicPartitionOffsets) {
-        Map<String, String> flatTopicPartitions =
-                topicPartitions.stream()
-                        .collect(
-                                Collectors.toMap(
-                                        messageQueue ->
-                                                messageQueue.getTopic()
-                                                        + "-"
-                                                        + 
messageQueue.getQueueId(),
-                                        MessageQueue::getBrokerName));
-        for (MessageQueue messageQueue : topicPartitionOffsets.keySet()) {
-            String key = messageQueue.getTopic() + "-" + 
messageQueue.getQueueId();
-            if (flatTopicPartitions.containsKey(key)) {
-                messageQueue.setBrokerName(flatTopicPartitions.get(key));
+    /**
+     * Resolves and applies the start offset of every pending split.
+     *
+     * <p>Pending queues are grouped by their effective start mode (the per-topic setting when
+     * present, otherwise the global one) and each group is resolved with the strategy of that
+     * mode. The resulting offsets are then written to the matching pending splits. Queues for
+     * which no offset was resolved are left unchanged by this method.
+     *
+     * @throws MQClientException declared for the offset lookups performed against RocketMQ
+     * @throws RocketMqConnectorException if a start mode is not handled by this method
+     */
+    private void setPartitionStartOffset() throws MQClientException {
+        // Group pending partitions by their effective start mode (per-topic override or global).
+        Map<StartMode, List<MessageQueue>> partitionsByMode = new LinkedHashMap<>();
+        for (MessageQueue mq : pendingSplit.keySet()) {
+            StartMode effectiveMode = getEffectiveStartMode(mq.getTopic());
+            partitionsByMode.computeIfAbsent(effectiveMode, k -> new ArrayList<>()).add(mq);
+        }
+
+        Map<MessageQueue, Long> topicPartitionOffsets = new HashMap<>();
+        for (Map.Entry<StartMode, List<MessageQueue>> entry : partitionsByMode.entrySet()) {
+            StartMode startMode = entry.getKey();
+            List<MessageQueue> queues = entry.getValue();
+            switch (startMode) {
+                case CONSUME_FROM_FIRST_OFFSET:
+                    topicPartitionOffsets.putAll(
+                            listOffsets(queues, ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET));
+                    break;
+                case CONSUME_FROM_LAST_OFFSET:
+                    topicPartitionOffsets.putAll(
+                            listOffsets(queues, ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET));
+                    break;
+                case CONSUME_FROM_TIMESTAMP:
+                    // Group by effective timestamp (per-topic or global).
+                    Map<Long, List<MessageQueue>> queuesByTimestamp = new LinkedHashMap<>();
+                    for (MessageQueue mq : queues) {
+                        TopicTableConfig config = topicConfigs.get(mq.getTopic());
+                        // NOTE(review): if neither the topic nor the global metadata provides a
+                        // timestamp, ts may be null here - confirm that config validation
+                        // prevents this before the admin lookup below.
+                        Long ts =
+                                (config != null && config.getStartTimestamp() != null)
+                                        ? config.getStartTimestamp()
+                                        : metadata.getStartOffsetsTimestamp();
+                        queuesByTimestamp.computeIfAbsent(ts, k -> new ArrayList<>()).add(mq);
+                    }
+                    // One lookup per distinct timestamp.
+                    for (Map.Entry<Long, List<MessageQueue>> tsEntry :
+                            queuesByTimestamp.entrySet()) {
+                        topicPartitionOffsets.putAll(
+                                RocketMqAdminUtil.searchOffsetsByTimestamp(
+                                        metadata.getBaseConfig(),
+                                        tsEntry.getValue(),
+                                        tsEntry.getKey()));
+                    }
+                    break;
+                case CONSUME_FROM_GROUP_OFFSETS:
+                    Map<MessageQueue, Long> groupOffsets = listConsumerGroupOffsets(queues);
+                    // Fall back to the first offset only when the lookup returned nothing at all.
+                    if (groupOffsets.isEmpty()) {
+                        topicPartitionOffsets.putAll(
+                                listOffsets(queues, ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET));
+                    } else {
+                        topicPartitionOffsets.putAll(groupOffsets);
+                    }
+                    break;
+                case CONSUME_FROM_SPECIFIC_OFFSETS:
+                    Map<MessageQueue, Long> specificOffsets = metadata.getSpecificStartOffsets();
+                    if (specificOffsets != null) {
+                        // Match on "<topic>-<queueId>" instead of on MessageQueue itself, so the
+                        // broker name plays no part in the lookup.
+                        Map<String, Long> offsetByKey = new HashMap<>();
+                        for (Map.Entry<MessageQueue, Long> e : specificOffsets.entrySet()) {
+                            offsetByKey.put(
+                                    e.getKey().getTopic() + "-" + e.getKey().getQueueId(),
+                                    e.getValue());
+                        }
+                        for (MessageQueue mq : queues) {
+                            Long offset = offsetByKey.get(mq.getTopic() + "-" + mq.getQueueId());
+                            if (offset != null) {
+                                topicPartitionOffsets.put(mq, offset);
+                            }
+                        }
+                    }
+                    break;
+                default:
+                    throw new RocketMqConnectorException(
+                            RocketMqConnectorErrorCode.UNSUPPORTED_START_MODE_ERROR,
+                            startMode.name());
             }
         }
+
+        // Apply the resolved offsets to the splits that are still pending.
+        topicPartitionOffsets.forEach(
+                (mq, offset) -> {
+                    if (pendingSplit.containsKey(mq)) {
+                        pendingSplit.get(mq).setStartOffset(offset);
+                    }
+                });
     }
 
     private Map<MessageQueue, Long> listOffsets(
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqTableIdDeserializationSchema.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqTableIdDeserializationSchema.java
new file mode 100644
index 0000000000..22d3e9dcab
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqTableIdDeserializationSchema.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import java.io.IOException;
+
+/**
+ * Decorator around another {@link DeserializationSchema} that stamps a fixed table id on every
+ * row the delegate produces.
+ */
+public class RocketMqTableIdDeserializationSchema implements DeserializationSchema<SeaTunnelRow> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final DeserializationSchema<SeaTunnelRow> delegate;
+    private final String tableId;
+
+    /**
+     * @param delegate deserializer that performs the actual decoding
+     * @param tableId table id to set on every produced row
+     */
+    public RocketMqTableIdDeserializationSchema(
+            DeserializationSchema<SeaTunnelRow> delegate, String tableId) {
+        this.delegate = delegate;
+        this.tableId = tableId;
+    }
+
+    /**
+     * Decodes a single message through the delegate and tags the resulting row.
+     *
+     * @param message raw message body
+     * @return the tagged row, or {@code null} when the delegate returns {@code null}
+     * @throws IOException if the delegate fails to decode the message
+     */
+    @Override
+    public SeaTunnelRow deserialize(byte[] message) throws IOException {
+        SeaTunnelRow row = delegate.deserialize(message);
+        if (row != null) {
+            row.setTableId(tableId);
+        }
+        return row;
+    }
+
+    /**
+     * Decodes a message through the delegate, tagging each emitted row before it reaches {@code
+     * out}. All other collector calls are forwarded to {@code out} unchanged.
+     *
+     * @param message raw message body
+     * @param out downstream collector
+     * @throws IOException if the delegate fails to decode the message
+     */
+    @Override
+    public void deserialize(byte[] message, Collector<SeaTunnelRow> out) throws IOException {
+        delegate.deserialize(
+                message,
+                new Collector<SeaTunnelRow>() {
+                    @Override
+                    public void collect(SeaTunnelRow record) {
+                        // Mirror the null handling of deserialize(byte[]): a delegate that emits
+                        // null must not cause a NullPointerException here.
+                        if (record == null) {
+                            return;
+                        }
+                        record.setTableId(tableId);
+                        out.collect(record);
+                    }
+
+                    @Override
+                    public void markSchemaChangeBeforeCheckpoint() {
+                        out.markSchemaChangeBeforeCheckpoint();
+                    }
+
+                    @Override
+                    public void collect(SchemaChangeEvent event) {
+                        out.collect(event);
+                    }
+
+                    @Override
+                    public void markSchemaChangeAfterCheckpoint() {
+                        out.markSchemaChangeAfterCheckpoint();
+                    }
+
+                    @Override
+                    public Object getCheckpointLock() {
+                        return out.getCheckpointLock();
+                    }
+
+                    @Override
+                    public boolean isEmptyThisPollNext() {
+                        return out.isEmptyThisPollNext();
+                    }
+
+                    @Override
+                    public void resetEmptyThisPollNext() {
+                        out.resetEmptyThisPollNext();
+                    }
+                });
+    }
+
+    /** @return the produced type reported by the delegate */
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+        return delegate.getProducedType();
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/TopicTableConfig.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/TopicTableConfig.java
new file mode 100644
index 0000000000..e0c77d61ad
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/TopicTableConfig.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.StartMode;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+/** Per-topic settings of the RocketMQ source; instances are looked up by topic name. */
+@Data
+public class TopicTableConfig implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    /** Catalog table produced from this topic. */
+    private CatalogTable catalogTable;
+
+    /** Deserializer applied to the body of every accepted message of this topic. */
+    private DeserializationSchema<SeaTunnelRow> deserializationSchema;
+
+    /**
+     * Tags accepted for this topic. The reader treats an empty list as "accept every message" and
+     * calls {@code isEmpty()} on it without a null check, so it should not be {@code null}.
+     */
+    private List<String> tags;
+
+    /** Start mode override; when {@code null} the enumerator uses the global start mode. */
+    private StartMode startMode;
+
+    /**
+     * Start timestamp override used in timestamp mode; when {@code null} the enumerator uses the
+     * global start timestamp.
+     */
+    private Long startTimestamp;
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/test/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceConfigTest.java b/seatunnel-connectors-v2/connector-rocketmq/src/test/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceConfigTest.java
new file mode 100644
index 0000000000..612fe74ec4
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/test/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceConfigTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.StartMode;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+class RocketMqSourceConfigTest { // unit tests for how RocketMqSourceConfig reads "tables_configs" entries
+
+    private static Map<String, Object> baseConfig() { // smallest option map, shared by every test: only name.srv.addr
+        Map<String, Object> config = new HashMap<>();
+        config.put("name.srv.addr", "localhost:9876");
+        return config;
+    }
+
+    private static Map<String, Object> schemaOf(String... nameTypePairs) { // wraps alternating name/type arguments as {"fields": {name: type}}
+        Map<String, Object> fields = new LinkedHashMap<>(); // keeps the fields in argument order
+        for (int i = 0; i < nameTypePairs.length; i += 2) {
+            fields.put(nameTypePairs[i], nameTypePairs[i + 1]); // an odd argument count fails here with ArrayIndexOutOfBoundsException
+        }
+        Map<String, Object> schema = new HashMap<>();
+        schema.put("fields", fields);
+        return schema;
+    }
+
+    private static Map<String, Object> tableEntry(String topics, String... 
nameTypePairs) {
+        Map<String, Object> entry = new HashMap<>(); // one tables_configs element: topics, format=json, schema
+        entry.put("topics", topics);
+        entry.put("format", "json");
+        entry.put("schema", schemaOf(nameTypePairs));
+        return entry;
+    }
+
+    @Test // the table path is the topic name unless schema.table is given
+    void testMultiTableMode_tablesConfigs_tableName() {
+        Map<String, Object> config = baseConfig();
+
+        // topic_a: no explicit table name, should fall back to topic name
+        Map<String, Object> entry1 = tableEntry("topic_a", "id", "bigint");
+
+        // topic_b: explicit table name
+        Map<String, Object> schemaWithTable = schemaOf("id", "bigint");
+        schemaWithTable.put("table", "my_custom_table");
+        Map<String, Object> entry2 = new HashMap<>();
+        entry2.put("topics", "topic_b");
+        entry2.put("format", "json");
+        entry2.put("schema", schemaWithTable);
+
+        config.put("tables_configs", Arrays.asList(entry1, entry2));
+
+        RocketMqSourceConfig sourceConfig =
+                new RocketMqSourceConfig(ReadonlyConfig.fromMap(config));
+
+        CatalogTable tableA = 
sourceConfig.getTopicConfigs().get("topic_a").getCatalogTable();
+        CatalogTable tableB = 
sourceConfig.getTopicConfigs().get("topic_b").getCatalogTable();
+        assertEquals("topic_a", tableA.getTablePath().toString());
+        assertEquals("my_custom_table", tableB.getTablePath().toString());
+    }
+
+    @Test // start.mode and start.mode.timestamp set on an entry are readable from that topic's TopicTableConfig
+    void testMultiTableMode_perTableStartMode_consumeFromTimestamp() {
+        Map<String, Object> config = baseConfig();
+
+        long ts = System.currentTimeMillis() - 60_000; // one minute before now
+
+        Map<String, Object> entry = tableEntry("topic_ts", "id", "bigint");
+        entry.put("start.mode", "CONSUME_FROM_TIMESTAMP");
+        entry.put("start.mode.timestamp", ts);
+
+        List<Map<String, Object>> tablesConfigs = new ArrayList<>();
+        tablesConfigs.add(entry);
+        config.put("tables_configs", tablesConfigs);
+
+        RocketMqSourceConfig sourceConfig =
+                new RocketMqSourceConfig(ReadonlyConfig.fromMap(config));
+
+        TopicTableConfig topicCfg = 
sourceConfig.getTopicConfigs().get("topic_ts");
+        assertEquals(StartMode.CONSUME_FROM_TIMESTAMP, 
topicCfg.getStartMode());
+        assertEquals(ts, topicCfg.getStartTimestamp());
+    }
+
+    @Test // start.mode.offsets given on two entries (2 + 1 keys) yield 3 entries in the metadata's offset map
+    void testMultiTableMode_specificOffsets_mergedAcrossTables() {
+        Map<String, Object> config = baseConfig();
+
+        Map<String, Long> offsets1 = new HashMap<>(); // keys are written as "<topic>-<number>"
+        offsets1.put("topic_a-0", 100L);
+        offsets1.put("topic_a-1", 200L);
+        Map<String, Object> entry1 = tableEntry("topic_a", "id", "bigint");
+        entry1.put("start.mode", "CONSUME_FROM_SPECIFIC_OFFSETS");
+        entry1.put("start.mode.offsets", offsets1);
+
+        Map<String, Long> offsets2 = new HashMap<>();
+        offsets2.put("topic_b-0", 50L);
+        Map<String, Object> entry2 = tableEntry("topic_b", "id", "bigint");
+        entry2.put("start.mode", "CONSUME_FROM_SPECIFIC_OFFSETS");
+        entry2.put("start.mode.offsets", offsets2);
+
+        List<Map<String, Object>> tablesConfigs = new ArrayList<>();
+        tablesConfigs.add(entry1);
+        tablesConfigs.add(entry2);
+        config.put("tables_configs", tablesConfigs);
+
+        RocketMqSourceConfig sourceConfig =
+                new RocketMqSourceConfig(ReadonlyConfig.fromMap(config));
+
+        // Specific offsets from both tables should be merged into the same 
map in metadata
+        assertNotNull(sourceConfig.getMetadata().getSpecificStartOffsets());
+        assertEquals(3, 
sourceConfig.getMetadata().getSpecificStartOffsets().size());
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/test/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumeratorTest.java b/seatunnel-connectors-v2/connector-rocketmq/src/test/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumeratorTest.java
new file mode 100644
index 0000000000..e10a70399d
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/test/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumeratorTest.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import 
org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqAdminUtil;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.StartMode;
+
+import org.apache.rocketmq.common.admin.TopicOffset;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+import java.util.AbstractMap;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+
+class RocketMqSourceSplitEnumeratorTest { // unit tests for RocketMqSourceSplitEnumerator.run() with RocketMqAdminUtil statically mocked
+
+    @Test // topic_a starts from its own timestamp (2000); topic_b, having no entry, starts from the metadata-level one (1000)
+    void testRun_usesPerTopicTimestampOverrides() throws Exception {
+        ConsumerMetadata metadata = new ConsumerMetadata();
+        metadata.setTopics(Arrays.asList("topic_a", "topic_b"));
+        metadata.setStartMode(StartMode.CONSUME_FROM_TIMESTAMP);
+        metadata.setStartOffsetsTimestamp(1_000L); // metadata-level timestamp
+
+        TopicTableConfig topicAConfig = new TopicTableConfig();
+        topicAConfig.setStartMode(StartMode.CONSUME_FROM_TIMESTAMP);
+        topicAConfig.setStartTimestamp(2_000L); // topic_a's own timestamp
+
+        Map<String, TopicTableConfig> topicConfigs = new HashMap<>();
+        topicConfigs.put("topic_a", topicAConfig); // topic_b gets no entry in this map
+
+        MessageQueue topicAQueue = new MessageQueue("topic_a", "broker-a", 0);
+        MessageQueue topicBQueue = new MessageQueue("topic_b", "broker-b", 0);
+
+        SourceSplitEnumerator.Context<RocketMqSourceSplit> context = 
mockContext();
+
+        try (MockedStatic<RocketMqAdminUtil> mockedAdmin =
+                Mockito.mockStatic(RocketMqAdminUtil.class)) {
+            mockedAdmin
+                    .when(() -> RocketMqAdminUtil.offsetTopics(any(), 
eq(metadata.getTopics())))
+                    .thenReturn(
+                            Collections.singletonList(
+                                    topicOffsets(
+                                            queueOffset(topicAQueue, 0L, 20L),
+                                            queueOffset(topicBQueue, 0L, 
10L))));
+            mockedAdmin
+                    .when(
+                            () ->
+                                    RocketMqAdminUtil.searchOffsetsByTimestamp(
+                                            any(),
+                                            
eq(Collections.singletonList(topicAQueue)),
+                                            eq(2_000L)))
+                    .thenReturn(Collections.singletonMap(topicAQueue, 12L)); // offset the mocked lookup returns for topic_a's queue at timestamp 2000
+            mockedAdmin
+                    .when(
+                            () ->
+                                    RocketMqAdminUtil.searchOffsetsByTimestamp(
+                                            any(),
+                                            
eq(Collections.singletonList(topicBQueue)),
+                                            eq(1_000L)))
+                    .thenReturn(Collections.singletonMap(topicBQueue, 7L)); // offset the mocked lookup returns for topic_b's queue at timestamp 1000
+
+            RocketMqSourceSplitEnumerator enumerator =
+                    new RocketMqSourceSplitEnumerator(metadata, topicConfigs, 
context, -1L);
+
+            enumerator.run();
+        }
+
+        Map<String, RocketMqSourceSplit> splitsByTopic = 
captureAssignedSplits(context);
+        Assertions.assertEquals(12L, 
splitsByTopic.get("topic_a").getStartOffset());
+        Assertions.assertEquals(7L, 
splitsByTopic.get("topic_b").getStartOffset());
+    }
+
+    @Test // when currentOffsets returns no entries, the split starts at the queue's min offset (5)
+    void testRun_fallsBackToFirstOffsetWhenGroupOffsetsAreMissing() throws 
Exception {
+        ConsumerMetadata metadata = new ConsumerMetadata();
+        metadata.setTopics(Collections.singletonList("topic_group"));
+        metadata.setStartMode(StartMode.CONSUME_FROM_GROUP_OFFSETS);
+
+        MessageQueue messageQueue = new MessageQueue("topic_group", 
"broker-group", 0);
+
+        SourceSplitEnumerator.Context<RocketMqSourceSplit> context = 
mockContext();
+
+        try (MockedStatic<RocketMqAdminUtil> mockedAdmin =
+                Mockito.mockStatic(RocketMqAdminUtil.class)) {
+            mockedAdmin
+                    .when(() -> RocketMqAdminUtil.offsetTopics(any(), 
eq(metadata.getTopics())))
+                    .thenReturn(
+                            Collections.singletonList(
+                                    topicOffsets(queueOffset(messageQueue, 5L, 
18L))));
+            mockedAdmin
+                    .when(
+                            () ->
+                                    RocketMqAdminUtil.currentOffsets(
+                                            any(),
+                                            eq(metadata.getTopics()),
+                                            
eq(Collections.singleton(messageQueue))))
+                    .thenReturn(Collections.emptyMap()); // currentOffsets yields nothing for the queue
+            mockedAdmin
+                    .when(() -> RocketMqAdminUtil.flatOffsetTopics(any(), 
eq(metadata.getTopics())))
+                    .thenReturn(topicOffsets(queueOffset(messageQueue, 5L, 
18L)));
+
+            RocketMqSourceSplitEnumerator enumerator =
+                    new RocketMqSourceSplitEnumerator(
+                            metadata, Collections.emptyMap(), context, -1L);
+
+            enumerator.run();
+        }
+
+        Map<String, RocketMqSourceSplit> splitsByTopic = 
captureAssignedSplits(context);
+        Assertions.assertEquals(5L, 
splitsByTopic.get("topic_group").getStartOffset());
+    }
+
+    @Test // an offset keyed by (topic_specific, broker name null, queue 0) is applied to queue 0 on "broker-specific"
+    void testRun_matchesSpecificOffsetsByTopicAndQueueId() throws Exception {
+        ConsumerMetadata metadata = new ConsumerMetadata();
+        metadata.setTopics(Collections.singletonList("topic_specific"));
+        metadata.setStartMode(StartMode.CONSUME_FROM_SPECIFIC_OFFSETS);
+        metadata.setSpecificStartOffsets(
+                Collections.singletonMap(new MessageQueue("topic_specific", 
null, 0), 33L));
+
+        MessageQueue messageQueue = new MessageQueue("topic_specific", 
"broker-specific", 0);
+
+        SourceSplitEnumerator.Context<RocketMqSourceSplit> context = 
mockContext();
+
+        try (MockedStatic<RocketMqAdminUtil> mockedAdmin =
+                Mockito.mockStatic(RocketMqAdminUtil.class)) {
+            mockedAdmin
+                    .when(() -> RocketMqAdminUtil.offsetTopics(any(), 
eq(metadata.getTopics())))
+                    .thenReturn(
+                            Collections.singletonList(
+                                    topicOffsets(queueOffset(messageQueue, 0L, 
50L))));
+
+            RocketMqSourceSplitEnumerator enumerator =
+                    new RocketMqSourceSplitEnumerator(
+                            metadata, Collections.emptyMap(), context, -1L);
+
+            enumerator.run();
+        }
+
+        Map<String, RocketMqSourceSplit> splitsByTopic = 
captureAssignedSplits(context);
+        Assertions.assertEquals(33L, 
splitsByTopic.get("topic_specific").getStartOffset());
+    }
+
+    private SourceSplitEnumerator.Context<RocketMqSourceSplit> mockContext() { // Mockito mock of the enumerator context that reports parallelism 1
+        SourceSplitEnumerator.Context<RocketMqSourceSplit> context =
+                Mockito.mock(SourceSplitEnumerator.Context.class);
+        Mockito.when(context.currentParallelism()).thenReturn(1);
+        return context;
+    }
+
+    private Map<String, RocketMqSourceSplit> captureAssignedSplits(
+            SourceSplitEnumerator.Context<RocketMqSourceSplit> context) { // returns the splits passed to the single assignSplit(0, ...) call, keyed by topic
+        ArgumentCaptor<List> splitsCaptor = 
ArgumentCaptor.forClass(List.class);
+        Mockito.verify(context).assignSplit(eq(0), splitsCaptor.capture()); // verify() without a mode requires exactly one matching call
+
+        List<RocketMqSourceSplit> splits = splitsCaptor.getValue();
+        Assertions.assertEquals(1, splitsCaptor.getAllValues().size());
+        return splits.stream() // toMap has no merge function: two splits of the same topic would throw IllegalStateException
+                .collect(
+                        Collectors.toMap(
+                                split -> split.getMessageQueue().getTopic(), 
split -> split));
+    }
+
+    private Map<MessageQueue, TopicOffset> topicOffsets(
+            Map.Entry<MessageQueue, TopicOffset>... entries) {
+        Map<MessageQueue, TopicOffset> offsets = new LinkedHashMap<>(); // insertion-ordered map built from the given entries
+        for (Map.Entry<MessageQueue, TopicOffset> entry : entries) {
+            offsets.put(entry.getKey(), entry.getValue());
+        }
+        return offsets;
+    }
+
+    private Map.Entry<MessageQueue, TopicOffset> queueOffset(
+            MessageQueue messageQueue, long minOffset, long maxOffset) {
+        TopicOffset topicOffset = new TopicOffset(); // paired with the queue below; carries the given min/max offsets
+        topicOffset.setMinOffset(minOffset);
+        topicOffset.setMaxOffset(maxOffset);
+        return new AbstractMap.SimpleImmutableEntry<>(messageQueue, 
topicOffset);
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java
index e59237e73d..1023abd529 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java
@@ -298,6 +298,50 @@ public class RocketMqIT extends TestSuiteBase implements TestResource {
         Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
     }
 
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK}, // engines on which this test is disabled
+            disabledReason = "The multi-catalog does not currently support the 
Spark Flink engine")
+    @TestTemplate // end-to-end: pre-writes two topics, then runs the multi-table job and expects exit code 0
+    public void testSourceRocketMqMultiTableToAssert(TestContainer container)
+            throws IOException, InterruptedException {
+        String topicA = "test_topic_multi_a"; // must match the topic names used in the job conf
+        String topicB = "test_topic_multi_b";
+
+        // topicA: 5 messages without tag (ids 0-4).
+        // The conf sets global start.mode=CONSUME_FROM_LAST_OFFSET but 
overrides topicA to
+        // CONSUME_FROM_FIRST_OFFSET, so all 5 pre-written messages must be 
consumed.
+        DefaultSeaTunnelRowSerializer serializerA =
+                new DefaultSeaTunnelRowSerializer(
+                        topicA, null, SEATUNNEL_ROW_TYPE, DEFAULT_FORMAT, 
DEFAULT_FIELD_DELIMITER);
+        generateTestData(serializerA::serializeRow, topicA, 0, 5); // id bounds 0 and 5; end exclusive according to the comment above
+
+        // topicB: 3 messages with "tag_b" (ids 100-102) + 4 messages with 
"other_tag" (ids
+        // 103-106).
+        // The conf overrides topicB to CONSUME_FROM_FIRST_OFFSET and filters 
by tags="tag_b",
+        // so exactly 3 messages must be consumed; other_tag messages are 
dropped.
+        DefaultSeaTunnelRowSerializer serializerB =
+                new DefaultSeaTunnelRowSerializer(
+                        topicB,
+                        "tag_b",
+                        SEATUNNEL_ROW_TYPE,
+                        DEFAULT_FORMAT,
+                        DEFAULT_FIELD_DELIMITER);
+        DefaultSeaTunnelRowSerializer serializerBOther =
+                new DefaultSeaTunnelRowSerializer(
+                        topicB,
+                        "other_tag",
+                        SEATUNNEL_ROW_TYPE,
+                        DEFAULT_FORMAT,
+                        DEFAULT_FIELD_DELIMITER);
+        generateTestData(serializerB::serializeRow, topicB, 100, 103); // "tag_b" messages, ids 100-102 per the comment above
+        generateTestData(serializerBOther::serializeRow, topicB, 103, 107); // "other_tag" messages, ids 103-106 per the comment above
+
+        Container.ExecResult execResult =
+                
container.executeJob("/multiTableIT/rocketmq_multi_source_to_assert.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
+    }
+
     @TestTemplate
     public void testRocketMqLatestToConsole(TestContainer container)
             throws IOException, InterruptedException {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/multiTableIT/rocketmq_multi_source_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/multiTableIT/rocketmq_multi_source_to_assert.conf
new file mode 100644
index 0000000000..f018b404a2
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/multiTableIT/rocketmq_multi_source_to_assert.conf
@@ -0,0 +1,138 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Rocketmq {
+    name.srv.addr = "rocketmq-e2e:9876"
+    start.mode = "CONSUME_FROM_LAST_OFFSET" # source-level start mode; both entries below set their own
+    tables_configs = [
+      {
+        topics = "test_topic_multi_a"
+        start.mode = "CONSUME_FROM_FIRST_OFFSET" # entry-level start mode for this topic
+        format = json
+        schema = { # no "table" key in this schema
+          fields {
+            id = bigint
+            c_map = "map<string, smallint>"
+            c_array = "array<tinyint>"
+            c_string = string
+            c_boolean = boolean
+            c_tinyint = tinyint
+            c_smallint = smallint
+            c_int = int
+            c_bigint = bigint
+            c_float = float
+            c_double = double
+            c_decimal = "decimal(2, 1)"
+            c_bytes = bytes
+            c_date = date
+            c_timestamp = timestamp
+          }
+        }
+      },
+      {
+        topics = "test_topic_multi_b"
+        start.mode = "CONSUME_FROM_FIRST_OFFSET" # entry-level start mode for this topic
+        tags = "tag_b" # tag for this topic; the IT also writes "other_tag" messages to it
+        format = json
+        schema = {
+          table = "rocketmq_multi_custom" # explicit table name; the second Assert table_path below uses it
+          fields {
+            id = bigint
+            c_map = "map<string, smallint>"
+            c_array = "array<tinyint>"
+            c_string = string
+            c_boolean = boolean
+            c_tinyint = tinyint
+            c_smallint = smallint
+            c_int = int
+            c_bigint = bigint
+            c_float = float
+            c_double = double
+            c_decimal = "decimal(2, 1)"
+            c_bytes = bytes
+            c_date = date
+            c_timestamp = timestamp
+          }
+        }
+      }
+    ]
+  }
+}
+
+transform {
+}
+
+sink {
+  Assert {
+    rules = {
+      tables_configs = [
+        {
+          table_path = "test_topic_multi_a" # same string as the first source entry's topic
+          row_rules = [
+            {rule_type = MIN_ROW, rule_value = 5}, # MIN_ROW and MAX_ROW are both 5
+            {rule_type = MAX_ROW, rule_value = 5}
+          ]
+          field_rules = [
+            {
+              field_name = id
+              field_type = bigint
+              field_value = [
+                {rule_type = NOT_NULL},
+                {rule_type = MIN, rule_value = 0}, # id bounds 0 and 4
+                {rule_type = MAX, rule_value = 4}
+              ]
+            },
+            {
+              field_name = c_string
+              field_type = string
+              field_value = [{rule_type = NOT_NULL}]
+            }
+          ]
+        },
+        {
+          table_path = "rocketmq_multi_custom" # same string as schema.table of the second source entry
+          row_rules = [
+            {rule_type = MIN_ROW, rule_value = 3}, # MIN_ROW and MAX_ROW are both 3
+            {rule_type = MAX_ROW, rule_value = 3}
+          ]
+          field_rules = [
+            {
+              field_name = id
+              field_type = bigint
+              field_value = [
+                {rule_type = NOT_NULL},
+                {rule_type = MIN, rule_value = 100}, # id bounds 100 and 102
+                {rule_type = MAX, rule_value = 102}
+              ]
+            },
+            {
+              field_name = c_string
+              field_type = string
+              field_value = [{rule_type = NOT_NULL}]
+            }
+          ]
+        }
+      ]
+    }
+  }
+}

Reply via email to