This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b8e2e9d64 [core] Introduce spill-compression.zstd-level to control
zstd level (#4103)
b8e2e9d64 is described below
commit b8e2e9d64027c0c7f9881c175b9dedd31d9d672a
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Aug 30 17:46:36 2024 +0800
[core] Introduce spill-compression.zstd-level to control zstd level (#4103)
---
.../shortcodes/generated/core_configuration.html | 18 +++--
.../lookup/LookupBloomFilterBenchmark.java | 6 +-
.../main/java/org/apache/paimon/CoreOptions.java | 19 +++++-
.../compression/BlockCompressionFactory.java | 8 +--
.../apache/paimon/compression/CompressOptions.java | 76 ++++++++++++++++++++++
.../compression/ZstdBlockCompressionFactory.java | 8 ++-
.../paimon/compression/ZstdBlockCompressor.java | 8 ++-
.../apache/paimon/lookup/LookupStoreFactory.java | 4 +-
.../paimon/lookup/hash/HashLookupStoreFactory.java | 6 +-
.../paimon/lookup/sort/SortLookupStoreFactory.java | 3 +-
.../paimon/compression/BlockCompressionTest.java | 3 +-
.../lookup/hash/HashLookupStoreFactoryTest.java | 5 +-
.../lookup/sort/SortLookupStoreFactoryTest.java | 5 +-
.../org/apache/paimon/append/AppendOnlyWriter.java | 10 +--
.../paimon/crosspartition/GlobalIndexAssigner.java | 4 +-
.../org/apache/paimon/disk/ExternalBuffer.java | 3 +-
.../java/org/apache/paimon/disk/RowBuffer.java | 3 +-
.../org/apache/paimon/lookup/RocksDBState.java | 2 +-
.../org/apache/paimon/mergetree/MergeSorter.java | 5 +-
.../apache/paimon/mergetree/MergeTreeWriter.java | 5 +-
.../paimon/mergetree/SortBufferWriteBuffer.java | 3 +-
.../paimon/operation/AppendOnlyFileStoreWrite.java | 5 +-
.../paimon/operation/KeyValueFileStoreWrite.java | 2 +-
.../paimon/sort/BinaryExternalSortBuffer.java | 7 +-
.../apache/paimon/append/AppendOnlyWriterTest.java | 3 +-
.../org/apache/paimon/disk/ExternalBufferTest.java | 6 +-
.../apache/paimon/format/FileFormatSuffixTest.java | 3 +-
.../paimon/mergetree/ContainsLevelsTest.java | 6 +-
.../apache/paimon/mergetree/LookupLevelsTest.java | 6 +-
.../apache/paimon/mergetree/MergeTreeTestBase.java | 3 +-
.../mergetree/SortBufferWriteBufferTestBase.java | 3 +-
.../paimon/sort/BinaryExternalSortBufferTest.java | 3 +-
.../paimon/flink/sink/LocalMergeOperator.java | 2 +-
.../apache/paimon/flink/sorter/SortOperator.java | 5 +-
.../org/apache/paimon/flink/sorter/SortUtils.java | 2 +-
.../paimon/flink/sorter/SortOperatorTest.java | 5 +-
36 files changed, 205 insertions(+), 60 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 66bc354c9..508368559 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -647,12 +647,6 @@ This config option does not affect the default filesystem
metastore.</td>
<td>Long</td>
<td>Optional snapshot id used in case of "from-snapshot" or
"from-snapshot-full" scan mode</td>
</tr>
- <tr>
- <td><h5>streaming.read.snapshot.delay</h5></td>
- <td style="word-wrap: break-word;">(none)</td>
- <td>Duration</td>
- <td>The delay duration of stream read when scan incremental
snapshots.</td>
- </tr>
<tr>
<td><h5>scan.tag-name</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -780,6 +774,12 @@ If the data size allocated for the sorting task is
uneven,which may lead to perf
<td>String</td>
<td>Compression for spill, currently zstd, lzo and zstd are
supported.</td>
</tr>
+ <tr>
+ <td><h5>spill-compression.zstd-level</h5></td>
+ <td style="word-wrap: break-word;">1</td>
+ <td>Integer</td>
+ <td>Default spill compression zstd level. For higher compression
rates, it can be configured to 9, but the read and write speed will
significantly decrease.</td>
+ </tr>
<tr>
<td><h5>streaming-read-mode</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -792,6 +792,12 @@ If the data size allocated for the sorting task is
uneven,which may lead to perf
<td>Boolean</td>
<td>Whether to read the changes from overwrite in streaming mode.
Cannot be set to true when changelog producer is full-compaction or lookup
because it will read duplicated changes.</td>
</tr>
+ <tr>
+ <td><h5>streaming.read.snapshot.delay</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Duration</td>
+ <td>The delay duration of stream read when scan incremental
snapshots.</td>
+ </tr>
<tr>
<td><h5>tag.automatic-completion</h5></td>
<td style="word-wrap: break-word;">false</td>
diff --git
a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/LookupBloomFilterBenchmark.java
b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/LookupBloomFilterBenchmark.java
index db237f28f..c15765d41 100644
---
a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/LookupBloomFilterBenchmark.java
+++
b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/LookupBloomFilterBenchmark.java
@@ -19,6 +19,7 @@
package org.apache.paimon.benchmark.lookup;
import org.apache.paimon.benchmark.Benchmark;
+import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.io.cache.CacheManager;
import org.apache.paimon.lookup.hash.HashLookupStoreFactory;
import org.apache.paimon.lookup.hash.HashLookupStoreReader;
@@ -102,7 +103,10 @@ public class LookupBloomFilterBenchmark extends
AbstractLookupBenchmark {
Arrays.fill(value, (byte) 1);
HashLookupStoreFactory factory =
new HashLookupStoreFactory(
- new CacheManager(MemorySize.ofMebiBytes(10)), 16 *
1024, 0.75, "none");
+ new CacheManager(MemorySize.ofMebiBytes(10)),
+ 16 * 1024,
+ 0.75,
+ new CompressOptions("none", 1));
File file = new File(tempDir.toFile(), UUID.randomUUID().toString());
HashLookupStoreWriter writer = factory.createWriter(file, filter);
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 21eee2a14..a9da36e7d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -22,6 +22,7 @@ import org.apache.paimon.annotation.Documentation;
import org.apache.paimon.annotation.Documentation.ExcludeFromDocumentation;
import org.apache.paimon.annotation.Documentation.Immutable;
import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.Path;
@@ -357,6 +358,13 @@ public class CoreOptions implements Serializable {
.withDescription(
"Compression for spill, currently zstd, lzo and
zstd are supported.");
+ public static final ConfigOption<Integer> SPILL_COMPRESSION_ZSTD_LEVEL =
+ key("spill-compression.zstd-level")
+ .intType()
+ .defaultValue(1)
+ .withDescription(
+ "Default spill compression zstd level. For higher
compression rates, it can be configured to 9, but the read and write speed will
significantly decrease.");
+
public static final ConfigOption<Boolean> WRITE_ONLY =
key("write-only")
.booleanType()
@@ -1638,8 +1646,15 @@ public class CoreOptions implements Serializable {
return options.get(SORT_SPILL_BUFFER_SIZE).getBytes();
}
- public String spillCompression() {
- return options.get(SPILL_COMPRESSION);
+ public CompressOptions spillCompressOptions() {
+ return new CompressOptions(
+ options.get(SPILL_COMPRESSION),
options.get(SPILL_COMPRESSION_ZSTD_LEVEL));
+ }
+
+ public CompressOptions lookupCompressOptions() {
+ return new CompressOptions(
+ options.get(LOOKUP_CACHE_SPILL_COMPRESSION),
+ options.get(SPILL_COMPRESSION_ZSTD_LEVEL));
}
public Duration continuousDiscoveryInterval() {
diff --git
a/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionFactory.java
b/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionFactory.java
index 3bf81a6c9..60dd1657b 100644
---
a/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionFactory.java
+++
b/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionFactory.java
@@ -37,12 +37,12 @@ public interface BlockCompressionFactory {
/** Creates {@link BlockCompressionFactory} according to the
configuration. */
@Nullable
- static BlockCompressionFactory create(String compression) {
- switch (compression.toUpperCase()) {
+ static BlockCompressionFactory create(CompressOptions compression) {
+ switch (compression.compress().toUpperCase()) {
case "NONE":
return null;
case "ZSTD":
- return new ZstdBlockCompressionFactory();
+ return new
ZstdBlockCompressionFactory(compression.zstdLevel());
case "LZ4":
return new Lz4BlockCompressionFactory();
case "LZO":
@@ -60,7 +60,7 @@ public interface BlockCompressionFactory {
case NONE:
return null;
case ZSTD:
- return new ZstdBlockCompressionFactory();
+ return new ZstdBlockCompressionFactory(1);
case LZ4:
return new Lz4BlockCompressionFactory();
case LZO:
diff --git
a/paimon-common/src/main/java/org/apache/paimon/compression/CompressOptions.java
b/paimon-common/src/main/java/org/apache/paimon/compression/CompressOptions.java
new file mode 100644
index 000000000..0db39a7ae
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/compression/CompressOptions.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.compression;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** Options of compression. */
+public class CompressOptions implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String compress;
+ private final int zstdLevel;
+
+ public CompressOptions(String compress, int zstdLevel) {
+ this.compress = compress;
+ this.zstdLevel = zstdLevel;
+ }
+
+ public String compress() {
+ return compress;
+ }
+
+ public int zstdLevel() {
+ return zstdLevel;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CompressOptions that = (CompressOptions) o;
+ return zstdLevel == that.zstdLevel && Objects.equals(compress,
that.compress);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(compress, zstdLevel);
+ }
+
+ @Override
+ public String toString() {
+ return "CompressOptions{"
+ + "compress='"
+ + compress
+ + '\''
+ + ", zstdLevel="
+ + zstdLevel
+ + '}';
+ }
+
+ public static CompressOptions defaultOptions() {
+ return new CompressOptions("zstd", 1);
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressionFactory.java
b/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressionFactory.java
index 13745e6ee..584f5ade7 100644
---
a/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressionFactory.java
+++
b/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressionFactory.java
@@ -21,6 +21,12 @@ package org.apache.paimon.compression;
/** Implementation of {@link BlockCompressionFactory} for zstd codec. */
public class ZstdBlockCompressionFactory implements BlockCompressionFactory {
+ private final int compressLevel;
+
+ public ZstdBlockCompressionFactory(int compressLevel) {
+ this.compressLevel = compressLevel;
+ }
+
@Override
public BlockCompressionType getCompressionType() {
return BlockCompressionType.ZSTD;
@@ -28,7 +34,7 @@ public class ZstdBlockCompressionFactory implements
BlockCompressionFactory {
@Override
public BlockCompressor getCompressor() {
- return new ZstdBlockCompressor();
+ return new ZstdBlockCompressor(compressLevel);
}
@Override
diff --git
a/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressor.java
b/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressor.java
index bd6d2a5a6..2da979479 100644
---
a/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressor.java
+++
b/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressor.java
@@ -32,6 +32,12 @@ public class ZstdBlockCompressor implements BlockCompressor {
private static final int MAX_BLOCK_SIZE = 128 * 1024;
+ private final int level;
+
+ public ZstdBlockCompressor(int level) {
+ this.level = level;
+ }
+
@Override
public int getMaxCompressedSize(int srcSize) {
return HEADER_LENGTH + zstdMaxCompressedLength(srcSize);
@@ -51,7 +57,7 @@ public class ZstdBlockCompressor implements BlockCompressor {
throws BufferCompressionException {
ByteArrayOutputStream stream = new ByteArrayOutputStream(dst, dstOff);
try (ZstdOutputStream zstdStream =
- new ZstdOutputStream(stream, RecyclingBufferPool.INSTANCE, 1))
{
+ new ZstdOutputStream(stream, RecyclingBufferPool.INSTANCE,
level)) {
zstdStream.setWorkers(0);
zstdStream.write(src, srcOff, srcLen);
} catch (IOException e) {
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStoreFactory.java
b/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStoreFactory.java
index 2d57cf192..adee85916 100644
---
a/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStoreFactory.java
+++
b/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStoreFactory.java
@@ -19,6 +19,7 @@
package org.apache.paimon.lookup;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.io.cache.CacheManager;
import org.apache.paimon.lookup.hash.HashLookupStoreFactory;
import org.apache.paimon.lookup.sort.SortLookupStoreFactory;
@@ -66,8 +67,7 @@ public interface LookupStoreFactory {
static LookupStoreFactory create(
CoreOptions options, CacheManager cacheManager,
Comparator<MemorySlice> keyComparator) {
- String compression =
-
options.toConfiguration().get(CoreOptions.LOOKUP_CACHE_SPILL_COMPRESSION);
+ CompressOptions compression = options.lookupCompressOptions();
switch (options.lookupLocalFileType()) {
case SORT:
return new SortLookupStoreFactory(
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreFactory.java
b/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreFactory.java
index 7ece51734..1dbf715a1 100644
---
a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreFactory.java
+++
b/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreFactory.java
@@ -19,6 +19,7 @@
package org.apache.paimon.lookup.hash;
import org.apache.paimon.compression.BlockCompressionFactory;
+import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.io.cache.CacheManager;
import org.apache.paimon.lookup.LookupStoreFactory;
import org.apache.paimon.utils.BloomFilter;
@@ -37,7 +38,10 @@ public class HashLookupStoreFactory implements
LookupStoreFactory {
@Nullable private final BlockCompressionFactory compressionFactory;
public HashLookupStoreFactory(
- CacheManager cacheManager, int cachePageSize, double loadFactor,
String compression) {
+ CacheManager cacheManager,
+ int cachePageSize,
+ double loadFactor,
+ CompressOptions compression) {
this.cacheManager = cacheManager;
this.cachePageSize = cachePageSize;
this.loadFactor = loadFactor;
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFactory.java
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFactory.java
index 50b784688..62c4131e5 100644
---
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFactory.java
+++
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFactory.java
@@ -19,6 +19,7 @@
package org.apache.paimon.lookup.sort;
import org.apache.paimon.compression.BlockCompressionFactory;
+import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.io.cache.CacheManager;
import org.apache.paimon.lookup.LookupStoreFactory;
import org.apache.paimon.memory.MemorySlice;
@@ -42,7 +43,7 @@ public class SortLookupStoreFactory implements
LookupStoreFactory {
Comparator<MemorySlice> comparator,
CacheManager cacheManager,
int blockSize,
- String compression) {
+ CompressOptions compression) {
this.comparator = comparator;
this.cacheManager = cacheManager;
this.blockSize = blockSize;
diff --git
a/paimon-common/src/test/java/org/apache/paimon/compression/BlockCompressionTest.java
b/paimon-common/src/test/java/org/apache/paimon/compression/BlockCompressionTest.java
index 5ece1cb03..1231794fb 100644
---
a/paimon-common/src/test/java/org/apache/paimon/compression/BlockCompressionTest.java
+++
b/paimon-common/src/test/java/org/apache/paimon/compression/BlockCompressionTest.java
@@ -36,7 +36,8 @@ class BlockCompressionTest {
@ParameterizedTest
@MethodSource("compressCodecGenerator")
void testBlockCompression(String compress) {
- BlockCompressionFactory factory =
BlockCompressionFactory.create(compress);
+ BlockCompressionFactory factory =
+ BlockCompressionFactory.create(new CompressOptions(compress,
1));
runTest(factory, 32768);
runTest(factory, 16);
}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/lookup/hash/HashLookupStoreFactoryTest.java
b/paimon-common/src/test/java/org/apache/paimon/lookup/hash/HashLookupStoreFactoryTest.java
index b0dddd840..6444502f2 100644
---
a/paimon-common/src/test/java/org/apache/paimon/lookup/hash/HashLookupStoreFactoryTest.java
+++
b/paimon-common/src/test/java/org/apache/paimon/lookup/hash/HashLookupStoreFactoryTest.java
@@ -18,6 +18,7 @@
package org.apache.paimon.lookup.hash;
+import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.io.DataOutputSerializer;
import org.apache.paimon.io.cache.CacheManager;
import org.apache.paimon.lookup.LookupStoreFactory.Context;
@@ -64,14 +65,14 @@ public class HashLookupStoreFactoryTest {
private final int pageSize = 1024;
private final boolean enableBloomFilter;
- private final String compress;
+ private final CompressOptions compress;
private File file;
private HashLookupStoreFactory factory;
public HashLookupStoreFactoryTest(List<Object> var) {
this.enableBloomFilter = (Boolean) var.get(0);
- this.compress = (String) var.get(1);
+ this.compress = new CompressOptions((String) var.get(1), 1);
}
@SuppressWarnings("unused")
diff --git
a/paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreFactoryTest.java
b/paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreFactoryTest.java
index f8c84b858..c222877c8 100644
---
a/paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreFactoryTest.java
+++
b/paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreFactoryTest.java
@@ -18,6 +18,7 @@
package org.apache.paimon.lookup.sort;
+import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.serializer.RowCompactedSerializer;
import org.apache.paimon.io.cache.CacheManager;
@@ -66,7 +67,7 @@ public class SortLookupStoreFactoryTest {
Comparator.naturalOrder(),
new CacheManager(MemorySize.ofMebiBytes(1)),
1024,
- "zstd");
+ CompressOptions.defaultOptions());
SortLookupStoreWriter writer = factory.createWriter(file, null);
for (int i = 0; i < VALUE_COUNT; i++) {
@@ -94,7 +95,7 @@ public class SortLookupStoreFactoryTest {
keySerializer.createSliceComparator(),
new CacheManager(MemorySize.ofMebiBytes(1)),
64 * 1024,
- "zstd");
+ CompressOptions.defaultOptions());
SortLookupStoreWriter writer = factory.createWriter(file, null);
for (int i = 0; i < VALUE_COUNT; i++) {
byte[] bytes = toBytes(keySerializer, row, i);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index ee06945a0..402c6c1f4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -21,6 +21,7 @@ package org.apache.paimon.append;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.compact.CompactDeletionFile;
import org.apache.paimon.compact.CompactManager;
+import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.disk.IOManager;
@@ -81,7 +82,7 @@ public class AppendOnlyWriter implements BatchRecordWriter,
MemoryOwner {
@Nullable private CompactDeletionFile compactDeletionFile;
private final LongCounter seqNumCounter;
private final String fileCompression;
- private final String spillCompression;
+ private final CompressOptions spillCompression;
private SinkWriter sinkWriter;
private final SimpleColStatsCollector.Factory[] statsCollectors;
@Nullable private final IOManager ioManager;
@@ -106,7 +107,7 @@ public class AppendOnlyWriter implements BatchRecordWriter,
MemoryOwner {
boolean useWriteBuffer,
boolean spillable,
String fileCompression,
- String spillCompression,
+ CompressOptions spillCompression,
SimpleColStatsCollector.Factory[] statsCollectors,
MemorySize maxDiskSize,
FileIndexOptions fileIndexOptions,
@@ -456,11 +457,12 @@ public class AppendOnlyWriter implements
BatchRecordWriter, MemoryOwner {
private final MemorySize maxDiskSize;
- private final String compression;
+ private final CompressOptions compression;
private RowBuffer writeBuffer;
- private BufferedSinkWriter(boolean spillable, MemorySize maxDiskSize,
String compression) {
+ private BufferedSinkWriter(
+ boolean spillable, MemorySize maxDiskSize, CompressOptions
compression) {
this.spillable = spillable;
this.maxDiskSize = maxDiskSize;
this.compression = compression;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
index 060e2428a..311b997a4 100644
---
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
@@ -174,7 +174,7 @@ public class GlobalIndexAssigner implements Serializable,
Closeable {
new InternalRowSerializer(table.rowType()),
true,
coreOptions.writeBufferSpillDiskSize(),
- coreOptions.spillCompression());
+ coreOptions.spillCompressOptions());
}
public void bootstrapKey(InternalRow value) throws IOException {
@@ -305,7 +305,7 @@ public class GlobalIndexAssigner implements Serializable,
Closeable {
coreOptions.writeBufferSize() / 2,
coreOptions.pageSize(),
coreOptions.localSortMaxNumFileHandles(),
- coreOptions.spillCompression(),
+ coreOptions.spillCompressOptions(),
coreOptions.writeBufferSpillDiskSize());
Function<SortOrder, RowIterator> iteratorFunction =
diff --git
a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
index 34c082371..638b15280 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
@@ -20,6 +20,7 @@ package org.apache.paimon.disk;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.compression.BlockCompressionFactory;
+import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.AbstractRowDataSerializer;
@@ -64,7 +65,7 @@ public class ExternalBuffer implements RowBuffer {
MemorySegmentPool pool,
AbstractRowDataSerializer<?> serializer,
MemorySize maxDiskSize,
- String compression) {
+ CompressOptions compression) {
this.ioManager = ioManager;
this.pool = pool;
this.maxDiskSize = maxDiskSize;
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java
b/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java
index 3c0a31cd6..7a0e3e2d4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java
@@ -18,6 +18,7 @@
package org.apache.paimon.disk;
+import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.AbstractRowDataSerializer;
@@ -60,7 +61,7 @@ public interface RowBuffer {
AbstractRowDataSerializer<InternalRow> serializer,
boolean spillable,
MemorySize maxDiskSize,
- String compression) {
+ CompressOptions compression) {
if (spillable) {
return new ExternalBuffer(ioManager, memoryPool, serializer,
maxDiskSize, compression);
} else {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java
b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java
index d3fafeeb6..25e58984e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java
+++ b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java
@@ -112,7 +112,7 @@ public abstract class RocksDBState<K, V, CacheV> {
options.writeBufferSize() / 2,
options.pageSize(),
options.localSortMaxNumFileHandles(),
- options.spillCompression(),
+ options.spillCompressOptions(),
options.writeBufferSpillDiskSize());
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java
index f02cdaa3e..ae6db76b9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.SortEngine;
import org.apache.paimon.KeyValue;
import org.apache.paimon.compression.BlockCompressionFactory;
+import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.BinaryRowSerializer;
@@ -63,7 +64,7 @@ public class MergeSorter {
private final SortEngine sortEngine;
private final int spillThreshold;
- private final String compression;
+ private final CompressOptions compression;
private final MemorySegmentPool memoryPool;
@@ -76,7 +77,7 @@ public class MergeSorter {
@Nullable IOManager ioManager) {
this.sortEngine = options.sortEngine();
this.spillThreshold = options.sortSpillThreshold();
- this.compression = options.spillCompression();
+ this.compression = options.spillCompressOptions();
this.keyType = keyType;
this.valueType = valueType;
this.memoryPool =
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
index f124f3a40..de0a28c33 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
@@ -24,6 +24,7 @@ import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.compact.CompactDeletionFile;
import org.apache.paimon.compact.CompactManager;
import org.apache.paimon.compact.CompactResult;
+import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.io.CompactIncrement;
@@ -59,7 +60,7 @@ public class MergeTreeWriter implements
RecordWriter<KeyValue>, MemoryOwner {
private final boolean writeBufferSpillable;
private final MemorySize maxDiskSize;
private final int sortMaxFan;
- private final String sortCompression;
+ private final CompressOptions sortCompression;
private final IOManager ioManager;
private final RowType keyType;
@@ -89,7 +90,7 @@ public class MergeTreeWriter implements
RecordWriter<KeyValue>, MemoryOwner {
boolean writeBufferSpillable,
MemorySize maxDiskSize,
int sortMaxFan,
- String sortCompression,
+ CompressOptions sortCompression,
IOManager ioManager,
CompactManager compactManager,
long maxSequenceNumber,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java
index 6406435a0..76c84fd4c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java
@@ -24,6 +24,7 @@ import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.codegen.NormalizedKeyComputer;
import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.BinaryRowSerializer;
@@ -69,7 +70,7 @@ public class SortBufferWriteBuffer implements WriteBuffer {
boolean spillable,
MemorySize maxDiskSize,
int sortMaxFan,
- String compression,
+ CompressOptions compression,
IOManager ioManager) {
this.keyType = keyType;
this.valueType = valueType;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index 5a1165aaf..203a9ff35 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -24,6 +24,7 @@ import org.apache.paimon.append.AppendOnlyWriter;
import org.apache.paimon.append.BucketedAppendCompactManager;
import org.apache.paimon.compact.CompactManager;
import org.apache.paimon.compact.NoopCompactManager;
+import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.DeletionVector;
@@ -79,7 +80,7 @@ public class AppendOnlyFileStoreWrite extends
MemoryFileStoreWrite<InternalRow>
private final int compactionMaxFileNum;
private final boolean commitForceCompact;
private final String fileCompression;
- private final String spillCompression;
+ private final CompressOptions spillCompression;
private final boolean useWriteBuffer;
private final boolean spillable;
private final MemorySize maxDiskSize;
@@ -123,7 +124,7 @@ public class AppendOnlyFileStoreWrite extends
MemoryFileStoreWrite<InternalRow>
this.skipCompaction = options.writeOnly();
}
this.fileCompression = options.fileCompression();
- this.spillCompression = options.spillCompression();
+ this.spillCompression = options.spillCompressOptions();
this.useWriteBuffer = options.useWriteBufferForAppend();
this.spillable = options.writeBufferSpillable(fileIO.isObjectStore(),
isStreamingMode);
this.maxDiskSize = options.writeBufferSpillDiskSize();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index e0018a1ea..2ba523e85 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -210,7 +210,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
bufferSpillable(),
options.writeBufferSpillDiskSize(),
options.localSortMaxNumFileHandles(),
- options.spillCompression(),
+ options.spillCompressOptions(),
ioManager,
compactManager,
restoredMaxSeqNumber,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java
b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java
index a6f059044..1ae453546 100644
---
a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java
@@ -21,6 +21,7 @@ package org.apache.paimon.sort;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.codegen.RecordComparator;
import org.apache.paimon.compression.BlockCompressionFactory;
+import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.BinaryRowSerializer;
@@ -68,7 +69,7 @@ public class BinaryExternalSortBuffer implements SortBuffer {
BinaryInMemorySortBuffer inMemorySortBuffer,
IOManager ioManager,
int maxNumFileHandles,
- String compression,
+ CompressOptions compression,
MemorySize maxDiskSize) {
this.serializer = serializer;
this.inMemorySortBuffer = inMemorySortBuffer;
@@ -99,7 +100,7 @@ public class BinaryExternalSortBuffer implements SortBuffer {
long bufferSize,
int pageSize,
int maxNumFileHandles,
- String compression,
+ CompressOptions compression,
MemorySize maxDiskSize) {
return create(
ioManager,
@@ -117,7 +118,7 @@ public class BinaryExternalSortBuffer implements SortBuffer
{
int[] keyFields,
MemorySegmentPool pool,
int maxNumFileHandles,
- String compression,
+ CompressOptions compression,
MemorySize maxDiskSize) {
RecordComparator comparator =
newRecordComparator(rowType.getFieldTypes(), keyFields);
BinaryInMemorySortBuffer sortBuffer =
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 85385280e..436f8f584 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -19,6 +19,7 @@
package org.apache.paimon.append;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
@@ -625,7 +626,7 @@ public class AppendOnlyWriterTest {
useWriteBuffer,
spillable,
CoreOptions.FILE_COMPRESSION.defaultValue(),
- CoreOptions.SPILL_COMPRESSION.defaultValue(),
+ CompressOptions.defaultOptions(),
StatsCollectorFactories.createStatsFactories(
options,
AppendOnlyWriterTest.SCHEMA.getFieldNames()),
MemorySize.MAX_VALUE,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java
b/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java
index 3ea27cfa7..130cf32e9 100644
--- a/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java
@@ -18,7 +18,7 @@
package org.apache.paimon.disk;
-import org.apache.paimon.CoreOptions;
+import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.BinaryString;
@@ -67,7 +67,7 @@ public class ExternalBufferTest {
new HeapMemorySegmentPool(2 * DEFAULT_PAGE_SIZE,
DEFAULT_PAGE_SIZE),
this.serializer,
maxDiskSize,
- CoreOptions.SPILL_COMPRESSION.defaultValue());
+ CompressOptions.defaultOptions());
}
@Test
@@ -182,7 +182,7 @@ public class ExternalBufferTest {
new HeapMemorySegmentPool(3 * DEFAULT_PAGE_SIZE,
DEFAULT_PAGE_SIZE),
new BinaryRowSerializer(1),
MemorySize.MAX_VALUE,
- CoreOptions.SPILL_COMPRESSION.defaultValue());
+ CompressOptions.defaultOptions());
assertThatThrownBy(() ->
writeHuge(buffer)).isInstanceOf(IOException.class);
buffer.reset();
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index 13ac9736e..539ff4757 100644
---
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -21,6 +21,7 @@ package org.apache.paimon.format;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.append.AppendOnlyWriter;
import org.apache.paimon.append.BucketedAppendCompactManager;
+import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.disk.IOManager;
@@ -87,7 +88,7 @@ public class FileFormatSuffixTest extends
KeyValueFileReadWriteTest {
false,
false,
CoreOptions.FILE_COMPRESSION.defaultValue(),
- CoreOptions.SPILL_COMPRESSION.defaultValue(),
+ CompressOptions.defaultOptions(),
StatsCollectorFactories.createStatsFactories(
options, SCHEMA.getFieldNames()),
MemorySize.MAX_VALUE,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
index 2e6393460..0ab636c33 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.mergetree;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
+import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
@@ -194,7 +195,10 @@ public class ContainsLevelsTest {
0, file.fileName(), file.fileSize(),
file.level()),
file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX +
UUID.randomUUID()),
new HashLookupStoreFactory(
- new CacheManager(MemorySize.ofMebiBytes(1)), 2048,
0.75, "none"),
+ new CacheManager(MemorySize.ofMebiBytes(1)),
+ 2048,
+ 0.75,
+ new CompressOptions("none", 1)),
rowCount -> BloomFilter.builder(rowCount, 0.01),
LookupFile.createCache(Duration.ofHours(1), maxDiskSize));
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
index 76094a520..2dce81ce5 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.mergetree;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
+import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
@@ -274,7 +275,10 @@ public class LookupLevelsTest {
0, file.fileName(), file.fileSize(),
file.level()),
file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX +
UUID.randomUUID()),
new HashLookupStoreFactory(
- new CacheManager(MemorySize.ofMebiBytes(1)), 2048,
0.75, "none"),
+ new CacheManager(MemorySize.ofMebiBytes(1)),
+ 2048,
+ 0.75,
+ new CompressOptions("none", 1)),
rowCount -> BloomFilter.builder(rowCount, 0.05),
LookupFile.createCache(Duration.ofHours(1), maxDiskSize));
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index 6a6848ce1..80080516e 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -23,6 +23,7 @@ import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.CoreOptions.SortEngine;
import org.apache.paimon.KeyValue;
import org.apache.paimon.compact.CompactResult;
+import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
@@ -514,7 +515,7 @@ public abstract class MergeTreeTestBase {
false,
MemorySize.ofKibiBytes(10),
128,
- "lz4",
+ CompressOptions.defaultOptions(),
null,
compactManager,
maxSequenceNumber,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
index dea0268ec..27606ee33 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
@@ -20,6 +20,7 @@ package org.apache.paimon.mergetree;
import org.apache.paimon.KeyValue;
import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
import org.apache.paimon.mergetree.compact.FirstRowMergeFunction;
@@ -71,7 +72,7 @@ public abstract class SortBufferWriteBufferTestBase {
false,
MemorySize.MAX_VALUE,
128,
- "lz4",
+ CompressOptions.defaultOptions(),
null);
protected abstract boolean addOnly();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/sort/BinaryExternalSortBufferTest.java
b/paimon-core/src/test/java/org/apache/paimon/sort/BinaryExternalSortBufferTest.java
index 4c5cc12bf..59ef802a1 100644
---
a/paimon-core/src/test/java/org/apache/paimon/sort/BinaryExternalSortBufferTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/sort/BinaryExternalSortBufferTest.java
@@ -18,6 +18,7 @@
package org.apache.paimon.sort;
+import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.BinaryString;
@@ -309,7 +310,7 @@ public class BinaryExternalSortBufferTest {
inMemorySortBuffer,
ioManager,
maxNumFileHandles,
- "lz4",
+ CompressOptions.defaultOptions(),
diskSize);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
index 273f38308..18d440dbc 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
@@ -125,7 +125,7 @@ public class LocalMergeOperator extends
AbstractStreamOperator<InternalRow>
false,
MemorySize.MAX_VALUE,
options.localSortMaxNumFileHandles(),
- options.spillCompression(),
+ options.spillCompressOptions(),
null);
currentWatermark = Long.MIN_VALUE;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
index 331d1058d..52ad89689 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.sorter;
import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
@@ -44,7 +45,7 @@ public class SortOperator extends
TableStreamOperator<InternalRow>
private final int pageSize;
private final int arity;
private final int spillSortMaxNumFiles;
- private final String spillCompression;
+ private final CompressOptions spillCompression;
private final int sinkParallelism;
private final MemorySize maxDiskSize;
@@ -57,7 +58,7 @@ public class SortOperator extends
TableStreamOperator<InternalRow>
long maxMemory,
int pageSize,
int spillSortMaxNumFiles,
- String spillCompression,
+ CompressOptions spillCompression,
int sinkParallelism,
MemorySize maxDiskSize) {
this.keyType = keyType;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
index 79e9f1298..e163ac364 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
@@ -161,7 +161,7 @@ public class SortUtils {
options.writeBufferSize(),
options.pageSize(),
options.localSortMaxNumFileHandles(),
- options.spillCompression(),
+ options.spillCompressOptions(),
sinkParallelism,
options.writeBufferSpillDiskSize()))
.setParallelism(sinkParallelism)
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sorter/SortOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sorter/SortOperatorTest.java
index 09db30579..155e259e0 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sorter/SortOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sorter/SortOperatorTest.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.sorter;
+import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
@@ -62,7 +63,7 @@ public class SortOperatorTest {
MemorySize.parse("10 mb").getBytes(),
(int) MemorySize.parse("16 kb").getBytes(),
128,
- "lz4",
+ CompressOptions.defaultOptions(),
1,
MemorySize.MAX_VALUE) {};
@@ -111,7 +112,7 @@ public class SortOperatorTest {
MemorySize.parse("10 mb").getBytes(),
(int) MemorySize.parse("16 kb").getBytes(),
128,
- "lz4",
+ CompressOptions.defaultOptions(),
1,
MemorySize.MAX_VALUE) {};
OneInputStreamOperatorTestHarness harness =
createTestHarness(sortOperator);