This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.4 in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit 984892b168a1027eb342c8aeb060a93674865b1e Author: zhangmang <[email protected]> AuthorDate: Wed Apr 26 17:30:49 2023 +0800 [core] Introduce file.compression (#1023) --- .../shortcodes/generated/core_configuration.html | 6 ++++++ .../org/apache/paimon/format/FormatWriterFactory.java | 4 ---- .../paimon/format/FileStatsExtractorTestBase.java | 4 +++- .../src/main/java/org/apache/paimon/CoreOptions.java | 12 ++++++++++++ .../org/apache/paimon/append/AppendOnlyWriter.java | 8 ++++++-- .../apache/paimon/io/KeyValueFileWriterFactory.java | 19 +++++++++++++++---- .../java/org/apache/paimon/io/RowDataFileWriter.java | 12 ++++++++++-- .../apache/paimon/io/RowDataRollingFileWriter.java | 6 ++++-- .../java/org/apache/paimon/manifest/ManifestFile.java | 11 ++++++++--- .../java/org/apache/paimon/manifest/ManifestList.java | 4 +++- .../paimon/operation/AppendOnlyFileStoreWrite.java | 9 +++++++-- .../paimon/operation/KeyValueFileStoreWrite.java | 12 ++++++++++-- .../test/java/org/apache/paimon/FileFormatTest.java | 9 +++++++-- .../apache/paimon/append/AppendOnlyWriterTest.java | 3 ++- .../apache/paimon/format/FileFormatSuffixTest.java | 4 +++- .../org/apache/paimon/format/FlushingFileFormat.java | 7 ++++++- .../apache/paimon/io/KeyValueFileReadWriteTest.java | 2 +- .../org/apache/paimon/io/RollingFileWriterTest.java | 3 ++- .../org/apache/paimon/mergetree/LookupLevelsTest.java | 2 +- .../org/apache/paimon/mergetree/MergeTreeTest.java | 10 ++++++++-- .../org/apache/paimon/format/BulkFileFormatTest.java | 2 +- .../paimon/format/orc/OrcFileStatsExtractorTest.java | 5 +++++ .../paimon/format/orc/OrcWriterFactoryTest.java | 4 ++-- .../format/parquet/ParquetFileStatsExtractorTest.java | 5 +++++ .../paimon/format/parquet/ParquetReadWriteTest.java | 3 ++- 25 files changed, 129 insertions(+), 37 deletions(-) diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index b6b91d807..1931791e7 100644 --- 
a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -80,6 +80,12 @@ <td>Boolean</td> <td>Whether only overwrite dynamic partition when overwriting a partitioned table with dynamic partition columns. Works only when the table has partition keys.</td> </tr> + <tr> + <td><h5>file.compression</h5></td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Default file compression format, can be overridden by file.compression.per.level</td> + </tr> <tr> <td><h5>file.compression.per.level</h5></td> <td style="word-wrap: break-word;"></td> diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FormatWriterFactory.java b/paimon-common/src/main/java/org/apache/paimon/format/FormatWriterFactory.java index 0ac921ee6..e53be8003 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FormatWriterFactory.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FormatWriterFactory.java @@ -36,8 +36,4 @@ public interface FormatWriterFactory { * exception. 
*/ FormatWriter create(PositionOutputStream out, @Nullable String compression) throws IOException; - - default FormatWriter create(PositionOutputStream out) throws IOException { - return create(out, null); - } } diff --git a/paimon-common/src/test/java/org/apache/paimon/format/FileStatsExtractorTestBase.java b/paimon-common/src/test/java/org/apache/paimon/format/FileStatsExtractorTestBase.java index 727cc1b69..3d545fa6d 100644 --- a/paimon-common/src/test/java/org/apache/paimon/format/FileStatsExtractorTestBase.java +++ b/paimon-common/src/test/java/org/apache/paimon/format/FileStatsExtractorTestBase.java @@ -74,7 +74,7 @@ public abstract class FileStatsExtractorTestBase { FormatWriterFactory writerFactory = format.createWriterFactory(rowType); Path path = new Path(tempDir.toString() + "/test"); PositionOutputStream out = new LocalFileIO().newOutputStream(path, false); - FormatWriter writer = writerFactory.create(out); + FormatWriter writer = writerFactory.create(out, fileCompression()); List<GenericRow> data = createData(rowType); for (GenericRow row : data) { @@ -251,4 +251,6 @@ public abstract class FileStatsExtractorTestBase { protected abstract FileFormat createFormat(); protected abstract RowType rowType(); + + protected abstract String fileCompression(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java index ce8ccca63..e034402eb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java @@ -113,6 +113,14 @@ public class CoreOptions implements Serializable { + "could be NONE, ZLIB, SNAPPY, LZO, LZ4, for parquet file format, the compression value could be " + "UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD."); + public static final ConfigOption<String> FILE_COMPRESSION = + key("file.compression") + .stringType() + .noDefaultValue() + .withDescription( + "Default file compression format, 
can be overridden by " + + FILE_COMPRESSION_PER_LEVEL.key()); + public static final ConfigOption<FileFormatType> MANIFEST_FORMAT = key("manifest.format") .enumType(FileFormatType.class) @@ -674,6 +682,10 @@ public class CoreOptions implements Serializable { .collect(Collectors.toMap(e -> Integer.valueOf(e.getKey()), Map.Entry::getValue)); } + public String fileCompression() { + return options.get(FILE_COMPRESSION); + } + public int snapshotNumRetainMin() { return options.get(SNAPSHOT_NUM_RETAINED_MIN); } diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index 9c2601c99..b40c038c5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -61,6 +61,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow> { private final List<DataFileMeta> compactBefore; private final List<DataFileMeta> compactAfter; private final LongCounter seqNumCounter; + private final String fileCompression; private RowDataRollingFileWriter writer; @@ -74,7 +75,8 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow> { CompactManager compactManager, boolean forceCompact, DataFilePathFactory pathFactory, - @Nullable CommitIncrement increment) { + @Nullable CommitIncrement increment, + String fileCompression) { this.fileIO = fileIO; this.schemaId = schemaId; this.fileFormat = fileFormat; @@ -87,6 +89,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow> { this.compactBefore = new ArrayList<>(); this.compactAfter = new ArrayList<>(); this.seqNumCounter = new LongCounter(maxSequenceNumber + 1); + this.fileCompression = fileCompression; this.writer = createRollingRowWriter(); @@ -169,7 +172,8 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow> { targetFileSize, writeSchema, pathFactory, - seqNumCounter); + 
seqNumCounter, + fileCompression); } private void trySyncLatestCompaction(boolean blocking) diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java index c0bd2f0f9..1c2086f95 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java @@ -46,6 +46,7 @@ public class KeyValueFileWriterFactory { private final DataFilePathFactory pathFactory; private final long suggestedFileSize; private final Map<Integer, String> levelCompressions; + private final String fileCompression; private KeyValueFileWriterFactory( FileIO fileIO, @@ -56,7 +57,8 @@ public class KeyValueFileWriterFactory { @Nullable FileStatsExtractor fileStatsExtractor, DataFilePathFactory pathFactory, long suggestedFileSize, - Map<Integer, String> levelCompressions) { + Map<Integer, String> levelCompressions, + String fileCompression) { this.fileIO = fileIO; this.schemaId = schemaId; this.keyType = keyType; @@ -66,6 +68,7 @@ public class KeyValueFileWriterFactory { this.pathFactory = pathFactory; this.suggestedFileSize = suggestedFileSize; this.levelCompressions = levelCompressions; + this.fileCompression = fileCompression; } public RowType keyType() { @@ -88,7 +91,11 @@ public class KeyValueFileWriterFactory { } private String getCompression(int level) { - return null == levelCompressions ? 
null : levelCompressions.get(level); + if (null == levelCompressions) { + return fileCompression; + } else { + return levelCompressions.getOrDefault(level, fileCompression); + } } public RollingFileWriter<KeyValue, DataFileMeta> createRollingChangelogFileWriter(int level) { @@ -159,7 +166,10 @@ public class KeyValueFileWriterFactory { } public KeyValueFileWriterFactory build( - BinaryRow partition, int bucket, Map<Integer, String> levelCompressions) { + BinaryRow partition, + int bucket, + Map<Integer, String> levelCompressions, + String fileCompression) { RowType recordType = KeyValue.schema(keyType, valueType); return new KeyValueFileWriterFactory( fileIO, @@ -170,7 +180,8 @@ public class KeyValueFileWriterFactory { fileFormat.createStatsExtractor(recordType).orElse(null), pathFactory.createDataFilePathFactory(partition, bucket), suggestedFileSize, - levelCompressions); + levelCompressions, + fileCompression); } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java index 22afd08a6..471e044b9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java @@ -51,8 +51,16 @@ public class RowDataFileWriter extends StatsCollectingSingleFileWriter<InternalR RowType writeSchema, @Nullable FileStatsExtractor fileStatsExtractor, long schemaId, - LongCounter seqNumCounter) { - super(fileIO, factory, path, Function.identity(), writeSchema, fileStatsExtractor, null); + LongCounter seqNumCounter, + String fileCompression) { + super( + fileIO, + factory, + path, + Function.identity(), + writeSchema, + fileStatsExtractor, + fileCompression); this.schemaId = schemaId; this.seqNumCounter = seqNumCounter; this.statsArraySerializer = new FieldStatsArraySerializer(writeSchema); diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java index c5e549ac4..7b17ed167 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java @@ -35,7 +35,8 @@ public class RowDataRollingFileWriter extends RollingFileWriter<InternalRow, Dat long targetFileSize, RowType writeSchema, DataFilePathFactory pathFactory, - LongCounter seqNumCounter) { + LongCounter seqNumCounter, + String fileCompression) { super( () -> new RowDataFileWriter( @@ -45,7 +46,8 @@ public class RowDataRollingFileWriter extends RollingFileWriter<InternalRow, Dat writeSchema, fileFormat.createStatsExtractor(writeSchema).orElse(null), schemaId, - seqNumCounter), + seqNumCounter, + fileCompression), targetFileSize); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java index e5ea62794..fbaec0dae 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java @@ -18,6 +18,7 @@ package org.apache.paimon.manifest; +import org.apache.paimon.CoreOptions; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.format.FieldStatsCollector; import org.apache.paimon.format.FileFormat; @@ -82,7 +83,11 @@ public class ManifestFile extends ObjectsFile<ManifestEntry> { public List<ManifestFileMeta> write(List<ManifestEntry> entries) { RollingFileWriter<ManifestEntry, ManifestFileMeta> writer = new RollingFileWriter<>( - () -> new ManifestEntryWriter(writerFactory, pathFactory.newPath()), + () -> + new ManifestEntryWriter( + writerFactory, + pathFactory.newPath(), + CoreOptions.FILE_COMPRESSION.defaultValue()), suggestedFileSize); try { writer.write(entries); @@ -102,8 +107,8 @@ public class ManifestFile extends ObjectsFile<ManifestEntry> { private long 
numDeletedFiles = 0; private long schemaId = Long.MIN_VALUE; - ManifestEntryWriter(FormatWriterFactory factory, Path path) { - super(ManifestFile.this.fileIO, factory, path, serializer::toRow, null); + ManifestEntryWriter(FormatWriterFactory factory, Path path, String fileCompression) { + super(ManifestFile.this.fileIO, factory, path, serializer::toRow, fileCompression); this.partitionStatsCollector = new FieldStatsCollector(partitionType); this.partitionStatsSerializer = new FieldStatsArraySerializer(partitionType); diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java index 86529e31c..c11536713 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java @@ -18,6 +18,7 @@ package org.apache.paimon.manifest; +import org.apache.paimon.CoreOptions; import org.apache.paimon.format.FileFormat; import org.apache.paimon.format.FormatReaderFactory; import org.apache.paimon.format.FormatWriter; @@ -64,7 +65,8 @@ public class ManifestList extends ObjectsFile<ManifestFileMeta> { Path path = pathFactory.newPath(); try { try (PositionOutputStream out = fileIO.newOutputStream(path, false)) { - FormatWriter writer = writerFactory.create(out); + FormatWriter writer = + writerFactory.create(out, CoreOptions.FILE_COMPRESSION.defaultValue()); try { for (ManifestFileMeta record : metas) { writer.addElement(serializer.toRow(record)); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java index 52f0fc2e9..f5f071d23 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java @@ -64,6 +64,7 @@ public class AppendOnlyFileStoreWrite 
extends AbstractFileStoreWrite<InternalRow private final boolean commitForceCompact; private final boolean skipCompaction; private final boolean assertDisorder; + private final String fileCompression; public AppendOnlyFileStoreWrite( FileIO fileIO, @@ -88,6 +89,7 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<InternalRow this.commitForceCompact = options.commitForceCompact(); this.skipCompaction = options.writeOnly(); this.assertDisorder = options.toConfiguration().get(APPEND_ONLY_ASSERT_DISORDER); + this.fileCompression = options.fileCompression(); } @Override @@ -112,6 +114,7 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<InternalRow targetFileSize, compactRewriter(partition, bucket), assertDisorder); + return new AppendOnlyWriter( fileIO, schemaId, @@ -122,7 +125,8 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<InternalRow compactManager, commitForceCompact, factory, - restoreIncrement); + restoreIncrement, + fileCompression); } private AppendOnlyCompactManager.CompactRewriter compactRewriter( @@ -139,7 +143,8 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<InternalRow targetFileSize, rowType, pathFactory.createDataFilePathFactory(partition, bucket), - new LongCounter(toCompact.get(0).minSequenceNumber())); + new LongCounter(toCompact.get(0).minSequenceNumber()), + fileCompression); rewriter.write( new RecordReaderIterator<>( read.createReader( diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index aade98209..72af8fb57 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -135,7 +135,11 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> { } KeyValueFileWriterFactory writerFactory = 
- writerFactoryBuilder.build(partition, bucket, options.fileCompressionPerLevel()); + writerFactoryBuilder.build( + partition, + bucket, + options.fileCompressionPerLevel(), + options.fileCompression()); Comparator<InternalRow> keyComparator = keyComparatorSupplier.get(); Levels levels = new Levels(keyComparator, restoreFiles, options.numLevels()); UniversalCompaction universalCompaction = @@ -194,7 +198,11 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> { BinaryRow partition, int bucket, Comparator<InternalRow> keyComparator, Levels levels) { KeyValueFileReaderFactory readerFactory = readerFactoryBuilder.build(partition, bucket); KeyValueFileWriterFactory writerFactory = - writerFactoryBuilder.build(partition, bucket, options.fileCompressionPerLevel()); + writerFactoryBuilder.build( + partition, + bucket, + options.fileCompressionPerLevel(), + options.fileCompression()); switch (options.changelogProducer()) { case FULL_COMPACTION: return new FullChangelogMergeTreeCompactRewriter( diff --git a/paimon-core/src/test/java/org/apache/paimon/FileFormatTest.java b/paimon-core/src/test/java/org/apache/paimon/FileFormatTest.java index 83a869ffb..8554dc025 100644 --- a/paimon-core/src/test/java/org/apache/paimon/FileFormatTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/FileFormatTest.java @@ -59,7 +59,9 @@ public class FileFormatTest { expected.add(GenericRow.of(2, 22)); expected.add(GenericRow.of(3, 33)); PositionOutputStream out = LocalFileIO.create().newOutputStream(path, false); - FormatWriter writer = avro.createWriterFactory(rowType).create(out); + FormatWriter writer = + avro.createWriterFactory(rowType) + .create(out, CoreOptions.FILE_COMPRESSION.defaultValue()); for (InternalRow row : expected) { writer.addElement(row); } @@ -83,7 +85,10 @@ public class FileFormatTest { Path path = new Path(tempDir.toUri().toString(), "1.avro"); Assertions.assertThrows( RuntimeException.class, - () -> 
writerFactory.create(LocalFileIO.create().newOutputStream(path, false)), + () -> + writerFactory.create( + LocalFileIO.create().newOutputStream(path, false), + CoreOptions.FILE_COMPRESSION.defaultValue()), "Unrecognized codec: _unsupported"); } diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java index 5ac63cb9d..9c2913c7e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java @@ -316,7 +316,8 @@ public class AppendOnlyWriterTest { compactManager, forceCompact, pathFactory, - null); + null, + CoreOptions.FILE_COMPRESSION.defaultValue()); return Pair.of(writer, compactManager.allFiles()); } diff --git a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java index 60eb362c6..40e814844 100644 --- a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java @@ -18,6 +18,7 @@ package org.apache.paimon.format; +import org.apache.paimon.CoreOptions; import org.apache.paimon.append.AppendOnlyCompactManager; import org.apache.paimon.append.AppendOnlyWriter; import org.apache.paimon.data.BinaryString; @@ -72,7 +73,8 @@ public class FileFormatSuffixTest extends KeyValueFileReadWriteTest { null, toCompact, 4, 10, 10, null, false), // not used false, dataFilePathFactory, - null); + null, + CoreOptions.FILE_COMPRESSION.defaultValue()); appendOnlyWriter.write( GenericRow.of(1, BinaryString.fromString("aaa"), BinaryString.fromString("1"))); CommitIncrement increment = appendOnlyWriter.prepareCommit(true); diff --git a/paimon-core/src/test/java/org/apache/paimon/format/FlushingFileFormat.java 
b/paimon-core/src/test/java/org/apache/paimon/format/FlushingFileFormat.java index 548bd8b51..d2f1829cf 100644 --- a/paimon-core/src/test/java/org/apache/paimon/format/FlushingFileFormat.java +++ b/paimon-core/src/test/java/org/apache/paimon/format/FlushingFileFormat.java @@ -18,6 +18,7 @@ package org.apache.paimon.format; +import org.apache.paimon.CoreOptions; import org.apache.paimon.data.InternalRow; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; @@ -47,7 +48,11 @@ public class FlushingFileFormat extends FileFormat { @Override public FormatWriterFactory createWriterFactory(RowType type) { return (PositionOutputStream, level) -> { - FormatWriter wrapped = format.createWriterFactory(type).create(PositionOutputStream); + FormatWriter wrapped = + format.createWriterFactory(type) + .create( + PositionOutputStream, + CoreOptions.FILE_COMPRESSION.defaultValue()); return new FormatWriter() { @Override public void addElement(InternalRow rowData) throws IOException { diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java index 373be796f..2a236c966 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java @@ -255,7 +255,7 @@ public class KeyValueFileReadWriteTest { new FlushingFileFormat(format), pathFactory, suggestedFileSize) - .build(BinaryRow.EMPTY_ROW, 0, null); + .build(BinaryRow.EMPTY_ROW, 0, null, null); } private KeyValueFileReaderFactory createReaderFactory( diff --git a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java index cfd3300f5..f19f036d2 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java +++ 
b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java @@ -75,7 +75,8 @@ public class RollingFileWriterTest { SCHEMA, fileFormat.createStatsExtractor(SCHEMA).orElse(null), 0L, - new LongCounter(0)), + new LongCounter(0), + CoreOptions.FILE_COMPRESSION.defaultValue()), TARGET_FILE_SIZE); } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java index 44b95c51c..e4b074a51 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java @@ -234,7 +234,7 @@ public class LookupLevelsTest { new FlushingFileFormat("avro"), new FileStorePathFactory(path), TARGET_FILE_SIZE.defaultValue().getBytes()) - .build(BinaryRow.EMPTY_ROW, 0, null); + .build(BinaryRow.EMPTY_ROW, 0, null, null); } private KeyValueFileReaderFactory createReaderFactory() { diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTest.java index 7f43997c2..3a2ce21cc 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTest.java @@ -177,10 +177,16 @@ public class MergeTreeTest { options.targetFileSize()); writerFactory = writerFactoryBuilder.build( - BinaryRow.EMPTY_ROW, 0, options.fileCompressionPerLevel()); + BinaryRow.EMPTY_ROW, + 0, + options.fileCompressionPerLevel(), + options.fileCompression()); compactWriterFactory = writerFactoryBuilder.build( - BinaryRow.EMPTY_ROW, 0, options.fileCompressionPerLevel()); + BinaryRow.EMPTY_ROW, + 0, + options.fileCompressionPerLevel(), + options.fileCompression()); writer = createMergeTreeWriter(Collections.emptyList()); } diff --git a/paimon-format/src/test/java/org/apache/paimon/format/BulkFileFormatTest.java 
b/paimon-format/src/test/java/org/apache/paimon/format/BulkFileFormatTest.java index bf2d02803..ccd9d2c7d 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/BulkFileFormatTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/BulkFileFormatTest.java @@ -71,7 +71,7 @@ public class BulkFileFormatTest { expected.add(GenericRow.of(2, 2)); expected.add(GenericRow.of(3, 3)); PositionOutputStream out = new LocalFileIO().newOutputStream(path, false); - FormatWriter writer = fileFormat.createWriterFactory(rowType).create(out); + FormatWriter writer = fileFormat.createWriterFactory(rowType).create(out, "LZ4"); for (InternalRow row : expected) { writer.addElement(row); } diff --git a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileStatsExtractorTest.java b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileStatsExtractorTest.java index aab9e6eef..e3f983bee 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileStatsExtractorTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileStatsExtractorTest.java @@ -79,4 +79,9 @@ public class OrcFileStatsExtractorTest extends FileStatsExtractorTestBase { new MultisetType(new VarCharType(8))) .build(); } + + @Override + protected String fileCompression() { + return "LZ4"; + } } diff --git a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcWriterFactoryTest.java b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcWriterFactoryTest.java index 11c60a381..2511d7ed7 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcWriterFactoryTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcWriterFactoryTest.java @@ -50,8 +50,8 @@ class OrcWriterFactoryTest { "struct<_col0:string,_col1:int>", new DataType[] {DataTypes.STRING(), DataTypes.INT()}), memoryManager); - factory.create(new LocalPositionOutputStream(tmpDir.resolve("file1").toFile())); - factory.create(new 
LocalPositionOutputStream(tmpDir.resolve("file2").toFile())); + factory.create(new LocalPositionOutputStream(tmpDir.resolve("file1").toFile()), "LZ4"); + factory.create(new LocalPositionOutputStream(tmpDir.resolve("file2").toFile()), "LZ4"); List<Path> addedWriterPath = memoryManager.getAddedWriterPath(); assertThat(addedWriterPath).hasSize(2); diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileStatsExtractorTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileStatsExtractorTest.java index c108fc9cf..34f2abbd2 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileStatsExtractorTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileStatsExtractorTest.java @@ -95,4 +95,9 @@ public class ParquetFileStatsExtractorTest extends FileStatsExtractorTestBase { } return stats; } + + @Override + protected String fileCompression() { + return "SNAPPY"; + } } diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java index 245e13f7f..429614366 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java @@ -297,7 +297,8 @@ public class ParquetReadWriteTest { conf.setInteger("parquet.block.size", rowGroupSize); ParquetWriterFactory factory = new ParquetWriterFactory(new RowDataParquetBuilder(ROW_TYPE, conf)); - FormatWriter writer = factory.create(new LocalFileIO().newOutputStream(path, false)); + FormatWriter writer = + factory.create(new LocalFileIO().newOutputStream(path, false), "SNAPPY"); for (InternalRow row : rows) { writer.addElement(row); }
