This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 701f17b5d4 [Improve][Connector-V2] Add optional flag for rocketmq
connector to skip parse errors instead of failing (#8737)
701f17b5d4 is described below
commit 701f17b5d40c680c5d14a74c6f4c02c04f2ad96b
Author: xiaochen <[email protected]>
AuthorDate: Mon Feb 17 09:55:04 2025 +0800
[Improve][Connector-V2] Add optional flag for rocketmq connector to skip
parse errors instead of failing (#8737)
---
docs/en/connector-v2/source/RocketMQ.md | 1 +
.../seatunnel/connectors/seatunnel/rocketmq/config/Config.java | 2 ++
.../connectors/seatunnel/rocketmq/config/ConsumerConfig.java | 6 ++++++
.../seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSink.java | 3 ++-
.../connectors/seatunnel/rocketmq/sink/RocketMqSinkFactory.java | 2 +-
.../connectors/seatunnel/rocketmq/source/RocketMqSource.java | 6 +++++-
.../connectors/seatunnel/rocketmq/source/RocketMqSourceFactory.java | 5 +++--
.../src/test/resources/rocketmq-source_json_to_console.conf | 1 +
8 files changed, 21 insertions(+), 5 deletions(-)
diff --git a/docs/en/connector-v2/source/RocketMQ.md
b/docs/en/connector-v2/source/RocketMQ.md
index eb8edc1c80..0691afbc72 100644
--- a/docs/en/connector-v2/source/RocketMQ.md
+++ b/docs/en/connector-v2/source/RocketMQ.md
@@ -44,6 +44,7 @@ Source connector for Apache RocketMQ.
| start.mode.offsets | | no | | |
| start.mode.timestamp | Long | no | | The time required for consumption mode to be "CONSUME_FROM_TIMESTAMP". |
| partition.discovery.interval.millis | long | no | -1 | The interval for dynamically discovering topics and partitions. |
+| ignore_parse_errors | Boolean | no | false | Optional flag to skip parse errors instead of failing. |
| common-options | config | no | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. |
### start.mode.offsets
diff --git
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/config/Config.java
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/config/Config.java
index eced972601..c1a7aa8067 100644
---
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/config/Config.java
+++
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/config/Config.java
@@ -26,6 +26,8 @@ public class Config {
/** The default field delimiter is “,” */
public static final String DEFAULT_FIELD_DELIMITER = ",";
+ public static final String CONNECTOR_IDENTITY = "Rocketmq";
+
public static final Option<String> NAME_SRV_ADDR =
Options.key("name.srv.addr")
.stringType()
diff --git
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/config/ConsumerConfig.java
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/config/ConsumerConfig.java
index 534b4c8bd4..642278792e 100644
---
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/config/ConsumerConfig.java
+++
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/config/ConsumerConfig.java
@@ -86,4 +86,10 @@ public class ConsumerConfig extends Config {
.longType()
.defaultValue(DEFAULT_POLL_TIMEOUT_MILLIS)
.withDescription("The poll timeout in milliseconds.");
+
+ public static final Option<Boolean> IGNORE_PARSE_ERRORS =
+ Options.key("ignore_parse_errors")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Optional flag to skip parse errors
instead of failing.");
}
diff --git
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSink.java
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSink.java
index 2bd55150c0..c4e61d24bd 100644
---
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSink.java
+++
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSink.java
@@ -43,6 +43,7 @@ import java.util.Optional;
import static
org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.ACCESS_KEY;
import static
org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.ACL_ENABLED;
+import static
org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.CONNECTOR_IDENTITY;
import static
org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.DEFAULT_FIELD_DELIMITER;
import static
org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.FIELD_DELIMITER;
import static
org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.FORMAT;
@@ -57,7 +58,7 @@ public class RocketMqSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
@Override
public String getPluginName() {
- return "Rocketmq";
+ return CONNECTOR_IDENTITY;
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSinkFactory.java
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSinkFactory.java
index efad35dea2..85d9bd02df 100644
---
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSinkFactory.java
@@ -29,7 +29,7 @@ import com.google.auto.service.AutoService;
public class RocketMqSinkFactory implements TableSinkFactory {
@Override
public String factoryIdentifier() {
- return "Rocketmq";
+ return Config.CONNECTOR_IDENTITY;
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSource.java
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSource.java
index 39052e5172..3558d3a5f8 100644
---
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSource.java
+++
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSource.java
@@ -260,10 +260,14 @@ public class RocketMqSource
if (config.hasPath(FORMAT.key())) {
format = SchemaFormat.find(config.getString(FORMAT.key()));
}
+ boolean ignoreParseErrors = false;
+ if (config.hasPath(ConsumerConfig.IGNORE_PARSE_ERRORS.key())) {
+ ignoreParseErrors =
config.getBoolean(ConsumerConfig.IGNORE_PARSE_ERRORS.key());
+ }
switch (format) {
case JSON:
deserializationSchema =
- new JsonDeserializationSchema(catalogTable, false,
false);
+ new JsonDeserializationSchema(catalogTable, false,
ignoreParseErrors);
break;
case TEXT:
String delimiter = DEFAULT_FIELD_DELIMITER;
diff --git
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceFactory.java
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceFactory.java
index e1f756f0b2..fd04ac37d8 100644
---
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceFactory.java
@@ -32,7 +32,7 @@ public class RocketMqSourceFactory implements
TableSourceFactory {
@Override
public String factoryIdentifier() {
- return "Rocketmq";
+ return Config.CONNECTOR_IDENTITY;
}
@Override
@@ -55,7 +55,8 @@ public class RocketMqSourceFactory implements
TableSourceFactory {
.conditional(
ConsumerConfig.START_MODE,
StartMode.CONSUME_FROM_SPECIFIC_OFFSETS,
- ConsumerConfig.START_MODE_OFFSETS)
+ ConsumerConfig.START_MODE_OFFSETS,
+ ConsumerConfig.IGNORE_PARSE_ERRORS)
.build();
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_json_to_console.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_json_to_console.conf
index 023489bcfc..b2083af2c4 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_json_to_console.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_json_to_console.conf
@@ -54,6 +54,7 @@ source {
c_timestamp = timestamp
}
}
+ ignore_parse_errors = "false"
}
}