This is an automated email from the ASF dual-hosted git repository.
davidzollo pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 0d551e346d [Feature][Connector-V2] Add MQTT source connector (#10935)
0d551e346d is described below
commit 0d551e346d41e3e3e4e371d951ec857ebea9b959
Author: marmong <[email protected]>
AuthorDate: Mon Jun 8 23:33:23 2026 +0900
[Feature][Connector-V2] Add MQTT source connector (#10935)
---
docs/en/connectors/source/Mqtt.md | 190 +++++++++++++
docs/zh/connectors/source/Mqtt.md | 190 +++++++++++++
plugin-mapping.properties | 1 +
.../mqtt/exception/MqttConnectorErrorCode.java | 3 +-
.../seatunnel/mqtt/source/MqttSource.java | 79 ++++++
.../seatunnel/mqtt/source/MqttSourceConfig.java | 141 ++++++++++
.../seatunnel/mqtt/source/MqttSourceFactory.java | 73 +++++
.../seatunnel/mqtt/source/MqttSourceOptions.java | 108 ++++++++
.../seatunnel/mqtt/source/MqttSourceReader.java | 304 +++++++++++++++++++++
.../mqtt/source/MqttSourceConfigTest.java | 162 +++++++++++
.../mqtt/source/MqttSourceFactoryTest.java | 65 +++++
.../seatunnel/mqtt/source/MqttSourceTest.java | 232 ++++++++++++++++
12 files changed, 1547 insertions(+), 1 deletion(-)
diff --git a/docs/en/connectors/source/Mqtt.md
b/docs/en/connectors/source/Mqtt.md
new file mode 100644
index 0000000000..d810978b74
--- /dev/null
+++ b/docs/en/connectors/source/Mqtt.md
@@ -0,0 +1,190 @@
+import ChangeLog from '../changelog/connector-mqtt.md';
+
+# MQTT
+
+> MQTT source connector
+
+## Description
+
+Used to read messages from an MQTT broker. Supports MQTT 3.1.1 protocol via
the Eclipse Paho client library.
+
+This connector subscribes to a configured MQTT topic, deserializes message
payloads as JSON or text, and converts them into SeaTunnel rows.
+
+## Key features
+
+- [ ] [batch](../../introduction/concepts/connector-v2-features.md)
+- [x] [stream](../../introduction/concepts/connector-v2-features.md)
+- [ ] [exactly-once](../../introduction/concepts/connector-v2-features.md)
+- [ ] [column projection](../../introduction/concepts/connector-v2-features.md)
+- [ ] [parallelism](../../introduction/concepts/connector-v2-features.md)
+- [ ] [support user-defined
split](../../introduction/concepts/connector-v2-features.md)
+
+:::caution Delivery semantics
+
+The `qos` option controls MQTT broker-client delivery only. It is not
integrated with SeaTunnel checkpointing, so this source does not provide
end-to-end exactly-once or at-least-once guarantees.
+
+For persistent MQTT sessions, set `clean_session=false` and configure a stable
`client_id`. When `clean_session=false`, the source disconnects without
unsubscribing during close, so the broker can retain the subscription according
to MQTT session semantics.
+
+The source uses MQTT auto-reconnect. If the client remains disconnected longer
than `reconnect_timeout`, the source task fails to avoid a silent ingestion
stall.
+
+:::
+
+## Options
+
+| name | type | required | default value |
+|---------------------|---------|----------|---------------|
+| url | string | yes | - |
+| topic | string | yes | - |
+| schema | config | yes | - |
+| username | string | no | - |
+| password | string | no | - |
+| qos | int | no | 1 |
+| format | string | no | json |
+| field_delimiter | string | no | , |
+| client_id | string | no | - |
+| clean_session | boolean | no | true |
+| connection_timeout | int | no | 30 |
+| keep_alive_interval | int | no | 60 |
+| reconnect_timeout | int | no | 120 |
+| max_queue_size | int | no | 1000 |
+| common-options | | no | - |
+
+### url [string]
+
+The MQTT broker connection URL. Must include protocol, host, and port.
+
+Example: `tcp://broker.example.com:1883`
+
+### topic [string]
+
+The MQTT topic to subscribe to.
+
+Example: `iot/sensors/temperature`
+
+### schema [config]
+
+The schema fields of upstream data. For more details, please refer to [Schema
Feature](../../introduction/concepts/schema-feature.md).
+
+### username [string]
+
+The username for MQTT broker authentication. Leave unset for anonymous access.
+
+### password [string]
+
+The password for MQTT broker authentication. Leave unset for anonymous access.
+
+### qos [int]
+
+The MQTT Quality of Service level used when subscribing to the topic.
+
+This setting only controls delivery between the MQTT broker and the MQTT
client. It does not provide end-to-end delivery guarantees in SeaTunnel.
+
+Supported values (QoS 2 is not supported by this source):
+
+- `0` — MQTT QoS 0
+- `1` — MQTT QoS 1
+
+### format [string]
+
+The deserialization format for incoming messages. Supported values:
+
+- `json` — Deserialize each message as a JSON object (default)
+- `text` — Deserialize each message as delimited plain text (delimiter
controlled by `field_delimiter`)
+
+### field_delimiter [string]
+
+The field delimiter used when `format` is set to `text`. Default is `,`.
+
+Examples: `,`, `|`, `\t`
+
+### client_id [string]
+
+The MQTT client id. If omitted while `clean_session=true`, the connector
generates a random client id.
+
+This option is required when `clean_session=false`, because persistent MQTT
sessions require a stable client id.
+
+### clean_session [boolean]
+
+Whether to use a clean MQTT session. Default is `true`.
+
+- `true` — Broker discards previous session state. Suitable for stateless
operation.
+- `false` — Broker can retain session state, including subscriptions. Requires
a stable `client_id`.
+
+### connection_timeout [int]
+
+The MQTT connection establishment timeout in seconds.
+
+### keep_alive_interval [int]
+
+The MQTT keep alive interval in seconds.
+
+### reconnect_timeout [int]
+
+Maximum seconds to wait for MQTT auto-reconnect before failing the source. If
the MQTT client remains disconnected longer than this timeout, `pollNext()`
fails the source task instead of silently waiting forever.
+
+### max_queue_size [int]
+
+Maximum number of MQTT messages buffered in memory before deserialization.
+
+### common options
+
+Source plugin common parameters, please refer to [Source Common
Options](../common-options/source-common-options.md) for details.
+
+## Example
+
+### JSON source
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+}
+
+source {
+ MQTT {
+ url = "tcp://broker.example.com:1883"
+ topic = "iot/sensors/readings"
+ qos = 1
+ format = "json"
+ schema = {
+ fields {
+ id = bigint
+ name = string
+ temperature = double
+ }
+ }
+ plugin_output = "sensor_data"
+ }
+}
+
+sink {
+ Console {
+ plugin_input = "sensor_data"
+ }
+}
+```
+
+### Persistent session source
+
+```hocon
+source {
+ MQTT {
+ url = "tcp://broker.example.com:1883"
+ topic = "iot/sensors/readings"
+ client_id = "seatunnel-mqtt-source"
+ clean_session = false
+ qos = 1
+ format = "json"
+ schema = {
+ fields {
+ id = bigint
+ temperature = double
+ }
+ }
+ }
+}
+```
+
+## Changelog
+
+<ChangeLog />
diff --git a/docs/zh/connectors/source/Mqtt.md
b/docs/zh/connectors/source/Mqtt.md
new file mode 100644
index 0000000000..b057c8f3b4
--- /dev/null
+++ b/docs/zh/connectors/source/Mqtt.md
@@ -0,0 +1,190 @@
+import ChangeLog from '../changelog/connector-mqtt.md';
+
+# MQTT
+
+> MQTT 源连接器
+
+## 描述
+
+用于从 MQTT broker 读取消息。该连接器通过 Eclipse Paho 客户端库支持 MQTT 3.1.1 协议。
+
+该连接器会订阅配置的 MQTT topic,将消息 payload 按 JSON 或 text 格式反序列化,并转换为 SeaTunnel Row。
+
+## 关键特性
+
+- [ ] [批](../../introduction/concepts/connector-v2-features.md)
+- [x] [流](../../introduction/concepts/connector-v2-features.md)
+- [ ] [精确一次](../../introduction/concepts/connector-v2-features.md)
+- [ ] [列投影](../../introduction/concepts/connector-v2-features.md)
+- [ ] [并行性](../../introduction/concepts/connector-v2-features.md)
+- [ ] [支持用户自定义 split](../../introduction/concepts/connector-v2-features.md)
+
+:::caution 交付语义
+
+`qos` 选项只控制 MQTT broker 和 MQTT client 之间的交付语义。它没有与 SeaTunnel checkpoint 集成,因此该
Source 不提供端到端的精确一次或至少一次保证。
+
+如需使用 MQTT 持久会话,请设置 `clean_session=false` 并配置稳定的 `client_id`。当
`clean_session=false` 时,Source 在关闭时只断开连接,不会取消订阅,因此 broker 可以根据 MQTT 会话语义保留订阅。
+
+Source 使用 MQTT 自动重连。如果 client 断开连接的时间超过 `reconnect_timeout`,Source task
会失败,以避免静默停止摄取。
+
+:::
+
+## 选项
+
+| 参数名 | 类型 | 必须 | 默认值 |
+|-------------------|---------|----|------|
+| url | string | 是 | - |
+| topic | string | 是 | - |
+| schema | config | 是 | - |
+| username | string | 否 | - |
+| password | string | 否 | - |
+| qos | int | 否 | 1 |
+| format | string | 否 | json |
+| field_delimiter | string | 否 | , |
+| client_id | string | 否 | - |
+| clean_session | boolean | 否 | true |
+| connection_timeout | int | 否 | 30 |
+| keep_alive_interval | int | 否 | 60 |
+| reconnect_timeout | int | 否 | 120 |
+| max_queue_size | int | 否 | 1000 |
+| common-options | | 否 | - |
+
+### url [string]
+
+MQTT broker 连接 URL。必须包含协议、主机和端口。
+
+示例:`tcp://broker.example.com:1883`
+
+### topic [string]
+
+要订阅消息的 MQTT topic。
+
+示例:`iot/sensors/temperature`
+
+### schema [config]
+
+上游数据的 schema 字段。更多详情请参考 [Schema
特性](../../introduction/concepts/schema-feature.md)。
+
+### username [string]
+
+MQTT broker 认证用户名。匿名访问时可以不配置。
+
+### password [string]
+
+MQTT broker 认证密码。匿名访问时可以不配置。
+
+### qos [int]
+
+订阅 topic 时使用的 MQTT Quality of Service 等级。
+
+该设置只控制 MQTT broker 和 MQTT client 之间的交付,不提供 SeaTunnel 端到端交付保证。
+
+支持的值(该 Source 不支持 QoS 2):
+
+- `0` — MQTT QoS 0
+- `1` — MQTT QoS 1
+
+### format [string]
+
+输入消息的反序列化格式。支持的值:
+
+- `json` — 将每条消息反序列化为 JSON 对象(默认)
+- `text` — 将每条消息按分隔符反序列化为纯文本(分隔符由 `field_delimiter` 控制)
+
+### field_delimiter [string]
+
+当 `format` 设置为 `text` 时使用的字段分隔符。默认值为 `,`。
+
+示例:`,`, `|`, `\t`
+
+### client_id [string]
+
+MQTT client id。当 `clean_session=true` 且未配置该选项时,连接器会生成随机 client id。
+
+当 `clean_session=false` 时必须配置该选项,因为 MQTT 持久会话需要稳定的 client id。
+
+### clean_session [boolean]
+
+是否使用 clean MQTT session。默认值为 `true`。
+
+- `true` — broker 丢弃之前的会话状态,适合无状态运行。
+- `false` — broker 可以保留会话状态,包括订阅信息。需要稳定的 `client_id`。
+
+### connection_timeout [int]
+
+MQTT 连接建立超时时间,单位为秒。
+
+### keep_alive_interval [int]
+
+MQTT keep alive 间隔,单位为秒。
+
+### reconnect_timeout [int]
+
+等待 MQTT 自动重连的最长时间,单位为秒。如果 MQTT client 断开连接的时间超过该超时时间,`pollNext()` 会使 Source
task 失败,避免无限期静默等待。
+
+### max_queue_size [int]
+
+反序列化之前在内存中缓存的 MQTT 消息最大数量。
+
+### common options
+
+源插件通用参数,详情请参考 [源通用选项](../common-options/source-common-options.md)。
+
+## 示例
+
+### JSON Source
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+}
+
+source {
+ MQTT {
+ url = "tcp://broker.example.com:1883"
+ topic = "iot/sensors/readings"
+ qos = 1
+ format = "json"
+ schema = {
+ fields {
+ id = bigint
+ name = string
+ temperature = double
+ }
+ }
+ plugin_output = "sensor_data"
+ }
+}
+
+sink {
+ Console {
+ plugin_input = "sensor_data"
+ }
+}
+```
+
+### 持久会话 Source
+
+```hocon
+source {
+ MQTT {
+ url = "tcp://broker.example.com:1883"
+ topic = "iot/sensors/readings"
+ client_id = "seatunnel-mqtt-source"
+ clean_session = false
+ qos = 1
+ format = "json"
+ schema = {
+ fields {
+ id = bigint
+ temperature = double
+ }
+ }
+ }
+}
+```
+
+## 变更日志
+
+<ChangeLog />
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index f64fe5ddd1..fe181d223c 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -139,6 +139,7 @@ seatunnel.sink.ObsFile = connector-file-obs
seatunnel.source.Milvus = connector-milvus
seatunnel.sink.Milvus = connector-milvus
seatunnel.sink.ActiveMQ = connector-activemq
+seatunnel.source.MQTT = connector-mqtt
seatunnel.sink.MQTT = connector-mqtt
seatunnel.source.Prometheus = connector-prometheus
seatunnel.sink.Prometheus = connector-prometheus
diff --git
a/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/exception/MqttConnectorErrorCode.java
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/exception/MqttConnectorErrorCode.java
index f55089f7a7..dc8d76fd85 100644
---
a/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/exception/MqttConnectorErrorCode.java
+++
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/exception/MqttConnectorErrorCode.java
@@ -22,7 +22,8 @@ import
org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
public enum MqttConnectorErrorCode implements SeaTunnelErrorCode {
CONNECTION_FAILED("MQTT-01", "MQTT connection failed"),
PUBLISH_FAILED("MQTT-02", "MQTT message publish failed"),
- INVALID_CONFIG("MQTT-03", "Invalid MQTT configuration");
+ INVALID_CONFIG("MQTT-03", "Invalid MQTT configuration"),
+ RECEIVE_FAILED("MQTT-04", "MQTT message receive failed");
private final String code;
private final String description;
diff --git
a/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSource.java
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSource.java
new file mode 100644
index 0000000000..580b3119bc
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSource.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mqtt.source;
+
+import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.constants.JobMode;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import
org.apache.seatunnel.connectors.seatunnel.mqtt.exception.MqttConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.mqtt.exception.MqttConnectorException;
+
+import java.util.Collections;
+import java.util.List;
+
+public class MqttSource extends AbstractSingleSplitSource<SeaTunnelRow> { // MQTT source: builds its config and catalog table once from the plugin options and passes both to MqttSourceReader
+
+ private final MqttSourceConfig sourceConfig; // parsed and validated connector options
+ private final CatalogTable catalogTable; // the single table this source produces
+ private JobContext jobContext; // assigned by setJobContext(); null until that call
+
+ public MqttSource(ReadonlyConfig pluginConfig) { // MqttSourceConfig's constructor throws IllegalArgumentException on invalid option values
+ this.sourceConfig = new MqttSourceConfig(pluginConfig);
+ this.catalogTable = CatalogTableUtil.buildWithConfig(pluginConfig); // built from the same plugin config
+ }
+
+ @Override
+ public Boundedness getBoundedness() { // always UNBOUNDED; throws INVALID_CONFIG when a job context is set and its mode is not STREAMING
+ if (jobContext != null &&
!JobMode.STREAMING.equals(jobContext.getJobMode())) {
+ throw new MqttConnectorException(
+ MqttConnectorErrorCode.INVALID_CONFIG,
+ String.format(
+ "PluginName: %s, Message: MQTT source only
supports streaming job mode",
+ getPluginName()));
+ }
+ return Boundedness.UNBOUNDED; // also reached when no job context has been set yet (mode check skipped)
+ }
+
+ @Override
+ public String getPluginName() { // returns MqttSourceOptions.CONNECTOR_IDENTITY ("MQTT")
+ return MqttSourceOptions.CONNECTOR_IDENTITY;
+ }
+
+ @Override
+ public void setJobContext(JobContext jobContext) { // stored only so getBoundedness() can check the job mode
+ this.jobContext = jobContext;
+ }
+
+ @Override
+ public List<CatalogTable> getProducedCatalogTables() { // immutable single-element list
+ return Collections.singletonList(catalogTable);
+ }
+
+ @Override
+ public AbstractSingleSplitReader<SeaTunnelRow> createReader(
+ SingleSplitReaderContext readerContext) throws Exception { // readerContext is not used here
+ return new MqttSourceReader(sourceConfig, catalogTable);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceConfig.java
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceConfig.java
new file mode 100644
index 0000000000..e100e9bf3a
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceConfig.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mqtt.source;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import java.util.UUID;
+
+public class MqttSourceConfig { // validated snapshot of the MQTT source options; all fields are final and exposed through getters only
+
+ private static final String CLIENT_ID_PREFIX = "seatunnel_mqtt_source_"; // prefix of the generated client id used when client_id is not configured
+
+ private final String url;
+ private final String topic;
+ private final String username;
+ private final String password;
+ private final int qos;
+ private final String format;
+ private final String fieldDelimiter;
+ private final String clientId; // configured value, or CLIENT_ID_PREFIX + random UUID
+ private final boolean cleanSession;
+ private final int connectionTimeout;
+ private final int keepAliveInterval;
+ private final int reconnectTimeout;
+ private final int maxQueueSize;
+
+ public MqttSourceConfig(ReadonlyConfig config) { // reads every option, resolves the client id, then validates; throws IllegalArgumentException on invalid values
+ this.url = config.get(MqttSourceOptions.URL);
+ this.topic = config.get(MqttSourceOptions.TOPIC);
+ this.username = config.get(MqttSourceOptions.USERNAME);
+ this.password = config.get(MqttSourceOptions.PASSWORD);
+ this.qos = config.get(MqttSourceOptions.QOS);
+ this.format = config.get(MqttSourceOptions.FORMAT);
+ this.fieldDelimiter = config.get(MqttSourceOptions.FIELD_DELIMITER);
+ this.cleanSession = config.get(MqttSourceOptions.CLEAN_SESSION);
+ this.connectionTimeout =
config.get(MqttSourceOptions.CONNECTION_TIMEOUT);
+ this.keepAliveInterval =
config.get(MqttSourceOptions.KEEP_ALIVE_INTERVAL);
+ this.reconnectTimeout =
config.get(MqttSourceOptions.RECONNECT_TIMEOUT);
+ this.maxQueueSize = config.get(MqttSourceOptions.MAX_QUEUE_SIZE);
+
+ String configuredClientId = config.get(MqttSourceOptions.CLIENT_ID);
+ if (!cleanSession && isBlank(configuredClientId)) { // clean_session=false requires an explicitly configured client id
+ throw new IllegalArgumentException(
+ "client_id is required when clean_session=false for MQTT
source");
+ }
+ this.clientId =
+ isBlank(configuredClientId)
+ ? CLIENT_ID_PREFIX + UUID.randomUUID().toString() // NOTE(review): yields a 58-char id; MQTT 3.1.1 only obliges brokers to accept ids up to 23 chars - confirm target brokers allow longer ones
+ : configuredClientId;
+
+ validate(); // runs last, after all fields are assigned
+ }
+
+ private void validate() { // value checks; NOTE(review): throws IllegalArgumentException while MqttSource reports config errors as MqttConnectorException(INVALID_CONFIG) - confirm this is intended
+ if (qos < 0 || qos > 1) { // only QoS 0 and 1 are accepted
+ throw new IllegalArgumentException("MQTT source qos must be 0 or
1, got: " + qos);
+ }
+ if (!"json".equalsIgnoreCase(format) && // format is matched case-insensitively
!"text".equalsIgnoreCase(format)) {
+ throw new IllegalArgumentException("Unsupported MQTT source
format: " + format);
+ }
+ if (reconnectTimeout <= 0) {
+ throw new IllegalArgumentException(
+ "reconnect_timeout must be greater than 0, got: " +
reconnectTimeout);
+ }
+ if (maxQueueSize <= 0) {
+ throw new IllegalArgumentException(
+ "max_queue_size must be greater than 0, got: " +
maxQueueSize);
+ }
+ }
+
+ private static boolean isBlank(String value) { // true for null, empty, or strings that are empty after String.trim()
+ return value == null || value.trim().isEmpty();
+ }
+
+ public String getUrl() {
+ return url;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public int getQos() {
+ return qos;
+ }
+
+ public String getFormat() { // returned as configured (case is not normalized)
+ return format;
+ }
+
+ public String getFieldDelimiter() {
+ return fieldDelimiter;
+ }
+
+ public String getClientId() { // never blank: either the configured id or the generated one
+ return clientId;
+ }
+
+ public boolean isCleanSession() {
+ return cleanSession;
+ }
+
+ public int getConnectionTimeout() {
+ return connectionTimeout;
+ }
+
+ public int getKeepAliveInterval() {
+ return keepAliveInterval;
+ }
+
+ public int getReconnectTimeout() { // validated to be greater than 0
+ return reconnectTimeout;
+ }
+
+ public int getMaxQueueSize() { // validated to be greater than 0
+ return maxQueueSize;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceFactory.java
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceFactory.java
new file mode 100644
index 0000000000..8aca1f2bcc
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mqtt.source;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.options.ConnectorCommonOptions;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+
+import com.google.auto.service.AutoService;
+
+import java.io.Serializable;
+
+@AutoService(Factory.class)
+public class MqttSourceFactory implements TableSourceFactory { // factory for MqttSource: exposes its identifier, its option rule and its source class
+
+ @Override
+ public String factoryIdentifier() { // returns MqttSourceOptions.CONNECTOR_IDENTITY ("MQTT")
+ return MqttSourceOptions.CONNECTOR_IDENTITY;
+ }
+
+ @Override
+ public OptionRule optionRule() { // url, topic and schema are required; every other option is optional
+ return OptionRule.builder()
+ .required(
+ MqttSourceOptions.URL,
+ MqttSourceOptions.TOPIC,
+ ConnectorCommonOptions.SCHEMA)
+ .optional(
+ MqttSourceOptions.USERNAME,
+ MqttSourceOptions.PASSWORD,
+ MqttSourceOptions.QOS,
+ MqttSourceOptions.FORMAT,
+ MqttSourceOptions.FIELD_DELIMITER,
+ MqttSourceOptions.CLIENT_ID,
+ MqttSourceOptions.CLEAN_SESSION,
+ MqttSourceOptions.CONNECTION_TIMEOUT,
+ MqttSourceOptions.KEEP_ALIVE_INTERVAL,
+ MqttSourceOptions.RECONNECT_TIMEOUT,
+ MqttSourceOptions.MAX_QUEUE_SIZE)
+ .build();
+ }
+
+ @Override
+ public <T, SplitT extends SourceSplit, StateT extends Serializable>
+ TableSource<T, SplitT, StateT>
createSource(TableSourceFactoryContext context) { // returns a lambda, so the MqttSource is only constructed when that lambda is invoked
+ return () -> (SeaTunnelSource<T, SplitT, StateT>) new
MqttSource(context.getOptions()); // unchecked cast to the caller's generic parameters
+ }
+
+ @Override
+ public Class<? extends SeaTunnelSource> getSourceClass() {
+ return MqttSource.class;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceOptions.java
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceOptions.java
new file mode 100644
index 0000000000..be120c9396
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceOptions.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mqtt.source;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public class MqttSourceOptions { // option definitions (key, type, default, description) for the MQTT source; NOTE(review): constants-only holder - consider a private constructor
+
+ public static final String CONNECTOR_IDENTITY = "MQTT"; // used as both plugin name (MqttSource) and factory identifier (MqttSourceFactory)
+
+ public static final Option<String> URL = // required by MqttSourceFactory.optionRule()
+ Options.key("url")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("MQTT broker URL, e.g.
tcp://localhost:1883");
+
+ public static final Option<String> TOPIC = // required by MqttSourceFactory.optionRule()
+ Options.key("topic")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("MQTT topic to subscribe messages from");
+
+ public static final Option<String> USERNAME =
+ Options.key("username")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("MQTT broker authentication username");
+
+ public static final Option<String> PASSWORD =
+ Options.key("password")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("MQTT broker authentication password");
+
+ public static final Option<Integer> QOS =
+ Options.key("qos")
+ .intType()
+ .defaultValue(1) // MqttSourceConfig.validate() rejects anything other than 0 or 1
+ .withDescription(
+ "MQTT subscribe QoS level. This controls
broker-client delivery only");
+
+ public static final Option<String> FORMAT =
+ Options.key("format")
+ .stringType()
+ .defaultValue("json") // MqttSourceConfig accepts "json" or "text", compared case-insensitively
+ .withDescription("Message deserialization format: json or
text");
+
+ public static final Option<String> FIELD_DELIMITER =
+ Options.key("field_delimiter")
+ .stringType()
+ .defaultValue(",")
+ .withDescription("Field delimiter for text format. Only
used when format=text");
+
+ public static final Option<String> CLIENT_ID =
+ Options.key("client_id")
+ .stringType()
+ .noDefaultValue() // MqttSourceConfig generates an id when this is blank and clean_session=true
+ .withDescription(
+ "MQTT client id. Required when
clean_session=false; generated otherwise if omitted");
+
+ public static final Option<Boolean> CLEAN_SESSION =
+ Options.key("clean_session")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("Whether to use MQTT clean session");
+
+ public static final Option<Integer> CONNECTION_TIMEOUT =
+ Options.key("connection_timeout")
+ .intType()
+ .defaultValue(30) // seconds, per the description below
+ .withDescription("MQTT connection timeout in seconds");
+
+ public static final Option<Integer> KEEP_ALIVE_INTERVAL =
+ Options.key("keep_alive_interval")
+ .intType()
+ .defaultValue(60) // seconds, per the description below
+ .withDescription("MQTT keep alive interval in seconds");
+
+ public static final Option<Integer> RECONNECT_TIMEOUT =
+ Options.key("reconnect_timeout")
+ .intType()
+ .defaultValue(120) // seconds; MqttSourceConfig.validate() requires a value > 0
+ .withDescription(
+ "Maximum seconds to wait for MQTT auto-reconnect
before failing the source");
+
+ public static final Option<Integer> MAX_QUEUE_SIZE =
+ Options.key("max_queue_size")
+ .intType()
+ .defaultValue(1000) // message count; MqttSourceConfig.validate() requires a value > 0
+ .withDescription(
+ "Maximum number of MQTT messages buffered before
deserialization");
+}
diff --git
a/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceReader.java
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceReader.java
new file mode 100644
index 0000000000..eee6ecb67d
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceReader.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mqtt.source;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import
org.apache.seatunnel.connectors.seatunnel.mqtt.exception.MqttConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.mqtt.exception.MqttConnectorException;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+import org.apache.seatunnel.format.text.TextDeserializationSchema;
+
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.function.LongSupplier;
+
+/**
+ * Single-split streaming reader that consumes one MQTT topic through the Eclipse Paho client.
+ *
+ * <p>The Paho callbacks ({@code messageArrived}, {@code connectionLost}, {@code connectComplete})
+ * only buffer payloads and record connection state. {@link #pollNext(Collector)} drains the
+ * buffer, deserializes each payload and emits the resulting row. A failure recorded by a callback
+ * is kept in {@code receiveException} and rethrown by every subsequent {@code pollNext} call.
+ */
+public class MqttSourceReader extends AbstractSingleSplitReader<SeaTunnelRow>
+        implements MqttCallbackExtended {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MqttSourceReader.class);
+    /** Longest time {@code pollNext} blocks while waiting for a buffered payload. */
+    private static final long POLL_TIMEOUT_MS = 1000L;
+    /** Longest time {@code messageArrived} waits for free queue capacity before failing. */
+    private static final long QUEUE_OFFER_TIMEOUT_MS = 1000L;
+
+    private final MqttSourceConfig sourceConfig;
+    private final CatalogTable catalogTable;
+    /** Bounded buffer between the Paho callbacks and {@code pollNext}. */
+    private final BlockingQueue<byte[]> messageQueue;
+    private final DeserializationSchema<SeaTunnelRow> deserializationSchema;
+    /** Clock used for reconnect-timeout tracking; injectable so tests can control time. */
+    private final LongSupplier currentTimeMillis;
+
+    private MqttClient mqttClient;
+    /** Failure recorded by a callback or by the reconnect-timeout check; null while healthy. */
+    private volatile Throwable receiveException;
+    /** Time of the pending connection loss, or {@code -1} when no loss is pending. */
+    private volatile long disconnectedSinceMs = -1L;
+    /** Cause reported with the pending connection loss; null when no loss is pending. */
+    private volatile Throwable disconnectCause;
+
+    /**
+     * Creates a reader that measures reconnect timeouts with the system clock.
+     *
+     * @param sourceConfig connector configuration
+     * @param catalogTable table whose row type and path are applied to emitted rows
+     */
+    public MqttSourceReader(MqttSourceConfig sourceConfig, CatalogTable catalogTable) {
+        this(sourceConfig, catalogTable, System::currentTimeMillis);
+    }
+
+    /**
+     * Creates a reader with an explicit millisecond clock.
+     *
+     * @param sourceConfig connector configuration
+     * @param catalogTable table whose row type and path are applied to emitted rows
+     * @param currentTimeMillis supplier of the current time in milliseconds
+     */
+    MqttSourceReader(
+            MqttSourceConfig sourceConfig,
+            CatalogTable catalogTable,
+            LongSupplier currentTimeMillis) {
+        this.sourceConfig = sourceConfig;
+        this.catalogTable = catalogTable;
+        this.currentTimeMillis = currentTimeMillis;
+        this.messageQueue = new LinkedBlockingQueue<>(sourceConfig.getMaxQueueSize());
+        this.deserializationSchema = createDeserializationSchema(sourceConfig, catalogTable);
+    }
+
+    /**
+     * Creates the Paho client, connects to the broker and subscribes to the configured topic.
+     *
+     * @throws MqttConnectorException with {@code CONNECTION_FAILED} when the client cannot be
+     *     created, connected or subscribed; the client is released before the exception is thrown
+     */
+    @Override
+    public void open() {
+        try {
+            this.mqttClient =
+                    new MqttClient(
+                            sourceConfig.getUrl(),
+                            sourceConfig.getClientId(),
+                            new MemoryPersistence());
+            this.mqttClient.setCallback(this);
+            this.mqttClient.connect(buildConnectOptions(sourceConfig));
+            subscribeTopic();
+            LOG.info(
+                    "MQTT source reader [{}] subscribed to topic [{}]",
+                    sourceConfig.getClientId(),
+                    sourceConfig.getTopic());
+        } catch (MqttException e) {
+            closeClientQuietly();
+            throw new MqttConnectorException(
+                    MqttConnectorErrorCode.CONNECTION_FAILED,
+                    "Failed to connect MQTT source client [" + sourceConfig.getClientId() + "]",
+                    e);
+        }
+    }
+
+    /**
+     * Emits at most one row per call.
+     *
+     * <p>Fails first if a callback recorded an error or the reconnect timeout elapsed, then waits
+     * up to {@code POLL_TIMEOUT_MS} for a payload. Nothing is emitted when no payload arrives or
+     * when the deserializer returns {@code null}.
+     *
+     * @param output collector receiving the row while its checkpoint lock is held
+     * @throws Exception if a failure was recorded, the wait is interrupted or deserialization
+     *     fails
+     */
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        checkReceiveException();
+        checkReconnectTimeout();
+
+        byte[] payload = messageQueue.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        if (payload == null) {
+            return;
+        }
+
+        SeaTunnelRow row = deserializationSchema.deserialize(payload);
+        if (row == null) {
+            return;
+        }
+
+        row.setTableId(catalogTable.getTablePath().toString());
+        synchronized (output.getCheckpointLock()) {
+            output.collect(row);
+        }
+    }
+
+    /**
+     * Disconnects and closes the Paho client; a no-op when the reader was never opened.
+     *
+     * <p>A connected client unsubscribes (clean sessions only) and disconnects gracefully; a
+     * client that is not connected is disconnected forcibly. If any step fails, the client is
+     * still released on a best-effort basis before the failure is reported.
+     *
+     * @throws IOException if the client could not be shut down cleanly
+     */
+    @Override
+    public void close() throws IOException {
+        if (mqttClient == null) {
+            return;
+        }
+        try {
+            if (mqttClient.isConnected()) {
+                // Only clean sessions unsubscribe; presumably a persistent session keeps its
+                // subscription so the broker can retain messages - TODO confirm.
+                if (sourceConfig.isCleanSession()) {
+                    mqttClient.unsubscribe(sourceConfig.getTopic());
+                }
+                mqttClient.disconnect();
+            } else {
+                mqttClient.disconnectForcibly();
+            }
+            mqttClient.close();
+            LOG.info("MQTT source reader [{}] closed", sourceConfig.getClientId());
+        } catch (MqttException e) {
+            // The orderly shutdown aborted part-way, so close() may not have run. Release the
+            // client anyway instead of leaking its connection and threads.
+            closeClientQuietly();
+            throw new IOException("Error closing MQTT source client", e);
+        }
+    }
+
+    /**
+     * Records when and why the connection was lost so that {@code pollNext} can enforce the
+     * reconnect timeout.
+     *
+     * @param cause reason reported by the client
+     */
+    @Override
+    public void connectionLost(Throwable cause) {
+        // Publish the cause before the timestamp: a reader that observes the pending-disconnect
+        // timestamp is then guaranteed to observe the matching cause as well.
+        disconnectCause = cause;
+        disconnectedSinceMs = currentTimeMillis.getAsLong();
+        LOG.warn(
+                "MQTT source connection lost for client [{}], auto-reconnect will attempt recovery",
+                sourceConfig.getClientId(),
+                cause);
+    }
+
+    /**
+     * Resubscribes after a reconnect and clears the pending-disconnect state.
+     *
+     * <p>The initial connect is ignored because {@link #open()} subscribes itself. A failed
+     * resubscription is not thrown here; it is recorded and surfaces from {@code pollNext}.
+     *
+     * @param reconnect {@code true} when this completion follows a reconnect
+     * @param serverURI broker URI the client connected to
+     */
+    @Override
+    public void connectComplete(boolean reconnect, String serverURI) {
+        if (!reconnect) {
+            return;
+        }
+        try {
+            subscribeTopic();
+            disconnectedSinceMs = -1L;
+            disconnectCause = null;
+            LOG.info(
+                    "MQTT source reader [{}] resubscribed to topic [{}] after reconnect to [{}]",
+                    sourceConfig.getClientId(),
+                    sourceConfig.getTopic(),
+                    serverURI);
+        } catch (MqttException e) {
+            receiveException = e;
+            LOG.error(
+                    "Failed to resubscribe MQTT source reader [{}] to topic [{}] "
+                            + "after reconnect to [{}]",
+                    sourceConfig.getClientId(),
+                    sourceConfig.getTopic(),
+                    serverURI,
+                    e);
+        }
+    }
+
+    /**
+     * Buffers a copy of the message payload for {@code pollNext}.
+     *
+     * <p>Messages that are {@code null} or carry no payload are ignored.
+     *
+     * @param topic topic the message arrived on (unused)
+     * @param message received message
+     * @throws MqttConnectorException with {@code RECEIVE_FAILED} when the queue stays full for
+     *     {@code QUEUE_OFFER_TIMEOUT_MS} or the wait is interrupted; the failure is also
+     *     recorded so that {@code pollNext} fails
+     */
+    @Override
+    public void messageArrived(String topic, MqttMessage message) throws Exception {
+        if (message == null || message.getPayload() == null) {
+            return;
+        }
+        byte[] payload = Arrays.copyOf(message.getPayload(), message.getPayload().length);
+        try {
+            if (!messageQueue.offer(payload, QUEUE_OFFER_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
+                MqttConnectorException exception =
+                        new MqttConnectorException(
+                                MqttConnectorErrorCode.RECEIVE_FAILED,
+                                "MQTT source message queue is full. Increase max_queue_size "
+                                        + "or reduce MQTT message throughput.");
+                receiveException = exception;
+                throw exception;
+            }
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag for the calling thread before reporting the failure.
+            Thread.currentThread().interrupt();
+            receiveException = e;
+            throw new MqttConnectorException(
+                    MqttConnectorErrorCode.RECEIVE_FAILED,
+                    "Interrupted while buffering MQTT source message",
+                    e);
+        }
+    }
+
+    /** No-op: this client only receives messages. */
+    @Override
+    public void deliveryComplete(IMqttDeliveryToken token) {
+        // Source-only client - outbound delivery acknowledgements are not expected.
+    }
+
+    /**
+     * Subscribes the client to the configured topic with the configured QoS.
+     *
+     * @throws MqttException if the subscription fails
+     */
+    void subscribeTopic() throws MqttException {
+        mqttClient.subscribe(sourceConfig.getTopic(), sourceConfig.getQos());
+    }
+
+    /** Rethrows a recorded failure, wrapped with topic and client context. */
+    private void checkReceiveException() {
+        if (receiveException == null) {
+            return;
+        }
+        throw new MqttConnectorException(
+                MqttConnectorErrorCode.RECEIVE_FAILED,
+                "Failed to receive MQTT source messages from topic ["
+                        + sourceConfig.getTopic()
+                        + "] with client ["
+                        + sourceConfig.getClientId()
+                        + "]",
+                receiveException);
+    }
+
+    /**
+     * Fails the reader once a pending connection loss has lasted at least
+     * {@code reconnect_timeout} seconds. The failure is recorded so later polls keep failing.
+     */
+    private void checkReconnectTimeout() {
+        long disconnectedAt = disconnectedSinceMs;
+        if (disconnectedAt < 0) {
+            return;
+        }
+        long elapsedMs = currentTimeMillis.getAsLong() - disconnectedAt;
+        long timeoutMs = TimeUnit.SECONDS.toMillis(sourceConfig.getReconnectTimeout());
+        if (elapsedMs < timeoutMs) {
+            return;
+        }
+
+        MqttConnectorException exception =
+                new MqttConnectorException(
+                        MqttConnectorErrorCode.RECEIVE_FAILED,
+                        "MQTT source client ["
+                                + sourceConfig.getClientId()
+                                + "] remained disconnected from topic ["
+                                + sourceConfig.getTopic()
+                                + "] for more than reconnect_timeout="
+                                + sourceConfig.getReconnectTimeout()
+                                + " seconds",
+                        disconnectCause);
+        receiveException = exception;
+        throw exception;
+    }
+
+    /**
+     * Builds the Paho connect options: automatic reconnect is always enabled, and credentials
+     * are applied only when they are non-empty.
+     */
+    private static MqttConnectOptions buildConnectOptions(MqttSourceConfig sourceConfig) {
+        MqttConnectOptions options = new MqttConnectOptions();
+        options.setAutomaticReconnect(true);
+        options.setCleanSession(sourceConfig.isCleanSession());
+        options.setConnectionTimeout(sourceConfig.getConnectionTimeout());
+        options.setKeepAliveInterval(sourceConfig.getKeepAliveInterval());
+
+        String username = sourceConfig.getUsername();
+        if (username != null && !username.isEmpty()) {
+            options.setUserName(username);
+        }
+        String password = sourceConfig.getPassword();
+        if (password != null && !password.isEmpty()) {
+            options.setPassword(password.toCharArray());
+        }
+        return options;
+    }
+
+    /**
+     * Selects the payload deserializer for the configured format ({@code json} or {@code text},
+     * matched case-insensitively).
+     *
+     * @throws IllegalArgumentException for any other format
+     */
+    private static DeserializationSchema<SeaTunnelRow> createDeserializationSchema(
+            MqttSourceConfig sourceConfig, CatalogTable catalogTable) {
+        SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
+        switch (sourceConfig.getFormat().toLowerCase()) {
+            case "json":
+                return new JsonDeserializationSchema(catalogTable, false, false);
+            case "text":
+                return TextDeserializationSchema.builder()
+                        .seaTunnelRowType(rowType)
+                        .delimiter(sourceConfig.getFieldDelimiter())
+                        .setCatalogTable(catalogTable)
+                        .build();
+            default:
+                throw new IllegalArgumentException(
+                        "Unsupported MQTT source format: " + sourceConfig.getFormat());
+        }
+    }
+
+    /**
+     * Best-effort release of the client on a failure path; never throws an MQTT error.
+     *
+     * <p>A still-connected client is disconnected forcibly first, because the client can only be
+     * closed once it is disconnected; otherwise the swallowed close failure would leave the
+     * connection open.
+     */
+    private void closeClientQuietly() {
+        if (mqttClient == null) {
+            return;
+        }
+        try {
+            if (mqttClient.isConnected()) {
+                mqttClient.disconnectForcibly();
+            }
+        } catch (MqttException ignored) {
+            // Best-effort cleanup; the exception that triggered it is more important.
+        }
+        try {
+            mqttClient.close();
+        } catch (MqttException ignored) {
+            // Best-effort cleanup; the exception that triggered it is more important.
+        }
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-mqtt/src/test/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceConfigTest.java
b/seatunnel-connectors-v2/connector-mqtt/src/test/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceConfigTest.java
new file mode 100644
index 0000000000..f9c4497736
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-mqtt/src/test/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceConfigTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mqtt.source;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Verifies option parsing, defaults and validation performed by {@code MqttSourceConfig}. */
+class MqttSourceConfigTest {
+
+    @Test
+    void testDefaultValues() {
+        MqttSourceConfig parsed = parse(baseConfig());
+
+        Assertions.assertEquals("tcp://localhost:1883", parsed.getUrl());
+        Assertions.assertEquals("users", parsed.getTopic());
+        Assertions.assertNull(parsed.getUsername());
+        Assertions.assertNull(parsed.getPassword());
+        Assertions.assertEquals(1, parsed.getQos());
+        Assertions.assertEquals("json", parsed.getFormat());
+        Assertions.assertEquals(",", parsed.getFieldDelimiter());
+        Assertions.assertTrue(parsed.isCleanSession());
+        Assertions.assertEquals(30, parsed.getConnectionTimeout());
+        Assertions.assertEquals(60, parsed.getKeepAliveInterval());
+        Assertions.assertEquals(120, parsed.getReconnectTimeout());
+        Assertions.assertEquals(1000, parsed.getMaxQueueSize());
+    }
+
+    @Test
+    void testExplicitValues() {
+        Map<String, Object> options = baseConfig();
+        options.put("username", "user");
+        options.put("password", "pass");
+        options.put("qos", 0);
+        options.put("format", "text");
+        options.put("field_delimiter", "|");
+        options.put("client_id", "mqtt-source-client");
+        options.put("clean_session", false);
+        options.put("connection_timeout", 10);
+        options.put("keep_alive_interval", 20);
+        options.put("reconnect_timeout", 30);
+        options.put("max_queue_size", 200);
+
+        MqttSourceConfig parsed = parse(options);
+
+        Assertions.assertEquals("user", parsed.getUsername());
+        Assertions.assertEquals("pass", parsed.getPassword());
+        Assertions.assertEquals(0, parsed.getQos());
+        Assertions.assertEquals("text", parsed.getFormat());
+        Assertions.assertEquals("|", parsed.getFieldDelimiter());
+        Assertions.assertEquals("mqtt-source-client", parsed.getClientId());
+        Assertions.assertFalse(parsed.isCleanSession());
+        Assertions.assertEquals(10, parsed.getConnectionTimeout());
+        Assertions.assertEquals(20, parsed.getKeepAliveInterval());
+        Assertions.assertEquals(30, parsed.getReconnectTimeout());
+        Assertions.assertEquals(200, parsed.getMaxQueueSize());
+    }
+
+    @Test
+    void testInvalidQosFails() {
+        assertRejected(baseConfigWith("qos", 2));
+    }
+
+    @Test
+    void testNegativeQosFails() {
+        assertRejected(baseConfigWith("qos", -1));
+    }
+
+    @Test
+    void testNonPositiveReconnectTimeoutFails() {
+        assertRejected(baseConfigWith("reconnect_timeout", 0));
+    }
+
+    @Test
+    void testNonPositiveMaxQueueSizeFails() {
+        assertRejected(baseConfigWith("max_queue_size", 0));
+    }
+
+    @Test
+    void testUnsupportedFormatFails() {
+        assertRejected(baseConfigWith("format", "avro"));
+    }
+
+    @Test
+    void testPersistentSessionRequiresClientId() {
+        assertRejected(baseConfigWith("clean_session", false));
+    }
+
+    @Test
+    void testPersistentSessionRequiresNonBlankClientId() {
+        Map<String, Object> options = baseConfigWith("clean_session", false);
+        options.put("client_id", " ");
+
+        assertRejected(options);
+    }
+
+    @Test
+    void testDefaultClientIdIsGeneratedForCleanSession() {
+        String generatedId = parse(baseConfig()).getClientId();
+
+        Assertions.assertTrue(generatedId.startsWith("seatunnel_mqtt_source_"));
+    }
+
+    /** Builds a source config from raw option values. */
+    private static MqttSourceConfig parse(Map<String, Object> options) {
+        return new MqttSourceConfig(ReadonlyConfig.fromMap(options));
+    }
+
+    /** Asserts that building a source config from the options is rejected as invalid. */
+    private static void assertRejected(Map<String, Object> options) {
+        Assertions.assertThrows(IllegalArgumentException.class, () -> parse(options));
+    }
+
+    /** Returns the minimal valid options plus one extra (or overriding) entry. */
+    private static Map<String, Object> baseConfigWith(String key, Object value) {
+        Map<String, Object> options = baseConfig();
+        options.put(key, value);
+        return options;
+    }
+
+    /** Returns the minimal valid options: broker URL and topic only. */
+    private static Map<String, Object> baseConfig() {
+        Map<String, Object> options = new HashMap<>();
+        options.put("url", "tcp://localhost:1883");
+        options.put("topic", "users");
+        return options;
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-mqtt/src/test/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceFactoryTest.java
b/seatunnel-connectors-v2/connector-mqtt/src/test/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceFactoryTest.java
new file mode 100644
index 0000000000..210376bd56
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-mqtt/src/test/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceFactoryTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mqtt.source;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.options.ConnectorCommonOptions;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Verifies the identifier, source class and option rule exposed by {@code MqttSourceFactory}. */
+public class MqttSourceFactoryTest {
+
+    @Test
+    void testFactoryIdentifier() {
+        MqttSourceFactory sourceFactory = new MqttSourceFactory();
+
+        Assertions.assertEquals(
+                MqttSourceOptions.CONNECTOR_IDENTITY, sourceFactory.factoryIdentifier());
+        Assertions.assertEquals(MqttSource.class, sourceFactory.getSourceClass());
+    }
+
+    @Test
+    void testOptionRule() {
+        OptionRule optionRule = new MqttSourceFactory().optionRule();
+
+        // Flatten every required-option group into one list of options.
+        List<Option<?>> required =
+                optionRule.getRequiredOptions().stream()
+                        .flatMap(group -> group.getOptions().stream())
+                        .collect(Collectors.toList());
+        Option<?>[] expectedRequired = {
+            MqttSourceOptions.URL, MqttSourceOptions.TOPIC, ConnectorCommonOptions.SCHEMA
+        };
+        for (Option<?> expected : expectedRequired) {
+            Assertions.assertTrue(required.contains(expected));
+        }
+
+        List<Option<?>> optional = optionRule.getOptionalOptions();
+        Option<?>[] expectedOptional = {
+            MqttSourceOptions.USERNAME,
+            MqttSourceOptions.PASSWORD,
+            MqttSourceOptions.QOS,
+            MqttSourceOptions.FORMAT,
+            MqttSourceOptions.FIELD_DELIMITER,
+            MqttSourceOptions.CLIENT_ID,
+            MqttSourceOptions.CLEAN_SESSION,
+            MqttSourceOptions.CONNECTION_TIMEOUT,
+            MqttSourceOptions.KEEP_ALIVE_INTERVAL,
+            MqttSourceOptions.MAX_QUEUE_SIZE
+        };
+        for (Option<?> expected : expectedOptional) {
+            Assertions.assertTrue(optional.contains(expected));
+        }
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-mqtt/src/test/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceTest.java
b/seatunnel-connectors-v2/connector-mqtt/src/test/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceTest.java
new file mode 100644
index 0000000000..2836813753
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-mqtt/src/test/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceTest.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mqtt.source;
+
+import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.constants.JobMode;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import
org.apache.seatunnel.connectors.seatunnel.mqtt.exception.MqttConnectorException;
+
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/** Exercises {@code MqttSource} metadata and the failure handling of {@code MqttSourceReader}. */
+class MqttSourceTest {
+
+    @Test
+    void testSourceMetadataAndBoundedness() {
+        MqttSource mqttSource = sourceFor(baseConfig());
+
+        Assertions.assertEquals(MqttSourceOptions.CONNECTOR_IDENTITY, mqttSource.getPluginName());
+        Assertions.assertEquals(Boundedness.UNBOUNDED, mqttSource.getBoundedness());
+    }
+
+    @Test
+    void testStreamingJobModeIsAllowed() {
+        MqttSource mqttSource = sourceFor(baseConfig());
+
+        mqttSource.setJobContext(new JobContext().setJobMode(JobMode.STREAMING));
+
+        Assertions.assertEquals(Boundedness.UNBOUNDED, mqttSource.getBoundedness());
+    }
+
+    @Test
+    void testBatchJobModeFails() {
+        MqttSource mqttSource = sourceFor(baseConfig());
+
+        mqttSource.setJobContext(new JobContext().setJobMode(JobMode.BATCH));
+
+        Assertions.assertThrows(MqttConnectorException.class, mqttSource::getBoundedness);
+    }
+
+    @Test
+    void testProducedCatalogTables() {
+        List<CatalogTable> tables = sourceFor(baseConfig()).getProducedCatalogTables();
+
+        Assertions.assertEquals(1, tables.size());
+        Assertions.assertArrayEquals(
+                new String[] {"id"}, tables.get(0).getTableSchema().getFieldNames());
+    }
+
+    @Test
+    void testCreateReader() throws Exception {
+        MqttSource mqttSource = sourceFor(baseConfig());
+        SingleSplitReaderContext context = Mockito.mock(SingleSplitReaderContext.class);
+
+        AbstractSingleSplitReader<?> created = mqttSource.createReader(context);
+
+        Assertions.assertInstanceOf(MqttSourceReader.class, created);
+    }
+
+    @Test
+    void testReaderCollectsArrivedJsonMessage() throws Exception {
+        MqttSource mqttSource = sourceFor(baseConfig());
+        MqttSourceReader reader = readerCreatedBy(mqttSource);
+        RecordingCollector sink = new RecordingCollector();
+
+        reader.messageArrived("users", mqttMessage("{\"id\":1}"));
+        reader.pollNext(sink);
+
+        Assertions.assertNotNull(sink.lastRow);
+        Assertions.assertEquals(1, sink.lastRow.getField(0));
+        Assertions.assertEquals(
+                mqttSource.getProducedCatalogTables().get(0).getTablePath().toString(),
+                sink.lastRow.getTableId());
+    }
+
+    @Test
+    void testReaderFailsWhenReconnectTimeoutExceeded() throws Exception {
+        Map<String, Object> options = baseConfig();
+        options.put("reconnect_timeout", 1);
+        MqttSource mqttSource = sourceFor(options);
+        MqttSourceConfig readerConfig = new MqttSourceConfig(ReadonlyConfig.fromMap(options));
+        AtomicLong fakeClock = new AtomicLong(0L);
+        MqttSourceReader reader =
+                new MqttSourceReader(
+                        readerConfig,
+                        mqttSource.getProducedCatalogTables().get(0),
+                        fakeClock::get);
+
+        // Lose the connection at t=0, then move just past the one-second timeout.
+        reader.connectionLost(new RuntimeException("broker unavailable"));
+        fakeClock.set(1001L);
+
+        Assertions.assertThrows(
+                MqttConnectorException.class, () -> reader.pollNext(new RecordingCollector()));
+    }
+
+    @Test
+    void testReaderFailsWhenResubscribeAfterReconnectFails() throws Exception {
+        MqttSourceReader reader = systemClockReader();
+
+        try (org.mockito.MockedConstruction<MqttClient> construction =
+                Mockito.mockConstruction(MqttClient.class)) {
+            reader.open();
+            MqttClient client = construction.constructed().get(0);
+            Mockito.doThrow(new MqttException(MqttException.REASON_CODE_CLIENT_EXCEPTION))
+                    .when(client)
+                    .subscribe("users", 1);
+
+            reader.connectionLost(new RuntimeException("connection lost"));
+
+            // The callback itself must not throw; the failure surfaces on the next poll.
+            Assertions.assertDoesNotThrow(
+                    () -> reader.connectComplete(true, "tcp://localhost:1883"));
+            Assertions.assertThrows(
+                    MqttConnectorException.class,
+                    () -> reader.pollNext(new RecordingCollector()));
+        }
+    }
+
+    @Test
+    void testReaderCloseForciblyDisconnectsWhenClientIsNotConnected() throws Exception {
+        MqttSourceReader reader = systemClockReader();
+
+        try (org.mockito.MockedConstruction<MqttClient> construction =
+                Mockito.mockConstruction(MqttClient.class)) {
+            reader.open();
+            MqttClient client = construction.constructed().get(0);
+            Mockito.when(client.isConnected()).thenReturn(false);
+
+            reader.close();
+
+            Mockito.verify(client).disconnectForcibly();
+            Mockito.verify(client).close();
+            Mockito.verify(client, Mockito.never()).disconnect();
+        }
+    }
+
+    @Test
+    void testReaderFailsWhenMessageQueueIsFull() throws Exception {
+        Map<String, Object> options = baseConfig();
+        options.put("max_queue_size", 1);
+        MqttSourceReader reader = readerCreatedBy(sourceFor(options));
+
+        // The first message fills the single-slot queue; the second cannot be buffered.
+        reader.messageArrived("users", mqttMessage("{\"id\":1}"));
+
+        Assertions.assertThrows(
+                MqttConnectorException.class,
+                () -> reader.messageArrived("users", mqttMessage("{\"id\":2}")));
+        Assertions.assertThrows(
+                MqttConnectorException.class, () -> reader.pollNext(new RecordingCollector()));
+    }
+
+    /** Builds a source from raw option values. */
+    private static MqttSource sourceFor(Map<String, Object> options) {
+        return new MqttSource(ReadonlyConfig.fromMap(options));
+    }
+
+    /** Asks the source for its reader, using a mocked reader context. */
+    private static MqttSourceReader readerCreatedBy(MqttSource mqttSource) throws Exception {
+        SingleSplitReaderContext context = Mockito.mock(SingleSplitReaderContext.class);
+        return (MqttSourceReader) mqttSource.createReader(context);
+    }
+
+    /** Builds a reader directly from the base options, timed by the system clock. */
+    private static MqttSourceReader systemClockReader() {
+        MqttSource mqttSource = sourceFor(baseConfig());
+        MqttSourceConfig readerConfig = new MqttSourceConfig(ReadonlyConfig.fromMap(baseConfig()));
+        return new MqttSourceReader(
+                readerConfig,
+                mqttSource.getProducedCatalogTables().get(0),
+                System::currentTimeMillis);
+    }
+
+    /** Returns the minimal valid options: broker URL, topic and a one-column schema. */
+    private static Map<String, Object> baseConfig() {
+        Map<String, Object> options = new HashMap<>();
+        options.put("url", "tcp://localhost:1883");
+        options.put("topic", "users");
+        options.put("schema", schemaConfig());
+        return options;
+    }
+
+    /** Returns a schema definition with a single {@code id} column of type {@code int}. */
+    private static Map<String, Object> schemaConfig() {
+        Map<String, Object> schemaDefinition = new HashMap<>();
+        schemaDefinition.put("fields", Collections.singletonMap("id", "int"));
+        return schemaDefinition;
+    }
+
+    /** Wraps a UTF-8 encoded string in an MQTT message. */
+    private static MqttMessage mqttMessage(String payload) {
+        byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
+        return new MqttMessage(bytes);
+    }
+
+    /** Collector that remembers only the most recently collected row. */
+    private static class RecordingCollector implements Collector<SeaTunnelRow> {
+        private final Object lock = new Object();
+        private SeaTunnelRow lastRow;
+
+        @Override
+        public void collect(SeaTunnelRow row) {
+            this.lastRow = row;
+        }
+
+        @Override
+        public Object getCheckpointLock() {
+            return lock;
+        }
+    }
+}