Copilot commented on code in PR #10619:
URL: https://github.com/apache/seatunnel/pull/10619#discussion_r2963664223
##########
docs/zh/connectors/source/RocketMQ.md:
##########
@@ -31,7 +31,9 @@ Apache RocketMQ 的源连接器。
| 参数名 | 类型 | 必须 | 默认值
| 描述
|
|-------------------------------------|---------|----|----------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| topics | String | 是 | -
| RocketMQ 主题名称。如果有多个主题,使用 `,` 分隔,例如:`"tpc1,tpc2"`。
|
+| topics | String | 是 | -
| RocketMQ 主题名称。如果有多个主题,使用 `,` 分隔,例如:`"tpc1,tpc2"`。`topics` 与
`tables_configs` 同时只能配置一个。
|
+| tables_configs | List | 否 | -
|
多表模式配置列表。每项配置一张表,支持:`topics`、`format`、`schema`、`tags`、`start.mode`、`start.mode.timestamp`、`start.mode.offsets`、`ignore_parse_errors`。`topics`
与 `tables_configs` 同时只能配置一个。 |
+| table_list | List | 否 | -
| 已废弃,请使用 `tables_configs` 代替。
|
Review Comment:
参数表中 `topics` 仍标记为“必须=是”,但当前实现已支持多表模式(使用 `tables_configs` / `table_list`
时不需要配置 `topics`,且三者互斥)。请调整“必须”列的描述,体现需要在 `topics` / `tables_configs` /
`table_list` 三者中至少配置一个,而不是固定要求 `topics`。
```suggestion
| topics | String | 否 | -
| RocketMQ 主题名称。如果有多个主题,使用 `,`
分隔,例如:`"tpc1,tpc2"`。`topics`、`tables_configs`、`table_list` 三者互斥,且至少配置一个;当未配置
`tables_configs` 与 `table_list` 时必须配置 `topics`。 |
| tables_configs | List | 否 | -
|
多表模式配置列表。每项配置一张表,支持:`topics`、`format`、`schema`、`tags`、`start.mode`、`start.mode.timestamp`、`start.mode.offsets`、`ignore_parse_errors`。`topics`、`tables_configs`、`table_list`
三者互斥,且至少配置一个。 |
| table_list | List | 否 | -
| 已废弃,请使用 `tables_configs` 代替。`topics`、`tables_configs`、`table_list`
三者互斥,且至少配置一个。
|
```
##########
seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceConfig.java:
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import
org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqBaseConfiguration;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.SchemaFormat;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.StartMode;
+import
org.apache.seatunnel.connectors.seatunnel.rocketmq.config.RocketMqSourceOptions;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+import org.apache.seatunnel.format.text.TextDeserializationSchema;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import lombok.Getter;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** RocketMQ source configuration, supports both single-table and multi-table
modes. */
+public class RocketMqSourceConfig implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ @Getter private final ConsumerMetadata metadata;
+ @Getter private final Map<String, TopicTableConfig> topicConfigs;
+ @Getter private final long discoveryIntervalMillis;
+
+ public RocketMqSourceConfig(ReadonlyConfig readonlyConfig) {
+ this.topicConfigs = new LinkedHashMap<>();
+ this.discoveryIntervalMillis =
+
readonlyConfig.get(RocketMqSourceOptions.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS);
+
+ List<Map<String, Object>> tableConfigList = null;
+ if
(readonlyConfig.getOptional(RocketMqSourceOptions.TABLE_CONFIGS).isPresent()) {
+ tableConfigList =
readonlyConfig.get(RocketMqSourceOptions.TABLE_CONFIGS);
+ } else if
(readonlyConfig.getOptional(RocketMqSourceOptions.TABLE_LIST).isPresent()) {
+ tableConfigList =
readonlyConfig.get(RocketMqSourceOptions.TABLE_LIST);
+ }
+
+ if (tableConfigList != null) {
+ // Multi-table mode
+ this.metadata = buildConsumerMetadata(readonlyConfig,
Collections.emptyList());
+ List<String> allTopics = new ArrayList<>();
+ for (Map<String, Object> tableConfig : tableConfigList) {
+ parseTableConfig(ReadonlyConfig.fromMap(tableConfig),
allTopics);
+ }
+ this.metadata.setTopics(allTopics);
+ } else {
+ // Single-table mode (backward compatible)
+ List<String> topics =
+ Arrays.stream(
+ readonlyConfig
+ .get(RocketMqSourceOptions.TOPICS)
+
.split(RocketMqSourceOptions.DEFAULT_FIELD_DELIMITER))
+ .map(String::trim)
+ .filter(t -> !t.isEmpty())
+ .collect(Collectors.toList());
+ this.metadata = buildConsumerMetadata(readonlyConfig, topics);
+ CatalogTable catalogTable =
CatalogTableUtil.buildWithConfig(readonlyConfig);
+ DeserializationSchema<SeaTunnelRow> deserializationSchema =
+ buildDeserialization(readonlyConfig, catalogTable);
+ List<String> tags = parseTags(readonlyConfig);
+ for (String topic : topics) {
+ TopicTableConfig config = new TopicTableConfig();
+ config.setCatalogTable(catalogTable);
+ config.setDeserializationSchema(deserializationSchema);
+ config.setTags(tags);
+ topicConfigs.put(topic, config);
+ }
+ }
+ }
+
+ private void parseTableConfig(ReadonlyConfig tableConfig, List<String>
allTopics) {
+ String topicsStr = tableConfig.get(RocketMqSourceOptions.TOPICS);
+ List<String> topics =
+
Arrays.stream(topicsStr.split(RocketMqSourceOptions.DEFAULT_FIELD_DELIMITER))
+ .map(String::trim)
+ .filter(t -> !t.isEmpty())
+ .collect(Collectors.toList());
Review Comment:
In multi-table mode, `parseTableConfig` assumes `topics` is present and
non-empty. If `topics` is missing from a `tables_configs` entry,
`topicsStr.split(...)` throws an NPE. If it is blank (or contains only
delimiters), the resulting topic list is empty, and `topics.get(0)` throws an
IndexOutOfBoundsException when the table path is derived from the first topic.
Both failures surface with hard-to-understand messages. Please add explicit
validation (e.g., require non-null/non-blank and at least one topic after
splitting) and fail fast with a clear exception indicating which table config
entry is invalid.
```suggestion
String topicsStr = tableConfig.get(RocketMqSourceOptions.TOPICS);
if (topicsStr == null || topicsStr.trim().isEmpty()) {
throw new IllegalArgumentException(
"Invalid RocketMQ table config in multi-table mode:
'topics' must be non-null and non-empty. Config: "
+ tableConfig);
}
List<String> topics =
Arrays.stream(topicsStr.split(RocketMqSourceOptions.DEFAULT_FIELD_DELIMITER))
.map(String::trim)
.filter(t -> !t.isEmpty())
.collect(Collectors.toList());
if (topics.isEmpty()) {
throw new IllegalArgumentException(
"Invalid RocketMQ table config in multi-table mode:
'topics' must contain at least one valid topic. Config: "
+ tableConfig);
}
```
##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java:
##########
@@ -298,6 +298,50 @@ public void testSourceRocketMqJsonToConsole(TestContainer
container)
Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
}
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "The multi-catalog does not currently support the
Spark Flink engine")
Review Comment:
The disabled reason says "Spark Flink engine", which is confusing and
grammatically incorrect given `type = {EngineType.SPARK,
EngineType.FLINK}`. Please reword it to something like “Spark/Flink engines” (or
explicitly list both) to avoid ambiguity.
```suggestion
disabledReason = "The multi-catalog does not currently support
the Spark and Flink engines")
```
##########
seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceConfig.java:
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import
org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqBaseConfiguration;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.SchemaFormat;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.StartMode;
+import
org.apache.seatunnel.connectors.seatunnel.rocketmq.config.RocketMqSourceOptions;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+import org.apache.seatunnel.format.text.TextDeserializationSchema;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import lombok.Getter;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** RocketMQ source configuration, supports both single-table and multi-table
modes. */
+public class RocketMqSourceConfig implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ @Getter private final ConsumerMetadata metadata;
+ @Getter private final Map<String, TopicTableConfig> topicConfigs;
+ @Getter private final long discoveryIntervalMillis;
+
+ public RocketMqSourceConfig(ReadonlyConfig readonlyConfig) {
+ this.topicConfigs = new LinkedHashMap<>();
+ this.discoveryIntervalMillis =
+
readonlyConfig.get(RocketMqSourceOptions.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS);
+
+ List<Map<String, Object>> tableConfigList = null;
+ if
(readonlyConfig.getOptional(RocketMqSourceOptions.TABLE_CONFIGS).isPresent()) {
+ tableConfigList =
readonlyConfig.get(RocketMqSourceOptions.TABLE_CONFIGS);
+ } else if
(readonlyConfig.getOptional(RocketMqSourceOptions.TABLE_LIST).isPresent()) {
+ tableConfigList =
readonlyConfig.get(RocketMqSourceOptions.TABLE_LIST);
+ }
+
+ if (tableConfigList != null) {
+ // Multi-table mode
+ this.metadata = buildConsumerMetadata(readonlyConfig,
Collections.emptyList());
+ List<String> allTopics = new ArrayList<>();
+ for (Map<String, Object> tableConfig : tableConfigList) {
+ parseTableConfig(ReadonlyConfig.fromMap(tableConfig),
allTopics);
+ }
+ this.metadata.setTopics(allTopics);
+ } else {
+ // Single-table mode (backward compatible)
+ List<String> topics =
+ Arrays.stream(
+ readonlyConfig
+ .get(RocketMqSourceOptions.TOPICS)
+
.split(RocketMqSourceOptions.DEFAULT_FIELD_DELIMITER))
+ .map(String::trim)
+ .filter(t -> !t.isEmpty())
+ .collect(Collectors.toList());
+ this.metadata = buildConsumerMetadata(readonlyConfig, topics);
+ CatalogTable catalogTable =
CatalogTableUtil.buildWithConfig(readonlyConfig);
+ DeserializationSchema<SeaTunnelRow> deserializationSchema =
+ buildDeserialization(readonlyConfig, catalogTable);
+ List<String> tags = parseTags(readonlyConfig);
+ for (String topic : topics) {
+ TopicTableConfig config = new TopicTableConfig();
+ config.setCatalogTable(catalogTable);
+ config.setDeserializationSchema(deserializationSchema);
+ config.setTags(tags);
+ topicConfigs.put(topic, config);
+ }
+ }
+ }
+
+ private void parseTableConfig(ReadonlyConfig tableConfig, List<String>
allTopics) {
+ String topicsStr = tableConfig.get(RocketMqSourceOptions.TOPICS);
+ List<String> topics =
+
Arrays.stream(topicsStr.split(RocketMqSourceOptions.DEFAULT_FIELD_DELIMITER))
+ .map(String::trim)
+ .filter(t -> !t.isEmpty())
+ .collect(Collectors.toList());
+
+ CatalogTable catalogTable =
CatalogTableUtil.buildWithConfig(tableConfig);
+ if (TablePath.DEFAULT.equals(catalogTable.getTablePath())) {
+ catalogTable =
+ CatalogTable.of(
+ TableIdentifier.of("", TablePath.of(null, null,
topics.get(0))),
+ catalogTable.getTableSchema(),
+ catalogTable.getOptions(),
+ catalogTable.getPartitionKeys(),
+ catalogTable.getComment());
+ }
+ DeserializationSchema<SeaTunnelRow> deserializationSchema =
+ buildDeserialization(tableConfig, catalogTable);
+ List<String> tags = parseTags(tableConfig);
+ StartMode startMode =
+
tableConfig.getOptional(RocketMqSourceOptions.START_MODE).orElse(null);
+ Long startTimestamp = null;
+
+ if (startMode != null) {
+ switch (startMode) {
+ case CONSUME_FROM_TIMESTAMP:
+ startTimestamp =
tableConfig.get(RocketMqSourceOptions.START_MODE_TIMESTAMP);
+ long currentTimestamp = System.currentTimeMillis();
+ if (startTimestamp < 0 || startTimestamp >
currentTimestamp) {
+ throw new IllegalArgumentException(
+ "The offsets timestamp value is smaller than 0
or larger"
+ + " than the current time");
+ }
+ break;
+ case CONSUME_FROM_SPECIFIC_OFFSETS:
+ Map<String, Long> offsetConfigMap =
+
tableConfig.get(RocketMqSourceOptions.START_MODE_OFFSETS);
+ Map<MessageQueue, Long> specificOffsets =
metadata.getSpecificStartOffsets();
+ if (specificOffsets == null) {
+ specificOffsets = new HashMap<>();
+ metadata.setSpecificStartOffsets(specificOffsets);
+ }
+ for (Map.Entry<String, Long> entry :
offsetConfigMap.entrySet()) {
+ int splitIndex = entry.getKey().lastIndexOf("-");
+ String topicName = entry.getKey().substring(0,
splitIndex);
+ int queueId =
Integer.parseInt(entry.getKey().substring(splitIndex + 1));
+ specificOffsets.put(
+ new MessageQueue(topicName, null, queueId),
entry.getValue());
+ }
+ break;
Review Comment:
Per-table `start.mode` handling can currently throw an NPE. When `start.mode =
CONSUME_FROM_TIMESTAMP`, `start.mode.timestamp` is fetched via `get(...)` (which can
return null) into a `Long` and then unboxed in the comparison `startTimestamp < 0`.
Similarly, for `CONSUME_FROM_SPECIFIC_OFFSETS`, `start.mode.offsets` may be null
when its entries are iterated. Since OptionRule validation does not apply inside
`tables_configs`, please add explicit checks that the required companion options
are present and throw a descriptive exception if they are missing.
##########
docs/en/connectors/source/RocketMQ.md:
##########
@@ -31,7 +31,9 @@ Source connector for Apache RocketMQ.
| Name | Type | Required | Default
| Description
|
|-------------------------------------|---------|----------|----------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| topics | String | yes | -
| `RocketMQ topic` name. If there are multiple `topics`, use `,` to
split, for example: `"tpc1,tpc2"`.
|
+| topics | String | yes | -
| `RocketMQ topic` name. If there are multiple `topics`, use `,` to
split, for example: `"tpc1,tpc2"`. You can configure only one of `topics` and
`tables_configs` at the same time. |
+| tables_configs | List | no | -
| Multi-table mode config list. Each item configures one table and
supports: `topics`, `format`, `schema`, `tags`, `start.mode`,
`start.mode.timestamp`, `start.mode.offsets`, `ignore_parse_errors`. You can
configure only one of `topics` and `tables_configs` at the same time. |
+| table_list | List | no | -
| Deprecated, use `tables_configs` instead.
|
Review Comment:
The options table still marks `topics` as **Required = yes**, but the
connector now supports multi-table mode where `topics` is omitted in favor of
`tables_configs` / `table_list` (and the factory enforces mutual exclusivity).
Please update the “Required” column to reflect that `topics` is not always
required (e.g., indicate that one of `topics`/`tables_configs`/`table_list`
must be provided).
```suggestion
| topics | String | one of | -
| `RocketMQ topic` name. If there are multiple `topics`, use `,` to
split, for example: `"tpc1,tpc2"`. Exactly one of `topics`, `tables_configs`,
or `table_list` must be configured, and only one of these options can be used
at the same time. |
| tables_configs | List | one of | -
| Multi-table mode config list. Each item configures one table and
supports: `topics`, `format`, `schema`, `tags`, `start.mode`,
`start.mode.timestamp`, `start.mode.offsets`, `ignore_parse_errors`. Exactly
one of `topics`, `tables_configs`, or `table_list` must be configured, and only
one of these options can be used at the same time. |
| table_list | List | one of | -
| Deprecated, use `tables_configs` instead. Exactly one of `topics`,
`tables_configs`, or `table_list` must be configured, and only one of these
options can be used at the same time.
|
```
##########
seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqTableIdDeserializationSchema.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import java.io.IOException;
+
+public class RocketMqTableIdDeserializationSchema implements
DeserializationSchema<SeaTunnelRow> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final DeserializationSchema<SeaTunnelRow> delegate;
+ private final String tableId;
+
+ public RocketMqTableIdDeserializationSchema(
+ DeserializationSchema<SeaTunnelRow> delegate, String tableId) {
+ this.delegate = delegate;
+ this.tableId = tableId;
+ }
+
+ @Override
+ public SeaTunnelRow deserialize(byte[] message) throws IOException {
+ SeaTunnelRow row = delegate.deserialize(message);
+ if (row != null) {
+ row.setTableId(tableId);
+ }
+ return row;
+ }
+
+ @Override
+ public void deserialize(byte[] message, Collector<SeaTunnelRow> out)
throws IOException {
+ delegate.deserialize(
+ message,
+ new Collector<SeaTunnelRow>() {
+ @Override
+ public void collect(SeaTunnelRow record) {
+ record.setTableId(tableId);
+ out.collect(record);
Review Comment:
`Collector.collect(SeaTunnelRow record)` unconditionally calls
`record.setTableId(...)`. A `DeserializationSchema.deserialize(byte[],
Collector)` implementation may emit no record at all, which is harmless here.
However, if an implementation ever passes `null` to `collect`, this line throws
an NPE. Please guard against `record == null` before setting the table ID and
skip forwarding nulls, consistent with the default `DeserializationSchema`
implementation.
```suggestion
if (record != null) {
record.setTableId(tableId);
out.collect(record);
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]