This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 78f78053075e0e88925d2c98bcfeaf1462201ca1
Author: zhangjun <[email protected]>
AuthorDate: Tue Jul 4 22:56:27 2023 +0800

    [core] add file-format-per-level for paimon

    This closes #1500
---
 .../shortcodes/generated/core_configuration.html   |   6 ++
 .../main/java/org/apache/paimon/CoreOptions.java   |  17 +++
 .../java/org/apache/paimon/KeyValueFileStore.java  |  22 ++++
 .../paimon/io/KeyValueFileWriterFactory.java       | 114 +++++++++++++++++----
 .../apache/paimon/mergetree/MergeTreeWriter.java   |   8 +-
 .../paimon/operation/KeyValueFileStoreWrite.java   |   6 +-
 .../apache/paimon/format/FileFormatSuffixTest.java |   2 +-
 .../paimon/io/KeyValueFileReadWriteTest.java       |  23 ++++-
 .../apache/paimon/mergetree/LookupLevelsTest.java  |  15 ++-
 .../apache/paimon/mergetree/MergeTreeTestBase.java |  16 ++-
 .../apache/paimon/flink/CatalogTableITCase.java    |  18 ++++
 .../flink/source/TestChangelogDataReadWrite.java   |   6 ++
 12 files changed, 216 insertions(+), 37 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 36dba2b52..d5a09f48a 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -152,6 +152,12 @@ under the License.
             <td><p>Enum</p></td>
             <td>Specify the message format of data files, currently orc, parquet and avro are supported.<br /><br />Possible values:<ul><li>"orc": ORC file format.</li><li>"parquet": Parquet file format.</li><li>"avro": Avro file format.</li></ul></td>
         </tr>
+        <tr>
+            <td><h5>file.format.per.level</h5></td>
+            <td style="word-wrap: break-word;"></td>
+            <td>Map</td>
+            <td>Define different file format for different level, you can add the conf like this: 'file.format.per.level' = '0:avro,3:parquet', if the file format for level is not provided, the default format which set by `file.format` will be used.</td>
+        </tr>
         <tr>
             <td><h5>full-compaction.delta-commits</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 3552bc21d..084cf45df 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -117,6 +117,17 @@ public class CoreOptions implements Serializable {
                                     + "could be NONE, ZLIB, SNAPPY, LZO, LZ4, for parquet file format, the compression value could be "
                                     + "UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD.");

+    public static final ConfigOption<Map<String, String>> FILE_FORMAT_PER_LEVEL =
+            key("file.format.per.level")
+                    .mapType()
+                    .defaultValue(new HashMap<>())
+                    .withDescription(
+                            "Define different file format for different level, you can add the conf like this:"
+                                    + " 'file.format.per.level' = '0:avro,3:parquet', if the file format for level is not provided, "
+                                    + "the default format which set by `"
+                                    + FILE_FORMAT.key()
+                                    + "` will be used.");
+
     public static final ConfigOption<String> FILE_COMPRESSION =
             key("file.compression")
                     .stringType()
@@ -885,6 +896,12 @@ public class CoreOptions implements Serializable {
                 .collect(Collectors.toMap(e -> Integer.valueOf(e.getKey()), Map.Entry::getValue));
     }

+    public Map<Integer, String> fileFormatPerLevel() {
+        Map<String, String> levelFormats = options.get(FILE_FORMAT_PER_LEVEL);
+        return levelFormats.entrySet().stream()
+                .collect(Collectors.toMap(e -> Integer.valueOf(e.getKey()), Map.Entry::getValue));
+    }
+
     public String fileCompression() {
         return options.get(FILE_COMPRESSION);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index ad0ad67dd..ba1ef81db 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -35,11 +35,16 @@ import org.apache.paimon.schema.KeyValueFieldsExtractor;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.KeyComparatorSupplier;
 import org.apache.paimon.utils.ValueEqualiserSupplier;

 import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.function.Supplier;

 import static org.apache.paimon.predicate.PredicateBuilder.and;
@@ -127,6 +132,7 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
                 valueEqualiserSupplier,
                 mfFactory,
                 pathFactory(),
+                format2PathFactory(),
                 snapshotManager(),
                 newScan(true).withManifestCacheFilter(manifestFilter),
                 indexFactory,
@@ -134,6 +140,22 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
                 keyValueFieldsExtractor);
     }

+    private Map<String, FileStorePathFactory> format2PathFactory() {
+        Map<String, FileStorePathFactory> pathFactoryMap = new HashMap<>();
+        Set<String> formats = new HashSet<>(options.fileFormatPerLevel().values());
+        formats.add(options.fileFormat().getFormatIdentifier());
+        formats.forEach(
+                format ->
+                        pathFactoryMap.put(
+                                format,
+                                new FileStorePathFactory(
+                                        options.path(),
+                                        partitionType,
+                                        options.partitionDefaultName(),
+                                        format)));
+        return pathFactoryMap;
+    }
+
     private KeyValueFileStoreScan newScan(boolean forWrite) {
         ScanBucketFilter bucketFilter =
                 new ScanBucketFilter(bucketKeyType) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
index f3912daf5..a19db4aa5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
@@ -34,7 +34,9 @@ import org.apache.paimon.utils.StatsCollectorFactories;

 import javax.annotation.Nullable;

+import java.util.HashMap;
 import java.util.Map;
+import java.util.stream.Collectors;

 /** A factory to create {@link FileWriter}s for writing {@link KeyValue} files. */
 public class KeyValueFileWriterFactory {
@@ -43,12 +45,15 @@ public class KeyValueFileWriterFactory {
     private final long schemaId;
     private final RowType keyType;
     private final RowType valueType;
-    private final FormatWriterFactory writerFactory;
+    private final RowType recordType;
+    private final Map<String, FormatWriterFactory> writerFactoryMap;
     @Nullable private final TableStatsExtractor tableStatsExtractor;
-    private final DataFilePathFactory pathFactory;
+    private final Map<String, DataFilePathFactory> pathFactoryMap;
     private final long suggestedFileSize;
     private final Map<Integer, String> levelCompressions;
     private final String fileCompression;
+    private final Map<Integer, String> levelFormats;
+    private final FileFormat fileFormat;
     private final CoreOptions options;

     private KeyValueFileWriterFactory(
@@ -56,23 +61,29 @@
             long schemaId,
             RowType keyType,
             RowType valueType,
-            FormatWriterFactory writerFactory,
+            RowType recordType,
+            FileFormat fileFormat,
+            Map<String, FormatWriterFactory> writerFactoryMap,
             @Nullable TableStatsExtractor tableStatsExtractor,
-            DataFilePathFactory pathFactory,
+            Map<String, DataFilePathFactory> pathFactoryMap,
             long suggestedFileSize,
             Map<Integer, String> levelCompressions,
             String fileCompression,
+            Map<Integer, String> levelFormats,
             CoreOptions options) {
         this.fileIO = fileIO;
         this.schemaId = schemaId;
         this.keyType = keyType;
         this.valueType = valueType;
-        this.writerFactory = writerFactory;
+        this.recordType = recordType;
+        this.fileFormat = fileFormat;
+        this.writerFactoryMap = writerFactoryMap;
         this.tableStatsExtractor = tableStatsExtractor;
-        this.pathFactory = pathFactory;
+        this.pathFactoryMap = pathFactoryMap;
         this.suggestedFileSize = suggestedFileSize;
         this.levelCompressions = levelCompressions;
         this.fileCompression = fileCompression;
+        this.levelFormats = levelFormats;
         this.options = options;
     }

@@ -85,13 +96,18 @@
     }

     @VisibleForTesting
-    public DataFilePathFactory pathFactory() {
-        return pathFactory;
+    public DataFilePathFactory pathFactory(String format) {
+        return pathFactoryMap.get(format);
     }

     public RollingFileWriter<KeyValue, DataFileMeta> createRollingMergeTreeFileWriter(int level) {
+        String fileFormat = getFileFormat(level);
         return new RollingFileWriter<>(
-                () -> createDataFileWriter(pathFactory.newPath(), level, getCompression(level)),
+                () ->
+                        createDataFileWriter(
+                                pathFactoryMap.get(fileFormat).newPath(),
+                                level,
+                                getCompression(level)),
                 suggestedFileSize);
     }

@@ -103,32 +119,55 @@
         }
     }

+    private String getFileFormat(int level) {
+        if (null == levelFormats) {
+            return fileFormat.getFormatIdentifier();
+        } else {
+            return levelFormats.getOrDefault(level, fileFormat.getFormatIdentifier());
+        }
+    }
+
     public RollingFileWriter<KeyValue, DataFileMeta> createRollingChangelogFileWriter(int level) {
+
         return new RollingFileWriter<>(
                 () ->
                         createDataFileWriter(
-                                pathFactory.newChangelogPath(), level, getCompression(level)),
+                                pathFactoryMap.get(getFileFormat(level)).newChangelogPath(),
+                                level,
+                                getCompression(level)),
                 suggestedFileSize);
     }

     private KeyValueDataFileWriter createDataFileWriter(Path path, int level, String compression) {
         KeyValueSerializer kvSerializer = new KeyValueSerializer(keyType, valueType);
+        String fileFormat = getFileFormat(level);
         return new KeyValueDataFileWriter(
                 fileIO,
-                writerFactory,
+                writerFactoryMap.get(fileFormat),
                 path,
                 kvSerializer::toRow,
                 keyType,
                 valueType,
-                tableStatsExtractor,
+                getTableStatsExtractor(fileFormat),
                 schemaId,
                 level,
                 compression,
                 options);
     }

-    public void deleteFile(String filename) {
-        fileIO.deleteQuietly(pathFactory.toPath(filename));
+    private TableStatsExtractor getTableStatsExtractor(String fileFormat) {
+        return null == fileFormat
+                ? tableStatsExtractor
+                : FileFormat.fromIdentifier(fileFormat, options.toConfiguration())
+                        .createStatsExtractor(
+                                recordType,
+                                StatsCollectorFactories.createStatsFactories(
+                                        options, recordType.getFieldNames()))
+                        .orElse(null);
+    }
+
+    public void deleteFile(String filename, int level) {
+        fileIO.deleteQuietly(pathFactoryMap.get(getFileFormat(level)).toPath(filename));
     }

     public static Builder builder(
@@ -137,10 +176,16 @@
             RowType keyType,
             RowType valueType,
             FileFormat fileFormat,
-            FileStorePathFactory pathFactory,
+            Map<String, FileStorePathFactory> format2PathFactory,
             long suggestedFileSize) {
         return new Builder(
-                fileIO, schemaId, keyType, valueType, fileFormat, pathFactory, suggestedFileSize);
+                fileIO,
+                schemaId,
+                keyType,
+                valueType,
+                fileFormat,
+                format2PathFactory,
+                suggestedFileSize);
     }

     /** Builder of {@link KeyValueFileWriterFactory}. */
@@ -151,7 +196,7 @@
         private final RowType keyType;
         private final RowType valueType;
         private final FileFormat fileFormat;
-        private final FileStorePathFactory pathFactory;
+        private final Map<String, FileStorePathFactory> format2PathFactory;
         private final long suggestedFileSize;

         private Builder(
@@ -160,14 +205,14 @@
                 RowType keyType,
                 RowType valueType,
                 FileFormat fileFormat,
-                FileStorePathFactory pathFactory,
+                Map<String, FileStorePathFactory> format2PathFactory,
                 long suggestedFileSize) {
             this.fileIO = fileIO;
             this.schemaId = schemaId;
             this.keyType = keyType;
             this.valueType = valueType;
             this.fileFormat = fileFormat;
-            this.pathFactory = pathFactory;
+            this.format2PathFactory = format2PathFactory;
             this.suggestedFileSize = suggestedFileSize;
         }

@@ -176,24 +221,51 @@
                 int bucket,
                 Map<Integer, String> levelCompressions,
                 String fileCompression,
+                Map<Integer, String> levelFormats,
                 CoreOptions options) {
             RowType recordType = KeyValue.schema(keyType, valueType);
+
+            Map<String, FormatWriterFactory> writerFactoryMap = new HashMap<>();
+            writerFactoryMap.put(
+                    fileFormat.getFormatIdentifier(), fileFormat.createWriterFactory(recordType));
+            if (null != levelFormats) {
+                for (String fileFormat : levelFormats.values()) {
+                    writerFactoryMap.putIfAbsent(
+                            fileFormat,
+                            FileFormat.fromIdentifier(fileFormat, options.toConfiguration())
+                                    .createWriterFactory(recordType));
+                }
+            }
+
+            Map<String, DataFilePathFactory> dataFilePathFactoryMap =
+                    format2PathFactory.entrySet().stream()
+                            .collect(
+                                    Collectors.toMap(
+                                            Map.Entry::getKey,
+                                            e ->
+                                                    e.getValue()
+                                                            .createDataFilePathFactory(
+                                                                    partition, bucket)));
+
             return new KeyValueFileWriterFactory(
                     fileIO,
                     schemaId,
                     keyType,
                     valueType,
-                    fileFormat.createWriterFactory(recordType),
+                    recordType,
+                    fileFormat,
+                    writerFactoryMap,
                     fileFormat
                             .createStatsExtractor(
                                     recordType,
                                     StatsCollectorFactories.createStatsFactories(
                                             options, recordType.getFieldNames()))
                             .orElse(null),
-                    pathFactory.createDataFilePathFactory(partition, bucket),
+                    dataFilePathFactoryMap,
                     suggestedFileSize,
                     levelCompressions,
                     fileCompression,
+                    levelFormats,
                     options);
         }
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
index dfe22f2fe..c43f97604 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
@@ -270,7 +270,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
                 // 2. This file is not the input of upgraded.
                 if (!compactBefore.containsKey(file.fileName())
                         && !afterFiles.contains(file.fileName())) {
-                    writerFactory.deleteFile(file.fileName());
+                    writerFactory.deleteFile(file.fileName(), file.level());
                 }
             } else {
                 compactBefore.put(file.fileName(), file);
@@ -297,7 +297,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
         newFiles.clear();

         for (DataFileMeta file : newFilesChangelog) {
-            writerFactory.deleteFile(file.fileName());
+            writerFactory.deleteFile(file.fileName(), file.level());
         }
         newFilesChangelog.clear();

@@ -312,12 +312,12 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
         compactAfter.clear();

         for (DataFileMeta file : compactChangelog) {
-            writerFactory.deleteFile(file.fileName());
+            writerFactory.deleteFile(file.fileName(), file.level());
         }
         compactChangelog.clear();

         for (DataFileMeta file : delete) {
-            writerFactory.deleteFile(file.fileName());
+            writerFactory.deleteFile(file.fileName(), file.level());
         }
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 4c5d6926c..06bb373cf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -62,6 +62,7 @@ import javax.annotation.Nullable;

 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Supplier;

@@ -93,6 +94,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
             Supplier<RecordEqualiser> valueEqualiserSupplier,
             MergeFunctionFactory<KeyValue> mfFactory,
             FileStorePathFactory pathFactory,
+            Map<String, FileStorePathFactory> format2PathFactory,
             SnapshotManager snapshotManager,
             FileStoreScan scan,
             @Nullable IndexMaintainer.Factory<KeyValue> indexFactory,
@@ -119,7 +121,7 @@
                         keyType,
                         valueType,
                         options.fileFormat(),
-                        pathFactory,
+                        format2PathFactory,
                         options.targetFileSize());
         this.keyComparatorSupplier = keyComparatorSupplier;
         this.valueEqualiserSupplier = valueEqualiserSupplier;
@@ -148,6 +150,7 @@
                         bucket,
                         options.fileCompressionPerLevel(),
                         options.fileCompression(),
+                        options.fileFormatPerLevel(),
                         options);
         Comparator<InternalRow> keyComparator = keyComparatorSupplier.get();
         Levels levels = new Levels(keyComparator, restoreFiles, options.numLevels());
@@ -212,6 +215,7 @@
                         bucket,
                         options.fileCompressionPerLevel(),
                         options.fileCompression(),
+                        options.fileFormatPerLevel(),
                         options);
         MergeSorter mergeSorter = new MergeSorter(options, keyType, valueType, ioManager);
         switch (options.changelogProducer()) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index 09b36d2e6..4b8e98ec2 100644
--- a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -56,7 +56,7 @@ public class FileFormatSuffixTest extends KeyValueFileReadWriteTest {
     public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws Exception {
         String format = "avro";
         KeyValueFileWriterFactory writerFactory = createWriterFactory(tempDir.toString(), format);
-        Path path = writerFactory.pathFactory().newPath();
+        Path path = writerFactory.pathFactory(format).newPath();
         Assertions.assertTrue(path.getPath().endsWith(format));

         DataFilePathFactory dataFilePathFactory =
diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index d612a1d1b..0e2e5a3b1 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -49,8 +49,10 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;

 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Function;
@@ -244,6 +246,17 @@ public class KeyValueFileReadWriteTest {
         FileIO fileIO = FileIOFinder.find(path);
         Options options = new Options();
         options.set(CoreOptions.METADATA_STATS_MODE, "FULL");
+
+        Map<String, FileStorePathFactory> pathFactoryMap = new HashMap<>();
+        pathFactoryMap.put(format, pathFactory);
+        pathFactoryMap.put(
+                CoreOptions.FILE_FORMAT.defaultValue().toString(),
+                new FileStorePathFactory(
+                        path,
+                        RowType.of(),
+                        CoreOptions.PARTITION_DEFAULT_NAME.defaultValue(),
+                        CoreOptions.FILE_FORMAT.defaultValue().toString()));
+
         return KeyValueFileWriterFactory.builder(
                         fileIO,
                         0,
@@ -253,9 +266,15 @@
                         // if the written file size is really larger than suggested, so we use a
                         // special format which flushes for every added element
                         new FlushingFileFormat(format),
-                        pathFactory,
+                        pathFactoryMap,
                         suggestedFileSize)
-                .build(BinaryRow.EMPTY_ROW, 0, null, null, new CoreOptions(options));
+                .build(
+                        BinaryRow.EMPTY_ROW,
+                        0,
+                        null,
+                        null,
+                        new HashMap<>(),
+                        new CoreOptions(options));
     }

     private KeyValueFileReaderFactory createReaderFactory(
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
index 114ff92a3..3c6c19182 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
@@ -228,15 +228,24 @@ public class LookupLevelsTest {

     private KeyValueFileWriterFactory createWriterFactory() {
         Path path = new Path(tempDir.toUri().toString());
+        String identifier = "avro";
+        Map<String, FileStorePathFactory> pathFactoryMap = new HashMap<>();
+        pathFactoryMap.put(identifier, new FileStorePathFactory(path));
         return KeyValueFileWriterFactory.builder(
                         FileIOFinder.find(path),
                         0,
                         keyType,
                         rowType,
-                        new FlushingFileFormat("avro"),
-                        new FileStorePathFactory(path),
+                        new FlushingFileFormat(identifier),
+                        pathFactoryMap,
                         TARGET_FILE_SIZE.defaultValue().getBytes())
-                .build(BinaryRow.EMPTY_ROW, 0, null, null, new CoreOptions(new Options()));
+                .build(
+                        BinaryRow.EMPTY_ROW,
+                        0,
+                        null,
+                        null,
+                        new HashMap<>(),
+                        new CoreOptions(new Options()));
     }

     private KeyValueFileReaderFactory createReaderFactory() {
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index fd36afa8a..2983028b7 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -105,6 +105,7 @@ public abstract class MergeTreeTestBase {
     private KeyValueFileWriterFactory compactWriterFactory;
     private MergeTreeWriter writer;
     private SortEngine sortEngine;
+    private String identifier = "avro";

     @BeforeEach
     public void beforeEach() throws IOException {
@@ -113,7 +114,7 @@
         comparator = Comparator.comparingInt(o -> o.getInt(0));
         sortEngine = getSortEngine();
         recreateMergeTree(1024 * 1024);
-        Path bucketDir = writerFactory.pathFactory().toPath("ignore").getParent();
+        Path bucketDir = writerFactory.pathFactory(identifier).toPath("ignore").getParent();
         LocalFileIO.create().mkdirs(bucketDir);
     }

@@ -142,7 +143,7 @@
         RowType keyType = new RowType(singletonList(new DataField(0, "k", new IntType())));
         RowType valueType = new RowType(singletonList(new DataField(0, "v", new IntType())));

-        FileFormat flushingAvro = new FlushingFileFormat("avro");
+        FileFormat flushingAvro = new FlushingFileFormat(identifier);
         KeyValueFileReaderFactory.Builder readerFactoryBuilder =
                 KeyValueFileReaderFactory.builder(
                         LocalFileIO.create(),
@@ -173,6 +174,9 @@
                         });
         readerFactory = readerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0);
         compactReaderFactory = readerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0);
+
+        Map<String, FileStorePathFactory> pathFactoryMap = new HashMap<>();
+        pathFactoryMap.put(identifier, pathFactory);
         KeyValueFileWriterFactory.Builder writerFactoryBuilder =
                 KeyValueFileWriterFactory.builder(
                         LocalFileIO.create(),
@@ -180,7 +184,7 @@
                         keyType,
                         valueType,
                         flushingAvro,
-                        pathFactory,
+                        pathFactoryMap,
                         options.targetFileSize());
         writerFactory =
                 writerFactoryBuilder.build(
@@ -188,6 +192,7 @@
                         0,
                         options.fileCompressionPerLevel(),
                         options.fileCompression(),
+                        options.fileFormatPerLevel(),
                         options);
         compactWriterFactory =
                 writerFactoryBuilder.build(
@@ -195,6 +200,7 @@
                         0,
                         options.fileCompressionPerLevel(),
                         options.fileCompression(),
+                        options.fileFormatPerLevel(),
                         options);
         writer = createMergeTreeWriter(Collections.emptyList());
     }
@@ -406,7 +412,7 @@

         writer.close();

-        Path bucketDir = writerFactory.pathFactory().toPath("ignore").getParent();
+        Path bucketDir = writerFactory.pathFactory(identifier).toPath("ignore").getParent();
         Set<String> files =
                 Arrays.stream(LocalFileIO.create().listStatus(bucketDir))
                         .map(FileStatus::getPath)
@@ -502,7 +508,7 @@
             assertThat(remove).isTrue();
             // See MergeTreeWriter.updateCompactResult
             if (!newFileNames.contains(file.fileName()) && !afterFiles.contains(file.fileName())) {
-                compactWriterFactory.deleteFile(file.fileName());
+                compactWriterFactory.deleteFile(file.fileName(), file.level());
             }
         }
         compactedFiles.addAll(increment.compactIncrement().compactAfter());
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index 746d097ab..e38bea757 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -343,6 +343,24 @@ public class CatalogTableITCase extends CatalogITCaseBase {
                         "Can not set the write-mode to append-only and changelog-producer at the same time.");
     }

+    @Test
+    public void testFileFormatPerLevel() {
+        sql(
+                "CREATE TABLE T1 (a INT PRIMARY KEY NOT ENFORCED, b STRING) "
+                        + "WITH ('num-sorted-run.compaction-trigger'='2',"
+                        + "'file.format.per.level' = '0:avro,3:parquet',"
+                        + " 'num-levels' = '4')");
+        sql("INSERT INTO T1 SELECT 1,'AAA'");
+        sql("INSERT INTO T1 SELECT 2,'BBB'");
+        sql("INSERT INTO T1 SELECT 3,'CCC'");
+        List<Row> rows = sql("SELECT * FROM T1");
+        assertThat(rows)
+                .containsExactlyInAnyOrder(Row.of(1, "AAA"), Row.of(2, "BBB"), Row.of(3, "CCC"));
+
+        rows = sql("SELECT level,file_format FROM T1$files");
+        assertThat(rows).containsExactlyInAnyOrder(Row.of(3, "parquet"), Row.of(0, "avro"));
+    }
+
     @Test
     public void testFilesTable() throws Exception {
         sql(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index 3106ed679..a8741b450 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -57,7 +57,9 @@ import org.apache.flink.api.java.tuple.Tuple2;

 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.function.Function;

@@ -171,6 +173,9 @@ public class TestChangelogDataReadWrite {
     public RecordWriter<KeyValue> createMergeTreeWriter(BinaryRow partition, int bucket) {
         CoreOptions options =
                 new CoreOptions(Collections.singletonMap(CoreOptions.FILE_FORMAT.key(), "avro"));
+
+        Map<String, FileStorePathFactory> pathFactoryMap = new HashMap<>();
+        pathFactoryMap.put("avro", pathFactory);
         RecordWriter<KeyValue> writer =
                 new KeyValueFileStoreWrite(
                                 LocalFileIO.create(),
@@ -183,6 +188,7 @@
                                 () -> EQUALISER,
                                 DeduplicateMergeFunction.factory(),
                                 pathFactory,
+                                pathFactoryMap,
                                 snapshotManager,
                                 null, // not used, we only create an empty writer
                                 null,
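
For reference, a minimal Flink SQL sketch of the new option, distilled from the
CatalogTableITCase test in this patch (the table name and the accompanying
options here are illustrative, not part of the patch):

    -- level 0 files are written as avro, level 3 files as parquet;
    -- any level without an entry falls back to the default 'file.format'.
    CREATE TABLE t (a INT PRIMARY KEY NOT ENFORCED, b STRING) WITH (
        'num-sorted-run.compaction-trigger' = '2',
        'num-levels' = '4',
        'file.format.per.level' = '0:avro,3:parquet'
    );

    -- after inserts trigger compaction, the files system table reports the
    -- format actually used at each level:
    SELECT level, file_format FROM t$files;

Pairing a row-oriented format such as avro on the write-heavy level 0 with a
columnar format such as parquet on the large top level is the kind of
write/scan trade-off this option is meant to enable.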
