This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b8e2e9d64 [core] Introduce spill-compression.zstd-level to control 
zstd level (#4103)
b8e2e9d64 is described below

commit b8e2e9d64027c0c7f9881c175b9dedd31d9d672a
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Aug 30 17:46:36 2024 +0800

    [core] Introduce spill-compression.zstd-level to control zstd level (#4103)
---
 .../shortcodes/generated/core_configuration.html   | 18 +++--
 .../lookup/LookupBloomFilterBenchmark.java         |  6 +-
 .../main/java/org/apache/paimon/CoreOptions.java   | 19 +++++-
 .../compression/BlockCompressionFactory.java       |  8 +--
 .../apache/paimon/compression/CompressOptions.java | 76 ++++++++++++++++++++++
 .../compression/ZstdBlockCompressionFactory.java   |  8 ++-
 .../paimon/compression/ZstdBlockCompressor.java    |  8 ++-
 .../apache/paimon/lookup/LookupStoreFactory.java   |  4 +-
 .../paimon/lookup/hash/HashLookupStoreFactory.java |  6 +-
 .../paimon/lookup/sort/SortLookupStoreFactory.java |  3 +-
 .../paimon/compression/BlockCompressionTest.java   |  3 +-
 .../lookup/hash/HashLookupStoreFactoryTest.java    |  5 +-
 .../lookup/sort/SortLookupStoreFactoryTest.java    |  5 +-
 .../org/apache/paimon/append/AppendOnlyWriter.java | 10 +--
 .../paimon/crosspartition/GlobalIndexAssigner.java |  4 +-
 .../org/apache/paimon/disk/ExternalBuffer.java     |  3 +-
 .../java/org/apache/paimon/disk/RowBuffer.java     |  3 +-
 .../org/apache/paimon/lookup/RocksDBState.java     |  2 +-
 .../org/apache/paimon/mergetree/MergeSorter.java   |  5 +-
 .../apache/paimon/mergetree/MergeTreeWriter.java   |  5 +-
 .../paimon/mergetree/SortBufferWriteBuffer.java    |  3 +-
 .../paimon/operation/AppendOnlyFileStoreWrite.java |  5 +-
 .../paimon/operation/KeyValueFileStoreWrite.java   |  2 +-
 .../paimon/sort/BinaryExternalSortBuffer.java      |  7 +-
 .../apache/paimon/append/AppendOnlyWriterTest.java |  3 +-
 .../org/apache/paimon/disk/ExternalBufferTest.java |  6 +-
 .../apache/paimon/format/FileFormatSuffixTest.java |  3 +-
 .../paimon/mergetree/ContainsLevelsTest.java       |  6 +-
 .../apache/paimon/mergetree/LookupLevelsTest.java  |  6 +-
 .../apache/paimon/mergetree/MergeTreeTestBase.java |  3 +-
 .../mergetree/SortBufferWriteBufferTestBase.java   |  3 +-
 .../paimon/sort/BinaryExternalSortBufferTest.java  |  3 +-
 .../paimon/flink/sink/LocalMergeOperator.java      |  2 +-
 .../apache/paimon/flink/sorter/SortOperator.java   |  5 +-
 .../org/apache/paimon/flink/sorter/SortUtils.java  |  2 +-
 .../paimon/flink/sorter/SortOperatorTest.java      |  5 +-
 36 files changed, 205 insertions(+), 60 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 66bc354c9..508368559 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -647,12 +647,6 @@ This config option does not affect the default filesystem 
metastore.</td>
             <td>Long</td>
             <td>Optional snapshot id used in case of "from-snapshot" or 
"from-snapshot-full" scan mode</td>
         </tr>
-        <tr>
-            <td><h5>streaming.read.snapshot.delay</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
-            <td>Duration</td>
-            <td>The delay duration of stream read when scan incremental 
snapshots.</td>
-        </tr>
         <tr>
             <td><h5>scan.tag-name</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
@@ -780,6 +774,12 @@ If the data size allocated for the sorting task is 
uneven,which may lead to perf
             <td>String</td>
             <td>Compression for spill, currently zstd, lzo and zstd are 
supported.</td>
         </tr>
+        <tr>
+            <td><h5>spill-compression.zstd-level</h5></td>
+            <td style="word-wrap: break-word;">1</td>
+            <td>Integer</td>
+            <td>Default spill compression zstd level. For higher compression 
rates, it can be configured to 9, but the read and write speed will 
significantly decrease.</td>
+        </tr>
         <tr>
             <td><h5>streaming-read-mode</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
@@ -792,6 +792,12 @@ If the data size allocated for the sorting task is 
uneven,which may lead to perf
             <td>Boolean</td>
             <td>Whether to read the changes from overwrite in streaming mode. 
Cannot be set to true when changelog producer is full-compaction or lookup 
because it will read duplicated changes.</td>
         </tr>
+        <tr>
+            <td><h5>streaming.read.snapshot.delay</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Duration</td>
+            <td>The delay duration of stream read when scan incremental 
snapshots.</td>
+        </tr>
         <tr>
             <td><h5>tag.automatic-completion</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git 
a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/LookupBloomFilterBenchmark.java
 
b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/LookupBloomFilterBenchmark.java
index db237f28f..c15765d41 100644
--- 
a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/LookupBloomFilterBenchmark.java
+++ 
b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/LookupBloomFilterBenchmark.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.benchmark.lookup;
 
 import org.apache.paimon.benchmark.Benchmark;
+import org.apache.paimon.compression.CompressOptions;
 import org.apache.paimon.io.cache.CacheManager;
 import org.apache.paimon.lookup.hash.HashLookupStoreFactory;
 import org.apache.paimon.lookup.hash.HashLookupStoreReader;
@@ -102,7 +103,10 @@ public class LookupBloomFilterBenchmark extends 
AbstractLookupBenchmark {
         Arrays.fill(value, (byte) 1);
         HashLookupStoreFactory factory =
                 new HashLookupStoreFactory(
-                        new CacheManager(MemorySize.ofMebiBytes(10)), 16 * 
1024, 0.75, "none");
+                        new CacheManager(MemorySize.ofMebiBytes(10)),
+                        16 * 1024,
+                        0.75,
+                        new CompressOptions("none", 1));
 
         File file = new File(tempDir.toFile(), UUID.randomUUID().toString());
         HashLookupStoreWriter writer = factory.createWriter(file, filter);
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 21eee2a14..a9da36e7d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -22,6 +22,7 @@ import org.apache.paimon.annotation.Documentation;
 import org.apache.paimon.annotation.Documentation.ExcludeFromDocumentation;
 import org.apache.paimon.annotation.Documentation.Immutable;
 import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.compression.CompressOptions;
 import org.apache.paimon.fileindex.FileIndexOptions;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.fs.Path;
@@ -357,6 +358,13 @@ public class CoreOptions implements Serializable {
                     .withDescription(
                             "Compression for spill, currently zstd, lzo and 
zstd are supported.");
 
+    public static final ConfigOption<Integer> SPILL_COMPRESSION_ZSTD_LEVEL =
+            key("spill-compression.zstd-level")
+                    .intType()
+                    .defaultValue(1)
+                    .withDescription(
+                            "Default spill compression zstd level. For higher 
compression rates, it can be configured to 9, but the read and write speed will 
significantly decrease.");
+
     public static final ConfigOption<Boolean> WRITE_ONLY =
             key("write-only")
                     .booleanType()
@@ -1638,8 +1646,15 @@ public class CoreOptions implements Serializable {
         return options.get(SORT_SPILL_BUFFER_SIZE).getBytes();
     }
 
-    public String spillCompression() {
-        return options.get(SPILL_COMPRESSION);
+    public CompressOptions spillCompressOptions() {
+        return new CompressOptions(
+                options.get(SPILL_COMPRESSION), 
options.get(SPILL_COMPRESSION_ZSTD_LEVEL));
+    }
+
+    public CompressOptions lookupCompressOptions() {
+        return new CompressOptions(
+                options.get(LOOKUP_CACHE_SPILL_COMPRESSION),
+                options.get(SPILL_COMPRESSION_ZSTD_LEVEL));
     }
 
     public Duration continuousDiscoveryInterval() {
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionFactory.java
 
b/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionFactory.java
index 3bf81a6c9..60dd1657b 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionFactory.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionFactory.java
@@ -37,12 +37,12 @@ public interface BlockCompressionFactory {
 
     /** Creates {@link BlockCompressionFactory} according to the 
configuration. */
     @Nullable
-    static BlockCompressionFactory create(String compression) {
-        switch (compression.toUpperCase()) {
+    static BlockCompressionFactory create(CompressOptions compression) {
+        switch (compression.compress().toUpperCase()) {
             case "NONE":
                 return null;
             case "ZSTD":
-                return new ZstdBlockCompressionFactory();
+                return new 
ZstdBlockCompressionFactory(compression.zstdLevel());
             case "LZ4":
                 return new Lz4BlockCompressionFactory();
             case "LZO":
@@ -60,7 +60,7 @@ public interface BlockCompressionFactory {
             case NONE:
                 return null;
             case ZSTD:
-                return new ZstdBlockCompressionFactory();
+                return new ZstdBlockCompressionFactory(1);
             case LZ4:
                 return new Lz4BlockCompressionFactory();
             case LZO:
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/compression/CompressOptions.java
 
b/paimon-common/src/main/java/org/apache/paimon/compression/CompressOptions.java
new file mode 100644
index 000000000..0db39a7ae
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/compression/CompressOptions.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.compression;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** Options of compression. */
+public class CompressOptions implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String compress;
+    private final int zstdLevel;
+
+    public CompressOptions(String compress, int zstdLevel) {
+        this.compress = compress;
+        this.zstdLevel = zstdLevel;
+    }
+
+    public String compress() {
+        return compress;
+    }
+
+    public int zstdLevel() {
+        return zstdLevel;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        CompressOptions that = (CompressOptions) o;
+        return zstdLevel == that.zstdLevel && Objects.equals(compress, 
that.compress);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(compress, zstdLevel);
+    }
+
+    @Override
+    public String toString() {
+        return "CompressOptions{"
+                + "compress='"
+                + compress
+                + '\''
+                + ", zstdLevel="
+                + zstdLevel
+                + '}';
+    }
+
+    public static CompressOptions defaultOptions() {
+        return new CompressOptions("zstd", 1);
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressionFactory.java
 
b/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressionFactory.java
index 13745e6ee..584f5ade7 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressionFactory.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressionFactory.java
@@ -21,6 +21,12 @@ package org.apache.paimon.compression;
 /** Implementation of {@link BlockCompressionFactory} for zstd codec. */
 public class ZstdBlockCompressionFactory implements BlockCompressionFactory {
 
+    private final int compressLevel;
+
+    public ZstdBlockCompressionFactory(int compressLevel) {
+        this.compressLevel = compressLevel;
+    }
+
     @Override
     public BlockCompressionType getCompressionType() {
         return BlockCompressionType.ZSTD;
@@ -28,7 +34,7 @@ public class ZstdBlockCompressionFactory implements 
BlockCompressionFactory {
 
     @Override
     public BlockCompressor getCompressor() {
-        return new ZstdBlockCompressor();
+        return new ZstdBlockCompressor(compressLevel);
     }
 
     @Override
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressor.java
 
b/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressor.java
index bd6d2a5a6..2da979479 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressor.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressor.java
@@ -32,6 +32,12 @@ public class ZstdBlockCompressor implements BlockCompressor {
 
     private static final int MAX_BLOCK_SIZE = 128 * 1024;
 
+    private final int level;
+
+    public ZstdBlockCompressor(int level) {
+        this.level = level;
+    }
+
     @Override
     public int getMaxCompressedSize(int srcSize) {
         return HEADER_LENGTH + zstdMaxCompressedLength(srcSize);
@@ -51,7 +57,7 @@ public class ZstdBlockCompressor implements BlockCompressor {
             throws BufferCompressionException {
         ByteArrayOutputStream stream = new ByteArrayOutputStream(dst, dstOff);
         try (ZstdOutputStream zstdStream =
-                new ZstdOutputStream(stream, RecyclingBufferPool.INSTANCE, 1)) 
{
+                new ZstdOutputStream(stream, RecyclingBufferPool.INSTANCE, 
level)) {
             zstdStream.setWorkers(0);
             zstdStream.write(src, srcOff, srcLen);
         } catch (IOException e) {
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStoreFactory.java 
b/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStoreFactory.java
index 2d57cf192..adee85916 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStoreFactory.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStoreFactory.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.lookup;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.compression.CompressOptions;
 import org.apache.paimon.io.cache.CacheManager;
 import org.apache.paimon.lookup.hash.HashLookupStoreFactory;
 import org.apache.paimon.lookup.sort.SortLookupStoreFactory;
@@ -66,8 +67,7 @@ public interface LookupStoreFactory {
 
     static LookupStoreFactory create(
             CoreOptions options, CacheManager cacheManager, 
Comparator<MemorySlice> keyComparator) {
-        String compression =
-                
options.toConfiguration().get(CoreOptions.LOOKUP_CACHE_SPILL_COMPRESSION);
+        CompressOptions compression = options.lookupCompressOptions();
         switch (options.lookupLocalFileType()) {
             case SORT:
                 return new SortLookupStoreFactory(
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreFactory.java
 
b/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreFactory.java
index 7ece51734..1dbf715a1 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreFactory.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreFactory.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.lookup.hash;
 
 import org.apache.paimon.compression.BlockCompressionFactory;
+import org.apache.paimon.compression.CompressOptions;
 import org.apache.paimon.io.cache.CacheManager;
 import org.apache.paimon.lookup.LookupStoreFactory;
 import org.apache.paimon.utils.BloomFilter;
@@ -37,7 +38,10 @@ public class HashLookupStoreFactory implements 
LookupStoreFactory {
     @Nullable private final BlockCompressionFactory compressionFactory;
 
     public HashLookupStoreFactory(
-            CacheManager cacheManager, int cachePageSize, double loadFactor, 
String compression) {
+            CacheManager cacheManager,
+            int cachePageSize,
+            double loadFactor,
+            CompressOptions compression) {
         this.cacheManager = cacheManager;
         this.cachePageSize = cachePageSize;
         this.loadFactor = loadFactor;
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFactory.java
 
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFactory.java
index 50b784688..62c4131e5 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFactory.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFactory.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.lookup.sort;
 
 import org.apache.paimon.compression.BlockCompressionFactory;
+import org.apache.paimon.compression.CompressOptions;
 import org.apache.paimon.io.cache.CacheManager;
 import org.apache.paimon.lookup.LookupStoreFactory;
 import org.apache.paimon.memory.MemorySlice;
@@ -42,7 +43,7 @@ public class SortLookupStoreFactory implements 
LookupStoreFactory {
             Comparator<MemorySlice> comparator,
             CacheManager cacheManager,
             int blockSize,
-            String compression) {
+            CompressOptions compression) {
         this.comparator = comparator;
         this.cacheManager = cacheManager;
         this.blockSize = blockSize;
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/compression/BlockCompressionTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/compression/BlockCompressionTest.java
index 5ece1cb03..1231794fb 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/compression/BlockCompressionTest.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/compression/BlockCompressionTest.java
@@ -36,7 +36,8 @@ class BlockCompressionTest {
     @ParameterizedTest
     @MethodSource("compressCodecGenerator")
     void testBlockCompression(String compress) {
-        BlockCompressionFactory factory = 
BlockCompressionFactory.create(compress);
+        BlockCompressionFactory factory =
+                BlockCompressionFactory.create(new CompressOptions(compress, 
1));
         runTest(factory, 32768);
         runTest(factory, 16);
     }
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/lookup/hash/HashLookupStoreFactoryTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/lookup/hash/HashLookupStoreFactoryTest.java
index b0dddd840..6444502f2 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/lookup/hash/HashLookupStoreFactoryTest.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/lookup/hash/HashLookupStoreFactoryTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.lookup.hash;
 
+import org.apache.paimon.compression.CompressOptions;
 import org.apache.paimon.io.DataOutputSerializer;
 import org.apache.paimon.io.cache.CacheManager;
 import org.apache.paimon.lookup.LookupStoreFactory.Context;
@@ -64,14 +65,14 @@ public class HashLookupStoreFactoryTest {
     private final int pageSize = 1024;
 
     private final boolean enableBloomFilter;
-    private final String compress;
+    private final CompressOptions compress;
 
     private File file;
     private HashLookupStoreFactory factory;
 
     public HashLookupStoreFactoryTest(List<Object> var) {
         this.enableBloomFilter = (Boolean) var.get(0);
-        this.compress = (String) var.get(1);
+        this.compress = new CompressOptions((String) var.get(1), 1);
     }
 
     @SuppressWarnings("unused")
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreFactoryTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreFactoryTest.java
index f8c84b858..c222877c8 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreFactoryTest.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreFactoryTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.lookup.sort;
 
+import org.apache.paimon.compression.CompressOptions;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.serializer.RowCompactedSerializer;
 import org.apache.paimon.io.cache.CacheManager;
@@ -66,7 +67,7 @@ public class SortLookupStoreFactoryTest {
                         Comparator.naturalOrder(),
                         new CacheManager(MemorySize.ofMebiBytes(1)),
                         1024,
-                        "zstd");
+                        CompressOptions.defaultOptions());
 
         SortLookupStoreWriter writer = factory.createWriter(file, null);
         for (int i = 0; i < VALUE_COUNT; i++) {
@@ -94,7 +95,7 @@ public class SortLookupStoreFactoryTest {
                         keySerializer.createSliceComparator(),
                         new CacheManager(MemorySize.ofMebiBytes(1)),
                         64 * 1024,
-                        "zstd");
+                        CompressOptions.defaultOptions());
         SortLookupStoreWriter writer = factory.createWriter(file, null);
         for (int i = 0; i < VALUE_COUNT; i++) {
             byte[] bytes = toBytes(keySerializer, row, i);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index ee06945a0..402c6c1f4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -21,6 +21,7 @@ package org.apache.paimon.append;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.compact.CompactDeletionFile;
 import org.apache.paimon.compact.CompactManager;
+import org.apache.paimon.compression.CompressOptions;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.disk.IOManager;
@@ -81,7 +82,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, 
MemoryOwner {
     @Nullable private CompactDeletionFile compactDeletionFile;
     private final LongCounter seqNumCounter;
     private final String fileCompression;
-    private final String spillCompression;
+    private final CompressOptions spillCompression;
     private SinkWriter sinkWriter;
     private final SimpleColStatsCollector.Factory[] statsCollectors;
     @Nullable private final IOManager ioManager;
@@ -106,7 +107,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, 
MemoryOwner {
             boolean useWriteBuffer,
             boolean spillable,
             String fileCompression,
-            String spillCompression,
+            CompressOptions spillCompression,
             SimpleColStatsCollector.Factory[] statsCollectors,
             MemorySize maxDiskSize,
             FileIndexOptions fileIndexOptions,
@@ -456,11 +457,12 @@ public class AppendOnlyWriter implements 
BatchRecordWriter, MemoryOwner {
 
         private final MemorySize maxDiskSize;
 
-        private final String compression;
+        private final CompressOptions compression;
 
         private RowBuffer writeBuffer;
 
-        private BufferedSinkWriter(boolean spillable, MemorySize maxDiskSize, 
String compression) {
+        private BufferedSinkWriter(
+                boolean spillable, MemorySize maxDiskSize, CompressOptions 
compression) {
             this.spillable = spillable;
             this.maxDiskSize = maxDiskSize;
             this.compression = compression;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
 
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
index 060e2428a..311b997a4 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
@@ -174,7 +174,7 @@ public class GlobalIndexAssigner implements Serializable, 
Closeable {
                         new InternalRowSerializer(table.rowType()),
                         true,
                         coreOptions.writeBufferSpillDiskSize(),
-                        coreOptions.spillCompression());
+                        coreOptions.spillCompressOptions());
     }
 
     public void bootstrapKey(InternalRow value) throws IOException {
@@ -305,7 +305,7 @@ public class GlobalIndexAssigner implements Serializable, 
Closeable {
                         coreOptions.writeBufferSize() / 2,
                         coreOptions.pageSize(),
                         coreOptions.localSortMaxNumFileHandles(),
-                        coreOptions.spillCompression(),
+                        coreOptions.spillCompressOptions(),
                         coreOptions.writeBufferSpillDiskSize());
 
         Function<SortOrder, RowIterator> iteratorFunction =
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java 
b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
index 34c082371..638b15280 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
@@ -20,6 +20,7 @@ package org.apache.paimon.disk;
 
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.compression.BlockCompressionFactory;
+import org.apache.paimon.compression.CompressOptions;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.AbstractRowDataSerializer;
@@ -64,7 +65,7 @@ public class ExternalBuffer implements RowBuffer {
             MemorySegmentPool pool,
             AbstractRowDataSerializer<?> serializer,
             MemorySize maxDiskSize,
-            String compression) {
+            CompressOptions compression) {
         this.ioManager = ioManager;
         this.pool = pool;
         this.maxDiskSize = maxDiskSize;
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java 
b/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java
index 3c0a31cd6..7a0e3e2d4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.disk;
 
+import org.apache.paimon.compression.CompressOptions;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.AbstractRowDataSerializer;
@@ -60,7 +61,7 @@ public interface RowBuffer {
             AbstractRowDataSerializer<InternalRow> serializer,
             boolean spillable,
             MemorySize maxDiskSize,
-            String compression) {
+            CompressOptions compression) {
         if (spillable) {
             return new ExternalBuffer(ioManager, memoryPool, serializer, 
maxDiskSize, compression);
         } else {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java 
b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java
index d3fafeeb6..25e58984e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java
+++ b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java
@@ -112,7 +112,7 @@ public abstract class RocksDBState<K, V, CacheV> {
                 options.writeBufferSize() / 2,
                 options.pageSize(),
                 options.localSortMaxNumFileHandles(),
-                options.spillCompression(),
+                options.spillCompressOptions(),
                 options.writeBufferSpillDiskSize());
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java
index f02cdaa3e..ae6db76b9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.SortEngine;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.compression.BlockCompressionFactory;
+import org.apache.paimon.compression.CompressOptions;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.BinaryRowSerializer;
@@ -63,7 +64,7 @@ public class MergeSorter {
 
     private final SortEngine sortEngine;
     private final int spillThreshold;
-    private final String compression;
+    private final CompressOptions compression;
 
     private final MemorySegmentPool memoryPool;
 
@@ -76,7 +77,7 @@ public class MergeSorter {
             @Nullable IOManager ioManager) {
         this.sortEngine = options.sortEngine();
         this.spillThreshold = options.sortSpillThreshold();
-        this.compression = options.spillCompression();
+        this.compression = options.spillCompressOptions();
         this.keyType = keyType;
         this.valueType = valueType;
         this.memoryPool =
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
index f124f3a40..de0a28c33 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
@@ -24,6 +24,7 @@ import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.compact.CompactDeletionFile;
 import org.apache.paimon.compact.CompactManager;
 import org.apache.paimon.compact.CompactResult;
+import org.apache.paimon.compression.CompressOptions;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.io.CompactIncrement;
@@ -59,7 +60,7 @@ public class MergeTreeWriter implements 
RecordWriter<KeyValue>, MemoryOwner {
     private final boolean writeBufferSpillable;
     private final MemorySize maxDiskSize;
     private final int sortMaxFan;
-    private final String sortCompression;
+    private final CompressOptions sortCompression;
     private final IOManager ioManager;
 
     private final RowType keyType;
@@ -89,7 +90,7 @@ public class MergeTreeWriter implements 
RecordWriter<KeyValue>, MemoryOwner {
             boolean writeBufferSpillable,
             MemorySize maxDiskSize,
             int sortMaxFan,
-            String sortCompression,
+            CompressOptions sortCompression,
             IOManager ioManager,
             CompactManager compactManager,
             long maxSequenceNumber,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java
index 6406435a0..76c84fd4c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java
@@ -24,6 +24,7 @@ import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.codegen.CodeGenUtils;
 import org.apache.paimon.codegen.NormalizedKeyComputer;
 import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.compression.CompressOptions;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.BinaryRowSerializer;
@@ -69,7 +70,7 @@ public class SortBufferWriteBuffer implements WriteBuffer {
             boolean spillable,
             MemorySize maxDiskSize,
             int sortMaxFan,
-            String compression,
+            CompressOptions compression,
             IOManager ioManager) {
         this.keyType = keyType;
         this.valueType = valueType;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index 5a1165aaf..203a9ff35 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -24,6 +24,7 @@ import org.apache.paimon.append.AppendOnlyWriter;
 import org.apache.paimon.append.BucketedAppendCompactManager;
 import org.apache.paimon.compact.CompactManager;
 import org.apache.paimon.compact.NoopCompactManager;
+import org.apache.paimon.compression.CompressOptions;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.deletionvectors.DeletionVector;
@@ -79,7 +80,7 @@ public class AppendOnlyFileStoreWrite extends 
MemoryFileStoreWrite<InternalRow>
     private final int compactionMaxFileNum;
     private final boolean commitForceCompact;
     private final String fileCompression;
-    private final String spillCompression;
+    private final CompressOptions spillCompression;
     private final boolean useWriteBuffer;
     private final boolean spillable;
     private final MemorySize maxDiskSize;
@@ -123,7 +124,7 @@ public class AppendOnlyFileStoreWrite extends 
MemoryFileStoreWrite<InternalRow>
             this.skipCompaction = options.writeOnly();
         }
         this.fileCompression = options.fileCompression();
-        this.spillCompression = options.spillCompression();
+        this.spillCompression = options.spillCompressOptions();
         this.useWriteBuffer = options.useWriteBufferForAppend();
         this.spillable = options.writeBufferSpillable(fileIO.isObjectStore(), 
isStreamingMode);
         this.maxDiskSize = options.writeBufferSpillDiskSize();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index e0018a1ea..2ba523e85 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -210,7 +210,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                 bufferSpillable(),
                 options.writeBufferSpillDiskSize(),
                 options.localSortMaxNumFileHandles(),
-                options.spillCompression(),
+                options.spillCompressOptions(),
                 ioManager,
                 compactManager,
                 restoredMaxSeqNumber,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java
 
b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java
index a6f059044..1ae453546 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java
@@ -21,6 +21,7 @@ package org.apache.paimon.sort;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.codegen.RecordComparator;
 import org.apache.paimon.compression.BlockCompressionFactory;
+import org.apache.paimon.compression.CompressOptions;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.BinaryRowSerializer;
@@ -68,7 +69,7 @@ public class BinaryExternalSortBuffer implements SortBuffer {
             BinaryInMemorySortBuffer inMemorySortBuffer,
             IOManager ioManager,
             int maxNumFileHandles,
-            String compression,
+            CompressOptions compression,
             MemorySize maxDiskSize) {
         this.serializer = serializer;
         this.inMemorySortBuffer = inMemorySortBuffer;
@@ -99,7 +100,7 @@ public class BinaryExternalSortBuffer implements SortBuffer {
             long bufferSize,
             int pageSize,
             int maxNumFileHandles,
-            String compression,
+            CompressOptions compression,
             MemorySize maxDiskSize) {
         return create(
                 ioManager,
@@ -117,7 +118,7 @@ public class BinaryExternalSortBuffer implements SortBuffer 
{
             int[] keyFields,
             MemorySegmentPool pool,
             int maxNumFileHandles,
-            String compression,
+            CompressOptions compression,
             MemorySize maxDiskSize) {
         RecordComparator comparator = 
newRecordComparator(rowType.getFieldTypes(), keyFields);
         BinaryInMemorySortBuffer sortBuffer =
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 85385280e..436f8f584 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.append;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.compression.CompressOptions;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
@@ -625,7 +626,7 @@ public class AppendOnlyWriterTest {
                         useWriteBuffer,
                         spillable,
                         CoreOptions.FILE_COMPRESSION.defaultValue(),
-                        CoreOptions.SPILL_COMPRESSION.defaultValue(),
+                        CompressOptions.defaultOptions(),
                         StatsCollectorFactories.createStatsFactories(
                                 options, 
AppendOnlyWriterTest.SCHEMA.getFieldNames()),
                         MemorySize.MAX_VALUE,
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java 
b/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java
index 3ea27cfa7..130cf32e9 100644
--- a/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.disk;
 
-import org.apache.paimon.CoreOptions;
+import org.apache.paimon.compression.CompressOptions;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryRowWriter;
 import org.apache.paimon.data.BinaryString;
@@ -67,7 +67,7 @@ public class ExternalBufferTest {
                 new HeapMemorySegmentPool(2 * DEFAULT_PAGE_SIZE, 
DEFAULT_PAGE_SIZE),
                 this.serializer,
                 maxDiskSize,
-                CoreOptions.SPILL_COMPRESSION.defaultValue());
+                CompressOptions.defaultOptions());
     }
 
     @Test
@@ -182,7 +182,7 @@ public class ExternalBufferTest {
                         new HeapMemorySegmentPool(3 * DEFAULT_PAGE_SIZE, 
DEFAULT_PAGE_SIZE),
                         new BinaryRowSerializer(1),
                         MemorySize.MAX_VALUE,
-                        CoreOptions.SPILL_COMPRESSION.defaultValue());
+                        CompressOptions.defaultOptions());
         assertThatThrownBy(() -> 
writeHuge(buffer)).isInstanceOf(IOException.class);
         buffer.reset();
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java 
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index 13ac9736e..539ff4757 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -21,6 +21,7 @@ package org.apache.paimon.format;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.append.AppendOnlyWriter;
 import org.apache.paimon.append.BucketedAppendCompactManager;
+import org.apache.paimon.compression.CompressOptions;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.disk.IOManager;
@@ -87,7 +88,7 @@ public class FileFormatSuffixTest extends 
KeyValueFileReadWriteTest {
                         false,
                         false,
                         CoreOptions.FILE_COMPRESSION.defaultValue(),
-                        CoreOptions.SPILL_COMPRESSION.defaultValue(),
+                        CompressOptions.defaultOptions(),
                         StatsCollectorFactories.createStatsFactories(
                                 options, SCHEMA.getFieldNames()),
                         MemorySize.MAX_VALUE,
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
index 2e6393460..0ab636c33 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.mergetree;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
+import org.apache.paimon.compression.CompressOptions;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
@@ -194,7 +195,10 @@ public class ContainsLevelsTest {
                                         0, file.fileName(), file.fileSize(), 
file.level()),
                 file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + 
UUID.randomUUID()),
                 new HashLookupStoreFactory(
-                        new CacheManager(MemorySize.ofMebiBytes(1)), 2048, 
0.75, "none"),
+                        new CacheManager(MemorySize.ofMebiBytes(1)),
+                        2048,
+                        0.75,
+                        new CompressOptions("none", 1)),
                 rowCount -> BloomFilter.builder(rowCount, 0.01),
                 LookupFile.createCache(Duration.ofHours(1), maxDiskSize));
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
index 76094a520..2dce81ce5 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.mergetree;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
+import org.apache.paimon.compression.CompressOptions;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
@@ -274,7 +275,10 @@ public class LookupLevelsTest {
                                         0, file.fileName(), file.fileSize(), 
file.level()),
                 file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + 
UUID.randomUUID()),
                 new HashLookupStoreFactory(
-                        new CacheManager(MemorySize.ofMebiBytes(1)), 2048, 
0.75, "none"),
+                        new CacheManager(MemorySize.ofMebiBytes(1)),
+                        2048,
+                        0.75,
+                        new CompressOptions("none", 1)),
                 rowCount -> BloomFilter.builder(rowCount, 0.05),
                 LookupFile.createCache(Duration.ofHours(1), maxDiskSize));
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index 6a6848ce1..80080516e 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -23,6 +23,7 @@ import org.apache.paimon.CoreOptions.ChangelogProducer;
 import org.apache.paimon.CoreOptions.SortEngine;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.compact.CompactResult;
+import org.apache.paimon.compression.CompressOptions;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
@@ -514,7 +515,7 @@ public abstract class MergeTreeTestBase {
                         false,
                         MemorySize.ofKibiBytes(10),
                         128,
-                        "lz4",
+                        CompressOptions.defaultOptions(),
                         null,
                         compactManager,
                         maxSequenceNumber,
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
index dea0268ec..27606ee33 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
@@ -20,6 +20,7 @@ package org.apache.paimon.mergetree;
 
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.compression.CompressOptions;
 import org.apache.paimon.memory.HeapMemorySegmentPool;
 import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
 import org.apache.paimon.mergetree.compact.FirstRowMergeFunction;
@@ -71,7 +72,7 @@ public abstract class SortBufferWriteBufferTestBase {
                     false,
                     MemorySize.MAX_VALUE,
                     128,
-                    "lz4",
+                    CompressOptions.defaultOptions(),
                     null);
 
     protected abstract boolean addOnly();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/sort/BinaryExternalSortBufferTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/sort/BinaryExternalSortBufferTest.java
index 4c5cc12bf..59ef802a1 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/sort/BinaryExternalSortBufferTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/sort/BinaryExternalSortBufferTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.sort;
 
+import org.apache.paimon.compression.CompressOptions;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryRowWriter;
 import org.apache.paimon.data.BinaryString;
@@ -309,7 +310,7 @@ public class BinaryExternalSortBufferTest {
                 inMemorySortBuffer,
                 ioManager,
                 maxNumFileHandles,
-                "lz4",
+                CompressOptions.defaultOptions(),
                 diskSize);
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
index 273f38308..18d440dbc 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
@@ -125,7 +125,7 @@ public class LocalMergeOperator extends 
AbstractStreamOperator<InternalRow>
                         false,
                         MemorySize.MAX_VALUE,
                         options.localSortMaxNumFileHandles(),
-                        options.spillCompression(),
+                        options.spillCompressOptions(),
                         null);
         currentWatermark = Long.MIN_VALUE;
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
index 331d1058d..52ad89689 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.sorter;
 
 import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.compression.CompressOptions;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
@@ -44,7 +45,7 @@ public class SortOperator extends 
TableStreamOperator<InternalRow>
     private final int pageSize;
     private final int arity;
     private final int spillSortMaxNumFiles;
-    private final String spillCompression;
+    private final CompressOptions spillCompression;
     private final int sinkParallelism;
     private final MemorySize maxDiskSize;
 
@@ -57,7 +58,7 @@ public class SortOperator extends 
TableStreamOperator<InternalRow>
             long maxMemory,
             int pageSize,
             int spillSortMaxNumFiles,
-            String spillCompression,
+            CompressOptions spillCompression,
             int sinkParallelism,
             MemorySize maxDiskSize) {
         this.keyType = keyType;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
index 79e9f1298..e163ac364 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
@@ -161,7 +161,7 @@ public class SortUtils {
                                     options.writeBufferSize(),
                                     options.pageSize(),
                                     options.localSortMaxNumFileHandles(),
-                                    options.spillCompression(),
+                                    options.spillCompressOptions(),
                                     sinkParallelism,
                                     options.writeBufferSpillDiskSize()))
                     .setParallelism(sinkParallelism)
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sorter/SortOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sorter/SortOperatorTest.java
index 09db30579..155e259e0 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sorter/SortOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sorter/SortOperatorTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.sorter;
 
+import org.apache.paimon.compression.CompressOptions;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
@@ -62,7 +63,7 @@ public class SortOperatorTest {
                         MemorySize.parse("10 mb").getBytes(),
                         (int) MemorySize.parse("16 kb").getBytes(),
                         128,
-                        "lz4",
+                        CompressOptions.defaultOptions(),
                         1,
                         MemorySize.MAX_VALUE) {};
 
@@ -111,7 +112,7 @@ public class SortOperatorTest {
                         MemorySize.parse("10 mb").getBytes(),
                         (int) MemorySize.parse("16 kb").getBytes(),
                         128,
-                        "lz4",
+                        CompressOptions.defaultOptions(),
                         1,
                         MemorySize.MAX_VALUE) {};
         OneInputStreamOperatorTestHarness harness = 
createTestHarness(sortOperator);

Reply via email to