This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 52d0aa2dd [core] Introduce stats in snapshot (#2677)
52d0aa2dd is described below

commit 52d0aa2dd0fb22835192ed148b399cfafcca2166
Author: Zouxxyy <[email protected]>
AuthorDate: Sat Jan 20 23:06:18 2024 +0800

    [core] Introduce stats in snapshot (#2677)
---
 .../org/apache/paimon/utils/OptionalUtils.java     |  28 +++
 .../java/org/apache/paimon/AbstractFileStore.java  |  13 +-
 .../src/main/java/org/apache/paimon/FileStore.java |   3 +
 .../src/main/java/org/apache/paimon/Snapshot.java  |  35 +++-
 .../apache/paimon/operation/FileStoreCommit.java   |   7 +
 .../paimon/operation/FileStoreCommitImpl.java      |  65 +++++-
 .../java/org/apache/paimon/stats/ColStats.java     | 219 +++++++++++++++++++++
 .../main/java/org/apache/paimon/stats/Stats.java   | 202 +++++++++++++++++++
 .../java/org/apache/paimon/stats/StatsFile.java    |  75 +++++++
 .../org/apache/paimon/stats/StatsFileHandler.java  |  90 +++++++++
 .../apache/paimon/utils/FileStorePathFactory.java  |  21 ++
 .../apache/paimon/operation/FileDeletionTest.java  |   1 +
 .../paimon/operation/FileStoreCommitTest.java      |  69 +++++++
 .../org/apache/paimon/stats/StatsFileTest.java     |  69 +++++++
 .../apache/paimon/utils/SnapshotManagerTest.java   |   2 +
 15 files changed, 881 insertions(+), 18 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/OptionalUtils.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/OptionalUtils.java
new file mode 100644
index 000000000..d45b46bbc
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/OptionalUtils.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.util.OptionalLong;
+
+/**
+ * Utils for Optional.
+ *
+ * <p>{@link #ofNullable(Long)} maps a {@code null} {@link Long} to {@link OptionalLong#empty()}
+ * and any other value to {@link OptionalLong#of(long)}.
+ */
+public class OptionalUtils {
+    public static OptionalLong ofNullable(Long value) {
+        return value == null ? OptionalLong.empty() : OptionalLong.of(value);
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 28672bf4f..8957b9996 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -34,6 +34,8 @@ import org.apache.paimon.operation.TagDeletion;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.service.ServiceManager;
+import org.apache.paimon.stats.StatsFile;
+import org.apache.paimon.stats.StatsFileHandler;
 import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.table.sink.CallbackUtils;
 import org.apache.paimon.table.sink.TagCallback;
@@ -142,6 +144,14 @@ public abstract class AbstractFileStore<T> implements 
FileStore<T> {
                 new HashIndexFile(fileIO, pathFactory().indexFileFactory()));
     }
 
+    @Override
+    public StatsFileHandler newStatsFileHandler() {
+        return new StatsFileHandler(
+                snapshotManager(),
+                schemaManager,
+                new StatsFile(fileIO, pathFactory().statsFileFactory()));
+    }
+
     @Override
     public RowType partitionType() {
         return partitionType;
@@ -175,7 +185,8 @@ public abstract class AbstractFileStore<T> implements 
FileStore<T> {
                 options.manifestFullCompactionThresholdSize(),
                 options.manifestMergeMinCount(),
                 partitionType.getFieldCount() > 0 && 
options.dynamicPartitionOverwrite(),
-                newKeyComparator());
+                newKeyComparator(),
+                newStatsFileHandler());
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index 5406828d7..b8346f986 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -30,6 +30,7 @@ import org.apache.paimon.operation.PartitionExpire;
 import org.apache.paimon.operation.SnapshotDeletion;
 import org.apache.paimon.operation.TagDeletion;
 import org.apache.paimon.service.ServiceManager;
+import org.apache.paimon.stats.StatsFileHandler;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.sink.TagCallback;
 import org.apache.paimon.tag.TagAutoCreation;
@@ -68,6 +69,8 @@ public interface FileStore<T> extends Serializable {
 
     IndexFileHandler newIndexFileHandler();
 
+    StatsFileHandler newStatsFileHandler();
+
     FileStoreRead<T> newRead();
 
     FileStoreWrite<T> newWrite(String commitUser);
diff --git a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java 
b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
index 9792aca9a..f60b7d055 100644
--- a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
+++ b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
@@ -87,6 +87,7 @@ public class Snapshot {
     private static final String FIELD_DELTA_RECORD_COUNT = "deltaRecordCount";
     private static final String FIELD_CHANGELOG_RECORD_COUNT = 
"changelogRecordCount";
     private static final String FIELD_WATERMARK = "watermark";
+    private static final String FIELD_STATISTICS = "statistics";
 
     // version of snapshot
     // null for paimon <= 0.2
@@ -169,6 +170,13 @@ public class Snapshot {
     @Nullable
     private final Long watermark;
 
+    // stats file name for statistics of this table
+    // null if no stats file
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @JsonProperty(FIELD_STATISTICS)
+    @Nullable
+    private final String statistics;
+
     public Snapshot(
             long id,
             long schemaId,
@@ -184,7 +192,8 @@ public class Snapshot {
             @Nullable Long totalRecordCount,
             @Nullable Long deltaRecordCount,
             @Nullable Long changelogRecordCount,
-            @Nullable Long watermark) {
+            @Nullable Long watermark,
+            @Nullable String statistics) {
         this(
                 CURRENT_VERSION,
                 id,
@@ -201,7 +210,8 @@ public class Snapshot {
                 totalRecordCount,
                 deltaRecordCount,
                 changelogRecordCount,
-                watermark);
+                watermark,
+                statistics);
     }
 
     @JsonCreator
@@ -218,10 +228,11 @@ public class Snapshot {
             @JsonProperty(FIELD_COMMIT_KIND) CommitKind commitKind,
             @JsonProperty(FIELD_TIME_MILLIS) long timeMillis,
             @JsonProperty(FIELD_LOG_OFFSETS) Map<Integer, Long> logOffsets,
-            @JsonProperty(FIELD_TOTAL_RECORD_COUNT) Long totalRecordCount,
-            @JsonProperty(FIELD_DELTA_RECORD_COUNT) Long deltaRecordCount,
-            @JsonProperty(FIELD_CHANGELOG_RECORD_COUNT) Long 
changelogRecordCount,
-            @JsonProperty(FIELD_WATERMARK) Long watermark) {
+            @JsonProperty(FIELD_TOTAL_RECORD_COUNT) @Nullable Long 
totalRecordCount,
+            @JsonProperty(FIELD_DELTA_RECORD_COUNT) @Nullable Long 
deltaRecordCount,
+            @JsonProperty(FIELD_CHANGELOG_RECORD_COUNT) @Nullable Long 
changelogRecordCount,
+            @JsonProperty(FIELD_WATERMARK) @Nullable Long watermark,
+            @JsonProperty(FIELD_STATISTICS) @Nullable String statistics) {
         this.version = version;
         this.id = id;
         this.schemaId = schemaId;
@@ -238,6 +249,7 @@ public class Snapshot {
         this.deltaRecordCount = deltaRecordCount;
         this.changelogRecordCount = changelogRecordCount;
         this.watermark = watermark;
+        this.statistics = statistics;
     }
 
     @JsonGetter(FIELD_VERSION)
@@ -327,6 +339,12 @@ public class Snapshot {
         return watermark;
     }
 
+    @JsonGetter(FIELD_STATISTICS)
+    @Nullable
+    public String statistics() {
+        return statistics;
+    }
+
     /**
      * Return all {@link ManifestFileMeta} instances for either data or 
changelog manifests in this
      * snapshot.
@@ -487,6 +505,9 @@ public class Snapshot {
         COMPACT,
 
         /** Changes that clear up the whole partition and then add new 
records. */
-        OVERWRITE
+        OVERWRITE,
+
+        /** Collect statistics. */
+        ANALYZE
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
index 774c11f74..ac13663f7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
@@ -22,6 +22,7 @@ import org.apache.paimon.Snapshot;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.operation.metrics.CommitMetrics;
+import org.apache.paimon.stats.Stats;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.utils.FileStorePathFactory;
 
@@ -85,6 +86,12 @@ public interface FileStoreCommit {
     /** With metrics to measure commits. */
     FileStoreCommit withMetrics(CommitMetrics metrics);
 
+    /**
+     * Commit new statistics. The {@link Snapshot.CommitKind} of generated 
snapshot is {@link
+     * Snapshot.CommitKind#ANALYZE}.
+     */
+    void commitStatistics(Stats stats, long commitIdentifier);
+
     FileStorePathFactory pathFactory();
 
     FileIO fileIO();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 494503df1..37724b35c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -40,6 +40,8 @@ import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.stats.Stats;
+import org.apache.paimon.stats.StatsFileHandler;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.types.RowType;
@@ -116,6 +118,8 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
 
     private CommitMetrics commitMetrics;
 
+    private final StatsFileHandler statsFileHandler;
+
     public FileStoreCommitImpl(
             FileIO fileIO,
             SchemaManager schemaManager,
@@ -132,7 +136,8 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
             MemorySize manifestFullCompactionSize,
             int manifestMergeMinCount,
             boolean dynamicPartitionOverwrite,
-            @Nullable Comparator<InternalRow> keyComparator) {
+            @Nullable Comparator<InternalRow> keyComparator,
+            StatsFileHandler statsFileHandler) {
         this.fileIO = fileIO;
         this.schemaManager = schemaManager;
         this.commitUser = commitUser;
@@ -150,10 +155,10 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         this.manifestMergeMinCount = manifestMergeMinCount;
         this.dynamicPartitionOverwrite = dynamicPartitionOverwrite;
         this.keyComparator = keyComparator;
-
         this.lock = null;
         this.ignoreEmptyCommit = true;
         this.commitMetrics = null;
+        this.statsFileHandler = statsFileHandler;
     }
 
     @Override
@@ -248,7 +253,8 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                                 committable.watermark(),
                                 committable.logOffsets(),
                                 Snapshot.CommitKind.APPEND,
-                                safeLatestSnapshotId);
+                                safeLatestSnapshotId,
+                                null);
                 generatedSnapshot += 1;
             }
 
@@ -276,7 +282,8 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                                 committable.watermark(),
                                 committable.logOffsets(),
                                 Snapshot.CommitKind.COMPACT,
-                                safeLatestSnapshotId);
+                                safeLatestSnapshotId,
+                                null);
                 generatedSnapshot += 1;
             }
         } finally {
@@ -421,6 +428,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                                 committable.watermark(),
                                 committable.logOffsets(),
                                 Snapshot.CommitKind.COMPACT,
+                                null,
                                 null);
                 generatedSnapshot += 1;
             }
@@ -504,6 +512,21 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         return this;
     }
 
