This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a21a6a11f [core] Introduce FieldStatsCollector.Factory to fix reusing bug
a21a6a11f is described below

commit a21a6a11feff5b79f0641b1c6e7929e071f48ec5
Author: JingsongLi <[email protected]>
AuthorDate: Tue Jul 4 13:40:52 2023 +0800

    [core] Introduce FieldStatsCollector.Factory to fix reusing bug
---
 .../java/org/apache/paimon/format/FileFormat.java  |  2 +-
 .../apache/paimon/format/TableStatsCollector.java  |  8 ++---
 .../paimon/statistics/FieldStatsCollector.java     | 23 +++++++++++---
 .../TableFieldStatsExtractorTestBaseCollector.java |  4 +--
 .../java/org/apache/paimon/AbstractFileStore.java  |  4 +--
 .../main/java/org/apache/paimon/CoreOptions.java   |  2 +-
 .../org/apache/paimon/append/AppendOnlyWriter.java |  4 +--
 .../apache/paimon/io/KeyValueDataFileWriter.java   |  4 +--
 .../paimon/io/KeyValueFileWriterFactory.java       |  4 +--
 .../org/apache/paimon/io/RowDataFileWriter.java    |  4 +--
 .../apache/paimon/io/RowDataRollingFileWriter.java |  8 +++--
 .../paimon/io/StatsCollectingSingleFileWriter.java |  9 ++----
 .../org/apache/paimon/manifest/ManifestFile.java   | 10 +++---
 .../paimon/operation/AppendOnlyFileStoreWrite.java |  7 +++--
 ...ctorUtils.java => StatsCollectorFactories.java} | 18 ++++++++---
 .../apache/paimon/append/AppendOnlyWriterTest.java |  4 +--
 .../apache/paimon/format/FileFormatSuffixTest.java |  4 +--
 .../format/FileStatsExtractingAvroFormat.java      |  2 +-
 .../paimon/io/DataFileTestDataGenerator.java       | 14 ++++++---
 .../paimon/io/KeyValueFileReadWriteTest.java       | 36 ++++++++++------------
 .../apache/paimon/io/RollingFileWriterTest.java    | 16 +++-------
 .../paimon/manifest/ManifestFileMetaTestBase.java  |  4 +--
 .../apache/paimon/manifest/ManifestFileTest.java   |  4 +--
 .../paimon/manifest/ManifestTestDataGenerator.java |  7 +++--
 .../paimon/stats/TableStatsCollectorTest.java      |  8 +++--
 .../paimon/stats/TestTableStatsExtractor.java      |  4 +--
 ...sTest.java => StatsCollectorFactoriesTest.java} |  9 +++---
 .../apache/paimon/format/orc/OrcFileFormat.java    |  2 +-
 .../format/orc/filter/OrcTableStatsExtractor.java  | 15 +++++----
 .../paimon/format/parquet/ParquetFileFormat.java   |  2 +-
 .../format/parquet/ParquetTableStatsExtractor.java | 21 ++++++-------
 31 files changed, 143 insertions(+), 120 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
index 9527cf140..2197c513f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
+++ b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
@@ -77,7 +77,7 @@ public abstract class FileFormat {
     }
 
     public Optional<TableStatsExtractor> createStatsExtractor(
-            RowType type, FieldStatsCollector[] statsCollectors) {
+            RowType type, FieldStatsCollector.Factory[] statsCollectors) {
         return Optional.empty();
     }
 
