This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push: new 148296496a [core] update null-literal default value for csv format (#6104) 148296496a is described below commit 148296496a857887b44d1261af4e5da20db5a041 Author: jerry <lining....@alibaba-inc.com> AuthorDate: Wed Aug 20 17:25:06 2025 +0800 [core] update null-literal default value for csv format (#6104) --- docs/content/concepts/spec/fileformat.md | 2 +- .../apache/paimon/format/csv/CsvFileFormat.java | 2 + .../apache/paimon/format/csv/CsvFileReader.java | 5 + .../apache/paimon/format/csv/CsvFormatWriter.java | 5 + .../org/apache/paimon/format/csv/CsvOptions.java | 2 +- .../paimon/format/csv/CsvFileFormatTest.java | 322 ++++++++++++++++++++- 6 files changed, 324 insertions(+), 14 deletions(-) diff --git a/docs/content/concepts/spec/fileformat.md b/docs/content/concepts/spec/fileformat.md index 98798f7da0..d0873de348 100644 --- a/docs/content/concepts/spec/fileformat.md +++ b/docs/content/concepts/spec/fileformat.md @@ -414,7 +414,7 @@ Format Options: </tr> <tr> <td><h5>csv.null-literal</h5></td> - <td style="word-wrap: break-word;">(none)</td> + <td style="word-wrap: break-word;"><code>""</code></td> <td>String</td> <td>Null literal string that is interpreted as a null value (disabled by default).</td> </tr> diff --git a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java index b69e49187c..6dce1b470a 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java @@ -86,6 +86,8 @@ public class CsvFileFormat extends FileFormat { case FLOAT: case DOUBLE: case DATE: + case BINARY: + case VARBINARY: case TIME_WITHOUT_TIME_ZONE: case TIMESTAMP_WITHOUT_TIME_ZONE: case TIMESTAMP_WITH_LOCAL_TIME_ZONE: diff --git a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java index fae32adc59..208ab73118 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java @@ -42,12 +42,14 @@ import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; +import java.util.Base64; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** CSV file reader implementation. */ public class CsvFileReader implements FileRecordReader<InternalRow> { + private static final Base64.Decoder BASE64_DECODER = Base64.getDecoder(); private static final CsvMapper CSV_MAPPER = new CsvMapper(); // Performance optimization: Cache frequently used cast executors @@ -203,6 +205,9 @@ public class CsvFileReader implements FileRecordReader<InternalRow> { case CHAR: case VARCHAR: return BinaryString.fromString(field); + case BINARY: + case VARBINARY: + return BASE64_DECODER.decode(field); default: return useCachedCastExecutor(field, dataType); } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java index a7ffb1a110..e6f5fd167d 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java @@ -31,12 +31,14 @@ import java.io.BufferedWriter; import java.io.IOException; import java.io.OutputStreamWriter; import java.nio.charset.StandardCharsets; +import java.util.Base64; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** CSV format writer implementation. */ public class CsvFormatWriter implements FormatWriter { + private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder(); // Performance optimization: Cache frequently used cast executors private static final Map<String, CastExecutor<?, ?>> CAST_EXECUTOR_CACHE = new ConcurrentHashMap<>(32); @@ -154,6 +156,9 @@ public class CsvFormatWriter implements FormatWriter { case CHAR: case VARCHAR: return value.toString(); + case BINARY: + case VARBINARY: + return BASE64_ENCODER.encodeToString((byte[]) value); default: return useCachedStringCastExecutor(value, dataType); } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvOptions.java b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvOptions.java index e661127a9a..6a212a41cc 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvOptions.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvOptions.java @@ -59,7 +59,7 @@ public class CsvOptions { public static final ConfigOption<String> NULL_LITERAL = ConfigOptions.key("csv.null-literal") .stringType() - .defaultValue("null") + .defaultValue("") .withDescription("The literal for null values in CSV format"); private final String fieldDelimiter; diff --git a/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java b/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java index c9c5f3dbb5..bed689bd4d 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java @@ -18,14 +18,22 @@ package org.apache.paimon.format.csv; +import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.Decimal; import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; +import org.apache.paimon.data.serializer.InternalRowSerializer; import org.apache.paimon.format.FileFormat; import org.apache.paimon.format.FileFormatFactory.FormatContext; import org.apache.paimon.format.FormatReadWriteTest; +import org.apache.paimon.format.FormatReaderContext; +import org.apache.paimon.format.FormatWriter; +import org.apache.paimon.format.FormatWriterFactory; import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.PositionOutputStream; import org.apache.paimon.options.Options; +import org.apache.paimon.reader.RecordReader; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; @@ -35,6 +43,7 @@ import org.junit.jupiter.api.Test; import java.io.IOException; import java.math.BigDecimal; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.UUID; @@ -55,18 +64,6 @@ public class CsvFileFormatTest extends FormatReadWriteTest { return new CsvFileFormatFactory().create(new FormatContext(new Options(), 1024, 1024)); } - @Test - public void testWhenUseHiveDefaultDelimiter() throws IOException { - Options options = new Options(); - options.set(CsvOptions.FIELD_DELIMITER, "\001"); - FileFormat format = - new CsvFileFormatFactory().create(new FormatContext(new Options(), 1024, 1024)); - testSimpleTypesUtil( - format, new Path(new Path(parent.toUri()), UUID.randomUUID() + "." + formatType)); - testFullTypesUtil( - format, new Path(new Path(parent.toUri()), UUID.randomUUID() + "." + formatType)); - } - @Test public void testCsvParsingWithEmptyFields() throws IOException { @@ -157,6 +154,274 @@ public class CsvFileFormatTest extends FormatReadWriteTest { assertThat(fields[2]).isEqualTo("field3"); } + @Test + public void testCsvFieldDelimiterWriteRead() throws IOException { + RowType rowType = + DataTypes.ROW( + DataTypes.INT().notNull(), + DataTypes.STRING(), + DataTypes.DOUBLE().notNull()); + + String[] delimiters = {",", ";", "|", "\t", "\001"}; + + // Create test data once (reused for all delimiters) + List<InternalRow> testData = + Arrays.asList( + GenericRow.of(1, BinaryString.fromString("Alice"), 100.5), + GenericRow.of(2, BinaryString.fromString("Bob"), 200.75), + GenericRow.of(3, BinaryString.fromString("Charlie"), 300.25)); + + for (String delimiter : delimiters) { + Options options = new Options(); + options.set(CsvOptions.FIELD_DELIMITER, delimiter); + + List<InternalRow> result = + writeThenRead( + options, rowType, testData, "test_field_delim_" + delimiter.hashCode()); + + // Verify results + assertThat(result).hasSize(3); + assertThat(result.get(0).getInt(0)).isEqualTo(1); + assertThat(result.get(0).getString(1).toString()).isEqualTo("Alice"); + assertThat(result.get(0).getDouble(2)).isEqualTo(100.5); + assertThat(result.get(1).getInt(0)).isEqualTo(2); + assertThat(result.get(1).getString(1).toString()).isEqualTo("Bob"); + assertThat(result.get(1).getDouble(2)).isEqualTo(200.75); + assertThat(result.get(2).getInt(0)).isEqualTo(3); + assertThat(result.get(2).getString(1).toString()).isEqualTo("Charlie"); + assertThat(result.get(2).getDouble(2)).isEqualTo(300.25); + } + } + + @Test + public void testCsvLineDelimiterWriteRead() throws IOException { + RowType rowType = DataTypes.ROW(DataTypes.INT().notNull(), DataTypes.STRING()); + + String[] delimiters = {"\n", "\r", "\r\n"}; + + // Create test data once (reused for all delimiters) + List<InternalRow> testData = + Arrays.asList( + GenericRow.of(1, BinaryString.fromString("First")), + GenericRow.of(2, BinaryString.fromString("Second")), + GenericRow.of(3, BinaryString.fromString("Third"))); + + for (String delimiter : delimiters) { + Options options = new Options(); + options.set(CsvOptions.LINE_DELIMITER, delimiter); + + List<InternalRow> result = + writeThenRead( + options, rowType, testData, "test_line_delim_" + delimiter.hashCode()); + + // Verify results + assertThat(result).hasSize(3); + assertThat(result.get(0).getInt(0)).isEqualTo(1); + assertThat(result.get(0).getString(1).toString()).isEqualTo("First"); + assertThat(result.get(1).getInt(0)).isEqualTo(2); + assertThat(result.get(1).getString(1).toString()).isEqualTo("Second"); + assertThat(result.get(2).getInt(0)).isEqualTo(3); + assertThat(result.get(2).getString(1).toString()).isEqualTo("Third"); + } + } + + @Test + public void testCsvQuoteCharacterWriteRead() throws IOException { + RowType rowType = DataTypes.ROW(DataTypes.INT().notNull(), DataTypes.STRING()); + + String[] quoteChars = {"\"", "'", "`"}; + + // Create test data with values that need quoting (contain spaces/commas) + List<InternalRow> testData = + Arrays.asList( + GenericRow.of(1, BinaryString.fromString("Hello, World")), + GenericRow.of(2, BinaryString.fromString("Test Value")), + GenericRow.of(3, BinaryString.fromString("Another, Test"))); + + for (String quoteChar : quoteChars) { + Options options = new Options(); + options.set(CsvOptions.QUOTE_CHARACTER, quoteChar); + + List<InternalRow> result = + writeThenRead( + options, rowType, testData, "test_quote_char_" + quoteChar.hashCode()); + + // Verify results + assertThat(result).hasSize(3); + assertThat(result.get(0).getInt(0)).isEqualTo(1); + assertThat(result.get(0).getString(1).toString()).isEqualTo("Hello, World"); + assertThat(result.get(1).getInt(0)).isEqualTo(2); + assertThat(result.get(1).getString(1).toString()).isEqualTo("Test Value"); + assertThat(result.get(2).getInt(0)).isEqualTo(3); + assertThat(result.get(2).getString(1).toString()).isEqualTo("Another, Test"); + } + } + + @Test + public void testCsvEscapeCharacterWriteRead() throws IOException { + RowType rowType = DataTypes.ROW(DataTypes.INT().notNull(), DataTypes.STRING()); + + String[] escapeChars = {"\\", "/"}; + + // Create test data with values that might need escaping + List<InternalRow> testData = + Arrays.asList( + GenericRow.of(1, BinaryString.fromString("Value\"With\"Quotes")), + GenericRow.of(2, BinaryString.fromString("Normal Value")), + GenericRow.of(3, BinaryString.fromString("Special\\Characters"))); + + for (String escapeChar : escapeChars) { + Options options = new Options(); + options.set(CsvOptions.ESCAPE_CHARACTER, escapeChar); + + List<InternalRow> result = + writeThenRead( + options, + rowType, + testData, + "test_escape_char_" + escapeChar.hashCode()); + + // Verify results + assertThat(result).hasSize(3); + assertThat(result.get(0).getInt(0)).isEqualTo(1); + assertThat(result.get(1).getInt(0)).isEqualTo(2); + assertThat(result.get(1).getString(1).toString()).isEqualTo("Normal Value"); + assertThat(result.get(2).getInt(0)).isEqualTo(3); + } + } + + @Test + public void testCsvIncludeHeaderWriteRead() throws IOException { + RowType rowType = + DataTypes.ROW(DataTypes.INT().notNull(), DataTypes.STRING(), DataTypes.BOOLEAN()); + + boolean[] includeHeaderOptions = {false, true}; + + // Create test data + List<InternalRow> testData = + Arrays.asList( + GenericRow.of(1, BinaryString.fromString("Alice"), true), + GenericRow.of(2, BinaryString.fromString("Bob"), false), + GenericRow.of(3, BinaryString.fromString("Charlie"), true)); + + for (boolean includeHeader : includeHeaderOptions) { + Options options = new Options(); + options.set(CsvOptions.INCLUDE_HEADER, includeHeader); + + List<InternalRow> result = + writeThenRead( + options, rowType, testData, "test_include_header_" + includeHeader); + + // Verify results + assertThat(result).hasSize(3); + assertThat(result.get(0).getInt(0)).isEqualTo(1); + assertThat(result.get(0).getString(1).toString()).isEqualTo("Alice"); + assertThat(result.get(0).getBoolean(2)).isEqualTo(true); + assertThat(result.get(1).getInt(0)).isEqualTo(2); + assertThat(result.get(1).getString(1).toString()).isEqualTo("Bob"); + assertThat(result.get(1).getBoolean(2)).isEqualTo(false); + assertThat(result.get(2).getInt(0)).isEqualTo(3); + assertThat(result.get(2).getString(1).toString()).isEqualTo("Charlie"); + assertThat(result.get(2).getBoolean(2)).isEqualTo(true); + } + } + + @Test + public void testCsvNullLiteralWriteRead() throws IOException { + RowType rowType = + DataTypes.ROW(DataTypes.INT().notNull(), DataTypes.STRING(), DataTypes.INT()); + + String[] nullLiterals = {"", "NULL", "null"}; + + // Create test data with null values + List<InternalRow> testData = + Arrays.asList( + GenericRow.of(1, BinaryString.fromString("Alice"), null), + GenericRow.of(2, null, 100), + GenericRow.of(3, BinaryString.fromString("Charlie"), 300)); + + for (String nullLiteral : nullLiterals) { + Options options = new Options(); + options.set(CsvOptions.NULL_LITERAL, nullLiteral); + + List<InternalRow> result = + writeThenRead( + options, + rowType, + testData, + "test_null_literal_" + nullLiteral.hashCode()); + + // Verify results + assertThat(result).hasSize(3); + assertThat(result.get(0).getInt(0)).isEqualTo(1); + assertThat(result.get(0).getString(1).toString()).isEqualTo("Alice"); + assertThat(result.get(0).isNullAt(2)).isTrue(); + assertThat(result.get(1).getInt(0)).isEqualTo(2); + assertThat(result.get(1).isNullAt(1)).isTrue(); + assertThat(result.get(1).getInt(2)).isEqualTo(100); + assertThat(result.get(2).getInt(0)).isEqualTo(3); + assertThat(result.get(2).getString(1).toString()).isEqualTo("Charlie"); + assertThat(result.get(2).getInt(2)).isEqualTo(300); + } + } + + @Test + public void testCsvOptionsCombinationWriteRead() throws IOException { + RowType rowType = + DataTypes.ROW( + DataTypes.INT().notNull(), + DataTypes.STRING(), + DataTypes.DOUBLE(), + DataTypes.BOOLEAN()); + + // Test multiple CSV options together + Options options = new Options(); + options.set(CsvOptions.FIELD_DELIMITER, ";"); + options.set(CsvOptions.LINE_DELIMITER, "\r\n"); + options.set(CsvOptions.QUOTE_CHARACTER, "'"); + options.set(CsvOptions.ESCAPE_CHARACTER, "/"); + options.set(CsvOptions.INCLUDE_HEADER, true); + options.set(CsvOptions.NULL_LITERAL, "NULL"); + + // Create test data with various scenarios + List<InternalRow> testData = + Arrays.asList( + GenericRow.of(1, BinaryString.fromString("Alice; Test"), 100.5, true), + GenericRow.of(2, null, 200.75, false), + GenericRow.of(3, BinaryString.fromString("Charlie's Data"), null, true), + GenericRow.of(4, BinaryString.fromString("Normal"), 400.0, null)); + + List<InternalRow> result = + writeThenRead(options, rowType, testData, "test_csv_combination"); + + // Verify results + assertThat(result).hasSize(4); + + // Verify first row + assertThat(result.get(0).getInt(0)).isEqualTo(1); + assertThat(result.get(0).getString(1).toString()).isEqualTo("Alice; Test"); + assertThat(result.get(0).getDouble(2)).isEqualTo(100.5); + assertThat(result.get(0).getBoolean(3)).isEqualTo(true); + + // Verify second row (with null string) + assertThat(result.get(1).getInt(0)).isEqualTo(2); + assertThat(result.get(1).isNullAt(1)).isTrue(); + assertThat(result.get(1).getDouble(2)).isEqualTo(200.75); + assertThat(result.get(1).getBoolean(3)).isEqualTo(false); + + // Verify third row (with null double) + assertThat(result.get(2).getInt(0)).isEqualTo(3); + assertThat(result.get(2).getString(1).toString()).isEqualTo("Charlie's Data"); + assertThat(result.get(2).isNullAt(2)).isTrue(); + assertThat(result.get(2).getBoolean(3)).isEqualTo(true); + + // Verify fourth row (with null boolean) + assertThat(result.get(3).getInt(0)).isEqualTo(4); + assertThat(result.get(3).getString(1).toString()).isEqualTo("Normal"); + assertThat(result.get(3).getDouble(2)).isEqualTo(400.0); + assertThat(result.get(3).isNullAt(3)).isTrue(); + } + @Override protected RowType rowTypeForFullTypesTest() { RowType.Builder builder = @@ -168,6 +433,7 @@ public class CsvFileFormatTest extends FormatReadWriteTest { .field("tinyint", DataTypes.TINYINT()) .field("smallint", DataTypes.SMALLINT()) .field("bigint", DataTypes.BIGINT()) + .field("bytes", DataTypes.BYTES()) .field("timestamp", DataTypes.TIMESTAMP()) .field("timestamp_3", DataTypes.TIMESTAMP(3)) .field("timestamp_ltz", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()) @@ -197,6 +463,7 @@ public class CsvFileFormatTest extends FormatReadWriteTest { (byte) 3, (short) 6, 12304L, + new byte[] {1, 5, 2}, Timestamp.fromMicros(123123123), Timestamp.fromEpochMillis(123123123), Timestamp.fromMicros(123123123), @@ -227,4 +494,35 @@ public class CsvFileFormatTest extends FormatReadWriteTest { .withNullValue("null"); return CsvFileReader.parseCsvLineToArray(csvLine, schema); } + + /** + * Performs a complete write-read test with the given options and test data. Returns the data + * that was read back for further verification. + */ + private List<InternalRow> writeThenRead( + Options options, RowType rowType, List<InternalRow> testData, String testPrefix) + throws IOException { + FileFormat format = + new CsvFileFormatFactory().create(new FormatContext(options, 1024, 1024)); + Path testFile = new Path(parent, testPrefix + "_" + UUID.randomUUID() + ".csv"); + + FormatWriterFactory writerFactory = format.createWriterFactory(rowType); + try (PositionOutputStream out = fileIO.newOutputStream(testFile, false); + FormatWriter writer = writerFactory.create(out, "none")) { + for (InternalRow row : testData) { + writer.addElement(row); + } + } + try (RecordReader<InternalRow> reader = + format.createReaderFactory(rowType) + .createReader( + new FormatReaderContext( + fileIO, testFile, fileIO.getFileSize(testFile)))) { + + InternalRowSerializer serializer = new InternalRowSerializer(rowType); + List<InternalRow> result = new ArrayList<>(); + reader.forEachRemaining(row -> result.add(serializer.copy(row))); + return result; + } + } }