This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d28a03022 [core] Introduce specific options for changelog files 
(#5451)
8d28a03022 is described below

commit 8d28a0302234cc1bf7a15c882b924f9d58b2eeea
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Apr 11 18:47:25 2025 +0800

    [core] Introduce specific options for changelog files (#5451)
---
 .../shortcodes/generated/core_configuration.html   |  18 ++
 .../main/java/org/apache/paimon/CoreOptions.java   |  35 ++++
 .../org/apache/paimon/io/FileWriterContext.java    |  48 +++++
 .../apache/paimon/io/KeyValueDataFileWriter.java   |  15 +-
 .../paimon/io/KeyValueDataFileWriterImpl.java      |   9 +-
 .../paimon/io/KeyValueFileWriterFactory.java       | 210 +++++++++++++--------
 .../paimon/io/KeyValueThinDataFileWriterImpl.java  |  11 +-
 .../org/apache/paimon/io/RowDataFileWriter.java    |  15 +-
 .../apache/paimon/io/RowDataRollingFileWriter.java |  18 +-
 .../paimon/io/StatsCollectingSingleFileWriter.java |   9 +-
 .../apache/paimon/utils/FileStorePathFactory.java  |  15 +-
 .../paimon/io/KeyValueFileReadWriteTest.java       |  35 ++--
 .../apache/paimon/io/RollingFileWriterTest.java    |  17 +-
 .../mergetree/ChangelogMergeTreeRewriterTest.java  |   2 +-
 .../paimon/mergetree/ContainsLevelsTest.java       |   4 +-
 .../apache/paimon/mergetree/LookupLevelsTest.java  |   4 +-
 .../apache/paimon/mergetree/MergeTreeTestBase.java |   4 +-
 .../paimon/table/PrimaryKeySimpleTableTest.java    |  47 ++++-
 18 files changed, 337 insertions(+), 179 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 92d5894af3..05b6c60bc1 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -68,12 +68,30 @@ under the License.
             <td>MemorySize</td>
             <td>Memory page size for caching.</td>
         </tr>
+        <tr>
+            <td><h5>changelog-file.compression</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Changelog file compression.</td>
+        </tr>
+        <tr>
+            <td><h5>changelog-file.format</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Specify the message format of changelog files, currently 
parquet, avro and orc are supported.</td>
+        </tr>
         <tr>
             <td><h5>changelog-file.prefix</h5></td>
             <td style="word-wrap: break-word;">"changelog-"</td>
             <td>String</td>
             <td>Specify the file name prefix of changelog files.</td>
         </tr>
+        <tr>
+            <td><h5>changelog-file.stats-mode</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Changelog file metadata stats collection. none, counts, 
truncate(16), full is available.</td>
+        </tr>
         <tr>
             <td><h5>changelog-producer</h5></td>
             <td style="word-wrap: break-word;">none</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index e59b30a560..7bc6148f9f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -226,6 +226,26 @@ public class CoreOptions implements Serializable {
                     .defaultValue("changelog-")
                     .withDescription("Specify the file name prefix of 
changelog files.");
 
+    public static final ConfigOption<String> CHANGELOG_FILE_FORMAT =
+            key("changelog-file.format")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Specify the message format of changelog files, 
currently parquet, avro and orc are supported.");
+
+    public static final ConfigOption<String> CHANGELOG_FILE_COMPRESSION =
+            key("changelog-file.compression")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Changelog file compression.");
+
+    public static final ConfigOption<String> CHANGELOG_FILE_STATS_MODE =
+            key("changelog-file.stats-mode")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Changelog file metadata stats collection. none, 
counts, truncate(16), full is available.");
+
     public static final ConfigOption<Boolean> FILE_SUFFIX_INCLUDE_COMPRESSION =
             key("file.suffix.include.compression")
                     .booleanType()
@@ -1846,6 +1866,21 @@ public class CoreOptions implements Serializable {
         return options.get(CHANGELOG_FILE_PREFIX);
     }
 
+    @Nullable
+    public String changelogFileFormat() {
+        return options.get(CHANGELOG_FILE_FORMAT);
+    }
+
+    @Nullable
+    public String changelogFileCompression() {
+        return options.get(CHANGELOG_FILE_COMPRESSION);
+    }
+
+    @Nullable
+    public String changelogFileStatsMode() {
+        return options.get(CHANGELOG_FILE_STATS_MODE);
+    }
+
     public boolean fileSuffixIncludeCompression() {
         return options.get(FILE_SUFFIX_INCLUDE_COMPRESSION);
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/FileWriterContext.java 
b/paimon-core/src/main/java/org/apache/paimon/io/FileWriterContext.java
new file mode 100644
index 0000000000..58d7758b17
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/io/FileWriterContext.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.format.FormatWriterFactory;
+
+/** Context of file writer. */
+public class FileWriterContext {
+
+    private final FormatWriterFactory factory;
+    private final SimpleStatsProducer statsProducer;
+    private final String compression;
+
+    public FileWriterContext(
+            FormatWriterFactory factory, SimpleStatsProducer statsProducer, 
String compression) {
+        this.factory = factory;
+        this.statsProducer = statsProducer;
+        this.compression = compression;
+    }
+
+    public FormatWriterFactory factory() {
+        return factory;
+    }
+
+    public SimpleStatsProducer statsProducer() {
+        return statsProducer;
+    }
+
+    public String compression() {
+        return compression;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
index 0f5b4e9985..a8ea12dabd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
@@ -24,7 +24,6 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.fileindex.FileIndexOptions;
-import org.apache.paimon.format.FormatWriterFactory;
 import org.apache.paimon.format.SimpleColStats;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
@@ -78,29 +77,19 @@ public abstract class KeyValueDataFileWriter
 
     public KeyValueDataFileWriter(
             FileIO fileIO,
-            FormatWriterFactory factory,
+            FileWriterContext context,
             Path path,
             Function<KeyValue, InternalRow> converter,
             RowType keyType,
             RowType valueType,
             RowType writeRowType,
-            SimpleStatsProducer statsProducer,
             long schemaId,
             int level,
-            String compression,
             CoreOptions options,
             FileSource fileSource,
             FileIndexOptions fileIndexOptions,
             boolean isExternalPath) {
-        super(
-                fileIO,
-                factory,
-                path,
-                converter,
-                writeRowType,
-                statsProducer,
-                compression,
-                options.asyncFileWrite());
+        super(fileIO, context, path, converter, writeRowType, 
options.asyncFileWrite());
 
         this.keyType = keyType;
         this.valueType = valueType;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriterImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriterImpl.java
index 053731c9cf..798c70ed23 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriterImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriterImpl.java
@@ -22,7 +22,6 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fileindex.FileIndexOptions;
-import org.apache.paimon.format.FormatWriterFactory;
 import org.apache.paimon.format.SimpleColStats;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
@@ -38,31 +37,27 @@ public class KeyValueDataFileWriterImpl extends 
KeyValueDataFileWriter {
 
     public KeyValueDataFileWriterImpl(
             FileIO fileIO,
-            FormatWriterFactory factory,
+            FileWriterContext context,
             Path path,
             Function<KeyValue, InternalRow> converter,
             RowType keyType,
             RowType valueType,
-            SimpleStatsProducer statsProducer,
             long schemaId,
             int level,
-            String compression,
             CoreOptions options,
             FileSource fileSource,
             FileIndexOptions fileIndexOptions,
             boolean isExternalPath) {
         super(
                 fileIO,
-                factory,
+                context,
                 path,
                 converter,
                 keyType,
                 valueType,
                 KeyValue.schema(keyType, valueType),
-                statsProducer,
                 schemaId,
                 level,
-                compression,
                 options,
                 fileSource,
                 fileIndexOptions,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
index 20cb8874d9..014df2e4c3 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
@@ -41,14 +41,17 @@ import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.StatsCollectorFactories;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.function.IntFunction;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /** A factory to create {@link FileWriter}s for writing {@link KeyValue} 
files. */
@@ -58,7 +61,7 @@ public class KeyValueFileWriterFactory {
     private final long schemaId;
     private final RowType keyType;
     private final RowType valueType;
-    private final WriteFormatContext formatContext;
+    private final FileWriterContextFactory formatContext;
     private final long suggestedFileSize;
     private final CoreOptions options;
     private final FileIndexOptions fileIndexOptions;
@@ -66,7 +69,7 @@ public class KeyValueFileWriterFactory {
     private KeyValueFileWriterFactory(
             FileIO fileIO,
             long schemaId,
-            WriteFormatContext formatContext,
+            FileWriterContextFactory formatContext,
             long suggestedFileSize,
             CoreOptions options) {
         this.fileIO = fileIO;
@@ -89,27 +92,29 @@ public class KeyValueFileWriterFactory {
 
     @VisibleForTesting
     public DataFilePathFactory pathFactory(int level) {
-        return formatContext.pathFactory(level);
+        return formatContext.pathFactory(new WriteFormatKey(level, false));
     }
 
     public RollingFileWriter<KeyValue, DataFileMeta> 
createRollingMergeTreeFileWriter(
             int level, FileSource fileSource) {
+        WriteFormatKey key = new WriteFormatKey(level, false);
         return new RollingFileWriter<>(
                 () -> {
-                    DataFilePathFactory pathFactory = 
formatContext.pathFactory(level);
+                    DataFilePathFactory pathFactory = 
formatContext.pathFactory(key);
                     return createDataFileWriter(
-                            pathFactory.newPath(), level, fileSource, 
pathFactory.isExternalPath());
+                            pathFactory.newPath(), key, fileSource, 
pathFactory.isExternalPath());
                 },
                 suggestedFileSize);
     }
 
     public RollingFileWriter<KeyValue, DataFileMeta> 
createRollingChangelogFileWriter(int level) {
+        WriteFormatKey key = new WriteFormatKey(level, true);
         return new RollingFileWriter<>(
                 () -> {
-                    DataFilePathFactory pathFactory = 
formatContext.pathFactory(level);
+                    DataFilePathFactory pathFactory = 
formatContext.pathFactory(key);
                     return createDataFileWriter(
                             pathFactory.newChangelogPath(),
-                            level,
+                            key,
                             FileSource.APPEND,
                             pathFactory.isExternalPath());
                 },
@@ -117,34 +122,30 @@ public class KeyValueFileWriterFactory {
     }
 
     private KeyValueDataFileWriter createDataFileWriter(
-            Path path, int level, FileSource fileSource, boolean 
isExternalPath) {
+            Path path, WriteFormatKey key, FileSource fileSource, boolean 
isExternalPath) {
         return formatContext.thinModeEnabled
                 ? new KeyValueThinDataFileWriterImpl(
                         fileIO,
-                        formatContext.writerFactory(level),
+                        formatContext.fileWriterContext(key),
                         path,
                         new KeyValueThinSerializer(keyType, valueType)::toRow,
                         keyType,
                         valueType,
-                        formatContext.statsProducer(level, options),
                         schemaId,
-                        level,
-                        formatContext.compression(level),
+                        key.level,
                         options,
                         fileSource,
                         fileIndexOptions,
                         isExternalPath)
                 : new KeyValueDataFileWriterImpl(
                         fileIO,
-                        formatContext.writerFactory(level),
+                        formatContext.fileWriterContext(key),
                         path,
                         new KeyValueSerializer(keyType, valueType)::toRow,
                         keyType,
                         valueType,
-                        formatContext.statsProducer(level, options),
                         schemaId,
-                        level,
-                        formatContext.compression(level),
+                        key.level,
                         options,
                         fileSource,
                         fileIndexOptions,
@@ -152,12 +153,24 @@ public class KeyValueFileWriterFactory {
     }
 
     public void deleteFile(DataFileMeta file) {
-        
fileIO.deleteQuietly(formatContext.pathFactory(file.level()).toPath(file));
+        // this path factory is only for path generation, so we don't care 
about the true or false
+        // in WriteFormatKey
+        fileIO.deleteQuietly(
+                formatContext.pathFactory(new WriteFormatKey(file.level(), 
false)).toPath(file));
     }
 
     public void copyFile(DataFileMeta sourceFile, DataFileMeta targetFile) 
throws IOException {
-        Path sourcePath = 
formatContext.pathFactory(sourceFile.level()).toPath(sourceFile);
-        Path targetPath = 
formatContext.pathFactory(targetFile.level()).toPath(targetFile);
+        // this path factory is only for path generation, so we don't care 
about the true or false
+        // in WriteFormatKey
+        boolean isChangelog = false;
+        Path sourcePath =
+                formatContext
+                        .pathFactory(new WriteFormatKey(sourceFile.level(), 
isChangelog))
+                        .toPath(sourceFile);
+        Path targetPath =
+                formatContext
+                        .pathFactory(new WriteFormatKey(targetFile.level(), 
isChangelog))
+                        .toPath(targetFile);
         fileIO.copyFile(sourcePath, targetPath, true);
     }
 
@@ -166,7 +179,7 @@ public class KeyValueFileWriterFactory {
     }
 
     public String newChangelogFileName(int level) {
-        return formatContext.pathFactory(level).newChangelogFileName();
+        return formatContext.pathFactory(new WriteFormatKey(level, 
true)).newChangelogFileName();
     }
 
     public static Builder builder(
@@ -175,7 +188,7 @@ public class KeyValueFileWriterFactory {
             RowType keyType,
             RowType valueType,
             FileFormat fileFormat,
-            Map<String, FileStorePathFactory> format2PathFactory,
+            Function<String, FileStorePathFactory> format2PathFactory,
             long suggestedFileSize) {
         return new Builder(
                 fileIO,
@@ -195,7 +208,7 @@ public class KeyValueFileWriterFactory {
         private final RowType keyType;
         private final RowType valueType;
         private final FileFormat fileFormat;
-        private final Map<String, FileStorePathFactory> format2PathFactory;
+        private final Function<String, FileStorePathFactory> 
format2PathFactory;
         private final long suggestedFileSize;
 
         private Builder(
@@ -204,7 +217,7 @@ public class KeyValueFileWriterFactory {
                 RowType keyType,
                 RowType valueType,
                 FileFormat fileFormat,
-                Map<String, FileStorePathFactory> format2PathFactory,
+                Function<String, FileStorePathFactory> format2PathFactory,
                 long suggestedFileSize) {
             this.fileIO = fileIO;
             this.schemaId = schemaId;
@@ -217,8 +230,8 @@ public class KeyValueFileWriterFactory {
 
         public KeyValueFileWriterFactory build(
                 BinaryRow partition, int bucket, CoreOptions options) {
-            WriteFormatContext context =
-                    new WriteFormatContext(
+            FileWriterContextFactory context =
+                    new FileWriterContextFactory(
                             partition,
                             bucket,
                             keyType,
@@ -231,11 +244,11 @@ public class KeyValueFileWriterFactory {
         }
     }
 
-    private static class WriteFormatContext {
+    private static class FileWriterContextFactory {
 
-        private final IntFunction<String> level2Format;
-        private final IntFunction<String> level2Compress;
-        private final IntFunction<String> level2Stats;
+        private final Function<WriteFormatKey, String> key2Format;
+        private final Function<WriteFormatKey, String> key2Compress;
+        private final Function<WriteFormatKey, String> key2Stats;
 
         private final Map<Pair<String, String>, Optional<SimpleStatsExtractor>>
                 formatStats2Extractor;
@@ -249,17 +262,17 @@ public class KeyValueFileWriterFactory {
         private final RowType keyType;
         private final RowType valueType;
         private final RowType writeRowType;
-        private final Map<String, FileStorePathFactory> parentFactories;
+        private final Function<String, FileStorePathFactory> parentFactories;
         private final CoreOptions options;
         private final boolean thinModeEnabled;
 
-        private WriteFormatContext(
+        private FileWriterContextFactory(
                 BinaryRow partition,
                 int bucket,
                 RowType keyType,
                 RowType valueType,
-                FileFormat defaultFormat,
-                Map<String, FileStorePathFactory> parentFactories,
+                FileFormat defaultFileFormat,
+                Function<String, FileStorePathFactory> parentFactories,
                 CoreOptions options) {
             this.partition = partition;
             this.bucket = bucket;
@@ -271,20 +284,39 @@ public class KeyValueFileWriterFactory {
                     options.dataFileThinMode() && supportsThinMode(keyType, 
valueType);
             this.writeRowType =
                     KeyValue.schema(thinModeEnabled ? RowType.of() : keyType, 
valueType);
+
             Map<Integer, String> fileFormatPerLevel = 
options.fileFormatPerLevel();
-            this.level2Format =
-                    level ->
-                            fileFormatPerLevel.getOrDefault(
-                                    level, 
defaultFormat.getFormatIdentifier());
+            String defaultFormat = defaultFileFormat.getFormatIdentifier();
+            @Nullable String changelogFormat = options.changelogFileFormat();
+            this.key2Format =
+                    key -> {
+                        if (key.isChangelog && changelogFormat != null) {
+                            return changelogFormat;
+                        }
+                        return fileFormatPerLevel.getOrDefault(key.level, 
defaultFormat);
+                    };
 
             String defaultCompress = options.fileCompression();
+            @Nullable String changelogCompression = 
options.changelogFileCompression();
             Map<Integer, String> fileCompressionPerLevel = 
options.fileCompressionPerLevel();
-            this.level2Compress =
-                    level -> fileCompressionPerLevel.getOrDefault(level, 
defaultCompress);
+            this.key2Compress =
+                    key -> {
+                        if (key.isChangelog && changelogCompression != null) {
+                            return changelogCompression;
+                        }
+                        return fileCompressionPerLevel.getOrDefault(key.level, 
defaultCompress);
+                    };
 
             String statsMode = options.statsMode();
             Map<Integer, String> statsModePerLevel = 
options.statsModePerLevel();
-            this.level2Stats = level -> statsModePerLevel.getOrDefault(level, 
statsMode);
+            @Nullable String changelogStatsMode = 
options.changelogFileStatsMode();
+            this.key2Stats =
+                    key -> {
+                        if (key.isChangelog && changelogStatsMode != null) {
+                            return changelogStatsMode;
+                        }
+                        return statsModePerLevel.getOrDefault(key.level, 
statsMode);
+                    };
 
             this.formatStats2Extractor = new HashMap<>();
             this.statsMode2AvroStats = new HashMap<>();
@@ -308,16 +340,21 @@ public class KeyValueFileWriterFactory {
             return true;
         }
 
-        private SimpleStatsProducer statsProducer(int level, CoreOptions 
options) {
-            String format = level2Format.apply(level);
-            String statsMode = level2Stats.apply(level);
+        private FileWriterContext fileWriterContext(WriteFormatKey key) {
+            return new FileWriterContext(
+                    writerFactory(key), statsProducer(key), 
key2Compress.apply(key));
+        }
+
+        private SimpleStatsProducer statsProducer(WriteFormatKey key) {
+            String format = key2Format.apply(key);
+            String statsMode = key2Stats.apply(key);
             if (format.equals("avro")) {
                 // In avro format, minValue, maxValue, and nullCount are not 
counted, so use
                 // SimpleStatsExtractor to collect stats
                 SimpleColStatsCollector.Factory[] factories =
                         statsMode2AvroStats.computeIfAbsent(
                                 statsMode,
-                                key ->
+                                k ->
                                         
StatsCollectorFactories.createStatsFactoriesForAvro(
                                                 statsMode, options, 
writeRowType.getFieldNames()));
                 SimpleStatsCollector collector = new 
SimpleStatsCollector(writeRowType, factories);
@@ -327,56 +364,71 @@ public class KeyValueFileWriterFactory {
             Optional<SimpleStatsExtractor> extractor =
                     formatStats2Extractor.computeIfAbsent(
                             Pair.of(format, statsMode),
-                            key -> {
-                                SimpleColStatsCollector.Factory[] 
statsFactories =
-                                        
StatsCollectorFactories.createStatsFactories(
-                                                statsMode,
-                                                options,
-                                                writeRowType.getFieldNames(),
-                                                thinModeEnabled
-                                                        ? 
keyType.getFieldNames()
-                                                        : 
Collections.emptyList());
-                                boolean isDisabled =
-                                        Arrays.stream(
-                                                        
SimpleColStatsCollector.create(
-                                                                
statsFactories))
-                                                .allMatch(
-                                                        p ->
-                                                                p
-                                                                        
instanceof
-                                                                        
NoneSimpleColStatsCollector);
-                                if (isDisabled) {
-                                    return Optional.empty();
-                                }
-                                return fileFormat(format)
-                                        .createStatsExtractor(writeRowType, 
statsFactories);
-                            });
+                            k -> createSimpleStatsExtractor(format, 
statsMode));
             return SimpleStatsProducer.fromExtractor(extractor.orElse(null));
         }
 
-        private DataFilePathFactory pathFactory(int level) {
-            String format = level2Format.apply(level);
+        private Optional<SimpleStatsExtractor> createSimpleStatsExtractor(
+                String format, String statsMode) {
+            SimpleColStatsCollector.Factory[] statsFactories =
+                    StatsCollectorFactories.createStatsFactories(
+                            statsMode,
+                            options,
+                            writeRowType.getFieldNames(),
+                            thinModeEnabled ? keyType.getFieldNames() : 
Collections.emptyList());
+            boolean isDisabled =
+                    
Arrays.stream(SimpleColStatsCollector.create(statsFactories))
+                            .allMatch(p -> p instanceof 
NoneSimpleColStatsCollector);
+            if (isDisabled) {
+                return Optional.empty();
+            }
+            return fileFormat(format).createStatsExtractor(writeRowType, 
statsFactories);
+        }
+
+        private DataFilePathFactory pathFactory(WriteFormatKey key) {
+            String format = key2Format.apply(key);
             return format2PathFactory.computeIfAbsent(
                     format,
-                    key ->
+                    k ->
                             parentFactories
-                                    .get(format)
+                                    .apply(format)
                                     .createDataFilePathFactory(partition, 
bucket));
         }
 
-        private FormatWriterFactory writerFactory(int level) {
+        private FormatWriterFactory writerFactory(WriteFormatKey key) {
             return format2WriterFactory.computeIfAbsent(
-                    level2Format.apply(level),
+                    key2Format.apply(key),
                     format -> 
fileFormat(format).createWriterFactory(writeRowType));
         }
 
-        private String compression(int level) {
-            return level2Compress.apply(level);
-        }
-
         private FileFormat fileFormat(String format) {
             return formatFactory.computeIfAbsent(
                     format, k -> FileFormat.fromIdentifier(format, 
options.toConfiguration()));
         }
     }
+
+    private static class WriteFormatKey {
+
+        private final int level;
+        private final boolean isChangelog;
+
+        private WriteFormatKey(int level, boolean isChangelog) {
+            this.level = level;
+            this.isChangelog = isChangelog;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            WriteFormatKey formatKey = (WriteFormatKey) o;
+            return level == formatKey.level && isChangelog == 
formatKey.isChangelog;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(level, isChangelog);
+        }
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueThinDataFileWriterImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueThinDataFileWriterImpl.java
index 128599939a..86d8df0ce7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueThinDataFileWriterImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueThinDataFileWriterImpl.java
@@ -22,7 +22,6 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fileindex.FileIndexOptions;
-import org.apache.paimon.format.FormatWriterFactory;
 import org.apache.paimon.format.SimpleColStats;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
@@ -46,31 +45,27 @@ public class KeyValueThinDataFileWriterImpl extends 
KeyValueDataFileWriter {
 
     public KeyValueThinDataFileWriterImpl(
             FileIO fileIO,
-            FormatWriterFactory factory,
+            FileWriterContext context,
             Path path,
             Function<KeyValue, InternalRow> converter,
             RowType keyType,
             RowType valueType,
-            SimpleStatsProducer statsProducer,
             long schemaId,
             int level,
-            String compression,
             CoreOptions options,
             FileSource fileSource,
             FileIndexOptions fileIndexOptions,
             boolean isExternalPath) {
         super(
                 fileIO,
-                factory,
+                context,
                 path,
                 converter,
                 keyType,
                 valueType,
                 KeyValue.schema(RowType.of(), valueType),
-                statsProducer,
                 schemaId,
                 level,
-                compression,
                 options,
                 fileSource,
                 fileIndexOptions,
@@ -97,7 +92,7 @@ public class KeyValueThinDataFileWriterImpl extends 
KeyValueDataFileWriter {
     Pair<SimpleColStats[], SimpleColStats[]> 
fetchKeyValueStats(SimpleColStats[] rowStats) {
         int numKeyFields = keyType.getFieldCount();
         // In thin mode, there is no key stats in rowStats, so we only jump
-        // _SEQUNCE_NUMBER_ and _ROW_KIND_ stats. Therefore, the 'from' value 
is 2.
+        // _SEQUENCE_NUMBER_ and _ROW_KIND_ stats. Therefore, the 'from' value 
is 2.
         SimpleColStats[] valFieldStats = Arrays.copyOfRange(rowStats, 2, 
rowStats.length);
         // Thin mode on, so need to map value stats to key stats.
         SimpleColStats[] keyStats = new SimpleColStats[numKeyFields];
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
index b5d2c9f0df..432845de32 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
@@ -20,7 +20,6 @@ package org.apache.paimon.io;
 
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fileindex.FileIndexOptions;
-import org.apache.paimon.format.FormatWriterFactory;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.FileSource;
@@ -54,27 +53,17 @@ public class RowDataFileWriter extends 
StatsCollectingSingleFileWriter<InternalR
 
     public RowDataFileWriter(
             FileIO fileIO,
-            FormatWriterFactory factory,
+            FileWriterContext context,
             Path path,
             RowType writeSchema,
-            SimpleStatsProducer statsProducer,
             long schemaId,
             LongCounter seqNumCounter,
-            String fileCompression,
             FileIndexOptions fileIndexOptions,
             FileSource fileSource,
             boolean asyncFileWrite,
             boolean statsDenseStore,
             boolean isExternalPath) {
-        super(
-                fileIO,
-                factory,
-                path,
-                Function.identity(),
-                writeSchema,
-                statsProducer,
-                fileCompression,
-                asyncFileWrite);
+        super(fileIO, context, path, Function.identity(), writeSchema, 
asyncFileWrite);
         this.schemaId = schemaId;
         this.seqNumCounter = seqNumCounter;
         this.isExternalPath = isExternalPath;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
index b6f4303946..9b68b774f7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.io;
 
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fileindex.FileIndexOptions;
 import org.apache.paimon.format.FileFormat;
@@ -53,13 +54,12 @@ public class RowDataRollingFileWriter extends 
RollingFileWriter<InternalRow, Dat
                 () ->
                         new RowDataFileWriter(
                                 fileIO,
-                                fileFormat.createWriterFactory(writeSchema),
+                                createFileWriterContext(
+                                        fileFormat, writeSchema, 
statsCollectors, fileCompression),
                                 pathFactory.newPath(),
                                 writeSchema,
-                                createStatsProducer(fileFormat, writeSchema, 
statsCollectors),
                                 schemaId,
                                 seqNumCounter,
-                                fileCompression,
                                 fileIndexOptions,
                                 fileSource,
                                 asyncFileWrite,
@@ -68,6 +68,18 @@ public class RowDataRollingFileWriter extends 
RollingFileWriter<InternalRow, Dat
                 targetFileSize);
     }
 
+    @VisibleForTesting
+    static FileWriterContext createFileWriterContext(
+            FileFormat fileFormat,
+            RowType rowType,
+            SimpleColStatsCollector.Factory[] statsCollectors,
+            String fileCompression) {
+        return new FileWriterContext(
+                fileFormat.createWriterFactory(rowType),
+                createStatsProducer(fileFormat, rowType, statsCollectors),
+                fileCompression);
+    }
+
     private static SimpleStatsProducer createStatsProducer(
             FileFormat fileFormat,
             RowType rowType,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
index e286836fdf..20cebfcb27 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.io;
 
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.format.FormatWriterFactory;
 import org.apache.paimon.format.SimpleColStats;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
@@ -45,16 +44,14 @@ public abstract class StatsCollectingSingleFileWriter<T, R> 
extends SingleFileWr
 
     public StatsCollectingSingleFileWriter(
             FileIO fileIO,
-            FormatWriterFactory factory,
+            FileWriterContext context,
             Path path,
             Function<T, InternalRow> converter,
             RowType rowType,
-            SimpleStatsProducer statsProducer,
-            String compression,
             boolean asyncWrite) {
-        super(fileIO, factory, path, converter, compression, asyncWrite);
+        super(fileIO, context.factory(), path, converter, 
context.compression(), asyncWrite);
         this.rowType = rowType;
-        this.statsProducer = statsProducer;
+        this.statsProducer = context.statsProducer();
         this.isStatsDisabled = statsProducer.isStatsDisabled();
         this.statsRequirePerRecord = statsProducer.requirePerRecord();
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
index 45f6fb3f3b..b63023653a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
@@ -30,14 +30,13 @@ import org.apache.paimon.types.RowType;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /** Factory which produces {@link Path}s for manifest files. */
@@ -297,14 +296,10 @@ public class FileStorePathFactory {
         };
     }
 
-    public static Map<String, FileStorePathFactory> createFormatPathFactories(
+    public static Function<String, FileStorePathFactory> 
createFormatPathFactories(
             CoreOptions options,
             BiFunction<CoreOptions, String, FileStorePathFactory> 
formatPathFactory) {
-        Map<String, FileStorePathFactory> pathFactoryMap = new HashMap<>();
-        Set<String> formats = new 
HashSet<>(options.fileFormatPerLevel().values());
-        formats.add(options.fileFormatString());
-        formats.forEach(
-                format -> pathFactoryMap.put(format, 
formatPathFactory.apply(options, format)));
-        return pathFactoryMap;
+        Map<String, FileStorePathFactory> map = new ConcurrentHashMap<>();
+        return format -> map.computeIfAbsent(format, k -> 
formatPathFactory.apply(options, format));
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java 
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index 2d0efdb6d1..10479892b0 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -53,7 +53,6 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Function;
@@ -244,22 +243,24 @@ public class KeyValueFileReadWriteTest {
         Options options = new Options();
         options.set(CoreOptions.METADATA_STATS_MODE, "FULL");
 
-        Map<String, FileStorePathFactory> pathFactoryMap = new HashMap<>();
-        pathFactoryMap.put(format, pathFactory);
-        pathFactoryMap.put(
-                CoreOptions.FILE_FORMAT.defaultValue().toString(),
-                new FileStorePathFactory(
-                        path,
-                        RowType.of(),
-                        CoreOptions.PARTITION_DEFAULT_NAME.defaultValue(),
-                        CoreOptions.FILE_FORMAT.defaultValue().toString(),
-                        CoreOptions.DATA_FILE_PREFIX.defaultValue(),
-                        CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
-                        
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
-                        
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
-                        CoreOptions.FILE_COMPRESSION.defaultValue(),
-                        null,
-                        null));
+        Function<String, FileStorePathFactory> pathFactoryMap =
+                new Function<String, FileStorePathFactory>() {
+                    @Override
+                    public FileStorePathFactory apply(String format) {
+                        return new FileStorePathFactory(
+                                path,
+                                RowType.of(),
+                                
CoreOptions.PARTITION_DEFAULT_NAME.defaultValue(),
+                                format,
+                                CoreOptions.DATA_FILE_PREFIX.defaultValue(),
+                                
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
+                                
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
+                                
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+                                CoreOptions.FILE_COMPRESSION.defaultValue(),
+                                null,
+                                null);
+                    }
+                };
 
         return KeyValueFileWriterFactory.builder(
                         fileIO,
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java 
b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
index d3eeb1fdef..a11c0e4d78 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
@@ -71,7 +71,12 @@ public class RollingFileWriterTest {
                         () ->
                                 new RowDataFileWriter(
                                         LocalFileIO.create(),
-                                        fileFormat.createWriterFactory(SCHEMA),
+                                        
RowDataRollingFileWriter.createFileWriterContext(
+                                                fileFormat,
+                                                SCHEMA,
+                                                
SimpleColStatsCollector.createFullStatsFactories(
+                                                        
SCHEMA.getFieldCount()),
+                                                
CoreOptions.FILE_COMPRESSION.defaultValue()),
                                         new DataFilePathFactory(
                                                         new Path(tempDir + 
"/bucket-0"),
                                                         CoreOptions.FILE_FORMAT
@@ -86,18 +91,8 @@ public class RollingFileWriterTest {
                                                         null)
                                                 .newPath(),
                                         SCHEMA,
-                                        SimpleStatsProducer.fromExtractor(
-                                                fileFormat
-                                                        .createStatsExtractor(
-                                                                SCHEMA,
-                                                                
SimpleColStatsCollector
-                                                                        
.createFullStatsFactories(
-                                                                               
 SCHEMA
-                                                                               
         .getFieldCount()))
-                                                        .orElse(null)),
                                         0L,
                                         new LongCounter(0),
-                                        
CoreOptions.FILE_COMPRESSION.defaultValue(),
                                         new FileIndexOptions(),
                                         FileSource.APPEND,
                                         true,
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ChangelogMergeTreeRewriterTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ChangelogMergeTreeRewriterTest.java
index bbfa4d3659..47655a419b 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ChangelogMergeTreeRewriterTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ChangelogMergeTreeRewriterTest.java
@@ -198,7 +198,7 @@ public class ChangelogMergeTreeRewriterTest {
                         keyType,
                         valueType,
                         new FlushingFileFormat("avro"),
-                        Collections.singletonMap("avro", 
createNonPartFactory(path)),
+                        k -> createNonPartFactory(path),
                         VALUE_128_MB.getBytes())
                 .build(BinaryRow.EMPTY_ROW, 0, new CoreOptions(new Options()));
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
index fa9628b4c1..0bef84e7d8 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
@@ -63,6 +63,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.function.Function;
 
 import static org.apache.paimon.io.DataFileTestUtils.row;
 import static org.apache.paimon.options.MemorySize.VALUE_128_MB;
@@ -223,8 +224,7 @@ public class ContainsLevelsTest {
 
     private KeyValueFileWriterFactory createWriterFactory() {
         Path path = new Path(tempDir.toUri().toString());
-        Map<String, FileStorePathFactory> pathFactoryMap = new HashMap<>();
-        pathFactoryMap.put("avro", createNonPartFactory(path));
+        Function<String, FileStorePathFactory> pathFactoryMap = k -> 
createNonPartFactory(path);
         return KeyValueFileWriterFactory.builder(
                         FileIOFinder.find(path),
                         0,
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
index b68a82935b..d4ddde5a44 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
@@ -64,6 +64,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.function.Function;
 
 import static org.apache.paimon.KeyValue.UNKNOWN_SEQUENCE;
 import static org.apache.paimon.io.DataFileTestUtils.row;
@@ -305,8 +306,7 @@ public class LookupLevelsTest {
     private KeyValueFileWriterFactory createWriterFactory() {
         Path path = new Path(tempDir.toUri().toString());
         String identifier = "avro";
-        Map<String, FileStorePathFactory> pathFactoryMap = new HashMap<>();
-        pathFactoryMap.put(identifier, createNonPartFactory(path));
+        Function<String, FileStorePathFactory> pathFactoryMap = k -> 
createNonPartFactory(path);
         return KeyValueFileWriterFactory.builder(
                         FileIOFinder.find(path),
                         0,
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index 1207daa86c..c149e44199 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -86,6 +86,7 @@ import java.util.TreeMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static java.util.Collections.singletonList;
@@ -175,8 +176,7 @@ public abstract class MergeTreeTestBase {
         compactReaderFactory =
                 readerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0, 
DeletionVector.emptyFactory());
 
-        Map<String, FileStorePathFactory> pathFactoryMap = new HashMap<>();
-        pathFactoryMap.put(identifier, pathFactory);
+        Function<String, FileStorePathFactory> pathFactoryMap = k -> 
pathFactory;
         KeyValueFileWriterFactory.Builder writerFactoryBuilder =
                 KeyValueFileWriterFactory.builder(
                         LocalFileIO.create(),
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index 6f6be4b803..59dee1092a 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -100,6 +100,8 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.CHANGELOG_FILE_FORMAT;
+import static org.apache.paimon.CoreOptions.CHANGELOG_FILE_STATS_MODE;
 import static org.apache.paimon.CoreOptions.CHANGELOG_NUM_RETAINED_MAX;
 import static org.apache.paimon.CoreOptions.CHANGELOG_NUM_RETAINED_MIN;
 import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
@@ -109,6 +111,7 @@ import static org.apache.paimon.CoreOptions.FILE_FORMAT;
 import static org.apache.paimon.CoreOptions.FILE_FORMAT_PARQUET;
 import static org.apache.paimon.CoreOptions.LOOKUP_LOCAL_FILE_TYPE;
 import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
+import static org.apache.paimon.CoreOptions.METADATA_STATS_MODE;
 import static org.apache.paimon.CoreOptions.METADATA_STATS_MODE_PER_LEVEL;
 import static org.apache.paimon.CoreOptions.MergeEngine;
 import static org.apache.paimon.CoreOptions.MergeEngine.AGGREGATE;
@@ -528,10 +531,20 @@ public class PrimaryKeySimpleTableTest extends 
SimpleTableTestBase {
                                 
"+2|22|202|binary|varbinary|mapKey:mapVal|multiset"));
     }
 
-    @Test
-    public void testStreamingInputChangelog() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testStreamingInputChangelog(boolean specificConfig) throws 
Exception {
         FileStoreTable table =
-                createFileStoreTable(conf -> conf.set(CHANGELOG_PRODUCER, 
ChangelogProducer.INPUT));
+                createFileStoreTable(
+                        conf -> {
+                            conf.set(CHANGELOG_PRODUCER, 
ChangelogProducer.INPUT);
+                            if (specificConfig) {
+                                conf.set(FILE_FORMAT, "parquet");
+                                conf.set(METADATA_STATS_MODE, "full");
+                                conf.set(CHANGELOG_FILE_FORMAT, "avro");
+                                conf.set(CHANGELOG_FILE_STATS_MODE, "none");
+                            }
+                        });
         StreamTableWrite write = table.newWrite(commitUser);
         StreamTableCommit commit = table.newCommit(commitUser);
         write.write(rowData(1, 10, 100L));
@@ -560,6 +573,15 @@ public class PrimaryKeySimpleTableTest extends 
SimpleTableTestBase {
                         "+U 1|20|201|binary|varbinary|mapKey:mapVal|multiset",
                         "-U 1|10|101|binary|varbinary|mapKey:mapVal|multiset",
                         "+U 1|10|102|binary|varbinary|mapKey:mapVal|multiset");
+
+        if (specificConfig) {
+            Snapshot snapshot = table.latestSnapshot().get();
+            ManifestFileMeta manifest =
+                    
table.manifestListReader().read(snapshot.changelogManifestList()).get(0);
+            DataFileMeta file = 
table.manifestFileReader().read(manifest.fileName()).get(0).file();
+            assertThat(file.fileName()).endsWith(".avro");
+            
assertThat(file.valueStats().minValues().getFieldCount()).isEqualTo(0);
+        }
     }
 
     @Test
@@ -2130,13 +2152,20 @@ public class PrimaryKeySimpleTableTest extends 
SimpleTableTestBase {
         innerTestTableQuery(table);
     }
 
-    @Test
-    public void testLookupWithDropDelete() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testLookupWithDropDelete(boolean specificConfig) throws 
Exception {
         FileStoreTable table =
                 createFileStoreTable(
                         conf -> {
                             conf.set(CHANGELOG_PRODUCER, LOOKUP);
                             conf.set("num-levels", "2");
+                            if (specificConfig) {
+                                conf.set(FILE_FORMAT, "parquet");
+                                conf.set(METADATA_STATS_MODE, "full");
+                                conf.set(CHANGELOG_FILE_FORMAT, "avro");
+                                conf.set(CHANGELOG_FILE_STATS_MODE, "none");
+                            }
                         });
         IOManager ioManager = IOManager.create(tablePath.toString());
         StreamTableWrite write = 
table.newWrite(commitUser).withIOManager(ioManager);
@@ -2165,6 +2194,14 @@ public class PrimaryKeySimpleTableTest extends 
SimpleTableTestBase {
                 .isEqualTo(
                         Collections.singletonList(
                                 
"1|2|200|binary|varbinary|mapKey:mapVal|multiset"));
+
+        if (specificConfig) {
+            ManifestFileMeta manifest =
+                    
table.manifestListReader().read(latestSnapshot.changelogManifestList()).get(0);
+            DataFileMeta file = 
table.manifestFileReader().read(manifest.fileName()).get(0).file();
+            assertThat(file.fileName()).endsWith(".avro");
+            
assertThat(file.valueStats().minValues().getFieldCount()).isEqualTo(0);
+        }
     }
 
     @ParameterizedTest(name = "changelog-producer = {0}")

Reply via email to