This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new df06cd1da [format] Default compression of file to zstd with level 1 (#3463)
df06cd1da is described below

commit df06cd1da931f07553581d5c54d201d6881bb394
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Jun 4 11:02:16 2024 +0800

    [format] Default compression of file to zstd with level 1 (#3463)
---
 docs/content/append-table/append-table.md          |  4 +-
 docs/content/maintenance/write-performance.md      |  6 +-
 .../shortcodes/generated/core_configuration.html   | 34 ++++++++---
 .../shortcodes/generated/orc_configuration.html    | 12 ----
 .../main/java/org/apache/paimon/CoreOptions.java   | 28 +++++++++-
 .../java/org/apache/paimon/format/FileFormat.java  | 11 ++--
 .../apache/paimon/format/FileFormatFactory.java    | 27 +++++++++
 .../apache/paimon/format/FormatWriterFactory.java  |  4 +-
 .../apache/paimon/format/FormatReadWriteTest.java  |  4 +-
 .../java/org/apache/paimon/AbstractFileStore.java  |  5 +-
 .../apache/paimon/manifest/IndexManifestFile.java  | 11 +++-
 .../org/apache/paimon/manifest/ManifestFile.java   | 15 ++---
 .../org/apache/paimon/manifest/ManifestList.java   |  7 ++-
 .../apache/paimon/table/system/ManifestsTable.java |  9 ++-
 .../java/org/apache/paimon/utils/ObjectsFile.java  |  7 ++-
 .../java/org/apache/paimon/FileFormatTest.java     |  8 +--
 .../manifest/IndexManifestFileHandlerTest.java     |  2 +
 .../paimon/manifest/ManifestFileMetaTestBase.java  |  1 +
 .../apache/paimon/manifest/ManifestFileTest.java   |  1 +
 .../apache/paimon/manifest/ManifestListTest.java   |  3 +-
 .../paimon/table/system/ManifestsTableTest.java    |  1 +
 .../apache/paimon/flink/CatalogTableITCase.java    | 65 ++++++++++------------
 .../paimon/flink/sink/CommitterOperatorTest.java   |  2 +-
 .../java/org/apache/paimon/format/OrcOptions.java  | 19 -------
 .../org/apache/paimon/format/avro/AvroBuilder.java |  3 +-
 .../apache/paimon/format/avro/AvroFileFormat.java  | 34 +++++++----
 .../paimon/format/avro/AvroFileFormatFactory.java  |  2 +-
 .../paimon/format/avro/AvroWriterFactory.java      |  6 +-
 .../apache/paimon/format/orc/OrcFileFormat.java    | 31 ++++++++---
 .../paimon/format/orc/OrcFileFormatFactory.java    | 20 +------
 .../apache/paimon/format/orc/OrcWriterFactory.java |  7 ++-
 .../paimon/format/parquet/ParquetFileFormat.java   | 30 +++++++---
 .../format/parquet/ParquetFileFormatFactory.java   | 26 +--------
 .../parquet/writer/RowDataParquetBuilder.java      | 13 +----
 .../paimon/format/avro/AvroFileFormatTest.java     |  7 ++-
 .../format/avro/AvroFormatReadWriteTest.java       |  3 +-
 .../paimon/format/orc/OrcFileFormatTest.java       |  1 -
 .../format/orc/writer/OrcBulkWriterTest.java       |  2 +-
 .../format/parquet/ParquetFileFormatTest.java      | 15 +----
 39 files changed, 266 insertions(+), 220 deletions(-)

diff --git a/docs/content/append-table/append-table.md 
b/docs/content/append-table/append-table.md
index 1e4dd4f5f..2eba1f44f 100644
--- a/docs/content/append-table/append-table.md
+++ b/docs/content/append-table/append-table.md
@@ -39,14 +39,12 @@ CREATE TABLE my_table (
     price DOUBLE,
     sales BIGINT
 ) WITH (
-    'file.compression' = 'zstd'
+    'file.compression.zstd-level' = '3'
 );
 ```
 {{< /tab >}}
 {{< /tabs >}}
 
-The recommended compression for the Append table is `'zstd'`.
-
 ## Automatic small file merging
 
 In streaming writing job, without bucket definition, there is no compaction in 
writer, instead, will use
diff --git a/docs/content/maintenance/write-performance.md 
b/docs/content/maintenance/write-performance.md
index d69cdc1a9..8e6ff4830 100644
--- a/docs/content/maintenance/write-performance.md
+++ b/docs/content/maintenance/write-performance.md
@@ -210,11 +210,9 @@ layers to be in Avro format.
 
 ## File Compression
 
-By default, Paimon uses high-performance compression algorithms such as LZ4 
and SNAPPY, but their compression rates
-are not so good. If you want to reduce the write/read performance, you can 
modify the compression algorithm:
+By default, Paimon uses zstd with level 1. You can tune the compression with the following option:
 
-1. `'file.compression'`: Default file compression format. If you need a higher 
compression rate, I recommend using `'ZSTD'`.
-2. `'file.compression.per.level'`: Define different compression policies for 
different level. For example `'0:lz4,1:zstd'`.
+`'file.compression.zstd-level'`: The default zstd level is 1. For a higher compression ratio, it can be set to 9, but read and write speed will decrease significantly.
 
 ## Stability
 
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 01f62e759..fe91f2a0f 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -188,18 +188,18 @@ under the License.
             <td>Boolean</td>
             <td>Force produce changelog in delete sql, or you can use 
'streaming-read-overwrite' to read changelog from overwrite commit.</td>
         </tr>
-        <tr>
-            <td><h5>deletion-vectors.enabled</h5></td>
-            <td style="word-wrap: break-word;">false</td>
-            <td>Boolean</td>
-            <td>Whether to enable deletion vectors mode. In this mode, index 
files containing deletion vectors are generated when data is written, which 
marks the data for deletion. During read operations, by applying these index 
files, merging can be avoided.</td>
-        </tr>
         <tr>
             <td><h5>deletion-vector.index-file.target-size</h5></td>
             <td style="word-wrap: break-word;">2 mb</td>
             <td>MemorySize</td>
             <td>The target size of deletion vector index file.</td>
         </tr>
+        <tr>
+            <td><h5>deletion-vectors.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to enable deletion vectors mode. In this mode, index 
files containing deletion vectors are generated when data is written, which 
marks the data for deletion. During read operations, by applying these index 
files, merging can be avoided.</td>
+        </tr>
         <tr>
             <td><h5>dynamic-bucket.assigner-parallelism</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
@@ -249,10 +249,16 @@ under the License.
             <td>The threshold for read file async.</td>
         </tr>
         <tr>
-            <td><h5>file.compression</h5></td>
+            <td><h5>file.block-size</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
+            <td>MemorySize</td>
+            <td>File block size of format, default value of orc stripe is 64 
MB, and parquet row group is 128 MB.</td>
+        </tr>
+        <tr>
+            <td><h5>file.compression</h5></td>
+            <td style="word-wrap: break-word;">"zstd"</td>
             <td>String</td>
-            <td>Default file compression format, orc is lz4 and parquet is 
snappy. It can be overridden by file.compression.per.level</td>
+            <td>Default file compression. For faster read and write, it is 
recommended to use LZ4.</td>
         </tr>
         <tr>
             <td><h5>file.compression.per.level</h5></td>
@@ -260,6 +266,12 @@ under the License.
             <td>Map</td>
             <td>Define different compression policies for different level, you 
can add the conf like this: 'file.compression.per.level' = '0:lz4,1:zstd'.</td>
         </tr>
+        <tr>
+            <td><h5>file.compression.zstd-level</h5></td>
+            <td style="word-wrap: break-word;">1</td>
+            <td>Integer</td>
+            <td>Default file compression zstd level. For higher compression 
rates, it can be configured to 9, but the read and write speed will 
significantly decrease.</td>
+        </tr>
         <tr>
             <td><h5>file.format</h5></td>
             <td style="word-wrap: break-word;">"orc"</td>
@@ -357,6 +369,12 @@ Mainly to resolve data skew on primary keys. We recommend 
starting with 64 mb wh
             <td>Float</td>
             <td>The index load factor for lookup.</td>
         </tr>
+        <tr>
+            <td><h5>manifest.compression</h5></td>
+            <td style="word-wrap: break-word;">"zstd"</td>
+            <td>String</td>
+            <td>Default file compression for manifest.</td>
+        </tr>
         <tr>
             <td><h5>manifest.format</h5></td>
             <td style="word-wrap: break-word;">"avro"</td>
diff --git a/docs/layouts/shortcodes/generated/orc_configuration.html 
b/docs/layouts/shortcodes/generated/orc_configuration.html
index ffdae855e..96c25f6f6 100644
--- a/docs/layouts/shortcodes/generated/orc_configuration.html
+++ b/docs/layouts/shortcodes/generated/orc_configuration.html
@@ -32,18 +32,6 @@ under the License.
             <td>Integer</td>
             <td>Comma-separated list of fields for which dictionary encoding 
is to be skipped in orc.</td>
         </tr>
-        <tr>
-            <td><h5>orc.compress</h5></td>
-            <td style="word-wrap: break-word;">"lz4"</td>
-            <td>String</td>
-            <td>Define the compression codec for ORC file, if a higher 
compression ratio is required, it is recommended to configure it as 'zstd', and 
you can configure: orc.compression.zstd.level</td>
-        </tr>
-        <tr>
-            <td><h5>orc.compression.zstd.level</h5></td>
-            <td style="word-wrap: break-word;">3</td>
-            <td>Integer</td>
-            <td>Define the compression level to use with ZStandard codec while 
writing data. The valid range is 1~22.</td>
-        </tr>
         <tr>
             <td><h5>orc.dictionary.key.threshold</h5></td>
             <td style="word-wrap: break-word;">0.8</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 78f8fc4f2..494e36092 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -150,10 +150,23 @@ public class CoreOptions implements Serializable {
     public static final ConfigOption<String> FILE_COMPRESSION =
             key("file.compression")
                     .stringType()
+                    .defaultValue("zstd")
+                    .withDescription(
+                            "Default file compression. For faster read and 
write, it is recommended to use LZ4.");
+
+    public static final ConfigOption<Integer> FILE_COMPRESSION_ZSTD_LEVEL =
+            key("file.compression.zstd-level")
+                    .intType()
+                    .defaultValue(1)
+                    .withDescription(
+                            "Default file compression zstd level. For higher 
compression rates, it can be configured to 9, but the read and write speed will 
significantly decrease.");
+
+    public static final ConfigOption<MemorySize> FILE_BLOCK_SIZE =
+            key("file.block-size")
+                    .memoryType()
                     .noDefaultValue()
                     .withDescription(
-                            "Default file compression format, orc is lz4 and 
parquet is snappy. It can be overridden by "
-                                    + FILE_COMPRESSION_PER_LEVEL.key());
+                            "File block size of format, default value of orc 
stripe is 64 MB, and parquet row group is 128 MB.");
 
     public static final ConfigOption<MemorySize> 
FILE_INDEX_IN_MANIFEST_THRESHOLD =
             key("file-index.in-manifest-threshold")
@@ -173,6 +186,12 @@ public class CoreOptions implements Serializable {
                     .defaultValue(CoreOptions.FILE_FORMAT_AVRO)
                     .withDescription("Specify the message format of manifest 
files.");
 
+    public static final ConfigOption<String> MANIFEST_COMPRESSION =
+            key("manifest.compression")
+                    .stringType()
+                    .defaultValue("zstd")
+                    .withDescription("Default file compression for manifest.");
+
     public static final ConfigOption<MemorySize> MANIFEST_TARGET_FILE_SIZE =
             key("manifest.target-file-size")
                     .memoryType()
@@ -1227,6 +1246,10 @@ public class CoreOptions implements Serializable {
         return createFileFormat(options, MANIFEST_FORMAT);
     }
 
+    public String manifestCompression() {
+        return options.get(MANIFEST_COMPRESSION);
+    }
+
     public MemorySize manifestTargetSize() {
         return options.get(MANIFEST_TARGET_FILE_SIZE);
     }
@@ -1325,6 +1348,7 @@ public class CoreOptions implements Serializable {
                         .defaultValue(false));
     }
 
+    @Nullable
     public String fileCompression() {
         return options.get(FILE_COMPRESSION);
     }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java 
b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
index 39f3d8ff6..daf9e5494 100644
--- a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
+++ b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
@@ -104,9 +104,12 @@ public abstract class FileFormat {
     }
 
     public static FileFormat getFileFormat(Options options, String 
formatIdentifier) {
-        int readBatchSize = options.get(CoreOptions.READ_BATCH_SIZE);
-        return FileFormat.fromIdentifier(
-                formatIdentifier,
-                new FormatContext(options.removePrefix(formatIdentifier + 
"."), readBatchSize));
+        FormatContext context =
+                new FormatContext(
+                        options.removePrefix(formatIdentifier + "."),
+                        options.get(CoreOptions.READ_BATCH_SIZE),
+                        options.get(CoreOptions.FILE_COMPRESSION_ZSTD_LEVEL),
+                        options.get(CoreOptions.FILE_BLOCK_SIZE));
+        return FileFormat.fromIdentifier(formatIdentifier, context);
     }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java 
b/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java
index 71e10a242..1bb681177 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java
@@ -18,8 +18,12 @@
 
 package org.apache.paimon.format;
 
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 
+import javax.annotation.Nullable;
+
 /** Factory to create {@link FileFormat}. */
 public interface FileFormatFactory {
 
@@ -29,12 +33,26 @@ public interface FileFormatFactory {
 
     /** the format context. */
     class FormatContext {
+
         private final Options formatOptions;
         private final int readBatchSize;
+        private final int zstdLevel;
+        @Nullable private final MemorySize blockSize;
 
+        @VisibleForTesting
         public FormatContext(Options formatOptions, int readBatchSize) {
+            this(formatOptions, readBatchSize, 1, null);
+        }
+
+        public FormatContext(
+                Options formatOptions,
+                int readBatchSize,
+                int zstdLevel,
+                @Nullable MemorySize blockSize) {
             this.formatOptions = formatOptions;
             this.readBatchSize = readBatchSize;
+            this.zstdLevel = zstdLevel;
+            this.blockSize = blockSize;
         }
 
         public Options formatOptions() {
@@ -44,5 +62,14 @@ public interface FileFormatFactory {
         public int readBatchSize() {
             return readBatchSize;
         }
+
+        public int zstdLevel() {
+            return zstdLevel;
+        }
+
+        @Nullable
+        public MemorySize blockSize() {
+            return blockSize;
+        }
     }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/format/FormatWriterFactory.java 
b/paimon-common/src/main/java/org/apache/paimon/format/FormatWriterFactory.java
index e53be8003..517cc00cb 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/format/FormatWriterFactory.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/format/FormatWriterFactory.java
@@ -20,8 +20,6 @@ package org.apache.paimon.format;
 
 import org.apache.paimon.fs.PositionOutputStream;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 
 /** A factory to create {@link FormatWriter} for file. */
@@ -35,5 +33,5 @@ public interface FormatWriterFactory {
      * @throws IOException Thrown if the writer cannot be opened, or if the 
output stream throws an
      *     exception.
      */
-    FormatWriter create(PositionOutputStream out, @Nullable String 
compression) throws IOException;
+    FormatWriter create(PositionOutputStream out, String compression) throws 
IOException;
 }
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java 
b/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
index 556f2f603..10467ecd0 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
@@ -87,7 +87,7 @@ public abstract class FormatReadWriteTest {
         FileFormat format = fileFormat();
 
         PositionOutputStream out = fileIO.newOutputStream(file, false);
-        FormatWriter writer = format.createWriterFactory(rowType).create(out, 
null);
+        FormatWriter writer = format.createWriterFactory(rowType).create(out, 
"zstd");
         writer.addElement(GenericRow.of(1, 1L));
         writer.addElement(GenericRow.of(2, 2L));
         writer.addElement(GenericRow.of(3, null));
@@ -118,7 +118,7 @@ public abstract class FormatReadWriteTest {
         FileFormat format = fileFormat();
 
         PositionOutputStream out = fileIO.newOutputStream(file, false);
-        FormatWriter writer = format.createWriterFactory(rowType).create(out, 
null);
+        FormatWriter writer = format.createWriterFactory(rowType).create(out, 
"zstd");
         writer.addElement(expected);
         writer.flush();
         writer.finish();
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index bfa0e8947..8d9eca295 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -118,6 +118,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> 
{
                 schemaManager,
                 partitionType,
                 options.manifestFormat(),
+                options.manifestCompression(),
                 pathFactory(),
                 options.manifestTargetSize().getBytes(),
                 forWrite ? writeManifestCache : null);
@@ -132,12 +133,14 @@ abstract class AbstractFileStore<T> implements 
FileStore<T> {
         return new ManifestList.Factory(
                 fileIO,
                 options.manifestFormat(),
+                options.manifestCompression(),
                 pathFactory(),
                 forWrite ? writeManifestCache : null);
     }
 
     protected IndexManifestFile.Factory indexManifestFileFactory() {
-        return new IndexManifestFile.Factory(fileIO, options.manifestFormat(), 
pathFactory());
+        return new IndexManifestFile.Factory(
+                fileIO, options.manifestFormat(), 
options.manifestCompression(), pathFactory());
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java
index 8a91f3483..91a4f171a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java
@@ -40,12 +40,14 @@ public class IndexManifestFile extends 
ObjectsFile<IndexManifestEntry> {
             FileIO fileIO,
             FormatReaderFactory readerFactory,
             FormatWriterFactory writerFactory,
+            String compression,
             PathFactory pathFactory) {
         super(
                 fileIO,
                 new IndexManifestEntrySerializer(),
                 readerFactory,
                 writerFactory,
+                compression,
                 pathFactory,
                 null);
     }
@@ -68,11 +70,17 @@ public class IndexManifestFile extends 
ObjectsFile<IndexManifestEntry> {
 
         private final FileIO fileIO;
         private final FileFormat fileFormat;
+        private final String compression;
         private final FileStorePathFactory pathFactory;
 
-        public Factory(FileIO fileIO, FileFormat fileFormat, 
FileStorePathFactory pathFactory) {
+        public Factory(
+                FileIO fileIO,
+                FileFormat fileFormat,
+                String compression,
+                FileStorePathFactory pathFactory) {
             this.fileIO = fileIO;
             this.fileFormat = fileFormat;
+            this.compression = compression;
             this.pathFactory = pathFactory;
         }
 
@@ -82,6 +90,7 @@ public class IndexManifestFile extends 
ObjectsFile<IndexManifestEntry> {
                     fileIO,
                     fileFormat.createReaderFactory(schema),
                     fileFormat.createWriterFactory(schema),
+                    compression,
                     pathFactory.indexManifestFileFactory());
         }
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
index 554d8fb88..e2fa86789 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.manifest;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.format.FormatReaderFactory;
@@ -60,10 +59,11 @@ public class ManifestFile extends 
ObjectsFile<ManifestEntry> {
             ManifestEntrySerializer serializer,
             FormatReaderFactory readerFactory,
             FormatWriterFactory writerFactory,
+            String compression,
             PathFactory pathFactory,
             long suggestedFileSize,
             @Nullable SegmentsCache<String> cache) {
-        super(fileIO, serializer, readerFactory, writerFactory, pathFactory, 
cache);
+        super(fileIO, serializer, readerFactory, writerFactory, compression, 
pathFactory, cache);
         this.schemaManager = schemaManager;
         this.partitionType = partitionType;
         this.writerFactory = writerFactory;
@@ -93,11 +93,7 @@ public class ManifestFile extends ObjectsFile<ManifestEntry> 
{
 
     public RollingFileWriter<ManifestEntry, ManifestFileMeta> 
createRollingWriter() {
         return new RollingFileWriter<>(
-                () ->
-                        new ManifestEntryWriter(
-                                writerFactory,
-                                pathFactory.newPath(),
-                                CoreOptions.FILE_COMPRESSION.defaultValue()),
+                () -> new ManifestEntryWriter(writerFactory, 
pathFactory.newPath(), compression),
                 suggestedFileSize);
     }
 
@@ -157,6 +153,7 @@ public class ManifestFile extends 
ObjectsFile<ManifestEntry> {
         private final SchemaManager schemaManager;
         private final RowType partitionType;
         private final FileFormat fileFormat;
+        private final String compression;
         private final FileStorePathFactory pathFactory;
         private final long suggestedFileSize;
         @Nullable private final SegmentsCache<String> cache;
@@ -166,6 +163,7 @@ public class ManifestFile extends 
ObjectsFile<ManifestEntry> {
                 SchemaManager schemaManager,
                 RowType partitionType,
                 FileFormat fileFormat,
+                String compression,
                 FileStorePathFactory pathFactory,
                 long suggestedFileSize,
                 @Nullable SegmentsCache<String> cache) {
@@ -173,6 +171,7 @@ public class ManifestFile extends 
ObjectsFile<ManifestEntry> {
             this.schemaManager = schemaManager;
             this.partitionType = partitionType;
             this.fileFormat = fileFormat;
+            this.compression = compression;
             this.pathFactory = pathFactory;
             this.suggestedFileSize = suggestedFileSize;
             this.cache = cache;
@@ -187,6 +186,7 @@ public class ManifestFile extends 
ObjectsFile<ManifestEntry> {
                     new ManifestEntrySerializer(),
                     fileFormat.createReaderFactory(entryType),
                     fileFormat.createWriterFactory(entryType),
+                    compression,
                     pathFactory.manifestFileFactory(),
                     suggestedFileSize,
                     cache);
@@ -199,6 +199,7 @@ public class ManifestFile extends 
ObjectsFile<ManifestEntry> {
                     new SimpleFileEntrySerializer(),
                     fileFormat.createReaderFactory(entryType),
                     fileFormat.createWriterFactory(entryType),
+                    compression,
                     pathFactory.manifestFileFactory(),
                     cache);
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
index 84781cdea..e33d4a643 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
@@ -44,9 +44,10 @@ public class ManifestList extends 
ObjectsFile<ManifestFileMeta> {
             ManifestFileMetaSerializer serializer,
             FormatReaderFactory readerFactory,
             FormatWriterFactory writerFactory,
+            String compression,
             PathFactory pathFactory,
             @Nullable SegmentsCache<String> cache) {
-        super(fileIO, serializer, readerFactory, writerFactory, pathFactory, 
cache);
+        super(fileIO, serializer, readerFactory, writerFactory, compression, 
pathFactory, cache);
     }
 
     /**
@@ -63,16 +64,19 @@ public class ManifestList extends 
ObjectsFile<ManifestFileMeta> {
 
         private final FileIO fileIO;
         private final FileFormat fileFormat;
+        private final String compression;
         private final FileStorePathFactory pathFactory;
         @Nullable private final SegmentsCache<String> cache;
 
         public Factory(
                 FileIO fileIO,
                 FileFormat fileFormat,
+                String compression,
                 FileStorePathFactory pathFactory,
                 @Nullable SegmentsCache<String> cache) {
             this.fileIO = fileIO;
             this.fileFormat = fileFormat;
+            this.compression = compression;
             this.pathFactory = pathFactory;
             this.cache = cache;
         }
@@ -84,6 +88,7 @@ public class ManifestList extends 
ObjectsFile<ManifestFileMeta> {
                     new ManifestFileMetaSerializer(),
                     fileFormat.createReaderFactory(metaType),
                     fileFormat.createWriterFactory(metaType),
+                    compression,
                     pathFactory.manifestListFactory(),
                     cache);
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
index e07ff602c..15210792b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
@@ -24,7 +24,6 @@ import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.predicate.Predicate;
@@ -206,9 +205,13 @@ public class ManifestsTable implements ReadonlyTable {
             return Collections.emptyList();
         }
         FileStorePathFactory fileStorePathFactory = 
dataTable.store().pathFactory();
-        FileFormat fileFormat = coreOptions.manifestFormat();
         ManifestList manifestList =
-                new ManifestList.Factory(dataTable.fileIO(), fileFormat, 
fileStorePathFactory, null)
+                new ManifestList.Factory(
+                                dataTable.fileIO(),
+                                coreOptions.manifestFormat(),
+                                coreOptions.manifestCompression(),
+                                fileStorePathFactory,
+                                null)
                         .create();
         return snapshot.allManifests(manifestList);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
index 474b757b6..c05e5d80d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.utils;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.format.FormatWriter;
@@ -46,6 +45,7 @@ public class ObjectsFile<T> {
     protected final ObjectSerializer<T> serializer;
     protected final FormatReaderFactory readerFactory;
     protected final FormatWriterFactory writerFactory;
+    protected final String compression;
     protected final PathFactory pathFactory;
 
     @Nullable private final ObjectsCache<String, T> cache;
@@ -55,12 +55,14 @@ public class ObjectsFile<T> {
             ObjectSerializer<T> serializer,
             FormatReaderFactory readerFactory,
             FormatWriterFactory writerFactory,
+            String compression,
             PathFactory pathFactory,
             @Nullable SegmentsCache<String> cache) {
         this.fileIO = fileIO;
         this.serializer = serializer;
         this.readerFactory = readerFactory;
         this.writerFactory = writerFactory;
+        this.compression = compression;
         this.pathFactory = pathFactory;
         this.cache =
                 cache == null ? null : new ObjectsCache<>(cache, serializer, 
this::createIterator);
@@ -139,8 +141,7 @@ public class ObjectsFile<T> {
         Path path = pathFactory.newPath();
         try {
             try (PositionOutputStream out = fileIO.newOutputStream(path, 
false)) {
-                FormatWriter writer =
-                        writerFactory.create(out, 
CoreOptions.FILE_COMPRESSION.defaultValue());
+                FormatWriter writer = writerFactory.create(out, compression);
                 try {
                     while (records.hasNext()) {
                         writer.addElement(serializer.toRow(records.next()));
diff --git a/paimon-core/src/test/java/org/apache/paimon/FileFormatTest.java 
b/paimon-core/src/test/java/org/apache/paimon/FileFormatTest.java
index 08ba9010f..ec886feb8 100644
--- a/paimon-core/src/test/java/org/apache/paimon/FileFormatTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/FileFormatTest.java
@@ -111,8 +111,8 @@ public class FileFormatTest {
         assertThat(fileFormat instanceof OrcFileFormat).isTrue();
 
         OrcFileFormat orcFileFormat = (OrcFileFormat) fileFormat;
-        
assertThat(orcFileFormat.formatContext().formatOptions().get("hello")).isEqualTo("world");
-        
assertThat(orcFileFormat.formatContext().readBatchSize()).isEqualTo(1024);
+        
assertThat(orcFileFormat.orcProperties().get("orc.hello")).isEqualTo("world");
+        assertThat(orcFileFormat.readBatchSize()).isEqualTo(1024);
     }
 
     @ParameterizedTest
@@ -127,8 +127,8 @@ public class FileFormatTest {
         FileFormat fileFormat = fileFormatDiscover.discover(identifier);
         assertThat(fileFormat instanceof OrcFileFormat).isTrue();
         OrcFileFormat orcFileFormat = (OrcFileFormat) fileFormat;
-        
assertThat(orcFileFormat.formatContext().formatOptions().get("hello")).isEqualTo("world");
-        
assertThat(orcFileFormat.formatContext().readBatchSize()).isEqualTo(1024);
+        
assertThat(orcFileFormat.orcProperties().get("orc.hello")).isEqualTo("world");
+        assertThat(orcFileFormat.readBatchSize()).isEqualTo(1024);
     }
 
     public FileFormat createFileFormat(String codec) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java
index 4ca59a75c..58b259823 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java
@@ -46,6 +46,7 @@ public class IndexManifestFileHandlerTest {
                 new IndexManifestFile.Factory(
                                 fileStore.fileIO(),
                                 fileStore.options().manifestFormat(),
+                                "zstd",
                                 fileStore.pathFactory())
                         .create();
         IndexManifestFileHandler indexManifestFileHandler =
@@ -79,6 +80,7 @@ public class IndexManifestFileHandlerTest {
                 new IndexManifestFile.Factory(
                                 fileStore.fileIO(),
                                 fileStore.options().manifestFormat(),
+                                "zstd",
                                 fileStore.pathFactory())
                         .create();
         IndexManifestFileHandler indexManifestFileHandler =
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
index e74af8b30..f4b3c69ba 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
@@ -128,6 +128,7 @@ public abstract class ManifestFileMetaTestBase {
                         new SchemaManager(fileIO, path),
                         getPartitionType(),
                         avro,
+                        "zstd",
                         new FileStorePathFactory(
                                 path,
                                 getPartitionType(),
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
index c0566981c..5ac1eb5f2 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
@@ -108,6 +108,7 @@ public class ManifestFileTest {
                         new SchemaManager(fileIO, path),
                         DEFAULT_PART_TYPE,
                         avro,
+                        "zstd",
                         pathFactory,
                         suggestedFileSize,
                         null)
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
index 9de039a56..06e106c75 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
@@ -105,6 +105,7 @@ public class ManifestListTest {
                         TestKeyValueGenerator.DEFAULT_PART_TYPE,
                         "default",
                         CoreOptions.FILE_FORMAT.defaultValue().toString());
-        return new ManifestList.Factory(FileIOFinder.find(path), avro, 
pathFactory, null).create();
+        return new ManifestList.Factory(FileIOFinder.find(path), avro, "zstd", 
pathFactory, null)
+                .create();
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
index 8b084ddb4..970732e83 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java
@@ -82,6 +82,7 @@ public class ManifestsTableTest extends TableTestBase {
                         FileFormat.fromIdentifier(
                                 
CoreOptions.MANIFEST_FORMAT.defaultValue().toString(),
                                 new Options()),
+                        "zstd",
                         createNonPartFactory(tablePath),
                         null);
         manifestList = factory.create();
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index 2017ae4ba..da62348d1 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -34,7 +34,6 @@ import org.junit.jupiter.api.Test;
 import javax.annotation.Nonnull;
 
 import java.util.List;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static 
org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
@@ -761,56 +760,52 @@ public class CatalogTableITCase extends CatalogITCaseBase 
{
         // assert empty
         assertThat(sql("SELECT * FROM %s$partitions", table)).isEmpty();
 
-        // Convert to another Row to avoid timestamp diff
-        Function<List<Row>, List<Row>> convert =
-                rows ->
-                        rows.stream()
-                                .map(
-                                        r ->
-                                                Row.of(
-                                                        r.getField(0),
-                                                        r.getField(1),
-                                                        r.getField(2),
-                                                        r.getField(3)))
-                                .collect(Collectors.toList());
-
         // assert new partitions
         sql("INSERT INTO %s VALUES (3, 1, 4, 'S2'), (1, 2, 2, 'S1'), (1, 2, 2, 
'S1')", table);
         sql("INSERT INTO %s VALUES (3, 1, 4, 'S3'), (1, 2, 2, 'S4')", table);
-        List<Row> result = sql("SELECT * FROM %s$partitions", table);
-        assertThat(convert.apply(result))
-                .containsExactlyInAnyOrder(
-                        Row.of("[1]", 2L, 910L, 2L), Row.of("[2]", 3L, 879L, 
2L));
+        List<Row> result =
+                sql("SELECT `partition`, record_count, file_count FROM 
%s$partitions", table);
+        assertThat(result).containsExactlyInAnyOrder(Row.of("[1]", 2L, 2L), 
Row.of("[2]", 3L, 2L));
 
         // assert new files in partition
         sql("INSERT INTO %s VALUES (3, 4, 4, 'S3'), (1, 3, 2, 'S4')", table);
         sql("INSERT INTO %s VALUES (3, 1, 4, 'S3'), (1, 2, 2, 'S4')", table);
-        result = sql(String.format("SELECT * FROM %s$partitions", table));
-        assertThat(convert.apply(result))
+        result =
+                sql(
+                        String.format(
+                                "SELECT `partition`, record_count, file_count 
FROM %s$partitions",
+                                table));
+        assertThat(result)
                 .containsExactlyInAnyOrder(
-                        Row.of("[1]", 3L, 1365L, 3L),
-                        Row.of("[2]", 4L, 1317L, 3L),
-                        Row.of("[3]", 1L, 453L, 1L),
-                        Row.of("[4]", 1L, 440L, 1L));
+                        Row.of("[1]", 3L, 3L),
+                        Row.of("[2]", 4L, 3L),
+                        Row.of("[3]", 1L, 1L),
+                        Row.of("[4]", 1L, 1L));
 
         // assert delete partitions
         sql("ALTER TABLE %s DROP PARTITION (p = 2)", table);
-        result = sql(String.format("SELECT * FROM %s$partitions", table));
-        assertThat(convert.apply(result))
+        result =
+                sql(
+                        String.format(
+                                "SELECT `partition`, record_count, file_count 
FROM %s$partitions",
+                                table));
+        assertThat(result)
                 .containsExactlyInAnyOrder(
-                        Row.of("[1]", 3L, 1365L, 3L),
-                        Row.of("[3]", 1L, 453L, 1L),
-                        Row.of("[4]", 1L, 440L, 1L));
+                        Row.of("[1]", 3L, 3L), Row.of("[3]", 1L, 1L), 
Row.of("[4]", 1L, 1L));
 
         // add new file to p 2
         sql("INSERT INTO %s VALUES (1, 2, 2, 'S1')", table);
-        result = sql(String.format("SELECT * FROM %s$partitions", table));
-        assertThat(convert.apply(result))
+        result =
+                sql(
+                        String.format(
+                                "SELECT `partition`, record_count, file_count 
FROM %s$partitions",
+                                table));
+        assertThat(result)
                 .containsExactlyInAnyOrder(
-                        Row.of("[1]", 3L, 1365L, 3L),
-                        Row.of("[2]", 1L, 438L, 1L),
-                        Row.of("[3]", 1L, 453L, 1L),
-                        Row.of("[4]", 1L, 440L, 1L));
+                        Row.of("[1]", 3L, 3L),
+                        Row.of("[2]", 1L, 1L),
+                        Row.of("[3]", 1L, 1L),
+                        Row.of("[4]", 1L, 1L));
     }
 
     @Test
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index 58a7d0dce..3f1281abc 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -565,7 +565,7 @@ public class CommitterOperatorTest extends 
CommitterOperatorTestBase {
                         table, commit, Committer.createContext("", 
metricGroup, true, false, null));
         committer.commit(Collections.singletonList(manifestCommittable));
         CommitterMetrics metrics = committer.getCommitterMetrics();
-        assertThat(metrics.getNumBytesOutCounter().getCount()).isEqualTo(285);
+        assertThat(metrics.getNumBytesOutCounter().getCount()).isEqualTo(293);
         assertThat(metrics.getNumRecordsOutCounter().getCount()).isEqualTo(2);
         committer.close();
     }
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/OrcOptions.java 
b/paimon-format/src/main/java/org/apache/paimon/format/OrcOptions.java
index 24cad16ad..3b72f1caa 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/OrcOptions.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/OrcOptions.java
@@ -31,15 +31,6 @@ public class OrcOptions {
                     .defaultValue(1024)
                     .withDescription("write batch size for orc.");
 
-    public static final ConfigOption<String> ORC_COMPRESS =
-            key("orc.compress")
-                    .stringType()
-                    .defaultValue("lz4")
-                    .withDescription(
-                            "Define the compression codec for ORC file, if a 
higher compression ratio is required, "
-                                    + "it is recommended to configure it as 
'zstd', and you can configure: "
-                                    + "orc.compression.zstd.level");
-
     public static final ConfigOption<Integer> ORC_COLUMN_ENCODING_DIRECT =
             key("orc.column.encoding.direct")
                     .intType()
@@ -56,14 +47,4 @@ public class OrcOptions {
                                     + "fraction of the total number of 
non-null rows, turn off "
                                     + "dictionary encoding in orc. Use 0 to 
always disable dictionary encoding. "
                                     + "Use 1 to always use dictionary 
encoding.");
-
-    // Do not use OrcConf.COMPRESSION_ZSTD_LEVEL, it may cause IDE testing to 
occur
-    // NoSuchMethodException
-    public static final ConfigOption<Integer> ORC_COMPRESSION_ZSTD_LEVEL =
-            key("orc.compression.zstd.level")
-                    .intType()
-                    .defaultValue(3)
-                    .withDescription(
-                            "Define the compression level to use with 
ZStandard codec while writing data. "
-                                    + "The valid range is 1~22.");
 }
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBuilder.java 
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBuilder.java
index 6bc40cc37..8c39dc64a 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBuilder.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBuilder.java
@@ -29,5 +29,6 @@ import java.io.Serializable;
 public interface AvroBuilder<T> extends Serializable {
 
     /** Creates and configures an Avro writer to the given output file. */
-    DataFileWriter<T> createWriter(OutputStream outputStream) throws 
IOException;
+    DataFileWriter<T> createWriter(OutputStream outputStream, String 
compression)
+            throws IOException;
 }
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java 
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java
index 9b38da96f..29e6eadcf 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java
@@ -20,6 +20,7 @@ package org.apache.paimon.format.avro;
 
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FileFormatFactory.FormatContext;
 import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.format.FormatWriter;
 import org.apache.paimon.format.FormatWriterFactory;
@@ -56,11 +57,11 @@ public class AvroFileFormat extends FileFormat {
                     .defaultValue(SNAPPY_CODEC)
                     .withDescription("The compression codec for avro");
 
-    private final Options formatOptions;
+    private final FormatContext context;
 
-    public AvroFileFormat(Options formatOptions) {
+    public AvroFileFormat(FormatContext context) {
         super(IDENTIFIER);
-        this.formatOptions = formatOptions;
+        this.context = context;
     }
 
     @Override
@@ -71,7 +72,7 @@ public class AvroFileFormat extends FileFormat {
 
     @Override
     public FormatWriterFactory createWriterFactory(RowType type) {
-        return new RowAvroWriterFactory(type, 
formatOptions.get(AVRO_OUTPUT_CODEC));
+        return new RowAvroWriterFactory(type);
     }
 
     @Override
@@ -88,23 +89,32 @@ public class AvroFileFormat extends FileFormat {
         }
     }
 
+    private CodecFactory createCodecFactory(String compression) {
+        Options options = context.formatOptions();
+        if (options.contains(AVRO_OUTPUT_CODEC)) {
+            return CodecFactory.fromString(options.get(AVRO_OUTPUT_CODEC));
+        }
+
+        if (compression.equalsIgnoreCase("zstd")) {
+            return CodecFactory.zstandardCodec(context.zstdLevel());
+        }
+        return CodecFactory.fromString(options.get(AVRO_OUTPUT_CODEC));
+    }
+
     /** A {@link FormatWriterFactory} to write {@link InternalRow}. */
-    private static class RowAvroWriterFactory implements FormatWriterFactory {
+    private class RowAvroWriterFactory implements FormatWriterFactory {
 
         private final AvroWriterFactory<InternalRow> factory;
 
-        private RowAvroWriterFactory(RowType rowType, String codec) {
+        private RowAvroWriterFactory(RowType rowType) {
             this.factory =
                     new AvroWriterFactory<>(
-                            out -> {
+                            (out, compression) -> {
                                 Schema schema = 
AvroSchemaConverter.convertToSchema(rowType);
                                 AvroRowDatumWriter datumWriter = new 
AvroRowDatumWriter(rowType);
                                 DataFileWriter<InternalRow> dataFileWriter =
                                         new DataFileWriter<>(datumWriter);
-
-                                if (codec != null) {
-                                    
dataFileWriter.setCodec(CodecFactory.fromString(codec));
-                                }
+                                
dataFileWriter.setCodec(createCodecFactory(compression));
                                 dataFileWriter.create(schema, out);
                                 return dataFileWriter;
                             });
@@ -113,7 +123,7 @@ public class AvroFileFormat extends FileFormat {
         @Override
         public FormatWriter create(PositionOutputStream out, String 
compression)
                 throws IOException {
-            AvroBulkWriter<InternalRow> writer = factory.create(out);
+            AvroBulkWriter<InternalRow> writer = factory.create(out, 
compression);
             return new FormatWriter() {
 
                 @Override
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormatFactory.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormatFactory.java
index 3202edce7..440b3eb72 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormatFactory.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormatFactory.java
@@ -31,6 +31,6 @@ public class AvroFileFormatFactory implements 
FileFormatFactory {
 
     @Override
     public FileFormat create(FormatContext formatContext) {
-        return new AvroFileFormat(formatContext.formatOptions());
+        return new AvroFileFormat(formatContext);
     }
 }
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroWriterFactory.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroWriterFactory.java
index 5dd08e413..d9d6e7c01 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroWriterFactory.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroWriterFactory.java
@@ -39,7 +39,9 @@ public class AvroWriterFactory<T> {
         this.avroBuilder = avroBuilder;
     }
 
-    public AvroBulkWriter<T> create(PositionOutputStream out) throws 
IOException {
-        return new AvroBulkWriter<>(avroBuilder.createWriter(new 
CloseShieldOutputStream(out)));
+    public AvroBulkWriter<T> create(PositionOutputStream out, String 
compression)
+            throws IOException {
+        return new AvroBulkWriter<>(
+                avroBuilder.createWriter(new CloseShieldOutputStream(out), 
compression));
     }
 }
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
index 7cd70390c..bfc6a447d 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
@@ -31,6 +31,7 @@ import 
org.apache.paimon.format.orc.filter.OrcSimpleStatsExtractor;
 import org.apache.paimon.format.orc.reader.OrcSplitReaderUtil;
 import org.apache.paimon.format.orc.writer.RowDataVectorizer;
 import org.apache.paimon.format.orc.writer.Vectorizer;
+import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.statistics.SimpleColStatsCollector;
@@ -43,6 +44,7 @@ import org.apache.paimon.types.MapType;
 import org.apache.paimon.types.MultisetType;
 import org.apache.paimon.types.RowType;
 
+import org.apache.orc.OrcConf;
 import org.apache.orc.TypeDescription;
 
 import javax.annotation.Nullable;
@@ -65,17 +67,16 @@ public class OrcFileFormat extends FileFormat {
     private final Properties orcProperties;
     private final org.apache.hadoop.conf.Configuration readerConf;
     private final org.apache.hadoop.conf.Configuration writerConf;
-
-    private final FormatContext formatContext;
+    private final int readBatchSize;
 
     public OrcFileFormat(FormatContext formatContext) {
         super(IDENTIFIER);
-        this.orcProperties = getOrcProperties(formatContext.formatOptions());
+        this.orcProperties = getOrcProperties(formatContext.formatOptions(), 
formatContext);
         this.readerConf = new org.apache.hadoop.conf.Configuration();
         this.orcProperties.forEach((k, v) -> readerConf.set(k.toString(), 
v.toString()));
         this.writerConf = new org.apache.hadoop.conf.Configuration();
         this.orcProperties.forEach((k, v) -> writerConf.set(k.toString(), 
v.toString()));
-        this.formatContext = formatContext;
+        this.readBatchSize = formatContext.readBatchSize();
     }
 
     @VisibleForTesting
@@ -84,8 +85,8 @@ public class OrcFileFormat extends FileFormat {
     }
 
     @VisibleForTesting
-    public FormatContext formatContext() {
-        return formatContext;
+    public int readBatchSize() {
+        return readBatchSize;
     }
 
     @Override
@@ -111,7 +112,7 @@ public class OrcFileFormat extends FileFormat {
                 readerConf,
                 (RowType) refineDataType(projectedRowType),
                 orcPredicates,
-                formatContext.readBatchSize());
+                readBatchSize);
     }
 
     @Override
@@ -142,11 +143,25 @@ public class OrcFileFormat extends FileFormat {
         return new OrcWriterFactory(vectorizer, orcProperties, writerConf);
     }
 
-    private static Properties getOrcProperties(Options options) {
+    private static Properties getOrcProperties(Options options, FormatContext 
formatContext) {
         Properties orcProperties = new Properties();
+
         Properties properties = new Properties();
         options.addAllToProperties(properties);
         properties.forEach((k, v) -> orcProperties.put(IDENTIFIER + "." + k, 
v));
+
+        if 
(!orcProperties.containsKey(OrcConf.COMPRESSION_ZSTD_LEVEL.getAttribute())) {
+            orcProperties.setProperty(
+                    OrcConf.COMPRESSION_ZSTD_LEVEL.getAttribute(),
+                    String.valueOf(formatContext.zstdLevel()));
+        }
+
+        MemorySize blockSize = formatContext.blockSize();
+        if (blockSize != null) {
+            orcProperties.setProperty(
+                    OrcConf.STRIPE_SIZE.getAttribute(), 
String.valueOf(blockSize.getBytes()));
+        }
+
         return orcProperties;
     }
 
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormatFactory.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormatFactory.java
index d9927d66f..0a454aa24 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormatFactory.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormatFactory.java
@@ -19,9 +19,6 @@
 package org.apache.paimon.format.orc;
 
 import org.apache.paimon.format.FileFormatFactory;
-import org.apache.paimon.options.Options;
-
-import java.util.Properties;
 
 /** Factory to create {@link OrcFileFormat}. */
 public class OrcFileFormatFactory implements FileFormatFactory {
@@ -35,21 +32,6 @@ public class OrcFileFormatFactory implements 
FileFormatFactory {
 
     @Override
     public OrcFileFormat create(FormatContext formatContext) {
-        return new OrcFileFormat(
-                new FormatContext(
-                        supplyDefaultOptions(formatContext.formatOptions()),
-                        formatContext.readBatchSize()));
-    }
-
-    private Options supplyDefaultOptions(Options options) {
-        if (!options.containsKey("compress")) {
-            Properties properties = new Properties();
-            options.addAllToProperties(properties);
-            properties.setProperty("compress", "lz4");
-            Options newOptions = new Options();
-            properties.forEach((k, v) -> newOptions.setString(k.toString(), 
v.toString()));
-            return newOptions;
-        }
-        return options;
+        return new OrcFileFormat(formatContext);
     }
 }
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcWriterFactory.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcWriterFactory.java
index 9a703bb18..76a01f01b 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcWriterFactory.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcWriterFactory.java
@@ -30,6 +30,7 @@ import org.apache.paimon.fs.PositionOutputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.orc.CompressionKind;
 import org.apache.orc.OrcConf;
 import org.apache.orc.OrcFile;
 import org.apache.orc.impl.PhysicalFsWriter;
@@ -104,11 +105,11 @@ public class OrcWriterFactory implements 
FormatWriterFactory {
 
     @Override
     public FormatWriter create(PositionOutputStream out, String compression) 
throws IOException {
-        if (null != compression) {
-            writerProperties.setProperty(OrcConf.COMPRESS.getAttribute(), 
compression);
+        OrcFile.WriterOptions opts = getWriterOptions();
+        if (!writerProperties.containsKey(OrcConf.COMPRESS.getAttribute())) {
+            opts.compress(CompressionKind.valueOf(compression.toUpperCase()));
         }
 
-        OrcFile.WriterOptions opts = getWriterOptions();
         opts.physicalWriter(
                 new PhysicalFsWriter(
                         new FSDataOutputStream(out, null) {
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
index 87987fac8..ffeb97909 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
@@ -25,11 +25,14 @@ import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.format.FormatWriterFactory;
 import org.apache.paimon.format.SimpleStatsExtractor;
 import org.apache.paimon.format.parquet.writer.RowDataParquetBuilder;
+import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.statistics.SimpleColStatsCollector;
 import org.apache.paimon.types.RowType;
 
+import org.apache.parquet.hadoop.ParquetOutputFormat;
+
 import java.util.List;
 import java.util.Optional;
 
@@ -54,7 +57,7 @@ public class ParquetFileFormat extends FileFormat {
     public FormatReaderFactory createReaderFactory(
             RowType projectedRowType, List<Predicate> filters) {
         return new ParquetReaderFactory(
-                getParquetConfiguration(formatContext.formatOptions()),
+                getParquetConfiguration(formatContext),
                 projectedRowType,
                 formatContext.readBatchSize());
     }
@@ -62,8 +65,7 @@ public class ParquetFileFormat extends FileFormat {
     @Override
     public FormatWriterFactory createWriterFactory(RowType type) {
         return new ParquetWriterFactory(
-                new RowDataParquetBuilder(
-                        type, 
getParquetConfiguration(formatContext.formatOptions())));
+                new RowDataParquetBuilder(type, 
getParquetConfiguration(formatContext)));
     }
 
     @Override
@@ -77,9 +79,23 @@ public class ParquetFileFormat extends FileFormat {
         return Optional.of(new ParquetSimpleStatsExtractor(type, 
statsCollectors));
     }
 
-    public static Options getParquetConfiguration(Options options) {
-        Options conf = new Options();
-        options.toMap().forEach((key, value) -> conf.setString(IDENTIFIER + 
"." + key, value));
-        return conf;
+    public static Options getParquetConfiguration(FormatContext context) {
+        Options parquetOptions = new Options();
+        context.formatOptions()
+                .toMap()
+                .forEach((key, value) -> parquetOptions.setString(IDENTIFIER + 
"." + key, value));
+
+        if 
(!parquetOptions.containsKey("parquet.compression.codec.zstd.level")) {
+            parquetOptions.set(
+                    "parquet.compression.codec.zstd.level", 
String.valueOf(context.zstdLevel()));
+        }
+
+        MemorySize blockSize = context.blockSize();
+        if (blockSize != null) {
+            parquetOptions.set(
+                    ParquetOutputFormat.BLOCK_SIZE, 
String.valueOf(blockSize.getBytes()));
+        }
+
+        return parquetOptions;
     }
 }
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormatFactory.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormatFactory.java
index da1cfab74..0cecccc0d 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormatFactory.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormatFactory.java
@@ -19,15 +19,10 @@
 package org.apache.paimon.format.parquet;
 
 import org.apache.paimon.format.FileFormatFactory;
-import org.apache.paimon.options.Options;
-
-import org.apache.parquet.hadoop.ParquetOutputFormat;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
-
-import java.util.Properties;
 
 /** Factory to create {@link ParquetFileFormat}. */
 public class ParquetFileFormatFactory implements FileFormatFactory {
+
     public static final String IDENTIFIER = "parquet";
 
     @Override
@@ -37,23 +32,6 @@ public class ParquetFileFormatFactory implements 
FileFormatFactory {
 
     @Override
     public ParquetFileFormat create(FormatContext formatContext) {
-        return new ParquetFileFormat(
-                new FormatContext(
-                        supplyDefaultOptions(formatContext.formatOptions()),
-                        formatContext.readBatchSize()));
-    }
-
-    private Options supplyDefaultOptions(Options options) {
-        String compression =
-                
ParquetOutputFormat.COMPRESSION.replaceFirst(String.format("^%s.", IDENTIFIER), 
"");
-        if (!options.containsKey(compression)) {
-            Properties properties = new Properties();
-            options.addAllToProperties(properties);
-            properties.setProperty(compression, 
CompressionCodecName.SNAPPY.name());
-            Options newOptions = new Options();
-            properties.forEach((k, v) -> newOptions.setString(k.toString(), 
v.toString()));
-            return newOptions;
-        }
-        return options;
+        return new ParquetFileFormat(formatContext);
     }
 }
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java
index a55766194..26d38a1c1 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java
@@ -29,8 +29,6 @@ import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.io.OutputFile;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 
 /** A {@link ParquetBuilder} for {@link InternalRow}. */
@@ -77,14 +75,7 @@ public class RowDataParquetBuilder implements 
ParquetBuilder<InternalRow> {
                 .build();
     }
 
-    public String getCompression(@Nullable String compression) {
-        String compressName;
-        if (null != compression) {
-            compressName = compression;
-        } else {
-            compressName =
-                    conf.get(ParquetOutputFormat.COMPRESSION, 
CompressionCodecName.SNAPPY.name());
-        }
-        return compressName;
+    public String getCompression(String compression) {
+        return conf.get("parquet.compression", compression);
     }
 }
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFileFormatTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFileFormatTest.java
index e9caf3b24..44edcdafc 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFileFormatTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFileFormatTest.java
@@ -21,6 +21,7 @@ package org.apache.paimon.format.avro;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FileFormatFactory.FormatContext;
 import org.apache.paimon.format.FormatReaderContext;
 import org.apache.paimon.format.FormatWriter;
 import org.apache.paimon.fs.Path;
@@ -51,7 +52,7 @@ public class AvroFileFormatTest {
 
     @BeforeAll
     public static void before() {
-        fileFormat = new AvroFileFormat(new Options());
+        fileFormat = new AvroFileFormat(new FormatContext(new Options(), 
1024));
     }
 
     @Test
@@ -105,13 +106,13 @@ public class AvroFileFormatTest {
     @Test
     void testReadRowPosition() throws IOException {
         RowType rowType = DataTypes.ROW(DataTypes.INT().notNull());
-        FileFormat format = new AvroFileFormat(new Options());
+        FileFormat format = new AvroFileFormat(new FormatContext(new 
Options(), 1024));
 
         LocalFileIO fileIO = LocalFileIO.create();
         Path file = new Path(new Path(tempPath.toUri()), 
UUID.randomUUID().toString());
 
         try (PositionOutputStream out = fileIO.newOutputStream(file, false)) {
-            FormatWriter writer = 
format.createWriterFactory(rowType).create(out, null);
+            FormatWriter writer = 
format.createWriterFactory(rowType).create(out, "zstd");
             for (int i = 0; i < 1000000; i++) {
                 writer.addElement(GenericRow.of(i));
             }
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFormatReadWriteTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFormatReadWriteTest.java
index 294a29395..b8da2be67 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFormatReadWriteTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFormatReadWriteTest.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.format.avro;
 
 import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FileFormatFactory;
 import org.apache.paimon.format.FormatReadWriteTest;
 import org.apache.paimon.options.Options;
 
@@ -31,6 +32,6 @@ public class AvroFormatReadWriteTest extends 
FormatReadWriteTest {
 
     @Override
     protected FileFormat fileFormat() {
-        return new AvroFileFormat(new Options());
+        return new AvroFileFormat(new FileFormatFactory.FormatContext(new 
Options(), 1024));
     }
 }
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileFormatTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileFormatTest.java
index fa80ec11d..eeee076da 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileFormatTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileFormatTest.java
@@ -41,7 +41,6 @@ public class OrcFileFormatTest {
         options.setString("haha", "1");
         OrcFileFormat orc = new OrcFileFormatFactory().create(new 
FormatContext(options, 1024));
         assertThat(orc.orcProperties().getProperty(IDENTIFIER + ".haha", 
"")).isEqualTo("1");
-        assertThat(orc.orcProperties().getProperty(IDENTIFIER + ".compress", 
"")).isEqualTo("lz4");
     }
 
     @Test
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/orc/writer/OrcBulkWriterTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/orc/writer/OrcBulkWriterTest.java
index eccac13ea..13e75cc77 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/orc/writer/OrcBulkWriterTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/orc/writer/OrcBulkWriterTest.java
@@ -57,7 +57,7 @@ class OrcBulkWriterTest {
 
         Path path = new Path(tempDir.toUri().toString(), "1.orc");
         PositionOutputStream out = LocalFileIO.create().newOutputStream(path, 
false);
-        FormatWriter formatWriter = writerFactory.create(out, null);
+        FormatWriter formatWriter = writerFactory.create(out, "zstd");
 
         Assertions.assertThat(formatWriter).isInstanceOf(OrcBulkWriter.class);
 
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileFormatTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileFormatTest.java
index 7b687ec27..e7f0b19d4 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileFormatTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileFormatTest.java
@@ -60,16 +60,6 @@ public class ParquetFileFormatTest {
         assertThat(parquet.formatOptions().getString(KEY1)).isEqualTo("v1");
     }
 
-    @Test
-    public void testDefaultCompressionCodecName() {
-        // no parquet.compression and no file.compression
-        Options conf = new Options();
-        RowDataParquetBuilder builder =
-                new RowDataParquetBuilder(
-                        new RowType(new ArrayList<>()), 
getParquetConfiguration(conf));
-        
assertThat(builder.getCompression(null)).isEqualTo(CompressionCodec.SNAPPY.name());
-    }
-
     @Test
     public void testFileCompressionHigherPreference() {
         Options conf = new Options();
@@ -78,9 +68,10 @@ public class ParquetFileFormatTest {
         RowDataParquetBuilder builder =
                 new RowDataParquetBuilder(
                         new RowType(new ArrayList<>()),
-                        getParquetConfiguration(conf.removePrefix(IDENTIFIER + 
".")));
+                        getParquetConfiguration(
+                                new FormatContext(conf.removePrefix(IDENTIFIER 
+ "."), 1024)));
         assertThat(builder.getCompression(null)).isEqualTo(lz4);
-        
assertThat(builder.getCompression("SNAPPY")).isEqualTo(CompressionCodec.SNAPPY.name());
+        assertThat(builder.getCompression("SNAPPY")).isEqualTo(lz4);
     }
 
     @Test

Reply via email to