diff --git a/paimon-common/src/main/java/org/apache/paimon/format/TableStatsCollector.java b/paimon-common/src/main/java/org/apache/paimon/format/TableStatsCollector.java
index 4e84a0309..c8f7322a1 100644
--- a/paimon-common/src/main/java/org/apache/paimon/format/TableStatsCollector.java
+++ b/paimon-common/src/main/java/org/apache/paimon/format/TableStatsCollector.java
@@ -34,15 +34,15 @@ public class TableStatsCollector {
     private final FieldStatsCollector[] statsCollectors;
     private final Serializer<Object>[] fieldSerializers;
 
-    public TableStatsCollector(RowType rowType, FieldStatsCollector[] statsCollectors) {
+    public TableStatsCollector(RowType rowType, FieldStatsCollector.Factory[] collectorFactory) {
         int numFields = rowType.getFieldCount();
         checkArgument(
-                numFields == statsCollectors.length,
+                numFields == collectorFactory.length,
                 "numFields %s should equal to stats length %s.",
                 numFields,
-                statsCollectors.length);
+                collectorFactory.length);
+        this.statsCollectors = FieldStatsCollector.create(collectorFactory);
         this.converter = new RowDataToObjectArrayConverter(rowType);
-        this.statsCollectors = statsCollectors;
         this.fieldSerializers = new Serializer[numFields];
         for (int i = 0; i < numFields; i++) {
             fieldSerializers[i] = InternalSerializers.create(rowType.getTypeAt(i));
diff --git a/paimon-common/src/main/java/org/apache/paimon/statistics/FieldStatsCollector.java b/paimon-common/src/main/java/org/apache/paimon/statistics/FieldStatsCollector.java
index d77a2d266..99ce52285 100644
--- a/paimon-common/src/main/java/org/apache/paimon/statistics/FieldStatsCollector.java
+++ b/paimon-common/src/main/java/org/apache/paimon/statistics/FieldStatsCollector.java
@@ -49,20 +49,33 @@ public interface FieldStatsCollector {
      */
     FieldStats convert(FieldStats source);
 
-    static FieldStatsCollector from(String option) {
+    /** Factory to create {@link FieldStatsCollector}. */
+    interface Factory {
+        FieldStatsCollector create();
+    }
+
+    static FieldStatsCollector[] create(FieldStatsCollector.Factory[] factories) {
+        FieldStatsCollector[] collectors = new FieldStatsCollector[factories.length];
+        for (int i = 0; i < factories.length; i++) {
+            collectors[i] = factories[i].create();
+        }
+        return collectors;
+    }
+
+    static Factory from(String option) {
         String upper = option.toUpperCase();
         switch (upper) {
             case "NONE":
-                return new NoneFieldStatsCollector();
+                return NoneFieldStatsCollector::new;
             case "FULL":
-                return new FullFieldStatsCollector();
+                return FullFieldStatsCollector::new;
             case "COUNTS":
-                return new CountsFieldStatsCollector();
+                return CountsFieldStatsCollector::new;
             default:
                 Matcher matcher = TRUNCATE_PATTERN.matcher(upper);
                 if (matcher.matches()) {
                     String length = matcher.group(1);
-                    return new TruncateFieldStatsCollector(Integer.parseInt(length));
+                    return () -> new TruncateFieldStatsCollector(Integer.parseInt(length));
                 }
                 throw new IllegalArgumentException("Unexpected option: " + option);
         }
diff --git a/paimon-common/src/test/java/org/apache/paimon/format/TableFieldStatsExtractorTestBaseCollector.java b/paimon-common/src/test/java/org/apache/paimon/format/TableFieldStatsExtractorTestBaseCollector.java
index b3eda7c70..55c55625d 100644
--- a/paimon-common/src/test/java/org/apache/paimon/format/TableFieldStatsExtractorTestBaseCollector.java
+++ b/paimon-common/src/test/java/org/apache/paimon/format/TableFieldStatsExtractorTestBaseCollector.java
@@ -75,10 +75,10 @@ public abstract class TableFieldStatsExtractorTestBaseCollector {
         FileFormat format = createFormat();
         RowType rowType = rowType();
         int count = rowType().getFieldCount();
-        FieldStatsCollector[] stats =
+        FieldStatsCollector.Factory[] stats =
                 IntStream.range(0, count)
                         .mapToObj(p -> FieldStatsCollector.from(mode))
-                        .toArray(FieldStatsCollector[]::new);
+                        .toArray(FieldStatsCollector.Factory[]::new);
 
         FormatWriterFactory writerFactory = format.createWriterFactory(rowType);
         Path path = new Path(tempDir.toString() + "/test");
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index c6f924f0f..bce0d9b1e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -35,10 +35,10 @@ import org.apache.paimon.operation.TagFileKeeper;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.FieldStatsCollectorUtils;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.SegmentsCache;
 import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.StatsCollectorFactories;
 import org.apache.paimon.utils.TagManager;
 
 import javax.annotation.Nullable;
@@ -106,7 +106,7 @@ public abstract class AbstractFileStore<T> implements FileStore<T> {
                 pathFactory(),
                 options.manifestTargetSize().getBytes(),
                 forWrite ? writeManifestCache : null,
-                FieldStatsCollectorUtils.getFieldsStatsMode(
+                StatsCollectorFactories.createStatsFactories(
                         options, partitionType.getFieldNames()));
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index eb127d872..010d6e169 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -698,7 +698,7 @@ public class CoreOptions implements Serializable {
 
     public static final String STATS_MODE_SUFFIX = "stats-mode";
 
-    public static final ConfigOption<String> STATS_MODE =
+    public static final ConfigOption<String> METADATA_STATS_MODE =
             key("metadata." + STATS_MODE_SUFFIX)
                     .stringType()
                     .defaultValue("truncate(16)")
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 9cdec0f0b..067520949 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -63,7 +63,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow> {
     private final List<DataFileMeta> compactAfter;
     private final LongCounter seqNumCounter;
     private final String fileCompression;
-    private final FieldStatsCollector[] statsCollectors;
+    private final FieldStatsCollector.Factory[] statsCollectors;
 
     private RowDataRollingFileWriter writer;
 
@@ -79,7 +79,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow> {
             DataFilePathFactory pathFactory,
             @Nullable CommitIncrement increment,
             String fileCompression,
-            FieldStatsCollector[] statsCollectors) {
+            FieldStatsCollector.Factory[] statsCollectors) {
         this.fileIO = fileIO;
         this.schemaId = schemaId;
         this.fileFormat = fileFormat;
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
index 27cf60c4f..86ffc28fe 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
@@ -31,7 +31,7 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.stats.BinaryTableStats;
 import org.apache.paimon.stats.FieldStatsArraySerializer;
 import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.FieldStatsCollectorUtils;
+import org.apache.paimon.utils.StatsCollectorFactories;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -88,7 +88,7 @@ public class KeyValueDataFileWriter
                 KeyValue.schema(keyType, valueType),
                 tableStatsExtractor,
                 compression,
-                FieldStatsCollectorUtils.getFieldsStatsMode(
+                StatsCollectorFactories.createStatsFactories(
                         options, KeyValue.schema(keyType, valueType).getFieldNames()));
 
         this.keyType = keyType;
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
index 0c8f78cd5..f3912daf5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
@@ -29,8 +29,8 @@ import org.apache.paimon.format.TableStatsExtractor;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.FieldStatsCollectorUtils;
 import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.StatsCollectorFactories;
 
 import javax.annotation.Nullable;
 
@@ -187,7 +187,7 @@ public class KeyValueFileWriterFactory {
                     fileFormat
                             .createStatsExtractor(
                                     recordType,
-                                    FieldStatsCollectorUtils.getFieldsStatsMode(
+                                    StatsCollectorFactories.createStatsFactories(
                                             options, recordType.getFieldNames()))
                             .orElse(null),
                     pathFactory.createDataFilePathFactory(partition, bucket),
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
index 8cbbe8ed8..b3dd2d343 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
@@ -54,7 +54,7 @@ public class RowDataFileWriter extends StatsCollectingSingleFileWriter<InternalR
             long schemaId,
             LongCounter seqNumCounter,
             String fileCompression,
-            FieldStatsCollector[] stats) {
+            FieldStatsCollector.Factory[] statsCollectors) {
         super(
                 fileIO,
                 factory,
@@ -63,7 +63,7 @@ public class RowDataFileWriter extends StatsCollectingSingleFileWriter<InternalR
                 writeSchema,
                 tableStatsExtractor,
                 fileCompression,
-                stats);
+                statsCollectors);
         this.schemaId = schemaId;
         this.seqNumCounter = seqNumCounter;
         this.statsArraySerializer = new FieldStatsArraySerializer(writeSchema);
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
index 8b99b6389..e15f62248 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
@@ -38,7 +38,7 @@ public class RowDataRollingFileWriter extends RollingFileWriter<InternalRow, Dat
             DataFilePathFactory pathFactory,
             LongCounter seqNumCounter,
             String fileCompression,
-            FieldStatsCollector[] stats) {
+            FieldStatsCollector.Factory[] statsCollectors) {
         super(
                 () ->
                         new RowDataFileWriter(
@@ -46,11 +46,13 @@ public class RowDataRollingFileWriter extends RollingFileWriter<InternalRow, Dat
                                 fileFormat.createWriterFactory(writeSchema),
                                 pathFactory.newPath(),
                                 writeSchema,
-                                fileFormat.createStatsExtractor(writeSchema, stats).orElse(null),
+                                fileFormat
+                                        .createStatsExtractor(writeSchema, statsCollectors)
+                                        .orElse(null),
                                 schemaId,
                                 seqNumCounter,
                                 fileCompression,
-                                stats),
+                                statsCollectors),
                 targetFileSize);
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
index ce5d57289..559b0f181 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
@@ -27,14 +27,12 @@ import org.apache.paimon.format.TableStatsExtractor;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.statistics.FieldStatsCollector;
-import org.apache.paimon.statistics.NoneFieldStatsCollector;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Preconditions;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.function.Function;
 
 /**
@@ -47,7 +45,6 @@ public abstract class StatsCollectingSingleFileWriter<T, R> extends SingleFileWr
 
     @Nullable private final TableStatsExtractor tableStatsExtractor;
     @Nullable private TableStatsCollector tableStatsCollector = null;
-    private final boolean isStatsCollectorDisabled;
 
     public StatsCollectingSingleFileWriter(
             FileIO fileIO,
@@ -57,11 +54,9 @@ public abstract class StatsCollectingSingleFileWriter<T, R> extends SingleFileWr
             RowType writeSchema,
             @Nullable TableStatsExtractor tableStatsExtractor,
             String compression,
-            FieldStatsCollector[] statsCollectors) {
+            FieldStatsCollector.Factory[] statsCollectors) {
         super(fileIO, factory, path, converter, compression);
         this.tableStatsExtractor = tableStatsExtractor;
-        this.isStatsCollectorDisabled =
-                Arrays.stream(statsCollectors).allMatch(p -> p instanceof NoneFieldStatsCollector);
         if (this.tableStatsExtractor == null) {
             this.tableStatsCollector = new TableStatsCollector(writeSchema, statsCollectors);
         }
@@ -73,7 +68,7 @@ public abstract class StatsCollectingSingleFileWriter<T, R> extends SingleFileWr
     @Override
     public void write(T record) throws IOException {
         InternalRow rowData = writeImpl(record);
-        if (tableStatsCollector != null && !isStatsCollectorDisabled) {
+        if (tableStatsCollector != null) {
             tableStatsCollector.collect(rowData);
         }
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
index 562077920..e41184d1e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
@@ -53,7 +53,7 @@ public class ManifestFile extends ObjectsFile<ManifestEntry> {
     private final RowType partitionType;
     private final FormatWriterFactory writerFactory;
     private final long suggestedFileSize;
-    private final FieldStatsCollector[] partitionStats;
+    private final FieldStatsCollector.Factory[] partitionStats;
 
     private ManifestFile(
             FileIO fileIO,
@@ -65,7 +65,7 @@ public class ManifestFile extends ObjectsFile<ManifestEntry> {
             PathFactory pathFactory,
             long suggestedFileSize,
             @Nullable SegmentsCache<String> cache,
-            FieldStatsCollector[] partitionStats) {
+            FieldStatsCollector.Factory[] partitionStats) {
         super(fileIO, serializer, readerFactory, writerFactory, pathFactory, cache);
         this.schemaManager = schemaManager;
         this.partitionType = partitionType;
@@ -116,7 +116,7 @@ public class ManifestFile extends ObjectsFile<ManifestEntry> {
                 FormatWriterFactory factory,
                 Path path,
                 String fileCompression,
-                FieldStatsCollector[] partitionStats) {
+                FieldStatsCollector.Factory[] partitionStats) {
             super(ManifestFile.this.fileIO, factory, path, serializer::toRow, fileCompression);
 
             this.partitionStatsCollector = new TableStatsCollector(partitionType, partitionStats);
@@ -166,7 +166,7 @@ public class ManifestFile extends ObjectsFile<ManifestEntry> {
         private final FileStorePathFactory pathFactory;
         private final long suggestedFileSize;
         @Nullable private final SegmentsCache<String> cache;
-        private final FieldStatsCollector[] partitionStats;
+        private final FieldStatsCollector.Factory[] partitionStats;
 
         public Factory(
                 FileIO fileIO,
@@ -176,7 +176,7 @@ public class ManifestFile extends ObjectsFile<ManifestEntry> {
                 FileStorePathFactory pathFactory,
                 long suggestedFileSize,
                 @Nullable SegmentsCache<String> cache,
-                FieldStatsCollector[] partitionStats) {
+                FieldStatsCollector.Factory[] partitionStats) {
             this.fileIO = fileIO;
             this.schemaManager = schemaManager;
             this.partitionType = partitionType;
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index eb24e4b5e..8019df5c5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -37,11 +37,11 @@ import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.CommitIncrement;
-import org.apache.paimon.utils.FieldStatsCollectorUtils;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.LongCounter;
 import org.apache.paimon.utils.RecordWriter;
 import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.StatsCollectorFactories;
 
 import javax.annotation.Nullable;
 
@@ -67,9 +67,10 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<InternalRow
     private final boolean commitForceCompact;
     private final boolean assertDisorder;
     private final String fileCompression;
+    private final FieldStatsCollector.Factory[] statsCollectors;
+
     private boolean skipCompaction;
     private BucketMode bucketMode = BucketMode.FIXED;
-    private FieldStatsCollector[] statsCollectors;
 
     public AppendOnlyFileStoreWrite(
             FileIO fileIO,
@@ -96,7 +97,7 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<InternalRow
         this.assertDisorder = options.toConfiguration().get(APPEND_ONLY_ASSERT_DISORDER);
         this.fileCompression = options.fileCompression();
         this.statsCollectors =
-                FieldStatsCollectorUtils.getFieldsStatsMode(options, rowType.getFieldNames());
+                StatsCollectorFactories.createStatsFactories(options, rowType.getFieldNames());
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FieldStatsCollectorUtils.java b/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java
similarity index 73%
rename from paimon-core/src/main/java/org/apache/paimon/utils/FieldStatsCollectorUtils.java
rename to paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java
index 5828462d9..08b3a780c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/FieldStatsCollectorUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/StatsCollectorFactories.java
@@ -23,20 +23,22 @@ package org.apache.paimon.utils;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.statistics.FieldStatsCollector;
+import org.apache.paimon.statistics.FullFieldStatsCollector;
 
+import java.util.Arrays;
 import java.util.List;
 
 import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
 import static org.apache.paimon.CoreOptions.STATS_MODE_SUFFIX;
 import static org.apache.paimon.options.ConfigOptions.key;
 
-/** The stats utils. */
-public class FieldStatsCollectorUtils {
+/** The stats utils to create {@link FieldStatsCollector.Factory}s. */
+public class StatsCollectorFactories {
 
-    public static FieldStatsCollector[] getFieldsStatsMode(
+    public static FieldStatsCollector.Factory[] createStatsFactories(
             CoreOptions options, List<String> fields) {
         Options cfg = options.toConfiguration();
-        FieldStatsCollector[] modes = new FieldStatsCollector[fields.size()];
+        FieldStatsCollector.Factory[] modes = new FieldStatsCollector.Factory[fields.size()];
         for (int i = 0; i < fields.size(); i++) {
             String fieldMode =
                     cfg.get(
@@ -48,9 +50,15 @@ public class FieldStatsCollectorUtils {
             if (fieldMode != null) {
                 modes[i] = FieldStatsCollector.from(fieldMode);
             } else {
-                modes[i] = FieldStatsCollector.from(cfg.get(CoreOptions.STATS_MODE));
+                modes[i] = FieldStatsCollector.from(cfg.get(CoreOptions.METADATA_STATS_MODE));
             }
         }
         return modes;
     }
+
+    public static FieldStatsCollector.Factory[] createFullStatsFactories(int numFields) {
+        FieldStatsCollector.Factory[] factories = new FieldStatsCollector.Factory[numFields];
+        Arrays.fill(factories, (FieldStatsCollector.Factory) FullFieldStatsCollector::new);
+        return factories;
+    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 2720252d0..171b725fa 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -38,9 +38,9 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.VarCharType;
 import org.apache.paimon.utils.CommitIncrement;
 import org.apache.paimon.utils.ExecutorThreadFactory;
-import org.apache.paimon.utils.FieldStatsCollectorUtils;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.RecordWriter;
+import org.apache.paimon.utils.StatsCollectorFactories;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -320,7 +320,7 @@ public class AppendOnlyWriterTest {
                         pathFactory,
                         null,
                         CoreOptions.FILE_COMPRESSION.defaultValue(),
-                        FieldStatsCollectorUtils.getFieldsStatsMode(
+                        StatsCollectorFactories.createStatsFactories(
                                 new CoreOptions(new HashMap<>()),
                                 AppendOnlyWriterTest.SCHEMA.getFieldNames()));
         return Pair.of(writer, compactManager.allFiles());
diff --git a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index 604998bc8..09b36d2e6 100644
--- a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -35,7 +35,7 @@ import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.VarCharType;
 import org.apache.paimon.utils.CommitIncrement;
-import org.apache.paimon.utils.FieldStatsCollectorUtils;
+import org.apache.paimon.utils.StatsCollectorFactories;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -77,7 +77,7 @@ public class FileFormatSuffixTest extends KeyValueFileReadWriteTest {
                         dataFilePathFactory,
                         null,
                         CoreOptions.FILE_COMPRESSION.defaultValue(),
-                        FieldStatsCollectorUtils.getFieldsStatsMode(
+                        StatsCollectorFactories.createStatsFactories(
                                 new CoreOptions(new HashMap<>()), SCHEMA.getFieldNames()));
         appendOnlyWriter.write(
                 GenericRow.of(1, BinaryString.fromString("aaa"), BinaryString.fromString("1")));
diff --git a/paimon-core/src/test/java/org/apache/paimon/format/FileStatsExtractingAvroFormat.java b/paimon-core/src/test/java/org/apache/paimon/format/FileStatsExtractingAvroFormat.java
index 218cd141e..0cdcaebe8 100644
--- a/paimon-core/src/test/java/org/apache/paimon/format/FileStatsExtractingAvroFormat.java
+++ b/paimon-core/src/test/java/org/apache/paimon/format/FileStatsExtractingAvroFormat.java
@@ -57,7 +57,7 @@ public class FileStatsExtractingAvroFormat extends FileFormat {
 
     @Override
     public Optional<TableStatsExtractor> createStatsExtractor(
-            RowType type, FieldStatsCollector[] stats) {
+            RowType type, FieldStatsCollector.Factory[] stats) {
         return Optional.of(new TestTableStatsExtractor(this, type, stats));
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
index c767f1d7b..6301fbc3d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java
@@ -106,14 +106,20 @@ public class DataFileTestDataGenerator {
                 new TableStatsCollector(
                         TestKeyValueGenerator.KEY_TYPE,
                         IntStream.range(0, TestKeyValueGenerator.KEY_TYPE.getFieldCount())
-                                .mapToObj(i -> new FullFieldStatsCollector())
-                                .toArray(FieldStatsCollector[]::new));
+                                .mapToObj(
+                                        i ->
+                                                (FieldStatsCollector.Factory)
+                                                        FullFieldStatsCollector::new)
+                                .toArray(FieldStatsCollector.Factory[]::new));
         TableStatsCollector valueStatsCollector =
                 new TableStatsCollector(
                         TestKeyValueGenerator.DEFAULT_ROW_TYPE,
                         IntStream.range(0, TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFieldCount())
-                                .mapToObj(i -> new FullFieldStatsCollector())
-                                .toArray(FieldStatsCollector[]::new));
+                                .mapToObj(
+                                        i ->
+                                                (FieldStatsCollector.Factory)
+                                                        FullFieldStatsCollector::new)
+                                .toArray(FieldStatsCollector.Factory[]::new));
         FieldStatsArraySerializer keyStatsSerializer =
                 new FieldStatsArraySerializer(TestKeyValueGenerator.KEY_TYPE);
         FieldStatsArraySerializer valueStatsSerializer =
diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index 5ec574605..d612a1d1b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -55,6 +55,8 @@ import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Function;
 
+import static org.apache.paimon.TestKeyValueGenerator.DEFAULT_ROW_TYPE;
+import static org.apache.paimon.TestKeyValueGenerator.KEY_TYPE;
 import static org.apache.paimon.TestKeyValueGenerator.createTestSchemaManager;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -97,12 +99,7 @@ public class KeyValueFileReadWriteTest {
         writer.close();
         List<DataFileMeta> actualMetas = writer.result();
 
-        checkRollingFiles(
-                TestKeyValueGenerator.KEY_TYPE,
-                TestKeyValueGenerator.DEFAULT_ROW_TYPE,
-                data.meta,
-                actualMetas,
-                writer.targetFileSize());
+        checkRollingFiles(data.meta, actualMetas, writer.targetFileSize());
 
         KeyValueFileReaderFactory readerFactory =
                 createReaderFactory(tempDir.toString(), format, null, null);
@@ -245,18 +242,20 @@ public class KeyValueFileReadWriteTest {
                         format);
         int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 
1024;
         FileIO fileIO = FileIOFinder.find(path);
+        Options options = new Options();
+        options.set(CoreOptions.METADATA_STATS_MODE, "FULL");
         return KeyValueFileWriterFactory.builder(
                         fileIO,
                         0,
-                        TestKeyValueGenerator.KEY_TYPE,
-                        TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+                        KEY_TYPE,
+                        DEFAULT_ROW_TYPE,
                         // normal format will buffer changes in memory and we 
can't determine
                         // if the written file size is really larger than 
suggested, so we use a
                         // special format which flushes for every added element
                         new FlushingFileFormat(format),
                         pathFactory,
                         suggestedFileSize)
-                .build(BinaryRow.EMPTY_ROW, 0, null, null, new CoreOptions(new 
Options()));
+                .build(BinaryRow.EMPTY_ROW, 0, null, null, new 
CoreOptions(options));
     }
 
     private KeyValueFileReaderFactory createReaderFactory(
@@ -269,8 +268,8 @@ public class KeyValueFileReadWriteTest {
                         fileIO,
                         createTestSchemaManager(path),
                         0,
-                        TestKeyValueGenerator.KEY_TYPE,
-                        TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+                        KEY_TYPE,
+                        DEFAULT_ROW_TYPE,
                         ignore -> new FlushingFileFormat(format),
                         pathFactory,
                         new 
TestKeyValueGenerator.TestKeyValueFieldsExtractor());
@@ -321,13 +320,10 @@ public class KeyValueFileReadWriteTest {
     }
 
     private void checkRollingFiles(
-            RowType keyType,
-            RowType valueType,
-            DataFileMeta expected,
-            List<DataFileMeta> actual,
-            long suggestedFileSize) {
-        FieldStatsArraySerializer keyStatsConverter = new 
FieldStatsArraySerializer(keyType);
-        FieldStatsArraySerializer valueStatsConverter = new 
FieldStatsArraySerializer(valueType);
+            DataFileMeta expected, List<DataFileMeta> actual, long 
suggestedFileSize) {
+        FieldStatsArraySerializer keyStatsConverter = new 
FieldStatsArraySerializer(KEY_TYPE);
+        FieldStatsArraySerializer valueStatsConverter =
+                new FieldStatsArraySerializer(DEFAULT_ROW_TYPE);
 
         // all but last file should be no smaller than suggestedFileSize
         for (int i = 0; i + 1 < actual.size(); i++) {
@@ -345,14 +341,14 @@ public class KeyValueFileReadWriteTest {
         assertThat(actual.get(actual.size() - 
1).maxKey()).isEqualTo(expected.maxKey());
 
         // check stats
-        for (int i = 0; i < keyType.getFieldCount(); i++) {
+        for (int i = 0; i < KEY_TYPE.getFieldCount(); i++) {
             int idx = i;
             StatsTestUtils.checkRollingFileStats(
                     keyStatsConverter.fromBinary(expected.keyStats())[i],
                     actual,
                     m -> keyStatsConverter.fromBinary(m.keyStats())[idx]);
         }
-        for (int i = 0; i < valueType.getFieldCount(); i++) {
+        for (int i = 0; i < DEFAULT_ROW_TYPE.getFieldCount(); i++) {
             int idx = i;
             StatsTestUtils.checkRollingFileStats(
                     valueStatsConverter.fromBinary(expected.valueStats())[i],
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java 
b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
index e012c7ab1..78d1cbf99 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
@@ -25,13 +25,11 @@ import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.statistics.FieldStatsCollector;
-import org.apache.paimon.statistics.FullFieldStatsCollector;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.FieldStatsCollectorUtils;
 import org.apache.paimon.utils.LongCounter;
+import org.apache.paimon.utils.StatsCollectorFactories;
 
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.junit.jupiter.api.io.TempDir;
@@ -41,7 +39,6 @@ import org.junit.jupiter.params.provider.EnumSource;
 import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
-import java.util.stream.IntStream;
 
 import static org.apache.paimon.CoreOptions.FileFormatType;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -82,17 +79,14 @@ public class RollingFileWriterTest {
                                         fileFormat
                                                 .createStatsExtractor(
                                                         SCHEMA,
-                                                        IntStream.range(0, 
SCHEMA.getFieldCount())
-                                                                .mapToObj(
-                                                                        i ->
-                                                                               
 new FullFieldStatsCollector())
-                                                                .toArray(
-                                                                        
FieldStatsCollector[]::new))
+                                                        StatsCollectorFactories
+                                                                
.createFullStatsFactories(
+                                                                        
SCHEMA.getFieldCount()))
                                                 .orElse(null),
                                         0L,
                                         new LongCounter(0),
                                         
CoreOptions.FILE_COMPRESSION.defaultValue(),
-                                        
FieldStatsCollectorUtils.getFieldsStatsMode(
+                                        
StatsCollectorFactories.createStatsFactories(
                                                 new CoreOptions(new 
HashMap<>()),
                                                 SCHEMA.getFieldNames())),
                         TARGET_FILE_SIZE);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
index b953f88cd..5ff3f8734 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
@@ -31,8 +31,8 @@ import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.stats.StatsTestUtils;
 import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.FieldStatsCollectorUtils;
 import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.StatsCollectorFactories;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -137,7 +137,7 @@ public abstract class ManifestFileMetaTestBase {
                                 
CoreOptions.FILE_FORMAT.defaultValue().toString()),
                         Long.MAX_VALUE,
                         null,
-                        FieldStatsCollectorUtils.getFieldsStatsMode(
+                        StatsCollectorFactories.createStatsFactories(
                                 new CoreOptions(new HashMap<>()),
                                 getPartitionType().getFieldNames()))
                 .create();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
index 5989234b8..105914ebc 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
@@ -28,8 +28,8 @@ import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.stats.StatsTestUtils;
 import org.apache.paimon.utils.FailingFileIO;
-import org.apache.paimon.utils.FieldStatsCollectorUtils;
 import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.StatsCollectorFactories;
 
 import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.io.TempDir;
@@ -111,7 +111,7 @@ public class ManifestFileTest {
                         pathFactory,
                         suggestedFileSize,
                         null,
-                        FieldStatsCollectorUtils.getFieldsStatsMode(
+                        StatsCollectorFactories.createStatsFactories(
                                 new CoreOptions(new HashMap<>()),
                                 DEFAULT_PART_TYPE.getFieldNames()))
                 .create();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java
 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java
index 81b8ad1f7..e2ca01d92 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java
@@ -89,8 +89,11 @@ public class ManifestTestDataGenerator {
                 new TableStatsCollector(
                         TestKeyValueGenerator.DEFAULT_PART_TYPE,
                         IntStream.range(0, 
TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldCount())
-                                .mapToObj(i -> new FullFieldStatsCollector())
-                                .toArray(FieldStatsCollector[]::new));
+                                .mapToObj(
+                                        i ->
+                                                (FieldStatsCollector.Factory)
+                                                        
FullFieldStatsCollector::new)
+                                .toArray(FieldStatsCollector.Factory[]::new));
         FieldStatsArraySerializer serializer =
                 new 
FieldStatsArraySerializer(TestKeyValueGenerator.DEFAULT_PART_TYPE);
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/stats/TableStatsCollectorTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/stats/TableStatsCollectorTest.java
index 3701ce166..ca4d6660c 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/stats/TableStatsCollectorTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/stats/TableStatsCollectorTest.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.GenericArray;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.format.FieldStats;
 import org.apache.paimon.format.TableStatsCollector;
+import org.apache.paimon.statistics.FieldStatsCollector;
 import org.apache.paimon.statistics.FullFieldStatsCollector;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.IntType;
@@ -46,8 +47,11 @@ public class TableStatsCollectorTest {
                 new TableStatsCollector(
                         rowType,
                         IntStream.range(0, rowType.getFieldCount())
-                                .mapToObj(i -> new FullFieldStatsCollector())
-                                .toArray(FullFieldStatsCollector[]::new));
+                                .mapToObj(
+                                        i ->
+                                                (FieldStatsCollector.Factory)
+                                                        
FullFieldStatsCollector::new)
+                                .toArray(FieldStatsCollector.Factory[]::new));
 
         collector.collect(
                 GenericRow.of(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/stats/TestTableStatsExtractor.java
 
b/paimon-core/src/test/java/org/apache/paimon/stats/TestTableStatsExtractor.java
index 9314ac500..530fc9b5b 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/stats/TestTableStatsExtractor.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/stats/TestTableStatsExtractor.java
@@ -43,10 +43,10 @@ public class TestTableStatsExtractor implements 
TableStatsExtractor {
 
     private final FileFormat format;
     private final RowType rowType;
-    private final FieldStatsCollector[] stats;
+    private final FieldStatsCollector.Factory[] stats;
 
     public TestTableStatsExtractor(
-            FileFormat format, RowType rowType, FieldStatsCollector[] stats) {
+            FileFormat format, RowType rowType, FieldStatsCollector.Factory[] 
stats) {
         this.format = format;
         this.rowType = rowType;
         this.stats = stats;
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/utils/FieldStatsCollectorUtilsTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/utils/StatsCollectorFactoriesTest.java
similarity index 89%
rename from 
paimon-core/src/test/java/org/apache/paimon/utils/FieldStatsCollectorUtilsTest.java
rename to 
paimon-core/src/test/java/org/apache/paimon/utils/StatsCollectorFactoriesTest.java
index e69b38bae..378a8faa5 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/utils/FieldStatsCollectorUtilsTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/utils/StatsCollectorFactoriesTest.java
@@ -36,8 +36,8 @@ import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 
-/** Test for {@link FieldStatsCollectorUtils}. */
-public class FieldStatsCollectorUtilsTest {
+/** Test for {@link StatsCollectorFactories}. */
+public class StatsCollectorFactoriesTest {
     @Test
     public void testFieldStats() {
         RowType type =
@@ -52,9 +52,10 @@ public class FieldStatsCollectorUtilsTest {
                 CoreOptions.FIELDS_PREFIX + ".b." + 
CoreOptions.STATS_MODE_SUFFIX, "truncate(12)");
         options.set(CoreOptions.FIELDS_PREFIX + ".c." + 
CoreOptions.STATS_MODE_SUFFIX, "full");
 
-        FieldStatsCollector[] stats =
-                FieldStatsCollectorUtils.getFieldsStatsMode(
+        FieldStatsCollector.Factory[] statsFactories =
+                StatsCollectorFactories.createStatsFactories(
                         new CoreOptions(options), type.getFieldNames());
+        FieldStatsCollector[] stats = 
FieldStatsCollector.create(statsFactories);
         Assertions.assertEquals(3, stats.length);
         Assertions.assertEquals(16, ((TruncateFieldStatsCollector) 
stats[0]).getLength());
         Assertions.assertEquals(12, ((TruncateFieldStatsCollector) 
stats[1]).getLength());
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
index 8b3dc59a1..24edf5a95 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
@@ -88,7 +88,7 @@ public class OrcFileFormat extends FileFormat {
 
     @Override
     public Optional<TableStatsExtractor> createStatsExtractor(
-            RowType type, FieldStatsCollector[] statsCollectors) {
+            RowType type, FieldStatsCollector.Factory[] statsCollectors) {
         return Optional.of(new OrcTableStatsExtractor(type, statsCollectors));
     }
 
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcTableStatsExtractor.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcTableStatsExtractor.java
index d394a11de..588e04f92 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcTableStatsExtractor.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/filter/OrcTableStatsExtractor.java
@@ -54,9 +54,9 @@ import java.util.stream.IntStream;
 public class OrcTableStatsExtractor implements TableStatsExtractor {
 
     private final RowType rowType;
-    private final FieldStatsCollector[] statsCollectors;
+    private final FieldStatsCollector.Factory[] statsCollectors;
 
-    public OrcTableStatsExtractor(RowType rowType, FieldStatsCollector[] 
statsCollectors) {
+    public OrcTableStatsExtractor(RowType rowType, 
FieldStatsCollector.Factory[] statsCollectors) {
         this.rowType = rowType;
         this.statsCollectors = statsCollectors;
         Preconditions.checkArgument(
@@ -74,24 +74,27 @@ public class OrcTableStatsExtractor implements 
TableStatsExtractor {
             List<String> columnNames = schema.getFieldNames();
             List<TypeDescription> columnTypes = schema.getChildren();
 
+            FieldStatsCollector[] collectors = 
FieldStatsCollector.create(statsCollectors);
+
             return IntStream.range(0, rowType.getFieldCount())
                     .mapToObj(
                             i -> {
                                 DataField field = rowType.getFields().get(i);
                                 int fieldIdx = 
columnNames.indexOf(field.name());
                                 int colId = columnTypes.get(fieldIdx).getId();
-                                return toFieldStats(field, 
columnStatistics[colId], rowCount, i);
+                                return toFieldStats(
+                                        field, columnStatistics[colId], 
rowCount, collectors[i]);
                             })
                     .toArray(FieldStats[]::new);
         }
     }
 
     private FieldStats toFieldStats(
-            DataField field, ColumnStatistics stats, long rowCount, int idx) {
+            DataField field, ColumnStatistics stats, long rowCount, 
FieldStatsCollector collector) {
         long nullCount = rowCount - stats.getNumberOfValues();
         if (nullCount == rowCount) {
             // all nulls
-            return this.statsCollectors[idx].convert(new FieldStats(null, 
null, nullCount));
+            return collector.convert(new FieldStats(null, null, nullCount));
         }
         Preconditions.checkState(
                 (nullCount > 0) == stats.hasNull(),
@@ -214,7 +217,7 @@ public class OrcTableStatsExtractor implements 
TableStatsExtractor {
             default:
                 fieldStats = new FieldStats(null, null, nullCount);
         }
-        return this.statsCollectors[idx].convert(fieldStats);
+        return collector.convert(fieldStats);
     }
 
     private void assertStatsClass(
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
index 0eb7a9be3..e39b9fe99 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
@@ -74,7 +74,7 @@ public class ParquetFileFormat extends FileFormat {
 
     @Override
     public Optional<TableStatsExtractor> createStatsExtractor(
-            RowType type, FieldStatsCollector[] statsCollectors) {
+            RowType type, FieldStatsCollector.Factory[] statsCollectors) {
         return Optional.of(new ParquetTableStatsExtractor(type, 
statsCollectors));
     }
 
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractor.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractor.java
index 6bb50d851..e8836b8fe 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractor.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetTableStatsExtractor.java
@@ -45,9 +45,6 @@ import org.apache.parquet.schema.PrimitiveType;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
-import java.time.Instant;
-import java.time.OffsetDateTime;
-import java.time.ZoneOffset;
 import java.util.Map;
 import java.util.stream.IntStream;
 
@@ -56,12 +53,11 @@ import static 
org.apache.paimon.format.parquet.ParquetUtil.assertStatsClass;
 /** {@link TableStatsExtractor} for parquet files. */
 public class ParquetTableStatsExtractor implements TableStatsExtractor {
 
-    private static final OffsetDateTime EPOCH = 
Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
-
     private final RowType rowType;
-    private final FieldStatsCollector[] statsCollectors;
+    private final FieldStatsCollector.Factory[] statsCollectors;
 
-    public ParquetTableStatsExtractor(RowType rowType, FieldStatsCollector[] 
statsCollectors) {
+    public ParquetTableStatsExtractor(
+            RowType rowType, FieldStatsCollector.Factory[] statsCollectors) {
         this.rowType = rowType;
         this.statsCollectors = statsCollectors;
         Preconditions.checkArgument(
@@ -72,23 +68,24 @@ public class ParquetTableStatsExtractor implements 
TableStatsExtractor {
     @Override
     public FieldStats[] extract(FileIO fileIO, Path path) throws IOException {
         Map<String, Statistics<?>> stats = 
ParquetUtil.extractColumnStats(fileIO, path);
-
+        FieldStatsCollector[] collectors = 
FieldStatsCollector.create(statsCollectors);
         return IntStream.range(0, rowType.getFieldCount())
                 .mapToObj(
                         i -> {
                             DataField field = rowType.getFields().get(i);
-                            return toFieldStats(field, 
stats.get(field.name()), i);
+                            return toFieldStats(field, 
stats.get(field.name()), collectors[i]);
                         })
                 .toArray(FieldStats[]::new);
     }
 
-    private FieldStats toFieldStats(DataField field, Statistics<?> stats, int 
idx) {
+    private FieldStats toFieldStats(
+            DataField field, Statistics<?> stats, FieldStatsCollector 
collector) {
         if (stats == null) {
             return new FieldStats(null, null, null);
         }
         long nullCount = stats.getNumNulls();
         if (!stats.hasNonNullValue()) {
-            return this.statsCollectors[idx].convert(new FieldStats(null, 
null, nullCount));
+            return collector.convert(new FieldStats(null, null, nullCount));
         }
 
         FieldStats fieldStats;
@@ -170,7 +167,7 @@ public class ParquetTableStatsExtractor implements 
TableStatsExtractor {
             default:
                 fieldStats = new FieldStats(null, null, nullCount);
         }
-        return this.statsCollectors[idx].convert(fieldStats);
+        return collector.convert(fieldStats);
     }
 
     private FieldStats toTimestampStats(Statistics<?> stats, int precision) {

Reply via email to