+    @Override
+    public void commitStatistics(Stats stats, long commitIdentifier) {
+        String statsFileName = statsFileHandler.writeStats(stats);
+        tryCommit(
+                Collections.emptyList(),
+                Collections.emptyList(),
+                Collections.emptyList(),
+                commitIdentifier,
+                null,
+                Collections.emptyMap(),
+                Snapshot.CommitKind.ANALYZE,
+                null,
+                statsFileName);
+    }
+
     @Override
     public FileStorePathFactory pathFactory() {
         return pathFactory;
@@ -573,7 +596,8 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
             @Nullable Long watermark,
             Map<Integer, Long> logOffsets,
             Snapshot.CommitKind commitKind,
-            Long safeLatestSnapshotId) {
+            @Nullable Long safeLatestSnapshotId,
+            @Nullable String statsFileName) {
         int cnt = 0;
         while (true) {
             Snapshot latestSnapshot = snapshotManager.latestSnapshot();
@@ -587,7 +611,8 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                     logOffsets,
                     commitKind,
                     latestSnapshot,
-                    safeLatestSnapshotId)) {
+                    safeLatestSnapshotId,
+                    statsFileName)) {
                 break;
             }
         }
@@ -650,6 +675,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                     logOffsets,
                     Snapshot.CommitKind.OVERWRITE,
                     latestSnapshot,
