This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3e64a42838 [Fix][File] Use commons-csv to read CSV files (#8919)
3e64a42838 is described below
commit 3e64a4283874aea263d2f9344715096d44fea617
Author: litiliu <[email protected]>
AuthorDate: Wed Mar 19 21:23:05 2025 +0800
[Fix][File] Use commons-csv to read CSV files (#8919)
Co-authored-by: litiliu <[email protected]>
---
.../file/source/reader/CsvReadStrategy.java | 101 +++++++++++----------
.../e2e/connector/file/local/LocalFileIT.java | 6 ++
.../src/test/resources/csv/break_line.csv | 3 +
.../resources/csv/breakline_csv_to_assert.conf | 51 +++++++++++
.../format/csv/CsvDeserializationSchema.java | 15 +--
5 files changed, 124 insertions(+), 52 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
index 5fabc89110..cc5cb8820e 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
@@ -19,7 +19,6 @@ package
org.apache.seatunnel.connectors.seatunnel.file.source.reader;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
@@ -39,6 +38,10 @@ import
org.apache.seatunnel.format.csv.constant.CsvFormatConstant;
import org.apache.seatunnel.format.csv.processor.CsvLineProcessor;
import org.apache.seatunnel.format.csv.processor.DefaultCsvLineProcessor;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+
import io.airlift.compress.lzo.LzopCodec;
import lombok.extern.slf4j.Slf4j;
@@ -47,12 +50,13 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@Slf4j
public class CsvReadStrategy extends AbstractReadStrategy {
- private DeserializationSchema<SeaTunnelRow> deserializationSchema;
+ private CsvDeserializationSchema deserializationSchema;
private String fieldDelimiter =
BaseSourceConfigOptions.FIELD_DELIMITER.defaultValue();
private DateUtils.Formatter dateFormat =
BaseSourceConfigOptions.DATE_FORMAT.defaultValue();
private DateTimeUtils.Formatter datetimeFormat =
@@ -62,6 +66,7 @@ public class CsvReadStrategy extends AbstractReadStrategy {
private CsvLineProcessor processor;
private int[] indexes;
private String encoding = BaseSourceConfigOptions.ENCODING.defaultValue();
+ private CatalogTable inputCatalogTable;
@Override
public void read(String path, String tableId, Collector<SeaTunnelRow>
output)
@@ -96,51 +101,54 @@ public class CsvReadStrategy extends AbstractReadStrategy {
break;
}
+ CSVFormat csvFormat = CSVFormat.DEFAULT;
try (BufferedReader reader =
- new BufferedReader(new InputStreamReader(actualInputStream,
encoding))) {
- reader.lines()
- .skip(skipHeaderNumber)
- .forEach(
- line -> {
- try {
- SeaTunnelRow seaTunnelRow =
- deserializationSchema.deserialize(
-
line.getBytes(StandardCharsets.UTF_8));
- if (!readColumns.isEmpty()) {
- // need column projection
- Object[] fields;
- if (isMergePartition) {
- fields =
- new Object
- [readColumns.size()
- +
partitionsMap.size()];
- } else {
- fields = new
Object[readColumns.size()];
- }
- for (int i = 0; i < indexes.length;
i++) {
- fields[i] =
seaTunnelRow.getField(indexes[i]);
- }
- seaTunnelRow = new
SeaTunnelRow(fields);
- }
- if (isMergePartition) {
- int index =
seaTunnelRowType.getTotalFields();
- for (String value :
partitionsMap.values()) {
- seaTunnelRow.setField(index++,
value);
- }
- }
- seaTunnelRow.setTableId(tableId);
- output.collect(seaTunnelRow);
- } catch (IOException e) {
- String errorMsg =
- String.format(
- "Deserialize this data
[%s] failed, please check the origin data",
- line);
- throw new FileConnectorException(
-
FileConnectorErrorCode.DATA_DESERIALIZE_FAILED,
- errorMsg,
- e);
- }
- });
+ new BufferedReader(new
InputStreamReader(actualInputStream, encoding));
+ CSVParser csvParser = new CSVParser(reader, csvFormat); ) {
+ for (int i = 0; i < skipHeaderNumber; i++) {
+ if (reader.readLine() == null) {
+ throw new IOException(
+ String.format(
+ "File [%s] has fewer lines than expected
to skip.",
+ currentFileName));
+ }
+ }
+ // read lines
+ for (CSVRecord csvRecord : csvParser) {
+ HashMap<Integer, String> fieldIdValueMap = new HashMap<>();
+ for (int i = 0; i <
inputCatalogTable.getTableSchema().getColumns().size(); i++) {
+ fieldIdValueMap.put(i, csvRecord.get(i));
+ }
+ SeaTunnelRow seaTunnelRow =
deserializationSchema.getSeaTunnelRow(fieldIdValueMap);
+ if (!readColumns.isEmpty()) {
+ // need column projection
+ Object[] fields;
+ if (isMergePartition) {
+ fields = new Object[readColumns.size() +
partitionsMap.size()];
+ } else {
+ fields = new Object[readColumns.size()];
+ }
+ for (int i = 0; i < indexes.length; i++) {
+ fields[i] = seaTunnelRow.getField(indexes[i]);
+ }
+ seaTunnelRow = new SeaTunnelRow(fields);
+ }
+ if (isMergePartition) {
+ int index = seaTunnelRowType.getTotalFields();
+ for (String value : partitionsMap.values()) {
+ seaTunnelRow.setField(index++, value);
+ }
+ }
+ seaTunnelRow.setTableId(tableId);
+ output.collect(seaTunnelRow);
+ }
+ } catch (IOException e) {
+ String errorMsg =
+ String.format(
+ "Deserialize this file [%s] failed, please check
the origin data",
+ currentFileName);
+ throw new FileConnectorException(
+ FileConnectorErrorCode.DATA_DESERIALIZE_FAILED, errorMsg,
e);
}
}
@@ -177,6 +185,7 @@ public class CsvReadStrategy extends AbstractReadStrategy {
@Override
public void setCatalogTable(CatalogTable catalogTable) {
SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
+ this.inputCatalogTable = catalogTable;
SeaTunnelRowType userDefinedRowTypeWithPartition =
mergePartitionTypes(fileNames.get(0), rowType);
ReadonlyConfig readonlyConfig =
ReadonlyConfig.fromConfig(pluginConfig);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
index c31e2e8560..fc69268584 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
@@ -286,6 +286,11 @@ public class LocalFileIT extends TestSuiteBase {
"/seatunnel/read/excel_filter/name=tyrantlucifer/hobby=coding/e2e_filter.xlsx",
container);
+ ContainerUtil.copyFileIntoContainers(
+ "/csv/break_line.csv",
+ "/seatunnel/read/csv/break_line/break_line.csv",
+ container);
+
ContainerUtil.copyFileIntoContainers(
"/text/e2e_null_format.txt",
"/seatunnel/read/e2e_null_format/e2e_null_format.txt",
@@ -300,6 +305,7 @@ public class LocalFileIT extends TestSuiteBase {
TestHelper helper = new TestHelper(container);
helper.execute("/csv/fake_to_local_csv.conf");
helper.execute("/csv/local_csv_to_assert.conf");
+ helper.execute("/csv/breakline_csv_to_assert.conf");
helper.execute("/excel/fake_to_local_excel.conf");
helper.execute("/excel/local_excel_to_assert.conf");
helper.execute("/excel/local_excel_projection_to_assert.conf");
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/break_line.csv
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/break_line.csv
new file mode 100644
index 0000000000..f534c7811c
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/break_line.csv
@@ -0,0 +1,3 @@
+20,"harry
+ potter"
+21,"tom"
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/breakline_csv_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/breakline_csv_to_assert.conf
new file mode 100644
index 0000000000..7a0c3caf65
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/breakline_csv_to_assert.conf
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ LocalFile {
+ path = "/seatunnel/read/csv/break_line"
+ file_format_type = csv
+ schema = {
+ fields {
+ age = int
+ name = string
+ }
+ }
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 2
+ }
+ {
+ rule_type = MIN_ROW
+ rule_value = 2
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-formats/seatunnel-format-csv/src/main/java/org/apache/seatunnel/format/csv/CsvDeserializationSchema.java
b/seatunnel-formats/seatunnel-format-csv/src/main/java/org/apache/seatunnel/format/csv/CsvDeserializationSchema.java
index 551bba12b0..df4dc05cd6 100644
---
a/seatunnel-formats/seatunnel-format-csv/src/main/java/org/apache/seatunnel/format/csv/CsvDeserializationSchema.java
+++
b/seatunnel-formats/seatunnel-format-csv/src/main/java/org/apache/seatunnel/format/csv/CsvDeserializationSchema.java
@@ -17,7 +17,6 @@
package org.apache.seatunnel.format.csv;
-import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.ArrayType;
@@ -41,6 +40,7 @@ import org.apache.commons.lang3.StringUtils;
import lombok.NonNull;
import java.io.IOException;
+import java.io.Serializable;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
@@ -58,7 +58,7 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
-public class CsvDeserializationSchema implements
DeserializationSchema<SeaTunnelRow> {
+public class CsvDeserializationSchema implements Serializable {
private final SeaTunnelRowType seaTunnelRowType;
private final String[] separators;
private final String encoding;
@@ -169,13 +169,17 @@ public class CsvDeserializationSchema implements
DeserializationSchema<SeaTunnel
}
}
- @Override
- public SeaTunnelRow deserialize(byte[] message) throws IOException {
+ protected SeaTunnelRow deserialize(byte[] message) throws IOException {
if (message == null || message.length == 0) {
return null;
}
String content = new String(message,
EncodingUtils.tryParseCharset(encoding));
Map<Integer, String> splitsMap = splitLineBySeaTunnelRowType(content,
seaTunnelRowType, 0);
+ SeaTunnelRow seaTunnelRow = getSeaTunnelRow(splitsMap);
+ return seaTunnelRow;
+ }
+
+ public SeaTunnelRow getSeaTunnelRow(Map<Integer, String> splitsMap) {
Object[] objects = new Object[seaTunnelRowType.getTotalFields()];
for (int i = 0; i < objects.length; i++) {
String fieldValue = splitsMap.get(i);
@@ -201,12 +205,11 @@ public class CsvDeserializationSchema implements
DeserializationSchema<SeaTunnel
return seaTunnelRow;
}
- @Override
public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
return seaTunnelRowType;
}
- public Map<Integer, String> splitLineBySeaTunnelRowType(
+ protected Map<Integer, String> splitLineBySeaTunnelRowType(
String line, SeaTunnelRowType seaTunnelRowType, int level) {
String[] splits = processor.splitLine(line, separators[level]);
LinkedHashMap<Integer, String> splitsMap = new LinkedHashMap<>();