DanielCarter-stack commented on PR #10575:
URL: https://github.com/apache/seatunnel/pull/10575#issuecomment-4018494408

   ### Issue 1: Exception handling does not comply with project specifications
   
   **Location**: `MqttSinkWriter.java:85`
   
   ```java
   throw new RuntimeException("Failed to connect MQTT client [" + clientId + "]", e);
   ```
   
   **Related Context**:
   - Parent interface: `SinkWriter.java` (seatunnel-api)
   - Reference implementation: `KafkaConnectorException.java` (connector-kafka)
   - Caller: `MqttSink.createWriter()` (MqttSink.java:42-44)
   
   **Issue Description**:
   Throwing a generic `RuntimeException` instead of a connector-specific exception class violates SeaTunnel's error-handling convention. Other connectors (such as Kafka and JDBC) define dedicated exception classes that extend `SeaTunnelRuntimeException`.
   
   **Potential Risks**:
   - Error type is unclear, making targeted handling difficult for upper layers
   - Missing error codes, unable to internationalize error messages
   - Inconsistent with other connectors in the project, increasing maintenance costs
   
   **Scope of Impact**:
   - Direct impact: MqttSinkWriter constructor
   - Indirect impact: JobMaster's error handling logic
   - Impact scope: Single Connector
   
   **Severity**: MAJOR
   
   **Improvement Suggestions**:
   
   1. Create custom exception class:
   ```java
   // seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/exception/MqttConnectorErrorCode.java
   package org.apache.seatunnel.connectors.seatunnel.mqtt.exception;
   
   import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
   
   public enum MqttConnectorErrorCode implements SeaTunnelErrorCode {
       CONNECTION_FAILED("MQTT-01", "MQTT connection failed"),
       PUBLISH_FAILED("MQTT-02", "MQTT message publish failed"),
       INVALID_CONFIG("MQTT-03", "Invalid MQTT configuration");
       
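       private final String code;
       private final String description;
       
       MqttConnectorErrorCode(String code, String description) {
           this.code = code;
           this.description = description;
       }
       
       @Override
       public String getCode() {
           return code;
       }
       
       @Override
       public String getDescription() {
           return description;
       }
   }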
   ```
   
   ```java
   // seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/exception/MqttConnectorException.java
   package org.apache.seatunnel.connectors.seatunnel.mqtt.exception;
   
   import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
   import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
   
   public class MqttConnectorException extends SeaTunnelRuntimeException {
       public MqttConnectorException(SeaTunnelErrorCode errorCode, String errorMessage) {
           super(errorCode, errorMessage);
       }
       
       public MqttConnectorException(
               SeaTunnelErrorCode errorCode, String errorMessage, Throwable cause) {
           super(errorCode, errorMessage, cause);
       }
   }
   ```
   
   2. Modify MqttSinkWriter:
   ```java
   // Import new exception class
   import org.apache.seatunnel.connectors.seatunnel.mqtt.exception.MqttConnectorException;
   import org.apache.seatunnel.connectors.seatunnel.mqtt.exception.MqttConnectorErrorCode;
   
   // In constructor
   } catch (MqttException e) {
       throw new MqttConnectorException(
           MqttConnectorErrorCode.CONNECTION_FAILED,
           "Failed to connect MQTT client [" + clientId + "]",
           e);
   }
   
   // In write method: MqttConnectorException is unchecked, so it can be thrown
   // directly instead of being flattened into an IOException message
   throw new MqttConnectorException(
       MqttConnectorErrorCode.PUBLISH_FAILED,
       "Failed to publish MQTT message after " + retryTimeoutMs + "ms",
       lastException);
   ```
   
   **Rationale**: Follow project conventions to improve the consistency and maintainability of error handling.
   
   ---
   
   ### Issue 2: Missing QoS parameter validation
   
   **Location**: `MqttSinkOptions.java:49-53` and `MqttSinkWriter.java:61`
   
   ```java
   // MqttSinkOptions.java
   public static final Option<Integer> QOS =
           Options.key("qos")
                   .intType()
                   .defaultValue(1)
                   .withDescription("MQTT QoS level: 0 (at-most-once), 1 (at-least-once)");
   
   // MqttSinkWriter.java - No validation
   this.qos = pluginConfig.get(MqttSinkOptions.QOS);
   ```
   
   **Related Context**:
   - Caller: MqttSinkWriter constructor
   - Reference implementation: KafkaSinkOptions has parameter validation
   - Related method: MqttSinkWriter.write() line 93 uses qos
   
   **Issue Description**:
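   The option description states that only QoS 0 and 1 are supported, but user input is not validated. Paho itself accepts QoS 2, so `qos=2` would silently run at a level the connector does not document, while values outside 0-2 are rejected only when `MqttMessage.setQos()` is called during `write()`.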
   
   **Potential Risks**:
   - User configuration errors are discovered at runtime instead of startup
   - Error messages are unclear (Paho Client errors may not be intuitive)
   - Violates "fail-fast" principle
   
   **Scope of Impact**:
   - Direct impact: MqttSinkWriter.write()
   - Indirect impact: Job startup failure
   - Impact scope: Single Connector
   
   **Severity**: MAJOR
   
   **Improvement Suggestions**:
   
   ```java
   // Add validation in MqttSinkWriter constructor
   public MqttSinkWriter(
           SinkWriter.Context context, SeaTunnelRowType rowType, ReadonlyConfig pluginConfig) {
       this.topic = pluginConfig.get(MqttSinkOptions.TOPIC);
       this.qos = pluginConfig.get(MqttSinkOptions.QOS);
       
       // Add QoS validation
       if (qos < 0 || qos > 1) {
           throw new IllegalArgumentException(
               "MQTT QoS must be 0 (at-most-once) or 1 (at-least-once), got: " + qos);
       }
       
       this.retryTimeoutMs = pluginConfig.get(MqttSinkOptions.RETRY_TIMEOUT);
       // ...
   }
   ```
   
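   Or, if the `Option` builder offers a validation hook, declare the check next to the option (`withValidator` below is hypothetical; confirm it exists before using it, otherwise keep the constructor check):
   ```java
   public static final Option<Integer> QOS =
           Options.key("qos")
                   .intType()
                   .defaultValue(1)
                   // Hypothetical hook: not confirmed to exist on Option
                   .withValidator(value -> {
                       if (value < 0 || value > 1) {
                           throw new IllegalArgumentException("QoS must be 0 or 1");
                       }
                   })
                   .withDescription("MQTT QoS level: 0 (at-most-once), 1 (at-least-once)");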
   ```
   
   **Rationale**: Provide clear error messages and detect errors during the configuration phase rather than at runtime.
   
   ---
   
   ### Issue 3: CleanSession=true contradicts at-least-once semantics
   
   **Location**: `MqttSinkWriter.java:168`
   
   ```java
   options.setCleanSession(true);
   ```
   
   **Related Context**:
   - Configuration location: MqttSinkWriter.buildConnectOptions() lines 165-180
   - Call chain: MqttSinkWriter constructor → buildConnectOptions() → MqttClient.connect()
   - Documentation claim: `docs/en/connectors/sink/Mqtt.md` line 17
   
   **Issue Description**:
   The code sets `CleanSession=true`, but the documentation claims "at-least-once" semantics. Under MQTT 3.1.1:
   - When CleanSession=true, client and broker discard the previous session state on connect, including unacknowledged QoS 1 messages
   - After the client disconnects, those messages are not redelivered on reconnection and are permanently lost
   - This directly contradicts "at-least-once" semantics
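   
   The session behavior described above can be illustrated with a toy model. This is a minimal sketch, not Paho code: `CleanSessionSketch` and its methods are hypothetical names, and it only models the client-side list of QoS 1 messages that have been sent but not yet acknowledged.
   
   ```java
   import java.util.ArrayDeque;
   import java.util.ArrayList;
   import java.util.Deque;
   import java.util.List;
   
   /** Toy model of MQTT 3.1.1 session state for a publisher; not Paho code. */
   class CleanSessionSketch {
       // QoS 1 messages that were sent but whose PUBACK has not arrived yet.
       private final Deque<String> unacked = new ArrayDeque<>();
   
       /** Records a QoS 1 publish that is still waiting for its PUBACK. */
       void publishWithoutAck(String payload) {
           unacked.add(payload);
       }
   
       /**
        * Simulates a reconnect and returns the messages that would be resent.
        * With cleanSession=true the previous session is discarded, so the
        * unacknowledged messages are dropped and nothing is resent.
        */
       List<String> reconnect(boolean cleanSession) {
           if (cleanSession) {
               unacked.clear();
           }
           return new ArrayList<>(unacked);
       }
   
       public static void main(String[] args) {
           CleanSessionSketch clean = new CleanSessionSketch();
           clean.publishWithoutAck("row-1");
           System.out.println("cleanSession=true resends:  " + clean.reconnect(true));
   
           CleanSessionSketch persistent = new CleanSessionSketch();
           persistent.publishWithoutAck("row-1");
           System.out.println("cleanSession=false resends: " + persistent.reconnect(false));
       }
   }
   ```
   
   In the clean-session case the in-flight row has nowhere to be recovered from, which is why QoS 1 alone does not give an at-least-once guarantee across disconnects.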
   
   **Potential Risks**:
   - Users may incorrectly believe messages will not be lost
   - Data may be lost in failure scenarios
   - Does not match documentation promises
   
   **Scope of Impact**:
   - Direct impact: Reliability guarantees of MqttSinkWriter
   - Indirect impact: User expectations of data reliability
   - Impact scope: Single Connector
   
   **Severity**: CRITICAL
   
   **Improvement Suggestions**:
   
   1. **Short-term solution** (fix documentation):
   Clearly document limitations in documentation:
   ```markdown
   ## Key features
   
   - [ ] [exactly-once](../../introduction/concepts/connector-v2-features.md)
   
   **Delivery Semantics Notice**:
   This connector provides **at-most-once** delivery when QoS=0, and **best-effort at-least-once** when QoS=1.
   Due to `cleanSession=true` (required for stateless operation), unacknowledged messages may be lost during client disconnections. For stronger guarantees, consider enabling Source replay capabilities in SeaTunnel.
   ```
   
   2. **Long-term solution** (let user choose):
   Add configuration option:
   ```java
   // MqttSinkOptions.java
   public static final Option<Boolean> CLEAN_SESSION =
       Options.key("clean_session")
           .booleanType()
           .defaultValue(true)
           .withDescription("Whether to use clean session. false enables persistent sessions but may cause broker-side state accumulation");
   ```
   
   ```java
   // MqttSinkWriter.java
   options.setCleanSession(config.get(MqttSinkOptions.CLEAN_SESSION));
   if (!config.get(MqttSinkOptions.CLEAN_SESSION)) {
       log.warn("clean_session=false may cause broker-side state accumulation. Ensure proper clientId management.");
   }
   ```
   
   **Rationale**: Honestly inform users of limitations to avoid misleading promises. CleanSession=true is reasonable for a stateless design, but the connector should not claim to provide complete at-least-once guarantees.
   
   ---
   
   ### Issue 4: Missing unit tests
   
   **Location**: `seatunnel-connectors-v2/connector-mqtt/src/test/java/`
   
   **Related Context**:
   - Reference implementation: connector-kafka has complete unit test suite
   - E2E tests: MqttSinkIT.java exists but insufficient
   - Classes under test: MqttSink, MqttSinkFactory, MqttSinkWriter
   
   **Issue Description**:
   Only E2E integration tests exist, missing unit tests. E2E tests cannot cover:
   - Boundary conditions
   - Exception paths
   - Parameter validation
   - Configuration parsing
   
   **Potential Risks**:
   - Easy to introduce bugs during refactoring
   - Boundary conditions not covered (e.g., QoS=-1)
   - Error handling paths not tested
   - Low code coverage
   
   **Scope of Impact**:
   - Direct impact: Code quality assurance
   - Indirect impact: Future maintenance costs
   - Impact scope: Single Connector
   
   **Severity**: MAJOR
   
   **Improvement Suggestions**:
   
   Create unit test classes:
   ```java
   // seatunnel-connectors-v2/connector-mqtt/src/test/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSinkWriterTest.java
   package org.apache.seatunnel.connectors.seatunnel.mqtt.sink;
   
   import org.apache.seatunnel.api.configuration.ReadonlyConfig;
   import org.apache.seatunnel.api.sink.SinkWriter;
   import org.apache.seatunnel.api.table.type.SeaTunnelRow;
   import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
   import org.junit.jupiter.api.Test;
   import org.junit.jupiter.api.extension.ExtendWith;
   import org.mockito.Mock;
   import org.mockito.junit.jupiter.MockitoExtension;
   
   import java.util.Map;  // Map.of requires Java 9+; use a HashMap on Java 8
   
   import static org.junit.jupiter.api.Assertions.*;
   import static org.mockito.Mockito.*;
   
   @ExtendWith(MockitoExtension.class)
   class MqttSinkWriterTest {
       
       @Mock
       private SinkWriter.Context context;
       
       @Mock
       private SeaTunnelRowType rowType;
       
       @Test
       void testInvalidQosThrowsException() {
           ReadonlyConfig config = ReadonlyConfig.fromMap(Map.of(
               "url", "tcp://localhost:1883",
               "topic", "test",
               "qos", 2  // Invalid value
           ));
           
           IllegalArgumentException ex = assertThrows(
               IllegalArgumentException.class,
               () -> new MqttSinkWriter(context, rowType, config)
           );
           
           // Matches the constructor message "MQTT QoS must be 0 (at-most-once) or 1 ..."
           assertTrue(ex.getMessage().contains("QoS must be 0"));
       }
       
       @Test
       void testInvalidFormatThrowsException() {
           ReadonlyConfig config = ReadonlyConfig.fromMap(Map.of(
               "url", "tcp://localhost:1883",
               "topic", "test",
               "format", "xml"  // Invalid format
           ));
           
           assertThrows(
               IllegalArgumentException.class,
               () -> new MqttSinkWriter(context, rowType, config)
           );
       }
       
       @Test
       void testConnectionFailureThrowsWrappedException() {
           // Mock Paho client to throw MqttException
           // Verify the exception is properly wrapped
       }
       
       @Test
       void testWriteWithRetrySuccess() {
           // Simulate first failure, second success
       }
       
       @Test
       void testWriteTimeoutAfterRetries() {
           // Simulate retry timeout
       }
   }
   ```
   
   ```java
   // seatunnel-connectors-v2/connector-mqtt/src/test/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSinkFactoryTest.java
   @Test
   void testOptionRule() {
       MqttSinkFactory factory = new MqttSinkFactory();
       OptionRule rule = factory.optionRule();
       
       // Accessor return types are assumed here; adjust to OptionRule's actual API
       Set<Option<?>> required = rule.getRequiredOptions();
       assertTrue(required.contains(MqttSinkOptions.URL));
       assertTrue(required.contains(MqttSinkOptions.TOPIC));
       
       Set<Option<?>> optional = rule.getOptionalOptions();
       assertTrue(optional.contains(MqttSinkOptions.QOS));
   }
   ```
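   
   The retry tests stubbed out above (`testWriteWithRetrySuccess`, `testWriteTimeoutAfterRetries`) are easier to write if the retry loop takes its clock and publish action as parameters, so a test can advance a fake clock instead of sleeping. The following is a minimal standalone sketch of that idea, not the PR's code: `RetrySketch`, `Publisher`, and `Sleeper` are hypothetical names, and `Publisher` stands in for the Paho client call.
   
   ```java
   import java.io.IOException;
   import java.util.function.LongSupplier;
   
   /** Hypothetical standalone version of a deadline-bounded retry loop. */
   class RetrySketch {
       /** Stand-in for the client publish call; throws IOException on failure. */
       interface Publisher {
           void publish(byte[] payload) throws IOException;
       }
   
       /** Stand-in for Thread.sleep so a test can advance a fake clock instead. */
       interface Sleeper {
           void sleep(long millis);
       }
   
       /** Retries until success or until the deadline passes; returns the attempt count. */
       static int publishWithRetry(
               Publisher publisher, byte[] payload, long timeoutMs, long backoffMs,
               LongSupplier clock, Sleeper sleeper) throws IOException {
           long deadline = clock.getAsLong() + timeoutMs;
           IOException last = null;
           int attempts = 0;
           do {
               attempts++;
               try {
                   publisher.publish(payload);
                   return attempts;
               } catch (IOException e) {
                   last = e;                  // remember the failure for the final error
                   sleeper.sleep(backoffMs);  // back off before the next attempt
               }
           } while (clock.getAsLong() < deadline);
           throw new IOException("Failed to publish after " + timeoutMs + "ms", last);
       }
   
       public static void main(String[] args) throws IOException {
           long[] now = {0};   // fake clock, in milliseconds
           int[] calls = {0};
           int attempts = publishWithRetry(
                   payload -> { if (calls[0]++ == 0) throw new IOException("broker down"); },
                   new byte[0], 100, 50, () -> now[0], ms -> now[0] += ms);
           System.out.println("attempts=" + attempts);
       }
   }
   ```
   
   With this shape, the "first failure, second success" and "timeout after retries" cases run in microseconds and need no broker or mocking framework.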
   
   **Rationale**: Improve code quality, ensure safe refactoring, and meet Apache project standards.
   
   ---
   
   ### Issue 5: Text format delimiter hardcoded
   
   **Location**: `MqttSinkWriter.java:189-192`
   
   ```java
   case "text":
       return TextSerializationSchema.builder()
               .seaTunnelRowType(rowType)
               .delimiter(",")
               .build();
   ```
   
   **Related Context**:
   - Reference implementation: KafkaSink supports custom delimiter
   - Configuration option: MqttSinkOptions.FORMAT definition
   - Related class: TextSerializationSchema
   
   **Issue Description**:
   The text format's field delimiter is hardcoded to a comma and cannot be customized. Some data needs a different delimiter, such as tab or pipe.
   
   **Potential Risks**:
   - Limits user flexibility
   - Inconsistent with Kafka Sink (Kafka supports field_delimiter configuration)
   - May cause parsing issues
   
   **Scope of Impact**:
   - Direct impact: Users using Text format
   - Indirect impact: Data format compatibility
   - Impact scope: Single Connector
   
   **Severity**: MINOR
   
   **Improvement Suggestions**:
   
   1. Add configuration option:
   ```java
   // MqttSinkOptions.java
   public static final Option<String> FIELD_DELIMITER =
       Options.key("field_delimiter")
           .stringType()
           .defaultValue(",")
           .withDescription("Field delimiter for text format. Only used when format=text");
   ```
   
   2. Register in MqttSinkFactory:
   ```java
   .optional(
       MqttSinkOptions.USERNAME,
       MqttSinkOptions.PASSWORD,
       MqttSinkOptions.QOS,
       MqttSinkOptions.FORMAT,
       MqttSinkOptions.FIELD_DELIMITER,  // Add
       MqttSinkOptions.RETRY_TIMEOUT,
       MqttSinkOptions.CONNECTION_TIMEOUT)
   ```
   
   3. Use in MqttSinkWriter:
   ```java
   case "text":
       String delimiter = config.get(MqttSinkOptions.FIELD_DELIMITER);
       return TextSerializationSchema.builder()
               .seaTunnelRowType(rowType)
               .delimiter(delimiter)
               .build();
   ```
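   
   The parsing risk noted above can be seen in a small standalone example. It is a sketch under an assumption that has not been checked against `TextSerializationSchema`: that fields are joined with the delimiter and not quoted or escaped. `DelimiterSketch` and its methods are hypothetical names.
   
   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.regex.Pattern;
   
   /** Shows how a fixed delimiter collides with field content (no quoting assumed). */
   class DelimiterSketch {
       /** Joins fields with the delimiter, without quoting or escaping. */
       static String join(List<String> fields, String delimiter) {
           return String.join(delimiter, fields);
       }
   
       /** Naive split that a downstream consumer might apply to the payload. */
       static List<String> split(String line, String delimiter) {
           return Arrays.asList(line.split(Pattern.quote(delimiter), -1));
       }
   
       public static void main(String[] args) {
           // The second field itself contains a comma.
           List<String> row = Arrays.asList("sensor-1", "21.5,C");
           System.out.println(split(join(row, ","), ",").size()); // 3 fields come back
           System.out.println(split(join(row, "|"), "|").size()); // 2 fields come back
       }
   }
   ```
   
   A configurable delimiter lets users pick a character that does not occur in their data, which is the case the proposed `field_delimiter` option covers.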
   
   **Rationale**: Improve flexibility, stay consistent with Kafka Sink.
   
   ---
   
   ### Issue 6: Changelog placeholder not updated
   
   **Location**: `docs/en/connectors/changelog/connector-mqtt.md:7`
   
   ```markdown
   - Add MQTT Sink Connector ([#XXXX](https://github.com/apache/seatunnel/pull/XXXX))
   ```
   
   **Related Context**:
   - PR number: #10575
   - File: connector-mqtt.md
   - Release process: CHANGELOG automatically generated after merge
   
   **Issue Description**:
   The changelog contains the placeholder `#XXXX`, which should be replaced with the actual PR number before submission.
   
   **Potential Risks**:
   - Incomplete changelog
   - Automation tools may not link correctly
   - Users cannot trace change sources
   
   **Scope of Impact**:
   - Direct impact: Changelog quality
   - Indirect impact: User experience
   - Impact scope: Single Connector
   
   **Severity**: MINOR
   
   **Improvement Suggestions**:
   
   ```markdown
   ## next version
   
   ### Sink
   
   - Add MQTT Sink Connector ([#10575](https://github.com/apache/seatunnel/pull/10575))
   ```
   
   Also recommend linking Issue #9566:
   ```markdown
   - Add MQTT Sink Connector ([#10575](https://github.com/apache/seatunnel/pull/10575))
     Resolves [#9566](https://github.com/apache/seatunnel/issues/9566)
   ```
   
   **Rationale**: Maintain documentation integrity, facilitate user tracing.
   
   ---
   
   ### Issue 7: Performance bottleneck - synchronous blocking send
   
   **Location**: `MqttSinkWriter.java:90-118`
   
   ```java
   public void write(SeaTunnelRow element) throws IOException {
       byte[] payload = serializationSchema.serialize(element);
       MqttMessage message = new MqttMessage(payload);
       message.setQos(qos);
       
       // Synchronous retry loop
       while (System.currentTimeMillis() < deadline) {
           if (mqttClient.isConnected()) {
               mqttClient.publish(topic, message);  // Blocking call
               return;
           }
           Thread.sleep(RETRY_BACKOFF_MS);
       }
   }
   ```
   
   **Related Context**:
   - Paho Client: `MqttClient.publish()` is synchronous blocking method
   - Comparison: KafkaSink uses asynchronous Producer
   - Performance impact: Each message requires waiting for network round-trip
   
   **Issue Description**:
   Each message is sent with a synchronous, blocking call: Paho's `MqttClient.publish()` does not return until the publish completes, so every message waits for its own network round-trip. This becomes a bottleneck in high-throughput scenarios.
   
   **Potential Risks**:
   - Limited throughput (on the order of thousands of messages per second)
   - Increased latency
   - Does not align with SeaTunnel streaming engine's high-performance goals
   
   **Scope of Impact**:
   - Direct impact: Users in high-performance scenarios
   - Indirect impact: Overall pipeline throughput
   - Impact scope: Single Connector
   
   **Severity**: MAJOR if positioned as a high-performance connector; MINOR if positioned as a lightweight IoT connector (current performance acceptable)
   
   **Improvement Suggestions**:
   
   **Solution 1: Batch send (recommended)**
   ```java
   // Add batch configuration
   public static final Option<Integer> BATCH_SIZE =
       Options.key("batch_size")
           .intType()
           .defaultValue(1)
           .withDescription("Number of messages to batch before sending");
   
   // Implement batch sending logic
   private final List<MqttMessage> messageBuffer = new ArrayList<>(batchSize);
   
   @Override
   public void write(SeaTunnelRow element) throws IOException {
       byte[] payload = serializationSchema.serialize(element);
       MqttMessage message = new MqttMessage(payload);
       message.setQos(qos);
       
       synchronized (messageBuffer) {
           messageBuffer.add(message);
           if (messageBuffer.size() >= batchSize) {
               flushBatch();
           }
       }
   }
   
   private void flushBatch() throws IOException {
       // Paho has no array-based publish overload, so send the buffered
       // messages in a loop (for example via MqttAsyncClient.publish(...),
       // waiting on the returned tokens) and then clear the buffer
   }
   
   @Override
   public Optional<Void> prepareCommit() throws IOException {
       flushBatch();  // Flush before checkpoint
       return Optional.empty();
   }
   ```
   
   **Solution 2: Async send**
   ```java
   // Use MqttAsyncClient (but requires major refactoring)
   // Or use thread pool for asynchronous sending
   ```
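   
   The async option can be sketched without committing to a client library. In the example below, `AsyncPublisher` is a hypothetical stand-in for an asynchronous client such as `MqttAsyncClient` (whose real API returns delivery tokens rather than `CompletableFuture`), and `AsyncSendSketch` is a hypothetical name. It assumes a single writer thread and does not bound the number of in-flight messages.
   
   ```java
   import java.io.IOException;
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutionException;
   
   /** Sketch of non-blocking writes with a blocking flush at checkpoint time. */
   class AsyncSendSketch {
       /** Hypothetical async client: the future completes when the broker acks. */
       interface AsyncPublisher {
           CompletableFuture<Void> publish(byte[] payload);
       }
   
       private final AsyncPublisher publisher;
       private final List<CompletableFuture<Void>> inFlight = new ArrayList<>();
   
       AsyncSendSketch(AsyncPublisher publisher) {
           this.publisher = publisher;
       }
   
       /** Hands the message to the client and returns without waiting for the ack. */
       void write(byte[] payload) {
           inFlight.add(publisher.publish(payload));
       }
   
       /** Waits for all pending acks; returns how many publishes were awaited. */
       int flush() throws IOException {
           int count = inFlight.size();
           try {
               CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).get();
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();  // preserve the interrupt flag
               throw new IOException("Interrupted while waiting for acks", e);
           } catch (ExecutionException e) {
               throw new IOException("Publish failed", e.getCause());
           } finally {
               inFlight.clear();
           }
           return count;
       }
   }
   ```
   
   Calling `flush()` from `prepareCommit()` would keep the checkpoint boundary as the point where delivery is confirmed, while `write()` no longer pays one round-trip per message.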
   
   **Solution 3: Document in docs (simplest)**
   Document performance characteristics in the documentation (the figures below are illustrative and should be replaced with measured values):
   ```markdown
   ## Performance Considerations
   
   The MQTT Sink sends messages synchronously to guarantee delivery. Typical throughput:
   - QoS 0: ~10,000 messages/sec (local network)
   - QoS 1: ~5,000 messages/sec (requires broker ACK)
   
   For higher throughput requirements, consider:
   - Using Kafka Sink instead
   - Reducing QoS to 0
   - Increasing SeaTunnel parallelism
   ```
   
   **Rationale**: The current design is sufficient for IoT scenarios (low-frequency messages), but performance limitations should be clearly documented to avoid user misunderstanding.
   
   ---

