This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit 755e9d86e7e6555575d635ea3f7177602f0dc485 Author: Jingsong <[email protected]> AuthorDate: Thu Jul 20 15:01:14 2023 +0800 [core] Introduce WriteFormatContext to improve format write --- .../paimon/io/KeyValueFileWriterFactory.java | 197 +++++++++------------ .../paimon/operation/KeyValueFileStoreWrite.java | 16 +- .../apache/paimon/format/FileFormatSuffixTest.java | 2 +- .../paimon/io/KeyValueFileReadWriteTest.java | 8 +- .../apache/paimon/mergetree/LookupLevelsTest.java | 8 +- .../apache/paimon/mergetree/MergeTreeTestBase.java | 32 +--- 6 files changed, 94 insertions(+), 169 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java index a19db4aa5..8c484ab53 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java @@ -28,6 +28,7 @@ import org.apache.paimon.format.FormatWriterFactory; import org.apache.paimon.format.TableStatsExtractor; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.statistics.FieldStatsCollector; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.StatsCollectorFactories; @@ -36,7 +37,8 @@ import javax.annotation.Nullable; import java.util.HashMap; import java.util.Map; -import java.util.stream.Collectors; +import java.util.Optional; +import java.util.function.Function; /** A factory to create {@link FileWriter}s for writing {@link KeyValue} files. */ public class KeyValueFileWriterFactory { @@ -45,15 +47,8 @@ public class KeyValueFileWriterFactory { private final long schemaId; private final RowType keyType; private final RowType valueType; - private final RowType recordType; - private final Map<String, FormatWriterFactory> writerFactoryMap; - @Nullable private final TableStatsExtractor tableStatsExtractor; - private final Map<String, DataFilePathFactory> pathFactoryMap; + private final WriteFormatContext formatContext; private final long suggestedFileSize; - private final Map<Integer, String> levelCompressions; - private final String fileCompression; - private final Map<Integer, String> levelFormats; - private final FileFormat fileFormat; private final CoreOptions options; private KeyValueFileWriterFactory( @@ -61,29 +56,15 @@ public class KeyValueFileWriterFactory { long schemaId, RowType keyType, RowType valueType, - RowType recordType, - FileFormat fileFormat, - Map<String, FormatWriterFactory> writerFactoryMap, - @Nullable TableStatsExtractor tableStatsExtractor, - Map<String, DataFilePathFactory> pathFactoryMap, + WriteFormatContext formatContext, long suggestedFileSize, - Map<Integer, String> levelCompressions, - String fileCompression, - Map<Integer, String> levelFormats, CoreOptions options) { this.fileIO = fileIO; this.schemaId = schemaId; this.keyType = keyType; this.valueType = valueType; - this.recordType = recordType; - this.fileFormat = fileFormat; - this.writerFactoryMap = writerFactoryMap; - this.tableStatsExtractor = tableStatsExtractor; - this.pathFactoryMap = pathFactoryMap; + this.formatContext = formatContext; this.suggestedFileSize = suggestedFileSize; - this.levelCompressions = levelCompressions; - this.fileCompression = fileCompression; - this.levelFormats = levelFormats; this.options = options; } @@ -96,78 +77,42 @@ public class KeyValueFileWriterFactory { } @VisibleForTesting - public DataFilePathFactory pathFactory(String format) { - return pathFactoryMap.get(format); + public DataFilePathFactory pathFactory(int level) { + return formatContext.pathFactory(level); } public RollingFileWriter<KeyValue, DataFileMeta> createRollingMergeTreeFileWriter(int level) { - String fileFormat = getFileFormat(level); return new RollingFileWriter<>( - () -> - createDataFileWriter( - pathFactoryMap.get(fileFormat).newPath(), - level, - getCompression(level)), + () -> createDataFileWriter(formatContext.pathFactory(level).newPath(), level), suggestedFileSize); } - private String getCompression(int level) { - if (null == levelCompressions) { - return fileCompression; - } else { - return levelCompressions.getOrDefault(level, fileCompression); - } - } - - private String getFileFormat(int level) { - if (null == levelFormats) { - return fileFormat.getFormatIdentifier(); - } else { - return levelFormats.getOrDefault(level, fileFormat.getFormatIdentifier()); - } - } - public RollingFileWriter<KeyValue, DataFileMeta> createRollingChangelogFileWriter(int level) { - return new RollingFileWriter<>( () -> createDataFileWriter( - pathFactoryMap.get(getFileFormat(level)).newChangelogPath(), - level, - getCompression(level)), + formatContext.pathFactory(level).newChangelogPath(), level), suggestedFileSize); } - private KeyValueDataFileWriter createDataFileWriter(Path path, int level, String compression) { + private KeyValueDataFileWriter createDataFileWriter(Path path, int level) { KeyValueSerializer kvSerializer = new KeyValueSerializer(keyType, valueType); - String fileFormat = getFileFormat(level); return new KeyValueDataFileWriter( fileIO, - writerFactoryMap.get(fileFormat), + formatContext.writerFactory(level), path, kvSerializer::toRow, keyType, valueType, - getTableStatsExtractor(fileFormat), + formatContext.extractor(level), schemaId, level, - compression, + formatContext.compression(level), options); } - private TableStatsExtractor getTableStatsExtractor(String fileFormat) { - return null == fileFormat - ? tableStatsExtractor - : FileFormat.fromIdentifier(fileFormat, options.toConfiguration()) - .createStatsExtractor( - recordType, - StatsCollectorFactories.createStatsFactories( - options, recordType.getFieldNames())) - .orElse(null); - } - public void deleteFile(String filename, int level) { - fileIO.deleteQuietly(pathFactoryMap.get(getFileFormat(level)).toPath(filename)); + fileIO.deleteQuietly(formatContext.pathFactory(level).toPath(filename)); } public static Builder builder( @@ -217,56 +162,80 @@ public class KeyValueFileWriterFactory { } public KeyValueFileWriterFactory build( + BinaryRow partition, int bucket, CoreOptions options) { + RowType fileRowType = KeyValue.schema(keyType, valueType); + WriteFormatContext context = + new WriteFormatContext( + partition, + bucket, + fileRowType, + fileFormat, + format2PathFactory, + options); + return new KeyValueFileWriterFactory( + fileIO, schemaId, keyType, valueType, context, suggestedFileSize, options); + } + } + + private static class WriteFormatContext { + + private final Function<Integer, String> level2Format; + private final Function<Integer, String> level2Compress; + + private final Map<String, Optional<TableStatsExtractor>> format2Extractor; + private final Map<String, DataFilePathFactory> format2PathFactory; + private final Map<String, FormatWriterFactory> format2WriterFactory; + + private WriteFormatContext( BinaryRow partition, int bucket, - Map<Integer, String> levelCompressions, - String fileCompression, - Map<Integer, String> levelFormats, + RowType rowType, + FileFormat defaultFormat, + Map<String, FileStorePathFactory> parentFactories, CoreOptions options) { - RowType recordType = KeyValue.schema(keyType, valueType); - - Map<String, FormatWriterFactory> writerFactoryMap = new HashMap<>(); - writerFactoryMap.put( - fileFormat.getFormatIdentifier(), fileFormat.createWriterFactory(recordType)); - if (null != levelFormats) { - for (String fileFormat : levelFormats.values()) { - writerFactoryMap.putIfAbsent( - fileFormat, - FileFormat.fromIdentifier(fileFormat, options.toConfiguration()) - .createWriterFactory(recordType)); - } + Map<Integer, String> fileFormatPerLevel = options.fileFormatPerLevel(); + this.level2Format = + level -> + fileFormatPerLevel.getOrDefault( + level, defaultFormat.getFormatIdentifier()); + + String defaultCompress = options.fileCompression(); + Map<Integer, String> fileCompressionPerLevel = options.fileCompressionPerLevel(); + this.level2Compress = + level -> fileCompressionPerLevel.getOrDefault(level, defaultCompress); + + this.format2Extractor = new HashMap<>(); + this.format2PathFactory = new HashMap<>(); + this.format2WriterFactory = new HashMap<>(); + FieldStatsCollector.Factory[] statsCollectorFactories = + StatsCollectorFactories.createStatsFactories(options, rowType.getFieldNames()); + for (String format : parentFactories.keySet()) { + format2PathFactory.put( + format, + parentFactories.get(format).createDataFilePathFactory(partition, bucket)); + + FileFormat fileFormat = FileFormat.getFileFormat(options.toConfiguration(), format); + format2Extractor.put( + format, fileFormat.createStatsExtractor(rowType, statsCollectorFactories)); + format2WriterFactory.put(format, fileFormat.createWriterFactory(rowType)); } + } - Map<String, DataFilePathFactory> dataFilePathFactoryMap = - format2PathFactory.entrySet().stream() - .collect( - Collectors.toMap( - Map.Entry::getKey, - e -> - e.getValue() - .createDataFilePathFactory( - partition, bucket))); + @Nullable + private TableStatsExtractor extractor(int level) { + return format2Extractor.get(level2Format.apply(level)).orElse(null); + } - return new KeyValueFileWriterFactory( - fileIO, - schemaId, - keyType, - valueType, - recordType, - fileFormat, - writerFactoryMap, - fileFormat - .createStatsExtractor( - recordType, - StatsCollectorFactories.createStatsFactories( - options, recordType.getFieldNames())) - .orElse(null), - dataFilePathFactoryMap, - suggestedFileSize, - levelCompressions, - fileCompression, - levelFormats, - options); + private DataFilePathFactory pathFactory(int level) { + return format2PathFactory.get(level2Format.apply(level)); + } + + private FormatWriterFactory writerFactory(int level) { + return format2WriterFactory.get(level2Format.apply(level)); + } + + private String compression(int level) { + return level2Compress.apply(level); } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index 06bb373cf..176914738 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -145,13 +145,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> { } KeyValueFileWriterFactory writerFactory = - writerFactoryBuilder.build( - partition, - bucket, - options.fileCompressionPerLevel(), - options.fileCompression(), - options.fileFormatPerLevel(), - options); + writerFactoryBuilder.build(partition, bucket, options); Comparator<InternalRow> keyComparator = keyComparatorSupplier.get(); Levels levels = new Levels(keyComparator, restoreFiles, options.numLevels()); UniversalCompaction universalCompaction = @@ -210,13 +204,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> { BinaryRow partition, int bucket, Comparator<InternalRow> keyComparator, Levels levels) { KeyValueFileReaderFactory readerFactory = readerFactoryBuilder.build(partition, bucket); KeyValueFileWriterFactory writerFactory = - writerFactoryBuilder.build( - partition, - bucket, - options.fileCompressionPerLevel(), - options.fileCompression(), - options.fileFormatPerLevel(), - options); + writerFactoryBuilder.build(partition, bucket, options); MergeSorter mergeSorter = new MergeSorter(options, keyType, valueType, ioManager); switch (options.changelogProducer()) { case FULL_COMPACTION: diff --git a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java index 4b8e98ec2..80fa97b23 100644 --- a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java @@ -56,7 +56,7 @@ public class FileFormatSuffixTest extends KeyValueFileReadWriteTest { public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws Exception { String format = "avro"; KeyValueFileWriterFactory writerFactory = createWriterFactory(tempDir.toString(), format); - Path path = writerFactory.pathFactory(format).newPath(); + Path path = writerFactory.pathFactory(0).newPath(); Assertions.assertTrue(path.getPath().endsWith(format)); DataFilePathFactory dataFilePathFactory = diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java index 0e2e5a3b1..dbfcbcd0b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java @@ -268,13 +268,7 @@ public class KeyValueFileReadWriteTest { new FlushingFileFormat(format), pathFactoryMap, suggestedFileSize) - .build( - BinaryRow.EMPTY_ROW, - 0, - null, - null, - new HashMap<>(), - new CoreOptions(options)); + .build(BinaryRow.EMPTY_ROW, 0, new CoreOptions(options)); } private KeyValueFileReaderFactory createReaderFactory( diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java index 3c6c19182..079ebce31 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java @@ -239,13 +239,7 @@ public class LookupLevelsTest { new FlushingFileFormat(identifier), pathFactoryMap, TARGET_FILE_SIZE.defaultValue().getBytes()) - .build( - BinaryRow.EMPTY_ROW, - 0, - null, - null, - new HashMap<>(), - new CoreOptions(new Options())); + .build(BinaryRow.EMPTY_ROW, 0, new CoreOptions(new Options())); } private KeyValueFileReaderFactory createReaderFactory() { diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java index 2983028b7..9e47f7f85 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java @@ -59,7 +59,6 @@ import org.apache.paimon.utils.CommitIncrement; import org.apache.paimon.utils.ExceptionUtils; import org.apache.paimon.utils.FileStorePathFactory; -import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; @@ -104,17 +103,14 @@ public abstract class MergeTreeTestBase { private KeyValueFileWriterFactory writerFactory; private KeyValueFileWriterFactory compactWriterFactory; private MergeTreeWriter writer; - private SortEngine sortEngine; - private String identifier = "avro"; @BeforeEach public void beforeEach() throws IOException { path = new Path(tempDir.toString()); pathFactory = new FileStorePathFactory(path); comparator = Comparator.comparingInt(o -> o.getInt(0)); - sortEngine = getSortEngine(); recreateMergeTree(1024 * 1024); - Path bucketDir = writerFactory.pathFactory(identifier).toPath("ignore").getParent(); + Path bucketDir = writerFactory.pathFactory(0).toPath("ignore").getParent(); LocalFileIO.create().mkdirs(bucketDir); } @@ -143,6 +139,7 @@ public abstract class MergeTreeTestBase { RowType keyType = new RowType(singletonList(new DataField(0, "k", new IntType()))); RowType valueType = new RowType(singletonList(new DataField(0, "v", new IntType()))); + String identifier = "avro"; FileFormat flushingAvro = new FlushingFileFormat(identifier); KeyValueFileReaderFactory.Builder readerFactoryBuilder = KeyValueFileReaderFactory.builder( @@ -186,22 +183,8 @@ public abstract class MergeTreeTestBase { flushingAvro, pathFactoryMap, options.targetFileSize()); - writerFactory = - writerFactoryBuilder.build( - BinaryRow.EMPTY_ROW, - 0, - options.fileCompressionPerLevel(), - options.fileCompression(), - options.fileFormatPerLevel(), - options); - compactWriterFactory = - writerFactoryBuilder.build( - BinaryRow.EMPTY_ROW, - 0, - options.fileCompressionPerLevel(), - options.fileCompression(), - options.fileFormatPerLevel(), - options); + writerFactory = writerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0, options); + compactWriterFactory = writerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0, options); writer = createMergeTreeWriter(Collections.emptyList()); } @@ -267,7 +250,6 @@ public abstract class MergeTreeTestBase { Assertions.assertEquals(1, increment.compactIncrement().compactAfter().size()); } - @Nullable private List<DataFileMeta> generateDataFileToCommit() throws Exception { List<DataFileMeta> newFiles = new ArrayList<>(); @@ -412,7 +394,7 @@ public abstract class MergeTreeTestBase { writer.close(); - Path bucketDir = writerFactory.pathFactory(identifier).toPath("ignore").getParent(); + Path bucketDir = writerFactory.pathFactory(0).toPath("ignore").getParent(); Set<String> files = Arrays.stream(LocalFileIO.create().listStatus(bucketDir)) .map(FileStatus::getPath) @@ -527,9 +509,7 @@ public abstract class MergeTreeTestBase { private void assertRecords(List<TestRecord> expected) throws Exception { // compaction will drop delete List<DataFileMeta> files = - ((MergeTreeCompactManager) ((MergeTreeWriter) writer).compactManager()) - .levels() - .allFiles(); + ((MergeTreeCompactManager) writer.compactManager()).levels().allFiles(); assertRecords(expected, files, true); }
