This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 17e29185fa [Bug] [formats] Fix fail to parse line when content contains the file delimiter (#6589)
17e29185fa is described below

commit 17e29185fa6256d541333c1dd05c0b8ac94a73e3
Author: litiliu <[email protected]>
AuthorDate: Mon Apr 8 20:53:07 2024 +0800

    [Bug] [formats] Fix fail to parse line when content contains the file delimiter (#6589)
---
 .../file/source/reader/TextReadStrategy.java       |  19 ++-
 .../format/text/TextDeserializationSchema.java     |  18 ++-
 .../format/text/splitor/CsvLineSplitor.java        |  73 ++++++++++
 .../text/splitor/DefaultTextLineSplitor.java       |  28 ++++
 .../format/text/splitor/TextLineSplitor.java       |  22 +++
 .../format/text/CsvTextFormatSchemaTest.java       | 153 +++++++++++++++++++++
 6 files changed, 308 insertions(+), 5 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
index 0b8d1b7aa3..fa33f45ad6 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
@@ -35,6 +35,9 @@ import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErr
 import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import org.apache.seatunnel.format.text.TextDeserializationSchema;
 import org.apache.seatunnel.format.text.constant.TextFormatConstant;
+import org.apache.seatunnel.format.text.splitor.CsvLineSplitor;
+import org.apache.seatunnel.format.text.splitor.DefaultTextLineSplitor;
+import org.apache.seatunnel.format.text.splitor.TextLineSplitor;
 
 import io.airlift.compress.lzo.LzopCodec;
 import lombok.extern.slf4j.Slf4j;
@@ -56,6 +59,7 @@ public class TextReadStrategy extends AbstractReadStrategy {
             BaseSourceConfigOptions.DATETIME_FORMAT.defaultValue();
     private TimeUtils.Formatter timeFormat = BaseSourceConfigOptions.TIME_FORMAT.defaultValue();
     private CompressFormat compressFormat = BaseSourceConfigOptions.COMPRESS_CODEC.defaultValue();
+    private TextLineSplitor textLineSplitor;
     private int[] indexes;
     private String encoding = BaseSourceConfigOptions.ENCODING.defaultValue();
 
@@ -145,7 +149,8 @@ public class TextReadStrategy extends AbstractReadStrategy {
                         .delimiter(TextFormatConstant.PLACEHOLDER)
                         .dateFormatter(dateFormat)
                         .dateTimeFormatter(datetimeFormat)
-                        .timeFormatter(timeFormat);
+                        .timeFormatter(timeFormat)
+                        .textLineSplitor(textLineSplitor);
         if (isMergePartition) {
             deserializationSchema =
                     builder.seaTunnelRowType(this.seaTunnelRowTypeWithPartition).build();
@@ -184,7 +189,8 @@ public class TextReadStrategy extends AbstractReadStrategy {
                         .delimiter(fieldDelimiter)
                         .dateFormatter(dateFormat)
                         .dateTimeFormatter(datetimeFormat)
-                        .timeFormatter(timeFormat);
+                        .timeFormatter(timeFormat)
+                        .textLineSplitor(textLineSplitor);
         if (isMergePartition) {
             deserializationSchema =
                     builder.seaTunnelRowType(userDefinedRowTypeWithPartition).build();
@@ -232,5 +238,14 @@ public class TextReadStrategy extends AbstractReadStrategy {
                     pluginConfig.getString(BaseSourceConfigOptions.COMPRESS_CODEC.key());
             compressFormat = CompressFormat.valueOf(compressCodec.toUpperCase());
         }
+        if (FileFormat.CSV.equals(
+                FileFormat.valueOf(
+                        pluginConfig
+                                .getString(BaseSourceConfigOptions.FILE_FORMAT_TYPE.key())
+                                .toUpperCase()))) {
+            textLineSplitor = new CsvLineSplitor();
+        } else {
+            textLineSplitor = new DefaultTextLineSplitor();
+        }
     }
 }
diff --git a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java
index d7515da84a..c4ba37996a 100644
--- a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java
+++ b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java
@@ -31,6 +31,8 @@ import org.apache.seatunnel.common.utils.EncodingUtils;
 import org.apache.seatunnel.common.utils.TimeUtils;
 import org.apache.seatunnel.format.text.constant.TextFormatConstant;
 import org.apache.seatunnel.format.text.exception.SeaTunnelTextFormatException;
+import org.apache.seatunnel.format.text.splitor.DefaultTextLineSplitor;
+import org.apache.seatunnel.format.text.splitor.TextLineSplitor;
 
 import org.apache.commons.lang3.StringUtils;
 
@@ -50,6 +52,7 @@ public class TextDeserializationSchema implements DeserializationSchema<SeaTunne
     private final DateTimeUtils.Formatter dateTimeFormatter;
     private final TimeUtils.Formatter timeFormatter;
     private final String encoding;
+    private final TextLineSplitor splitor;
 
     private TextDeserializationSchema(
             @NonNull SeaTunnelRowType seaTunnelRowType,
@@ -57,13 +60,15 @@ public class TextDeserializationSchema implements DeserializationSchema<SeaTunne
             DateUtils.Formatter dateFormatter,
             DateTimeUtils.Formatter dateTimeFormatter,
             TimeUtils.Formatter timeFormatter,
-            String encoding) {
+            String encoding,
+            TextLineSplitor splitor) {
         this.seaTunnelRowType = seaTunnelRowType;
         this.separators = separators;
         this.dateFormatter = dateFormatter;
         this.dateTimeFormatter = dateTimeFormatter;
         this.timeFormatter = timeFormatter;
         this.encoding = encoding;
+        this.splitor = splitor;
     }
 
     public static Builder builder() {
@@ -78,6 +83,7 @@ public class TextDeserializationSchema implements DeserializationSchema<SeaTunne
                 DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
         private TimeUtils.Formatter timeFormatter = TimeUtils.Formatter.HH_MM_SS;
         private String encoding = StandardCharsets.UTF_8.name();
+        private TextLineSplitor textLineSplitor = new DefaultTextLineSplitor();
 
         private Builder() {}
 
@@ -116,6 +122,11 @@ public class TextDeserializationSchema implements DeserializationSchema<SeaTunne
             return this;
         }
 
+        public Builder textLineSplitor(TextLineSplitor splitor) {
+            this.textLineSplitor = splitor;
+            return this;
+        }
+
         public TextDeserializationSchema build() {
             return new TextDeserializationSchema(
                     seaTunnelRowType,
@@ -123,7 +134,8 @@ public class TextDeserializationSchema implements DeserializationSchema<SeaTunne
                     dateFormatter,
                     dateTimeFormatter,
                     timeFormatter,
-                    encoding);
+                    encoding,
+                    textLineSplitor);
         }
     }
 
@@ -145,7 +157,7 @@ public class TextDeserializationSchema implements DeserializationSchema<SeaTunne
 
     private Map<Integer, String> splitLineBySeaTunnelRowType(
             String line, SeaTunnelRowType seaTunnelRowType, int level) {
-        String[] splits = line.split(separators[level], -1);
+        String[] splits = splitor.spliteLine(line, separators[level]);
         LinkedHashMap<Integer, String> splitsMap = new LinkedHashMap<>();
         SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
         for (int i = 0; i < splits.length; i++) {
diff --git a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/splitor/CsvLineSplitor.java b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/splitor/CsvLineSplitor.java
new file mode 100644
index 0000000000..aff4d51827
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/splitor/CsvLineSplitor.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.text.splitor;
+
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+@Slf4j
+public class CsvLineSplitor implements TextLineSplitor, Serializable {
+    private Map<Character, CSVFormat> splitorFormatMap = new HashMap<>();
+
+    @Override
+    public String[] spliteLine(String line, String splitor) {
+        Character splitChar = splitor.charAt(0);
+        if (Objects.isNull(splitorFormatMap.get(splitChar))) {
+            splitorFormatMap.put(splitChar, CSVFormat.DEFAULT.withDelimiter(splitChar));
+        }
+        CSVFormat format = splitorFormatMap.get(splitChar);
+        CSVParser parser = null;
+        // Method to parse the line into CSV with the given separator
+        try {
+            // Create CSV parser
+            parser = CSVParser.parse(line, format);
+            // Parse the CSV records
+            List<String> res = new ArrayList<>();
+            for (CSVRecord record : parser.getRecords()) {
+                for (String value : record) {
+                    res.add(value);
+                }
+            }
+            return res.toArray(new String[0]);
+        } catch (Exception e) {
+            log.error(ExceptionUtils.getMessage(e));
+            return new String[0];
+        } finally {
+            if (Objects.nonNull(parser)) {
+                try {
+                    parser.close();
+                } catch (IOException e) {
+                    log.error(ExceptionUtils.getMessage(e));
+                }
+            }
+        }
+    }
+}
diff --git a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/splitor/DefaultTextLineSplitor.java b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/splitor/DefaultTextLineSplitor.java
new file mode 100644
index 0000000000..9c149eae3d
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/splitor/DefaultTextLineSplitor.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.text.splitor;
+
+import java.io.Serializable;
+
+public class DefaultTextLineSplitor implements TextLineSplitor, Serializable {
+
+    @Override
+    public String[] spliteLine(String line, String seperator) {
+        return line.split(seperator, -1);
+    }
+}
diff --git a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/splitor/TextLineSplitor.java b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/splitor/TextLineSplitor.java
new file mode 100644
index 0000000000..ad0c998ac1
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/splitor/TextLineSplitor.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.text.splitor;
+
+public interface TextLineSplitor {
+    String[] spliteLine(String line, String splitor);
+}
diff --git a/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/CsvTextFormatSchemaTest.java b/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/CsvTextFormatSchemaTest.java
new file mode 100644
index 0000000000..0f58e32f14
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/CsvTextFormatSchemaTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.text;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.format.text.splitor.CsvLineSplitor;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.util.Arrays;
+import java.util.Map;
+
+public class CsvTextFormatSchemaTest {
+    public String content =
+            "\"mess,age\","
+                    + "true,"
+                    + "1,"
+                    + "2,"
+                    + "3,"
+                    + "4,"
+                    + "6.66,"
+                    + "7.77,"
+                    + "8.8888888,"
+                    + ','
+                    + "2022-09-24,"
+                    + "22:45:00,"
+                    + "2022-09-24 22:45:00,"
+                    // row field
+                    + String.join("\u0003", Arrays.asList("1", "2", "3", "4", "5", "6"))
+                    + '\002'
+                    + "tyrantlucifer\00418\003Kris\00421"
+                    + ','
+                    // array field
+                    + String.join("\u0002", Arrays.asList("1", "2", "3", "4", "5", "6"))
+                    + ','
+                    // map field
+                    + "tyrantlucifer"
+                    + '\003'
+                    + "18"
+                    + '\002'
+                    + "Kris"
+                    + '\003'
+                    + "21"
+                    + '\002'
+                    + "nullValueKey"
+                    + '\003'
+                    + '\002'
+                    + '\003'
+                    + "1231";
+
+    public SeaTunnelRowType seaTunnelRowType;
+
+    @BeforeEach
+    public void initSeaTunnelRowType() {
+        seaTunnelRowType =
+                new SeaTunnelRowType(
+                        new String[] {
+                            "string_field",
+                            "boolean_field",
+                            "tinyint_field",
+                            "smallint_field",
+                            "int_field",
+                            "bigint_field",
+                            "float_field",
+                            "double_field",
+                            "decimal_field",
+                            "null_field",
+                            "date_field",
+                            "time_field",
+                            "timestamp_field",
+                            "row_field",
+                            "array_field",
+                            "map_field"
+                        },
+                        new SeaTunnelDataType<?>[] {
+                            BasicType.STRING_TYPE,
+                            BasicType.BOOLEAN_TYPE,
+                            BasicType.BYTE_TYPE,
+                            BasicType.SHORT_TYPE,
+                            BasicType.INT_TYPE,
+                            BasicType.LONG_TYPE,
+                            BasicType.FLOAT_TYPE,
+                            BasicType.DOUBLE_TYPE,
+                            new DecimalType(30, 8),
+                            BasicType.VOID_TYPE,
+                            LocalTimeType.LOCAL_DATE_TYPE,
+                            LocalTimeType.LOCAL_TIME_TYPE,
+                            LocalTimeType.LOCAL_DATE_TIME_TYPE,
+                            new SeaTunnelRowType(
+                                    new String[] {
+                                        "array_field", "map_field",
+                                    },
+                                    new SeaTunnelDataType<?>[] {
+                                        ArrayType.INT_ARRAY_TYPE,
+                                        new MapType<>(BasicType.STRING_TYPE, BasicType.INT_TYPE),
+                                    }),
+                            ArrayType.INT_ARRAY_TYPE,
+                            new MapType<>(BasicType.STRING_TYPE, BasicType.INT_TYPE)
+                        });
+    }
+
+    @Test
+    public void testParse() throws IOException {
+        String delimiter = ",";
+        TextDeserializationSchema deserializationSchema =
+                TextDeserializationSchema.builder()
+                        .seaTunnelRowType(seaTunnelRowType)
+                        .delimiter(delimiter)
+                        .textLineSplitor(new CsvLineSplitor())
+                        .build();
+        SeaTunnelRow seaTunnelRow = deserializationSchema.deserialize(content.getBytes());
+        Assertions.assertEquals("mess,age", seaTunnelRow.getField(0));
+        Assertions.assertEquals(Boolean.TRUE, seaTunnelRow.getField(1));
+        Assertions.assertEquals(Byte.valueOf("1"), seaTunnelRow.getField(2));
+        Assertions.assertEquals(Short.valueOf("2"), seaTunnelRow.getField(3));
+        Assertions.assertEquals(Integer.valueOf("3"), seaTunnelRow.getField(4));
+        Assertions.assertEquals(Long.valueOf("4"), seaTunnelRow.getField(5));
+        Assertions.assertEquals(Float.valueOf("6.66"), seaTunnelRow.getField(6));
+        Assertions.assertEquals(Double.valueOf("7.77"), seaTunnelRow.getField(7));
+        Assertions.assertEquals(BigDecimal.valueOf(8.8888888D), seaTunnelRow.getField(8));
+        Assertions.assertNull((seaTunnelRow.getField(9)));
+        Assertions.assertEquals(LocalDate.of(2022, 9, 24), seaTunnelRow.getField(10));
+        Assertions.assertEquals(((Map<?, ?>) (seaTunnelRow.getField(15))).get("tyrantlucifer"), 18);
+        Assertions.assertEquals(((Map<?, ?>) (seaTunnelRow.getField(15))).get("Kris"), 21);
+    }
+}

Reply via email to