This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 36c0b91ce [core] Introduce file.compression (#1023)
36c0b91ce is described below

commit 36c0b91cef21c4d9bb563fcf6cdc6c3ed6ad0581
Author: zhangmang <[email protected]>
AuthorDate: Wed Apr 26 17:30:49 2023 +0800

    [core] Introduce file.compression (#1023)
---
 .../shortcodes/generated/core_configuration.html      |  6 ++++++
 .../org/apache/paimon/format/FormatWriterFactory.java |  4 ----
 .../paimon/format/FileStatsExtractorTestBase.java     |  4 +++-
 .../src/main/java/org/apache/paimon/CoreOptions.java  | 12 ++++++++++++
 .../org/apache/paimon/append/AppendOnlyWriter.java    |  8 ++++++--
 .../apache/paimon/io/KeyValueFileWriterFactory.java   | 19 +++++++++++++++----
 .../java/org/apache/paimon/io/RowDataFileWriter.java  | 12 ++++++++++--
 .../apache/paimon/io/RowDataRollingFileWriter.java    |  6 ++++--
 .../java/org/apache/paimon/manifest/ManifestFile.java | 11 ++++++++---
 .../java/org/apache/paimon/manifest/ManifestList.java |  4 +++-
 .../paimon/operation/AppendOnlyFileStoreWrite.java    |  9 +++++++--
 .../paimon/operation/KeyValueFileStoreWrite.java      | 12 ++++++++++--
 .../test/java/org/apache/paimon/FileFormatTest.java   |  9 +++++++--
 .../apache/paimon/append/AppendOnlyWriterTest.java    |  3 ++-
 .../apache/paimon/format/FileFormatSuffixTest.java    |  4 +++-
 .../org/apache/paimon/format/FlushingFileFormat.java  |  7 ++++++-
 .../apache/paimon/io/KeyValueFileReadWriteTest.java   |  2 +-
 .../org/apache/paimon/io/RollingFileWriterTest.java   |  3 ++-
 .../org/apache/paimon/mergetree/LookupLevelsTest.java |  2 +-
 .../org/apache/paimon/mergetree/MergeTreeTest.java    | 10 ++++++++--
 .../org/apache/paimon/format/BulkFileFormatTest.java  |  2 +-
 .../paimon/format/orc/OrcFileStatsExtractorTest.java  |  5 +++++
 .../paimon/format/orc/OrcWriterFactoryTest.java       |  4 ++--
 .../format/parquet/ParquetFileStatsExtractorTest.java |  5 +++++
 .../paimon/format/parquet/ParquetReadWriteTest.java   |  3 ++-
 25 files changed, 129 insertions(+), 37 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index b6b91d807..1931791e7 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -80,6 +80,12 @@
             <td>Boolean</td>
             <td>Whether only overwrite dynamic partition when overwriting a partitioned table with dynamic partition columns. Works only when the table has partition keys.</td>
         </tr>
+        <tr>
+            <td><h5>file.compression</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Default file compression format, can be overridden by file.compression.per.level</td>
+        </tr>
         <tr>
             <td><h5>file.compression.per.level</h5></td>
             <td style="word-wrap: break-word;"></td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FormatWriterFactory.java b/paimon-common/src/main/java/org/apache/paimon/format/FormatWriterFactory.java
index 0ac921ee6..e53be8003 100644
--- a/paimon-common/src/main/java/org/apache/paimon/format/FormatWriterFactory.java
+++ b/paimon-common/src/main/java/org/apache/paimon/format/FormatWriterFactory.java
@@ -36,8 +36,4 @@ public interface FormatWriterFactory {
      *     exception.
      */
     FormatWriter create(PositionOutputStream out, @Nullable String compression) throws IOException;
-
-    default FormatWriter create(PositionOutputStream out) throws IOException {
-        return create(out, null);
-    }
 }
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/format/FileStatsExtractorTestBase.java
 
b/paimon-common/src/test/java/org/apache/paimon/format/FileStatsExtractorTestBase.java
index 727cc1b69..3d545fa6d 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/format/FileStatsExtractorTestBase.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/format/FileStatsExtractorTestBase.java
@@ -74,7 +74,7 @@ public abstract class FileStatsExtractorTestBase {
         FormatWriterFactory writerFactory = 
format.createWriterFactory(rowType);
         Path path = new Path(tempDir.toString() + "/test");
         PositionOutputStream out = new LocalFileIO().newOutputStream(path, 
false);
-        FormatWriter writer = writerFactory.create(out);
+        FormatWriter writer = writerFactory.create(out, fileCompression());
 
         List<GenericRow> data = createData(rowType);
         for (GenericRow row : data) {
@@ -251,4 +251,6 @@ public abstract class FileStatsExtractorTestBase {
     protected abstract FileFormat createFormat();
 
     protected abstract RowType rowType();
+
+    protected abstract String fileCompression();
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index ce8ccca63..e034402eb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -113,6 +113,14 @@ public class CoreOptions implements Serializable {
                                     + "could be NONE, ZLIB, SNAPPY, LZO, LZ4, 
for parquet file format, the compression value could be "
                                     + "UNCOMPRESSED, SNAPPY, GZIP, LZO, 
BROTLI, LZ4, ZSTD.");
 
+    public static final ConfigOption<String> FILE_COMPRESSION =
+            key("file.compression")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Default file compression format, can be overridden by "
+                                    + FILE_COMPRESSION_PER_LEVEL.key());
+
     public static final ConfigOption<FileFormatType> MANIFEST_FORMAT =
             key("manifest.format")
                     .enumType(FileFormatType.class)
@@ -674,6 +682,10 @@ public class CoreOptions implements Serializable {
                 .collect(Collectors.toMap(e -> Integer.valueOf(e.getKey()), 
Map.Entry::getValue));
     }
 
+    public String fileCompression() {
+        return options.get(FILE_COMPRESSION);
+    }
+
     public int snapshotNumRetainMin() {
         return options.get(SNAPSHOT_NUM_RETAINED_MIN);
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 9c2601c99..b40c038c5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -61,6 +61,7 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow> {
     private final List<DataFileMeta> compactBefore;
     private final List<DataFileMeta> compactAfter;
     private final LongCounter seqNumCounter;
+    private final String fileCompression;
 
     private RowDataRollingFileWriter writer;
 
@@ -74,7 +75,8 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow> {
             CompactManager compactManager,
             boolean forceCompact,
             DataFilePathFactory pathFactory,
-            @Nullable CommitIncrement increment) {
+            @Nullable CommitIncrement increment,
+            String fileCompression) {
         this.fileIO = fileIO;
         this.schemaId = schemaId;
         this.fileFormat = fileFormat;
@@ -87,6 +89,7 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow> {
         this.compactBefore = new ArrayList<>();
         this.compactAfter = new ArrayList<>();
         this.seqNumCounter = new LongCounter(maxSequenceNumber + 1);
+        this.fileCompression = fileCompression;
 
         this.writer = createRollingRowWriter();
 
@@ -169,7 +172,8 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow> {
                 targetFileSize,
                 writeSchema,
                 pathFactory,
-                seqNumCounter);
+                seqNumCounter,
+                fileCompression);
     }
 
     private void trySyncLatestCompaction(boolean blocking)
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
index c0bd2f0f9..1c2086f95 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
@@ -46,6 +46,7 @@ public class KeyValueFileWriterFactory {
     private final DataFilePathFactory pathFactory;
     private final long suggestedFileSize;
     private final Map<Integer, String> levelCompressions;
+    private final String fileCompression;
 
     private KeyValueFileWriterFactory(
             FileIO fileIO,
@@ -56,7 +57,8 @@ public class KeyValueFileWriterFactory {
             @Nullable FileStatsExtractor fileStatsExtractor,
             DataFilePathFactory pathFactory,
             long suggestedFileSize,
-            Map<Integer, String> levelCompressions) {
+            Map<Integer, String> levelCompressions,
+            String fileCompression) {
         this.fileIO = fileIO;
         this.schemaId = schemaId;
         this.keyType = keyType;
@@ -66,6 +68,7 @@ public class KeyValueFileWriterFactory {
         this.pathFactory = pathFactory;
         this.suggestedFileSize = suggestedFileSize;
         this.levelCompressions = levelCompressions;
+        this.fileCompression = fileCompression;
     }
 
     public RowType keyType() {
@@ -88,7 +91,11 @@ public class KeyValueFileWriterFactory {
     }
 
     private String getCompression(int level) {
-        return null == levelCompressions ? null : levelCompressions.get(level);
+        if (null == levelCompressions) {
+            return fileCompression;
+        } else {
+            return levelCompressions.getOrDefault(level, fileCompression);
+        }
     }
 
     public RollingFileWriter<KeyValue, DataFileMeta> 
createRollingChangelogFileWriter(int level) {
@@ -159,7 +166,10 @@ public class KeyValueFileWriterFactory {
         }
 
         public KeyValueFileWriterFactory build(
-                BinaryRow partition, int bucket, Map<Integer, String> 
levelCompressions) {
+                BinaryRow partition,
+                int bucket,
+                Map<Integer, String> levelCompressions,
+                String fileCompression) {
             RowType recordType = KeyValue.schema(keyType, valueType);
             return new KeyValueFileWriterFactory(
                     fileIO,
@@ -170,7 +180,8 @@ public class KeyValueFileWriterFactory {
                     fileFormat.createStatsExtractor(recordType).orElse(null),
                     pathFactory.createDataFilePathFactory(partition, bucket),
                     suggestedFileSize,
-                    levelCompressions);
+                    levelCompressions,
+                    fileCompression);
         }
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
index 22afd08a6..471e044b9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
@@ -51,8 +51,16 @@ public class RowDataFileWriter extends 
StatsCollectingSingleFileWriter<InternalR
             RowType writeSchema,
             @Nullable FileStatsExtractor fileStatsExtractor,
             long schemaId,
-            LongCounter seqNumCounter) {
-        super(fileIO, factory, path, Function.identity(), writeSchema, 
fileStatsExtractor, null);
+            LongCounter seqNumCounter,
+            String fileCompression) {
+        super(
+                fileIO,
+                factory,
+                path,
+                Function.identity(),
+                writeSchema,
+                fileStatsExtractor,
+                fileCompression);
         this.schemaId = schemaId;
         this.seqNumCounter = seqNumCounter;
         this.statsArraySerializer = new FieldStatsArraySerializer(writeSchema);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
index c5e549ac4..7b17ed167 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
@@ -35,7 +35,8 @@ public class RowDataRollingFileWriter extends 
RollingFileWriter<InternalRow, Dat
             long targetFileSize,
             RowType writeSchema,
             DataFilePathFactory pathFactory,
-            LongCounter seqNumCounter) {
+            LongCounter seqNumCounter,
+            String fileCompression) {
         super(
                 () ->
                         new RowDataFileWriter(
@@ -45,7 +46,8 @@ public class RowDataRollingFileWriter extends 
RollingFileWriter<InternalRow, Dat
                                 writeSchema,
                                 
fileFormat.createStatsExtractor(writeSchema).orElse(null),
                                 schemaId,
-                                seqNumCounter),
+                                seqNumCounter,
+                                fileCompression),
                 targetFileSize);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
index e5ea62794..fbaec0dae 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.manifest;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.format.FieldStatsCollector;
 import org.apache.paimon.format.FileFormat;
@@ -82,7 +83,11 @@ public class ManifestFile extends ObjectsFile<ManifestEntry> 
{
     public List<ManifestFileMeta> write(List<ManifestEntry> entries) {
         RollingFileWriter<ManifestEntry, ManifestFileMeta> writer =
                 new RollingFileWriter<>(
-                        () -> new ManifestEntryWriter(writerFactory, 
pathFactory.newPath()),
+                        () ->
+                                new ManifestEntryWriter(
+                                        writerFactory,
+                                        pathFactory.newPath(),
+                                        
CoreOptions.FILE_COMPRESSION.defaultValue()),
                         suggestedFileSize);
         try {
             writer.write(entries);
@@ -102,8 +107,8 @@ public class ManifestFile extends 
ObjectsFile<ManifestEntry> {
         private long numDeletedFiles = 0;
         private long schemaId = Long.MIN_VALUE;
 
-        ManifestEntryWriter(FormatWriterFactory factory, Path path) {
-            super(ManifestFile.this.fileIO, factory, path, serializer::toRow, 
null);
+        ManifestEntryWriter(FormatWriterFactory factory, Path path, String 
fileCompression) {
+            super(ManifestFile.this.fileIO, factory, path, serializer::toRow, 
fileCompression);
 
             this.partitionStatsCollector = new 
FieldStatsCollector(partitionType);
             this.partitionStatsSerializer = new 
FieldStatsArraySerializer(partitionType);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
index 86529e31c..c11536713 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.manifest;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.format.FormatWriter;
@@ -64,7 +65,8 @@ public class ManifestList extends 
ObjectsFile<ManifestFileMeta> {
         Path path = pathFactory.newPath();
         try {
             try (PositionOutputStream out = fileIO.newOutputStream(path, 
false)) {
-                FormatWriter writer = writerFactory.create(out);
+                FormatWriter writer =
+                        writerFactory.create(out, 
CoreOptions.FILE_COMPRESSION.defaultValue());
                 try {
                     for (ManifestFileMeta record : metas) {
                         writer.addElement(serializer.toRow(record));
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index 52f0fc2e9..f5f071d23 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -64,6 +64,7 @@ public class AppendOnlyFileStoreWrite extends 
AbstractFileStoreWrite<InternalRow
     private final boolean commitForceCompact;
     private final boolean skipCompaction;
     private final boolean assertDisorder;
+    private final String fileCompression;
 
     public AppendOnlyFileStoreWrite(
             FileIO fileIO,
@@ -88,6 +89,7 @@ public class AppendOnlyFileStoreWrite extends 
AbstractFileStoreWrite<InternalRow
         this.commitForceCompact = options.commitForceCompact();
         this.skipCompaction = options.writeOnly();
         this.assertDisorder = 
options.toConfiguration().get(APPEND_ONLY_ASSERT_DISORDER);
+        this.fileCompression = options.fileCompression();
     }
 
     @Override
@@ -112,6 +114,7 @@ public class AppendOnlyFileStoreWrite extends 
AbstractFileStoreWrite<InternalRow
                                 targetFileSize,
                                 compactRewriter(partition, bucket),
                                 assertDisorder);
+
         return new AppendOnlyWriter(
                 fileIO,
                 schemaId,
@@ -122,7 +125,8 @@ public class AppendOnlyFileStoreWrite extends 
AbstractFileStoreWrite<InternalRow
                 compactManager,
                 commitForceCompact,
                 factory,
-                restoreIncrement);
+                restoreIncrement,
+                fileCompression);
     }
 
     private AppendOnlyCompactManager.CompactRewriter compactRewriter(
@@ -139,7 +143,8 @@ public class AppendOnlyFileStoreWrite extends 
AbstractFileStoreWrite<InternalRow
                             targetFileSize,
                             rowType,
                             pathFactory.createDataFilePathFactory(partition, 
bucket),
-                            new 
LongCounter(toCompact.get(0).minSequenceNumber()));
+                            new 
LongCounter(toCompact.get(0).minSequenceNumber()),
+                            fileCompression);
             rewriter.write(
                     new RecordReaderIterator<>(
                             read.createReader(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index aade98209..72af8fb57 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -135,7 +135,11 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
         }
 
         KeyValueFileWriterFactory writerFactory =
-                writerFactoryBuilder.build(partition, bucket, 
options.fileCompressionPerLevel());
+                writerFactoryBuilder.build(
+                        partition,
+                        bucket,
+                        options.fileCompressionPerLevel(),
+                        options.fileCompression());
         Comparator<InternalRow> keyComparator = keyComparatorSupplier.get();
         Levels levels = new Levels(keyComparator, restoreFiles, 
options.numLevels());
         UniversalCompaction universalCompaction =
@@ -194,7 +198,11 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
             BinaryRow partition, int bucket, Comparator<InternalRow> 
keyComparator, Levels levels) {
         KeyValueFileReaderFactory readerFactory = 
readerFactoryBuilder.build(partition, bucket);
         KeyValueFileWriterFactory writerFactory =
-                writerFactoryBuilder.build(partition, bucket, 
options.fileCompressionPerLevel());
+                writerFactoryBuilder.build(
+                        partition,
+                        bucket,
+                        options.fileCompressionPerLevel(),
+                        options.fileCompression());
         switch (options.changelogProducer()) {
             case FULL_COMPACTION:
                 return new FullChangelogMergeTreeCompactRewriter(
diff --git a/paimon-core/src/test/java/org/apache/paimon/FileFormatTest.java 
b/paimon-core/src/test/java/org/apache/paimon/FileFormatTest.java
index 83a869ffb..8554dc025 100644
--- a/paimon-core/src/test/java/org/apache/paimon/FileFormatTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/FileFormatTest.java
@@ -59,7 +59,9 @@ public class FileFormatTest {
         expected.add(GenericRow.of(2, 22));
         expected.add(GenericRow.of(3, 33));
         PositionOutputStream out = LocalFileIO.create().newOutputStream(path, 
false);
-        FormatWriter writer = avro.createWriterFactory(rowType).create(out);
+        FormatWriter writer =
+                avro.createWriterFactory(rowType)
+                        .create(out, 
CoreOptions.FILE_COMPRESSION.defaultValue());
         for (InternalRow row : expected) {
             writer.addElement(row);
         }
@@ -83,7 +85,10 @@ public class FileFormatTest {
         Path path = new Path(tempDir.toUri().toString(), "1.avro");
         Assertions.assertThrows(
                 RuntimeException.class,
-                () -> 
writerFactory.create(LocalFileIO.create().newOutputStream(path, false)),
+                () ->
+                        writerFactory.create(
+                                LocalFileIO.create().newOutputStream(path, 
false),
+                                CoreOptions.FILE_COMPRESSION.defaultValue()),
                 "Unrecognized codec: _unsupported");
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 5ac63cb9d..9c2913c7e 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -316,7 +316,8 @@ public class AppendOnlyWriterTest {
                         compactManager,
                         forceCompact,
                         pathFactory,
-                        null);
+                        null,
+                        CoreOptions.FILE_COMPRESSION.defaultValue());
         return Pair.of(writer, compactManager.allFiles());
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java 
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index 60eb362c6..40e814844 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.format;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.append.AppendOnlyCompactManager;
 import org.apache.paimon.append.AppendOnlyWriter;
 import org.apache.paimon.data.BinaryString;
@@ -72,7 +73,8 @@ public class FileFormatSuffixTest extends 
KeyValueFileReadWriteTest {
                                 null, toCompact, 4, 10, 10, null, false), // 
not used
                         false,
                         dataFilePathFactory,
-                        null);
+                        null,
+                        CoreOptions.FILE_COMPRESSION.defaultValue());
         appendOnlyWriter.write(
                 GenericRow.of(1, BinaryString.fromString("aaa"), 
BinaryString.fromString("1")));
         CommitIncrement increment = appendOnlyWriter.prepareCommit(true);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/format/FlushingFileFormat.java 
b/paimon-core/src/test/java/org/apache/paimon/format/FlushingFileFormat.java
index 548bd8b51..d2f1829cf 100644
--- a/paimon-core/src/test/java/org/apache/paimon/format/FlushingFileFormat.java
+++ b/paimon-core/src/test/java/org/apache/paimon/format/FlushingFileFormat.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.format;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
@@ -47,7 +48,11 @@ public class FlushingFileFormat extends FileFormat {
     @Override
     public FormatWriterFactory createWriterFactory(RowType type) {
         return (PositionOutputStream, level) -> {
-            FormatWriter wrapped = 
format.createWriterFactory(type).create(PositionOutputStream);
+            FormatWriter wrapped =
+                    format.createWriterFactory(type)
+                            .create(
+                                    PositionOutputStream,
+                                    
CoreOptions.FILE_COMPRESSION.defaultValue());
             return new FormatWriter() {
                 @Override
                 public void addElement(InternalRow rowData) throws IOException 
{
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java 
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index 373be796f..2a236c966 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -255,7 +255,7 @@ public class KeyValueFileReadWriteTest {
                         new FlushingFileFormat(format),
                         pathFactory,
                         suggestedFileSize)
-                .build(BinaryRow.EMPTY_ROW, 0, null);
+                .build(BinaryRow.EMPTY_ROW, 0, null, null);
     }
 
     private KeyValueFileReaderFactory createReaderFactory(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java 
b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
index cfd3300f5..f19f036d2 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
@@ -75,7 +75,8 @@ public class RollingFileWriterTest {
                                         SCHEMA,
                                         
fileFormat.createStatsExtractor(SCHEMA).orElse(null),
                                         0L,
-                                        new LongCounter(0)),
+                                        new LongCounter(0),
+                                        
CoreOptions.FILE_COMPRESSION.defaultValue()),
                         TARGET_FILE_SIZE);
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
index 44b95c51c..e4b074a51 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
@@ -234,7 +234,7 @@ public class LookupLevelsTest {
                         new FlushingFileFormat("avro"),
                         new FileStorePathFactory(path),
                         TARGET_FILE_SIZE.defaultValue().getBytes())
-                .build(BinaryRow.EMPTY_ROW, 0, null);
+                .build(BinaryRow.EMPTY_ROW, 0, null, null);
     }
 
     private KeyValueFileReaderFactory createReaderFactory() {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTest.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTest.java
index 7f43997c2..3a2ce21cc 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTest.java
@@ -177,10 +177,16 @@ public class MergeTreeTest {
                         options.targetFileSize());
         writerFactory =
                 writerFactoryBuilder.build(
-                        BinaryRow.EMPTY_ROW, 0, 
options.fileCompressionPerLevel());
+                        BinaryRow.EMPTY_ROW,
+                        0,
+                        options.fileCompressionPerLevel(),
+                        options.fileCompression());
         compactWriterFactory =
                 writerFactoryBuilder.build(
-                        BinaryRow.EMPTY_ROW, 0, 
options.fileCompressionPerLevel());
+                        BinaryRow.EMPTY_ROW,
+                        0,
+                        options.fileCompressionPerLevel(),
+                        options.fileCompression());
         writer = createMergeTreeWriter(Collections.emptyList());
     }
 
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/BulkFileFormatTest.java 
b/paimon-format/src/test/java/org/apache/paimon/format/BulkFileFormatTest.java
index bf2d02803..ccd9d2c7d 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/BulkFileFormatTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/BulkFileFormatTest.java
@@ -71,7 +71,7 @@ public class BulkFileFormatTest {
         expected.add(GenericRow.of(2, 2));
         expected.add(GenericRow.of(3, 3));
         PositionOutputStream out = new LocalFileIO().newOutputStream(path, 
false);
-        FormatWriter writer = 
fileFormat.createWriterFactory(rowType).create(out);
+        FormatWriter writer = 
fileFormat.createWriterFactory(rowType).create(out, "LZ4");
         for (InternalRow row : expected) {
             writer.addElement(row);
         }
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileStatsExtractorTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileStatsExtractorTest.java
index aab9e6eef..e3f983bee 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileStatsExtractorTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileStatsExtractorTest.java
@@ -79,4 +79,9 @@ public class OrcFileStatsExtractorTest extends 
FileStatsExtractorTestBase {
                         new MultisetType(new VarCharType(8)))
                 .build();
     }
+
+    @Override
+    protected String fileCompression() {
+        return "LZ4";
+    }
 }
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcWriterFactoryTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcWriterFactoryTest.java
index 11c60a381..2511d7ed7 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcWriterFactoryTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcWriterFactoryTest.java
@@ -50,8 +50,8 @@ class OrcWriterFactoryTest {
                                 "struct<_col0:string,_col1:int>",
                                 new DataType[] {DataTypes.STRING(), 
DataTypes.INT()}),
                         memoryManager);
-        factory.create(new 
LocalPositionOutputStream(tmpDir.resolve("file1").toFile()));
-        factory.create(new 
LocalPositionOutputStream(tmpDir.resolve("file2").toFile()));
+        factory.create(new 
LocalPositionOutputStream(tmpDir.resolve("file1").toFile()), "LZ4");
+        factory.create(new 
LocalPositionOutputStream(tmpDir.resolve("file2").toFile()), "LZ4");
 
         List<Path> addedWriterPath = memoryManager.getAddedWriterPath();
         assertThat(addedWriterPath).hasSize(2);
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileStatsExtractorTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileStatsExtractorTest.java
index c108fc9cf..34f2abbd2 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileStatsExtractorTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileStatsExtractorTest.java
@@ -95,4 +95,9 @@ public class ParquetFileStatsExtractorTest extends 
FileStatsExtractorTestBase {
         }
         return stats;
     }
+
+    @Override
+    protected String fileCompression() {
+        return "SNAPPY";
+    }
 }
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
index 245e13f7f..429614366 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
@@ -297,7 +297,8 @@ public class ParquetReadWriteTest {
         conf.setInteger("parquet.block.size", rowGroupSize);
         ParquetWriterFactory factory =
                 new ParquetWriterFactory(new RowDataParquetBuilder(ROW_TYPE, 
conf));
-        FormatWriter writer = factory.create(new 
LocalFileIO().newOutputStream(path, false));
+        FormatWriter writer =
+                factory.create(new LocalFileIO().newOutputStream(path, false), 
"SNAPPY");
         for (InternalRow row : rows) {
             writer.addElement(row);
         }

Reply via email to