This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e1ed22b15 [Feature][Connector-V2][Kafka] Kafka source supports data 
deserialization failure skipping (#4364)
e1ed22b15 is described below

commit e1ed22b1531eb11bbba9898f55d0568d386624d3
Author: dylandai <[email protected]>
AuthorDate: Fri Mar 24 20:53:42 2023 +0800

    [Feature][Connector-V2][Kafka] Kafka source supports data deserialization 
failure skipping (#4364)
    
    * [Feature][Connector-V2][Kafka]Kafka source supports data deserialization 
failure skipping #4361
    
    [Feature][Connector-V2][Kafka]Kafka source supports data deserialization 
failure skipping #4361
    
    * change log level and add changelog
    
    1. change log level
    2. add kafka source changeLog
    
    * add changelog
    
    add changelog
    
    * add e2e
    
    add e2e
    
    * add e2e case
    
    * [Feature][Connector-V2][Kafka] Fix code style
    
    * Update 
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
    
    Co-authored-by: Tyrantlucifer <[email protected]>
    
    * fix code-review
    
    * add e2e case for format_error_handle_way
    
    1. format_error_handle_way = fail
    When the data is invalid, an exception will be thrown
    2. format_error_handle_way = skip
    When the data is invalid, the line will be skipped
    
    * unify exception
    
     unify exception
    
    * change e2e config
    
    * fix e2e test case
    
    * Update docs/en/connector-v2/source/kafka.md
    
    Co-authored-by: Eric <[email protected]>
    
    * Update kafka.md
    
    fix code-style
    
    ---------
    
    Co-authored-by: tyrantlucifer <[email protected]>
    Co-authored-by: Tyrantlucifer <[email protected]>
    Co-authored-by: Eric <[email protected]>
---
 docs/en/connector-v2/source/kafka.md               |  8 ++++++
 .../api/serialization/DeserializationSchema.java   | 10 +++++--
 .../connectors/seatunnel/kafka/config/Config.java  |  9 ++++++
 .../kafka/config/MessageFormatErrorHandleWay.java  | 23 ++++++++++++++++
 .../seatunnel/kafka/source/KafkaSource.java        | 20 +++++++++++++-
 .../seatunnel/kafka/source/KafkaSourceReader.java  | 22 +++++++++++++--
 .../seatunnel/e2e/connector/kafka/KafkaIT.java     | 32 ++++++++++++++++++++++
 ...e_format_error_handle_way_fail_to_console.conf} |  6 ++--
 ...e_format_error_handle_way_skip_to_console.conf} |  6 ++--
 .../resources/kafkasource_json_to_console.conf     |  1 +
 .../resources/kafkasource_text_to_console.conf     |  1 +
 11 files changed, 127 insertions(+), 11 deletions(-)

diff --git a/docs/en/connector-v2/source/kafka.md 
b/docs/en/connector-v2/source/kafka.md
index 75d910341..9506712c2 100644
--- a/docs/en/connector-v2/source/kafka.md
+++ b/docs/en/connector-v2/source/kafka.md
@@ -28,6 +28,7 @@ Source connector for Apache Kafka.
 | common-options                      | config  | no       | -                 
       |
 | schema                              |         | no       | -                 
       |
 | format                              | String  | no       | json              
       |
+| format_error_handle_way             | String  | no       | fail              
       |
 | field_delimiter                     | String  | no       | ,                 
       |
 | start_mode                          | String  | no       | group_offsets     
       |
 | start_mode.offsets                  |         | no       |                   
       |
@@ -75,6 +76,12 @@ The structure of the data, including field names and field 
types.
 Data format. The default format is json. Optional text format. The default 
field separator is ", ".
 If you customize the delimiter, add the "field_delimiter" option.
 
+## format_error_handle_way
+
+The way to handle data format errors. The default value is fail; the 
optional values are fail and skip.
+When fail is selected, a data format error will block processing and an exception 
will be thrown.
+When skip is selected, a data format error will cause this line of data to be skipped.
+
 ## field_delimiter
 
 Customize the field delimiter for data format.
