This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c55afa064e [core] text formats support compress (#6103)
c55afa064e is described below

commit c55afa064e9ffcdec40ca4a2c1f316cf70649fc1
Author: jerry <lining....@alibaba-inc.com>
AuthorDate: Sat Aug 23 13:18:13 2025 +0800

    [core] text formats support compress (#6103)
---
 .../org/apache/paimon/format/CompressionType.java  | 137 +++++++++++++
 .../apache/paimon/format/FormatReadWriteTest.java  |  10 +-
 .../org/apache/paimon/io/DataFilePathFactory.java  |  30 ++-
 .../apache/paimon/flink/BatchFileStoreITCase.java  |  20 ++
 .../apache/paimon/format/BaseTextFileReader.java   |  23 ++-
 .../apache/paimon/format/BaseTextFileWriter.java   |  35 +++-
 .../org/apache/paimon/format/TextCompression.java  | 137 +++++++++++++
 .../apache/paimon/format/csv/CsvFileFormat.java    |  19 +-
 .../apache/paimon/format/csv/CsvFileReader.java    |  23 ++-
 .../apache/paimon/format/csv/CsvFormatWriter.java  |  36 ++--
 .../apache/paimon/format/csv/CsvReaderFactory.java |   5 +-
 .../apache/paimon/format/json/JsonFileFormat.java  |  21 +-
 .../apache/paimon/format/json/JsonFileReader.java  |  16 +-
 .../paimon/format/json/JsonFormatWriter.java       |  12 +-
 .../paimon/format/json/JsonReaderFactory.java      |   5 +-
 .../apache/paimon/format/BaseCompressionTest.java  | 221 +++++++++++++++++++++
 .../paimon/format/csv/CsvCompressionTest.java      |  54 +++++
 .../paimon/format/csv/CsvFileFormatTest.java       |  11 +-
 .../paimon/format/json/JsonCompressionTest.java    |  64 ++++++
 .../paimon/format/json/JsonFileFormatTest.java     |  11 +-
 .../org/apache/paimon/spark/SparkWriteITCase.java  |   8 +-
 21 files changed, 821 insertions(+), 77 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/format/CompressionType.java 
b/paimon-common/src/main/java/org/apache/paimon/format/CompressionType.java
new file mode 100644
index 0000000000..236db62fb7
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/format/CompressionType.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format;
+
+import org.apache.paimon.options.description.DescribedEnum;
+import org.apache.paimon.options.description.InlineElement;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.paimon.options.description.TextElement.text;
+
+/** Compression types supported by Paimon file formats. */
+public enum CompressionType implements DescribedEnum {
+    NONE("none", "No compression.", null, ""),
+    GZIP(
+            "gzip",
+            "GZIP compression using the deflate algorithm.",
+            "org.apache.hadoop.io.compress.GzipCodec",
+            "gz"),
+    BZIP2(
+            "bzip2",
+            "BZIP2 compression using the Burrows-Wheeler algorithm.",
+            "org.apache.hadoop.io.compress.BZip2Codec",
+            "bz2"),
+    DEFLATE(
+            "deflate",
+            "DEFLATE compression using the deflate algorithm.",
+            "org.apache.hadoop.io.compress.DeflateCodec",
+            "deflate"),
+    SNAPPY(
+            "snappy",
+            "Snappy compression for fast compression and decompression.",
+            "org.apache.hadoop.io.compress.SnappyCodec",
+            "snappy"),
+    LZ4(
+            "lz4",
+            "LZ4 compression for very fast compression and decompression.",
+            "org.apache.hadoop.io.compress.Lz4Codec",
+            "lz4"),
+    ZSTD(
+            "zstd",
+            "Zstandard compression for high compression ratio and speed.",
+            "org.apache.hadoop.io.compress.ZStandardCodec",
+            "zst");
+
+    private final String value;
+    private final String className;
+    private final String fileExtension;
+    private final String description;
+
+    private static final Set<String> SUPPORTED_EXTENSIONS;
+
+    static {
+        Set<String> extensions = new HashSet<>();
+        for (CompressionType type : CompressionType.values()) {
+            if (type != CompressionType.NONE
+                    && type.fileExtension() != null
+                    && !type.fileExtension().isEmpty()) {
+                extensions.add(type.fileExtension().toLowerCase());
+            }
+        }
+        SUPPORTED_EXTENSIONS = Collections.unmodifiableSet(extensions);
+    }
+
+    CompressionType(String value, String description, String className, String 
fileExtension) {
+        this.value = value;
+        this.description = description;
+        this.className = className;
+        this.fileExtension = fileExtension;
+    }
+
+    @Override
+    public String toString() {
+        return value;
+    }
+
+    @Override
+    public InlineElement getDescription() {
+        return text(description);
+    }
+
+    public String value() {
+        return value;
+    }
+
+    public String hadoopCodecClassName() {
+        return className;
+    }
+
+    public String fileExtension() {
+        return fileExtension;
+    }
+
+    public static CompressionType fromValue(String value) {
+        if (value == null || value.isEmpty()) {
+            return NONE;
+        }
+
+        for (CompressionType type : CompressionType.values()) {
+            if (type.value.equalsIgnoreCase(value)) {
+                return type;
+            }
+        }
+        return NONE;
+    }
+
+    /**
+     * Check if the given extension is a supported compression extension.
+     *
+     * @param extension the file extension to check
+     * @return true if the extension is a supported compression extension, 
false otherwise
+     */
+    public static boolean isSupportedExtension(String extension) {
+        if (extension == null || extension.isEmpty()) {
+            return false;
+        }
+        return SUPPORTED_EXTENSIONS.contains(extension.toLowerCase());
+    }
+}
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java 
b/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
index 40dcd975fc..5e1f186c85 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
@@ -140,6 +140,10 @@ public abstract class FormatReadWriteTest {
         return true;
     }
 
+    public String compression() {
+        return "zstd";
+    }
+
     @Test
     public void testNestedReadPruning() throws Exception {
         if (!supportNestedReadPruning()) {
@@ -250,10 +254,10 @@ public abstract class FormatReadWriteTest {
         FormatWriter writer;
         PositionOutputStream out = null;
         if (factory instanceof SupportsDirectWrite) {
-            writer = ((SupportsDirectWrite) factory).create(fileIO, file, 
"zstd");
+            writer = ((SupportsDirectWrite) factory).create(fileIO, file, 
this.compression());
         } else {
             out = fileIO.newOutputStream(file, false);
-            writer = factory.create(out, "zstd");
+            writer = factory.create(out, this.compression());
         }
         for (InternalRow row : rows) {
             writer.addElement(row);
@@ -381,7 +385,7 @@ public abstract class FormatReadWriteTest {
         Path filePath = new Path(parent, UUID.randomUUID().toString());
         FormatWriterFactory writerFactory = 
jsonFormat.createWriterFactory(rowType);
         try (FormatWriter writer =
-                writerFactory.create(fileIO.newOutputStream(filePath, false), 
"none")) {
+                writerFactory.create(fileIO.newOutputStream(filePath, false), 
compression())) {
             for (InternalRow row : testData) {
                 writer.addElement(row);
             }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
index e8193866db..933825271f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.io;
 
 import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.format.CompressionType;
 import org.apache.paimon.fs.ExternalPathProvider;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.FileEntry;
@@ -88,8 +89,14 @@ public class DataFilePathFactory {
 
     private String newFileName(String prefix) {
         String extension;
-        if (fileSuffixIncludeCompression) {
-            extension = "." + fileCompression + "." + formatIdentifier;
+        if (isTextFormat(formatIdentifier)) {
+            String compressionExtension =
+                    CompressionType.fromValue(fileCompression).fileExtension();
+            extension = "." + formatIdentifier + "." + compressionExtension;
+        } else if (fileSuffixIncludeCompression) {
+            String compressionExtension =
+                    CompressionType.fromValue(fileCompression).fileExtension();
+            extension = "." + compressionExtension + "." + formatIdentifier;
         } else {
             extension = "." + formatIdentifier;
         }
@@ -162,7 +169,19 @@ public class DataFilePathFactory {
             throw new IllegalArgumentException(fileName + " is not a legal 
file name.");
         }
 
-        return fileName.substring(index + 1);
+        String extension = fileName.substring(index + 1);
+        if (CompressionType.isSupportedExtension(extension)) {
+            int secondLastDot = fileName.lastIndexOf('.', index - 1);
+            if (secondLastDot != -1) {
+                String formatIdentifier = fileName.substring(secondLastDot + 
1, index);
+                // If the format is json or csv, return that instead of the 
compression extension
+                if (isTextFormat(formatIdentifier)) {
+                    return formatIdentifier;
+                }
+            }
+        }
+
+        return extension;
     }
 
     public boolean isExternalPath() {
@@ -173,4 +192,9 @@ public class DataFilePathFactory {
     String uuid() {
         return uuid;
     }
+
+    private static boolean isTextFormat(String formatIdentifier) {
+        return "json".equalsIgnoreCase(formatIdentifier)
+                || "csv".equalsIgnoreCase(formatIdentifier);
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index f61f1fa1d7..0c24824ef8 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -68,6 +68,16 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
         sql("CREATE TABLE CSV (a INT, b INT, c INT) WITH 
('file.format'='csv')");
         sql("INSERT INTO CSV VALUES (1, 2, 3)");
         assertThat(sql("SELECT * FROM CSV")).containsExactly(Row.of(1, 2, 3));
+
+        sql(
+                "CREATE TABLE CSV_GZIP (a INT, b INT, c INT) WITH 
('file.format'='csv', 'file.compression'='gzip')");
+        sql("INSERT INTO CSV_GZIP VALUES (1, 2, 3)");
+        assertThat(sql("SELECT * FROM CSV_GZIP")).containsExactly(Row.of(1, 2, 
3));
+        List<String> files =
+                sql("select file_path from `CSV_GZIP$files`").stream()
+                        .map(r -> r.getField(0).toString())
+                        .collect(Collectors.toList());
+        assertThat(files).allMatch(file -> file.endsWith("csv.gz"));
     }
 
     @Test
@@ -75,6 +85,16 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
         sql("CREATE TABLE JSON_T (a INT, b INT, c INT) WITH 
('file.format'='json')");
         sql("INSERT INTO JSON_T VALUES (1, 2, 3)");
         assertThat(sql("SELECT * FROM JSON_T")).containsExactly(Row.of(1, 2, 
3));
+
+        sql(
+                "CREATE TABLE JSON_GZIP (a INT, b INT, c INT) WITH 
('file.format'='json', 'file.compression'='gzip')");
+        sql("INSERT INTO JSON_GZIP VALUES (1, 2, 3)");
+        assertThat(sql("SELECT * FROM JSON_GZIP")).containsExactly(Row.of(1, 
2, 3));
+        List<String> files =
+                sql("select file_path from `JSON_GZIP$files`").stream()
+                        .map(r -> r.getField(0).toString())
+                        .collect(Collectors.toList());
+        assertThat(files).allMatch(file -> file.endsWith("json.gz"));
     }
 
     @Test
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/BaseTextFileReader.java 
b/paimon-format/src/main/java/org/apache/paimon/format/BaseTextFileReader.java
index 3cf2ec92cd..033f9bf65d 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/BaseTextFileReader.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/BaseTextFileReader.java
@@ -21,6 +21,7 @@ package org.apache.paimon.format;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.reader.FileRecordIterator;
 import org.apache.paimon.reader.FileRecordReader;
 import org.apache.paimon.types.RowType;
@@ -29,6 +30,7 @@ import javax.annotation.Nullable;
 
 import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
 
@@ -37,17 +39,21 @@ public abstract class BaseTextFileReader implements 
FileRecordReader<InternalRow
 
     protected final Path filePath;
     protected final RowType rowType;
+    protected final InputStream decompressedStream;
     protected final BufferedReader bufferedReader;
     protected boolean readerClosed = false;
     protected BaseTextRecordIterator reader;
 
-    protected BaseTextFileReader(FileIO fileIO, Path filePath, RowType 
rowType) throws IOException {
+    protected BaseTextFileReader(FileIO fileIO, Path filePath, RowType 
rowType, Options options)
+            throws IOException {
         this.filePath = filePath;
         this.rowType = rowType;
+        this.decompressedStream =
+                TextCompression.createDecompressedInputStream(
+                        fileIO.newInputStream(filePath), filePath, options);
         this.bufferedReader =
                 new BufferedReader(
-                        new InputStreamReader(
-                                fileIO.newInputStream(filePath), 
StandardCharsets.UTF_8));
+                        new InputStreamReader(this.decompressedStream, 
StandardCharsets.UTF_8));
         this.reader = createRecordIterator();
     }
 
@@ -89,8 +95,15 @@ public abstract class BaseTextFileReader implements 
FileRecordReader<InternalRow
 
     @Override
     public void close() throws IOException {
-        if (!readerClosed && bufferedReader != null) {
-            bufferedReader.close();
+        if (!readerClosed) {
+            // Close the buffered reader first
+            if (bufferedReader != null) {
+                bufferedReader.close();
+            }
+            // Explicitly close the decompressed stream to prevent resource 
leaks
+            if (decompressedStream != null) {
+                decompressedStream.close();
+            }
             readerClosed = true;
         }
     }
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/BaseTextFileWriter.java 
b/paimon-format/src/main/java/org/apache/paimon/format/BaseTextFileWriter.java
index 6be2123ed3..f6534afd61 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/BaseTextFileWriter.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/BaseTextFileWriter.java
@@ -20,10 +20,12 @@ package org.apache.paimon.format;
 
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.types.RowType;
 
 import java.io.BufferedWriter;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.nio.charset.StandardCharsets;
 
@@ -34,11 +36,21 @@ public abstract class BaseTextFileWriter implements 
FormatWriter {
     protected final BufferedWriter writer;
     protected final RowType rowType;
 
-    protected BaseTextFileWriter(PositionOutputStream outputStream, RowType 
rowType) {
+    protected BaseTextFileWriter(
+            PositionOutputStream outputStream,
+            RowType rowType,
+            Options formatOptions,
+            CompressionType compressionType)
+            throws IOException {
         this.outputStream = outputStream;
-        this.rowType = rowType;
+        OutputStream compressedStream =
+                TextCompression.createCompressedOutputStream(
+                        outputStream, compressionType, formatOptions);
         this.writer =
-                new BufferedWriter(new OutputStreamWriter(outputStream, 
StandardCharsets.UTF_8));
+                new BufferedWriter(
+                        new OutputStreamWriter(compressedStream, 
StandardCharsets.UTF_8),
+                        getOptimalBufferSize(compressionType));
+        this.rowType = rowType;
     }
 
     /**
@@ -61,4 +73,21 @@ public abstract class BaseTextFileWriter implements 
FormatWriter {
         }
         return false;
     }
+
+    private int getOptimalBufferSize(CompressionType compressionType) {
+        switch (compressionType) {
+            case GZIP:
+            case DEFLATE:
+                return 65536; // 64KB for deflate-based compression
+            case SNAPPY:
+            case LZ4:
+                return 131072; // 128KB for fast compression
+            case ZSTD:
+                return 262144; // 256KB for high compression ratio
+            case BZIP2:
+                return 65536; // 64KB for bzip2
+            default:
+                return 65536; // Default 64KB buffer size
+        }
+    }
 }
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/TextCompression.java 
b/paimon-format/src/main/java/org/apache/paimon/format/TextCompression.java
new file mode 100644
index 0000000000..4994b2448a
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/TextCompression.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format;
+
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.utils.HadoopUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Optional;
+
+/** Utility class for handling text file compression and decompression using 
Hadoop codecs. */
+public class TextCompression {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(TextCompression.class);
+    /**
+     * Creates a compressed output stream using Hadoop's compression codecs.
+     *
+     * @param out The underlying output stream
+     * @param compression The compression format
+     * @param options Paimon options for Hadoop configuration
+     * @return Compressed output stream
+     * @throws IOException If compression stream creation fails
+     */
+    public static OutputStream createCompressedOutputStream(
+            PositionOutputStream out, CompressionType compression, Options 
options)
+            throws IOException {
+        Optional<CompressionCodec> codecOpt =
+                getCompressionCodecByCompression(compression, options);
+        if (codecOpt.isPresent()) {
+            return codecOpt.get().createOutputStream(out);
+        }
+        return out;
+    }
+
+    /**
+     * Creates a decompressed input stream using Hadoop's compression codecs.
+     *
+     * @param inputStream The underlying input stream
+     * @param filePath The file path (used to detect compression from 
extension)
+     * @param options Paimon options for Hadoop configuration
+     * @return Decompressed input stream
+     */
+    public static InputStream createDecompressedInputStream(
+            SeekableInputStream inputStream, Path filePath, Options options) 
throws IOException {
+        try {
+            Configuration conf = HadoopUtils.getHadoopConfiguration(options);
+            CompressionCodecFactory codecFactory = new 
CompressionCodecFactory(conf);
+
+            Optional<CompressionCodec> codecOpt =
+                    Optional.ofNullable(
+                            codecFactory.getCodec(
+                                    new 
org.apache.hadoop.fs.Path(filePath.toString())));
+            if (codecOpt.isPresent()) {
+                return codecOpt.get().createInputStream(inputStream);
+            }
+            return inputStream;
+        } catch (Throwable e) {
+            throw new IOException("Failed to create decompression stream", e);
+        }
+    }
+
+    public static CompressionType getTextCompressionType(String compression, 
Options options) {
+        CompressionType compressionType = 
CompressionType.fromValue(compression);
+        Optional<CompressionCodec> codecOpt =
+                getCompressionCodecByCompression(compressionType, options);
+        if (codecOpt.isPresent()) {
+            return CompressionType.fromValue(compression);
+        }
+        return CompressionType.NONE;
+    }
+
+    /**
+     * Gets a compression codec by compression type.
+     *
+     * @param compressionType The compression type
+     * @param options Paimon options for Hadoop configuration
+     * @return Optional CompressionCodec instance
+     */
+    public static Optional<CompressionCodec> getCompressionCodecByCompression(
+            CompressionType compressionType, Options options) {
+        if (compressionType == null || CompressionType.NONE == 
compressionType) {
+            return Optional.empty();
+        }
+
+        try {
+            Configuration conf = HadoopUtils.getHadoopConfiguration(options);
+            String codecName = compressionType.hadoopCodecClassName();
+            if (codecName != null) {
+                Class<?> codecClass = Class.forName(codecName);
+                if (CompressionCodec.class.isAssignableFrom(codecClass)) {
+                    CompressionCodec codec =
+                            (CompressionCodec) 
codecClass.getDeclaredConstructor().newInstance();
+                    if (codec instanceof org.apache.hadoop.conf.Configurable) {
+                        ((org.apache.hadoop.conf.Configurable) 
codec).setConf(conf);
+                    }
+                    // Test if the codec is actually usable by creating a test 
stream
+                    try {
+                        codec.createOutputStream(new 
java.io.ByteArrayOutputStream());
+                        return Optional.of(codec);
+                    } catch (Throwable e) {
+                        LOG.warn("Failed to create compression, so use none", 
e);
+                    }
+                }
+            }
+        } catch (Throwable e) {
+            LOG.warn("Failed to create compression, so use none", e);
+        }
+        return Optional.empty();
+    }
+}
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java 
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java
index 6dce1b470a..59acbdffa6 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java
@@ -19,11 +19,13 @@
 package org.apache.paimon.format.csv;
 
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.CompressionType;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.format.FileFormatFactory.FormatContext;
 import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.format.FormatWriter;
 import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.format.TextCompression;
 import org.apache.paimon.fs.CloseShieldOutputStream;
 import org.apache.paimon.fs.PositionOutputStream;
 import org.apache.paimon.options.Options;
@@ -34,6 +36,7 @@ import org.apache.paimon.types.RowType;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.util.List;
 
 /** CSV {@link FileFormat}. */
@@ -55,12 +58,12 @@ public class CsvFileFormat extends FileFormat {
     @Override
     public FormatReaderFactory createReaderFactory(
             RowType projectedRowType, @Nullable List<Predicate> filters) {
-        return new CsvReaderFactory(projectedRowType, new CsvOptions(options));
+        return new CsvReaderFactory(projectedRowType, options);
     }
 
     @Override
     public FormatWriterFactory createWriterFactory(RowType type) {
-        return new CsvWriterFactory(type, new CsvOptions(options));
+        return new CsvWriterFactory(type, options);
     }
 
     @Override
@@ -103,16 +106,20 @@ public class CsvFileFormat extends FileFormat {
     private static class CsvWriterFactory implements FormatWriterFactory {
 
         private final RowType rowType;
-        private final CsvOptions options;
+        private final Options options;
 
-        public CsvWriterFactory(RowType rowType, CsvOptions options) {
+        public CsvWriterFactory(RowType rowType, Options options) {
             this.rowType = rowType;
             this.options = options;
         }
 
         @Override
-        public FormatWriter create(PositionOutputStream out, String 
compression) {
-            return new CsvFormatWriter(new CloseShieldOutputStream(out), 
rowType, options);
+        public FormatWriter create(PositionOutputStream out, String 
compression)
+                throws IOException {
+            CompressionType compressionType =
+                    TextCompression.getTextCompressionType(compression, 
options);
+            return new CsvFormatWriter(
+                    new CloseShieldOutputStream(out), rowType, options, 
compressionType);
         }
     }
 }
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java 
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
index c85ab8208e..dbb174528a 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
@@ -26,6 +26,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.BaseTextFileReader;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypeRoot;
 import org.apache.paimon.types.DataTypes;
@@ -49,20 +50,20 @@ public class CsvFileReader extends BaseTextFileReader {
     private static final Map<String, CastExecutor<?, ?>> CAST_EXECUTOR_CACHE =
             new ConcurrentHashMap<>(32);
 
-    private final CsvOptions options;
+    private final CsvOptions formatOptions;
     private final CsvSchema schema;
     private boolean headerSkipped = false;
 
-    public CsvFileReader(FileIO fileIO, Path filePath, RowType rowType, 
CsvOptions options)
+    public CsvFileReader(FileIO fileIO, Path filePath, RowType rowType, 
Options options)
             throws IOException {
-        super(fileIO, filePath, rowType);
-        this.options = options;
+        super(fileIO, filePath, rowType, options);
+        this.formatOptions = new CsvOptions(options);
         this.schema =
                 CsvSchema.emptySchema()
-                        .withQuoteChar(options.quoteCharacter().charAt(0))
-                        
.withColumnSeparator(options.fieldDelimiter().charAt(0))
-                        .withEscapeChar(options.escapeCharacter().charAt(0));
-        if (!options.includeHeader()) {
+                        
.withQuoteChar(formatOptions.quoteCharacter().charAt(0))
+                        
.withColumnSeparator(formatOptions.fieldDelimiter().charAt(0))
+                        
.withEscapeChar(formatOptions.escapeCharacter().charAt(0));
+        if (!formatOptions.includeHeader()) {
             this.schema.withoutHeader();
         }
     }
@@ -80,7 +81,7 @@ public class CsvFileReader extends BaseTextFileReader {
     @Override
     protected void setupReading() throws IOException {
         // Skip header if needed
-        if (options.includeHeader() && !headerSkipped) {
+        if (formatOptions.includeHeader() && !headerSkipped) {
             bufferedReader.readLine();
             headerSkipped = true;
         }
@@ -108,7 +109,7 @@ public class CsvFileReader extends BaseTextFileReader {
             String field = fields[i];
 
             // Fast path for null values
-            if (field == null || field.equals(options.nullLiteral()) || 
field.isEmpty()) {
+            if (field == null || field.equals(formatOptions.nullLiteral()) || 
field.isEmpty()) {
                 values[i] = null;
                 continue;
             }
@@ -122,7 +123,7 @@ public class CsvFileReader extends BaseTextFileReader {
 
     /** Optimized field parsing with caching and fast paths for common types. 
*/
     private Object parseFieldOptimized(String field, DataType dataType) {
-        if (field == null || field.equals(options.nullLiteral())) {
+        if (field == null || field.equals(formatOptions.nullLiteral())) {
             return null;
         }
 
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java 
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java
index 754bb5a192..87d1abb5d2 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java
@@ -22,7 +22,9 @@ import org.apache.paimon.casting.CastExecutor;
 import org.apache.paimon.casting.CastExecutors;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.BaseTextFileWriter;
+import org.apache.paimon.format.CompressionType;
 import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypeRoot;
 import org.apache.paimon.types.RowType;
@@ -40,20 +42,22 @@ public class CsvFormatWriter extends BaseTextFileWriter {
     private static final Map<String, CastExecutor<?, ?>> CAST_EXECUTOR_CACHE =
             new ConcurrentHashMap<>(32);
 
-    private final CsvOptions options;
+    private final CsvOptions csvOptions;
     private boolean headerWritten = false;
     private final StringBuilder stringBuilder;
 
-    public CsvFormatWriter(PositionOutputStream out, RowType rowType, 
CsvOptions options) {
-        super(out, rowType);
-        this.options = options;
+    public CsvFormatWriter(
+            PositionOutputStream out, RowType rowType, Options options, 
CompressionType compression)
+            throws IOException {
+        super(out, rowType, options, compression);
+        this.csvOptions = new CsvOptions(options);
         this.stringBuilder = new StringBuilder();
     }
 
     @Override
     public void addElement(InternalRow element) throws IOException {
         // Write header if needed
-        if (options.includeHeader() && !headerWritten) {
+        if (csvOptions.includeHeader() && !headerWritten) {
             writeHeader();
             headerWritten = true;
         }
@@ -64,7 +68,7 @@ public class CsvFormatWriter extends BaseTextFileWriter {
         int fieldCount = rowType.getFieldCount();
         for (int i = 0; i < fieldCount; i++) {
             if (i > 0) {
-                stringBuilder.append(options.fieldDelimiter());
+                stringBuilder.append(csvOptions.fieldDelimiter());
             }
 
             Object value =
@@ -72,7 +76,7 @@ public class CsvFormatWriter extends BaseTextFileWriter {
             String fieldValue = escapeField(castToStringOptimized(value, 
rowType.getTypeAt(i)));
             stringBuilder.append(fieldValue);
         }
-        stringBuilder.append(options.lineDelimiter());
+        stringBuilder.append(csvOptions.lineDelimiter());
 
         writer.write(stringBuilder.toString());
     }
@@ -83,24 +87,24 @@ public class CsvFormatWriter extends BaseTextFileWriter {
         int fieldCount = rowType.getFieldCount();
         for (int i = 0; i < fieldCount; i++) {
             if (i > 0) {
-                stringBuilder.append(options.fieldDelimiter());
+                stringBuilder.append(csvOptions.fieldDelimiter());
             }
             stringBuilder.append(escapeField(rowType.getFieldNames().get(i)));
         }
-        stringBuilder.append(options.lineDelimiter());
+        stringBuilder.append(csvOptions.lineDelimiter());
         writer.write(stringBuilder.toString());
     }
 
     private String escapeField(String field) {
         if (field == null) {
-            return options.nullLiteral();
+            return csvOptions.nullLiteral();
         }
 
         // Optimized escaping with early exit checks
         boolean needsQuoting =
-                field.indexOf(options.fieldDelimiter().charAt(0)) >= 0
-                        || field.indexOf(options.lineDelimiter().charAt(0)) >= 0
-                        || field.indexOf(options.quoteCharacter().charAt(0)) 
>= 0;
+                field.indexOf(csvOptions.fieldDelimiter().charAt(0)) >= 0
+                        || field.indexOf(csvOptions.lineDelimiter().charAt(0)) 
>= 0
+                        || 
field.indexOf(csvOptions.quoteCharacter().charAt(0)) >= 0;
 
         if (!needsQuoting) {
             return field;
@@ -109,9 +113,9 @@ public class CsvFormatWriter extends BaseTextFileWriter {
         // Only escape if needed
         String escaped =
                 field.replace(
-                        options.quoteCharacter(),
-                        options.escapeCharacter() + options.quoteCharacter());
-        return options.quoteCharacter() + escaped + options.quoteCharacter();
+                        csvOptions.quoteCharacter(),
+                        csvOptions.escapeCharacter() + 
csvOptions.quoteCharacter());
+        return csvOptions.quoteCharacter() + escaped + 
csvOptions.quoteCharacter();
     }
 
     /** Optimized string casting with caching and fast paths for common types. 
*/
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvReaderFactory.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvReaderFactory.java
index cb26e3faf8..23fcf09fd6 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvReaderFactory.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvReaderFactory.java
@@ -20,6 +20,7 @@ package org.apache.paimon.format.csv;
 
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.reader.FileRecordReader;
 import org.apache.paimon.types.RowType;
 
@@ -29,9 +30,9 @@ import java.io.IOException;
 public class CsvReaderFactory implements FormatReaderFactory {
 
     private final RowType rowType;
-    private final CsvOptions options;
+    private final Options options;
 
-    public CsvReaderFactory(RowType rowType, CsvOptions options) {
+    public CsvReaderFactory(RowType rowType, Options options) {
         this.rowType = rowType;
         this.options = options;
     }
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileFormat.java 
b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileFormat.java
index 49790fefc2..54ccf59602 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileFormat.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileFormat.java
@@ -19,11 +19,13 @@
 package org.apache.paimon.format.json;
 
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.CompressionType;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.format.FileFormatFactory.FormatContext;
 import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.format.FormatWriter;
 import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.format.TextCompression;
 import org.apache.paimon.fs.CloseShieldOutputStream;
 import org.apache.paimon.fs.PositionOutputStream;
 import org.apache.paimon.options.Options;
@@ -34,6 +36,7 @@ import org.apache.paimon.types.RowType;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.util.List;
 
 /** JSON {@link FileFormat}. */
@@ -45,18 +48,18 @@ public class JsonFileFormat extends FileFormat {
 
     public JsonFileFormat(FormatContext context) {
         super(IDENTIFIER);
-        this.options = getIdentifierPrefixOptions(context.options());
+        this.options = context.options();
     }
 
     @Override
     public FormatReaderFactory createReaderFactory(
             RowType projectedRowType, @Nullable List<Predicate> filters) {
-        return new JsonReaderFactory(projectedRowType, new 
JsonOptions(options));
+        return new JsonReaderFactory(projectedRowType, options);
     }
 
     @Override
     public FormatWriterFactory createWriterFactory(RowType type) {
-        return new JsonWriterFactory(type, new JsonOptions(options));
+        return new JsonWriterFactory(type, options);
     }
 
     @Override
@@ -102,16 +105,20 @@ public class JsonFileFormat extends FileFormat {
     private static class JsonWriterFactory implements FormatWriterFactory {
 
         private final RowType rowType;
-        private final JsonOptions options;
+        private final Options options;
 
-        public JsonWriterFactory(RowType rowType, JsonOptions options) {
+        public JsonWriterFactory(RowType rowType, Options options) {
             this.rowType = rowType;
             this.options = options;
         }
 
         @Override
-        public FormatWriter create(PositionOutputStream out, String 
compression) {
-            return new JsonFormatWriter(new CloseShieldOutputStream(out), 
rowType, options);
+        public FormatWriter create(PositionOutputStream out, String 
compression)
+                throws IOException {
+            CompressionType compressionType =
+                    TextCompression.getTextCompressionType(compression, 
options);
+            return new JsonFormatWriter(
+                    new CloseShieldOutputStream(out), rowType, options, 
compressionType);
         }
     }
 }
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java 
b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java
index 96d5ecfa5d..b46f92835e 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java
@@ -28,6 +28,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.BaseTextFileReader;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
@@ -53,10 +54,10 @@ public class JsonFileReader extends BaseTextFileReader {
 
     private final JsonOptions options;
 
-    public JsonFileReader(FileIO fileIO, Path filePath, RowType rowType, 
JsonOptions options)
+    public JsonFileReader(FileIO fileIO, Path filePath, RowType rowType, 
Options options)
             throws IOException {
-        super(fileIO, filePath, rowType);
-        this.options = options;
+        super(fileIO, filePath, rowType, options);
+        this.options = new JsonOptions(options);
     }
 
     @Override
@@ -67,7 +68,8 @@ public class JsonFileReader extends BaseTextFileReader {
     @Override
     protected InternalRow parseLine(String line) throws IOException {
         try {
-            return convertJsonStringToRow(line, rowType, options);
+            JsonNode jsonNode = 
JsonSerdeUtil.OBJECT_MAPPER_INSTANCE.readTree(line);
+            return (InternalRow) convertJsonValue(jsonNode, rowType, options);
         } catch (JsonProcessingException e) {
             if (options.ignoreParseErrors()) {
                 return null;
@@ -88,12 +90,6 @@ public class JsonFileReader extends BaseTextFileReader {
         // No additional JSON-specific iterator logic needed
     }
 
-    private InternalRow convertJsonStringToRow(String line, RowType rowType, 
JsonOptions options)
-            throws JsonProcessingException {
-        JsonNode jsonNode = 
JsonSerdeUtil.OBJECT_MAPPER_INSTANCE.readTree(line);
-        return (InternalRow) convertJsonValue(jsonNode, rowType, options);
-    }
-
     private Object convertJsonValue(JsonNode node, DataType dataType, 
JsonOptions options) {
         if (node == null || node.isNull()) {
             return null;
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java
index de6b1a70f9..414a7eb0fd 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java
@@ -25,7 +25,9 @@ import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalMap;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.BaseTextFileWriter;
+import org.apache.paimon.format.CompressionType;
 import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
@@ -52,9 +54,13 @@ public class JsonFormatWriter extends BaseTextFileWriter {
     private final char lineDelimiter;
 
     public JsonFormatWriter(
-            PositionOutputStream outputStream, RowType rowType, JsonOptions 
options) {
-        super(outputStream, rowType);
-        this.lineDelimiter = options.getLineDelimiter().charAt(0);
+            PositionOutputStream outputStream,
+            RowType rowType,
+            Options options,
+            CompressionType compressionType)
+            throws IOException {
+        super(outputStream, rowType, options, compressionType);
+        this.lineDelimiter = (new 
JsonOptions(options)).getLineDelimiter().charAt(0);
     }
 
     @Override
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonReaderFactory.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonReaderFactory.java
index b82d77d948..709f2d345a 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonReaderFactory.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonReaderFactory.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.reader.FileRecordReader;
 import org.apache.paimon.types.RowType;
 
@@ -31,9 +32,9 @@ import java.io.IOException;
 public class JsonReaderFactory implements FormatReaderFactory {
 
     private final RowType projectedRowType;
-    private final JsonOptions options;
+    private final Options options;
 
-    public JsonReaderFactory(RowType projectedRowType, JsonOptions options) {
+    public JsonReaderFactory(RowType projectedRowType, Options options) {
         this.projectedRowType = projectedRowType;
         this.options = options;
     }
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/BaseCompressionTest.java 
b/paimon-format/src/test/java/org/apache/paimon/format/BaseCompressionTest.java
new file mode 100644
index 0000000000..7f616597d8
--- /dev/null
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/BaseCompressionTest.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FileFormatFactory.FormatContext;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/** Base class for compression tests across different file formats. */
+public abstract class BaseCompressionTest {
+
+    @TempDir protected java.nio.file.Path tempDir;
+
+    protected final RowType rowType =
+            RowType.of(
+                    DataTypes.INT(), DataTypes.STRING(), DataTypes.DOUBLE(), 
DataTypes.BOOLEAN());
+
+    protected final List<InternalRow> testData =
+            Arrays.asList(
+                    GenericRow.of(1, BinaryString.fromString("Alice"), 100.5, 
true),
+                    GenericRow.of(2, BinaryString.fromString("Bob"), 200.75, 
false),
+                    GenericRow.of(3, BinaryString.fromString("Charlie"), 
300.25, true),
+                    GenericRow.of(4, BinaryString.fromString("Diana"), 400.0, 
false));
+
+    private List<CompressionType> compressionTypes =
+            Arrays.asList(
+                    CompressionType.NONE,
+                    CompressionType.GZIP,
+                    CompressionType.BZIP2,
+                    CompressionType.DEFLATE,
+                    CompressionType.ZSTD);
+
+    /** Returns the file format for testing. */
+    protected abstract FileFormat createFileFormat(Options options);
+
+    /** Returns the file extension for the format. */
+    protected abstract String getFormatExtension();
+
+    @Test
+    void testCompression() {
+        compressionTypes.forEach(
+                compression -> {
+                    try {
+                        testCompressionRoundTrip(
+                                compression.value(),
+                                String.format(
+                                        "test_compress.%s.%s",
+                                        getFormatExtension(), 
compression.fileExtension()));
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+    }
+
+    @Test
+    void testCompressionInitFail() throws IOException {
+        CompressionType compression = CompressionType.SNAPPY;
+        assertThrows(
+                IOException.class,
+                () ->
+                        testCompressionRoundTrip(
+                                compression.value(),
+                                String.format(
+                                        "test_compress.%s.%s",
+                                        getFormatExtension(), 
compression.fileExtension())));
+    }
+
+    @Test
+    void testCompressionDetectionFromFileName() {
+        compressionTypes.forEach(
+                compression -> {
+                    try {
+                        testAutoCompressionDetection(
+                                "test_auto."
+                                        + getFormatExtension()
+                                        + "."
+                                        + compression.fileExtension(),
+                                compression.value());
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+    }
+
+    @Test
+    void testUnsupportedCompressionFormat() throws IOException {
+        Options options = new Options();
+        options.set(CoreOptions.FILE_COMPRESSION, "unsupported");
+        testCompressionRoundTripWithOptions(options, 
"test_file_unsupported_compress");
+    }
+
+    protected void testCompressionRoundTrip(String compression, String 
fileName)
+            throws IOException {
+        Options options = new Options();
+        options.set(CoreOptions.FILE_COMPRESSION, compression);
+        testCompressionRoundTripWithOptions(options, fileName);
+    }
+
+    protected void testCompressionRoundTripWithOptions(Options options, String 
fileName)
+            throws IOException {
+        FileFormat format = createFileFormat(options);
+
+        // Validate the format and compression
+        format.validateDataFields(rowType);
+
+        Path filePath = new Path(tempDir.resolve(fileName).toString());
+        FileIO fileIO = new LocalFileIO();
+
+        // Write data with compression
+        FormatWriterFactory writerFactory = 
format.createWriterFactory(rowType);
+        try (FormatWriter writer =
+                writerFactory.create(
+                        fileIO.newOutputStream(filePath, false),
+                        options.get(CoreOptions.FILE_COMPRESSION))) {
+            for (InternalRow row : testData) {
+                writer.addElement(row);
+            }
+        }
+
+        // Read data back
+        FormatReaderFactory readerFactory = 
format.createReaderFactory(rowType, null);
+        List<InternalRow> readData = new ArrayList<>();
+
+        try (RecordReader<InternalRow> reader =
+                readerFactory.createReader(
+                        new FormatReaderContext(fileIO, filePath, 
fileIO.getFileSize(filePath)))) {
+            reader.forEachRemaining(readData::add);
+        }
+
+        // Verify data integrity
+        assertThat(readData).hasSize(testData.size());
+        for (int i = 0; i < testData.size(); i++) {
+            InternalRow expected = testData.get(i);
+            InternalRow actual = readData.get(i);
+
+            assertThat(actual.getInt(0)).isEqualTo(expected.getInt(0));
+            
assertThat(actual.getString(1).toString()).isEqualTo(expected.getString(1).toString());
+            assertThat(actual.getDouble(2)).isEqualTo(expected.getDouble(2));
+            assertThat(actual.getBoolean(3)).isEqualTo(expected.getBoolean(3));
+        }
+    }
+
+    protected void testAutoCompressionDetection(String fileName, String 
compression)
+            throws IOException {
+        // Write file with compression
+        Options writeOptions = new Options();
+        writeOptions.set(CoreOptions.FILE_COMPRESSION, compression);
+
+        FileFormat format = createFileFormat(writeOptions);
+        Path filePath = new Path(tempDir.resolve(fileName).toString());
+        FileIO fileIO = new LocalFileIO();
+
+        // Write compressed data
+        FormatWriterFactory writerFactory = 
format.createWriterFactory(rowType);
+        try (FormatWriter writer =
+                writerFactory.create(fileIO.newOutputStream(filePath, false), 
compression)) {
+            writer.addElement(testData.get(0)); // Write just one row for this 
test
+        }
+
+        // Read back with auto-detection (no compression specified in read 
options)
+        Options readOptions = new Options();
+        readOptions.set(CoreOptions.FILE_COMPRESSION, "none"); // Default to 
none
+
+        FileFormat readFormat = createFileFormat(readOptions);
+        FormatReaderFactory readerFactory = 
readFormat.createReaderFactory(rowType, null);
+
+        List<InternalRow> readData = new ArrayList<>();
+        try (RecordReader<InternalRow> reader =
+                readerFactory.createReader(
+                        new FormatReaderContext(fileIO, filePath, 
fileIO.getFileSize(filePath)))) {
+            reader.forEachRemaining(readData::add);
+        }
+
+        // Should successfully read the data regardless of compression
+        assertThat(readData).hasSize(1);
+        InternalRow expected = testData.get(0);
+        InternalRow actual = readData.get(0);
+        assertThat(actual.getInt(0)).isEqualTo(expected.getInt(0));
+        
assertThat(actual.getString(1).toString()).isEqualTo(expected.getString(1).toString());
+    }
+
+    protected FormatContext createFormatContext(Options options) {
+        return new FormatContext(options, 1024, 1024);
+    }
+}
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvCompressionTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvCompressionTest.java
new file mode 100644
index 0000000000..159aa3715e
--- /dev/null
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvCompressionTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.csv;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.format.BaseCompressionTest;
+import org.apache.paimon.format.CompressionType;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.options.Options;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+/** Test for CSV compression functionality. */
+class CsvCompressionTest extends BaseCompressionTest {
+
+    @Override
+    protected FileFormat createFileFormat(Options options) {
+        return new CsvFileFormat(createFormatContext(options));
+    }
+
+    @Override
+    protected String getFormatExtension() {
+        return "csv";
+    }
+
+    @Test
+    void testCompressionWithCustomOptions() throws IOException {
+        Options options = new Options();
+        options.set(CoreOptions.FILE_COMPRESSION, 
CompressionType.GZIP.value());
+        options.set(CsvOptions.FIELD_DELIMITER, ";");
+        options.set(CsvOptions.INCLUDE_HEADER, true);
+
+        String fileName = "test_custom_options.csv.gz";
+        testCompressionRoundTripWithOptions(options, fileName);
+    }
+}
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
index bed689bd4d..2b12874588 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
@@ -18,12 +18,14 @@
 
 package org.apache.paimon.format.csv;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.format.CompressionType;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.format.FileFormatFactory.FormatContext;
 import org.apache.paimon.format.FormatReadWriteTest;
@@ -61,7 +63,9 @@ public class CsvFileFormatTest extends FormatReadWriteTest {
 
     @Override
     protected FileFormat fileFormat() {
-        return new CsvFileFormatFactory().create(new FormatContext(new 
Options(), 1024, 1024));
+        Options options = new Options();
+        options.set(CoreOptions.FILE_COMPRESSION, compression());
+        return new CsvFileFormatFactory().create(new FormatContext(options, 
1024, 1024));
     }
 
     @Test
@@ -480,6 +484,11 @@ public class CsvFileFormatTest extends FormatReadWriteTest 
{
         return false;
     }
 
+    @Override
+    public String compression() {
+        return CompressionType.NONE.value();
+    }
+
     @Override
     public boolean supportDataFileWithoutExtension() {
         return true;
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/json/JsonCompressionTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/json/JsonCompressionTest.java
new file mode 100644
index 0000000000..afd79fa878
--- /dev/null
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/json/JsonCompressionTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.json;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.format.BaseCompressionTest;
+import org.apache.paimon.format.CompressionType;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.options.Options;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+/** Test for JSON compression functionality. */
+class JsonCompressionTest extends BaseCompressionTest {
+
+    @Override
+    protected FileFormat createFileFormat(Options options) {
+        return new JsonFileFormat(createFormatContext(options));
+    }
+
+    @Override
+    protected String getFormatExtension() {
+        return "json";
+    }
+
+    @Test
+    void testCompressionWithCustomJsonOptions() throws IOException {
+        Options options = new Options();
+        options.set(CoreOptions.FILE_COMPRESSION, "gzip");
+        options.set(JsonOptions.JSON_IGNORE_PARSE_ERRORS, true);
+        options.set(JsonOptions.JSON_MAP_NULL_KEY_MODE, 
JsonOptions.MapNullKeyMode.DROP);
+        options.set(JsonOptions.LINE_DELIMITER, "\n");
+
+        String fileName = "test_custom_json_options.json.gz";
+        testCompressionRoundTripWithOptions(options, fileName);
+    }
+
+    @Test
+    void testJsonCompressionWithComplexData() throws IOException {
+        // Test with complex JSON structures and different compression formats
+        testCompressionRoundTrip(CompressionType.GZIP.value(), 
"test_complex_gzip.json.gz");
+        testCompressionRoundTrip(
+                CompressionType.DEFLATE.value(), 
"test_complex_deflate.json.deflate");
+        testCompressionRoundTrip(CompressionType.NONE.value(), 
"test_complex_none.json");
+    }
+}
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/json/JsonFileFormatTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/json/JsonFileFormatTest.java
index dd38623279..82f6d10a22 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/json/JsonFileFormatTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/json/JsonFileFormatTest.java
@@ -18,11 +18,13 @@
 
 package org.apache.paimon.format.json;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericMap;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.format.CompressionType;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.format.FileFormatFactory;
 import org.apache.paimon.format.FormatReadWriteTest;
@@ -56,7 +58,14 @@ public class JsonFileFormatTest extends FormatReadWriteTest {
 
     @Override
     protected FileFormat fileFormat() {
-        return new JsonFileFormat(new FileFormatFactory.FormatContext(new Options(), 1024, 1024));
+        Options options = new Options();
+        options.set(CoreOptions.FILE_COMPRESSION, compression());
+        return new JsonFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024));
+    }
+
+    @Override
+    public String compression() {
+        return CompressionType.NONE.value();
     }
 
     @Test
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
index 51c2e63b5f..d889c1ac66 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
@@ -638,8 +638,8 @@ public class SparkWriteITCase {
         Assertions.assertEquals(4, files.size());
 
         String defaultExtension = "." + "parquet";
-        String newExtension = "." + "zstd" + "." + "parquet";
-        // two data files end with ".parquet", two data file end with ".zstd.parquet"
+        String newExtension = "." + "zst" + "." + "parquet";
+        // two data files end with ".parquet", two data file end with ".zst.parquet"
         Assertions.assertEquals(
                 2,
                 files.stream()
@@ -682,8 +682,8 @@ public class SparkWriteITCase {
                         .filter(name -> name.contains("changelog-"))
                         .collect(Collectors.toList());
         String defaultExtension = "." + "parquet";
-        String newExtension = "." + "zstd" + "." + "parquet";
-        // one changelog file end with ".parquet", one changelog file end with ".zstd.parquet"
+        String newExtension = "." + "zst" + "." + "parquet";
+        // one changelog file end with ".parquet", one changelog file end with ".zst.parquet"
         Assertions.assertEquals(
                 1,
                 files.stream()

Reply via email to