This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a266520ad6 [core] Introduce csv format (#6093)
a266520ad6 is described below

commit a266520ad61f250e8a76068207473644acfb3d3d
Author: jerry <lining....@alibaba-inc.com>
AuthorDate: Tue Aug 19 22:17:02 2025 +0800

    [core] Introduce csv format (#6093)
---
 .../generated/format_table_configuration.html      |  36 ----
 .../apache/paimon/format/SupportsDirectWrite.java  |   2 +-
 .../apache/paimon/format/FormatReadWriteTest.java  | 130 +++++++++++-
 .../apache/paimon/flink/FormatCatalogTable.java    |   7 +-
 .../apache/paimon/format/csv/CsvFileFormat.java    | 115 +++++++++++
 .../paimon/format/csv/CsvFileFormatFactory.java    |  27 ++-
 .../apache/paimon/format/csv/CsvFileReader.java    | 228 ++++++++++++++++++++
 .../apache/paimon/format/csv/CsvFormatWriter.java  | 178 ++++++++++++++++
 .../org/apache/paimon/format/csv/CsvOptions.java   | 103 +++++++++
 .../apache/paimon/format/csv/CsvReaderFactory.java |  25 ++-
 .../org.apache.paimon.format.FileFormatFactory     |   1 +
 .../paimon/format/csv/CsvFileFormatTest.java       | 230 +++++++++++++++++++++
 .../java/org/apache/paimon/hive/HiveCatalog.java   |   4 +-
 .../org/apache/paimon/hive/HiveTableUtils.java     |   3 +-
 .../java/org/apache/paimon/spark/SparkCatalog.java |   4 +-
 15 files changed, 1021 insertions(+), 72 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/format_table_configuration.html 
b/docs/layouts/shortcodes/generated/format_table_configuration.html
deleted file mode 100644
index 71133d52d8..0000000000
--- a/docs/layouts/shortcodes/generated/format_table_configuration.html
+++ /dev/null
@@ -1,36 +0,0 @@
-{{/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/}}
-<table class="configuration table table-bordered">
-    <thead>
-        <tr>
-            <th class="text-left" style="width: 20%">Key</th>
-            <th class="text-left" style="width: 15%">Default</th>
-            <th class="text-left" style="width: 10%">Type</th>
-            <th class="text-left" style="width: 55%">Description</th>
-        </tr>
-    </thead>
-    <tbody>
-        <tr>
-            <td><h5>field-delimiter</h5></td>
-            <td style="word-wrap: break-word;">","</td>
-            <td>String</td>
-            <td>Optional field delimiter character for CSV (',' by 
default).</td>
-        </tr>
-    </tbody>
-</table>
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/format/SupportsDirectWrite.java 
b/paimon-common/src/main/java/org/apache/paimon/format/SupportsDirectWrite.java
index 76ac9a8ba1..6f513330f4 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/format/SupportsDirectWrite.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/format/SupportsDirectWrite.java
@@ -23,7 +23,7 @@ import org.apache.paimon.fs.Path;
 
 import java.io.IOException;
 
-/** Creaet a FormatWriter which has full control abort file io. */
+/** Create a FormatWriter which has full control over file io. */
 public interface SupportsDirectWrite {
 
     FormatWriter create(FileIO fileIO, Path path, String compression) throws 
IOException;
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java 
b/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
index 4a7325f410..40dcd975fc 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.format;
 
 import org.apache.paimon.data.BinaryArray;
+import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.GenericArray;
 import org.apache.paimon.data.GenericMap;
@@ -33,6 +34,7 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.PositionOutputStream;
 import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.reader.FileRecordReader;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.DataType;
@@ -62,10 +64,11 @@ public abstract class FormatReadWriteTest {
 
     @TempDir java.nio.file.Path tempPath;
 
-    private final String formatType;
+    protected final String formatType;
 
     protected FileIO fileIO;
     protected Path file;
+    protected Path parent;
 
     protected FormatReadWriteTest(String formatType) {
         this.formatType = formatType;
@@ -74,6 +77,7 @@ public abstract class FormatReadWriteTest {
     @BeforeEach
     public void beforeEach() {
         this.fileIO = LocalFileIO.create();
+        this.parent = new Path(tempPath.toUri());
         this.file = new Path(new Path(tempPath.toUri()), UUID.randomUUID() + 
"." + formatType);
     }
 
@@ -81,6 +85,11 @@ public abstract class FormatReadWriteTest {
 
     @Test
     public void testSimpleTypes() throws IOException {
+        FileFormat format = fileFormat();
+        testSimpleTypesUtil(format, file);
+    }
+
+    protected void testSimpleTypesUtil(FileFormat format, Path file) throws 
IOException {
         RowType rowType = DataTypes.ROW(DataTypes.INT().notNull(), 
DataTypes.BIGINT());
 
         if (ThreadLocalRandom.current().nextBoolean()) {
@@ -88,9 +97,8 @@ public abstract class FormatReadWriteTest {
         }
 
         InternalRowSerializer serializer = new InternalRowSerializer(rowType);
-        FileFormat format = fileFormat();
         FormatWriterFactory factory = format.createWriterFactory(rowType);
-        write(factory, GenericRow.of(1, 1L), GenericRow.of(2, 2L), 
GenericRow.of(3, null));
+        write(factory, file, GenericRow.of(1, 1L), GenericRow.of(2, 2L), 
GenericRow.of(3, null));
         RecordReader<InternalRow> reader =
                 format.createReaderFactory(rowType)
                         .createReader(
@@ -106,12 +114,16 @@ public abstract class FormatReadWriteTest {
 
     @Test
     public void testFullTypes() throws IOException {
+        FileFormat format = fileFormat();
+        testFullTypesUtil(format, file);
+    }
+
+    protected void testFullTypesUtil(FileFormat format, Path file) throws 
IOException {
         RowType rowType = rowTypeForFullTypesTest();
         InternalRow expected = expectedRowForFullTypesTest();
-        FileFormat format = fileFormat();
 
         FormatWriterFactory factory = format.createWriterFactory(rowType);
-        write(factory, expected);
+        write(factory, file, expected);
         RecordReader<InternalRow> reader =
                 format.createReaderFactory(rowType)
                         .createReader(
@@ -124,8 +136,15 @@ public abstract class FormatReadWriteTest {
         validateFullTypesResult(result.get(0), expected);
     }
 
+    /**
+     * Whether {@link #testNestedReadPruning()} should run for the format under test. Returns
+     * {@code true} here; when an override returns {@code false} that test exits immediately.
+     */
+    public boolean supportNestedReadPruning() {
+        return true;
+    }
+
     @Test
     public void testNestedReadPruning() throws Exception {
+        if (!supportNestedReadPruning()) {
+            return;
+        }
         FileFormat format = fileFormat();
 
         RowType writeType =
@@ -140,7 +159,7 @@ public abstract class FormatReadWriteTest {
                                         DataTypes.FIELD(4, "f2", 
DataTypes.INT()))));
 
         FormatWriterFactory factory = format.createWriterFactory(writeType);
-        write(factory, GenericRow.of(0, GenericRow.of(10, 11, 12)));
+        write(factory, file, GenericRow.of(0, GenericRow.of(10, 11, 12)));
 
         // skip read f0, f1.f1
         RowType readType =
@@ -176,7 +195,10 @@ public abstract class FormatReadWriteTest {
         RowType writeType = DataTypes.ROW(DataTypes.FIELD(0, "v", 
DataTypes.VARIANT()));
 
         FormatWriterFactory factory = format.createWriterFactory(writeType);
-        write(factory, 
GenericRow.of(GenericVariant.fromJson("{\"age\":35,\"city\":\"Chicago\"}")));
+        write(
+                factory,
+                file,
+                
GenericRow.of(GenericVariant.fromJson("{\"age\":35,\"city\":\"Chicago\"}")));
         List<InternalRow> result = new ArrayList<>();
         try (RecordReader<InternalRow> reader =
                 format.createReaderFactory(writeType)
@@ -201,6 +223,7 @@ public abstract class FormatReadWriteTest {
         RowType writeType = DataTypes.ROW(new ArrayType(true, new 
VariantType()));
         write(
                 format.createWriterFactory(writeType),
+                file,
                 GenericRow.of(
                         new GenericArray(
                                 new Object[] {
@@ -222,7 +245,8 @@ public abstract class FormatReadWriteTest {
         
assertThat(array.getVariant(1).toJson()).isEqualTo("{\"age\":45,\"city\":\"Beijing\"}");
     }
 
-    private void write(FormatWriterFactory factory, InternalRow... rows) 
throws IOException {
+    protected void write(FormatWriterFactory factory, Path file, 
InternalRow... rows)
+            throws IOException {
         FormatWriter writer;
         PositionOutputStream out = null;
         if (factory instanceof SupportsDirectWrite) {
@@ -332,6 +356,94 @@ public abstract class FormatReadWriteTest {
         return GenericRow.of(values.toArray());
     }
 
+    /**
+     * Whether {@link #testWriteAndReadFileWithoutExtension()} should run for the format under
+     * test. Returns {@code false} here; the test exits immediately unless an override returns
+     * {@code true}.
+     */
+    public boolean supportDataFileWithoutExtension() {
+        return false;
+    }
+
+    /**
+     * Writes three rows to a file whose name has no extension and reads them back, checking that
+     * every value round-trips. Skipped unless {@link #supportDataFileWithoutExtension()} returns
+     * {@code true}.
+     */
+    @Test
+    public void testWriteAndReadFileWithoutExtension() throws IOException {
+        if (!supportDataFileWithoutExtension()) {
+            return;
+        }
+        RowType rowType =
+                RowType.of(DataTypes.INT().notNull(), DataTypes.STRING(), DataTypes.BOOLEAN());
+
+        // Create test data
+        List<InternalRow> testData = new ArrayList<>();
+        testData.add(GenericRow.of(1, BinaryString.fromString("Alice"), true));
+        testData.add(GenericRow.of(2, BinaryString.fromString("Bob"), false));
+        testData.add(GenericRow.of(3, BinaryString.fromString("Charlie"), true));
+
+        // Create file format (whatever format the concrete subclass provides)
+        FileFormat format = fileFormat();
+
+        // Write data to a path that is a bare UUID, i.e. has no extension
+        Path filePath = new Path(parent, UUID.randomUUID().toString());
+        FormatWriterFactory writerFactory = format.createWriterFactory(rowType);
+        try (FormatWriter writer =
+                writerFactory.create(fileIO.newOutputStream(filePath, false), "none")) {
+            for (InternalRow row : testData) {
+                writer.addElement(row);
+            }
+        }
+
+        // Read data
+        FormatReaderFactory readerFactory = format.createReaderFactory(rowType, null);
+        FormatReaderFactory.Context context =
+                new FormatReaderFactory.Context() {
+                    @Override
+                    public FileIO fileIO() {
+                        return fileIO;
+                    }
+
+                    @Override
+                    public Path filePath() {
+                        return filePath;
+                    }
+
+                    @Override
+                    public long fileSize() {
+                        try {
+                            return fileIO.getFileSize(filePath);
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+
+                    @Override
+                    public org.apache.paimon.utils.RoaringBitmap32 selection() {
+                        return null;
+                    }
+                };
+
+        List<InternalRow> readData = new ArrayList<>();
+        // try-with-resources so the reader is closed even when reading or copying a row fails
+        try (FileRecordReader<InternalRow> reader = readerFactory.createReader(context)) {
+            RecordReader.RecordIterator<InternalRow> iterator = reader.readBatch();
+            while (iterator != null) {
+                InternalRow row;
+                while ((row = iterator.next()) != null) {
+                    readData.add(
+                            GenericRow.of(row.getInt(0), row.getString(1), row.getBoolean(2)));
+                }
+                iterator.releaseBatch();
+                iterator = reader.readBatch();
+            }
+        }
+
+        // Verify data
+        assertThat(readData).hasSize(3);
+        assertThat(readData.get(0).getInt(0)).isEqualTo(1);
+        assertThat(readData.get(0).getString(1).toString()).isEqualTo("Alice");
+        assertThat(readData.get(0).getBoolean(2)).isTrue();
+
+        assertThat(readData.get(1).getInt(0)).isEqualTo(2);
+        assertThat(readData.get(1).getString(1).toString()).isEqualTo("Bob");
+        assertThat(readData.get(1).getBoolean(2)).isFalse();
+
+        assertThat(readData.get(2).getInt(0)).isEqualTo(3);
+        assertThat(readData.get(2).getString(1).toString()).isEqualTo("Charlie");
+        assertThat(readData.get(2).getBoolean(2)).isTrue();
+    }
+
     private DataType getMapValueType() {
         if (formatType.equals("avro") || formatType.equals("orc")) {
             return DataTypes.ROW(
@@ -351,7 +463,7 @@ public abstract class FormatReadWriteTest {
         }
     }
 
-    private void validateFullTypesResult(InternalRow actual, InternalRow 
expected) {
+    protected void validateFullTypesResult(InternalRow actual, InternalRow 
expected) {
         RowType rowType = rowTypeForFullTypesTest();
         InternalRow.FieldGetter[] fieldGetters =
                 IntStream.range(0, rowType.getFieldCount())
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java
index cb69cce258..3d1b4d1c6b 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink;
 
+import org.apache.paimon.format.csv.CsvOptions;
 import org.apache.paimon.table.FormatTable;
 
 import org.apache.flink.table.api.Schema;
@@ -39,7 +40,6 @@ import static 
org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
 import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
 import static 
org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
 import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
-import static org.apache.paimon.table.FormatTableOptions.FIELD_DELIMITER;
 
 /** A {@link CatalogTable} to represent format table. */
 public class FormatCatalogTable implements CatalogTable {
@@ -99,8 +99,9 @@ public class FormatCatalogTable implements CatalogTable {
                             cachedOptions.put(k, v);
                         }
                     });
-            if (options.containsKey(FIELD_DELIMITER.key())) {
-                cachedOptions.put("csv.field-delimiter", 
options.get(FIELD_DELIMITER.key()));
+            if (options.containsKey(CsvOptions.FIELD_DELIMITER.key())) {
+                cachedOptions.put(
+                        "csv.field-delimiter", 
options.get(CsvOptions.FIELD_DELIMITER.key()));
             }
             cachedOptions.put(CONNECTOR.key(), "filesystem");
             cachedOptions.put(PATH.key(), table.location());
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java 
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java
new file mode 100644
index 0000000000..04e003c5b3
--- /dev/null
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.csv;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FileFormatFactory.FormatContext;
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/**
+ * {@link FileFormat} implementation for CSV files.
+ *
+ * <p>Each reader or writer factory receives its own {@link CsvOptions}, built from the options
+ * of the {@link FormatContext} this format was created with.
+ */
+public class CsvFileFormat extends FileFormat {
+
+    public static final String CSV_IDENTIFIER = "csv";
+
+    private final Options options;
+
+    public CsvFileFormat(FormatContext context) {
+        this(context, CSV_IDENTIFIER);
+    }
+
+    public CsvFileFormat(FormatContext context, String identifier) {
+        super(identifier);
+        this.options = context.options();
+    }
+
+    /** Creates the CSV reader factory; {@code filters} is not consulted by this format. */
+    @Override
+    public FormatReaderFactory createReaderFactory(
+            RowType projectedRowType, @Nullable List<Predicate> filters) {
+        CsvOptions csvOptions = new CsvOptions(options);
+        return new CsvReaderFactory(projectedRowType, csvOptions);
+    }
+
+    /** Creates the CSV writer factory for rows of the given type. */
+    @Override
+    public FormatWriterFactory createWriterFactory(RowType type) {
+        CsvOptions csvOptions = new CsvOptions(options);
+        return new CsvWriterFactory(type, csvOptions);
+    }
+
+    /**
+     * Checks every top-level field type of {@code rowType}.
+     *
+     * @throws UnsupportedOperationException on the first field type CSV cannot store
+     */
+    @Override
+    public void validateDataFields(RowType rowType) {
+        rowType.getFieldTypes().forEach(this::validateDataType);
+    }
+
+    /** Accepts only the type roots listed below; every other type is rejected. */
+    private void validateDataType(DataType dataType) {
+        DataTypeRoot root = dataType.getTypeRoot();
+        switch (root) {
+            case CHAR:
+            case VARCHAR:
+            case BOOLEAN:
+            case DECIMAL:
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
+            case BIGINT:
+            case FLOAT:
+            case DOUBLE:
+            case DATE:
+            case TIME_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                // Directly supported, nothing to report.
+                return;
+            default:
+                throw new UnsupportedOperationException(
+                        "Unsupported data type for CSV format: " + dataType);
+        }
+    }
+
+    /** A {@link FormatWriterFactory} to write {@link InternalRow} to CSV. */
+    private static class CsvWriterFactory implements FormatWriterFactory {
+
+        private final RowType rowType;
+        private final CsvOptions options;
+
+        public CsvWriterFactory(RowType rowType, CsvOptions options) {
+            this.rowType = rowType;
+            this.options = options;
+        }
+
+        /** Wraps {@code out} in a {@link CsvFormatWriter}; {@code compression} is not used. */
+        @Override
+        public FormatWriter create(PositionOutputStream out, String compression) {
+            return new CsvFormatWriter(out, rowType, options);
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/FormatTableOptions.java 
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormatFactory.java
similarity index 61%
rename from 
paimon-core/src/main/java/org/apache/paimon/table/FormatTableOptions.java
rename to 
paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormatFactory.java
index b4010209c3..d565b831cb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FormatTableOptions.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormatFactory.java
@@ -16,18 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.table;
+package org.apache.paimon.format.csv;
 
-import org.apache.paimon.options.ConfigOption;
-import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FileFormatFactory;
 
-/** Options of {@link FormatTable}. */
-public class FormatTableOptions {
+/** Factory to create {@link CsvFileFormat}. */
+public class CsvFileFormatFactory implements FileFormatFactory {
 
-    public static final ConfigOption<String> FIELD_DELIMITER =
-            ConfigOptions.key("field-delimiter")
-                    .stringType()
-                    .defaultValue(",")
-                    .withDescription(
-                            "Optional field delimiter character for CSV (',' 
by default).");
+    public static final String IDENTIFIER = "csv";
+
+    /** Returns {@link #IDENTIFIER}, the name under which this factory is looked up. */
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    /** Creates a new {@link CsvFileFormat} backed by the given format context. */
+    @Override
+    public FileFormat create(FormatContext formatContext) {
+        return new CsvFileFormat(formatContext);
+    }
 }
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java 
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
new file mode 100644
index 0000000000..80eb34de16
--- /dev/null
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.csv;
+
+import org.apache.paimon.casting.CastExecutor;
+import org.apache.paimon.casting.CastExecutors;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.reader.FileRecordIterator;
+import org.apache.paimon.reader.FileRecordReader;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import javax.annotation.Nullable;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * CSV file reader implementation.
+ *
+ * <p>The file is consumed one physical line at a time through a {@link BufferedReader}; each
+ * line is split with {@link CsvMapper} and converted to the requested {@link RowType}. A single
+ * iterator instance serves the whole file as one batch.
+ */
+public class CsvFileReader implements FileRecordReader<InternalRow> {
+
+    private static final CsvMapper CSV_MAPPER = new CsvMapper();
+    private static final Base64.Decoder BASE64_DECODER = Base64.getDecoder();
+
+    // Performance optimization: Cache frequently used cast executors
+    private static final Map<String, CastExecutor<?, ?>> CAST_EXECUTOR_CACHE =
+            new ConcurrentHashMap<>(32);
+
+    private final RowType rowType;
+    private final CsvOptions options;
+    private final Path filePath;
+    private final CsvSchema schema;
+
+    private BufferedReader bufferedReader;
+    private boolean headerSkipped = false;
+    private boolean readerClosed = false;
+    private CsvRecordIterator reader;
+
+    /**
+     * Opens the file named by {@code context} for reading.
+     *
+     * @param context supplies the {@link FileIO} and the path of the file to read
+     * @param rowType type of the rows to produce
+     * @param options quote, delimiter, escape, header and null-literal settings
+     * @throws IOException if the input stream cannot be opened
+     */
+    public CsvFileReader(FormatReaderFactory.Context context, RowType rowType, CsvOptions options)
+            throws IOException {
+        this.rowType = rowType;
+        this.filePath = context.filePath();
+        this.options = options;
+        // CsvSchema is immutable and emptySchema() is already header-less, so there is nothing
+        // to gain from calling withoutHeader(); a header line, when configured, is skipped in
+        // readBatch() before any record is parsed.
+        this.schema =
+                CsvSchema.emptySchema()
+                        .withQuoteChar(options.quoteCharacter().charAt(0))
+                        .withColumnSeparator(options.fieldDelimiter().charAt(0))
+                        .withEscapeChar(options.escapeCharacter().charAt(0));
+        FileIO fileIO = context.fileIO();
+        SeekableInputStream inputStream = fileIO.newInputStream(context.filePath());
+        this.reader = new CsvRecordIterator();
+        InputStreamReader inputStreamReader =
+                new InputStreamReader(inputStream, StandardCharsets.UTF_8);
+        this.bufferedReader = new BufferedReader(inputStreamReader);
+    }
+
+    /**
+     * Returns the single record iterator of this file, or {@code null} once the reader has been
+     * closed or the iterator has reached the end of the file.
+     */
+    @Override
+    @Nullable
+    public FileRecordIterator<InternalRow> readBatch() throws IOException {
+        if (readerClosed) {
+            return null;
+        }
+
+        // Skip header if needed
+        if (options.includeHeader() && !headerSkipped) {
+            bufferedReader.readLine();
+            headerSkipped = true;
+        }
+        if (reader.end) {
+            return null;
+        }
+        return reader;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (!readerClosed && bufferedReader != null) {
+            bufferedReader.close();
+            readerClosed = true;
+        }
+    }
+
+    /** Iterator that turns each remaining line of the file into one row. */
+    private class CsvRecordIterator implements FileRecordIterator<InternalRow> {
+        private boolean batchRead = false;
+        private long currentPosition = 0;
+        boolean end = false;
+
+        @Override
+        @Nullable
+        public InternalRow next() throws IOException {
+            if (batchRead || readerClosed) {
+                return null;
+            }
+            // One physical line is treated as one record.
+            String line = bufferedReader.readLine();
+            if (line == null) {
+                batchRead = true;
+                end = true;
+                return null;
+            }
+
+            currentPosition++;
+            return parseCsvLine(line, schema);
+        }
+
+        @Override
+        public void releaseBatch() {
+            // No resources to release for CSV
+        }
+
+        @Override
+        public long returnedPosition() {
+            return currentPosition - 1; // Return position of last returned row
+        }
+
+        @Override
+        public Path filePath() {
+            return filePath;
+        }
+    }
+
+    /**
+     * Splits one CSV line into its raw string fields.
+     *
+     * @return the fields of {@code line}, or an empty array for a {@code null} or empty line
+     */
+    protected static String[] parseCsvLineToArray(String line, CsvSchema schema)
+            throws IOException {
+        if (line == null || line.isEmpty()) {
+            return new String[] {};
+        }
+        return CSV_MAPPER.readerFor(String[].class).with(schema).readValue(line);
+    }
+
+    /**
+     * Converts one CSV line into a row with exactly {@code rowType.getFieldCount()} fields.
+     * Columns missing from a short line are left {@code null}; surplus columns are ignored.
+     */
+    private InternalRow parseCsvLine(String line, CsvSchema schema) throws IOException {
+        String[] fields = parseCsvLineToArray(line, schema);
+        // Size the row by the schema, not by the line: a row shorter than the schema would
+        // fail as soon as a caller reads one of the missing positions.
+        Object[] values = new Object[rowType.getFieldCount()];
+        int fieldCount = Math.min(fields.length, values.length);
+
+        for (int i = 0; i < fieldCount; i++) {
+            String field = fields[i];
+
+            // Fast path for null values
+            if (field == null || field.equals(options.nullLiteral()) || field.isEmpty()) {
+                values[i] = null;
+                continue;
+            }
+
+            // Optimized field parsing with cached cast executors
+            values[i] = parseFieldOptimized(field.trim(), rowType.getTypeAt(i));
+        }
+
+        return GenericRow.of(values);
+    }
+
+    /**
+     * Parses a single trimmed field. Integral, floating point, boolean and string types are
+     * parsed directly; every other type goes through a cached cast from {@code STRING}.
+     */
+    private Object parseFieldOptimized(String field, DataType dataType) {
+        // Checked again because trimming may have turned the field into the null literal.
+        if (field == null || field.equals(options.nullLiteral())) {
+            return null;
+        }
+
+        DataTypeRoot typeRoot = dataType.getTypeRoot();
+        switch (typeRoot) {
+            case TINYINT:
+                return Byte.parseByte(field);
+            case SMALLINT:
+                return Short.parseShort(field);
+            case INTEGER:
+                return Integer.parseInt(field);
+            case BIGINT:
+                return Long.parseLong(field);
+            case FLOAT:
+                return Float.parseFloat(field);
+            case DOUBLE:
+                return Double.parseDouble(field);
+            case BOOLEAN:
+                return Boolean.parseBoolean(field);
+            case CHAR:
+            case VARCHAR:
+                return BinaryString.fromString(field);
+            default:
+                return useCachedCastExecutor(field, dataType);
+        }
+    }
+
+    /**
+     * Casts {@code field} from {@code STRING} to {@code dataType}, caching the executor under
+     * {@code dataType.toString()}. Falls back to the raw string when no executor is resolved.
+     */
+    private Object useCachedCastExecutor(String field, DataType dataType) {
+        String cacheKey = dataType.toString();
+        @SuppressWarnings("unchecked")
+        CastExecutor<BinaryString, Object> cast =
+                (CastExecutor<BinaryString, Object>)
+                        CAST_EXECUTOR_CACHE.computeIfAbsent(
+                                cacheKey, k -> CastExecutors.resolve(DataTypes.STRING(), dataType));
+
+        if (cast != null) {
+            return cast.cast(BinaryString.fromString(field));
+        }
+        return BinaryString.fromString(field);
+    }
+}
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java 
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java
new file mode 100644
index 0000000000..5012696dea
--- /dev/null
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.csv;
+
+import org.apache.paimon.casting.CastExecutor;
+import org.apache.paimon.casting.CastExecutors;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.RowType;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** CSV format writer implementation. */
+public class CsvFormatWriter implements FormatWriter {
+
+    // Performance optimization: Cache frequently used cast executors
+    private static final Map<String, CastExecutor<?, ?>> CAST_EXECUTOR_CACHE =
+            new ConcurrentHashMap<>(32);
+
+    private final RowType rowType;
+    private final CsvOptions options;
+
+    private final BufferedWriter writer;
+    private final PositionOutputStream outputStream;
+    private boolean headerWritten = false;
+
+    private final StringBuilder stringBuilder;
+
+    public CsvFormatWriter(PositionOutputStream out, RowType rowType, 
CsvOptions options) {
+        this.rowType = rowType;
+        this.options = options;
+        this.outputStream = out;
+        OutputStreamWriter outputStreamWriter = new OutputStreamWriter(out, 
StandardCharsets.UTF_8);
+        this.writer = new BufferedWriter(outputStreamWriter);
+        this.stringBuilder = new StringBuilder();
+    }
+
+    @Override
+    public void addElement(InternalRow element) throws IOException {
+        // Write header if needed
+        if (options.includeHeader() && !headerWritten) {
+            writeHeader();
+            headerWritten = true;
+        }
+
+        // Reuse StringBuilder for better performance
+        stringBuilder.setLength(0); // Reset without reallocating
+
+        int fieldCount = rowType.getFieldCount();
+        for (int i = 0; i < fieldCount; i++) {
+            if (i > 0) {
+                stringBuilder.append(options.fieldDelimiter());
+            }
+
+            Object value =
+                    InternalRow.createFieldGetter(rowType.getTypeAt(i), 
i).getFieldOrNull(element);
+            String fieldValue = escapeField(castToStringOptimized(value, 
rowType.getTypeAt(i)));
+            stringBuilder.append(fieldValue);
+        }
+        stringBuilder.append(options.lineDelimiter());
+
+        writer.write(stringBuilder.toString());
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (writer != null) {
+            writer.flush();
+            writer.close();
+        }
+    }
+
+    @Override
+    public boolean reachTargetSize(boolean suggestedCheck, long targetSize) 
throws IOException {
+        if (outputStream != null && suggestedCheck) {
+            return outputStream.getPos() >= targetSize;
+        }
+        return false;
+    }
+
+    private void writeHeader() throws IOException {
+        stringBuilder.setLength(0); // Reuse StringBuilder
+
+        int fieldCount = rowType.getFieldCount();
+        for (int i = 0; i < fieldCount; i++) {
+            if (i > 0) {
+                stringBuilder.append(options.fieldDelimiter());
+            }
+            stringBuilder.append(escapeField(rowType.getFieldNames().get(i)));
+        }
+        stringBuilder.append(options.lineDelimiter());
+        writer.write(stringBuilder.toString());
+    }
+
+    private String escapeField(String field) {
+        if (field == null) {
+            return options.nullLiteral();
+        }
+
+        // Optimized escaping with early exit checks
+        boolean needsQuoting =
+                field.indexOf(options.fieldDelimiter().charAt(0)) >= 0
+                        || field.indexOf(options.lineDelimiter().charAt(0)) >= 0
+                        || field.indexOf(options.quoteCharacter().charAt(0)) 
>= 0;
+
+        if (!needsQuoting) {
+            return field;
+        }
+
+        // Only escape if needed
+        String escaped =
+                field.replace(
+                        options.quoteCharacter(),
+                        options.escapeCharacter() + options.quoteCharacter());
+        return options.quoteCharacter() + escaped + options.quoteCharacter();
+    }
+
+    /** Optimized string casting with caching and fast paths for common types. 
*/
+    private String castToStringOptimized(Object value, DataType dataType) {
+        if (value == null) {
+            return null;
+        }
+
+        DataTypeRoot typeRoot = dataType.getTypeRoot();
+        switch (typeRoot) {
+            case INTEGER:
+            case BIGINT:
+            case FLOAT:
+            case DOUBLE:
+            case BOOLEAN:
+            case TINYINT:
+            case SMALLINT:
+            case CHAR:
+            case VARCHAR:
+                return value.toString();
+            default:
+                return useCachedStringCastExecutor(value, dataType);
+        }
+    }
+
+    private String useCachedStringCastExecutor(Object value, DataType 
dataType) {
+        String cacheKey = dataType.toString();
+        CastExecutor<Object, ?> cast =
+                (CastExecutor<Object, ?>)
+                        CAST_EXECUTOR_CACHE.computeIfAbsent(
+                                cacheKey, k -> 
CastExecutors.resolveToString(dataType));
+
+        if (cast != null) {
+            Object result = cast.cast(value);
+            return result != null ? result.toString() : null;
+        }
+        return value.toString();
+    }
+}
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvOptions.java 
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvOptions.java
new file mode 100644
index 0000000000..956f530b3b
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvOptions.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.csv;
+
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.Options;
+
+/**
+ * Options for csv format.
+ *
+ * <p>Declares the configuration keys with their defaults and holds the values read from an {@link
+ * Options} instance at construction time.
+ */
+public class CsvOptions {
+
+    public static final ConfigOption<String> FIELD_DELIMITER =
+            ConfigOptions.key("field-delimiter")
+                    .stringType()
+                    .defaultValue(",")
+                    .withDescription("The field delimiter for CSV or TXT format");
+
+    public static final ConfigOption<String> LINE_DELIMITER =
+            ConfigOptions.key("line-delimiter")
+                    .stringType()
+                    .defaultValue("\n")
+                    .withDescription("The line delimiter for CSV format");
+
+    public static final ConfigOption<String> QUOTE_CHARACTER =
+            ConfigOptions.key("quote-character")
+                    .stringType()
+                    .defaultValue("\"")
+                    .withDescription("The quote character for CSV format");
+
+    public static final ConfigOption<String> ESCAPE_CHARACTER =
+            ConfigOptions.key("escape-character")
+                    .stringType()
+                    .defaultValue("\\")
+                    .withDescription("The escape character for CSV format");
+
+    public static final ConfigOption<Boolean> INCLUDE_HEADER =
+            ConfigOptions.key("include-header")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether to include header in CSV files");
+
+    public static final ConfigOption<String> NULL_LITERAL =
+            ConfigOptions.key("null-literal")
+                    .stringType()
+                    .defaultValue("null")
+                    .withDescription("The literal for null values in CSV format");
+
+    // Values captured once from the Options passed to the constructor.
+    private final String fieldDelimiter;
+    private final String lineDelimiter;
+    private final String quoteCharacter;
+    private final String escapeCharacter;
+    private final boolean includeHeader;
+    private final String nullLiteral;
+
+    /** Reads every CSV setting from {@code options}. */
+    public CsvOptions(Options options) {
+        fieldDelimiter = options.get(FIELD_DELIMITER);
+        lineDelimiter = options.get(LINE_DELIMITER);
+        quoteCharacter = options.get(QUOTE_CHARACTER);
+        escapeCharacter = options.get(ESCAPE_CHARACTER);
+        includeHeader = options.get(INCLUDE_HEADER);
+        nullLiteral = options.get(NULL_LITERAL);
+    }
+
+    /** Value of {@link #FIELD_DELIMITER}. */
+    public String fieldDelimiter() {
+        return fieldDelimiter;
+    }
+
+    /** Value of {@link #LINE_DELIMITER}. */
+    public String lineDelimiter() {
+        return lineDelimiter;
+    }
+
+    /** Value of {@link #NULL_LITERAL}. */
+    public String nullLiteral() {
+        return nullLiteral;
+    }
+
+    /** Value of {@link #INCLUDE_HEADER}. */
+    public boolean includeHeader() {
+        return includeHeader;
+    }
+
+    /** Value of {@link #QUOTE_CHARACTER}. */
+    public String quoteCharacter() {
+        return quoteCharacter;
+    }
+
+    /** Value of {@link #ESCAPE_CHARACTER}. */
+    public String escapeCharacter() {
+        return escapeCharacter;
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/format/SupportsDirectWrite.java 
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvReaderFactory.java
similarity index 53%
copy from 
paimon-common/src/main/java/org/apache/paimon/format/SupportsDirectWrite.java
copy to 
paimon-format/src/main/java/org/apache/paimon/format/csv/CsvReaderFactory.java
index 76ac9a8ba1..cfc67f576c 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/format/SupportsDirectWrite.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvReaderFactory.java
@@ -16,15 +16,28 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.format;
+package org.apache.paimon.format.csv;
 
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.reader.FileRecordReader;
+import org.apache.paimon.types.RowType;
 
 import java.io.IOException;
 
-/** Creaet a FormatWriter which has full control abort file io. */
-public interface SupportsDirectWrite {
+/**
+ * CSV {@link FormatReaderFactory} implementation.
+ *
+ * <p>Holds a row type and the CSV options and passes both to every {@link CsvFileReader} it
+ * creates.
+ */
+public class CsvReaderFactory implements FormatReaderFactory {
 
-    FormatWriter create(FileIO fileIO, Path path, String compression) throws 
IOException;
+    private final RowType rowType;
+    private final CsvOptions options;
+
+    /** Stores the row type and CSV options handed to each reader created later. */
+    public CsvReaderFactory(RowType rowType, CsvOptions options) {
+        this.rowType = rowType;
+        this.options = options;
+    }
+
+    /** Returns a new {@link CsvFileReader} built from the given context and the stored settings. */
+    @Override
+    public FileRecordReader<InternalRow> createReader(Context context) throws 
IOException {
+        return new CsvFileReader(context, rowType, options);
+    }
 }
diff --git 
a/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory
 
b/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory
index 7af6f79b34..c35f5544ca 100644
--- 
a/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory
+++ 
b/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory
@@ -16,3 +16,4 @@
 org.apache.paimon.format.avro.AvroFileFormatFactory
 org.apache.paimon.format.orc.OrcFileFormatFactory
 org.apache.paimon.format.parquet.ParquetFileFormatFactory
+org.apache.paimon.format.csv.CsvFileFormatFactory
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
new file mode 100644
index 0000000000..c9c5f3dbb5
--- /dev/null
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.csv;
+
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FileFormatFactory.FormatContext;
+import org.apache.paimon.format.FormatReadWriteTest;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.paimon.data.BinaryString.fromString;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link CsvFileFormat}. */
+public class CsvFileFormatTest extends FormatReadWriteTest {
+
+    protected CsvFileFormatTest() {
+        super("csv");
+    }
+
+    @Override
+    protected FileFormat fileFormat() {
+        return new CsvFileFormatFactory().create(new FormatContext(new 
Options(), 1024, 1024));
+    }
+
+    @Test
+    public void testWhenUseHiveDefaultDelimiter() throws IOException {
+        Options options = new Options();
+        options.set(CsvOptions.FIELD_DELIMITER, "\001");
+        FileFormat format =
+                new CsvFileFormatFactory().create(new FormatContext(new 
Options(), 1024, 1024));
+        testSimpleTypesUtil(
+                format, new Path(new Path(parent.toUri()), UUID.randomUUID() + 
"." + formatType));
+        testFullTypesUtil(
+                format, new Path(new Path(parent.toUri()), UUID.randomUUID() + 
"." + formatType));
+    }
+
+    @Test
+    public void testCsvParsingWithEmptyFields() throws IOException {
+
+        // First row: ,25,"Software Engineer" (empty first field)
+        String csvLine = ",25,\"Software Engineer\"\n";
+        String[] fields = parse(csvLine);
+        assertThat(fields).isNotNull();
+        assertThat(fields[0] == null); // empty field becomes null
+        assertThat(fields[1]).isEqualTo("25");
+        assertThat(fields[2]).isEqualTo("Software Engineer");
+
+        // Second row: "John Doe",,"Developer" (empty middle field)
+        csvLine = "\"John Doe\",,\"Developer\"\n";
+        fields = parse(csvLine);
+        assertThat(fields).isNotNull();
+        assertThat(fields[0]).isEqualTo("John Doe");
+        assertThat(fields[1] == null); // empty field becomes null
+        assertThat(fields[2]).isEqualTo("Developer");
+
+        // Third row: "Jane Smith",30, (empty last field)
+        csvLine = "\"Jane Smith\",30,\n";
+        fields = parse(csvLine);
+        assertThat(fields).isNotNull();
+        assertThat(fields[0]).isEqualTo("Jane Smith");
+        assertThat(fields[1]).isEqualTo("30");
+        assertThat(fields[2] == null); // empty field becomes null
+    }
+
+    @Test
+    public void testJsonArrayQuotePreservation() throws Exception {
+        // Test that JSON arrays preserve quotes
+        String csvLine = "name,\"[1,2,3]\",age";
+        String[] fields = parse(csvLine);
+
+        assertThat(fields).hasSize(3);
+        assertThat(fields[0]).isEqualTo("name");
+        assertThat(fields[1]).isEqualTo("[1,2,3]"); // Quotes should be 
preserved
+        assertThat(fields[2]).isEqualTo("age");
+    }
+
+    @Test
+    public void testJsonObjectQuotePreservation() throws Exception {
+        // Test that JSON objects preserve quotes
+        String csvLine = "id,{\"key\":\"value\"},status";
+        String[] fields = parse(csvLine);
+
+        assertThat(fields).hasSize(3);
+        assertThat(fields[0]).isEqualTo("id");
+        assertThat(fields[1]).isEqualTo("{\"key\":\"value\"}"); // Quotes 
should be preserved
+        assertThat(fields[2]).isEqualTo("status");
+    }
+
+    @Test
+    public void testComplexJsonArrayQuotePreservation() throws Exception {
+        // Test complex JSON array with nested objects
+        String csvLine =
+                
"field1,\"[{\"\"name\"\":\"\"John\"\"},{\"\"name\"\":\"\"Jane\"\"}]\",field3";
+        String[] fields = parse(csvLine);
+
+        assertThat(fields).hasSize(3);
+        assertThat(fields[0]).isEqualTo("field1");
+        
assertThat(fields[1]).isEqualTo("[{\"name\":\"John\"},{\"name\":\"Jane\"}]");
+        assertThat(fields[2]).isEqualTo("field3");
+    }
+
+    @Test
+    public void testRegularQuotedFieldsRemoveQuotes() throws Exception {
+        // Test that regular quoted fields (not JSON) still remove quotes
+        String csvLine = "\"John,Doe\",\"25\",\"Engineer\"";
+        String[] fields = parse(csvLine);
+
+        assertThat(fields).hasSize(3);
+        assertThat(fields[0]).isEqualTo("John,Doe"); // Quotes removed for 
regular field
+        assertThat(fields[1]).isEqualTo("25"); // Quotes removed
+        assertThat(fields[2]).isEqualTo("Engineer"); // Quotes removed
+    }
+
+    @Test
+    public void testJsonWithWhitespace() throws Exception {
+        // Test JSON with leading whitespace after quote
+        String csvLine = "field1,\" [1,2,3]\",field3";
+        String[] fields = parse(csvLine);
+
+        assertThat(fields).hasSize(3);
+        assertThat(fields[0]).isEqualTo("field1");
+        assertThat(fields[1])
+                .isEqualTo(" [1,2,3]"); // Should preserve quotes due to [ 
after whitespace
+        assertThat(fields[2]).isEqualTo("field3");
+    }
+
+    @Override
+    protected RowType rowTypeForFullTypesTest() {
+        RowType.Builder builder =
+                RowType.builder()
+                        .field("id", DataTypes.INT().notNull())
+                        .field("name", DataTypes.STRING()) /* optional by 
default */
+                        .field("salary", DataTypes.DOUBLE().notNull())
+                        .field("boolean", DataTypes.BOOLEAN().nullable())
+                        .field("tinyint", DataTypes.TINYINT())
+                        .field("smallint", DataTypes.SMALLINT())
+                        .field("bigint", DataTypes.BIGINT())
+                        .field("timestamp", DataTypes.TIMESTAMP())
+                        .field("timestamp_3", DataTypes.TIMESTAMP(3))
+                        .field("timestamp_ltz", 
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
+                        .field("timestamp_ltz_3", 
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))
+                        .field("date", DataTypes.DATE())
+                        .field("decimal", DataTypes.DECIMAL(2, 2))
+                        .field("decimal2", DataTypes.DECIMAL(38, 2))
+                        .field("decimal3", DataTypes.DECIMAL(10, 1));
+
+        RowType rowType = builder.build();
+
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            rowType = (RowType) rowType.notNull();
+        }
+
+        return rowType;
+    }
+
+    @Override
+    protected GenericRow expectedRowForFullTypesTest() {
+        List<Object> values =
+                Arrays.asList(
+                        1,
+                        fromString("name"),
+                        5.26D,
+                        true,
+                        (byte) 3,
+                        (short) 6,
+                        12304L,
+                        Timestamp.fromMicros(123123123),
+                        Timestamp.fromEpochMillis(123123123),
+                        Timestamp.fromMicros(123123123),
+                        Timestamp.fromEpochMillis(123123123),
+                        2456,
+                        Decimal.fromBigDecimal(new BigDecimal("0.22"), 2, 2),
+                        Decimal.fromBigDecimal(new BigDecimal("12312455.22"), 
38, 2),
+                        Decimal.fromBigDecimal(new BigDecimal("12455.1"), 10, 
1));
+        return GenericRow.of(values.toArray());
+    }
+
+    @Override
+    public boolean supportNestedReadPruning() {
+        return false;
+    }
+
+    @Override
+    public boolean supportDataFileWithoutExtension() {
+        return true;
+    }
+
+    private String[] parse(String csvLine) throws IOException {
+        CsvSchema schema =
+                CsvSchema.emptySchema()
+                        .withQuoteChar('\"')
+                        .withColumnSeparator(',')
+                        .withoutHeader()
+                        .withNullValue("null");
+        return CsvFileReader.parseCsvLineToArray(csvLine, schema);
+    }
+}
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 199714d2fa..f78780b629 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -124,7 +124,6 @@ import static 
org.apache.paimon.options.CatalogOptions.FORMAT_TABLE_ENABLED;
 import static org.apache.paimon.options.CatalogOptions.SYNC_ALL_PROPERTIES;
 import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
 import static 
org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
-import static org.apache.paimon.table.FormatTableOptions.FIELD_DELIMITER;
 import static org.apache.paimon.utils.HadoopUtils.addHadoopConfIfFound;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly;
@@ -1488,10 +1487,11 @@ public class HiveCatalog extends AbstractCatalog {
             @Nullable FormatTable.Format provider, Map<String, String> 
tableParameters) {
         Map<String, String> param = new HashMap<>();
         if (provider == FormatTable.Format.CSV) {
+            String delimiterKey = "field-delimiter";
             param.put(
                     FIELD_DELIM,
                     tableParameters.getOrDefault(
-                            FIELD_DELIMITER.key(), 
options.get(FIELD_DELIMITER)));
+                            delimiterKey, options.getString(delimiterKey, 
",")));
         }
         return param;
     }
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java
index fdf520689b..cf04b32d90 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java
@@ -39,7 +39,6 @@ import static org.apache.paimon.TableType.FORMAT_TABLE;
 import static org.apache.paimon.catalog.Catalog.COMMENT_PROP;
 import static org.apache.paimon.hive.HiveCatalog.HIVE_FIELD_DELIM_DEFAULT;
 import static org.apache.paimon.hive.HiveCatalog.isView;
-import static org.apache.paimon.table.FormatTableOptions.FIELD_DELIMITER;
 
 class HiveTableUtils {
 
@@ -78,7 +77,7 @@ class HiveTableUtils {
             } else {
                 format = Format.CSV;
                 options.set(
-                        FIELD_DELIMITER,
+                        "field-delimiter",
                         serdeInfo
                                 .getParameters()
                                 .getOrDefault(FIELD_DELIM, 
HIVE_FIELD_DELIM_DEFAULT));
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 53dc85eb44..c172f4d60c 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -24,6 +24,7 @@ import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.DelegateCatalog;
 import org.apache.paimon.catalog.PropertyChange;
+import org.apache.paimon.format.csv.CsvOptions;
 import org.apache.paimon.function.Function;
 import org.apache.paimon.function.FunctionDefinition;
 import org.apache.paimon.options.Options;
@@ -39,7 +40,6 @@ import 
org.apache.paimon.spark.catalog.functions.V1FunctionConverter;
 import org.apache.paimon.spark.catalog.functions.V1FunctionRegistry;
 import org.apache.paimon.spark.utils.CatalogUtils;
 import org.apache.paimon.table.FormatTable;
-import org.apache.paimon.table.FormatTableOptions;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.utils.ExceptionUtils;
@@ -530,7 +530,7 @@ public class SparkCatalog extends SparkBaseCatalog
         Options options = Options.fromMap(formatTable.options());
         CaseInsensitiveStringMap dsOptions = new 
CaseInsensitiveStringMap(options.toMap());
         if (formatTable.format() == FormatTable.Format.CSV) {
-            options.set("sep", 
options.get(FormatTableOptions.FIELD_DELIMITER));
+            options.set("sep", options.get(CsvOptions.FIELD_DELIMITER));
             dsOptions = new CaseInsensitiveStringMap(options.toMap());
             return new PartitionedCSVTable(
                     ident.name(),

Reply via email to