@@ -218,4 +225,5 @@ source {
 - [Improve] Support for dynamic discover topic & partition in streaming mode 
([3125](https://github.com/apache/incubator-seatunnel/pull/3125))
 - [Improve] Change Connector Custom Config Prefix To Map 
[3719](https://github.com/apache/incubator-seatunnel/pull/3719)
 - [Bug] Fixed the problem that parsing the offset format failed when the 
startup mode was 
offset([3810](https://github.com/apache/incubator-seatunnel/pull/3810))
+- [Feature] Kafka source supports data deserialization failure 
skipping([4364](https://github.com/apache/incubator-seatunnel/pull/4364))
 
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DeserializationSchema.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DeserializationSchema.java
index d7f7abb1a..745e517a2 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DeserializationSchema.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/DeserializationSchema.java
@@ -35,9 +35,13 @@ public interface DeserializationSchema<T> extends 
Serializable {
     T deserialize(byte[] message) throws IOException;
 
     default void deserialize(byte[] message, Collector<T> out) throws 
IOException {
-        T deserialize = deserialize(message);
-        if (deserialize != null) {
-            out.collect(deserialize);
+        try {
+            T deserialize = deserialize(message);
+            if (deserialize != null) {
+                out.collect(deserialize);
+            }
+        } catch (IOException e) {
+            throw new IOException(e);
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index 60ce08038..ff051b96d 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -155,4 +155,13 @@ public class Config {
                     .defaultValue(-1L)
                     .withDescription(
                             "The interval for dynamically discovering topics 
and partitions.");
+
+    public static final Option<MessageFormatErrorHandleWay> 
MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION =
+            Options.key("format_error_handle_way")
+                    .enumType(MessageFormatErrorHandleWay.class)
+                    .defaultValue(MessageFormatErrorHandleWay.FAIL)
+                    .withDescription(
+                            "The processing method of data format error. The 
default value is fail, and the optional value is (fail, skip). "
+                                    + "When fail is selected, data format 
error will block and an exception will be thrown. "
+                                    + "When skip is selected, data format 
error will skip this line data.");
 }
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormatErrorHandleWay.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormatErrorHandleWay.java
new file mode 100644
index 000000000..bd61481f7
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormatErrorHandleWay.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.config;
+
+public enum MessageFormatErrorHandleWay {
+    FAIL,
+    SKIP,
+}
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index e760dbe6e..741d75216 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -41,6 +41,7 @@ import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
+import 
org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormatErrorHandleWay;
 import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
 import 
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
@@ -66,6 +67,7 @@ import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIEL
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PATTERN;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.SCHEMA;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE;
@@ -83,6 +85,8 @@ public class KafkaSource
     private SeaTunnelRowType typeInfo;
     private JobContext jobContext;
     private long discoveryIntervalMillis = 
KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.defaultValue();
+    private MessageFormatErrorHandleWay messageFormatErrorHandleWay =
+            MessageFormatErrorHandleWay.FAIL;
 
     @Override
     public Boundedness getBoundedness() {
@@ -186,6 +190,19 @@ public class KafkaSource
                                     this.metadata.getProperties().put(key, 
value.unwrapped()));
         }
 
+        if (config.hasPath(MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION.key())) {
+            MessageFormatErrorHandleWay formatErrorWayOption =
+                    
ReadonlyConfig.fromConfig(config).get(MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION);
+            switch (formatErrorWayOption) {
+                case FAIL:
+                case SKIP:
+                    this.messageFormatErrorHandleWay = formatErrorWayOption;
+                    break;
+                default:
+                    break;
+            }
+        }
+
         setDeserialization(config);
     }
 
@@ -197,7 +214,8 @@ public class KafkaSource
     @Override
     public SourceReader<SeaTunnelRow, KafkaSourceSplit> createReader(
             SourceReader.Context readerContext) throws Exception {
-        return new KafkaSourceReader(this.metadata, deserializationSchema, 
readerContext);
+        return new KafkaSourceReader(
+                this.metadata, deserializationSchema, readerContext, 
messageFormatErrorHandleWay);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
index 3e602aa7f..07fe71a60 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import 
org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormatErrorHandleWay;
 import 
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
 
@@ -62,6 +63,7 @@ public class KafkaSourceReader implements 
SourceReader<SeaTunnelRow, KafkaSource
     private final Map<TopicPartition, KafkaConsumerThread> consumerThreadMap;
     private final ExecutorService executorService;
     private final DeserializationSchema<SeaTunnelRow> deserializationSchema;
+    private final MessageFormatErrorHandleWay messageFormatErrorHandleWay;
 
     private final LinkedBlockingQueue<KafkaSourceSplit> pendingPartitionsQueue;
 
@@ -70,9 +72,11 @@ public class KafkaSourceReader implements 
SourceReader<SeaTunnelRow, KafkaSource
     KafkaSourceReader(
             ConsumerMetadata metadata,
             DeserializationSchema<SeaTunnelRow> deserializationSchema,
-            SourceReader.Context context) {
+            Context context,
+            MessageFormatErrorHandleWay messageFormatErrorHandleWay) {
         this.metadata = metadata;
         this.context = context;
+        this.messageFormatErrorHandleWay = messageFormatErrorHandleWay;
         this.sourceSplits = new HashSet<>();
         this.deserializationSchema = deserializationSchema;
         this.consumerThreadMap = new ConcurrentHashMap<>();
@@ -145,8 +149,20 @@ public class KafkaSourceReader implements 
SourceReader<SeaTunnelRow, KafkaSource
                                                     for 
(ConsumerRecord<byte[], byte[]> record :
                                                             recordList) {
 
-                                                        
deserializationSchema.deserialize(
-                                                                
record.value(), output);
+                                                        try {
+                                                            
deserializationSchema.deserialize(
+                                                                    
record.value(), output);
+                                                        } catch (IOException 
e) {
+                                                            if 
(this.messageFormatErrorHandleWay
+                                                                    == 
MessageFormatErrorHandleWay
+                                                                            
.SKIP) {
+                                                                log.warn(
+                                                                        
"Deserialize message failed, skip this message, message: {}",
+                                                                        
record.value());
+                                                                continue;
+                                                            }
+                                                            throw e;
+                                                        }
 
                                                         if 
(Boundedness.BOUNDED.equals(
                                                                         
context.getBoundedness())
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index f9fb72711..dee8cb8c4 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -205,6 +205,38 @@ public class KafkaIT extends TestSuiteBase implements 
TestResource {
         Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
     }
 
+    @TestTemplate
+    public void 
testSourceKafkaJsonFormatErrorHandleWaySkipToConsole(TestContainer container)
+            throws IOException, InterruptedException {
+        DefaultSeaTunnelRowSerializer serializer =
+                DefaultSeaTunnelRowSerializer.create(
+                        "test_topic_error_message",
+                        SEATUNNEL_ROW_TYPE,
+                        DEFAULT_FORMAT,
+                        DEFAULT_FIELD_DELIMITER);
+        generateTestData(row -> serializer.serializeRow(row), 0, 100);
+        Container.ExecResult execResult =
+                container.executeJob(
+                        
"/kafka/kafkasource_format_error_handle_way_skip_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
+    }
+
+    @TestTemplate
+    public void 
testSourceKafkaJsonFormatErrorHandleWayFailToConsole(TestContainer container)
+            throws IOException, InterruptedException {
+        DefaultSeaTunnelRowSerializer serializer =
+                DefaultSeaTunnelRowSerializer.create(
+                        "test_topic_error_message",
+                        SEATUNNEL_ROW_TYPE,
+                        DEFAULT_FORMAT,
+                        DEFAULT_FIELD_DELIMITER);
+        generateTestData(row -> serializer.serializeRow(row), 0, 100);
+        Container.ExecResult execResult =
+                container.executeJob(
+                        
"/kafka/kafkasource_format_error_handle_way_fail_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
+    }
+
     @TestTemplate
     public void testSourceKafka(TestContainer container) throws IOException, 
InterruptedException {
         testKafkaLatestToConsole(container);
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_json_to_console.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf
similarity index 94%
copy from 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_json_to_console.conf
copy to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf
index 129149623..19c1331f7 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_json_to_console.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf
@@ -33,9 +33,11 @@ env {
 source {
   Kafka {
     bootstrap.servers = "kafkaCluster:9092"
-    topic = "test_topic_json"
+    topic = "test_topic_error_message"
     result_table_name = "kafka_table"
-    kafka.auto.offset.reset = "earliest"
+    start_mode = "earliest"
+    format_error_handle_way = fail
+#     kafka.auto.offset.reset = "earliest"
     schema = {
       fields {
            id = bigint
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_json_to_console.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf
similarity index 94%
copy from 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_json_to_console.conf
copy to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf
index 129149623..012f93ed3 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_json_to_console.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf
@@ -33,9 +33,11 @@ env {
 source {
   Kafka {
     bootstrap.servers = "kafkaCluster:9092"
-    topic = "test_topic_json"
+    topic = "test_topic_error_message"
     result_table_name = "kafka_table"
-    kafka.auto.offset.reset = "earliest"
+    start_mode = "earliest"
+    format_error_handle_way = skip
+#     kafka.auto.offset.reset = "earliest"
     schema = {
       fields {
            id = bigint
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_json_to_console.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_json_to_console.conf
index 129149623..ace91e2d3 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_json_to_console.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_json_to_console.conf
@@ -36,6 +36,7 @@ source {
     topic = "test_topic_json"
     result_table_name = "kafka_table"
     kafka.auto.offset.reset = "earliest"
+    format_error_handle_way = skip
     schema = {
       fields {
            id = bigint
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_text_to_console.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_text_to_console.conf
index 67879791b..af6db138b 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_text_to_console.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_text_to_console.conf
@@ -36,6 +36,7 @@ source {
     topic = "test_topic_text"
     result_table_name = "kafka_table"
     kafka.auto.offset.reset = "earliest"
+    format_error_handle_way = fail
     schema = {
       fields {
            id = bigint

Reply via email to