This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 36c0b91ce [core] Introduce file.compression (#1023)
36c0b91ce is described below
commit 36c0b91cef21c4d9bb563fcf6cdc6c3ed6ad0581
Author: zhangmang <[email protected]>
AuthorDate: Wed Apr 26 17:30:49 2023 +0800
[core] Introduce file.compression (#1023)
---
.../shortcodes/generated/core_configuration.html | 6 ++++++
.../org/apache/paimon/format/FormatWriterFactory.java | 4 ----
.../paimon/format/FileStatsExtractorTestBase.java | 4 +++-
.../src/main/java/org/apache/paimon/CoreOptions.java | 12 ++++++++++++
.../org/apache/paimon/append/AppendOnlyWriter.java | 8 ++++++--
.../apache/paimon/io/KeyValueFileWriterFactory.java | 19 +++++++++++++++----
.../java/org/apache/paimon/io/RowDataFileWriter.java | 12 ++++++++++--
.../apache/paimon/io/RowDataRollingFileWriter.java | 6 ++++--
.../java/org/apache/paimon/manifest/ManifestFile.java | 11 ++++++++---
.../java/org/apache/paimon/manifest/ManifestList.java | 4 +++-
.../paimon/operation/AppendOnlyFileStoreWrite.java | 9 +++++++--
.../paimon/operation/KeyValueFileStoreWrite.java | 12 ++++++++++--
.../test/java/org/apache/paimon/FileFormatTest.java | 9 +++++++--
.../apache/paimon/append/AppendOnlyWriterTest.java | 3 ++-
.../apache/paimon/format/FileFormatSuffixTest.java | 4 +++-
.../org/apache/paimon/format/FlushingFileFormat.java | 7 ++++++-
.../apache/paimon/io/KeyValueFileReadWriteTest.java | 2 +-
.../org/apache/paimon/io/RollingFileWriterTest.java | 3 ++-
.../org/apache/paimon/mergetree/LookupLevelsTest.java | 2 +-
.../org/apache/paimon/mergetree/MergeTreeTest.java | 10 ++++++++--
.../org/apache/paimon/format/BulkFileFormatTest.java | 2 +-
.../paimon/format/orc/OrcFileStatsExtractorTest.java | 5 +++++
.../paimon/format/orc/OrcWriterFactoryTest.java | 4 ++--
.../format/parquet/ParquetFileStatsExtractorTest.java | 5 +++++
.../paimon/format/parquet/ParquetReadWriteTest.java | 3 ++-
25 files changed, 129 insertions(+), 37 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index b6b91d807..1931791e7 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -80,6 +80,12 @@
<td>Boolean</td>
<td>Whether only overwrite dynamic partition when overwriting a
partitioned table with dynamic partition columns. Works only when the table has
partition keys.</td>
</tr>
+ <tr>
+ <td><h5>file.compression</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Default file compression format, can be overridden by
file.compression.per.level</td>
+ </tr>
<tr>
<td><h5>file.compression.per.level</h5></td>
<td style="word-wrap: break-word;"></td>
diff --git
a/paimon-common/src/main/java/org/apache/paimon/format/FormatWriterFactory.java
b/paimon-common/src/main/java/org/apache/paimon/format/FormatWriterFactory.java
index 0ac921ee6..e53be8003 100644
---
a/paimon-common/src/main/java/org/apache/paimon/format/FormatWriterFactory.java
+++
b/paimon-common/src/main/java/org/apache/paimon/format/FormatWriterFactory.java
@@ -36,8 +36,4 @@ public interface FormatWriterFactory {
* exception.
*/
FormatWriter create(PositionOutputStream out, @Nullable String
compression) throws IOException;
-
- default FormatWriter create(PositionOutputStream out) throws IOException {
- return create(out, null);
- }
}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/format/FileStatsExtractorTestBase.java
b/paimon-common/src/test/java/org/apache/paimon/format/FileStatsExtractorTestBase.java
index 727cc1b69..3d545fa6d 100644
---
a/paimon-common/src/test/java/org/apache/paimon/format/FileStatsExtractorTestBase.java
+++
b/paimon-common/src/test/java/org/apache/paimon/format/FileStatsExtractorTestBase.java
@@ -74,7 +74,7 @@ public abstract class FileStatsExtractorTestBase {
FormatWriterFactory writerFactory =
format.createWriterFactory(rowType);
Path path = new Path(tempDir.toString() + "/test");
PositionOutputStream out = new LocalFileIO().newOutputStream(path,
false);
- FormatWriter writer = writerFactory.create(out);
+ FormatWriter writer = writerFactory.create(out, fileCompression());
List<GenericRow> data = createData(rowType);
for (GenericRow row : data) {
@@ -251,4 +251,6 @@ public abstract class FileStatsExtractorTestBase {
protected abstract FileFormat createFormat();
protected abstract RowType rowType();
+
+ protected abstract String fileCompression();
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index ce8ccca63..e034402eb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -113,6 +113,14 @@ public class CoreOptions implements Serializable {
+ "could be NONE, ZLIB, SNAPPY, LZO, LZ4,
for parquet file format, the compression value could be "
+ "UNCOMPRESSED, SNAPPY, GZIP, LZO,
BROTLI, LZ4, ZSTD.");
+ public static final ConfigOption<String> FILE_COMPRESSION =
+ key("file.compression")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Default file compression format, can be
overridden by "
+ + FILE_COMPRESSION_PER_LEVEL.key());
+
public static final ConfigOption<FileFormatType> MANIFEST_FORMAT =
key("manifest.format")
.enumType(FileFormatType.class)
@@ -674,6 +682,10 @@ public class CoreOptions implements Serializable {
.collect(Collectors.toMap(e -> Integer.valueOf(e.getKey()),
Map.Entry::getValue));
}
+ public String fileCompression() {
+ return options.get(FILE_COMPRESSION);
+ }
+
public int snapshotNumRetainMin() {
return options.get(SNAPSHOT_NUM_RETAINED_MIN);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 9c2601c99..b40c038c5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -61,6 +61,7 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow> {
private final List<DataFileMeta> compactBefore;
private final List<DataFileMeta> compactAfter;
private final LongCounter seqNumCounter;
+ private final String fileCompression;
private RowDataRollingFileWriter writer;
@@ -74,7 +75,8 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow> {
CompactManager compactManager,
boolean forceCompact,
DataFilePathFactory pathFactory,
- @Nullable CommitIncrement increment) {
+ @Nullable CommitIncrement increment,
+ String fileCompression) {
this.fileIO = fileIO;
this.schemaId = schemaId;
this.fileFormat = fileFormat;
@@ -87,6 +89,7 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow> {
this.compactBefore = new ArrayList<>();
this.compactAfter = new ArrayList<>();
this.seqNumCounter = new LongCounter(maxSequenceNumber + 1);
+ this.fileCompression = fileCompression;
this.writer = createRollingRowWriter();
@@ -169,7 +172,8 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow> {
targetFileSize,
writeSchema,
pathFactory,
- seqNumCounter);
+ seqNumCounter,
+ fileCompression);
}
private void trySyncLatestCompaction(boolean blocking)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
index c0bd2f0f9..1c2086f95 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
@@ -46,6 +46,7 @@ public class KeyValueFileWriterFactory {
private final DataFilePathFactory pathFactory;
private final long suggestedFileSize;
private final Map<Integer, String> levelCompressions;
+ private final String fileCompression;
private KeyValueFileWriterFactory(
FileIO fileIO,
@@ -56,7 +57,8 @@ public class KeyValueFileWriterFactory {
@Nullable FileStatsExtractor fileStatsExtractor,
DataFilePathFactory pathFactory,
long suggestedFileSize,
- Map<Integer, String> levelCompressions) {
+ Map<Integer, String> levelCompressions,
+ String fileCompression) {
this.fileIO = fileIO;
this.schemaId = schemaId;
this.keyType = keyType;
@@ -66,6 +68,7 @@ public class KeyValueFileWriterFactory {
this.pathFactory = pathFactory;
this.suggestedFileSize = suggestedFileSize;
this.levelCompressions = levelCompressions;
+ this.fileCompression = fileCompression;
}
public RowType keyType() {
@@ -88,7 +91,11 @@ public class KeyValueFileWriterFactory {
}
private String getCompression(int level) {
- return null == levelCompressions ? null : levelCompressions.get(level);
+ if (null == levelCompressions) {
+ return fileCompression;
+ } else {
+ return levelCompressions.getOrDefault(level, fileCompression);
+ }
}
public RollingFileWriter<KeyValue, DataFileMeta>
createRollingChangelogFileWriter(int level) {
@@ -159,7 +166,10 @@ public class KeyValueFileWriterFactory {
}
public KeyValueFileWriterFactory build(
- BinaryRow partition, int bucket, Map<Integer, String>
levelCompressions) {
+ BinaryRow partition,
+ int bucket,
+ Map<Integer, String> levelCompressions,
+ String fileCompression) {
RowType recordType = KeyValue.schema(keyType, valueType);
return new KeyValueFileWriterFactory(
fileIO,
@@ -170,7 +180,8 @@ public class KeyValueFileWriterFactory {
fileFormat.createStatsExtractor(recordType).orElse(null),
pathFactory.createDataFilePathFactory(partition, bucket),
suggestedFileSize,
- levelCompressions);
+ levelCompressions,
+ fileCompression);
}
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
index 22afd08a6..471e044b9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
@@ -51,8 +51,16 @@ public class RowDataFileWriter extends
StatsCollectingSingleFileWriter<InternalR
RowType writeSchema,
@Nullable FileStatsExtractor fileStatsExtractor,
long schemaId,
- LongCounter seqNumCounter) {
- super(fileIO, factory, path, Function.identity(), writeSchema,
fileStatsExtractor, null);
+ LongCounter seqNumCounter,
+ String fileCompression) {
+ super(
+ fileIO,
+ factory,
+ path,
+ Function.identity(),
+ writeSchema,
+ fileStatsExtractor,
+ fileCompression);
this.schemaId = schemaId;
this.seqNumCounter = seqNumCounter;
this.statsArraySerializer = new FieldStatsArraySerializer(writeSchema);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
index c5e549ac4..7b17ed167 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
@@ -35,7 +35,8 @@ public class RowDataRollingFileWriter extends
RollingFileWriter<InternalRow, Dat
long targetFileSize,
RowType writeSchema,
DataFilePathFactory pathFactory,
- LongCounter seqNumCounter) {
+ LongCounter seqNumCounter,
+ String fileCompression) {
super(
() ->
new RowDataFileWriter(
@@ -45,7 +46,8 @@ public class RowDataRollingFileWriter extends
RollingFileWriter<InternalRow, Dat
writeSchema,
fileFormat.createStatsExtractor(writeSchema).orElse(null),
schemaId,
- seqNumCounter),
+ seqNumCounter,
+ fileCompression),
targetFileSize);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
index e5ea62794..fbaec0dae 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
@@ -18,6 +18,7 @@
package org.apache.paimon.manifest;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.format.FieldStatsCollector;
import org.apache.paimon.format.FileFormat;
@@ -82,7 +83,11 @@ public class ManifestFile extends ObjectsFile<ManifestEntry>
{
public List<ManifestFileMeta> write(List<ManifestEntry> entries) {
RollingFileWriter<ManifestEntry, ManifestFileMeta> writer =
new RollingFileWriter<>(
- () -> new ManifestEntryWriter(writerFactory,
pathFactory.newPath()),
+ () ->
+ new ManifestEntryWriter(
+ writerFactory,
+ pathFactory.newPath(),
+
CoreOptions.FILE_COMPRESSION.defaultValue()),
suggestedFileSize);
try {
writer.write(entries);
@@ -102,8 +107,8 @@ public class ManifestFile extends
ObjectsFile<ManifestEntry> {
private long numDeletedFiles = 0;
private long schemaId = Long.MIN_VALUE;
- ManifestEntryWriter(FormatWriterFactory factory, Path path) {
- super(ManifestFile.this.fileIO, factory, path, serializer::toRow,
null);
+ ManifestEntryWriter(FormatWriterFactory factory, Path path, String
fileCompression) {
+ super(ManifestFile.this.fileIO, factory, path, serializer::toRow,
fileCompression);
this.partitionStatsCollector = new
FieldStatsCollector(partitionType);
this.partitionStatsSerializer = new
FieldStatsArraySerializer(partitionType);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
index 86529e31c..c11536713 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
@@ -18,6 +18,7 @@
package org.apache.paimon.manifest;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.FormatWriter;
@@ -64,7 +65,8 @@ public class ManifestList extends
ObjectsFile<ManifestFileMeta> {
Path path = pathFactory.newPath();
try {
try (PositionOutputStream out = fileIO.newOutputStream(path,
false)) {
- FormatWriter writer = writerFactory.create(out);
+ FormatWriter writer =
+ writerFactory.create(out,
CoreOptions.FILE_COMPRESSION.defaultValue());
try {
for (ManifestFileMeta record : metas) {
writer.addElement(serializer.toRow(record));
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index 52f0fc2e9..f5f071d23 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -64,6 +64,7 @@ public class AppendOnlyFileStoreWrite extends
AbstractFileStoreWrite<InternalRow
private final boolean commitForceCompact;
private final boolean skipCompaction;
private final boolean assertDisorder;
+ private final String fileCompression;
public AppendOnlyFileStoreWrite(
FileIO fileIO,
@@ -88,6 +89,7 @@ public class AppendOnlyFileStoreWrite extends
AbstractFileStoreWrite<InternalRow
this.commitForceCompact = options.commitForceCompact();
this.skipCompaction = options.writeOnly();
this.assertDisorder =
options.toConfiguration().get(APPEND_ONLY_ASSERT_DISORDER);
+ this.fileCompression = options.fileCompression();
}
@Override
@@ -112,6 +114,7 @@ public class AppendOnlyFileStoreWrite extends
AbstractFileStoreWrite<InternalRow
targetFileSize,
compactRewriter(partition, bucket),
assertDisorder);
+
return new AppendOnlyWriter(
fileIO,
schemaId,
@@ -122,7 +125,8 @@ public class AppendOnlyFileStoreWrite extends
AbstractFileStoreWrite<InternalRow
compactManager,
commitForceCompact,
factory,
- restoreIncrement);
+ restoreIncrement,
+ fileCompression);
}
private AppendOnlyCompactManager.CompactRewriter compactRewriter(
@@ -139,7 +143,8 @@ public class AppendOnlyFileStoreWrite extends
AbstractFileStoreWrite<InternalRow
targetFileSize,
rowType,
pathFactory.createDataFilePathFactory(partition,
bucket),
- new
LongCounter(toCompact.get(0).minSequenceNumber()));
+ new
LongCounter(toCompact.get(0).minSequenceNumber()),
+ fileCompression);
rewriter.write(
new RecordReaderIterator<>(
read.createReader(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index aade98209..72af8fb57 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -135,7 +135,11 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
}
KeyValueFileWriterFactory writerFactory =
- writerFactoryBuilder.build(partition, bucket,
options.fileCompressionPerLevel());
+ writerFactoryBuilder.build(
+ partition,
+ bucket,
+ options.fileCompressionPerLevel(),
+ options.fileCompression());
Comparator<InternalRow> keyComparator = keyComparatorSupplier.get();
Levels levels = new Levels(keyComparator, restoreFiles,
options.numLevels());
UniversalCompaction universalCompaction =
@@ -194,7 +198,11 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
BinaryRow partition, int bucket, Comparator<InternalRow>
keyComparator, Levels levels) {
KeyValueFileReaderFactory readerFactory =
readerFactoryBuilder.build(partition, bucket);
KeyValueFileWriterFactory writerFactory =
- writerFactoryBuilder.build(partition, bucket,
options.fileCompressionPerLevel());
+ writerFactoryBuilder.build(
+ partition,
+ bucket,
+ options.fileCompressionPerLevel(),
+ options.fileCompression());
switch (options.changelogProducer()) {
case FULL_COMPACTION:
return new FullChangelogMergeTreeCompactRewriter(
diff --git a/paimon-core/src/test/java/org/apache/paimon/FileFormatTest.java
b/paimon-core/src/test/java/org/apache/paimon/FileFormatTest.java
index 83a869ffb..8554dc025 100644
--- a/paimon-core/src/test/java/org/apache/paimon/FileFormatTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/FileFormatTest.java
@@ -59,7 +59,9 @@ public class FileFormatTest {
expected.add(GenericRow.of(2, 22));
expected.add(GenericRow.of(3, 33));
PositionOutputStream out = LocalFileIO.create().newOutputStream(path,
false);
- FormatWriter writer = avro.createWriterFactory(rowType).create(out);
+ FormatWriter writer =
+ avro.createWriterFactory(rowType)
+ .create(out,
CoreOptions.FILE_COMPRESSION.defaultValue());
for (InternalRow row : expected) {
writer.addElement(row);
}
@@ -83,7 +85,10 @@ public class FileFormatTest {
Path path = new Path(tempDir.toUri().toString(), "1.avro");
Assertions.assertThrows(
RuntimeException.class,
- () ->
writerFactory.create(LocalFileIO.create().newOutputStream(path, false)),
+ () ->
+ writerFactory.create(
+ LocalFileIO.create().newOutputStream(path,
false),
+ CoreOptions.FILE_COMPRESSION.defaultValue()),
"Unrecognized codec: _unsupported");
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 5ac63cb9d..9c2913c7e 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -316,7 +316,8 @@ public class AppendOnlyWriterTest {
compactManager,
forceCompact,
pathFactory,
- null);
+ null,
+ CoreOptions.FILE_COMPRESSION.defaultValue());
return Pair.of(writer, compactManager.allFiles());
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index 60eb362c6..40e814844 100644
---
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -18,6 +18,7 @@
package org.apache.paimon.format;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.append.AppendOnlyCompactManager;
import org.apache.paimon.append.AppendOnlyWriter;
import org.apache.paimon.data.BinaryString;
@@ -72,7 +73,8 @@ public class FileFormatSuffixTest extends
KeyValueFileReadWriteTest {
null, toCompact, 4, 10, 10, null, false), //
not used
false,
dataFilePathFactory,
- null);
+ null,
+ CoreOptions.FILE_COMPRESSION.defaultValue());
appendOnlyWriter.write(
GenericRow.of(1, BinaryString.fromString("aaa"),
BinaryString.fromString("1")));
CommitIncrement increment = appendOnlyWriter.prepareCommit(true);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/format/FlushingFileFormat.java
b/paimon-core/src/test/java/org/apache/paimon/format/FlushingFileFormat.java
index 548bd8b51..d2f1829cf 100644
--- a/paimon-core/src/test/java/org/apache/paimon/format/FlushingFileFormat.java
+++ b/paimon-core/src/test/java/org/apache/paimon/format/FlushingFileFormat.java
@@ -18,6 +18,7 @@
package org.apache.paimon.format;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
@@ -47,7 +48,11 @@ public class FlushingFileFormat extends FileFormat {
@Override
public FormatWriterFactory createWriterFactory(RowType type) {
return (PositionOutputStream, level) -> {
- FormatWriter wrapped =
format.createWriterFactory(type).create(PositionOutputStream);
+ FormatWriter wrapped =
+ format.createWriterFactory(type)
+ .create(
+ PositionOutputStream,
+
CoreOptions.FILE_COMPRESSION.defaultValue());
return new FormatWriter() {
@Override
public void addElement(InternalRow rowData) throws IOException
{
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index 373be796f..2a236c966 100644
---
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -255,7 +255,7 @@ public class KeyValueFileReadWriteTest {
new FlushingFileFormat(format),
pathFactory,
suggestedFileSize)
- .build(BinaryRow.EMPTY_ROW, 0, null);
+ .build(BinaryRow.EMPTY_ROW, 0, null, null);
}
private KeyValueFileReaderFactory createReaderFactory(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
index cfd3300f5..f19f036d2 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
@@ -75,7 +75,8 @@ public class RollingFileWriterTest {
SCHEMA,
fileFormat.createStatsExtractor(SCHEMA).orElse(null),
0L,
- new LongCounter(0)),
+ new LongCounter(0),
+
CoreOptions.FILE_COMPRESSION.defaultValue()),
TARGET_FILE_SIZE);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
index 44b95c51c..e4b074a51 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
@@ -234,7 +234,7 @@ public class LookupLevelsTest {
new FlushingFileFormat("avro"),
new FileStorePathFactory(path),
TARGET_FILE_SIZE.defaultValue().getBytes())
- .build(BinaryRow.EMPTY_ROW, 0, null);
+ .build(BinaryRow.EMPTY_ROW, 0, null, null);
}
private KeyValueFileReaderFactory createReaderFactory() {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTest.java
index 7f43997c2..3a2ce21cc 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTest.java
@@ -177,10 +177,16 @@ public class MergeTreeTest {
options.targetFileSize());
writerFactory =
writerFactoryBuilder.build(
- BinaryRow.EMPTY_ROW, 0,
options.fileCompressionPerLevel());
+ BinaryRow.EMPTY_ROW,
+ 0,
+ options.fileCompressionPerLevel(),
+ options.fileCompression());
compactWriterFactory =
writerFactoryBuilder.build(
- BinaryRow.EMPTY_ROW, 0,
options.fileCompressionPerLevel());
+ BinaryRow.EMPTY_ROW,
+ 0,
+ options.fileCompressionPerLevel(),
+ options.fileCompression());
writer = createMergeTreeWriter(Collections.emptyList());
}
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/BulkFileFormatTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/BulkFileFormatTest.java
index bf2d02803..ccd9d2c7d 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/BulkFileFormatTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/BulkFileFormatTest.java
@@ -71,7 +71,7 @@ public class BulkFileFormatTest {
expected.add(GenericRow.of(2, 2));
expected.add(GenericRow.of(3, 3));
PositionOutputStream out = new LocalFileIO().newOutputStream(path,
false);
- FormatWriter writer =
fileFormat.createWriterFactory(rowType).create(out);
+ FormatWriter writer =
fileFormat.createWriterFactory(rowType).create(out, "LZ4");
for (InternalRow row : expected) {
writer.addElement(row);
}
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileStatsExtractorTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileStatsExtractorTest.java
index aab9e6eef..e3f983bee 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileStatsExtractorTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileStatsExtractorTest.java
@@ -79,4 +79,9 @@ public class OrcFileStatsExtractorTest extends
FileStatsExtractorTestBase {
new MultisetType(new VarCharType(8)))
.build();
}
+
+ @Override
+ protected String fileCompression() {
+ return "LZ4";
+ }
}
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcWriterFactoryTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcWriterFactoryTest.java
index 11c60a381..2511d7ed7 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcWriterFactoryTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcWriterFactoryTest.java
@@ -50,8 +50,8 @@ class OrcWriterFactoryTest {
"struct<_col0:string,_col1:int>",
new DataType[] {DataTypes.STRING(),
DataTypes.INT()}),
memoryManager);
- factory.create(new
LocalPositionOutputStream(tmpDir.resolve("file1").toFile()));
- factory.create(new
LocalPositionOutputStream(tmpDir.resolve("file2").toFile()));
+ factory.create(new
LocalPositionOutputStream(tmpDir.resolve("file1").toFile()), "LZ4");
+ factory.create(new
LocalPositionOutputStream(tmpDir.resolve("file2").toFile()), "LZ4");
List<Path> addedWriterPath = memoryManager.getAddedWriterPath();
assertThat(addedWriterPath).hasSize(2);
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileStatsExtractorTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileStatsExtractorTest.java
index c108fc9cf..34f2abbd2 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileStatsExtractorTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileStatsExtractorTest.java
@@ -95,4 +95,9 @@ public class ParquetFileStatsExtractorTest extends
FileStatsExtractorTestBase {
}
return stats;
}
+
+ @Override
+ protected String fileCompression() {
+ return "SNAPPY";
+ }
}
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
index 245e13f7f..429614366 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
@@ -297,7 +297,8 @@ public class ParquetReadWriteTest {
conf.setInteger("parquet.block.size", rowGroupSize);
ParquetWriterFactory factory =
new ParquetWriterFactory(new RowDataParquetBuilder(ROW_TYPE,
conf));
- FormatWriter writer = factory.create(new
LocalFileIO().newOutputStream(path, false));
+ FormatWriter writer =
+ factory.create(new LocalFileIO().newOutputStream(path, false),
"SNAPPY");
for (InternalRow row : rows) {
writer.addElement(row);
}