+                    null,
                     null)) {
                 break;
             }
@@ -666,8 +692,9 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
             @Nullable Long watermark,
             Map<Integer, Long> logOffsets,
             Snapshot.CommitKind commitKind,
-            Snapshot latestSnapshot,
-            Long safeLatestSnapshotId) {
+            @Nullable Snapshot latestSnapshot,
+            @Nullable Long safeLatestSnapshotId,
+            @Nullable String newStatsFileName) {
         long newSnapshotId =
                 latestSnapshot == null ? Snapshot.FIRST_SNAPSHOT_ID : 
latestSnapshot.id() + 1;
         Path newSnapshotPath = snapshotManager.snapshotPath(newSnapshotId);
@@ -752,11 +779,28 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
                 newIndexManifest = indexManifest;
             }
 
+            long latestSchemaId = schemaManager.latest().get().id();
+
+            // write new stats or inherit from the previous snapshot
+            String statsFileName = null;
+            if (newStatsFileName != null) {
+                statsFileName = newStatsFileName;
+            } else if (latestSnapshot != null) {
+                Optional<Stats> previousStatistic = 
statsFileHandler.readStats(latestSnapshot);
+                if (previousStatistic.isPresent()) {
+                    if (previousStatistic.get().schemaId() != latestSchemaId) {
+                        LOG.warn("Schema changed, stats will not be 
inherited");
+                    } else {
+                        statsFileName = latestSnapshot.statistics();
+                    }
+                }
+            }
+
             // prepare snapshot file
             newSnapshot =
                     new Snapshot(
                             newSnapshotId,
-                            schemaManager.latest().get().id(),
+                            latestSchemaId,
                             previousChangesListName,
                             newChangesListName,
                             changelogListName,
@@ -769,7 +813,8 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                             totalRecordCount,
                             deltaRecordCount,
                             Snapshot.recordCount(changelogFiles),
-                            currentWatermark);
+                            currentWatermark,
+                            statsFileName);
         } catch (Throwable e) {
             // fails when preparing for commit, we should clean up
             cleanUpTmpManifests(
diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/ColStats.java 
b/paimon-core/src/main/java/org/apache/paimon/stats/ColStats.java
new file mode 100644
index 000000000..81a7e0a38
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/stats/ColStats.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.stats;
+
+import org.apache.paimon.data.serializer.InternalSerializers;
+import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.utils.JsonSerdeUtil;
+import org.apache.paimon.utils.OptionalUtils;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+/**
+ * Col stats, supports the following stats.
+ *
+ * <ul>
+ *   <li>distinctCount: the number of distinct values
+ *   <li>min: the minimum value of the column
+ *   <li>max: the maximum value of the column
+ *   <li>nullCount: the number of nulls
+ *   <li>avgLen: average column length
+ *   <li>maxLen: max column length
+ * </ul>
+ *
+ * @param <T> col internal data type
+ */
+public class ColStats<T> {
+
+    private static final String FIELD_COL_ID = "colId";
+    private static final String FIELD_DISTINCT_COUNT = "distinctCount";
+    private static final String FIELD_MIN = "min";
+    private static final String FIELD_MAX = "max";
+    private static final String FIELD_NULL_COUNT = "nullCount";
+    private static final String FIELD_AVG_LEN = "avgLen";
+    private static final String FIELD_MAX_LEN = "maxLen";
+
+    @JsonProperty(FIELD_COL_ID)
+    private final int colId;
+
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @JsonProperty(FIELD_DISTINCT_COUNT)
+    private final @Nullable Long distinctCount;
+
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @JsonProperty(FIELD_MIN)
+    private @Nullable String serializedMin;
+
+    private @Nullable Comparable<T> min;
+
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @JsonProperty(FIELD_MAX)
+    private @Nullable String serializedMax;
+
+    private @Nullable Comparable<T> max;
+
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @JsonProperty(FIELD_NULL_COUNT)
+    private final @Nullable Long nullCount;
+
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @JsonProperty(FIELD_AVG_LEN)
+    private final @Nullable Long avgLen;
+
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @JsonProperty(FIELD_MAX_LEN)
+    private final @Nullable Long maxLen;
+
+    @JsonCreator
+    public ColStats(
+            @JsonProperty(FIELD_COL_ID) int colId,
+            @JsonProperty(FIELD_DISTINCT_COUNT) @Nullable Long distinctCount,
+            @JsonProperty(FIELD_MIN) @Nullable String serializedMin,
+            @JsonProperty(FIELD_MAX) @Nullable String serializedMax,
+            @JsonProperty(FIELD_NULL_COUNT) @Nullable Long nullCount,
+            @JsonProperty(FIELD_AVG_LEN) @Nullable Long avgLen,
+            @JsonProperty(FIELD_MAX_LEN) @Nullable Long maxLen) {
+        this.colId = colId;
+        this.distinctCount = distinctCount;
+        this.serializedMin = serializedMin;
+        this.serializedMax = serializedMax;
+        this.nullCount = nullCount;
+        this.avgLen = avgLen;
+        this.maxLen = maxLen;
+    }
+
+    public ColStats(
+            int colId,
+            @Nullable Long distinctCount,
+            @Nullable Comparable<T> min,
+            @Nullable Comparable<T> max,
+            @Nullable Long nullCount,
+            @Nullable Long avgLen,
+            @Nullable Long maxLen) {
+        this.colId = colId;
+        this.distinctCount = distinctCount;
+        this.min = min;
+        this.max = max;
+        this.nullCount = nullCount;
+        this.avgLen = avgLen;
+        this.maxLen = maxLen;
+    }
+
+    public int colId() {
+        return colId;
+    }
+
+    public OptionalLong distinctCount() {
+        return OptionalUtils.ofNullable(distinctCount);
+    }
+
+    public Optional<Comparable<T>> min() {
+        return Optional.ofNullable(min);
+    }
+
+    public Optional<Comparable<T>> max() {
+        return Optional.ofNullable(max);
+    }
+
+    public OptionalLong nullCount() {
+        return OptionalUtils.ofNullable(nullCount);
+    }
+
+    public OptionalLong avgLen() {
+        return OptionalUtils.ofNullable(avgLen);
+    }
+
+    public OptionalLong maxLen() {
+        return OptionalUtils.ofNullable(maxLen);
+    }
+
+    @SuppressWarnings("unchecked")
+    public void serializeFieldsToString(DataType dataType) {
+        if ((min != null && serializedMin == null) || (max != null && 
serializedMax == null)) {
+            Serializer<T> serializer = InternalSerializers.create(dataType);
+            if (min != null && serializedMin == null) {
+                serializedMin = serializer.serializeToString((T) min);
+            }
+            if (max != null && serializedMax == null) {
+                serializedMax = serializer.serializeToString((T) max);
+            }
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public void deserializeFieldsFromString(DataType dataType) {
+        if ((serializedMin != null && min == null) || (serializedMax != null 
&& max == null)) {
+            Serializer<T> serializer = InternalSerializers.create(dataType);
+            if (serializedMin != null && min == null) {
+                min = (Comparable<T>) 
serializer.deserializeFromString(serializedMin);
+            }
+            if (serializedMax != null && max == null) {
+                max = (Comparable<T>) 
serializer.deserializeFromString(serializedMax);
+            }
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ColStats<?> colStats = (ColStats<?>) o;
+        return colId == colStats.colId
+                && Objects.equals(distinctCount, colStats.distinctCount)
+                && Objects.equals(serializedMin, colStats.serializedMin)
+                && Objects.equals(min, colStats.min)
+                && Objects.equals(serializedMax, colStats.serializedMax)
+                && Objects.equals(max, colStats.max)
+                && Objects.equals(nullCount, colStats.nullCount)
+                && Objects.equals(avgLen, colStats.avgLen)
+                && Objects.equals(maxLen, colStats.maxLen);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                colId,
+                distinctCount,
+                serializedMin,
+                min,
+                serializedMax,
+                max,
+                nullCount,
+                avgLen,
+                maxLen);
+    }
+
+    @Override
+    public String toString() {
+        return JsonSerdeUtil.toJson(this);
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/Stats.java 
b/paimon-core/src/main/java/org/apache/paimon/stats/Stats.java
new file mode 100644
index 000000000..046b66a51
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/stats/Stats.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.stats;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.utils.JsonSerdeUtil;
+import org.apache.paimon.utils.OptionalUtils;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * Global stats, supports the following stats.
+ *
+ * <ul>
+ *   <li>mergedRecordCount: the total number of records after merge
+ *   <li>mergedRecordSize: the size in bytes of the records counted by mergedRecordCount
+ *   <li>colStats: column stats map
+ * </ul>
+ */
+public class Stats {
+
+    // ID of the snapshot these statistics were collected from
+    private static final String FIELD_SNAPSHOT_ID = "snapshotId";
+    // Schema ID of the snapshot these statistics were collected from
+    private static final String FIELD_SCHEMA_ID = "schemaId";
+    private static final String FIELD_MERGED_RECORD_COUNT = 
"mergedRecordCount";
+    private static final String FIELD_MERGED_RECORD_SIZE = "mergedRecordSize";
+    private static final String FIELD_COL_STATS = "colStats";
+
+    @JsonProperty(FIELD_SNAPSHOT_ID)
+    private final long snapshotId;
+
+    @JsonProperty(FIELD_SCHEMA_ID)
+    private final long schemaId;
+
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @JsonProperty(FIELD_MERGED_RECORD_COUNT)
+    private final @Nullable Long mergedRecordCount;
+
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @JsonProperty(FIELD_MERGED_RECORD_SIZE)
+    private final @Nullable Long mergedRecordSize;
+
+    @JsonProperty(FIELD_COL_STATS)
+    private final Map<String, ColStats<?>> colStats;
+
+    @JsonCreator
+    public Stats(
+            @JsonProperty(FIELD_SNAPSHOT_ID) long snapshotId,
+            @JsonProperty(FIELD_SCHEMA_ID) long schemaId,
+            @JsonProperty(FIELD_MERGED_RECORD_COUNT) @Nullable Long 
mergedRecordCount,
+            @JsonProperty(FIELD_MERGED_RECORD_SIZE) @Nullable Long 
mergedRecordSize,
+            @JsonProperty(FIELD_COL_STATS) Map<String, ColStats<?>> colStats) {
+        this.snapshotId = snapshotId;
+        this.schemaId = schemaId;
+        this.mergedRecordCount = mergedRecordCount;
+        this.mergedRecordSize = mergedRecordSize;
+        this.colStats = colStats;
+    }
+
+    public Stats(long snapshotId, long schemaId, Long mergedRecordCount, Long 
mergedRecordSize) {
+        this(snapshotId, schemaId, mergedRecordCount, mergedRecordSize, 
Collections.emptyMap());
+    }
+
+    public long snapshotId() {
+        return snapshotId;
+    }
+
+    public long schemaId() {
+        return schemaId;
+    }
+
+    public OptionalLong mergedRecordCount() {
+        return OptionalUtils.ofNullable(mergedRecordCount);
+    }
+
+    public OptionalLong mergedRecordSize() {
+        return OptionalUtils.ofNullable(mergedRecordSize);
+    }
+
+    public Map<String, ColStats<?>> colStats() {
+        return colStats;
+    }
+
+    /**
+     * Fills in the string form of min/max for every column stats, using the column types of the
+     * given schema.
+     *
+     * @param schema schema used to look up the data type of each column by name
+     * @throws RuntimeException if a column is not found in the schema or serialization fails
+     */
+    public void serializeFieldsToString(TableSchema schema) {
+        try {
+            if (colStats != null) {
+                for (Map.Entry<String, ColStats<?>> entry : colStats.entrySet()) {
+                    entry.getValue().serializeFieldsToString(typeOf(schema, entry.getKey()));
+                }
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to serialize fields to string", e);
+        }
+    }
+
+    /**
+     * Restores min/max of every column stats from their string form, using the column types of
+     * the given schema.
+     *
+     * @param schema schema used to look up the data type of each column by name
+     * @throws RuntimeException if a column is not found in the schema or deserialization fails
+     */
+    public void deserializeFieldsFromString(TableSchema schema) {
+        try {
+            if (colStats != null) {
+                for (Map.Entry<String, ColStats<?>> entry : colStats.entrySet()) {
+                    entry.getValue().deserializeFieldsFromString(typeOf(schema, entry.getKey()));
+                }
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to deserialize fields from string", e);
+        }
+    }
+
+    /** Looks up the data type of the column named {@code colName} in {@code schema}. */
+    private static DataType typeOf(TableSchema schema, String colName) {
+        return schema.fields().stream()
+                .filter(field -> field.name().equals(colName))
+                .findFirst()
+                .orElseThrow(
+                        () ->
+                                new IllegalStateException(
+                                        "Unable to find column '"
+                                                + colName
+                                                + "' in the given schema"))
+                .type();
+    }
+
+    public String toJson() {
+        return JsonSerdeUtil.toJson(this);
+    }
+
+    public static Stats fromJson(String json) {
+        return JsonSerdeUtil.fromJson(json, Stats.class);
+    }
+
+    /**
+     * Reads the UTF-8 file at {@code path} and parses its JSON content into a {@link Stats}.
+     *
+     * @param fileIO file IO used to read the file
+     * @param path path of the stats file
+     * @return the parsed stats
+     * @throws RuntimeException if reading the file fails with an {@link IOException}
+     */
+    public static Stats fromPath(FileIO fileIO, Path path) {
+        try {
+            String json = fileIO.readFileUtf8(path);
+            return Stats.fromJson(json);
+        } catch (IOException e) {
+            // The message names stats, not snapshot: this method reads a stats file.
+            throw new RuntimeException("Fails to read stats from path " + path, e);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        Stats stats = (Stats) o;
+        return snapshotId == stats.snapshotId
+                && schemaId == stats.schemaId
+                && Objects.equals(mergedRecordCount, stats.mergedRecordCount)
+                && Objects.equals(mergedRecordSize, stats.mergedRecordSize)
+                && Objects.equals(colStats, stats.colStats);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(snapshotId, schemaId, mergedRecordCount, 
mergedRecordSize, colStats);
+    }
+
+    @Override
+    public String toString() {
+        return JsonSerdeUtil.toJson(this);
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/StatsFile.java 
b/paimon-core/src/main/java/org/apache/paimon/stats/StatsFile.java
new file mode 100644
index 000000000..252f785bb
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/stats/StatsFile.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.stats;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.utils.PathFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+
+/**
+ * Stats file contains stats. File locations are resolved through the supplied {@link PathFactory}
+ * and all file access goes through the supplied {@link FileIO}.
+ */
+public class StatsFile {
+
+    private final FileIO fileIO;
+    private final PathFactory pathFactory;
+
+    public StatsFile(FileIO fileIO, PathFactory pathFactory) {
+        this.fileIO = fileIO;
+        this.pathFactory = pathFactory;
+    }
+
+    /**
+     * Read stats from a stats file.
+     *
+     * @param fileName name of the stats file, resolved to a path by the path factory
+     * @return stats
+     */
+    public Stats read(String fileName) {
+        return Stats.fromPath(fileIO, pathFactory.toPath(fileName));
+    }
+
+    /**
+     * Write stats, serialized as JSON, to a new stats file obtained from the path factory.
+     *
+     * @param stats the stats to write
+     * @return the written file name
+     * @throws RuntimeException if writing the file fails with an {@link IOException}
+     */
+    public String write(Stats stats) {
+        Path path = pathFactory.newPath();
+
+        try {
+            fileIO.writeFileUtf8(path, stats.toJson());
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to write stats file: " + path, e);
+        }
+        return path.getName();
+    }
+
+    /**
+     * Delete the stats file with the given name, delegating to {@link FileIO#deleteQuietly}.
+     *
+     * @param fileName name of the stats file to delete
+     */
+    public void delete(String fileName) {
+        fileIO.deleteQuietly(pathFactory.toPath(fileName));
+    }
+
+    /**
+     * Check whether the stats file with the given name exists.
+     *
+     * @param fileName name of the stats file to look up
+     * @return the result of {@link FileIO#exists} for the resolved path
+     * @throws UncheckedIOException if the existence check fails with an {@link IOException}
+     */
+    public boolean exists(String fileName) {
+        Path path = pathFactory.toPath(fileName);
+        try {
+            return fileIO.exists(path);
+        } catch (IOException e) {
+            // Name the path so the failure can be traced to a concrete file, as write() does.
+            throw new UncheckedIOException("Failed to check existence of stats file: " + path, e);
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/stats/StatsFileHandler.java 
b/paimon-core/src/main/java/org/apache/paimon/stats/StatsFileHandler.java
new file mode 100644
index 000000000..73ed6c5d4
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/stats/StatsFileHandler.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.stats;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.utils.SnapshotManager;
+
+import java.util.Optional;
+
+/**
+ * Handler of StatsFile. Looks up the stats file name recorded in a {@link Snapshot} and converts
+ * stats fields to and from their string form using the schema identified by the stats' schema id.
+ */
+public class StatsFileHandler {
+
+    private final SnapshotManager snapshotManager;
+    private final SchemaManager schemaManager;
+    private final StatsFile statsFile;
+
+    public StatsFileHandler(
+            SnapshotManager snapshotManager, SchemaManager schemaManager, StatsFile statsFile) {
+        this.snapshotManager = snapshotManager;
+        this.schemaManager = schemaManager;
+        this.statsFile = statsFile;
+    }
+
+    /**
+     * Write stats to a stats file.
+     *
+     * <p>Note that this calls {@code serializeFieldsToString} on the given {@code stats} object
+     * before writing it.
+     *
+     * @param stats the stats to write
+     * @return the written file name
+     */
+    public String writeStats(Stats stats) {
+        stats.serializeFieldsToString(schemaManager.schema(stats.schemaId()));
+        return statsFile.write(stats);
+    }
+
+    /**
+     * Read stats of the latest snapshot.
+     *
+     * @return stats, or empty if the latest snapshot records no statistics file
+     * @throws IllegalStateException if there is no latest snapshot id
+     */
+    public Optional<Stats> readStats() {
+        Long latestSnapshotId = snapshotManager.latestSnapshotId();
+        if (latestSnapshotId == null) {
+            // It is the snapshot, not the schema, that could not be found here.
+            throw new IllegalStateException("Unable to obtain the latest snapshot");
+        }
+        return readStats(latestSnapshotId);
+    }
+
+    /**
+     * Read stats of the specified snapshot.
+     *
+     * @param snapshotId id of the snapshot whose stats are read
+     * @return stats, or empty if the snapshot records no statistics file
+     */
+    public Optional<Stats> readStats(long snapshotId) {
+        return readStats(snapshotManager.snapshot(snapshotId));
+    }
+
+    /**
+     * Read stats referenced by the given snapshot.
+     *
+     * @param snapshot the snapshot whose statistics file is read
+     * @return stats, or empty if the snapshot records no statistics file
+     */
+    public Optional<Stats> readStats(Snapshot snapshot) {
+        String statsFileName = snapshot.statistics();
+        if (statsFileName == null) {
+            return Optional.empty();
+        }
+        Stats stats = statsFile.read(statsFileName);
+        // Convert the string-encoded fields back using the schema the stats were written with.
+        stats.deserializeFieldsFromString(schemaManager.schema(stats.schemaId()));
+        return Optional.of(stats);
+    }
+
+    /**
+     * Delete stats of the specified snapshot. Does nothing if the snapshot records no statistics
+     * file.
+     *
+     * @param snapshotId id of the snapshot whose stats file is deleted
+     */
+    public void deleteStats(long snapshotId) {
+        String statsFileName = snapshotManager.snapshot(snapshotId).statistics();
+        if (statsFileName != null) {
+            statsFile.delete(statsFileName);
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
index 17d167d72..f9aaaf67b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
@@ -56,6 +56,7 @@ public class FileStorePathFactory {
     private final AtomicInteger manifestListCount;
     private final AtomicInteger indexManifestCount;
     private final AtomicInteger indexFileCount;
+    private final AtomicInteger statsFileCount;
 
     public FileStorePathFactory(Path root) {
         this(
@@ -78,6 +79,7 @@ public class FileStorePathFactory {
         this.manifestListCount = new AtomicInteger(0);
         this.indexManifestCount = new AtomicInteger(0);
         this.indexFileCount = new AtomicInteger(0);
+        this.statsFileCount = new AtomicInteger(0);
     }
 
     public Path root() {
@@ -207,4 +209,23 @@ public class FileStorePathFactory {
             }
         };
     }
+
+    public PathFactory statsFileFactory() {
+        return new PathFactory() {
+            @Override
+            public Path newPath() {
+                return new Path(
+                        root
+                                + "/statistics/stats-"
+                                + uuid
+                                + "-"
+                                + statsFileCount.getAndIncrement());
+            }
+
+            @Override
+            public Path toPath(String fileName) {
+                return new Path(root + "/statistics/" + fileName);
+            }
+        };
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index 03626a6b6..c41741c08 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -722,6 +722,7 @@ public class FileDeletionTest {
                         Collections.emptyMap(),
                         Snapshot.CommitKind.APPEND,
                         store.snapshotManager().latestSnapshot(),
+                        null,
                         null);
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index 10cde724a..58d961b49 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -35,8 +35,14 @@ import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.stats.ColStats;
+import org.apache.paimon.stats.Stats;
+import org.apache.paimon.stats.StatsFileHandler;
 import org.apache.paimon.testutils.assertj.AssertionUtils;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FailingFileIO;
 import org.apache.paimon.utils.RowDataToObjectArrayConverter;
 import org.apache.paimon.utils.SnapshotManager;
@@ -774,6 +780,69 @@ public class FileStoreCommitTest {
         assertThat(file).isEmpty();
     }
 
+    /**
+     * Commits statistics through {@code commitStatistics} and reads them back with {@link
+     * StatsFileHandler}, checking four cases: a plain round trip, inheritance by the next data
+     * commit, loss of stats after a schema change, and stats committed without column stats.
+     */
+    @Test
+    public void testWriteStats() throws Exception {
+        TestFileStore store = createStore(false, 1, 
CoreOptions.ChangelogProducer.NONE);
+        StatsFileHandler statsFileHandler = store.newStatsFileHandler();
+        FileStoreCommitImpl fileStoreCommit = store.newCommit();
+        store.commitData(generateDataList(10), gen::getPartition, kv -> 0, 
Collections.emptyMap());
+        Snapshot latestSnapshot = store.snapshotManager().latestSnapshot();
+
+        // Analyze: commit hand-built stats for the latest snapshot and check they read back equal
+        HashMap<String, ColStats<?>> fakeColStatsMap = new HashMap<>();
+        fakeColStatsMap.put("orderId", new ColStats<>(3, 10L, 1L, 10L, 0L, 8L, 
8L));
+        Stats fakeStats =
+                new Stats(
+                        latestSnapshot.id(),
+                        latestSnapshot.schemaId(),
+                        10L,
+                        1000L,
+                        fakeColStatsMap);
+        fileStoreCommit.commitStatistics(fakeStats, Long.MAX_VALUE);
+        Optional<Stats> readStats = statsFileHandler.readStats();
+        assertThat(readStats).isPresent();
+        assertThat(readStats.get()).isEqualTo(fakeStats);
+
+        // New snapshot will inherit last snapshot's stats
+        store.commitData(generateDataList(10), gen::getPartition, kv -> 0, 
Collections.emptyMap());
+        readStats = statsFileHandler.readStats();
+        assertThat(readStats).isPresent();
+        assertThat(readStats.get()).isEqualTo(fakeStats);
+
+        // When table schema is modified, new snapshot will not inherit last
+        // snapshot's stats
+        ArrayList<DataField> newFields =
+                new 
ArrayList<>(TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields());
+        newFields.add(new DataField(-1, "newField", DataTypes.INT()));
+        store.mergeSchema(new RowType(false, newFields), true);
+        store.commitData(generateDataList(10), gen::getPartition, kv -> 0, 
Collections.emptyMap());
+        readStats = statsFileHandler.readStats();
+        assertThat(readStats).isEmpty();
+
+        // Then we need to analyze again, this time against the snapshot written after the
+        // schema change
+        latestSnapshot = store.snapshotManager().latestSnapshot();
+        fakeColStatsMap = new HashMap<>();
+        fakeColStatsMap.put("orderId", new ColStats<>(3, 30L, 1L, 30L, 0L, 8L, 
8L));
+        fakeStats =
+                new Stats(
+                        latestSnapshot.id(),
+                        latestSnapshot.schemaId(),
+                        30L,
+                        3000L,
+                        fakeColStatsMap);
+        fileStoreCommit.commitStatistics(fakeStats, Long.MAX_VALUE);
+        readStats = statsFileHandler.readStats();
+        assertThat(readStats).isPresent();
+        assertThat(readStats.get()).isEqualTo(fakeStats);
+
+        // Analyze without col stats (four-argument Stats constructor) and check the round trip
+        latestSnapshot = store.snapshotManager().latestSnapshot();
+        fakeStats = new Stats(latestSnapshot.id(), latestSnapshot.schemaId(), 
30L, 3000L);
+        fileStoreCommit.commitStatistics(fakeStats, Long.MAX_VALUE);
+        readStats = statsFileHandler.readStats();
+        assertThat(readStats).isPresent();
+        assertThat(readStats.get()).isEqualTo(fakeStats);
+    }
+
     private TestFileStore createStore(boolean failing) throws Exception {
         return createStore(failing, 1);
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/stats/StatsFileTest.java 
b/paimon-core/src/test/java/org/apache/paimon/stats/StatsFileTest.java
new file mode 100644
index 000000000..53ea463a1
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/stats/StatsFileTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.stats;
+
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.utils.PathFactory;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link StatsFile}: a write / exists / read / delete round trip in a temp directory. */
+public class StatsFileTest {
+
+    @TempDir java.nio.file.Path tempPath;
+
+    @Test
+    public void test() throws IOException {
+        Path baseDir = new Path(tempPath.toUri());
+        // Path factory that places every stats file directly under the temp directory.
+        PathFactory factory =
+                new PathFactory() {
+                    @Override
+                    public Path newPath() {
+                        String randomName = UUID.randomUUID().toString();
+                        return new Path(baseDir, randomName);
+                    }
+
+                    @Override
+                    public Path toPath(String fileName) {
+                        return new Path(baseDir, fileName);
+                    }
+                };
+
+        StatsFile statsFile = new StatsFile(LocalFileIO.create(), factory);
+        HashMap<String, ColStats<?>> columnStats = new HashMap<>();
+        columnStats.put("orderId", new ColStats<>(0, 10L, "111", "222", 0L, 8L, 8L));
+        Stats expected = new Stats(1L, 0L, 10L, 1000L, columnStats);
+
+        // Written file must exist and read back equal to what was written.
+        String writtenName = statsFile.write(expected);
+        assertThat(statsFile.exists(writtenName)).isTrue();
+        assertThat(statsFile.read(writtenName)).isEqualTo(expected);
+
+        // After deletion the file must be gone.
+        statsFile.delete(writtenName);
+        assertThat(statsFile.exists(writtenName)).isFalse();
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java 
b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
index 5e689a826..9778ba3be 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
@@ -73,6 +73,7 @@ public class SnapshotManagerTest {
                             null,
                             null,
                             null,
+                            null,
                             null);
             localFileIO.writeFileUtf8(snapshotManager.snapshotPath(i), 
snapshot.toJson());
         }
@@ -110,6 +111,7 @@ public class SnapshotManagerTest {
                             null,
                             null,
                             null,
+                            null,
                             null);
             localFileIO.writeFileUtf8(snapshotManager.snapshotPath(i), 
snapshot.toJson());
         }

Reply via email to