This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 701f17b5d4 [Improve][Connector-V2] Add optional flag for rocketmq 
connector to skip parse errors instead of failing (#8737)
701f17b5d4 is described below

commit 701f17b5d40c680c5d14a74c6f4c02c04f2ad96b
Author: xiaochen <[email protected]>
AuthorDate: Mon Feb 17 09:55:04 2025 +0800

    [Improve][Connector-V2] Add optional flag for rocketmq connector to skip 
parse errors instead of failing (#8737)
---
 docs/en/connector-v2/source/RocketMQ.md                             | 1 +
 .../seatunnel/connectors/seatunnel/rocketmq/config/Config.java      | 2 ++
 .../connectors/seatunnel/rocketmq/config/ConsumerConfig.java        | 6 ++++++
 .../seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSink.java  | 3 ++-
 .../connectors/seatunnel/rocketmq/sink/RocketMqSinkFactory.java     | 2 +-
 .../connectors/seatunnel/rocketmq/source/RocketMqSource.java        | 6 +++++-
 .../connectors/seatunnel/rocketmq/source/RocketMqSourceFactory.java | 5 +++--
 .../src/test/resources/rocketmq-source_json_to_console.conf         | 1 +
 8 files changed, 21 insertions(+), 5 deletions(-)

diff --git a/docs/en/connector-v2/source/RocketMQ.md 
b/docs/en/connector-v2/source/RocketMQ.md
index eb8edc1c80..0691afbc72 100644
--- a/docs/en/connector-v2/source/RocketMQ.md
+++ b/docs/en/connector-v2/source/RocketMQ.md
@@ -44,6 +44,7 @@ Source connector for Apache RocketMQ.
 | start.mode.offsets                  |         | no       |                   
         |                                                                      
                                                                                
                                                              |
 | start.mode.timestamp                | Long    | no       |                   
         | The time required for consumption mode to be 
"CONSUME_FROM_TIMESTAMP".                                                       
                                                                                
      |
 | partition.discovery.interval.millis | long    | no       | -1                
         | The interval for dynamically discovering topics and partitions.      
                                                                                
                                                              |
+| ignore_parse_errors                 | Boolean | no       | false             
         | Whether to skip messages that fail to parse instead of failing the job. 
Currently only applied when `format` is `json`.                                  
                                                              |
 | common-options                      | config  | no       | -                 
         | Source plugin common parameters, please refer to [Source Common 
Options](../source-common-options.md) for details.                              
                                                                   |
 
 ### start.mode.offsets
diff --git 
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/config/Config.java
 
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/config/Config.java
index eced972601..c1a7aa8067 100644
--- 
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/config/Config.java
+++ 
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/config/Config.java
@@ -26,6 +26,8 @@ public class Config {
     /** The default field delimiter is “,” */
     public static final String DEFAULT_FIELD_DELIMITER = ",";
 
+    public static final String CONNECTOR_IDENTITY = "Rocketmq";
+
     public static final Option<String> NAME_SRV_ADDR =
             Options.key("name.srv.addr")
                     .stringType()
diff --git 
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/config/ConsumerConfig.java
 
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/config/ConsumerConfig.java
index 534b4c8bd4..642278792e 100644
--- 
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/config/ConsumerConfig.java
+++ 
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/config/ConsumerConfig.java
@@ -86,4 +86,10 @@ public class ConsumerConfig extends Config {
                     .longType()
                     .defaultValue(DEFAULT_POLL_TIMEOUT_MILLIS)
                     .withDescription("The poll timeout in milliseconds.");
+
+    public static final Option<Boolean> IGNORE_PARSE_ERRORS =
+            Options.key("ignore_parse_errors")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Optional flag to skip parse errors 
instead of failing.");
 }
diff --git 
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSink.java
 
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSink.java
index 2bd55150c0..c4e61d24bd 100644
--- 
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSink.java
+++ 
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSink.java
@@ -43,6 +43,7 @@ import java.util.Optional;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.ACCESS_KEY;
 import static 
org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.ACL_ENABLED;
+import static 
org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.CONNECTOR_IDENTITY;
 import static 
org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.DEFAULT_FIELD_DELIMITER;
 import static 
org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.FIELD_DELIMITER;
 import static 
org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.FORMAT;
@@ -57,7 +58,7 @@ public class RocketMqSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
 
     @Override
     public String getPluginName() {
-        return "Rocketmq";
+        return CONNECTOR_IDENTITY;
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSinkFactory.java
 
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSinkFactory.java
index efad35dea2..85d9bd02df 100644
--- 
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSinkFactory.java
@@ -29,7 +29,7 @@ import com.google.auto.service.AutoService;
 public class RocketMqSinkFactory implements TableSinkFactory {
     @Override
     public String factoryIdentifier() {
-        return "Rocketmq";
+        return Config.CONNECTOR_IDENTITY;
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSource.java
 
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSource.java
index 39052e5172..3558d3a5f8 100644
--- 
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSource.java
+++ 
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSource.java
@@ -260,10 +260,14 @@ public class RocketMqSource
             if (config.hasPath(FORMAT.key())) {
                 format = SchemaFormat.find(config.getString(FORMAT.key()));
             }
+            boolean ignoreParseErrors = false;
+            if (config.hasPath(ConsumerConfig.IGNORE_PARSE_ERRORS.key())) {
+                ignoreParseErrors = 
config.getBoolean(ConsumerConfig.IGNORE_PARSE_ERRORS.key());
+            }
             switch (format) {
                 case JSON:
                     deserializationSchema =
-                            new JsonDeserializationSchema(catalogTable, false, 
false);
+                            new JsonDeserializationSchema(catalogTable, false, 
ignoreParseErrors);
                     break;
                 case TEXT:
                     String delimiter = DEFAULT_FIELD_DELIMITER;
diff --git 
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceFactory.java
 
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceFactory.java
index e1f756f0b2..fd04ac37d8 100644
--- 
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceFactory.java
@@ -32,7 +32,7 @@ public class RocketMqSourceFactory implements 
TableSourceFactory {
 
     @Override
     public String factoryIdentifier() {
-        return "Rocketmq";
+        return Config.CONNECTOR_IDENTITY;
     }
 
     @Override
@@ -55,7 +55,8 @@ public class RocketMqSourceFactory implements 
TableSourceFactory {
                 .conditional(
                         ConsumerConfig.START_MODE,
                         StartMode.CONSUME_FROM_SPECIFIC_OFFSETS,
-                        ConsumerConfig.START_MODE_OFFSETS)
+                        ConsumerConfig.START_MODE_OFFSETS,
+                        ConsumerConfig.IGNORE_PARSE_ERRORS)
                 .build();
     }
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_json_to_console.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_json_to_console.conf
index 023489bcfc..b2083af2c4 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_json_to_console.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_json_to_console.conf
@@ -54,6 +54,7 @@ source {
         c_timestamp = timestamp
       }
     }
+    ignore_parse_errors = "false"
   }
 
 }

Reply via email to