This is an automated email from the ASF dual-hosted git repository.

davidzollo pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 0d551e346d [Feature][Connector-V2] Add MQTT source connector (#10935)
0d551e346d is described below

commit 0d551e346d41e3e3e4e371d951ec857ebea9b959
Author: marmong <[email protected]>
AuthorDate: Mon Jun 8 23:33:23 2026 +0900

    [Feature][Connector-V2] Add MQTT source connector (#10935)
---
 docs/en/connectors/source/Mqtt.md                  | 190 +++++++++++++
 docs/zh/connectors/source/Mqtt.md                  | 190 +++++++++++++
 plugin-mapping.properties                          |   1 +
 .../mqtt/exception/MqttConnectorErrorCode.java     |   3 +-
 .../seatunnel/mqtt/source/MqttSource.java          |  79 ++++++
 .../seatunnel/mqtt/source/MqttSourceConfig.java    | 141 ++++++++++
 .../seatunnel/mqtt/source/MqttSourceFactory.java   |  73 +++++
 .../seatunnel/mqtt/source/MqttSourceOptions.java   | 108 ++++++++
 .../seatunnel/mqtt/source/MqttSourceReader.java    | 304 +++++++++++++++++++++
 .../mqtt/source/MqttSourceConfigTest.java          | 162 +++++++++++
 .../mqtt/source/MqttSourceFactoryTest.java         |  65 +++++
 .../seatunnel/mqtt/source/MqttSourceTest.java      | 232 ++++++++++++++++
 12 files changed, 1547 insertions(+), 1 deletion(-)

diff --git a/docs/en/connectors/source/Mqtt.md 
b/docs/en/connectors/source/Mqtt.md
new file mode 100644
index 0000000000..d810978b74
--- /dev/null
+++ b/docs/en/connectors/source/Mqtt.md
@@ -0,0 +1,190 @@
+import ChangeLog from '../changelog/connector-mqtt.md';
+
+# MQTT
+
+> MQTT source connector
+
+## Description
+
+Used to read messages from an MQTT broker. Supports the MQTT 3.1.1 protocol 
via the Eclipse Paho client library.
+
+This connector subscribes to a configured MQTT topic, deserializes message 
payloads as JSON or text, and converts them into SeaTunnel rows.
+
+## Key features
+
+- [ ] [batch](../../introduction/concepts/connector-v2-features.md)
+- [x] [stream](../../introduction/concepts/connector-v2-features.md)
+- [ ] [exactly-once](../../introduction/concepts/connector-v2-features.md)
+- [ ] [column projection](../../introduction/concepts/connector-v2-features.md)
+- [ ] [parallelism](../../introduction/concepts/connector-v2-features.md)
+- [ ] [support user-defined 
split](../../introduction/concepts/connector-v2-features.md)
+
+:::caution Delivery semantics
+
+The `qos` option controls MQTT broker-client delivery only. It is not 
integrated with SeaTunnel checkpointing, so this source does not provide 
end-to-end exactly-once or at-least-once guarantees.
+
+For persistent MQTT sessions, set `clean_session=false` and configure a stable 
`client_id`. When `clean_session=false`, the source disconnects without 
unsubscribing during close, so the broker can retain the subscription according 
to MQTT session semantics.
+
+The source uses MQTT auto-reconnect. If the client remains disconnected longer 
than `reconnect_timeout`, the source task fails to avoid a silent ingestion 
stall.
+
+:::
+
+## Options
+
+| name                | type    | required | default value |
+|---------------------|---------|----------|---------------|
+| url                 | string  | yes      | -             |
+| topic               | string  | yes      | -             |
+| schema              | config  | yes      | -             |
+| username            | string  | no       | -             |
+| password            | string  | no       | -             |
+| qos                 | int     | no       | 1             |
+| format              | string  | no       | json          |
+| field_delimiter     | string  | no       | ,             |
+| client_id           | string  | no       | -             |
+| clean_session       | boolean | no       | true          |
+| connection_timeout  | int     | no       | 30            |
+| keep_alive_interval | int     | no       | 60            |
+| reconnect_timeout   | int     | no       | 120           |
+| max_queue_size      | int     | no       | 1000          |
+| common-options      |         | no       | -             |
+
+### url [string]
+
+The MQTT broker connection URL. Must include protocol, host, and port.
+
+Example: `tcp://broker.example.com:1883`
+
+### topic [string]
+
+The MQTT topic to subscribe to and read messages from.
+
+Example: `iot/sensors/temperature`
+
+### schema [config]
+
+The schema fields of upstream data. For more details, please refer to [Schema 
Feature](../../introduction/concepts/schema-feature.md).
+
+### username [string]
+
+The username for MQTT broker authentication. Leave unset for anonymous access.
+
+### password [string]
+
+The password for MQTT broker authentication. Leave unset for anonymous access.
+
+### qos [int]
+
+The MQTT Quality of Service level used when subscribing to the topic.
+
+This setting only controls delivery between the MQTT broker and the MQTT 
client. It does not provide end-to-end delivery guarantees in SeaTunnel.
+
+Supported values:
+
+- `0` — MQTT QoS 0
+- `1` — MQTT QoS 1
+
+### format [string]
+
+The deserialization format for incoming messages. Supported values:
+
+- `json` — Deserialize each message as a JSON object (default)
+- `text` — Deserialize each message as delimited plain text (delimiter 
controlled by `field_delimiter`)
+
+### field_delimiter [string]
+
+The field delimiter used when `format` is set to `text`. Default is `,`.
+
+Examples: `,`, `|`, `\t`
+
+### client_id [string]
+
+The MQTT client id. If omitted while `clean_session=true`, the connector 
generates a random client id.
+
+This option is required when `clean_session=false`, because persistent MQTT 
sessions require a stable client id.
+
+### clean_session [boolean]
+
+Whether to use a clean MQTT session. Default is `true`.
+
+- `true` — Broker discards previous session state. Suitable for stateless 
operation.
+- `false` — Broker can retain session state, including subscriptions. Requires 
a stable `client_id`.
+
+### connection_timeout [int]
+
+The MQTT connection establishment timeout in seconds.
+
+### keep_alive_interval [int]
+
+The MQTT keep alive interval in seconds.
+
+### reconnect_timeout [int]
+
+Maximum seconds to wait for MQTT auto-reconnect before failing the source. If 
the MQTT client remains disconnected longer than this timeout, `pollNext()` 
fails the source task instead of silently waiting forever.
+
+### max_queue_size [int]
+
+Maximum number of MQTT messages buffered in memory before deserialization.
+
+### common options
+
+For the common source plugin parameters, please refer to [Source Common 
Options](../common-options/source-common-options.md) for details.
+
+## Example
+
+### JSON source
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+}
+
+source {
+  MQTT {
+    url = "tcp://broker.example.com:1883"
+    topic = "iot/sensors/readings"
+    qos = 1
+    format = "json"
+    schema = {
+      fields {
+        id = bigint
+        name = string
+        temperature = double
+      }
+    }
+    plugin_output = "sensor_data"
+  }
+}
+
+sink {
+  Console {
+    plugin_input = "sensor_data"
+  }
+}
+```
+
+### Persistent session source
+
+```hocon
+source {
+  MQTT {
+    url = "tcp://broker.example.com:1883"
+    topic = "iot/sensors/readings"
+    client_id = "seatunnel-mqtt-source"
+    clean_session = false
+    qos = 1
+    format = "json"
+    schema = {
+      fields {
+        id = bigint
+        temperature = double
+      }
+    }
+  }
+}
+```
+
+## Changelog
+
+<ChangeLog />
diff --git a/docs/zh/connectors/source/Mqtt.md 
b/docs/zh/connectors/source/Mqtt.md
new file mode 100644
index 0000000000..b057c8f3b4
--- /dev/null
+++ b/docs/zh/connectors/source/Mqtt.md
@@ -0,0 +1,190 @@
+import ChangeLog from '../changelog/connector-mqtt.md';
+
+# MQTT
+
+> MQTT 源连接器
+
+## 描述
+
+用于从 MQTT broker 读取消息。该连接器通过 Eclipse Paho 客户端库支持 MQTT 3.1.1 协议。
+
+该连接器会订阅配置的 MQTT topic,将消息 payload 按 JSON 或 text 格式反序列化,并转换为 SeaTunnel Row。
+
+## 关键特性
+
+- [ ] [批](../../introduction/concepts/connector-v2-features.md)
+- [x] [流](../../introduction/concepts/connector-v2-features.md)
+- [ ] [精确一次](../../introduction/concepts/connector-v2-features.md)
+- [ ] [列投影](../../introduction/concepts/connector-v2-features.md)
+- [ ] [并行性](../../introduction/concepts/connector-v2-features.md)
+- [ ] [支持用户自定义 split](../../introduction/concepts/connector-v2-features.md)
+
+:::caution 交付语义
+
+`qos` 选项只控制 MQTT broker 和 MQTT client 之间的交付语义。它没有与 SeaTunnel checkpoint 集成,因此该 
Source 不提供端到端的精确一次或至少一次保证。
+
+如需使用 MQTT 持久会话,请设置 `clean_session=false` 并配置稳定的 `client_id`。当 
`clean_session=false` 时,Source 在关闭时只断开连接,不会取消订阅,因此 broker 可以根据 MQTT 会话语义保留订阅。
+
+Source 使用 MQTT 自动重连。如果 client 断开连接的时间超过 `reconnect_timeout`,Source task 
会失败,以避免静默停止摄取。
+
+:::
+
+## 选项
+
+| 参数名               | 类型      | 必须 | 默认值  |
+|-------------------|---------|----|------|
+| url               | string  | 是  | -    |
+| topic             | string  | 是  | -    |
+| schema            | config  | 是  | -    |
+| username          | string  | 否  | -    |
+| password          | string  | 否  | -    |
+| qos               | int     | 否  | 1    |
+| format            | string  | 否  | json |
+| field_delimiter   | string  | 否  | ,    |
+| client_id         | string  | 否  | -    |
+| clean_session     | boolean | 否  | true |
+| connection_timeout | int    | 否  | 30   |
+| keep_alive_interval | int   | 否  | 60   |
+| reconnect_timeout | int     | 否  | 120  |
+| max_queue_size    | int     | 否  | 1000 |
+| common-options    |         | 否  | -    |
+
+### url [string]
+
+MQTT broker 连接 URL。必须包含协议、主机和端口。
+
+示例:`tcp://broker.example.com:1883`
+
+### topic [string]
+
+要订阅消息的 MQTT topic。
+
+示例:`iot/sensors/temperature`
+
+### schema [config]
+
+上游数据的 schema 字段。更多详情请参考 [Schema 
特性](../../introduction/concepts/schema-feature.md)。
+
+### username [string]
+
+MQTT broker 认证用户名。匿名访问时可以不配置。
+
+### password [string]
+
+MQTT broker 认证密码。匿名访问时可以不配置。
+
+### qos [int]
+
+订阅 topic 时使用的 MQTT Quality of Service 等级。
+
+该设置只控制 MQTT broker 和 MQTT client 之间的交付,不提供 SeaTunnel 端到端交付保证。
+
+支持的值:
+
+- `0` — MQTT QoS 0
+- `1` — MQTT QoS 1
+
+### format [string]
+
+输入消息的反序列化格式。支持的值:
+
+- `json` — 将每条消息反序列化为 JSON 对象(默认)
+- `text` — 将每条消息按分隔符反序列化为纯文本(分隔符由 `field_delimiter` 控制)
+
+### field_delimiter [string]
+
+当 `format` 设置为 `text` 时使用的字段分隔符。默认值为 `,`。
+
+示例:`,`, `|`, `\t`
+
+### client_id [string]
+
+MQTT client id。当 `clean_session=true` 且未配置该选项时,连接器会生成随机 client id。
+
+当 `clean_session=false` 时必须配置该选项,因为 MQTT 持久会话需要稳定的 client id。
+
+### clean_session [boolean]
+
+是否使用 clean MQTT session。默认值为 `true`。
+
+- `true` — broker 丢弃之前的会话状态,适合无状态运行。
+- `false` — broker 可以保留会话状态,包括订阅信息。需要稳定的 `client_id`。
+
+### connection_timeout [int]
+
+MQTT 连接建立超时时间,单位为秒。
+
+### keep_alive_interval [int]
+
+MQTT keep alive 间隔,单位为秒。
+
+### reconnect_timeout [int]
+
+等待 MQTT 自动重连的最长时间,单位为秒。如果 MQTT client 断开连接的时间超过该超时时间,`pollNext()` 会使 Source 
task 失败,避免无限期静默等待。
+
+### max_queue_size [int]
+
+反序列化之前在内存中缓存的 MQTT 消息最大数量。
+
+### common options
+
+源插件通用参数,详情请参考 [源通用选项](../common-options/source-common-options.md)。
+
+## 示例
+
+### JSON Source
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+}
+
+source {
+  MQTT {
+    url = "tcp://broker.example.com:1883"
+    topic = "iot/sensors/readings"
+    qos = 1
+    format = "json"
+    schema = {
+      fields {
+        id = bigint
+        name = string
+        temperature = double
+      }
+    }
+    plugin_output = "sensor_data"
+  }
+}
+
+sink {
+  Console {
+    plugin_input = "sensor_data"
+  }
+}
+```
+
+### 持久会话 Source
+
+```hocon
+source {
+  MQTT {
+    url = "tcp://broker.example.com:1883"
+    topic = "iot/sensors/readings"
+    client_id = "seatunnel-mqtt-source"
+    clean_session = false
+    qos = 1
+    format = "json"
+    schema = {
+      fields {
+        id = bigint
+        temperature = double
+      }
+    }
+  }
+}
+```
+
+## 变更日志
+
+<ChangeLog />
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index f64fe5ddd1..fe181d223c 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -139,6 +139,7 @@ seatunnel.sink.ObsFile = connector-file-obs
 seatunnel.source.Milvus = connector-milvus
 seatunnel.sink.Milvus = connector-milvus
 seatunnel.sink.ActiveMQ = connector-activemq
+seatunnel.source.MQTT = connector-mqtt
 seatunnel.sink.MQTT = connector-mqtt
 seatunnel.source.Prometheus = connector-prometheus
 seatunnel.sink.Prometheus = connector-prometheus
diff --git 
a/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/exception/MqttConnectorErrorCode.java
 
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/exception/MqttConnectorErrorCode.java
index f55089f7a7..dc8d76fd85 100644
--- 
a/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/exception/MqttConnectorErrorCode.java
+++ 
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/exception/MqttConnectorErrorCode.java
@@ -22,7 +22,8 @@ import 
org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
 public enum MqttConnectorErrorCode implements SeaTunnelErrorCode {
     CONNECTION_FAILED("MQTT-01", "MQTT connection failed"),
     PUBLISH_FAILED("MQTT-02", "MQTT message publish failed"),
-    INVALID_CONFIG("MQTT-03", "Invalid MQTT configuration");
+    INVALID_CONFIG("MQTT-03", "Invalid MQTT configuration"),
+    RECEIVE_FAILED("MQTT-04", "MQTT message receive failed");
 
     private final String code;
     private final String description;
diff --git 
a/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSource.java
 
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSource.java
new file mode 100644
index 0000000000..580b3119bc
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSource.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mqtt.source;
+
+import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.constants.JobMode;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import 
org.apache.seatunnel.connectors.seatunnel.mqtt.exception.MqttConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.mqtt.exception.MqttConnectorException;
+
+import java.util.Collections;
+import java.util.List;
+
+public class MqttSource extends AbstractSingleSplitSource<SeaTunnelRow> {
+
+    private final MqttSourceConfig sourceConfig;
+    private final CatalogTable catalogTable;
+    private JobContext jobContext;
+
+    public MqttSource(ReadonlyConfig pluginConfig) {
+        this.sourceConfig = new MqttSourceConfig(pluginConfig);
+        this.catalogTable = CatalogTableUtil.buildWithConfig(pluginConfig);
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        if (jobContext != null && 
!JobMode.STREAMING.equals(jobContext.getJobMode())) {
+            throw new MqttConnectorException(
+                    MqttConnectorErrorCode.INVALID_CONFIG,
+                    String.format(
+                            "PluginName: %s, Message: MQTT source only 
supports streaming job mode",
+                            getPluginName()));
+        }
+        return Boundedness.UNBOUNDED;
+    }
+
+    @Override
+    public String getPluginName() {
+        return MqttSourceOptions.CONNECTOR_IDENTITY;
+    }
+
+    @Override
+    public void setJobContext(JobContext jobContext) {
+        this.jobContext = jobContext;
+    }
+
+    @Override
+    public List<CatalogTable> getProducedCatalogTables() {
+        return Collections.singletonList(catalogTable);
+    }
+
+    @Override
+    public AbstractSingleSplitReader<SeaTunnelRow> createReader(
+            SingleSplitReaderContext readerContext) throws Exception {
+        return new MqttSourceReader(sourceConfig, catalogTable);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceConfig.java
 
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceConfig.java
new file mode 100644
index 0000000000..e100e9bf3a
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceConfig.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mqtt.source;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import java.util.UUID;
+
+public class MqttSourceConfig {
+
+    private static final String CLIENT_ID_PREFIX = "seatunnel_mqtt_source_";
+
+    private final String url;
+    private final String topic;
+    private final String username;
+    private final String password;
+    private final int qos;
+    private final String format;
+    private final String fieldDelimiter;
+    private final String clientId;
+    private final boolean cleanSession;
+    private final int connectionTimeout;
+    private final int keepAliveInterval;
+    private final int reconnectTimeout;
+    private final int maxQueueSize;
+
+    public MqttSourceConfig(ReadonlyConfig config) {
+        this.url = config.get(MqttSourceOptions.URL);
+        this.topic = config.get(MqttSourceOptions.TOPIC);
+        this.username = config.get(MqttSourceOptions.USERNAME);
+        this.password = config.get(MqttSourceOptions.PASSWORD);
+        this.qos = config.get(MqttSourceOptions.QOS);
+        this.format = config.get(MqttSourceOptions.FORMAT);
+        this.fieldDelimiter = config.get(MqttSourceOptions.FIELD_DELIMITER);
+        this.cleanSession = config.get(MqttSourceOptions.CLEAN_SESSION);
+        this.connectionTimeout = 
config.get(MqttSourceOptions.CONNECTION_TIMEOUT);
+        this.keepAliveInterval = 
config.get(MqttSourceOptions.KEEP_ALIVE_INTERVAL);
+        this.reconnectTimeout = 
config.get(MqttSourceOptions.RECONNECT_TIMEOUT);
+        this.maxQueueSize = config.get(MqttSourceOptions.MAX_QUEUE_SIZE);
+
+        String configuredClientId = config.get(MqttSourceOptions.CLIENT_ID);
+        if (!cleanSession && isBlank(configuredClientId)) {
+            throw new IllegalArgumentException(
+                    "client_id is required when clean_session=false for MQTT 
source");
+        }
+        this.clientId =
+                isBlank(configuredClientId)
+                        ? CLIENT_ID_PREFIX + UUID.randomUUID().toString()
+                        : configuredClientId;
+
+        validate();
+    }
+
+    private void validate() {
+        if (qos < 0 || qos > 1) {
+            throw new IllegalArgumentException("MQTT source qos must be 0 or 
1, got: " + qos);
+        }
+        if (!"json".equalsIgnoreCase(format) && 
!"text".equalsIgnoreCase(format)) {
+            throw new IllegalArgumentException("Unsupported MQTT source 
format: " + format);
+        }
+        if (reconnectTimeout <= 0) {
+            throw new IllegalArgumentException(
+                    "reconnect_timeout must be greater than 0, got: " + 
reconnectTimeout);
+        }
+        if (maxQueueSize <= 0) {
+            throw new IllegalArgumentException(
+                    "max_queue_size must be greater than 0, got: " + 
maxQueueSize);
+        }
+    }
+
+    private static boolean isBlank(String value) {
+        return value == null || value.trim().isEmpty();
+    }
+
+    public String getUrl() {
+        return url;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public String getUsername() {
+        return username;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public int getQos() {
+        return qos;
+    }
+
+    public String getFormat() {
+        return format;
+    }
+
+    public String getFieldDelimiter() {
+        return fieldDelimiter;
+    }
+
+    public String getClientId() {
+        return clientId;
+    }
+
+    public boolean isCleanSession() {
+        return cleanSession;
+    }
+
+    public int getConnectionTimeout() {
+        return connectionTimeout;
+    }
+
+    public int getKeepAliveInterval() {
+        return keepAliveInterval;
+    }
+
+    public int getReconnectTimeout() {
+        return reconnectTimeout;
+    }
+
+    public int getMaxQueueSize() {
+        return maxQueueSize;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceFactory.java
 
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceFactory.java
new file mode 100644
index 0000000000..8aca1f2bcc
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mqtt.source;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.options.ConnectorCommonOptions;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+
+import com.google.auto.service.AutoService;
+
+import java.io.Serializable;
+
+@AutoService(Factory.class)
+public class MqttSourceFactory implements TableSourceFactory {
+
+    @Override
+    public String factoryIdentifier() {
+        return MqttSourceOptions.CONNECTOR_IDENTITY;
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(
+                        MqttSourceOptions.URL,
+                        MqttSourceOptions.TOPIC,
+                        ConnectorCommonOptions.SCHEMA)
+                .optional(
+                        MqttSourceOptions.USERNAME,
+                        MqttSourceOptions.PASSWORD,
+                        MqttSourceOptions.QOS,
+                        MqttSourceOptions.FORMAT,
+                        MqttSourceOptions.FIELD_DELIMITER,
+                        MqttSourceOptions.CLIENT_ID,
+                        MqttSourceOptions.CLEAN_SESSION,
+                        MqttSourceOptions.CONNECTION_TIMEOUT,
+                        MqttSourceOptions.KEEP_ALIVE_INTERVAL,
+                        MqttSourceOptions.RECONNECT_TIMEOUT,
+                        MqttSourceOptions.MAX_QUEUE_SIZE)
+                .build();
+    }
+
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
+        return () -> (SeaTunnelSource<T, SplitT, StateT>) new 
MqttSource(context.getOptions());
+    }
+
+    @Override
+    public Class<? extends SeaTunnelSource> getSourceClass() {
+        return MqttSource.class;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceOptions.java
 
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceOptions.java
new file mode 100644
index 0000000000..be120c9396
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceOptions.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mqtt.source;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public class MqttSourceOptions {
+
+    public static final String CONNECTOR_IDENTITY = "MQTT";
+
+    public static final Option<String> URL =
+            Options.key("url")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("MQTT broker URL, e.g. 
tcp://localhost:1883");
+
+    public static final Option<String> TOPIC =
+            Options.key("topic")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("MQTT topic to subscribe messages from");
+
+    public static final Option<String> USERNAME =
+            Options.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("MQTT broker authentication username");
+
+    public static final Option<String> PASSWORD =
+            Options.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("MQTT broker authentication password");
+
+    public static final Option<Integer> QOS =
+            Options.key("qos")
+                    .intType()
+                    .defaultValue(1)
+                    .withDescription(
+                            "MQTT subscribe QoS level. This controls 
broker-client delivery only");
+
+    public static final Option<String> FORMAT =
+            Options.key("format")
+                    .stringType()
+                    .defaultValue("json")
+                    .withDescription("Message deserialization format: json or 
text");
+
+    public static final Option<String> FIELD_DELIMITER =
+            Options.key("field_delimiter")
+                    .stringType()
+                    .defaultValue(",")
+                    .withDescription("Field delimiter for text format. Only 
used when format=text");
+
+    public static final Option<String> CLIENT_ID =
+            Options.key("client_id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "MQTT client id. Required when 
clean_session=false; generated otherwise if omitted");
+
+    public static final Option<Boolean> CLEAN_SESSION =
+            Options.key("clean_session")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Whether to use MQTT clean session");
+
+    public static final Option<Integer> CONNECTION_TIMEOUT =
+            Options.key("connection_timeout")
+                    .intType()
+                    .defaultValue(30)
+                    .withDescription("MQTT connection timeout in seconds");
+
+    public static final Option<Integer> KEEP_ALIVE_INTERVAL =
+            Options.key("keep_alive_interval")
+                    .intType()
+                    .defaultValue(60)
+                    .withDescription("MQTT keep alive interval in seconds");
+
+    public static final Option<Integer> RECONNECT_TIMEOUT =
+            Options.key("reconnect_timeout")
+                    .intType()
+                    .defaultValue(120)
+                    .withDescription(
+                            "Maximum seconds to wait for MQTT auto-reconnect 
before failing the source");
+
+    public static final Option<Integer> MAX_QUEUE_SIZE =
+            Options.key("max_queue_size")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "Maximum number of MQTT messages buffered before 
deserialization");
+}
diff --git 
a/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceReader.java
 
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceReader.java
new file mode 100644
index 0000000000..eee6ecb67d
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceReader.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mqtt.source;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import 
org.apache.seatunnel.connectors.seatunnel.mqtt.exception.MqttConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.mqtt.exception.MqttConnectorException;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+import org.apache.seatunnel.format.text.TextDeserializationSchema;
+
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.function.LongSupplier;
+
+public class MqttSourceReader extends AbstractSingleSplitReader<SeaTunnelRow>
+        implements MqttCallbackExtended {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MqttSourceReader.class);
+    private static final long POLL_TIMEOUT_MS = 1000L; // max wait in pollNext() for a buffered payload
+    private static final long QUEUE_OFFER_TIMEOUT_MS = 1000L; // max wait in messageArrived() for free queue space
+
+    private final MqttSourceConfig sourceConfig;
+    private final CatalogTable catalogTable;
+    private final BlockingQueue<byte[]> messageQueue; // bounded by getMaxQueueSize(); filled by messageArrived(), drained by pollNext()
+    private final DeserializationSchema<SeaTunnelRow> deserializationSchema;
+    private final LongSupplier currentTimeMillis; // clock used for reconnect-timeout tracking; supplied through the constructor
+
+    private MqttClient mqttClient; // created in open(); null until then
+    private volatile Throwable receiveException; // recorded by callbacks and the timeout check; rethrown (wrapped) by pollNext()
+    private volatile long disconnectedSinceMs = -1L; // clock value stored by connectionLost(); negative means no pending disconnect
+    private volatile Throwable disconnectCause; // cause passed to connectionLost(); cleared after a successful resubscribe
+
+    public MqttSourceReader(MqttSourceConfig sourceConfig, CatalogTable 
catalogTable) {
+        this(sourceConfig, catalogTable, System::currentTimeMillis);
+    }
+
+    MqttSourceReader(
+            MqttSourceConfig sourceConfig,
+            CatalogTable catalogTable,
+            LongSupplier currentTimeMillis) {
+        this.sourceConfig = sourceConfig;
+        this.catalogTable = catalogTable;
+        this.currentTimeMillis = currentTimeMillis;
+        this.messageQueue = new 
LinkedBlockingQueue<>(sourceConfig.getMaxQueueSize());
+        this.deserializationSchema = createDeserializationSchema(sourceConfig, 
catalogTable);
+    }
+
+    /**
+     * Creates the Paho client, registers this reader as its callback, connects to the broker and
+     * subscribes to the configured topic.
+     *
+     * @throws MqttConnectorException with {@code CONNECTION_FAILED} when any of those steps raises
+     *     an {@link MqttException}; the client is closed on a best-effort basis first
+     */
+    @Override
+    public void open() {
+        try {
+            final MqttClient client =
+                    new MqttClient(
+                            sourceConfig.getUrl(),
+                            sourceConfig.getClientId(),
+                            new MemoryPersistence());
+            // Publish the client to the field right away: subscribeTopic() and the cleanup path
+            // both go through it.
+            this.mqttClient = client;
+            client.setCallback(this);
+            client.connect(buildConnectOptions(sourceConfig));
+            subscribeTopic();
+            LOG.info(
+                    "MQTT source reader [{}] subscribed to topic [{}]",
+                    sourceConfig.getClientId(),
+                    sourceConfig.getTopic());
+        } catch (MqttException connectFailure) {
+            closeClientQuietly();
+            final String message =
+                    "Failed to connect MQTT source client [" + sourceConfig.getClientId() + "]";
+            throw new MqttConnectorException(
+                    MqttConnectorErrorCode.CONNECTION_FAILED, message, connectFailure);
+        }
+    }
+
+    /**
+     * Emits at most one row per call.
+     *
+     * <p>Any recorded receive failure or an exceeded reconnect timeout is raised first. Otherwise
+     * the reader waits up to {@code POLL_TIMEOUT_MS} for a buffered payload, deserializes it and,
+     * if a row results, tags it with the table path and collects it under the checkpoint lock.
+     *
+     * @param output collector receiving the produced row
+     * @throws Exception if a failure was recorded, the wait is interrupted or deserialization fails
+     */
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        checkReceiveException();
+        checkReconnectTimeout();
+
+        final byte[] buffered = messageQueue.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        final SeaTunnelRow decoded =
+                buffered == null ? null : deserializationSchema.deserialize(buffered);
+        if (decoded != null) {
+            decoded.setTableId(catalogTable.getTablePath().toString());
+            synchronized (output.getCheckpointLock()) {
+                output.collect(decoded);
+            }
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (mqttClient == null) {
+            return;
+        }
+        try {
+            if (mqttClient.isConnected()) {
+                if (sourceConfig.isCleanSession()) {
+                    mqttClient.unsubscribe(sourceConfig.getTopic());
+                }
+                mqttClient.disconnect();
+            } else {
+                mqttClient.disconnectForcibly();
+            }
+            mqttClient.close();
+            LOG.info("MQTT source reader [{}] closed", 
sourceConfig.getClientId());
+        } catch (MqttException e) {
+            throw new IOException("Error closing MQTT source client", e);
+        }
+    }
+
+    /**
+     * Paho callback: records when and why the connection dropped so that
+     * {@code checkReconnectTimeout()} can fail the reader if it stays down too long.
+     *
+     * @param cause reason reported by the client for the lost connection
+     */
+    @Override
+    public void connectionLost(Throwable cause) {
+        final long lostAtMs = currentTimeMillis.getAsLong();
+        disconnectedSinceMs = lostAtMs;
+        disconnectCause = cause;
+        final String clientId = sourceConfig.getClientId();
+        LOG.warn(
+                "MQTT source connection lost for client [{}], auto-reconnect will attempt recovery",
+                clientId,
+                cause);
+    }
+
+    /**
+     * Paho callback: after an automatic reconnect, subscribes to the topic again and clears the
+     * pending-disconnect state. The initial connect is ignored here.
+     *
+     * <p>A resubscribe failure is not thrown from the callback; it is stored in
+     * {@code receiveException} and logged.
+     *
+     * @param reconnect {@code true} when this completion follows an automatic reconnect
+     * @param serverURI broker URI the client connected to
+     */
+    @Override
+    public void connectComplete(boolean reconnect, String serverURI) {
+        if (reconnect) {
+            try {
+                subscribeTopic();
+                disconnectedSinceMs = -1L;
+                disconnectCause = null;
+                LOG.info(
+                        "MQTT source reader [{}] resubscribed to topic [{}] after reconnect to [{}]",
+                        sourceConfig.getClientId(),
+                        sourceConfig.getTopic(),
+                        serverURI);
+            } catch (MqttException resubscribeFailure) {
+                receiveException = resubscribeFailure;
+                LOG.error(
+                        "Failed to resubscribe MQTT source reader [{}] to topic [{}] "
+                                + "after reconnect to [{}]",
+                        sourceConfig.getClientId(),
+                        sourceConfig.getTopic(),
+                        serverURI,
+                        resubscribeFailure);
+            }
+        }
+    }
+
+    /**
+     * Paho callback: copies the payload and places it on the bounded queue read by
+     * {@code pollNext}. Messages that are {@code null} or carry no payload are ignored.
+     *
+     * @param topic topic the message arrived on (unused)
+     * @param message received MQTT message
+     * @throws Exception a {@code MqttConnectorException} with {@code RECEIVE_FAILED} when the queue
+     *     stays full for {@code QUEUE_OFFER_TIMEOUT_MS} or the thread is interrupted while waiting;
+     *     the failure is also stored in {@code receiveException}
+     */
+    @Override
+    public void messageArrived(String topic, MqttMessage message) throws Exception {
+        if (message == null) {
+            return;
+        }
+        final byte[] raw = message.getPayload();
+        if (raw == null) {
+            return;
+        }
+        final byte[] payload = Arrays.copyOf(raw, raw.length);
+
+        final boolean accepted;
+        try {
+            accepted = messageQueue.offer(payload, QUEUE_OFFER_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException interrupted) {
+            // Restore the interrupt flag before reporting the failure.
+            Thread.currentThread().interrupt();
+            receiveException = interrupted;
+            throw new MqttConnectorException(
+                    MqttConnectorErrorCode.RECEIVE_FAILED,
+                    "Interrupted while buffering MQTT source message",
+                    interrupted);
+        }
+        if (accepted) {
+            return;
+        }
+
+        final MqttConnectorException queueFull =
+                new MqttConnectorException(
+                        MqttConnectorErrorCode.RECEIVE_FAILED,
+                        "MQTT source message queue is full. Increase max_queue_size "
+                                + "or reduce MQTT message throughput.");
+        receiveException = queueFull;
+        throw queueFull;
+    }
+
+    @Override // publish-acknowledgement callback of the Paho callback interface; intentionally a no-op
+    public void deliveryComplete(IMqttDeliveryToken token) {
+        // Source-only client — outbound delivery acknowledgements are not 
expected.
+    }
+
+    void subscribeTopic() throws MqttException {
+        mqttClient.subscribe(sourceConfig.getTopic(), sourceConfig.getQos());
+    }
+
+    private void checkReceiveException() {
+        if (receiveException == null) {
+            return;
+        }
+        throw new MqttConnectorException(
+                MqttConnectorErrorCode.RECEIVE_FAILED,
+                "Failed to receive MQTT source messages from topic ["
+                        + sourceConfig.getTopic()
+                        + "] with client ["
+                        + sourceConfig.getClientId()
+                        + "]",
+                receiveException);
+    }
+
+    private void checkReconnectTimeout() {
+        long disconnectedAt = disconnectedSinceMs;
+        if (disconnectedAt < 0) {
+            return;
+        }
+        long elapsedMs = currentTimeMillis.getAsLong() - disconnectedAt;
+        long timeoutMs = 
TimeUnit.SECONDS.toMillis(sourceConfig.getReconnectTimeout());
+        if (elapsedMs < timeoutMs) {
+            return;
+        }
+
+        MqttConnectorException exception =
+                new MqttConnectorException(
+                        MqttConnectorErrorCode.RECEIVE_FAILED,
+                        "MQTT source client ["
+                                + sourceConfig.getClientId()
+                                + "] remained disconnected from topic ["
+                                + sourceConfig.getTopic()
+                                + "] for more than reconnect_timeout="
+                                + sourceConfig.getReconnectTimeout()
+                                + " seconds",
+                        disconnectCause);
+        receiveException = exception;
+        throw exception;
+    }
+
+    private static MqttConnectOptions buildConnectOptions(MqttSourceConfig 
sourceConfig) {
+        MqttConnectOptions options = new MqttConnectOptions();
+        options.setAutomaticReconnect(true);
+        options.setCleanSession(sourceConfig.isCleanSession());
+        options.setConnectionTimeout(sourceConfig.getConnectionTimeout());
+        options.setKeepAliveInterval(sourceConfig.getKeepAliveInterval());
+
+        String username = sourceConfig.getUsername();
+        if (username != null && !username.isEmpty()) {
+            options.setUserName(username);
+        }
+        String password = sourceConfig.getPassword();
+        if (password != null && !password.isEmpty()) {
+            options.setPassword(password.toCharArray());
+        }
+        return options;
+    }
+
+    private static DeserializationSchema<SeaTunnelRow> 
createDeserializationSchema(
+            MqttSourceConfig sourceConfig, CatalogTable catalogTable) {
+        SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
+        switch (sourceConfig.getFormat().toLowerCase()) {
+            case "json":
+                return new JsonDeserializationSchema(catalogTable, false, 
false);
+            case "text":
+                return TextDeserializationSchema.builder()
+                        .seaTunnelRowType(rowType)
+                        .delimiter(sourceConfig.getFieldDelimiter())
+                        .setCatalogTable(catalogTable)
+                        .build();
+            default:
+                throw new IllegalArgumentException(
+                        "Unsupported MQTT source format: " + 
sourceConfig.getFormat());
+        }
+    }
+
+    /**
+     * Best-effort cleanup used when {@link #open()} fails; never throws.
+     *
+     * <p>{@code open()} can fail after the connection was already established (for example when
+     * the subscription is rejected). The Paho client refuses to close while connected, so a
+     * connected client is forcibly disconnected first; otherwise the swallowed close failure
+     * would leave the connection open.
+     */
+    private void closeClientQuietly() {
+        if (mqttClient == null) {
+            return;
+        }
+        try {
+            if (mqttClient.isConnected()) {
+                mqttClient.disconnectForcibly();
+            }
+        } catch (MqttException ignored) {
+            // Best-effort cleanup; the original connection exception is more important.
+        }
+        try {
+            mqttClient.close();
+        } catch (MqttException ignored) {
+            // Best-effort cleanup; the original connection exception is more important.
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-mqtt/src/test/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceConfigTest.java
 
b/seatunnel-connectors-v2/connector-mqtt/src/test/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceConfigTest.java
new file mode 100644
index 0000000000..f9c4497736
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-mqtt/src/test/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceConfigTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mqtt.source;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Unit tests for option parsing and validation in {@link MqttSourceConfig}. */
+class MqttSourceConfigTest {
+
+    @Test
+    void testDefaultValues() {
+        MqttSourceConfig defaults = parse(baseConfig());
+
+        Assertions.assertEquals("tcp://localhost:1883", defaults.getUrl());
+        Assertions.assertEquals("users", defaults.getTopic());
+        Assertions.assertNull(defaults.getUsername());
+        Assertions.assertNull(defaults.getPassword());
+        Assertions.assertEquals(1, defaults.getQos());
+        Assertions.assertEquals("json", defaults.getFormat());
+        Assertions.assertEquals(",", defaults.getFieldDelimiter());
+        Assertions.assertTrue(defaults.isCleanSession());
+        Assertions.assertEquals(30, defaults.getConnectionTimeout());
+        Assertions.assertEquals(60, defaults.getKeepAliveInterval());
+        Assertions.assertEquals(120, defaults.getReconnectTimeout());
+        Assertions.assertEquals(1000, defaults.getMaxQueueSize());
+    }
+
+    @Test
+    void testExplicitValues() {
+        Map<String, Object> overrides = baseConfig();
+        overrides.put("username", "user");
+        overrides.put("password", "pass");
+        overrides.put("qos", 0);
+        overrides.put("format", "text");
+        overrides.put("field_delimiter", "|");
+        overrides.put("client_id", "mqtt-source-client");
+        overrides.put("clean_session", false);
+        overrides.put("connection_timeout", 10);
+        overrides.put("keep_alive_interval", 20);
+        overrides.put("reconnect_timeout", 30);
+        overrides.put("max_queue_size", 200);
+
+        MqttSourceConfig explicit = parse(overrides);
+
+        Assertions.assertEquals("user", explicit.getUsername());
+        Assertions.assertEquals("pass", explicit.getPassword());
+        Assertions.assertEquals(0, explicit.getQos());
+        Assertions.assertEquals("text", explicit.getFormat());
+        Assertions.assertEquals("|", explicit.getFieldDelimiter());
+        Assertions.assertEquals("mqtt-source-client", explicit.getClientId());
+        Assertions.assertFalse(explicit.isCleanSession());
+        Assertions.assertEquals(10, explicit.getConnectionTimeout());
+        Assertions.assertEquals(20, explicit.getKeepAliveInterval());
+        Assertions.assertEquals(30, explicit.getReconnectTimeout());
+        Assertions.assertEquals(200, explicit.getMaxQueueSize());
+    }
+
+    @Test
+    void testInvalidQosFails() {
+        assertRejected(configWith("qos", 2));
+    }
+
+    @Test
+    void testNegativeQosFails() {
+        assertRejected(configWith("qos", -1));
+    }
+
+    @Test
+    void testNonPositiveReconnectTimeoutFails() {
+        assertRejected(configWith("reconnect_timeout", 0));
+    }
+
+    @Test
+    void testNonPositiveMaxQueueSizeFails() {
+        assertRejected(configWith("max_queue_size", 0));
+    }
+
+    @Test
+    void testUnsupportedFormatFails() {
+        assertRejected(configWith("format", "avro"));
+    }
+
+    @Test
+    void testPersistentSessionRequiresClientId() {
+        assertRejected(configWith("clean_session", false));
+    }
+
+    @Test
+    void testPersistentSessionRequiresNonBlankClientId() {
+        Map<String, Object> persistentSession = configWith("clean_session", false);
+        persistentSession.put("client_id", "   ");
+
+        assertRejected(persistentSession);
+    }
+
+    @Test
+    void testDefaultClientIdIsGeneratedForCleanSession() {
+        String generatedClientId = parse(baseConfig()).getClientId();
+
+        Assertions.assertTrue(generatedClientId.startsWith("seatunnel_mqtt_source_"));
+    }
+
+    /** Builds a source config from raw option values. */
+    private static MqttSourceConfig parse(Map<String, Object> rawOptions) {
+        return new MqttSourceConfig(ReadonlyConfig.fromMap(rawOptions));
+    }
+
+    /** Asserts that building a source config from the given options is rejected. */
+    private static void assertRejected(Map<String, Object> rawOptions) {
+        Assertions.assertThrows(
+                IllegalArgumentException.class,
+                () -> new MqttSourceConfig(ReadonlyConfig.fromMap(rawOptions)));
+    }
+
+    /** Returns the minimal valid options plus one extra entry. */
+    private static Map<String, Object> configWith(String key, Object value) {
+        Map<String, Object> options = baseConfig();
+        options.put(key, value);
+        return options;
+    }
+
+    /** Returns the minimal options used by every test: broker URL and topic. */
+    private static Map<String, Object> baseConfig() {
+        Map<String, Object> options = new HashMap<>();
+        options.put("url", "tcp://localhost:1883");
+        options.put("topic", "users");
+        return options;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-mqtt/src/test/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceFactoryTest.java
 
b/seatunnel-connectors-v2/connector-mqtt/src/test/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceFactoryTest.java
new file mode 100644
index 0000000000..210376bd56
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-mqtt/src/test/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceFactoryTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mqtt.source;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.options.ConnectorCommonOptions;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Unit tests for the identifier and option rule exposed by {@link MqttSourceFactory}. */
+public class MqttSourceFactoryTest {
+
+    @Test
+    void testFactoryIdentifier() {
+        MqttSourceFactory sourceFactory = new MqttSourceFactory();
+
+        Assertions.assertEquals(
+                MqttSourceOptions.CONNECTOR_IDENTITY, sourceFactory.factoryIdentifier());
+        Assertions.assertEquals(MqttSource.class, sourceFactory.getSourceClass());
+    }
+
+    @Test
+    void testOptionRule() {
+        OptionRule optionRule = new MqttSourceFactory().optionRule();
+
+        List<Option<?>> required =
+                optionRule.getRequiredOptions().stream()
+                        .flatMap(group -> group.getOptions().stream())
+                        .collect(Collectors.toList());
+        assertContainsAll(
+                required,
+                MqttSourceOptions.URL,
+                MqttSourceOptions.TOPIC,
+                ConnectorCommonOptions.SCHEMA);
+
+        List<Option<?>> optional = optionRule.getOptionalOptions();
+        assertContainsAll(
+                optional,
+                MqttSourceOptions.USERNAME,
+                MqttSourceOptions.PASSWORD,
+                MqttSourceOptions.QOS,
+                MqttSourceOptions.FORMAT,
+                MqttSourceOptions.FIELD_DELIMITER,
+                MqttSourceOptions.CLIENT_ID,
+                MqttSourceOptions.CLEAN_SESSION,
+                MqttSourceOptions.CONNECTION_TIMEOUT,
+                MqttSourceOptions.KEEP_ALIVE_INTERVAL,
+                MqttSourceOptions.MAX_QUEUE_SIZE);
+    }
+
+    /** Asserts, in order, that each expected option is present in the given list. */
+    private static void assertContainsAll(List<Option<?>> actual, Option<?>... expected) {
+        for (Option<?> option : expected) {
+            Assertions.assertTrue(actual.contains(option));
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-mqtt/src/test/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceTest.java
 
b/seatunnel-connectors-v2/connector-mqtt/src/test/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceTest.java
new file mode 100644
index 0000000000..2836813753
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-mqtt/src/test/java/org/apache/seatunnel/connectors/seatunnel/mqtt/source/MqttSourceTest.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mqtt.source;
+
+import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.constants.JobMode;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import 
org.apache.seatunnel.connectors.seatunnel.mqtt.exception.MqttConnectorException;
+
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/** Unit tests for {@link MqttSource} and the reader it creates. */
+class MqttSourceTest {
+
+    @Test
+    void testSourceMetadataAndBoundedness() {
+        MqttSource mqttSource = sourceOf(baseConfig());
+
+        Assertions.assertEquals(MqttSourceOptions.CONNECTOR_IDENTITY, mqttSource.getPluginName());
+        Assertions.assertEquals(Boundedness.UNBOUNDED, mqttSource.getBoundedness());
+    }
+
+    @Test
+    void testStreamingJobModeIsAllowed() {
+        MqttSource mqttSource = sourceOf(baseConfig());
+
+        mqttSource.setJobContext(new JobContext().setJobMode(JobMode.STREAMING));
+
+        Assertions.assertEquals(Boundedness.UNBOUNDED, mqttSource.getBoundedness());
+    }
+
+    @Test
+    void testBatchJobModeFails() {
+        MqttSource mqttSource = sourceOf(baseConfig());
+
+        mqttSource.setJobContext(new JobContext().setJobMode(JobMode.BATCH));
+
+        Assertions.assertThrows(MqttConnectorException.class, mqttSource::getBoundedness);
+    }
+
+    @Test
+    void testProducedCatalogTables() {
+        List<CatalogTable> produced = sourceOf(baseConfig()).getProducedCatalogTables();
+
+        Assertions.assertEquals(1, produced.size());
+        Assertions.assertArrayEquals(
+                new String[] {"id"}, produced.get(0).getTableSchema().getFieldNames());
+    }
+
+    @Test
+    void testCreateReader() throws Exception {
+        MqttSource mqttSource = sourceOf(baseConfig());
+
+        AbstractSingleSplitReader<?> created = mqttSource.createReader(mockReaderContext());
+
+        Assertions.assertInstanceOf(MqttSourceReader.class, created);
+    }
+
+    @Test
+    void testReaderCollectsArrivedJsonMessage() throws Exception {
+        MqttSource mqttSource = sourceOf(baseConfig());
+        MqttSourceReader sourceReader =
+                (MqttSourceReader) mqttSource.createReader(mockReaderContext());
+        RecordingCollector sink = new RecordingCollector();
+
+        sourceReader.messageArrived("users", mqttMessage("{\"id\":1}"));
+        sourceReader.pollNext(sink);
+
+        Assertions.assertNotNull(sink.record);
+        Assertions.assertEquals(1, sink.record.getField(0));
+        Assertions.assertEquals(
+                mqttSource.getProducedCatalogTables().get(0).getTablePath().toString(),
+                sink.record.getTableId());
+    }
+
+    @Test
+    void testReaderFailsWhenReconnectTimeoutExceeded() throws Exception {
+        Map<String, Object> options = baseConfig();
+        options.put("reconnect_timeout", 1);
+        MqttSource mqttSource = sourceOf(options);
+        MqttSourceConfig readerConfig = new MqttSourceConfig(ReadonlyConfig.fromMap(options));
+        // Fake clock: start at 0 and jump past the one-second timeout after the disconnect.
+        AtomicLong fakeClock = new AtomicLong(0L);
+        MqttSourceReader sourceReader =
+                new MqttSourceReader(
+                        readerConfig,
+                        mqttSource.getProducedCatalogTables().get(0),
+                        fakeClock::get);
+
+        RuntimeException brokerDown = new RuntimeException("broker unavailable");
+        sourceReader.connectionLost(brokerDown);
+        fakeClock.set(1001L);
+
+        Assertions.assertThrows(
+                MqttConnectorException.class,
+                () -> sourceReader.pollNext(new RecordingCollector()));
+    }
+
+    @Test
+    void testReaderFailsWhenResubscribeAfterReconnectFails() throws Exception {
+        MqttSource mqttSource = sourceOf(baseConfig());
+        MqttSourceConfig readerConfig = new MqttSourceConfig(ReadonlyConfig.fromMap(baseConfig()));
+        MqttSourceReader sourceReader =
+                new MqttSourceReader(
+                        readerConfig,
+                        mqttSource.getProducedCatalogTables().get(0),
+                        System::currentTimeMillis);
+
+        try (org.mockito.MockedConstruction<MqttClient> construction =
+                Mockito.mockConstruction(MqttClient.class)) {
+            sourceReader.open();
+            MqttClient pahoClient = construction.constructed().get(0);
+            // From here on every subscribe attempt is rejected by the mocked client.
+            Mockito.doThrow(new MqttException(MqttException.REASON_CODE_CLIENT_EXCEPTION))
+                    .when(pahoClient)
+                    .subscribe("users", 1);
+
+            sourceReader.connectionLost(new RuntimeException("connection lost"));
+
+            Assertions.assertDoesNotThrow(
+                    () -> sourceReader.connectComplete(true, "tcp://localhost:1883"));
+            Assertions.assertThrows(
+                    MqttConnectorException.class,
+                    () -> sourceReader.pollNext(new RecordingCollector()));
+        }
+    }
+
+    @Test
+    void testReaderCloseForciblyDisconnectsWhenClientIsNotConnected() throws Exception {
+        MqttSource mqttSource = sourceOf(baseConfig());
+        MqttSourceConfig readerConfig = new MqttSourceConfig(ReadonlyConfig.fromMap(baseConfig()));
+        MqttSourceReader sourceReader =
+                new MqttSourceReader(
+                        readerConfig,
+                        mqttSource.getProducedCatalogTables().get(0),
+                        System::currentTimeMillis);
+
+        try (org.mockito.MockedConstruction<MqttClient> construction =
+                Mockito.mockConstruction(MqttClient.class)) {
+            sourceReader.open();
+            MqttClient pahoClient = construction.constructed().get(0);
+            Mockito.when(pahoClient.isConnected()).thenReturn(false);
+
+            sourceReader.close();
+
+            Mockito.verify(pahoClient).disconnectForcibly();
+            Mockito.verify(pahoClient).close();
+            Mockito.verify(pahoClient, Mockito.never()).disconnect();
+        }
+    }
+
+    @Test
+    void testReaderFailsWhenMessageQueueIsFull() throws Exception {
+        Map<String, Object> options = baseConfig();
+        options.put("max_queue_size", 1);
+        MqttSource mqttSource = sourceOf(options);
+        MqttSourceReader sourceReader =
+                (MqttSourceReader) mqttSource.createReader(mockReaderContext());
+
+        // The first message fills the single-slot queue; the second cannot be buffered.
+        sourceReader.messageArrived("users", mqttMessage("{\"id\":1}"));
+
+        Assertions.assertThrows(
+                MqttConnectorException.class,
+                () -> sourceReader.messageArrived("users", mqttMessage("{\"id\":2}")));
+        Assertions.assertThrows(
+                MqttConnectorException.class,
+                () -> sourceReader.pollNext(new RecordingCollector()));
+    }
+
+    /** Builds a source from raw option values. */
+    private static MqttSource sourceOf(Map<String, Object> rawOptions) {
+        return new MqttSource(ReadonlyConfig.fromMap(rawOptions));
+    }
+
+    /** Returns a mocked reader context for {@code createReader}. */
+    private static SingleSplitReaderContext mockReaderContext() {
+        return Mockito.mock(SingleSplitReaderContext.class);
+    }
+
+    /** Returns the minimal options used by every test: broker URL, topic and schema. */
+    private static Map<String, Object> baseConfig() {
+        Map<String, Object> options = new HashMap<>();
+        options.put("url", "tcp://localhost:1883");
+        options.put("topic", "users");
+        options.put("schema", schemaConfig());
+        return options;
+    }
+
+    /** Returns a schema with a single {@code id} field of type {@code int}. */
+    private static Map<String, Object> schemaConfig() {
+        Map<String, Object> schemaOptions = new HashMap<>();
+        schemaOptions.put("fields", Collections.singletonMap("id", "int"));
+        return schemaOptions;
+    }
+
+    /** Wraps a UTF-8 encoded string in an MQTT message. */
+    private static MqttMessage mqttMessage(String payload) {
+        byte[] encoded = payload.getBytes(StandardCharsets.UTF_8);
+        return new MqttMessage(encoded);
+    }
+
+    /** Collector that remembers only the most recently collected row. */
+    private static class RecordingCollector implements Collector<SeaTunnelRow> {
+        private final Object checkpointLock = new Object();
+        private SeaTunnelRow record;
+
+        @Override
+        public void collect(SeaTunnelRow collected) {
+            this.record = collected;
+        }
+
+        @Override
+        public Object getCheckpointLock() {
+            return checkpointLock;
+        }
+    }
+}

Reply via email to