DanielCarter-stack commented on PR #10575:
URL: https://github.com/apache/seatunnel/pull/10575#issuecomment-4018494408
### Issue 1: Exception handling does not comply with project specifications
**Location**: `MqttSinkWriter.java:85`
```java
throw new RuntimeException("Failed to connect MQTT client [" + clientId +
"]", e);
```
**Related Context**:
- Parent interface: `SinkWriter.java` (seatunnel-api)
- Reference implementation: `KafkaConnectorException.java` (connector-kafka)
- Caller: `MqttSink.createWriter()` (MqttSink.java:42-44)
**Issue Description**:
Using a generic `RuntimeException` instead of a connector-specific exception class violates SeaTunnel's error-handling conventions. Other connectors (such as Kafka and JDBC) define dedicated exception classes that extend `SeaTunnelRuntimeException`.
**Potential Risks**:
- The error type is ambiguous, which makes targeted handling difficult for upper layers
- No error code is attached, so error messages cannot be categorized or internationalized
- Inconsistent with other connectors in the project, which increases maintenance costs
**Scope of Impact**:
- Direct impact: MqttSinkWriter constructor
- Indirect impact: JobMaster's error handling logic
- Impact scope: Single Connector
**Severity**: MAJOR
**Improvement Suggestions**:
1. Create custom exception class:
```java
// seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/exception/MqttConnectorErrorCode.java
package org.apache.seatunnel.connectors.seatunnel.mqtt.exception;

import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;

public enum MqttConnectorErrorCode implements SeaTunnelErrorCode {
    CONNECTION_FAILED("MQTT-01", "MQTT connection failed"),
    PUBLISH_FAILED("MQTT-02", "MQTT message publish failed"),
    INVALID_CONFIG("MQTT-03", "Invalid MQTT configuration");

    private final String code;
    private final String description;

    MqttConnectorErrorCode(String code, String description) {
        this.code = code;
        this.description = description;
    }

    @Override
    public String getCode() {
        return code;
    }

    @Override
    public String getDescription() {
        return description;
    }
}
```
```java
// seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/exception/MqttConnectorException.java
package org.apache.seatunnel.connectors.seatunnel.mqtt.exception;

import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;

public class MqttConnectorException extends SeaTunnelRuntimeException {
    public MqttConnectorException(SeaTunnelErrorCode errorCode, String errorMessage) {
        super(errorCode, errorMessage);
    }

    public MqttConnectorException(
            SeaTunnelErrorCode errorCode, String errorMessage, Throwable cause) {
        super(errorCode, errorMessage, cause);
    }
}
```
2. Modify MqttSinkWriter:
```java
// Import the new exception classes
import org.apache.seatunnel.connectors.seatunnel.mqtt.exception.MqttConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.mqtt.exception.MqttConnectorException;

// In the constructor
} catch (MqttException e) {
    throw new MqttConnectorException(
            MqttConnectorErrorCode.CONNECTION_FAILED,
            "Failed to connect MQTT client [" + clientId + "]",
            e);
}

// In write(): keep the IOException contract, but chain the coded exception as the cause
// instead of building it only to read its message (which dropped the error code from the chain)
String message = "Failed to publish MQTT message after " + retryTimeoutMs + "ms";
throw new IOException(
        message,
        new MqttConnectorException(
                MqttConnectorErrorCode.PUBLISH_FAILED, message, lastException));
```
**Rationale**: Follow project specifications to improve consistency and
maintainability of error handling.
---
### Issue 2: Missing QoS parameter validation
**Location**: `MqttSinkOptions.java:49-53` and `MqttSinkWriter.java:61`
```java
// MqttSinkOptions.java
public static final Option<Integer> QOS =
        Options.key("qos")
                .intType()
                .defaultValue(1)
                .withDescription("MQTT QoS level: 0 (at-most-once), 1 (at-least-once)");

// MqttSinkWriter.java - No validation
this.qos = pluginConfig.get(MqttSinkOptions.QOS);
```
**Related Context**:
- Caller: MqttSinkWriter constructor
- Reference implementation: KafkaSinkOptions has parameter validation
- Related method: MqttSinkWriter.write() line 93 uses qos
**Issue Description**:
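The option description says only QoS 0 and 1 are supported, but the configured value is never validated. If a user sets `qos=2`, the connector silently runs at a QoS level it does not claim to support; an out-of-range value such as `qos=3` fails only when the first message is built in `write()`.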
**Potential Risks**:
- User configuration errors are discovered at runtime instead of startup
- Error messages are unclear (Paho Client errors may not be intuitive)
- Violates "fail-fast" principle
**Scope of Impact**:
- Direct impact: MqttSinkWriter.write()
- Indirect impact: Job startup failure
- Impact scope: Single Connector
**Severity**: MAJOR
**Improvement Suggestions**:
```java
// Add validation in MqttSinkWriter constructor
public MqttSinkWriter(
        SinkWriter.Context context, SeaTunnelRowType rowType, ReadonlyConfig pluginConfig) {
    this.topic = pluginConfig.get(MqttSinkOptions.TOPIC);
    this.qos = pluginConfig.get(MqttSinkOptions.QOS);
    // Add QoS validation
    if (qos < 0 || qos > 1) {
        throw new IllegalArgumentException(
                "MQTT QoS must be 0 (at-most-once) or 1 (at-least-once), got: " + qos);
    }
    this.retryTimeoutMs = pluginConfig.get(MqttSinkOptions.RETRY_TIMEOUT);
    // ...
}
```
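Or add validation in MqttSinkOptions. The syntax below is repaired, but `withValidator` is an assumed hook: confirm that the `Option` builder in your SeaTunnel version offers a validation method before relying on it, and otherwise keep the constructor check above:
```java
public static final Option<Integer> QOS =
        Options.key("qos")
                .intType()
                .defaultValue(1)
                // Hypothetical API: verify this hook exists in your SeaTunnel version
                .withValidator(
                        qos -> {
                            if (qos < 0 || qos > 1) {
                                throw new IllegalArgumentException("QoS must be 0 or 1");
                            }
                        })
                .withDescription("MQTT QoS level: 0 (at-most-once), 1 (at-least-once)");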
```
**Rationale**: Provide clear error messages and detect errors during
configuration phase rather than runtime.
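Whichever location is chosen, the check can live in one small helper so the constructor, a factory-level check, and the unit tests in Issue 4 all see the same message. A minimal JDK-only sketch; `MqttQosValidator` and `requireSupportedQos` are hypothetical names, not existing SeaTunnel code:

```java
/** Hypothetical helper: single source of truth for the QoS levels this sink supports. */
final class MqttQosValidator {
    static final int MIN_QOS = 0; // at-most-once
    static final int MAX_QOS = 1; // at-least-once; QoS 2 is deliberately unsupported

    private MqttQosValidator() {}

    /** Returns qos unchanged if supported, otherwise fails fast with a descriptive message. */
    static int requireSupportedQos(Integer qos) {
        if (qos == null) {
            throw new IllegalArgumentException("MQTT QoS must not be null");
        }
        if (qos < MIN_QOS || qos > MAX_QOS) {
            throw new IllegalArgumentException(
                    "MQTT QoS must be 0 (at-most-once) or 1 (at-least-once), got: " + qos);
        }
        return qos;
    }
}
```

In the writer this would reduce to `this.qos = MqttQosValidator.requireSupportedQos(pluginConfig.get(MqttSinkOptions.QOS));`, so the accepted range and the error text cannot drift apart.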
---
### Issue 3: CleanSession=true contradicts at-least-once semantics
**Location**: `MqttSinkWriter.java:168`
```java
options.setCleanSession(true);
```
**Related Context**:
- Configuration location: MqttSinkWriter.buildConnectOptions() lines 165-180
- Call chain: MqttSinkWriter constructor → buildConnectOptions() →
MqttClient.connect()
- Documentation claim: `docs/en/connectors/sink/Mqtt.md` line 17
**Issue Description**:
The code sets `CleanSession=true`, but the documentation claims "at-least-once" semantics. Under MQTT 3.1.1, a clean session lasts only as long as the network connection:
- Session state, including outbound QoS 1 messages that were sent but not yet acknowledged with PUBACK, is discarded when the connection drops
- After reconnection these in-flight messages are not resent, so they can be lost permanently
- This directly contradicts "at-least-once" semantics
**Potential Risks**:
- Users may incorrectly believe messages will not be lost
- Data may be lost in failure scenarios
- Does not match documentation promises
**Scope of Impact**:
- Direct impact: Reliability guarantees of MqttSinkWriter
- Indirect impact: User expectations of data reliability
- Impact scope: Single Connector
**Severity**: CRITICAL
**Improvement Suggestions**:
1. **Short-term solution** (fix documentation):
State the limitation explicitly in the connector documentation:
```markdown
## Key features
- [ ] [exactly-once](../../introduction/concepts/connector-v2-features.md)
**Delivery Semantics Notice**:
This connector provides **at-most-once** delivery when QoS=0, and
**best-effort at-least-once** when QoS=1.
Due to `cleanSession=true` (required for stateless operation),
unacknowledged messages may be lost during
client disconnections. For stronger guarantees, consider enabling Source
replay capabilities in SeaTunnel.
```
2. **Long-term solution** (let user choose):
Add configuration option:
```java
// MqttSinkOptions.java
public static final Option<Boolean> CLEAN_SESSION =
        Options.key("clean_session")
                .booleanType()
                .defaultValue(true)
                .withDescription(
                        "Whether to use clean session. false enables persistent sessions"
                                + " but may cause broker-side state accumulation");
```
```java
// MqttSinkWriter.java
boolean cleanSession = config.get(MqttSinkOptions.CLEAN_SESSION);
options.setCleanSession(cleanSession);
if (!cleanSession) {
    log.warn(
            "clean_session=false may cause broker-side state accumulation. "
                    + "Ensure proper clientId management.");
}
```
**Rationale**: Honestly inform users of limitations to avoid misleading
promises. CleanSession=true is reasonable for stateless design, but should not
claim to provide complete at-least-once guarantees.
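To make the chosen trade-off visible at runtime as well as in the docs, the writer could log the effective guarantee once at startup. A JDK-only sketch; `DeliverySemantics` and `describe` are hypothetical, and the mapping only restates the reasoning above (QoS 0 never redelivers; QoS 1 with a clean session drops in-flight messages on reconnect; QoS 1 with a persistent session and a stable clientId allows redelivery):

```java
/** Hypothetical helper: maps (qos, cleanSession) to the guarantee the sink can honestly claim. */
final class DeliverySemantics {
    private DeliverySemantics() {}

    static String describe(int qos, boolean cleanSession) {
        if (qos == 0) {
            // Fire-and-forget: no PUBACK, nothing to redeliver
            return "at-most-once";
        }
        if (qos == 1 && cleanSession) {
            // Session state, including un-acknowledged in-flight messages, ends with the connection
            return "best-effort at-least-once (in-flight messages may be lost on reconnect)";
        }
        if (qos == 1) {
            // Persistent session: in-flight messages can be resent after reconnecting
            return "at-least-once (requires a stable clientId across reconnects)";
        }
        throw new IllegalArgumentException("Unsupported QoS: " + qos);
    }
}
```

A startup line such as `log.info("MQTT sink delivery semantics: {}", DeliverySemantics.describe(qos, cleanSession));` would then match whatever the documentation states.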
---
### Issue 4: Missing unit tests
**Location**: `seatunnel-connectors-v2/connector-mqtt/src/test/java/`
**Related Context**:
- Reference implementation: connector-kafka has complete unit test suite
- E2E tests: MqttSinkIT.java exists but insufficient
- Classes under test: MqttSink, MqttSinkFactory, MqttSinkWriter
**Issue Description**:
Only E2E integration tests exist; there are no unit tests. E2E tests cannot practically cover:
- Boundary conditions
- Exception paths
- Parameter validation
- Configuration parsing
**Potential Risks**:
- Easy to introduce bugs during refactoring
- Boundary conditions not covered (e.g., QoS=-1)
- Error handling paths not tested
- Low code coverage
**Scope of Impact**:
- Direct impact: Code quality assurance
- Indirect impact: Future maintenance costs
- Impact scope: Single Connector
**Severity**: MAJOR
**Improvement Suggestions**:
Create unit test classes:
```java
// seatunnel-connectors-v2/connector-mqtt/src/test/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSinkWriterTest.java
package org.apache.seatunnel.connectors.seatunnel.mqtt.sink;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

@ExtendWith(MockitoExtension.class)
class MqttSinkWriterTest {
    @Mock private SinkWriter.Context context;
    @Mock private SeaTunnelRowType rowType;

    // HashMap instead of Map.of so the test also compiles on Java 8
    private static ReadonlyConfig config(String key, Object value) {
        Map<String, Object> map = new HashMap<>();
        map.put("url", "tcp://localhost:1883");
        map.put("topic", "test");
        map.put(key, value);
        return ReadonlyConfig.fromMap(map);
    }

    @Test
    void testInvalidQosThrowsException() {
        // Validation must run before the constructor tries to connect to the broker
        IllegalArgumentException ex =
                assertThrows(
                        IllegalArgumentException.class,
                        () -> new MqttSinkWriter(context, rowType, config("qos", 2)));
        // Matches both validation messages suggested in Issue 2
        assertTrue(ex.getMessage().contains("QoS must be 0"));
    }

    @Test
    void testInvalidFormatThrowsException() {
        // Assumes an unsupported format is rejected with IllegalArgumentException before connecting
        assertThrows(
                IllegalArgumentException.class,
                () -> new MqttSinkWriter(context, rowType, config("format", "xml")));
    }

    @Test
    @Disabled("TODO: mock the Paho client to throw MqttException; expect a wrapped exception")
    void testConnectionFailureThrowsWrappedException() {}

    @Test
    @Disabled("TODO: simulate a failed first publish followed by a successful retry")
    void testWriteWithRetrySuccess() {}

    @Test
    @Disabled("TODO: simulate publish failures until the retry timeout elapses")
    void testWriteTimeoutAfterRetries() {}
}
```
```java
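// seatunnel-connectors-v2/connector-mqtt/src/test/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSinkFactoryTest.java
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertTrue;

class MqttSinkFactoryTest {
    @Test
    void testOptionRule() {
        OptionRule rule = new MqttSinkFactory().optionRule();
        // getRequiredOptions() returns RequiredOption groups rather than Option<?> values,
        // so flatten each group's options first (assumed accessor: RequiredOption#getOptions)
        List<Option<?>> required = rule.getRequiredOptions().stream()
                .flatMap(r -> r.getOptions().stream())
                .collect(Collectors.toList());
        assertTrue(required.contains(MqttSinkOptions.URL));
        assertTrue(required.contains(MqttSinkOptions.TOPIC));
        assertTrue(rule.getOptionalOptions().contains(MqttSinkOptions.QOS));
    }
}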
```
**Rationale**: Improve code quality, ensure safe refactoring, meet Apache
project standards.
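The two placeholder retry tests in `MqttSinkWriterTest` are hard to write while the retry loop is buried in `write()` and calls the Paho client directly. One option is to extract the loop behind a small functional interface so it can be driven without a broker. The sketch below is JDK-only; `RetryingPublisher` and `PublishAttempt` are hypothetical names, and the deadline-plus-fixed-backoff policy mirrors the loop quoted in Issue 7 rather than any existing SeaTunnel utility:

```java
import java.io.IOException;

/** Hypothetical: one publish attempt; in the writer this would wrap mqttClient.publish(topic, message). */
@FunctionalInterface
interface PublishAttempt {
    void run() throws Exception;
}

/** Hypothetical retry helper: retries with a fixed backoff until success or until timeoutMs elapses. */
final class RetryingPublisher {
    private final long timeoutMs;
    private final long backoffMs;

    RetryingPublisher(long timeoutMs, long backoffMs) {
        this.timeoutMs = timeoutMs;
        this.backoffMs = backoffMs;
    }

    /** Returns the number of attempts used; on timeout throws an IOException caused by the last failure. */
    int publish(PublishAttempt attempt) throws IOException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        int attempts = 0;
        Exception lastFailure = null;
        while (true) {
            attempts++;
            try {
                attempt.run();
                return attempts;
            } catch (Exception e) {
                lastFailure = e;
            }
            if (System.currentTimeMillis() >= deadline) {
                throw new IOException(
                        "Failed to publish after " + timeoutMs + "ms (" + attempts + " attempts)",
                        lastFailure);
            }
            try {
                Thread.sleep(backoffMs);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                throw new IOException("Interrupted while retrying publish", ie);
            }
        }
    }
}
```

With this split, "first failure, then success" and "retry timeout" become plain unit tests: pass a lambda that throws a chosen number of times and assert on the attempt count or on the exception's cause.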
---
### Issue 5: Text format delimiter hardcoded
**Location**: `MqttSinkWriter.java:189-192`
```java
case "text":
return TextSerializationSchema.builder()
.seaTunnelRowType(rowType)
.delimiter(",")
.build();
```
**Related Context**:
- Reference implementation: KafkaSink supports custom delimiter
- Configuration option: MqttSinkOptions.FORMAT definition
- Related class: TextSerializationSchema
**Issue Description**:
The field delimiter for the text format is hardcoded to a comma and cannot be customized. Some data sets need other delimiters (e.g., tab or pipe).
**Potential Risks**:
- Limits user flexibility
- Inconsistent with Kafka Sink (Kafka supports field_delimiter configuration)
- Field values that contain commas produce ambiguous output for downstream parsers
**Scope of Impact**:
- Direct impact: Users using Text format
- Indirect impact: Data format compatibility
- Impact scope: Single Connector
**Severity**: MINOR
**Improvement Suggestions**:
1. Add configuration option:
```java
// MqttSinkOptions.java
public static final Option<String> FIELD_DELIMITER =
        Options.key("field_delimiter")
                .stringType()
                .defaultValue(",")
                .withDescription("Field delimiter for text format. Only used when format=text");
```
2. Register in MqttSinkFactory:
```java
.optional(
MqttSinkOptions.USERNAME,
MqttSinkOptions.PASSWORD,
MqttSinkOptions.QOS,
MqttSinkOptions.FORMAT,
MqttSinkOptions.FIELD_DELIMITER, // Add
MqttSinkOptions.RETRY_TIMEOUT,
MqttSinkOptions.CONNECTION_TIMEOUT)
```
3. Use in MqttSinkWriter:
```java
case "text":
String delimiter = config.get(MqttSinkOptions.FIELD_DELIMITER);
return TextSerializationSchema.builder()
.seaTunnelRowType(rowType)
.delimiter(delimiter)
.build();
```
**Rationale**: Improve flexibility, stay consistent with Kafka Sink.
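The parsing risk is easy to demonstrate: with a comma delimiter, a field value that itself contains a comma yields a line that splits into the wrong number of fields. A JDK-only sketch of plain delimiter joining; it does not reproduce `TextSerializationSchema`'s exact output (for example its null handling), and `TextLine` is a hypothetical helper:

```java
import java.util.Objects;
import java.util.StringJoiner;
import java.util.regex.Pattern;

/** Hypothetical helper: joins row fields with a configurable delimiter, as format=text would. */
final class TextLine {
    private TextLine() {}

    static String join(Object[] fields, String delimiter) {
        StringJoiner joiner = new StringJoiner(delimiter);
        for (Object field : fields) {
            joiner.add(Objects.toString(field, "")); // this sketch writes null as an empty field
        }
        return joiner.toString();
    }

    /** Number of fields a consumer sees when splitting on the same delimiter (quoted, so "|" is literal). */
    static int fieldCountWhenSplit(String line, String delimiter) {
        return line.split(Pattern.quote(delimiter), -1).length;
    }
}
```

For the two-field row `{"sensor-1", "21.5,22.0"}`, a comma delimiter produces `sensor-1,21.5,22.0`, which splits back into three fields; with `field_delimiter = "|"` the line is `sensor-1|21.5,22.0` and splits into the expected two.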
---
### Issue 6: Changelog placeholder not updated
**Location**: `docs/en/connectors/changelog/connector-mqtt.md:7`
```markdown
- Add MQTT Sink Connector
([#XXXX](https://github.com/apache/seatunnel/pull/XXXX))
```
**Related Context**:
- PR number: #10575
- File: connector-mqtt.md
- Release process: CHANGELOG automatically generated after merge
**Issue Description**:
The changelog still contains the placeholder `#XXXX`; replace it with the actual PR number before merging.
**Potential Risks**:
- Incomplete changelog
- Automation tools may not link correctly
- Users cannot trace change sources
**Scope of Impact**:
- Direct impact: Changelog quality
- Indirect impact: User experience
- Impact scope: Single Connector
**Severity**: MINOR
**Improvement Suggestions**:
```markdown
## next version
### Sink
- Add MQTT Sink Connector
([#10575](https://github.com/apache/seatunnel/pull/10575))
```
Also recommend linking Issue #9566:
```markdown
- Add MQTT Sink Connector
([#10575](https://github.com/apache/seatunnel/pull/10575))
Resolves [#9566](https://github.com/apache/seatunnel/issues/9566)
```
**Rationale**: Maintain documentation integrity, facilitate user tracing.
---
### Issue 7: Performance bottleneck - synchronous blocking send
**Location**: `MqttSinkWriter.java:90-118`
```java
public void write(SeaTunnelRow element) throws IOException {
    byte[] payload = serializationSchema.serialize(element);
    MqttMessage message = new MqttMessage(payload);
    message.setQos(qos);
    // Synchronous retry loop (simplified excerpt: deadline setup and
    // InterruptedException/MqttException handling are omitted)
    while (System.currentTimeMillis() < deadline) {
        if (mqttClient.isConnected()) {
            mqttClient.publish(topic, message); // Blocking call
            return;
        }
        Thread.sleep(RETRY_BACKOFF_MS);
    }
}
```
**Related Context**:
- Paho Client: `MqttClient.publish()` is synchronous blocking method
- Comparison: KafkaSink uses asynchronous Producer
- Performance impact: Each message requires waiting for network round-trip
**Issue Description**:
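The writer sends every message with a synchronous, blocking call. Paho's `MqttClient.publish()` returns only after delivery completes at the requested QoS, which at QoS 1 means waiting for the broker's PUBACK. Each message therefore costs at least one network round trip, which becomes a bottleneck in high-throughput scenarios.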
**Potential Risks**:
- Limited throughput (on the order of thousands of messages per second per writer)
- Increased latency
- Does not match the high-performance goals of SeaTunnel's streaming engine
**Scope of Impact**:
- Direct impact: Users in high-performance scenarios
- Indirect impact: Overall pipeline throughput
- Impact scope: Single Connector
**Severity**: MAJOR if the connector is positioned as high-performance; MINOR if it is positioned as a lightweight IoT connector, where current performance is acceptable
**Improvement Suggestions**:
**Solution 1: Batch send (recommended)**
```java
// Add batch configuration (MqttSinkOptions.java)
public static final Option<Integer> BATCH_SIZE =
        Options.key("batch_size")
                .intType()
                .defaultValue(1)
                .withDescription("Number of messages to batch before sending");

// MqttSinkWriter.java: no initial capacity here, because batchSize is not yet
// assigned when field initializers run
private final List<MqttMessage> messageBuffer = new ArrayList<>();

@Override
public void write(SeaTunnelRow element) throws IOException {
    byte[] payload = serializationSchema.serialize(element);
    MqttMessage message = new MqttMessage(payload);
    message.setQos(qos);
    synchronized (messageBuffer) {
        messageBuffer.add(message);
        if (messageBuffer.size() >= batchSize) {
            flushBatch();
        }
    }
}

private void flushBatch() throws IOException {
    synchronized (messageBuffer) {
        // Paho's MqttClient has no multi-message publish method, so send in a loop;
        // a real throughput gain needs pipelining (e.g. MqttAsyncClient delivery tokens)
        for (MqttMessage buffered : messageBuffer) {
            publishWithRetry(buffered); // hypothetical: the existing retry loop, extracted
        }
        messageBuffer.clear();
    }
}

@Override
public Optional<Void> prepareCommit() throws IOException {
    flushBatch(); // Flush before checkpoint
    return Optional.empty();
}
```
**Solution 2: Async send**
```java
// Use MqttAsyncClient (but requires major refactoring)
// Or use thread pool for asynchronous sending
```
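Solution 2 can also be approached incrementally: keep several publishes in flight, block only when a bounded window is full, and wait for all outstanding publishes before a checkpoint completes. The sketch below uses JDK primitives only; `PipelinedPublisher` is a hypothetical name, and the `Consumer<byte[]>` stands in for a blocking publish call. With Paho, the corresponding building blocks would be `MqttAsyncClient.publish`, which returns a delivery token to wait on, and `MqttConnectOptions.setMaxInflight`; the thread pool here is only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;

/** Hypothetical sketch: bounded window of asynchronous publishes, drained on flush. */
final class PipelinedPublisher implements AutoCloseable {
    private final Semaphore window;
    private final ExecutorService executor;
    private final Consumer<byte[]> blockingPublish; // stand-in for a synchronous publish call
    // Accessed only from the single writer thread (publish/flush), so no extra locking
    private final List<CompletableFuture<Void>> pending = new ArrayList<>();

    PipelinedPublisher(int maxInflight, int threads, Consumer<byte[]> blockingPublish) {
        this.window = new Semaphore(maxInflight);
        this.executor = Executors.newFixedThreadPool(threads);
        this.blockingPublish = blockingPublish;
    }

    /** Starts a publish; blocks only while maxInflight publishes are already outstanding. */
    void publish(byte[] payload) throws InterruptedException {
        window.acquire();
        CompletableFuture<Void> f =
                CompletableFuture.runAsync(() -> blockingPublish.accept(payload), executor)
                        .whenComplete((ok, err) -> window.release()); // free the slot either way
        pending.add(f);
    }

    /** Waits for every outstanding publish; join() rethrows the first failure. */
    void flush() {
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        pending.clear();
    }

    @Override
    public void close() {
        executor.shutdown();
    }
}
```

Calling `flush()` from `prepareCommit()` keeps the checkpoint from completing while publishes are still outstanding, and a failed publish surfaces there instead of being silently dropped.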
**Solution 3: Document in docs (simplest)**
Describe the performance characteristics in the connector documentation:
```markdown
## Performance Considerations
The MQTT Sink sends messages synchronously to guarantee delivery. Typical
throughput:
- QoS 0: ~10,000 messages/sec (local network)
- QoS 1: ~5,000 messages/sec (requires broker ACK)
For higher throughput requirements, consider:
- Using Kafka Sink instead
- Reducing QoS to 0
- Increasing SeaTunnel parallelism
```
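A simple idealized bound explains why synchronous QoS 1 throughput is capped and why raising parallelism helps. With a synchronous `publish()` at QoS 1, each message waits for at least one broker round trip (RTT) before the next one starts, so a single writer cannot exceed one message per RTT, and $p$ parallel writers cannot exceed $p$ times that. The bound ignores serialization and broker processing time, so measured throughput will be lower:

$$
\text{throughput}_{\text{writer}} \le \frac{1}{\mathrm{RTT}},
\qquad
\text{throughput}_{\text{total}} \le \frac{p}{\mathrm{RTT}}
$$

For example, an RTT of 0.2 ms caps a single writer at $1 / (0.2 \times 10^{-3}\,\text{s}) = 5{,}000$ messages/sec, and 4 parallel writers at 20,000 messages/sec. QoS 0 does not wait for a PUBACK, so this bound does not apply to it.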
**Rationale**: Current design is sufficient for IoT scenarios (low-frequency
messages), but performance limitations should be clearly documented to avoid
user misunderstanding.
---
--
This is an automated message from the Apache Git Service.