This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push: new a266520ad6 [core] Introduce csv format (#6093) a266520ad6 is described below commit a266520ad61f250e8a76068207473644acfb3d3d Author: jerry <lining....@alibaba-inc.com> AuthorDate: Tue Aug 19 22:17:02 2025 +0800 [core] Introduce csv format (#6093) --- .../generated/format_table_configuration.html | 36 ---- .../apache/paimon/format/SupportsDirectWrite.java | 2 +- .../apache/paimon/format/FormatReadWriteTest.java | 130 +++++++++++- .../apache/paimon/flink/FormatCatalogTable.java | 7 +- .../apache/paimon/format/csv/CsvFileFormat.java | 115 +++++++++++ .../paimon/format/csv/CsvFileFormatFactory.java | 27 ++- .../apache/paimon/format/csv/CsvFileReader.java | 228 ++++++++++++++++++++ .../apache/paimon/format/csv/CsvFormatWriter.java | 178 ++++++++++++++++ .../org/apache/paimon/format/csv/CsvOptions.java | 103 +++++++++ .../apache/paimon/format/csv/CsvReaderFactory.java | 25 ++- .../org.apache.paimon.format.FileFormatFactory | 1 + .../paimon/format/csv/CsvFileFormatTest.java | 230 +++++++++++++++++++++ .../java/org/apache/paimon/hive/HiveCatalog.java | 4 +- .../org/apache/paimon/hive/HiveTableUtils.java | 3 +- .../java/org/apache/paimon/spark/SparkCatalog.java | 4 +- 15 files changed, 1021 insertions(+), 72 deletions(-) diff --git a/docs/layouts/shortcodes/generated/format_table_configuration.html b/docs/layouts/shortcodes/generated/format_table_configuration.html deleted file mode 100644 index 71133d52d8..0000000000 --- a/docs/layouts/shortcodes/generated/format_table_configuration.html +++ /dev/null @@ -1,36 +0,0 @@ -{{/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/}} -<table class="configuration table table-bordered"> - <thead> - <tr> - <th class="text-left" style="width: 20%">Key</th> - <th class="text-left" style="width: 15%">Default</th> - <th class="text-left" style="width: 10%">Type</th> - <th class="text-left" style="width: 55%">Description</th> - </tr> - </thead> - <tbody> - <tr> - <td><h5>field-delimiter</h5></td> - <td style="word-wrap: break-word;">","</td> - <td>String</td> - <td>Optional field delimiter character for CSV (',' by default).</td> - </tr> - </tbody> -</table> diff --git a/paimon-common/src/main/java/org/apache/paimon/format/SupportsDirectWrite.java b/paimon-common/src/main/java/org/apache/paimon/format/SupportsDirectWrite.java index 76ac9a8ba1..6f513330f4 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/SupportsDirectWrite.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/SupportsDirectWrite.java @@ -23,7 +23,7 @@ import org.apache.paimon.fs.Path; import java.io.IOException; -/** Creaet a FormatWriter which has full control abort file io. */ +/** Create a FormatWriter which has full control abort file io. 
*/ public interface SupportsDirectWrite { FormatWriter create(FileIO fileIO, Path path, String compression) throws IOException; diff --git a/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java b/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java index 4a7325f410..40dcd975fc 100644 --- a/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java @@ -19,6 +19,7 @@ package org.apache.paimon.format; import org.apache.paimon.data.BinaryArray; +import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.Decimal; import org.apache.paimon.data.GenericArray; import org.apache.paimon.data.GenericMap; @@ -33,6 +34,7 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.PositionOutputStream; import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.reader.FileRecordReader; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.DataType; @@ -62,10 +64,11 @@ public abstract class FormatReadWriteTest { @TempDir java.nio.file.Path tempPath; - private final String formatType; + protected final String formatType; protected FileIO fileIO; protected Path file; + protected Path parent; protected FormatReadWriteTest(String formatType) { this.formatType = formatType; @@ -74,6 +77,7 @@ public abstract class FormatReadWriteTest { @BeforeEach public void beforeEach() { this.fileIO = LocalFileIO.create(); + this.parent = new Path(tempPath.toUri()); this.file = new Path(new Path(tempPath.toUri()), UUID.randomUUID() + "." 
+ formatType); } @@ -81,6 +85,11 @@ public abstract class FormatReadWriteTest { @Test public void testSimpleTypes() throws IOException { + FileFormat format = fileFormat(); + testSimpleTypesUtil(format, file); + } + + protected void testSimpleTypesUtil(FileFormat format, Path file) throws IOException { RowType rowType = DataTypes.ROW(DataTypes.INT().notNull(), DataTypes.BIGINT()); if (ThreadLocalRandom.current().nextBoolean()) { @@ -88,9 +97,8 @@ public abstract class FormatReadWriteTest { } InternalRowSerializer serializer = new InternalRowSerializer(rowType); - FileFormat format = fileFormat(); FormatWriterFactory factory = format.createWriterFactory(rowType); - write(factory, GenericRow.of(1, 1L), GenericRow.of(2, 2L), GenericRow.of(3, null)); + write(factory, file, GenericRow.of(1, 1L), GenericRow.of(2, 2L), GenericRow.of(3, null)); RecordReader<InternalRow> reader = format.createReaderFactory(rowType) .createReader( @@ -106,12 +114,16 @@ public abstract class FormatReadWriteTest { @Test public void testFullTypes() throws IOException { + FileFormat format = fileFormat(); + testFullTypesUtil(format, file); + } + + protected void testFullTypesUtil(FileFormat format, Path file) throws IOException { RowType rowType = rowTypeForFullTypesTest(); InternalRow expected = expectedRowForFullTypesTest(); - FileFormat format = fileFormat(); FormatWriterFactory factory = format.createWriterFactory(rowType); - write(factory, expected); + write(factory, file, expected); RecordReader<InternalRow> reader = format.createReaderFactory(rowType) .createReader( @@ -124,8 +136,15 @@ public abstract class FormatReadWriteTest { validateFullTypesResult(result.get(0), expected); } + public boolean supportNestedReadPruning() { + return true; + } + @Test public void testNestedReadPruning() throws Exception { + if (!supportNestedReadPruning()) { + return; + } FileFormat format = fileFormat(); RowType writeType = @@ -140,7 +159,7 @@ public abstract class FormatReadWriteTest { 
DataTypes.FIELD(4, "f2", DataTypes.INT())))); FormatWriterFactory factory = format.createWriterFactory(writeType); - write(factory, GenericRow.of(0, GenericRow.of(10, 11, 12))); + write(factory, file, GenericRow.of(0, GenericRow.of(10, 11, 12))); // skip read f0, f1.f1 RowType readType = @@ -176,7 +195,10 @@ public abstract class FormatReadWriteTest { RowType writeType = DataTypes.ROW(DataTypes.FIELD(0, "v", DataTypes.VARIANT())); FormatWriterFactory factory = format.createWriterFactory(writeType); - write(factory, GenericRow.of(GenericVariant.fromJson("{\"age\":35,\"city\":\"Chicago\"}"))); + write( + factory, + file, + GenericRow.of(GenericVariant.fromJson("{\"age\":35,\"city\":\"Chicago\"}"))); List<InternalRow> result = new ArrayList<>(); try (RecordReader<InternalRow> reader = format.createReaderFactory(writeType) @@ -201,6 +223,7 @@ public abstract class FormatReadWriteTest { RowType writeType = DataTypes.ROW(new ArrayType(true, new VariantType())); write( format.createWriterFactory(writeType), + file, GenericRow.of( new GenericArray( new Object[] { @@ -222,7 +245,8 @@ public abstract class FormatReadWriteTest { assertThat(array.getVariant(1).toJson()).isEqualTo("{\"age\":45,\"city\":\"Beijing\"}"); } - private void write(FormatWriterFactory factory, InternalRow... rows) throws IOException { + protected void write(FormatWriterFactory factory, Path file, InternalRow... 
rows) + throws IOException { FormatWriter writer; PositionOutputStream out = null; if (factory instanceof SupportsDirectWrite) { @@ -332,6 +356,94 @@ public abstract class FormatReadWriteTest { return GenericRow.of(values.toArray()); } + public boolean supportDataFileWithoutExtension() { + return false; + } + + @Test + public void testWriteAndReadFileWithoutExtension() throws IOException { + if (!supportDataFileWithoutExtension()) { + return; + } + RowType rowType = + RowType.of(DataTypes.INT().notNull(), DataTypes.STRING(), DataTypes.BOOLEAN()); + + // Create test data + List<InternalRow> testData = new ArrayList<>(); + testData.add(GenericRow.of(1, BinaryString.fromString("Alice"), true)); + testData.add(GenericRow.of(2, BinaryString.fromString("Bob"), false)); + testData.add(GenericRow.of(3, BinaryString.fromString("Charlie"), true)); + + // Create file format + FileFormat jsonFormat = fileFormat(); + + // Write data + Path filePath = new Path(parent, UUID.randomUUID().toString()); + FormatWriterFactory writerFactory = jsonFormat.createWriterFactory(rowType); + try (FormatWriter writer = + writerFactory.create(fileIO.newOutputStream(filePath, false), "none")) { + for (InternalRow row : testData) { + writer.addElement(row); + } + } + + // Read data + FormatReaderFactory readerFactory = jsonFormat.createReaderFactory(rowType, null); + FileRecordReader<InternalRow> reader = + readerFactory.createReader( + new FormatReaderFactory.Context() { + @Override + public FileIO fileIO() { + return fileIO; + } + + @Override + public Path filePath() { + return filePath; + } + + @Override + public long fileSize() { + try { + return fileIO.getFileSize(filePath); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public org.apache.paimon.utils.RoaringBitmap32 selection() { + return null; + } + }); + + List<InternalRow> readData = new ArrayList<>(); + RecordReader.RecordIterator<InternalRow> iterator = reader.readBatch(); + while (iterator != 
null) { + InternalRow row; + while ((row = iterator.next()) != null) { + readData.add(GenericRow.of(row.getInt(0), row.getString(1), row.getBoolean(2))); + } + iterator.releaseBatch(); + iterator = reader.readBatch(); + } + reader.close(); + + // Verify data + assertThat(readData).hasSize(3); + assertThat(readData.get(0).getInt(0)).isEqualTo(1); + assertThat(readData.get(0).getString(1).toString()).isEqualTo("Alice"); + assertThat(readData.get(0).getBoolean(2)).isTrue(); + + assertThat(readData.get(1).getInt(0)).isEqualTo(2); + assertThat(readData.get(1).getString(1).toString()).isEqualTo("Bob"); + assertThat(readData.get(1).getBoolean(2)).isFalse(); + + assertThat(readData.get(2).getInt(0)).isEqualTo(3); + assertThat(readData.get(2).getString(1).toString()).isEqualTo("Charlie"); + assertThat(readData.get(2).getBoolean(2)).isTrue(); + } + private DataType getMapValueType() { if (formatType.equals("avro") || formatType.equals("orc")) { return DataTypes.ROW( @@ -351,7 +463,7 @@ public abstract class FormatReadWriteTest { } } - private void validateFullTypesResult(InternalRow actual, InternalRow expected) { + protected void validateFullTypesResult(InternalRow actual, InternalRow expected) { RowType rowType = rowTypeForFullTypesTest(); InternalRow.FieldGetter[] fieldGetters = IntStream.range(0, rowType.getFieldCount()) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java index cb69cce258..3d1b4d1c6b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink; +import org.apache.paimon.format.csv.CsvOptions; import org.apache.paimon.table.FormatTable; import org.apache.flink.table.api.Schema; @@ -39,7 +40,6 @@ import static 
org.apache.flink.table.factories.FactoryUtil.CONNECTOR; import static org.apache.flink.table.factories.FactoryUtil.FORMAT; import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType; import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType; -import static org.apache.paimon.table.FormatTableOptions.FIELD_DELIMITER; /** A {@link CatalogTable} to represent format table. */ public class FormatCatalogTable implements CatalogTable { @@ -99,8 +99,9 @@ public class FormatCatalogTable implements CatalogTable { cachedOptions.put(k, v); } }); - if (options.containsKey(FIELD_DELIMITER.key())) { - cachedOptions.put("csv.field-delimiter", options.get(FIELD_DELIMITER.key())); + if (options.containsKey(CsvOptions.FIELD_DELIMITER.key())) { + cachedOptions.put( + "csv.field-delimiter", options.get(CsvOptions.FIELD_DELIMITER.key())); } cachedOptions.put(CONNECTOR.key(), "filesystem"); cachedOptions.put(PATH.key(), table.location()); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java new file mode 100644 index 0000000000..04e003c5b3 --- /dev/null +++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.csv; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.format.FileFormat; +import org.apache.paimon.format.FileFormatFactory.FormatContext; +import org.apache.paimon.format.FormatReaderFactory; +import org.apache.paimon.format.FormatWriter; +import org.apache.paimon.format.FormatWriterFactory; +import org.apache.paimon.fs.PositionOutputStream; +import org.apache.paimon.options.Options; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypeRoot; +import org.apache.paimon.types.RowType; + +import javax.annotation.Nullable; + +import java.util.List; + +/** CSV {@link FileFormat}. 
*/ +public class CsvFileFormat extends FileFormat { + + public static final String CSV_IDENTIFIER = "csv"; + + private final Options options; + + public CsvFileFormat(FormatContext context) { + this(context, CSV_IDENTIFIER); + } + + public CsvFileFormat(FormatContext context, String identifier) { + super(identifier); + this.options = context.options(); + } + + @Override + public FormatReaderFactory createReaderFactory( + RowType projectedRowType, @Nullable List<Predicate> filters) { + return new CsvReaderFactory(projectedRowType, new CsvOptions(options)); + } + + @Override + public FormatWriterFactory createWriterFactory(RowType type) { + return new CsvWriterFactory(type, new CsvOptions(options)); + } + + @Override + public void validateDataFields(RowType rowType) { + List<DataType> fieldTypes = rowType.getFieldTypes(); + for (DataType dataType : fieldTypes) { + validateDataType(dataType); + } + } + + private void validateDataType(DataType dataType) { + // CSV format supports primitive types and string representation of complex types + DataTypeRoot typeRoot = dataType.getTypeRoot(); + switch (typeRoot) { + case CHAR: + case VARCHAR: + case BOOLEAN: + case DECIMAL: + case TINYINT: + case SMALLINT: + case INTEGER: + case BIGINT: + case FLOAT: + case DOUBLE: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + // These are directly supported + break; + default: + throw new UnsupportedOperationException( + "Unsupported data type for CSV format: " + dataType); + } + } + + /** A {@link FormatWriterFactory} to write {@link InternalRow} to CSV. 
*/ + private static class CsvWriterFactory implements FormatWriterFactory { + + private final RowType rowType; + private final CsvOptions options; + + public CsvWriterFactory(RowType rowType, CsvOptions options) { + this.rowType = rowType; + this.options = options; + } + + @Override + public FormatWriter create(PositionOutputStream out, String compression) { + return new CsvFormatWriter(out, rowType, options); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FormatTableOptions.java b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormatFactory.java similarity index 61% rename from paimon-core/src/main/java/org/apache/paimon/table/FormatTableOptions.java rename to paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormatFactory.java index b4010209c3..d565b831cb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FormatTableOptions.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormatFactory.java @@ -16,18 +16,23 @@ * limitations under the License. */ -package org.apache.paimon.table; +package org.apache.paimon.format.csv; -import org.apache.paimon.options.ConfigOption; -import org.apache.paimon.options.ConfigOptions; +import org.apache.paimon.format.FileFormat; +import org.apache.paimon.format.FileFormatFactory; -/** Options of {@link FormatTable}. */ -public class FormatTableOptions { +/** Factory to create {@link CsvFileFormat}. 
*/ +public class CsvFileFormatFactory implements FileFormatFactory { - public static final ConfigOption<String> FIELD_DELIMITER = - ConfigOptions.key("field-delimiter") - .stringType() - .defaultValue(",") - .withDescription( - "Optional field delimiter character for CSV (',' by default)."); + public static final String IDENTIFIER = "csv"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public FileFormat create(FormatContext formatContext) { + return new CsvFileFormat(formatContext); + } } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java new file mode 100644 index 0000000000..80eb34de16 --- /dev/null +++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.format.csv; + +import org.apache.paimon.casting.CastExecutor; +import org.apache.paimon.casting.CastExecutors; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.format.FormatReaderFactory; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.reader.FileRecordIterator; +import org.apache.paimon.reader.FileRecordReader; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypeRoot; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema; + +import javax.annotation.Nullable; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** CSV file reader implementation. 
*/ +public class CsvFileReader implements FileRecordReader<InternalRow> { + + private static final CsvMapper CSV_MAPPER = new CsvMapper(); + private static final Base64.Decoder BASE64_DECODER = Base64.getDecoder(); + + // Performance optimization: Cache frequently used cast executors + private static final Map<String, CastExecutor<?, ?>> CAST_EXECUTOR_CACHE = + new ConcurrentHashMap<>(32); + + private final RowType rowType; + private final CsvOptions options; + private final Path filePath; + private final CsvSchema schema; + + private BufferedReader bufferedReader; + private boolean headerSkipped = false; + private boolean readerClosed = false; + private CsvRecordIterator reader; + + public CsvFileReader(FormatReaderFactory.Context context, RowType rowType, CsvOptions options) + throws IOException { + this.rowType = rowType; + this.filePath = context.filePath(); + this.options = options; + this.schema = + CsvSchema.emptySchema() + .withQuoteChar(options.quoteCharacter().charAt(0)) + .withColumnSeparator(options.fieldDelimiter().charAt(0)) + .withEscapeChar(options.escapeCharacter().charAt(0)); + if (!options.includeHeader()) { + this.schema.withoutHeader(); + } + FileIO fileIO = context.fileIO(); + SeekableInputStream inputStream = fileIO.newInputStream(context.filePath()); + reader = new CsvRecordIterator(); + InputStreamReader inputStreamReader = + new InputStreamReader(inputStream, StandardCharsets.UTF_8); + this.bufferedReader = new BufferedReader(inputStreamReader); + } + + @Override + @Nullable + public FileRecordIterator<InternalRow> readBatch() throws IOException { + if (readerClosed) { + return null; + } + + // Skip header if needed + if (options.includeHeader() && !headerSkipped) { + bufferedReader.readLine(); + headerSkipped = true; + } + if (reader.end) { + return null; + } + return reader; + } + + @Override + public void close() throws IOException { + if (!readerClosed && bufferedReader != null) { + bufferedReader.close(); + readerClosed = true; + } + 
} + + private class CsvRecordIterator implements FileRecordIterator<InternalRow> { + private boolean batchRead = false; + private long currentPosition = 0; + private String nextLine = null; + boolean end = false; + + @Override + @Nullable + public InternalRow next() throws IOException { + if (batchRead || readerClosed) { + return null; + } + nextLine = bufferedReader.readLine(); + if (nextLine == null) { + batchRead = true; + end = true; + return null; + } + + currentPosition++; + return parseCsvLine(nextLine, schema); + } + + @Override + public void releaseBatch() { + // No resources to release for CSV + } + + @Override + public long returnedPosition() { + return currentPosition - 1; // Return position of last returned row + } + + @Override + public Path filePath() { + return filePath; + } + } + + protected static String[] parseCsvLineToArray(String line, CsvSchema schema) + throws IOException { + if (line == null || line.isEmpty()) { + return new String[] {}; + } + return CSV_MAPPER.readerFor(String[].class).with(schema).readValue(line); + } + + private InternalRow parseCsvLine(String line, CsvSchema schema) throws IOException { + String[] fields = parseCsvLineToArray(line, schema); + int fieldCount = Math.min(fields.length, rowType.getFieldCount()); + Object[] values = new Object[fieldCount]; // Pre-allocated array + + for (int i = 0; i < fieldCount; i++) { + String field = fields[i]; + + // Fast path for null values + if (field == null || field.equals(options.nullLiteral()) || field.isEmpty()) { + values[i] = null; + continue; + } + + // Optimized field parsing with cached cast executors + values[i] = parseFieldOptimized(field.trim(), rowType.getTypeAt(i)); + } + + return GenericRow.of(values); + } + + /** Optimized field parsing with caching and fast paths for common types. 
*/ + private Object parseFieldOptimized(String field, DataType dataType) { + if (field == null || field.equals(options.nullLiteral())) { + return null; + } + + DataTypeRoot typeRoot = dataType.getTypeRoot(); + switch (typeRoot) { + case TINYINT: + return Byte.parseByte(field); + case SMALLINT: + return Short.parseShort(field); + case INTEGER: + return Integer.parseInt(field); + case BIGINT: + return Long.parseLong(field); + case FLOAT: + return Float.parseFloat(field); + case DOUBLE: + return Double.parseDouble(field); + case BOOLEAN: + return Boolean.parseBoolean(field); + case CHAR: + case VARCHAR: + return BinaryString.fromString(field); + default: + return useCachedCastExecutor(field, dataType); + } + } + + private Object useCachedCastExecutor(String field, DataType dataType) { + String cacheKey = dataType.toString(); + @SuppressWarnings("unchecked") + CastExecutor<BinaryString, Object> cast = + (CastExecutor<BinaryString, Object>) + CAST_EXECUTOR_CACHE.computeIfAbsent( + cacheKey, k -> CastExecutors.resolve(DataTypes.STRING(), dataType)); + + if (cast != null) { + return cast.cast(BinaryString.fromString(field)); + } + return BinaryString.fromString(field); + } +} diff --git a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java new file mode 100644 index 0000000000..5012696dea --- /dev/null +++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.csv; + +import org.apache.paimon.casting.CastExecutor; +import org.apache.paimon.casting.CastExecutors; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.format.FormatWriter; +import org.apache.paimon.fs.PositionOutputStream; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypeRoot; +import org.apache.paimon.types.RowType; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** CSV format writer implementation. 
*/
+public class CsvFormatWriter implements FormatWriter {
+
+    // String cast executors shared by all writers, keyed by the source type. Keying by the
+    // DataType itself avoids building a toString() key for every non-primitive value written.
+    private static final Map<DataType, CastExecutor<?, ?>> CAST_EXECUTOR_CACHE =
+            new ConcurrentHashMap<>(32);
+
+    private final RowType rowType;
+    private final CsvOptions options;
+
+    private final BufferedWriter writer;
+    private final PositionOutputStream outputStream;
+    private boolean headerWritten = false;
+
+    // Reused for every line so that writing a row does not allocate a new builder.
+    private final StringBuilder stringBuilder;
+
+    // Per-field accessors and types, computed once instead of once per field per row.
+    private final InternalRow.FieldGetter[] fieldGetters;
+    private final DataType[] fieldTypes;
+
+    /**
+     * Creates a writer that encodes rows as UTF-8 CSV text.
+     *
+     * @param out destination stream; {@link #close()} closes it together with this writer
+     * @param rowType schema of the rows passed to {@link #addElement(InternalRow)}
+     * @param options delimiter, quoting, header and null-literal settings
+     */
+    public CsvFormatWriter(PositionOutputStream out, RowType rowType, CsvOptions options) {
+        this.rowType = rowType;
+        this.options = options;
+        this.outputStream = out;
+        OutputStreamWriter outputStreamWriter = new OutputStreamWriter(out, StandardCharsets.UTF_8);
+        this.writer = new BufferedWriter(outputStreamWriter);
+        this.stringBuilder = new StringBuilder();
+
+        int fieldCount = rowType.getFieldCount();
+        this.fieldTypes = new DataType[fieldCount];
+        this.fieldGetters = new InternalRow.FieldGetter[fieldCount];
+        for (int i = 0; i < fieldCount; i++) {
+            fieldTypes[i] = rowType.getTypeAt(i);
+            fieldGetters[i] = InternalRow.createFieldGetter(fieldTypes[i], i);
+        }
+    }
+
+    /**
+     * Appends {@code element} as one CSV line. The header line is written first when it is
+     * enabled and has not been written yet.
+     */
+    @Override
+    public void addElement(InternalRow element) throws IOException {
+        if (options.includeHeader() && !headerWritten) {
+            writeHeader();
+            headerWritten = true;
+        }
+
+        stringBuilder.setLength(0); // reset without reallocating
+
+        String fieldDelimiter = options.fieldDelimiter();
+        for (int i = 0; i < fieldGetters.length; i++) {
+            if (i > 0) {
+                stringBuilder.append(fieldDelimiter);
+            }
+            Object value = fieldGetters[i].getFieldOrNull(element);
+            stringBuilder.append(escapeField(castToStringOptimized(value, fieldTypes[i])));
+        }
+        stringBuilder.append(options.lineDelimiter());
+
+        writer.write(stringBuilder.toString());
+    }
+
+    /** Flushes any buffered text and closes the writer chain, including the underlying stream. */
+    @Override
+    public void close() throws IOException {
+        // BufferedWriter.close() flushes its buffer before closing the wrapped writer, so a
+        // separate flush() is redundant; `writer` is final and assigned in the constructor.
+        writer.close();
+    }
+
+    /**
+     * Returns whether the stream position has reached {@code targetSize}; only evaluated when
+     * {@code suggestedCheck} is true. Text still held in the writer's buffers is not yet counted
+     * in the stream position, so the check can lag slightly behind the logical size.
+     */
+    @Override
+    public boolean reachTargetSize(boolean suggestedCheck, long targetSize) throws IOException {
+        // outputStream cannot be null: the OutputStreamWriter constructor rejects a null stream.
+        return suggestedCheck && outputStream.getPos() >= targetSize;
+    }
+
+    /** Writes the field names as one delimited line, escaped like regular values. */
+    private void writeHeader() throws IOException {
+        stringBuilder.setLength(0); // reuse the row builder
+
+        String fieldDelimiter = options.fieldDelimiter();
+        boolean first = true;
+        // Fetch the name list once instead of calling getFieldNames() for every index.
+        for (String fieldName : rowType.getFieldNames()) {
+            if (!first) {
+                stringBuilder.append(fieldDelimiter);
+            }
+            first = false;
+            stringBuilder.append(escapeField(fieldName));
+        }
+        stringBuilder.append(options.lineDelimiter());
+        writer.write(stringBuilder.toString());
+    }
+
+    /**
+     * Renders one field for output. {@code null} becomes the configured null literal. A value
+     * containing the first character of the field delimiter, line delimiter or quote character
+     * is wrapped in quote characters, with embedded quote characters prefixed by the escape
+     * character. Any other value is returned unchanged.
+     */
+    private String escapeField(String field) {
+        if (field == null) {
+            return options.nullLiteral();
+        }
+
+        // Only the first character of each (possibly multi-character) setting is checked.
+        boolean needsQuoting =
+                field.indexOf(options.fieldDelimiter().charAt(0)) >= 0
+                        || field.indexOf(options.lineDelimiter().charAt(0)) >= 0
+                        || field.indexOf(options.quoteCharacter().charAt(0)) >= 0;
+
+        if (!needsQuoting) {
+            return field;
+        }
+
+        // NOTE(review): escape characters already present in the value are not themselves
+        // escaped, and a non-null value equal to the null literal is written unquoted; confirm
+        // both cases round-trip through CsvFileReader.
+        String escaped =
+                field.replace(
+                        options.quoteCharacter(),
+                        options.escapeCharacter() + options.quoteCharacter());
+        return options.quoteCharacter() + escaped + options.quoteCharacter();
+    }
+
+    /**
+     * Converts a field value to text. Returns {@code null} for {@code null}. Numeric, boolean and
+     * character types use {@code toString()}; every other type goes through a cached string
+     * {@link CastExecutor}.
+     */
+    private String castToStringOptimized(Object value, DataType dataType) {
+        if (value == null) {
+            return null;
+        }
+
+        DataTypeRoot typeRoot = dataType.getTypeRoot();
+        switch (typeRoot) {
+            case INTEGER:
+            case BIGINT:
+            case FLOAT:
+            case DOUBLE:
+            case BOOLEAN:
+            case TINYINT:
+            case SMALLINT:
+            case CHAR:
+            case VARCHAR:
+                return value.toString();
+            default:
+                return useCachedStringCastExecutor(value, dataType);
+        }
+    }
+
+    /**
+     * Casts {@code value} to text with the string cast executor for {@code dataType}. Falls back
+     * to {@code toString()} when no executor resolves for the type.
+     */
+    @SuppressWarnings("unchecked")
+    private String useCachedStringCastExecutor(Object value, DataType dataType) {
+        // A null resolution is not stored by computeIfAbsent, so the lookup is simply retried.
+        CastExecutor<Object, ?> cast =
+                (CastExecutor<Object, ?>)
+                        CAST_EXECUTOR_CACHE.computeIfAbsent(
+                                dataType, type -> CastExecutors.resolveToString(type));
+
+        if (cast != null) {
+            Object result = cast.cast(value);
+            return result != null ? result.toString() : null;
+        }
+        return value.toString();
+    }
+}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvOptions.java b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvOptions.java new file mode 100644 index 0000000000..956f530b3b --- /dev/null +++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvOptions.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.csv; + +import org.apache.paimon.options.ConfigOption; +import org.apache.paimon.options.ConfigOptions; +import org.apache.paimon.options.Options; + +/** Options for csv format.
*/
+public class CsvOptions {
+
+    /** Separator placed between two fields of a record; defaults to {@code ","}. */
+    public static final ConfigOption<String> FIELD_DELIMITER =
+            ConfigOptions.key("field-delimiter")
+                    .stringType()
+                    .defaultValue(",")
+                    .withDescription("The field delimiter for CSV or TXT format");
+
+    /** Separator placed after each record; defaults to a single newline. */
+    public static final ConfigOption<String> LINE_DELIMITER =
+            ConfigOptions.key("line-delimiter")
+                    .stringType()
+                    .defaultValue("\n")
+                    .withDescription("The line delimiter for CSV format");
+
+    /** Character used to enclose a field; defaults to a double quote. */
+    public static final ConfigOption<String> QUOTE_CHARACTER =
+            ConfigOptions.key("quote-character")
+                    .stringType()
+                    .defaultValue("\"")
+                    .withDescription("The quote character for CSV format");
+
+    /** Escape character; defaults to a backslash. */
+    public static final ConfigOption<String> ESCAPE_CHARACTER =
+            ConfigOptions.key("escape-character")
+                    .stringType()
+                    .defaultValue("\\")
+                    .withDescription("The escape character for CSV format");
+
+    /** Whether CSV files carry a header line; disabled by default. */
+    public static final ConfigOption<Boolean> INCLUDE_HEADER =
+            ConfigOptions.key("include-header")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether to include header in CSV files");
+
+    /** Text that stands for a null value; defaults to {@code "null"}. */
+    public static final ConfigOption<String> NULL_LITERAL =
+            ConfigOptions.key("null-literal")
+                    .stringType()
+                    .defaultValue("null")
+                    .withDescription("The literal for null values in CSV format");
+
+    // Resolved values, kept in the same order as the option declarations above.
+    private final String fieldDelimiter;
+    private final String lineDelimiter;
+    private final String quoteCharacter;
+    private final String escapeCharacter;
+    private final boolean includeHeader;
+    private final String nullLiteral;
+
+    /** Reads every CSV setting from {@code options} once, at construction time. */
+    public CsvOptions(Options options) {
+        fieldDelimiter = options.get(FIELD_DELIMITER);
+        lineDelimiter = options.get(LINE_DELIMITER);
+        quoteCharacter = options.get(QUOTE_CHARACTER);
+        escapeCharacter = options.get(ESCAPE_CHARACTER);
+        includeHeader = options.get(INCLUDE_HEADER);
+        nullLiteral = options.get(NULL_LITERAL);
+    }
+
+    /** Value of {@link #FIELD_DELIMITER}. */
+    public String fieldDelimiter() {
+        return fieldDelimiter;
+    }
+
+    /** Value of {@link #LINE_DELIMITER}. */
+    public String lineDelimiter() {
+        return lineDelimiter;
+    }
+
+    /** Value of {@link #NULL_LITERAL}. */
+    public String nullLiteral() {
+        return nullLiteral;
+    }
+
+    /** Value of {@link #INCLUDE_HEADER}. */
+    public boolean includeHeader() {
+        return includeHeader;
+    }
+
+    /** Value of {@link #QUOTE_CHARACTER}. */
+    public String quoteCharacter() {
+        return quoteCharacter;
+    }
+
+    /** Value of {@link #ESCAPE_CHARACTER}. */
+    public String escapeCharacter() {
+        return escapeCharacter;
+    }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/format/SupportsDirectWrite.java b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvReaderFactory.java similarity index 53% copy from paimon-common/src/main/java/org/apache/paimon/format/SupportsDirectWrite.java copy to paimon-format/src/main/java/org/apache/paimon/format/csv/CsvReaderFactory.java index 76ac9a8ba1..cfc67f576c 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/SupportsDirectWrite.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvReaderFactory.java @@ -16,15 +16,28 @@ * limitations under the License. */ -package org.apache.paimon.format; +package org.apache.paimon.format.csv; -import org.apache.paimon.fs.FileIO; -import org.apache.paimon.fs.Path; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.format.FormatReaderFactory; +import org.apache.paimon.reader.FileRecordReader; +import org.apache.paimon.types.RowType; import java.io.IOException; -/** Creaet a FormatWriter which has full control abort file io. */ -public interface SupportsDirectWrite { +/** CSV {@link FormatReaderFactory} implementation.
*/
+public class CsvReaderFactory implements FormatReaderFactory {
 
-    FormatWriter create(FileIO fileIO, Path path, String compression) throws IOException;
+    /** Row type handed to every {@link CsvFileReader} this factory creates. */
+    private final RowType rowType;
+
+    /** CSV settings handed to every {@link CsvFileReader} this factory creates. */
+    private final CsvOptions options;
+
+    /**
+     * @param rowType row type forwarded to each created reader
+     * @param options CSV settings forwarded to each created reader
+     */
+    public CsvReaderFactory(RowType rowType, CsvOptions options) {
+        this.options = options;
+        this.rowType = rowType;
+    }
+
+    /** Creates a {@link CsvFileReader} for {@code context} with this factory's settings. */
+    @Override
+    public FileRecordReader<InternalRow> createReader(Context context) throws IOException {
+        CsvFileReader reader = new CsvFileReader(context, this.rowType, this.options);
+        return reader;
+    }
 }
diff --git a/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory b/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory index 7af6f79b34..c35f5544ca 100644 --- a/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory +++ b/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory @@ -16,3 +16,4 @@ org.apache.paimon.format.avro.AvroFileFormatFactory org.apache.paimon.format.orc.OrcFileFormatFactory org.apache.paimon.format.parquet.ParquetFileFormatFactory +org.apache.paimon.format.csv.CsvFileFormatFactory diff --git a/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java b/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java new file mode 100644 index 0000000000..c9c5f3dbb5 --- /dev/null +++ b/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.csv; + +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.format.FileFormat; +import org.apache.paimon.format.FileFormatFactory.FormatContext; +import org.apache.paimon.format.FormatReadWriteTest; +import org.apache.paimon.fs.Path; +import org.apache.paimon.options.Options; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; + +import static org.apache.paimon.data.BinaryString.fromString; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link CsvFileFormat}. 
*/
+public class CsvFileFormatTest extends FormatReadWriteTest {
+
+    protected CsvFileFormatTest() {
+        super("csv");
+    }
+
+    @Override
+    protected FileFormat fileFormat() {
+        return new CsvFileFormatFactory().create(new FormatContext(new Options(), 1024, 1024));
+    }
+
+    /** Round-trips the simple and full type suites with Hive's default \001 field delimiter. */
+    @Test
+    public void testWhenUseHiveDefaultDelimiter() throws IOException {
+        Options options = new Options();
+        options.set(CsvOptions.FIELD_DELIMITER, "\001");
+        // The configured options must be passed on; a fresh Options() would leave the default
+        // "," delimiter in effect and the test would not exercise \001 at all.
+        FileFormat format =
+                new CsvFileFormatFactory().create(new FormatContext(options, 1024, 1024));
+        testSimpleTypesUtil(
+                format, new Path(new Path(parent.toUri()), UUID.randomUUID() + "." + formatType));
+        testFullTypesUtil(
+                format, new Path(new Path(parent.toUri()), UUID.randomUUID() + "." + formatType));
+    }
+
+    @Test
+    public void testCsvParsingWithEmptyFields() throws IOException {
+        // An empty field must come back without content.
+        // NOTE(review): null or "" is accepted because the parser's empty-field mapping is not
+        // pinned down here; tighten to isNull() once that contract is confirmed.
+
+        // First row: ,25,"Software Engineer" (empty first field)
+        String csvLine = ",25,\"Software Engineer\"\n";
+        String[] fields = parse(csvLine);
+        assertThat(fields).isNotNull();
+        assertThat(fields[0]).isNullOrEmpty();
+        assertThat(fields[1]).isEqualTo("25");
+        assertThat(fields[2]).isEqualTo("Software Engineer");
+
+        // Second row: "John Doe",,"Developer" (empty middle field)
+        csvLine = "\"John Doe\",,\"Developer\"\n";
+        fields = parse(csvLine);
+        assertThat(fields).isNotNull();
+        assertThat(fields[0]).isEqualTo("John Doe");
+        assertThat(fields[1]).isNullOrEmpty();
+        assertThat(fields[2]).isEqualTo("Developer");
+
+        // Third row: "Jane Smith",30, (empty last field)
+        csvLine = "\"Jane Smith\",30,\n";
+        fields = parse(csvLine);
+        assertThat(fields).isNotNull();
+        assertThat(fields[0]).isEqualTo("Jane Smith");
+        assertThat(fields[1]).isEqualTo("30");
+        assertThat(fields[2]).isNullOrEmpty();
+    }
+
+    @Test
+    public void testJsonArrayQuotePreservation() throws Exception {
+        // A quoted JSON array: the enclosing quotes are stripped and the commas inside them do
+        // not split the field.
+        String csvLine = "name,\"[1,2,3]\",age";
+        String[] fields = parse(csvLine);
+
+        assertThat(fields).hasSize(3);
+        assertThat(fields[0]).isEqualTo("name");
+        assertThat(fields[1]).isEqualTo("[1,2,3]");
+        assertThat(fields[2]).isEqualTo("age");
+    }
+
+    @Test
+    public void testJsonObjectQuotePreservation() throws Exception {
+        // Quote characters inside an unquoted field are kept verbatim.
+        String csvLine = "id,{\"key\":\"value\"},status";
+        String[] fields = parse(csvLine);
+
+        assertThat(fields).hasSize(3);
+        assertThat(fields[0]).isEqualTo("id");
+        assertThat(fields[1]).isEqualTo("{\"key\":\"value\"}"); // inner quotes preserved
+        assertThat(fields[2]).isEqualTo("status");
+    }
+
+    @Test
+    public void testComplexJsonArrayQuotePreservation() throws Exception {
+        // A quoted JSON array of objects: doubled quotes ("") inside the field decode to a
+        // single quote character.
+        String csvLine =
+                "field1,\"[{\"\"name\"\":\"\"John\"\"},{\"\"name\"\":\"\"Jane\"\"}]\",field3";
+        String[] fields = parse(csvLine);
+
+        assertThat(fields).hasSize(3);
+        assertThat(fields[0]).isEqualTo("field1");
+        assertThat(fields[1]).isEqualTo("[{\"name\":\"John\"},{\"name\":\"Jane\"}]");
+        assertThat(fields[2]).isEqualTo("field3");
+    }
+
+    @Test
+    public void testRegularQuotedFieldsRemoveQuotes() throws Exception {
+        // Test that regular quoted fields (not JSON) still remove quotes
+        String csvLine = "\"John,Doe\",\"25\",\"Engineer\"";
+        String[] fields = parse(csvLine);
+
+        assertThat(fields).hasSize(3);
+        assertThat(fields[0]).isEqualTo("John,Doe"); // Quotes removed for regular field
+        assertThat(fields[1]).isEqualTo("25"); // Quotes removed
+        assertThat(fields[2]).isEqualTo("Engineer"); // Quotes removed
+    }
+
+    @Test
+    public void testJsonWithWhitespace() throws Exception {
+        // A quoted field that starts with whitespace: the enclosing quotes are stripped and the
+        // leading space inside them is kept.
+        String csvLine = "field1,\" [1,2,3]\",field3";
+        String[] fields = parse(csvLine);
+
+        assertThat(fields).hasSize(3);
+        assertThat(fields[0]).isEqualTo("field1");
+        assertThat(fields[1]).isEqualTo(" [1,2,3]");
+        assertThat(fields[2]).isEqualTo("field3");
+    }
+
+    @Override
+    protected RowType rowTypeForFullTypesTest() {
+        RowType.Builder builder =
+                RowType.builder()
+                        .field("id", DataTypes.INT().notNull())
+                        .field("name", DataTypes.STRING()) /* optional by default */
+                        .field("salary", DataTypes.DOUBLE().notNull())
+                        .field("boolean", DataTypes.BOOLEAN().nullable())
+                        .field("tinyint", DataTypes.TINYINT())
+                        .field("smallint", DataTypes.SMALLINT())
+                        .field("bigint", DataTypes.BIGINT())
+                        .field("timestamp", DataTypes.TIMESTAMP())
+                        .field("timestamp_3", DataTypes.TIMESTAMP(3))
+                        .field("timestamp_ltz", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
+                        .field("timestamp_ltz_3", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))
+                        .field("date", DataTypes.DATE())
+                        .field("decimal", DataTypes.DECIMAL(2, 2))
+                        .field("decimal2", DataTypes.DECIMAL(38, 2))
+                        .field("decimal3", DataTypes.DECIMAL(10, 1));
+
+        RowType rowType = builder.build();
+
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            rowType = (RowType) rowType.notNull();
+        }
+
+        return rowType;
+    }
+
+    @Override
+    protected GenericRow expectedRowForFullTypesTest() {
+        List<Object> values =
+                Arrays.asList(
+                        1,
+                        fromString("name"),
+                        5.26D,
+                        true,
+                        (byte) 3,
+                        (short) 6,
+                        12304L,
+                        Timestamp.fromMicros(123123123),
+                        Timestamp.fromEpochMillis(123123123),
+                        Timestamp.fromMicros(123123123),
+                        Timestamp.fromEpochMillis(123123123),
+                        2456,
+                        Decimal.fromBigDecimal(new BigDecimal("0.22"), 2, 2),
+                        Decimal.fromBigDecimal(new BigDecimal("12312455.22"), 38, 2),
+                        Decimal.fromBigDecimal(new BigDecimal("12455.1"), 10, 1));
+        return GenericRow.of(values.toArray());
+    }
+
+    @Override
+    public boolean supportNestedReadPruning() {
+        return false;
+    }
+
+    @Override
+    public boolean supportDataFileWithoutExtension() {
+        return true;
+    }
+
+    /** Parses one CSV line with a comma separator, double-quote quoting and "null" as null. */
+    private String[] parse(String csvLine) throws IOException {
+        CsvSchema schema =
+                CsvSchema.emptySchema()
+                        .withQuoteChar('\"')
+                        .withColumnSeparator(',')
+                        .withoutHeader()
+                        .withNullValue("null");
+        return CsvFileReader.parseCsvLineToArray(csvLine, schema);
+    }
+}
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 199714d2fa..f78780b629 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -124,7 +124,6 @@ import static org.apache.paimon.options.CatalogOptions.FORMAT_TABLE_ENABLED; import static org.apache.paimon.options.CatalogOptions.SYNC_ALL_PROPERTIES; import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE; import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey; -import static org.apache.paimon.table.FormatTableOptions.FIELD_DELIMITER; import static org.apache.paimon.utils.HadoopUtils.addHadoopConfIfFound; import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly; @@ -1488,10 +1487,11 @@ public class HiveCatalog extends AbstractCatalog { @Nullable FormatTable.Format provider, Map<String, String> tableParameters) { Map<String, String> param = new HashMap<>(); if (provider == FormatTable.Format.CSV) { + String delimiterKey = "field-delimiter"; param.put( FIELD_DELIM, tableParameters.getOrDefault( - FIELD_DELIMITER.key(), options.get(FIELD_DELIMITER))); + delimiterKey, options.getString(delimiterKey, ","))); } return param; } diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java index fdf520689b..cf04b32d90 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java @@
-39,7 +39,6 @@ import static org.apache.paimon.TableType.FORMAT_TABLE; import static org.apache.paimon.catalog.Catalog.COMMENT_PROP; import static org.apache.paimon.hive.HiveCatalog.HIVE_FIELD_DELIM_DEFAULT; import static org.apache.paimon.hive.HiveCatalog.isView; -import static org.apache.paimon.table.FormatTableOptions.FIELD_DELIMITER; class HiveTableUtils { @@ -78,7 +77,7 @@ class HiveTableUtils { } else { format = Format.CSV; options.set( - FIELD_DELIMITER, + "field-delimiter", serdeInfo .getParameters() .getOrDefault(FIELD_DELIM, HIVE_FIELD_DELIM_DEFAULT)); diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java index 53dc85eb44..c172f4d60c 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java @@ -24,6 +24,7 @@ import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogFactory; import org.apache.paimon.catalog.DelegateCatalog; import org.apache.paimon.catalog.PropertyChange; +import org.apache.paimon.format.csv.CsvOptions; import org.apache.paimon.function.Function; import org.apache.paimon.function.FunctionDefinition; import org.apache.paimon.options.Options; @@ -39,7 +40,6 @@ import org.apache.paimon.spark.catalog.functions.V1FunctionConverter; import org.apache.paimon.spark.catalog.functions.V1FunctionRegistry; import org.apache.paimon.spark.utils.CatalogUtils; import org.apache.paimon.table.FormatTable; -import org.apache.paimon.table.FormatTableOptions; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.utils.ExceptionUtils; @@ -530,7 +530,7 @@ public class SparkCatalog extends SparkBaseCatalog Options options = Options.fromMap(formatTable.options()); CaseInsensitiveStringMap dsOptions = new 
CaseInsensitiveStringMap(options.toMap()); if (formatTable.format() == FormatTable.Format.CSV) { - options.set("sep", options.get(FormatTableOptions.FIELD_DELIMITER)); + options.set("sep", options.get(CsvOptions.FIELD_DELIMITER)); dsOptions = new CaseInsensitiveStringMap(options.toMap()); return new PartitionedCSVTable( ident.name(),