This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 63c7b4e9cc [Fix][Connector-V2] Fix kafka `format_error_handle_way` not work (#7838)
63c7b4e9cc is described below
commit 63c7b4e9cc031bdea36ff6f68d2ad4ebde912f53
Author: Jia Fan <[email protected]>
AuthorDate: Thu Oct 17 10:36:14 2024 +0800
[Fix][Connector-V2] Fix kafka `format_error_handle_way` not work (#7838)
---
.../java/org/apache/seatunnel/common/Handover.java | 5 +++-
.../org/apache/seatunnel/common/HandoverTest.java | 34 ++++++----------------
.../seatunnel/kafka/source/KafkaRecordEmitter.java | 6 ++--
.../seatunnel/e2e/connector/kafka/KafkaIT.java | 6 ++--
...ce_format_error_handle_way_fail_to_console.conf | 1 +
...ce_format_error_handle_way_skip_to_console.conf | 1 +
.../reader/SeaTunnelInputPartitionReader.java | 6 +++-
.../reader/batch/ParallelBatchPartitionReader.java | 2 +-
.../batch/ParallelBatchPartitionReader.java | 2 +-
.../batch/SeaTunnelBatchPartitionReader.java | 6 +++-
.../micro/SeaTunnelMicroBatchPartitionReader.java | 6 +++-
11 files changed, 38 insertions(+), 37 deletions(-)
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Handover.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Handover.java
index 1686514a15..3132d93a16 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Handover.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Handover.java
@@ -30,7 +30,10 @@ public final class Handover<T> implements Closeable {
new LinkedBlockingQueue<>(DEFAULT_QUEUE_SIZE);
private Throwable error;
- public boolean isEmpty() {
+ public boolean isEmpty() throws Exception {
+ if (error != null) {
+ rethrowException(error, error.getMessage());
+ }
return blockingQueue.isEmpty();
}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/SeaTunnelBatchPartitionReader.java
b/seatunnel-common/src/test/java/org/apache/seatunnel/common/HandoverTest.java
similarity index 50%
copy from
seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/SeaTunnelBatchPartitionReader.java
copy to
seatunnel-common/src/test/java/org/apache/seatunnel/common/HandoverTest.java
index e9c9268a46..199a2d4e72 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/SeaTunnelBatchPartitionReader.java
+++
b/seatunnel-common/src/test/java/org/apache/seatunnel/common/HandoverTest.java
@@ -15,33 +15,17 @@
* limitations under the License.
*/
-package org.apache.seatunnel.translation.spark.source.partition.batch;
+package org.apache.seatunnel.common;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.connector.read.PartitionReader;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
-import java.io.IOException;
+public class HandoverTest {
-public class SeaTunnelBatchPartitionReader implements PartitionReader<InternalRow> {
-
- private final ParallelBatchPartitionReader partitionReader;
-
- public SeaTunnelBatchPartitionReader(ParallelBatchPartitionReader partitionReader) {
- this.partitionReader = partitionReader;
- }
-
- @Override
- public boolean next() throws IOException {
- return partitionReader.next();
- }
-
- @Override
- public InternalRow get() {
- return partitionReader.get();
- }
-
- @Override
- public void close() throws IOException {
- partitionReader.close();
+ @Test
+ public void testThrowExceptionWhenQueueIsEmtpy() {
+ Handover<Object> handover = new Handover<>();
+ handover.reportError(new RuntimeException("test"));
+ Assertions.assertThrows(RuntimeException.class, handover::isEmpty);
}
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java
index 6593137aff..87d2b7b7c9 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java
@@ -31,7 +31,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.Map;
public class KafkaRecordEmitter
@@ -71,13 +70,14 @@ public class KafkaRecordEmitter
// consumerRecord.offset + 1 is the offset commit to Kafka and also the start offset
// for the next run
splitState.setCurrentOffset(consumerRecord.offset() + 1);
- } catch (IOException e) {
+ } catch (Exception e) {
if (this.messageFormatErrorHandleWay == MessageFormatErrorHandleWay.SKIP) {
logger.warn(
"Deserialize message failed, skip this message, message: {}",
new String(consumerRecord.value()));
+ } else {
+ throw e;
}
- throw e;
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index ffc97f4dd3..4a57cbdbd3 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -291,7 +291,7 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
DEFAULT_FORMAT,
DEFAULT_FIELD_DELIMITER,
null);
- generateTestData(row -> serializer.serializeRow(row), 0, 100);
+ generateTestData(serializer::serializeRow, 0, 100);
Container.ExecResult execResult =
container.executeJob(
"/kafka/kafkasource_format_error_handle_way_skip_to_console.conf");
@@ -308,11 +308,11 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
DEFAULT_FORMAT,
DEFAULT_FIELD_DELIMITER,
null);
- generateTestData(row -> serializer.serializeRow(row), 0, 100);
+ generateTestData(serializer::serializeRow, 0, 100);
Container.ExecResult execResult =
container.executeJob(
"/kafka/kafkasource_format_error_handle_way_fail_to_console.conf");
- Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+ Assertions.assertEquals(1, execResult.getExitCode(), execResult.getStderr());
}
@TestTemplate
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf
index d2a0f05354..dd1390d167 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf
@@ -37,6 +37,7 @@ source {
result_table_name = "kafka_table"
start_mode = "earliest"
format_error_handle_way = fail
+ format = text
schema = {
fields {
id = bigint
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf
index 88b6098b5e..a34856d31b 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf
@@ -37,6 +37,7 @@ source {
result_table_name = "kafka_table"
start_mode = "earliest"
format_error_handle_way = skip
+ format = text
schema = {
fields {
id = bigint
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/SeaTunnelInputPartitionReader.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/SeaTunnelInputPartitionReader.java
index 26e554b7fd..4b651ec053 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/SeaTunnelInputPartitionReader.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/SeaTunnelInputPartitionReader.java
@@ -34,7 +34,11 @@ public class SeaTunnelInputPartitionReader implements
InputPartitionReader<Inter
@Override
public boolean next() throws IOException {
- return partitionReader.next();
+ try {
+ return partitionReader.next();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
@Override
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/batch/ParallelBatchPartitionReader.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/batch/ParallelBatchPartitionReader.java
index 2024063870..e20dca09d5 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/batch/ParallelBatchPartitionReader.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/batch/ParallelBatchPartitionReader.java
@@ -84,7 +84,7 @@ public class ParallelBatchPartitionReader {
return String.format("parallel-split-enumerator-executor-%s", subtaskId);
}
- public boolean next() throws IOException {
+ public boolean next() throws Exception {
prepare();
while (running && handover.isEmpty()) {
try {
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/ParallelBatchPartitionReader.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/ParallelBatchPartitionReader.java
index 27ab9f42d2..5ca32d6775 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/ParallelBatchPartitionReader.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/ParallelBatchPartitionReader.java
@@ -84,7 +84,7 @@ public class ParallelBatchPartitionReader {
return String.format("parallel-split-enumerator-executor-%s", subtaskId);
}
- public boolean next() throws IOException {
+ public boolean next() throws Exception {
prepare();
while (running && handover.isEmpty()) {
try {
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/SeaTunnelBatchPartitionReader.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/SeaTunnelBatchPartitionReader.java
index e9c9268a46..9841a0dfbd 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/SeaTunnelBatchPartitionReader.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/SeaTunnelBatchPartitionReader.java
@@ -32,7 +32,11 @@ public class SeaTunnelBatchPartitionReader implements
PartitionReader<InternalRo
@Override
public boolean next() throws IOException {
- return partitionReader.next();
+ try {
+ return partitionReader.next();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
@Override
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/SeaTunnelMicroBatchPartitionReader.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/SeaTunnelMicroBatchPartitionReader.java
index 61d466d946..597be91d67 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/SeaTunnelMicroBatchPartitionReader.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/SeaTunnelMicroBatchPartitionReader.java
@@ -34,7 +34,11 @@ public class SeaTunnelMicroBatchPartitionReader implements
PartitionReader<Inter
@Override
public boolean next() throws IOException {
- return partitionReader.next();
+ try {
+ return partitionReader.next();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
@Override