This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9965bf00f [core] Introduce file.compression (#914)
9965bf00f is described below

commit 9965bf00f25146d8c190b4aa4626dad1033c9838
Author: zhangmang <[email protected]>
AuthorDate: Mon Apr 17 09:35:51 2023 +0800

    [core] Introduce file.compression (#914)
---
 .../apache/paimon/format/FormatWriterFactory.java  |  8 +------
 .../paimon/format/FileStatsExtractorTestBase.java  |  2 +-
 .../main/java/org/apache/paimon/CoreOptions.java   | 12 ++++++++++
 .../org/apache/paimon/append/AppendOnlyWriter.java |  8 +++++--
 .../paimon/io/KeyValueFileWriterFactory.java       | 27 +++++++++++++++++++---
 .../org/apache/paimon/io/RowDataFileWriter.java    | 12 ++++++++--
 .../apache/paimon/io/RowDataRollingFileWriter.java |  6 +++--
 .../org/apache/paimon/manifest/ManifestFile.java   | 11 ++++++---
 .../org/apache/paimon/manifest/ManifestList.java   |  4 +++-
 .../paimon/operation/AppendOnlyFileStoreWrite.java |  9 ++++++--
 .../paimon/operation/KeyValueFileStoreWrite.java   | 12 ++++++++--
 .../java/org/apache/paimon/FileFormatTest.java     |  9 ++++++--
 .../apache/paimon/append/AppendOnlyWriterTest.java |  3 ++-
 .../apache/paimon/format/FileFormatSuffixTest.java |  4 +++-
 .../apache/paimon/format/FlushingFileFormat.java   |  7 +++++-
 .../apache/paimon/io/RollingFileWriterTest.java    |  3 ++-
 .../apache/paimon/format/BulkFileFormatTest.java   |  2 +-
 .../paimon/format/orc/OrcWriterFactoryTest.java    |  4 ++--
 .../format/parquet/ParquetReadWriteTest.java       |  2 +-
 19 files changed, 110 insertions(+), 35 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/format/FormatWriterFactory.java 
b/paimon-common/src/main/java/org/apache/paimon/format/FormatWriterFactory.java
index 0ac921ee6..517cc00cb 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/format/FormatWriterFactory.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/format/FormatWriterFactory.java
@@ -20,8 +20,6 @@ package org.apache.paimon.format;
 
 import org.apache.paimon.fs.PositionOutputStream;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 
 /** A factory to create {@link FormatWriter} for file. */
@@ -35,9 +33,5 @@ public interface FormatWriterFactory {
      * @throws IOException Thrown if the writer cannot be opened, or if the 
output stream throws an
      *     exception.
      */
-    FormatWriter create(PositionOutputStream out, @Nullable String 
compression) throws IOException;
-
-    default FormatWriter create(PositionOutputStream out) throws IOException {
-        return create(out, null);
-    }
+    FormatWriter create(PositionOutputStream out, String compression) throws 
IOException;
 }
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/format/FileStatsExtractorTestBase.java
 
b/paimon-common/src/test/java/org/apache/paimon/format/FileStatsExtractorTestBase.java
index 727cc1b69..6839670de 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/format/FileStatsExtractorTestBase.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/format/FileStatsExtractorTestBase.java
@@ -74,7 +74,7 @@ public abstract class FileStatsExtractorTestBase {
         FormatWriterFactory writerFactory = 
format.createWriterFactory(rowType);
         Path path = new Path(tempDir.toString() + "/test");
         PositionOutputStream out = new LocalFileIO().newOutputStream(path, 
false);
-        FormatWriter writer = writerFactory.create(out);
+        FormatWriter writer = writerFactory.create(out, "LZ4");
 
         List<GenericRow> data = createData(rowType);
         for (GenericRow row : data) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index c27d8b26a..b1554e5d2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -113,6 +113,14 @@ public class CoreOptions implements Serializable {
                                     + "could be NONE, ZLIB, SNAPPY, LZO, LZ4, 
for parquet file format, the compression value could be "
                                     + "UNCOMPRESSED, SNAPPY, GZIP, LZO, 
BROTLI, LZ4, ZSTD.");
 
+    public static final ConfigOption<String> FILE_COMPRESSION =
+            key("file.compression")
+                    .stringType()
+                    .defaultValue("LZ4")
+                    .withDescription(
+                            "Default file compression format, can be 
overridden by "
+                                    + FILE_COMPRESSION_PER_LEVEL.key());
+
     public static final ConfigOption<FileFormatType> MANIFEST_FORMAT =
             key("manifest.format")
                     .enumType(FileFormatType.class)
@@ -673,6 +681,10 @@ public class CoreOptions implements Serializable {
                 .collect(Collectors.toMap(e -> Integer.valueOf(e.getKey()), 
Map.Entry::getValue));
     }
 
+    public String fileCompression() {
+        return options.get(FILE_COMPRESSION);
+    }
+
     public int snapshotNumRetainMin() {
         return options.get(SNAPSHOT_NUM_RETAINED_MIN);
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 9c2601c99..b40c038c5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -61,6 +61,7 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow> {
     private final List<DataFileMeta> compactBefore;
     private final List<DataFileMeta> compactAfter;
     private final LongCounter seqNumCounter;
+    private final String fileCompression;
 
     private RowDataRollingFileWriter writer;
 
@@ -74,7 +75,8 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow> {
             CompactManager compactManager,
             boolean forceCompact,
             DataFilePathFactory pathFactory,
-            @Nullable CommitIncrement increment) {
+            @Nullable CommitIncrement increment,
+            String fileCompression) {
         this.fileIO = fileIO;
         this.schemaId = schemaId;
         this.fileFormat = fileFormat;
@@ -87,6 +89,7 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow> {
         this.compactBefore = new ArrayList<>();
         this.compactAfter = new ArrayList<>();
         this.seqNumCounter = new LongCounter(maxSequenceNumber + 1);
+        this.fileCompression = fileCompression;
 
         this.writer = createRollingRowWriter();
 
@@ -169,7 +172,8 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow> {
                 targetFileSize,
                 writeSchema,
                 pathFactory,
-                seqNumCounter);
+                seqNumCounter,
+                fileCompression);
     }
 
     private void trySyncLatestCompaction(boolean blocking)
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
index c0bd2f0f9..2ea50eede 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.io;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.KeyValueSerializer;
 import org.apache.paimon.annotation.VisibleForTesting;
@@ -46,6 +47,7 @@ public class KeyValueFileWriterFactory {
     private final DataFilePathFactory pathFactory;
     private final long suggestedFileSize;
     private final Map<Integer, String> levelCompressions;
+    private final String fileCompression;
 
     private KeyValueFileWriterFactory(
             FileIO fileIO,
@@ -56,7 +58,8 @@ public class KeyValueFileWriterFactory {
             @Nullable FileStatsExtractor fileStatsExtractor,
             DataFilePathFactory pathFactory,
             long suggestedFileSize,
-            Map<Integer, String> levelCompressions) {
+            Map<Integer, String> levelCompressions,
+            String fileCompression) {
         this.fileIO = fileIO;
         this.schemaId = schemaId;
         this.keyType = keyType;
@@ -66,6 +69,7 @@ public class KeyValueFileWriterFactory {
         this.pathFactory = pathFactory;
         this.suggestedFileSize = suggestedFileSize;
         this.levelCompressions = levelCompressions;
+        this.fileCompression = fileCompression;
     }
 
     public RowType keyType() {
@@ -88,7 +92,11 @@ public class KeyValueFileWriterFactory {
     }
 
     private String getCompression(int level) {
-        return null == levelCompressions ? null : levelCompressions.get(level);
+        if (null == levelCompressions) {
+            return fileCompression;
+        } else {
+            return levelCompressions.getOrDefault(level, fileCompression);
+        }
     }
 
     public RollingFileWriter<KeyValue, DataFileMeta> 
createRollingChangelogFileWriter(int level) {
@@ -160,6 +168,18 @@ public class KeyValueFileWriterFactory {
 
         public KeyValueFileWriterFactory build(
                 BinaryRow partition, int bucket, Map<Integer, String> 
levelCompressions) {
+            return build(
+                    partition,
+                    bucket,
+                    levelCompressions,
+                    CoreOptions.FILE_COMPRESSION.defaultValue());
+        }
+
+        public KeyValueFileWriterFactory build(
+                BinaryRow partition,
+                int bucket,
+                Map<Integer, String> levelCompressions,
+                String fileCompression) {
             RowType recordType = KeyValue.schema(keyType, valueType);
             return new KeyValueFileWriterFactory(
                     fileIO,
@@ -170,7 +190,8 @@ public class KeyValueFileWriterFactory {
                     fileFormat.createStatsExtractor(recordType).orElse(null),
                     pathFactory.createDataFilePathFactory(partition, bucket),
                     suggestedFileSize,
-                    levelCompressions);
+                    levelCompressions,
+                    fileCompression);
         }
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
index 22afd08a6..471e044b9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
@@ -51,8 +51,16 @@ public class RowDataFileWriter extends 
StatsCollectingSingleFileWriter<InternalR
             RowType writeSchema,
             @Nullable FileStatsExtractor fileStatsExtractor,
             long schemaId,
-            LongCounter seqNumCounter) {
-        super(fileIO, factory, path, Function.identity(), writeSchema, 
fileStatsExtractor, null);
+            LongCounter seqNumCounter,
+            String fileCompression) {
+        super(
+                fileIO,
+                factory,
+                path,
+                Function.identity(),
+                writeSchema,
+                fileStatsExtractor,
+                fileCompression);
         this.schemaId = schemaId;
         this.seqNumCounter = seqNumCounter;
         this.statsArraySerializer = new FieldStatsArraySerializer(writeSchema);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
index c5e549ac4..7b17ed167 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
@@ -35,7 +35,8 @@ public class RowDataRollingFileWriter extends 
RollingFileWriter<InternalRow, Dat
             long targetFileSize,
             RowType writeSchema,
             DataFilePathFactory pathFactory,
-            LongCounter seqNumCounter) {
+            LongCounter seqNumCounter,
+            String fileCompression) {
         super(
                 () ->
                         new RowDataFileWriter(
@@ -45,7 +46,8 @@ public class RowDataRollingFileWriter extends 
RollingFileWriter<InternalRow, Dat
                                 writeSchema,
                                 
fileFormat.createStatsExtractor(writeSchema).orElse(null),
                                 schemaId,
-                                seqNumCounter),
+                                seqNumCounter,
+                                fileCompression),
                 targetFileSize);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
index e5ea62794..fbaec0dae 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.manifest;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.format.FieldStatsCollector;
 import org.apache.paimon.format.FileFormat;
@@ -82,7 +83,11 @@ public class ManifestFile extends ObjectsFile<ManifestEntry> 
{
     public List<ManifestFileMeta> write(List<ManifestEntry> entries) {
         RollingFileWriter<ManifestEntry, ManifestFileMeta> writer =
                 new RollingFileWriter<>(
-                        () -> new ManifestEntryWriter(writerFactory, 
pathFactory.newPath()),
+                        () ->
+                                new ManifestEntryWriter(
+                                        writerFactory,
+                                        pathFactory.newPath(),
+                                        
CoreOptions.FILE_COMPRESSION.defaultValue()),
                         suggestedFileSize);
         try {
             writer.write(entries);
@@ -102,8 +107,8 @@ public class ManifestFile extends 
ObjectsFile<ManifestEntry> {
         private long numDeletedFiles = 0;
         private long schemaId = Long.MIN_VALUE;
 
-        ManifestEntryWriter(FormatWriterFactory factory, Path path) {
-            super(ManifestFile.this.fileIO, factory, path, serializer::toRow, 
null);
+        ManifestEntryWriter(FormatWriterFactory factory, Path path, String 
fileCompression) {
+            super(ManifestFile.this.fileIO, factory, path, serializer::toRow, 
fileCompression);
 
             this.partitionStatsCollector = new 
FieldStatsCollector(partitionType);
             this.partitionStatsSerializer = new 
FieldStatsArraySerializer(partitionType);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
index 86529e31c..c11536713 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.manifest;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.format.FormatWriter;
@@ -64,7 +65,8 @@ public class ManifestList extends 
ObjectsFile<ManifestFileMeta> {
         Path path = pathFactory.newPath();
         try {
             try (PositionOutputStream out = fileIO.newOutputStream(path, 
false)) {
-                FormatWriter writer = writerFactory.create(out);
+                FormatWriter writer =
+                        writerFactory.create(out, 
CoreOptions.FILE_COMPRESSION.defaultValue());
                 try {
                     for (ManifestFileMeta record : metas) {
                         writer.addElement(serializer.toRow(record));
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index 52f0fc2e9..f5f071d23 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -64,6 +64,7 @@ public class AppendOnlyFileStoreWrite extends 
AbstractFileStoreWrite<InternalRow
     private final boolean commitForceCompact;
     private final boolean skipCompaction;
     private final boolean assertDisorder;
+    private final String fileCompression;
 
     public AppendOnlyFileStoreWrite(
             FileIO fileIO,
@@ -88,6 +89,7 @@ public class AppendOnlyFileStoreWrite extends 
AbstractFileStoreWrite<InternalRow
         this.commitForceCompact = options.commitForceCompact();
         this.skipCompaction = options.writeOnly();
         this.assertDisorder = 
options.toConfiguration().get(APPEND_ONLY_ASSERT_DISORDER);
+        this.fileCompression = options.fileCompression();
     }
 
     @Override
@@ -112,6 +114,7 @@ public class AppendOnlyFileStoreWrite extends 
AbstractFileStoreWrite<InternalRow
                                 targetFileSize,
                                 compactRewriter(partition, bucket),
                                 assertDisorder);
+
         return new AppendOnlyWriter(
                 fileIO,
                 schemaId,
@@ -122,7 +125,8 @@ public class AppendOnlyFileStoreWrite extends 
AbstractFileStoreWrite<InternalRow
                 compactManager,
                 commitForceCompact,
                 factory,
-                restoreIncrement);
+                restoreIncrement,
+                fileCompression);
     }
 
     private AppendOnlyCompactManager.CompactRewriter compactRewriter(
@@ -139,7 +143,8 @@ public class AppendOnlyFileStoreWrite extends 
AbstractFileStoreWrite<InternalRow
                             targetFileSize,
                             rowType,
                             pathFactory.createDataFilePathFactory(partition, 
bucket),
-                            new 
LongCounter(toCompact.get(0).minSequenceNumber()));
+                            new 
LongCounter(toCompact.get(0).minSequenceNumber()),
+                            fileCompression);
             rewriter.write(
                     new RecordReaderIterator<>(
                             read.createReader(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index aade98209..72af8fb57 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -135,7 +135,11 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
         }
 
         KeyValueFileWriterFactory writerFactory =
-                writerFactoryBuilder.build(partition, bucket, 
options.fileCompressionPerLevel());
+                writerFactoryBuilder.build(
+                        partition,
+                        bucket,
+                        options.fileCompressionPerLevel(),
+                        options.fileCompression());
         Comparator<InternalRow> keyComparator = keyComparatorSupplier.get();
         Levels levels = new Levels(keyComparator, restoreFiles, 
options.numLevels());
         UniversalCompaction universalCompaction =
@@ -194,7 +198,11 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
             BinaryRow partition, int bucket, Comparator<InternalRow> 
keyComparator, Levels levels) {
         KeyValueFileReaderFactory readerFactory = 
readerFactoryBuilder.build(partition, bucket);
         KeyValueFileWriterFactory writerFactory =
-                writerFactoryBuilder.build(partition, bucket, 
options.fileCompressionPerLevel());
+                writerFactoryBuilder.build(
+                        partition,
+                        bucket,
+                        options.fileCompressionPerLevel(),
+                        options.fileCompression());
         switch (options.changelogProducer()) {
             case FULL_COMPACTION:
                 return new FullChangelogMergeTreeCompactRewriter(
diff --git a/paimon-core/src/test/java/org/apache/paimon/FileFormatTest.java 
b/paimon-core/src/test/java/org/apache/paimon/FileFormatTest.java
index 83a869ffb..8554dc025 100644
--- a/paimon-core/src/test/java/org/apache/paimon/FileFormatTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/FileFormatTest.java
@@ -59,7 +59,9 @@ public class FileFormatTest {
         expected.add(GenericRow.of(2, 22));
         expected.add(GenericRow.of(3, 33));
         PositionOutputStream out = LocalFileIO.create().newOutputStream(path, 
false);
-        FormatWriter writer = avro.createWriterFactory(rowType).create(out);
+        FormatWriter writer =
+                avro.createWriterFactory(rowType)
+                        .create(out, 
CoreOptions.FILE_COMPRESSION.defaultValue());
         for (InternalRow row : expected) {
             writer.addElement(row);
         }
@@ -83,7 +85,10 @@ public class FileFormatTest {
         Path path = new Path(tempDir.toUri().toString(), "1.avro");
         Assertions.assertThrows(
                 RuntimeException.class,
-                () -> 
writerFactory.create(LocalFileIO.create().newOutputStream(path, false)),
+                () ->
+                        writerFactory.create(
+                                LocalFileIO.create().newOutputStream(path, 
false),
+                                CoreOptions.FILE_COMPRESSION.defaultValue()),
                 "Unrecognized codec: _unsupported");
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 5ac63cb9d..9c2913c7e 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -316,7 +316,8 @@ public class AppendOnlyWriterTest {
                         compactManager,
                         forceCompact,
                         pathFactory,
-                        null);
+                        null,
+                        CoreOptions.FILE_COMPRESSION.defaultValue());
         return Pair.of(writer, compactManager.allFiles());
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java 
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index 60eb362c6..40e814844 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.format;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.append.AppendOnlyCompactManager;
 import org.apache.paimon.append.AppendOnlyWriter;
 import org.apache.paimon.data.BinaryString;
@@ -72,7 +73,8 @@ public class FileFormatSuffixTest extends 
KeyValueFileReadWriteTest {
                                 null, toCompact, 4, 10, 10, null, false), // 
not used
                         false,
                         dataFilePathFactory,
-                        null);
+                        null,
+                        CoreOptions.FILE_COMPRESSION.defaultValue());
         appendOnlyWriter.write(
                 GenericRow.of(1, BinaryString.fromString("aaa"), 
BinaryString.fromString("1")));
         CommitIncrement increment = appendOnlyWriter.prepareCommit(true);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/format/FlushingFileFormat.java 
b/paimon-core/src/test/java/org/apache/paimon/format/FlushingFileFormat.java
index 548bd8b51..d2f1829cf 100644
--- a/paimon-core/src/test/java/org/apache/paimon/format/FlushingFileFormat.java
+++ b/paimon-core/src/test/java/org/apache/paimon/format/FlushingFileFormat.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.format;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
@@ -47,7 +48,11 @@ public class FlushingFileFormat extends FileFormat {
     @Override
     public FormatWriterFactory createWriterFactory(RowType type) {
         return (PositionOutputStream, level) -> {
-            FormatWriter wrapped = 
format.createWriterFactory(type).create(PositionOutputStream);
+            FormatWriter wrapped =
+                    format.createWriterFactory(type)
+                            .create(
+                                    PositionOutputStream,
+                                    
CoreOptions.FILE_COMPRESSION.defaultValue());
             return new FormatWriter() {
                 @Override
                 public void addElement(InternalRow rowData) throws IOException 
{
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java 
b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
index cfd3300f5..f19f036d2 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
@@ -75,7 +75,8 @@ public class RollingFileWriterTest {
                                         SCHEMA,
                                         
fileFormat.createStatsExtractor(SCHEMA).orElse(null),
                                         0L,
-                                        new LongCounter(0)),
+                                        new LongCounter(0),
+                                        
CoreOptions.FILE_COMPRESSION.defaultValue()),
                         TARGET_FILE_SIZE);
     }
 
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/BulkFileFormatTest.java 
b/paimon-format/src/test/java/org/apache/paimon/format/BulkFileFormatTest.java
index bf2d02803..ccd9d2c7d 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/BulkFileFormatTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/BulkFileFormatTest.java
@@ -71,7 +71,7 @@ public class BulkFileFormatTest {
         expected.add(GenericRow.of(2, 2));
         expected.add(GenericRow.of(3, 3));
         PositionOutputStream out = new LocalFileIO().newOutputStream(path, 
false);
-        FormatWriter writer = 
fileFormat.createWriterFactory(rowType).create(out);
+        FormatWriter writer = 
fileFormat.createWriterFactory(rowType).create(out, "LZ4");
         for (InternalRow row : expected) {
             writer.addElement(row);
         }
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcWriterFactoryTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcWriterFactoryTest.java
index 11c60a381..2511d7ed7 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcWriterFactoryTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcWriterFactoryTest.java
@@ -50,8 +50,8 @@ class OrcWriterFactoryTest {
                                 "struct<_col0:string,_col1:int>",
                                 new DataType[] {DataTypes.STRING(), 
DataTypes.INT()}),
                         memoryManager);
-        factory.create(new 
LocalPositionOutputStream(tmpDir.resolve("file1").toFile()));
-        factory.create(new 
LocalPositionOutputStream(tmpDir.resolve("file2").toFile()));
+        factory.create(new 
LocalPositionOutputStream(tmpDir.resolve("file1").toFile()), "LZ4");
+        factory.create(new 
LocalPositionOutputStream(tmpDir.resolve("file2").toFile()), "LZ4");
 
         List<Path> addedWriterPath = memoryManager.getAddedWriterPath();
         assertThat(addedWriterPath).hasSize(2);
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
index 245e13f7f..260264229 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
@@ -297,7 +297,7 @@ public class ParquetReadWriteTest {
         conf.setInteger("parquet.block.size", rowGroupSize);
         ParquetWriterFactory factory =
                 new ParquetWriterFactory(new RowDataParquetBuilder(ROW_TYPE, 
conf));
-        FormatWriter writer = factory.create(new 
LocalFileIO().newOutputStream(path, false));
+        FormatWriter writer = factory.create(new 
LocalFileIO().newOutputStream(path, false), "LZ4");
         for (InternalRow row : rows) {
             writer.addElement(row);
         }

Reply via email to