This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 17e29185fa [Bug] [formats] Fix fail to parse line when content
contains the file delimiter (#6589)
17e29185fa is described below
commit 17e29185fa6256d541333c1dd05c0b8ac94a73e3
Author: litiliu <[email protected]>
AuthorDate: Mon Apr 8 20:53:07 2024 +0800
[Bug] [formats] Fix fail to parse line when content contains the file
delimiter (#6589)
---
.../file/source/reader/TextReadStrategy.java | 19 ++-
.../format/text/TextDeserializationSchema.java | 18 ++-
.../format/text/splitor/CsvLineSplitor.java | 73 ++++++++++
.../text/splitor/DefaultTextLineSplitor.java | 28 ++++
.../format/text/splitor/TextLineSplitor.java | 22 +++
.../format/text/CsvTextFormatSchemaTest.java | 153 +++++++++++++++++++++
6 files changed, 308 insertions(+), 5 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
index 0b8d1b7aa3..fa33f45ad6 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
@@ -35,6 +35,9 @@ import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErr
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.format.text.TextDeserializationSchema;
import org.apache.seatunnel.format.text.constant.TextFormatConstant;
+import org.apache.seatunnel.format.text.splitor.CsvLineSplitor;
+import org.apache.seatunnel.format.text.splitor.DefaultTextLineSplitor;
+import org.apache.seatunnel.format.text.splitor.TextLineSplitor;
import io.airlift.compress.lzo.LzopCodec;
import lombok.extern.slf4j.Slf4j;
@@ -56,6 +59,7 @@ public class TextReadStrategy extends AbstractReadStrategy {
BaseSourceConfigOptions.DATETIME_FORMAT.defaultValue();
private TimeUtils.Formatter timeFormat =
BaseSourceConfigOptions.TIME_FORMAT.defaultValue();
private CompressFormat compressFormat =
BaseSourceConfigOptions.COMPRESS_CODEC.defaultValue();
+ private TextLineSplitor textLineSplitor;
private int[] indexes;
private String encoding = BaseSourceConfigOptions.ENCODING.defaultValue();
@@ -145,7 +149,8 @@ public class TextReadStrategy extends AbstractReadStrategy {
.delimiter(TextFormatConstant.PLACEHOLDER)
.dateFormatter(dateFormat)
.dateTimeFormatter(datetimeFormat)
- .timeFormatter(timeFormat);
+ .timeFormatter(timeFormat)
+ .textLineSplitor(textLineSplitor);
if (isMergePartition) {
deserializationSchema =
builder.seaTunnelRowType(this.seaTunnelRowTypeWithPartition).build();
@@ -184,7 +189,8 @@ public class TextReadStrategy extends AbstractReadStrategy {
.delimiter(fieldDelimiter)
.dateFormatter(dateFormat)
.dateTimeFormatter(datetimeFormat)
- .timeFormatter(timeFormat);
+ .timeFormatter(timeFormat)
+ .textLineSplitor(textLineSplitor);
if (isMergePartition) {
deserializationSchema =
builder.seaTunnelRowType(userDefinedRowTypeWithPartition).build();
@@ -232,5 +238,14 @@ public class TextReadStrategy extends AbstractReadStrategy
{
pluginConfig.getString(BaseSourceConfigOptions.COMPRESS_CODEC.key());
compressFormat =
CompressFormat.valueOf(compressCodec.toUpperCase());
}
+ if (FileFormat.CSV.equals(
+ FileFormat.valueOf(
+ pluginConfig
+
.getString(BaseSourceConfigOptions.FILE_FORMAT_TYPE.key())
+ .toUpperCase()))) {
+ textLineSplitor = new CsvLineSplitor();
+ } else {
+ textLineSplitor = new DefaultTextLineSplitor();
+ }
}
}
diff --git a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java
index d7515da84a..c4ba37996a 100644
--- a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java
+++ b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java
@@ -31,6 +31,8 @@ import org.apache.seatunnel.common.utils.EncodingUtils;
import org.apache.seatunnel.common.utils.TimeUtils;
import org.apache.seatunnel.format.text.constant.TextFormatConstant;
import org.apache.seatunnel.format.text.exception.SeaTunnelTextFormatException;
+import org.apache.seatunnel.format.text.splitor.DefaultTextLineSplitor;
+import org.apache.seatunnel.format.text.splitor.TextLineSplitor;
import org.apache.commons.lang3.StringUtils;
@@ -50,6 +52,7 @@ public class TextDeserializationSchema implements
DeserializationSchema<SeaTunne
private final DateTimeUtils.Formatter dateTimeFormatter;
private final TimeUtils.Formatter timeFormatter;
private final String encoding;
+ private final TextLineSplitor splitor;
private TextDeserializationSchema(
@NonNull SeaTunnelRowType seaTunnelRowType,
@@ -57,13 +60,15 @@ public class TextDeserializationSchema implements
DeserializationSchema<SeaTunne
DateUtils.Formatter dateFormatter,
DateTimeUtils.Formatter dateTimeFormatter,
TimeUtils.Formatter timeFormatter,
- String encoding) {
+ String encoding,
+ TextLineSplitor splitor) {
this.seaTunnelRowType = seaTunnelRowType;
this.separators = separators;
this.dateFormatter = dateFormatter;
this.dateTimeFormatter = dateTimeFormatter;
this.timeFormatter = timeFormatter;
this.encoding = encoding;
+ this.splitor = splitor;
}
public static Builder builder() {
@@ -78,6 +83,7 @@ public class TextDeserializationSchema implements
DeserializationSchema<SeaTunne
DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
private TimeUtils.Formatter timeFormatter =
TimeUtils.Formatter.HH_MM_SS;
private String encoding = StandardCharsets.UTF_8.name();
+ private TextLineSplitor textLineSplitor = new DefaultTextLineSplitor();
private Builder() {}
@@ -116,6 +122,11 @@ public class TextDeserializationSchema implements
DeserializationSchema<SeaTunne
return this;
}
+ public Builder textLineSplitor(TextLineSplitor splitor) {
+ this.textLineSplitor = splitor;
+ return this;
+ }
+
public TextDeserializationSchema build() {
return new TextDeserializationSchema(
seaTunnelRowType,
@@ -123,7 +134,8 @@ public class TextDeserializationSchema implements
DeserializationSchema<SeaTunne
dateFormatter,
dateTimeFormatter,
timeFormatter,
- encoding);
+ encoding,
+ textLineSplitor);
}
}
@@ -145,7 +157,7 @@ public class TextDeserializationSchema implements
DeserializationSchema<SeaTunne
private Map<Integer, String> splitLineBySeaTunnelRowType(
String line, SeaTunnelRowType seaTunnelRowType, int level) {
- String[] splits = line.split(separators[level], -1);
+ String[] splits = splitor.spliteLine(line, separators[level]);
LinkedHashMap<Integer, String> splitsMap = new LinkedHashMap<>();
SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
for (int i = 0; i < splits.length; i++) {
diff --git a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/splitor/CsvLineSplitor.java b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/splitor/CsvLineSplitor.java
new file mode 100644
index 0000000000..aff4d51827
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/splitor/CsvLineSplitor.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.text.splitor;
+
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * {@link TextLineSplitor} that tokenizes a line with Apache Commons CSV, so a field wrapped in
+ * double quotes may contain the separator (e.g. the line {@code "mess,age",true} yields the two
+ * fields {@code mess,age} and {@code true}).
+ *
+ * <p>Only the first character of the separator is used as the CSV delimiter. A {@link CSVFormat}
+ * is created lazily for each delimiter character and cached on this instance.
+ */
+@Slf4j
+public class CsvLineSplitor implements TextLineSplitor, Serializable {
+    /** Lazily populated cache of CSV formats, keyed by delimiter character. */
+    private final Map<Character, CSVFormat> splitorFormatMap = new HashMap<>();
+
+    /**
+     * Splits {@code line} into fields using CSV quoting rules.
+     *
+     * @param line the raw line to split
+     * @param splitor the field separator; only its first character is used as the CSV delimiter
+     * @return the values of every CSV record parsed from {@code line}, concatenated in order; an
+     *     empty array if parsing fails (the failure is logged)
+     * @throws NullPointerException if {@code splitor} is null
+     * @throws IllegalArgumentException if {@code splitor} is empty
+     */
+    @Override
+    public String[] spliteLine(String line, String splitor) {
+        Objects.requireNonNull(splitor, "splitor");
+        if (splitor.isEmpty()) {
+            throw new IllegalArgumentException("CSV separator must not be empty");
+        }
+        CSVFormat format =
+                splitorFormatMap.computeIfAbsent(
+                        splitor.charAt(0), c -> CSVFormat.DEFAULT.withDelimiter(c));
+        List<String> res = new ArrayList<>();
+        // try-with-resources closes the parser on both the success and the failure path.
+        try (CSVParser parser = CSVParser.parse(line, format)) {
+            for (CSVRecord record : parser.getRecords()) {
+                for (String value : record) {
+                    res.add(value);
+                }
+            }
+            return res.toArray(new String[0]);
+        } catch (IOException | RuntimeException e) {
+            // Best effort, as before: log the failure and yield no fields instead of throwing.
+            log.error(ExceptionUtils.getMessage(e));
+            return new String[0];
+        }
+    }
+}
diff --git a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/splitor/DefaultTextLineSplitor.java b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/splitor/DefaultTextLineSplitor.java
new file mode 100644
index 0000000000..9c149eae3d
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/splitor/DefaultTextLineSplitor.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.text.splitor;
+
+import java.io.Serializable;
+
+/**
+ * Default {@link TextLineSplitor}, equivalent to {@link String#split(String, int)} with a limit of
+ * {@code -1}: the separator is interpreted as a regular expression, and trailing empty fields are
+ * kept.
+ */
+public class DefaultTextLineSplitor implements TextLineSplitor, Serializable {
+
+    /**
+     * Splits {@code line} around every match of {@code separator}.
+     *
+     * @param line the raw line to split
+     * @param separator regular expression matching the field separator
+     * @return all fields in order, including trailing empty ones
+     */
+    @Override
+    public String[] spliteLine(String line, String separator) {
+        // A negative limit disables the removal of trailing empty strings.
+        final int keepTrailingEmptyFields = -1;
+        return line.split(separator, keepTrailingEmptyFields);
+    }
+}
diff --git a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/splitor/TextLineSplitor.java b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/splitor/TextLineSplitor.java
new file mode 100644
index 0000000000..ad0c998ac1
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/splitor/TextLineSplitor.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.text.splitor;
+
+/**
+ * Strategy for splitting one line of text into field values.
+ *
+ * <p>Extends {@link java.io.Serializable} because a splitor is kept in a field of
+ * {@code TextDeserializationSchema}, and both bundled implementations already implement it.
+ * NOTE(review): this assumes the owning schema/read-strategy objects are Java-serialized when
+ * shipped to workers.
+ */
+public interface TextLineSplitor extends java.io.Serializable {
+    /**
+     * Splits {@code line} into field values.
+     *
+     * @param line the raw line to split
+     * @param splitor the separator between fields; how it is interpreted (regular expression,
+     *     single delimiter character, ...) is up to the implementation
+     * @return the field values, in order
+     */
+    String[] spliteLine(String line, String splitor);
+}
diff --git a/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/CsvTextFormatSchemaTest.java b/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/CsvTextFormatSchemaTest.java
new file mode 100644
index 0000000000..0f58e32f14
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/CsvTextFormatSchemaTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.text;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.format.text.splitor.CsvLineSplitor;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.Map;
+
+/**
+ * Tests {@link TextDeserializationSchema} configured with a {@link CsvLineSplitor}, in particular
+ * that a double-quoted field may contain the field delimiter.
+ */
+public class CsvTextFormatSchemaTest {
+    /**
+     * One comma-delimited line with a value for every field of {@link #seaTunnelRowType}. The first
+     * field is quoted because it contains the delimiter, the tenth field is empty (null), and the
+     * nested row/array/map values use the control characters U+0002, U+0003 and U+0004 as their
+     * inner separators.
+     */
+    public String content =
+            "\"mess,age\","
+                    + "true,"
+                    + "1,"
+                    + "2,"
+                    + "3,"
+                    + "4,"
+                    + "6.66,"
+                    + "7.77,"
+                    + "8.8888888,"
+                    + ','
+                    + "2022-09-24,"
+                    + "22:45:00,"
+                    + "2022-09-24 22:45:00,"
+                    // row field
+                    + String.join("\u0003", Arrays.asList("1", "2", "3", "4", "5", "6"))
+                    + '\002'
+                    + "tyrantlucifer\00418\003Kris\00421"
+                    + ','
+                    // array field
+                    + String.join("\u0002", Arrays.asList("1", "2", "3", "4", "5", "6"))
+                    + ','
+                    // map field
+                    + "tyrantlucifer"
+                    + '\003'
+                    + "18"
+                    + '\002'
+                    + "Kris"
+                    + '\003'
+                    + "21"
+                    + '\002'
+                    + "nullValueKey"
+                    + '\003'
+                    + '\002'
+                    + '\003'
+                    + "1231";
+
+    public SeaTunnelRowType seaTunnelRowType;
+
+    /** Builds the 16-field row type that {@link #content} is parsed against, before each test. */
+    @BeforeEach
+    public void initSeaTunnelRowType() {
+        seaTunnelRowType =
+                new SeaTunnelRowType(
+                        new String[] {
+                            "string_field",
+                            "boolean_field",
+                            "tinyint_field",
+                            "smallint_field",
+                            "int_field",
+                            "bigint_field",
+                            "float_field",
+                            "double_field",
+                            "decimal_field",
+                            "null_field",
+                            "date_field",
+                            "time_field",
+                            "timestamp_field",
+                            "row_field",
+                            "array_field",
+                            "map_field"
+                        },
+                        new SeaTunnelDataType<?>[] {
+                            BasicType.STRING_TYPE,
+                            BasicType.BOOLEAN_TYPE,
+                            BasicType.BYTE_TYPE,
+                            BasicType.SHORT_TYPE,
+                            BasicType.INT_TYPE,
+                            BasicType.LONG_TYPE,
+                            BasicType.FLOAT_TYPE,
+                            BasicType.DOUBLE_TYPE,
+                            new DecimalType(30, 8),
+                            BasicType.VOID_TYPE,
+                            LocalTimeType.LOCAL_DATE_TYPE,
+                            LocalTimeType.LOCAL_TIME_TYPE,
+                            LocalTimeType.LOCAL_DATE_TIME_TYPE,
+                            new SeaTunnelRowType(
+                                    new String[] {
+                                        "array_field", "map_field",
+                                    },
+                                    new SeaTunnelDataType<?>[] {
+                                        ArrayType.INT_ARRAY_TYPE,
+                                        new MapType<>(BasicType.STRING_TYPE, BasicType.INT_TYPE),
+                                    }),
+                            ArrayType.INT_ARRAY_TYPE,
+                            new MapType<>(BasicType.STRING_TYPE, BasicType.INT_TYPE)
+                        });
+    }
+
+    @Test
+    public void testParse() throws IOException {
+        String delimiter = ",";
+        TextDeserializationSchema deserializationSchema =
+                TextDeserializationSchema.builder()
+                        .seaTunnelRowType(seaTunnelRowType)
+                        .delimiter(delimiter)
+                        .textLineSplitor(new CsvLineSplitor())
+                        .build();
+        // The builder's default encoding is UTF-8, so encode with the same charset instead of
+        // relying on the platform default.
+        SeaTunnelRow seaTunnelRow =
+                deserializationSchema.deserialize(content.getBytes(StandardCharsets.UTF_8));
+
+        // The quoted first field keeps its embedded delimiter.
+        Assertions.assertEquals("mess,age", seaTunnelRow.getField(0));
+        Assertions.assertEquals(Boolean.TRUE, seaTunnelRow.getField(1));
+        Assertions.assertEquals(Byte.valueOf("1"), seaTunnelRow.getField(2));
+        Assertions.assertEquals(Short.valueOf("2"), seaTunnelRow.getField(3));
+        Assertions.assertEquals(Integer.valueOf("3"), seaTunnelRow.getField(4));
+        Assertions.assertEquals(Long.valueOf("4"), seaTunnelRow.getField(5));
+        Assertions.assertEquals(Float.valueOf("6.66"), seaTunnelRow.getField(6));
+        Assertions.assertEquals(Double.valueOf("7.77"), seaTunnelRow.getField(7));
+        Assertions.assertEquals(BigDecimal.valueOf(8.8888888D), seaTunnelRow.getField(8));
+        Assertions.assertNull(seaTunnelRow.getField(9));
+        Assertions.assertEquals(LocalDate.of(2022, 9, 24), seaTunnelRow.getField(10));
+        Assertions.assertEquals(LocalTime.of(22, 45, 0), seaTunnelRow.getField(11));
+        Assertions.assertEquals(
+                LocalDateTime.of(2022, 9, 24, 22, 45, 0), seaTunnelRow.getField(12));
+        Assertions.assertArrayEquals(
+                new Integer[] {1, 2, 3, 4, 5, 6}, (Object[]) seaTunnelRow.getField(14));
+
+        Map<?, ?> mapField = (Map<?, ?>) seaTunnelRow.getField(15);
+        Assertions.assertEquals(18, mapField.get("tyrantlucifer"));
+        Assertions.assertEquals(21, mapField.get("Kris"));
+    }
+}