This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 755e9d86e7e6555575d635ea3f7177602f0dc485
Author: Jingsong <[email protected]>
AuthorDate: Thu Jul 20 15:01:14 2023 +0800

    [core] Introduce WriteFormatContext to improve format write
---
 .../paimon/io/KeyValueFileWriterFactory.java       | 197 +++++++++------------
 .../paimon/operation/KeyValueFileStoreWrite.java   |  16 +-
 .../apache/paimon/format/FileFormatSuffixTest.java |   2 +-
 .../paimon/io/KeyValueFileReadWriteTest.java       |   8 +-
 .../apache/paimon/mergetree/LookupLevelsTest.java  |   8 +-
 .../apache/paimon/mergetree/MergeTreeTestBase.java |  32 +---
 6 files changed, 94 insertions(+), 169 deletions(-)
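
In short: callers no longer pass the per-level compression and format maps
to the builder; the new WriteFormatContext derives them from CoreOptions
(fileFormatPerLevel, fileCompressionPerLevel, fileCompression). A minimal
sketch of the call-site change, using only names that appear in the hunks
below:

    // Before: callers threaded the level maps through explicitly.
    writerFactoryBuilder.build(
            partition, bucket,
            options.fileCompressionPerLevel(), options.fileCompression(),
            options.fileFormatPerLevel(), options);

    // After: the builder reads the same settings from CoreOptions itself.
    KeyValueFileWriterFactory writerFactory =
            writerFactoryBuilder.build(partition, bucket, options);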

diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
index a19db4aa5..8c484ab53 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
@@ -28,6 +28,7 @@ import org.apache.paimon.format.FormatWriterFactory;
 import org.apache.paimon.format.TableStatsExtractor;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.statistics.FieldStatsCollector;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.StatsCollectorFactories;
@@ -36,7 +37,8 @@ import javax.annotation.Nullable;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.stream.Collectors;
+import java.util.Optional;
+import java.util.function.Function;
 
 /** A factory to create {@link FileWriter}s for writing {@link KeyValue} files. */
 public class KeyValueFileWriterFactory {
@@ -45,15 +47,8 @@ public class KeyValueFileWriterFactory {
     private final long schemaId;
     private final RowType keyType;
     private final RowType valueType;
-    private final RowType recordType;
-    private final Map<String, FormatWriterFactory> writerFactoryMap;
-    @Nullable private final TableStatsExtractor tableStatsExtractor;
-    private final Map<String, DataFilePathFactory> pathFactoryMap;
+    private final WriteFormatContext formatContext;
     private final long suggestedFileSize;
-    private final Map<Integer, String> levelCompressions;
-    private final String fileCompression;
-    private final Map<Integer, String> levelFormats;
-    private final FileFormat fileFormat;
     private final CoreOptions options;
 
     private KeyValueFileWriterFactory(
@@ -61,29 +56,15 @@ public class KeyValueFileWriterFactory {
             long schemaId,
             RowType keyType,
             RowType valueType,
-            RowType recordType,
-            FileFormat fileFormat,
-            Map<String, FormatWriterFactory> writerFactoryMap,
-            @Nullable TableStatsExtractor tableStatsExtractor,
-            Map<String, DataFilePathFactory> pathFactoryMap,
+            WriteFormatContext formatContext,
             long suggestedFileSize,
-            Map<Integer, String> levelCompressions,
-            String fileCompression,
-            Map<Integer, String> levelFormats,
             CoreOptions options) {
         this.fileIO = fileIO;
         this.schemaId = schemaId;
         this.keyType = keyType;
         this.valueType = valueType;
-        this.recordType = recordType;
-        this.fileFormat = fileFormat;
-        this.writerFactoryMap = writerFactoryMap;
-        this.tableStatsExtractor = tableStatsExtractor;
-        this.pathFactoryMap = pathFactoryMap;
+        this.formatContext = formatContext;
         this.suggestedFileSize = suggestedFileSize;
-        this.levelCompressions = levelCompressions;
-        this.fileCompression = fileCompression;
-        this.levelFormats = levelFormats;
         this.options = options;
     }
 
@@ -96,78 +77,42 @@ public class KeyValueFileWriterFactory {
     }
 
     @VisibleForTesting
-    public DataFilePathFactory pathFactory(String format) {
-        return pathFactoryMap.get(format);
+    public DataFilePathFactory pathFactory(int level) {
+        return formatContext.pathFactory(level);
     }
 
     public RollingFileWriter<KeyValue, DataFileMeta> createRollingMergeTreeFileWriter(int level) {
-        String fileFormat = getFileFormat(level);
         return new RollingFileWriter<>(
-                () ->
-                        createDataFileWriter(
-                                pathFactoryMap.get(fileFormat).newPath(),
-                                level,
-                                getCompression(level)),
+                () -> createDataFileWriter(formatContext.pathFactory(level).newPath(), level),
                 suggestedFileSize);
     }
 
-    private String getCompression(int level) {
-        if (null == levelCompressions) {
-            return fileCompression;
-        } else {
-            return levelCompressions.getOrDefault(level, fileCompression);
-        }
-    }
-
-    private String getFileFormat(int level) {
-        if (null == levelFormats) {
-            return fileFormat.getFormatIdentifier();
-        } else {
-            return levelFormats.getOrDefault(level, fileFormat.getFormatIdentifier());
-        }
-    }
-
     public RollingFileWriter<KeyValue, DataFileMeta> createRollingChangelogFileWriter(int level) {
-
         return new RollingFileWriter<>(
                 () ->
                         createDataFileWriter(
-                                pathFactoryMap.get(getFileFormat(level)).newChangelogPath(),
-                                level,
-                                getCompression(level)),
+                                formatContext.pathFactory(level).newChangelogPath(), level),
                 suggestedFileSize);
     }
 
-    private KeyValueDataFileWriter createDataFileWriter(Path path, int level, String compression) {
+    private KeyValueDataFileWriter createDataFileWriter(Path path, int level) {
         KeyValueSerializer kvSerializer = new KeyValueSerializer(keyType, valueType);
-        String fileFormat = getFileFormat(level);
         return new KeyValueDataFileWriter(
                 fileIO,
-                writerFactoryMap.get(fileFormat),
+                formatContext.writerFactory(level),
                 path,
                 kvSerializer::toRow,
                 keyType,
                 valueType,
-                getTableStatsExtractor(fileFormat),
+                formatContext.extractor(level),
                 schemaId,
                 level,
-                compression,
+                formatContext.compression(level),
                 options);
     }
 
-    private TableStatsExtractor getTableStatsExtractor(String fileFormat) {
-        return null == fileFormat
-                ? tableStatsExtractor
-                : FileFormat.fromIdentifier(fileFormat, options.toConfiguration())
-                        .createStatsExtractor(
-                                recordType,
-                                StatsCollectorFactories.createStatsFactories(
-                                        options, recordType.getFieldNames()))
-                        .orElse(null);
-    }
-
     public void deleteFile(String filename, int level) {
-        fileIO.deleteQuietly(pathFactoryMap.get(getFileFormat(level)).toPath(filename));
+        fileIO.deleteQuietly(formatContext.pathFactory(level).toPath(filename));
     }
 
     public static Builder builder(
@@ -217,56 +162,80 @@ public class KeyValueFileWriterFactory {
         }
 
         public KeyValueFileWriterFactory build(
+                BinaryRow partition, int bucket, CoreOptions options) {
+            RowType fileRowType = KeyValue.schema(keyType, valueType);
+            WriteFormatContext context =
+                    new WriteFormatContext(
+                            partition,
+                            bucket,
+                            fileRowType,
+                            fileFormat,
+                            format2PathFactory,
+                            options);
+            return new KeyValueFileWriterFactory(
+                    fileIO, schemaId, keyType, valueType, context, suggestedFileSize, options);
+        }
+    }
+
+    private static class WriteFormatContext {
+
+        private final Function<Integer, String> level2Format;
+        private final Function<Integer, String> level2Compress;
+
+        private final Map<String, Optional<TableStatsExtractor>> format2Extractor;
+        private final Map<String, DataFilePathFactory> format2PathFactory;
+        private final Map<String, FormatWriterFactory> format2WriterFactory;
+
+        private WriteFormatContext(
                 BinaryRow partition,
                 int bucket,
-                Map<Integer, String> levelCompressions,
-                String fileCompression,
-                Map<Integer, String> levelFormats,
+                RowType rowType,
+                FileFormat defaultFormat,
+                Map<String, FileStorePathFactory> parentFactories,
                 CoreOptions options) {
-            RowType recordType = KeyValue.schema(keyType, valueType);
-
-            Map<String, FormatWriterFactory> writerFactoryMap = new HashMap<>();
-            writerFactoryMap.put(
-                    fileFormat.getFormatIdentifier(), fileFormat.createWriterFactory(recordType));
-            if (null != levelFormats) {
-                for (String fileFormat : levelFormats.values()) {
-                    writerFactoryMap.putIfAbsent(
-                            fileFormat,
-                            FileFormat.fromIdentifier(fileFormat, options.toConfiguration())
-                                    .createWriterFactory(recordType));
-                }
+            Map<Integer, String> fileFormatPerLevel = options.fileFormatPerLevel();
+            this.level2Format =
+                    level ->
+                            fileFormatPerLevel.getOrDefault(
+                                    level, defaultFormat.getFormatIdentifier());
+
+            String defaultCompress = options.fileCompression();
+            Map<Integer, String> fileCompressionPerLevel = options.fileCompressionPerLevel();
+            this.level2Compress =
+                    level -> fileCompressionPerLevel.getOrDefault(level, defaultCompress);
+
+            this.format2Extractor = new HashMap<>();
+            this.format2PathFactory = new HashMap<>();
+            this.format2WriterFactory = new HashMap<>();
+            FieldStatsCollector.Factory[] statsCollectorFactories =
+                    StatsCollectorFactories.createStatsFactories(options, rowType.getFieldNames());
+            for (String format : parentFactories.keySet()) {
+                format2PathFactory.put(
+                        format,
+                        parentFactories.get(format).createDataFilePathFactory(partition, bucket));
+
+                FileFormat fileFormat = FileFormat.getFileFormat(options.toConfiguration(), format);
+                format2Extractor.put(
+                        format, fileFormat.createStatsExtractor(rowType, statsCollectorFactories));
+                format2WriterFactory.put(format, fileFormat.createWriterFactory(rowType));
             }
+        }
 
-            Map<String, DataFilePathFactory> dataFilePathFactoryMap =
-                    format2PathFactory.entrySet().stream()
-                            .collect(
-                                    Collectors.toMap(
-                                            Map.Entry::getKey,
-                                            e ->
-                                                    e.getValue()
-                                                            .createDataFilePathFactory(
-                                                                    partition, bucket)));
+        @Nullable
+        private TableStatsExtractor extractor(int level) {
+            return format2Extractor.get(level2Format.apply(level)).orElse(null);
+        }
 
-            return new KeyValueFileWriterFactory(
-                    fileIO,
-                    schemaId,
-                    keyType,
-                    valueType,
-                    recordType,
-                    fileFormat,
-                    writerFactoryMap,
-                    fileFormat
-                            .createStatsExtractor(
-                                    recordType,
-                                    StatsCollectorFactories.createStatsFactories(
-                                            options, recordType.getFieldNames()))
-                            .orElse(null),
-                    dataFilePathFactoryMap,
-                    suggestedFileSize,
-                    levelCompressions,
-                    fileCompression,
-                    levelFormats,
-                    options);
+        private DataFilePathFactory pathFactory(int level) {
+            return format2PathFactory.get(level2Format.apply(level));
+        }
+
+        private FormatWriterFactory writerFactory(int level) {
+            return format2WriterFactory.get(level2Format.apply(level));
+        }
+
+        private String compression(int level) {
+            return level2Compress.apply(level);
         }
     }
 }
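
The core of the new WriteFormatContext is a level-to-format lookup that
falls back to the table's default format. A self-contained sketch of that
pattern, with hypothetical formats and levels (not Paimon API):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Function;

    public class LevelFormatDemo {
        public static void main(String[] args) {
            // Hypothetical override: only level 5 uses a different format.
            Map<Integer, String> fileFormatPerLevel = new HashMap<>();
            fileFormatPerLevel.put(5, "orc");

            String defaultFormat = "avro";
            Function<Integer, String> level2Format =
                    level -> fileFormatPerLevel.getOrDefault(level, defaultFormat);

            System.out.println(level2Format.apply(0)); // avro (falls back to default)
            System.out.println(level2Format.apply(5)); // orc  (per-level override)
        }
    }

Note that the context builds one path factory, stats extractor, and writer
factory per configured format in its constructor, so the per-level getters
reduce to cheap map lookups on the write path.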
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 06bb373cf..176914738 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -145,13 +145,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
         }
 
         KeyValueFileWriterFactory writerFactory =
-                writerFactoryBuilder.build(
-                        partition,
-                        bucket,
-                        options.fileCompressionPerLevel(),
-                        options.fileCompression(),
-                        options.fileFormatPerLevel(),
-                        options);
+                writerFactoryBuilder.build(partition, bucket, options);
         Comparator<InternalRow> keyComparator = keyComparatorSupplier.get();
         Levels levels = new Levels(keyComparator, restoreFiles, options.numLevels());
         UniversalCompaction universalCompaction =
@@ -210,13 +204,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
             BinaryRow partition, int bucket, Comparator<InternalRow> keyComparator, Levels levels) {
         KeyValueFileReaderFactory readerFactory = readerFactoryBuilder.build(partition, bucket);
         KeyValueFileWriterFactory writerFactory =
-                writerFactoryBuilder.build(
-                        partition,
-                        bucket,
-                        options.fileCompressionPerLevel(),
-                        options.fileCompression(),
-                        options.fileFormatPerLevel(),
-                        options);
+                writerFactoryBuilder.build(partition, bucket, options);
         MergeSorter mergeSorter = new MergeSorter(options, keyType, valueType, ioManager);
         switch (options.changelogProducer()) {
             case FULL_COMPACTION:
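
One detail worth noting from the WriteFormatContext diff above: stats
extractors are cached as Optional values, so "this format has no extractor"
is memoized too, while extractor(level) still hands writers a @Nullable
result. The same pattern in miniature, with placeholder strings standing in
for Paimon types:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Optional;

    public class OptionalCacheDemo {
        public static void main(String[] args) {
            Map<String, Optional<String>> format2Extractor = new HashMap<>();
            format2Extractor.put("avro", Optional.empty());        // no extractor
            format2Extractor.put("orc", Optional.of("orc-stats")); // extractor present

            // Callers receive a nullable value, mirroring the @Nullable accessor.
            String extractor = format2Extractor.get("avro").orElse(null);
            System.out.println(extractor); // null
        }
    }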
diff --git a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index 4b8e98ec2..80fa97b23 100644
--- a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -56,7 +56,7 @@ public class FileFormatSuffixTest extends KeyValueFileReadWriteTest {
     public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws Exception {
         String format = "avro";
         KeyValueFileWriterFactory writerFactory = createWriterFactory(tempDir.toString(), format);
-        Path path = writerFactory.pathFactory(format).newPath();
+        Path path = writerFactory.pathFactory(0).newPath();
         Assertions.assertTrue(path.getPath().endsWith(format));
 
         DataFilePathFactory dataFilePathFactory =
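
The @VisibleForTesting accessor is now keyed by level instead of by format
name. With no file-format-per-level override configured, every level
resolves to the default format, so level 0 is a safe stand-in in tests. A
usage sketch, assuming a factory built with "avro" as in this test:

    // Level 0 falls back to the default format, so the suffix check holds.
    Path path = writerFactory.pathFactory(0).newPath();
    Assertions.assertTrue(path.getPath().endsWith("avro"));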
diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index 0e2e5a3b1..dbfcbcd0b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -268,13 +268,7 @@ public class KeyValueFileReadWriteTest {
                         new FlushingFileFormat(format),
                         pathFactoryMap,
                         suggestedFileSize)
-                .build(
-                        BinaryRow.EMPTY_ROW,
-                        0,
-                        null,
-                        null,
-                        new HashMap<>(),
-                        new CoreOptions(options));
+                .build(BinaryRow.EMPTY_ROW, 0, new CoreOptions(options));
     }
 
     private KeyValueFileReaderFactory createReaderFactory(
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
index 3c6c19182..079ebce31 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
@@ -239,13 +239,7 @@ public class LookupLevelsTest {
                         new FlushingFileFormat(identifier),
                         pathFactoryMap,
                         TARGET_FILE_SIZE.defaultValue().getBytes())
-                .build(
-                        BinaryRow.EMPTY_ROW,
-                        0,
-                        null,
-                        null,
-                        new HashMap<>(),
-                        new CoreOptions(new Options()));
+                .build(BinaryRow.EMPTY_ROW, 0, new CoreOptions(new Options()));
     }
 
     private KeyValueFileReaderFactory createReaderFactory() {
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index 2983028b7..9e47f7f85 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -59,7 +59,6 @@ import org.apache.paimon.utils.CommitIncrement;
 import org.apache.paimon.utils.ExceptionUtils;
 import org.apache.paimon.utils.FileStorePathFactory;
 
-import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
@@ -104,17 +103,14 @@ public abstract class MergeTreeTestBase {
     private KeyValueFileWriterFactory writerFactory;
     private KeyValueFileWriterFactory compactWriterFactory;
     private MergeTreeWriter writer;
-    private SortEngine sortEngine;
-    private String identifier = "avro";
 
     @BeforeEach
     public void beforeEach() throws IOException {
         path = new Path(tempDir.toString());
         pathFactory = new FileStorePathFactory(path);
         comparator = Comparator.comparingInt(o -> o.getInt(0));
-        sortEngine = getSortEngine();
         recreateMergeTree(1024 * 1024);
-        Path bucketDir = writerFactory.pathFactory(identifier).toPath("ignore").getParent();
+        Path bucketDir = writerFactory.pathFactory(0).toPath("ignore").getParent();
         LocalFileIO.create().mkdirs(bucketDir);
     }
 
@@ -143,6 +139,7 @@ public abstract class MergeTreeTestBase {
         RowType keyType = new RowType(singletonList(new DataField(0, "k", new IntType())));
         RowType valueType = new RowType(singletonList(new DataField(0, "v", new IntType())));
 
+        String identifier = "avro";
         FileFormat flushingAvro = new FlushingFileFormat(identifier);
         KeyValueFileReaderFactory.Builder readerFactoryBuilder =
                 KeyValueFileReaderFactory.builder(
@@ -186,22 +183,8 @@ public abstract class MergeTreeTestBase {
                         flushingAvro,
                         pathFactoryMap,
                         options.targetFileSize());
-        writerFactory =
-                writerFactoryBuilder.build(
-                        BinaryRow.EMPTY_ROW,
-                        0,
-                        options.fileCompressionPerLevel(),
-                        options.fileCompression(),
-                        options.fileFormatPerLevel(),
-                        options);
-        compactWriterFactory =
-                writerFactoryBuilder.build(
-                        BinaryRow.EMPTY_ROW,
-                        0,
-                        options.fileCompressionPerLevel(),
-                        options.fileCompression(),
-                        options.fileFormatPerLevel(),
-                        options);
+        writerFactory = writerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0, options);
+        compactWriterFactory = writerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0, options);
         writer = createMergeTreeWriter(Collections.emptyList());
     }
 
@@ -267,7 +250,6 @@ public abstract class MergeTreeTestBase {
         Assertions.assertEquals(1, increment.compactIncrement().compactAfter().size());
     }
 
-    @Nullable
     private List<DataFileMeta> generateDataFileToCommit() throws Exception {
         List<DataFileMeta> newFiles = new ArrayList<>();
 
@@ -412,7 +394,7 @@ public abstract class MergeTreeTestBase {
 
         writer.close();
 
-        Path bucketDir = writerFactory.pathFactory(identifier).toPath("ignore").getParent();
+        Path bucketDir = writerFactory.pathFactory(0).toPath("ignore").getParent();
         Set<String> files =
                 Arrays.stream(LocalFileIO.create().listStatus(bucketDir))
                         .map(FileStatus::getPath)
@@ -527,9 +509,7 @@ public abstract class MergeTreeTestBase {
     private void assertRecords(List<TestRecord> expected) throws Exception {
         // compaction will drop delete
         List<DataFileMeta> files =
-                ((MergeTreeCompactManager) ((MergeTreeWriter) writer).compactManager())
-                        .levels()
-                        .allFiles();
+                ((MergeTreeCompactManager) writer.compactManager()).levels().allFiles();
         assertRecords(expected, files, true);
     }
 
