This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 63c7b4e9cc [Fix][Connector-V2] Fix kafka `format_error_handle_way` not work (#7838)
63c7b4e9cc is described below

commit 63c7b4e9cc031bdea36ff6f68d2ad4ebde912f53
Author: Jia Fan <[email protected]>
AuthorDate: Thu Oct 17 10:36:14 2024 +0800

    [Fix][Connector-V2] Fix kafka `format_error_handle_way` not work (#7838)
---
 .../java/org/apache/seatunnel/common/Handover.java |  5 +++-
 .../org/apache/seatunnel/common/HandoverTest.java  | 34 ++++++----------------
 .../seatunnel/kafka/source/KafkaRecordEmitter.java |  6 ++--
 .../seatunnel/e2e/connector/kafka/KafkaIT.java     |  6 ++--
 ...ce_format_error_handle_way_fail_to_console.conf |  1 +
 ...ce_format_error_handle_way_skip_to_console.conf |  1 +
 .../reader/SeaTunnelInputPartitionReader.java      |  6 +++-
 .../reader/batch/ParallelBatchPartitionReader.java |  2 +-
 .../batch/ParallelBatchPartitionReader.java        |  2 +-
 .../batch/SeaTunnelBatchPartitionReader.java       |  6 +++-
 .../micro/SeaTunnelMicroBatchPartitionReader.java  |  6 +++-
 11 files changed, 38 insertions(+), 37 deletions(-)

diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Handover.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Handover.java
index 1686514a15..3132d93a16 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Handover.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Handover.java
@@ -30,7 +30,10 @@ public final class Handover<T> implements Closeable {
             new LinkedBlockingQueue<>(DEFAULT_QUEUE_SIZE);
     private Throwable error;
 
-    public boolean isEmpty() {
+    public boolean isEmpty() throws Exception {
+        if (error != null) {
+            rethrowException(error, error.getMessage());
+        }
         return blockingQueue.isEmpty();
     }
 
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/SeaTunnelBatchPartitionReader.java b/seatunnel-common/src/test/java/org/apache/seatunnel/common/HandoverTest.java
similarity index 50%
copy from seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/SeaTunnelBatchPartitionReader.java
copy to seatunnel-common/src/test/java/org/apache/seatunnel/common/HandoverTest.java
index e9c9268a46..199a2d4e72 100644
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/SeaTunnelBatchPartitionReader.java
+++ b/seatunnel-common/src/test/java/org/apache/seatunnel/common/HandoverTest.java
@@ -15,33 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.translation.spark.source.partition.batch;
+package org.apache.seatunnel.common;
 
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.connector.read.PartitionReader;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
 
-import java.io.IOException;
+public class HandoverTest {
 
-public class SeaTunnelBatchPartitionReader implements PartitionReader<InternalRow> {
-
-    private final ParallelBatchPartitionReader partitionReader;
-
-    public SeaTunnelBatchPartitionReader(ParallelBatchPartitionReader partitionReader) {
-        this.partitionReader = partitionReader;
-    }
-
-    @Override
-    public boolean next() throws IOException {
-        return partitionReader.next();
-    }
-
-    @Override
-    public InternalRow get() {
-        return partitionReader.get();
-    }
-
-    @Override
-    public void close() throws IOException {
-        partitionReader.close();
+    @Test
+    public void testThrowExceptionWhenQueueIsEmtpy() {
+        Handover<Object> handover = new Handover<>();
+        handover.reportError(new RuntimeException("test"));
+        Assertions.assertThrows(RuntimeException.class, handover::isEmpty);
     }
 }
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java
index 6593137aff..87d2b7b7c9 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java
@@ -31,7 +31,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.Map;
 
 public class KafkaRecordEmitter
@@ -71,13 +70,14 @@ public class KafkaRecordEmitter
             // consumerRecord.offset + 1 is the offset commit to Kafka and also the start offset
             // for the next run
             splitState.setCurrentOffset(consumerRecord.offset() + 1);
-        } catch (IOException e) {
+        } catch (Exception e) {
             if (this.messageFormatErrorHandleWay == MessageFormatErrorHandleWay.SKIP) {
                 logger.warn(
                         "Deserialize message failed, skip this message, message: {}",
                         new String(consumerRecord.value()));
+            } else {
+                throw e;
             }
-            throw e;
         }
     }
 
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index ffc97f4dd3..4a57cbdbd3 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -291,7 +291,7 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
                         DEFAULT_FORMAT,
                         DEFAULT_FIELD_DELIMITER,
                         null);
-        generateTestData(row -> serializer.serializeRow(row), 0, 100);
+        generateTestData(serializer::serializeRow, 0, 100);
         Container.ExecResult execResult =
                 container.executeJob(
                         "/kafka/kafkasource_format_error_handle_way_skip_to_console.conf");
@@ -308,11 +308,11 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
                         DEFAULT_FORMAT,
                         DEFAULT_FIELD_DELIMITER,
                         null);
-        generateTestData(row -> serializer.serializeRow(row), 0, 100);
+        generateTestData(serializer::serializeRow, 0, 100);
         Container.ExecResult execResult =
                 container.executeJob(
                         "/kafka/kafkasource_format_error_handle_way_fail_to_console.conf");
-        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+        Assertions.assertEquals(1, execResult.getExitCode(), execResult.getStderr());
     }
 
     @TestTemplate
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf
index d2a0f05354..dd1390d167 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf
@@ -37,6 +37,7 @@ source {
     result_table_name = "kafka_table"
     start_mode = "earliest"
     format_error_handle_way = fail
+    format = text
     schema = {
       fields {
         id = bigint
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf
index 88b6098b5e..a34856d31b 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf
@@ -37,6 +37,7 @@ source {
     result_table_name = "kafka_table"
     start_mode = "earliest"
     format_error_handle_way = skip
+    format = text
     schema = {
       fields {
         id = bigint
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/SeaTunnelInputPartitionReader.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/SeaTunnelInputPartitionReader.java
index 26e554b7fd..4b651ec053 100644
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/SeaTunnelInputPartitionReader.java
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/SeaTunnelInputPartitionReader.java
@@ -34,7 +34,11 @@ public class SeaTunnelInputPartitionReader implements InputPartitionReader<Inter
 
     @Override
     public boolean next() throws IOException {
-        return partitionReader.next();
+        try {
+            return partitionReader.next();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/batch/ParallelBatchPartitionReader.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/batch/ParallelBatchPartitionReader.java
index 2024063870..e20dca09d5 100644
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/batch/ParallelBatchPartitionReader.java
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/batch/ParallelBatchPartitionReader.java
@@ -84,7 +84,7 @@ public class ParallelBatchPartitionReader {
         return String.format("parallel-split-enumerator-executor-%s", subtaskId);
     }
 
-    public boolean next() throws IOException {
+    public boolean next() throws Exception {
         prepare();
         while (running && handover.isEmpty()) {
             try {
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/ParallelBatchPartitionReader.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/ParallelBatchPartitionReader.java
index 27ab9f42d2..5ca32d6775 100644
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/ParallelBatchPartitionReader.java
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/ParallelBatchPartitionReader.java
@@ -84,7 +84,7 @@ public class ParallelBatchPartitionReader {
         return String.format("parallel-split-enumerator-executor-%s", subtaskId);
     }
 
-    public boolean next() throws IOException {
+    public boolean next() throws Exception {
         prepare();
         while (running && handover.isEmpty()) {
             try {
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/SeaTunnelBatchPartitionReader.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/SeaTunnelBatchPartitionReader.java
index e9c9268a46..9841a0dfbd 100644
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/SeaTunnelBatchPartitionReader.java
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/SeaTunnelBatchPartitionReader.java
@@ -32,7 +32,11 @@ public class SeaTunnelBatchPartitionReader implements PartitionReader<InternalRo
 
     @Override
     public boolean next() throws IOException {
-        return partitionReader.next();
+        try {
+            return partitionReader.next();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/SeaTunnelMicroBatchPartitionReader.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/SeaTunnelMicroBatchPartitionReader.java
index 61d466d946..597be91d67 100644
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/SeaTunnelMicroBatchPartitionReader.java
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/SeaTunnelMicroBatchPartitionReader.java
@@ -34,7 +34,11 @@ public class SeaTunnelMicroBatchPartitionReader implements PartitionReader<Inter
 
     @Override
     public boolean next() throws IOException {
-        return partitionReader.next();
+        try {
+            return partitionReader.next();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override

Reply via email to