This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 52d0aa2dd [core] Introduce stats in snapshot (#2677)
52d0aa2dd is described below
commit 52d0aa2dd0fb22835192ed148b399cfafcca2166
Author: Zouxxyy <[email protected]>
AuthorDate: Sat Jan 20 23:06:18 2024 +0800
[core] Introduce stats in snapshot (#2677)
---
.../org/apache/paimon/utils/OptionalUtils.java | 28 +++
.../java/org/apache/paimon/AbstractFileStore.java | 13 +-
.../src/main/java/org/apache/paimon/FileStore.java | 3 +
.../src/main/java/org/apache/paimon/Snapshot.java | 35 +++-
.../apache/paimon/operation/FileStoreCommit.java | 7 +
.../paimon/operation/FileStoreCommitImpl.java | 65 +++++-
.../java/org/apache/paimon/stats/ColStats.java | 219 +++++++++++++++++++++
.../main/java/org/apache/paimon/stats/Stats.java | 202 +++++++++++++++++++
.../java/org/apache/paimon/stats/StatsFile.java | 75 +++++++
.../org/apache/paimon/stats/StatsFileHandler.java | 90 +++++++++
.../apache/paimon/utils/FileStorePathFactory.java | 21 ++
.../apache/paimon/operation/FileDeletionTest.java | 1 +
.../paimon/operation/FileStoreCommitTest.java | 69 +++++++
.../org/apache/paimon/stats/StatsFileTest.java | 69 +++++++
.../apache/paimon/utils/SnapshotManagerTest.java | 2 +
15 files changed, 881 insertions(+), 18 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/OptionalUtils.java
b/paimon-common/src/main/java/org/apache/paimon/utils/OptionalUtils.java
new file mode 100644
index 000000000..d45b46bbc
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/OptionalUtils.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.util.OptionalLong;
+
+/** Utils for Optional. */
+public class OptionalUtils {
+
+ /** Static helpers only; this class is not meant to be instantiated. */
+ private OptionalUtils() {}
+
+ /**
+ * Converts a nullable boxed {@link Long} into an {@link OptionalLong}.
+ *
+ * @param value the value to wrap, may be {@code null}
+ * @return an empty {@link OptionalLong} if {@code value} is {@code null}, otherwise an
+ * {@link OptionalLong} holding {@code value}
+ */
+ public static OptionalLong ofNullable(Long value) {
+ return value == null ? OptionalLong.empty() : OptionalLong.of(value);
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 28672bf4f..8957b9996 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -34,6 +34,8 @@ import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.service.ServiceManager;
+import org.apache.paimon.stats.StatsFile;
+import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.sink.CallbackUtils;
import org.apache.paimon.table.sink.TagCallback;
@@ -142,6 +144,14 @@ public abstract class AbstractFileStore<T> implements
FileStore<T> {
new HashIndexFile(fileIO, pathFactory().indexFileFactory()));
}
+ /**
+ * Creates a {@link StatsFileHandler} from this store's {@code snapshotManager()}, its
+ * {@code schemaManager} and a new {@link StatsFile} that resolves paths through
+ * {@code pathFactory().statsFileFactory()}.
+ */
+ @Override
+ public StatsFileHandler newStatsFileHandler() {
+ return new StatsFileHandler(
+ snapshotManager(),
+ schemaManager,
+ new StatsFile(fileIO, pathFactory().statsFileFactory()));
+ }
+
@Override
public RowType partitionType() {
return partitionType;
@@ -175,7 +185,8 @@ public abstract class AbstractFileStore<T> implements
FileStore<T> {
options.manifestFullCompactionThresholdSize(),
options.manifestMergeMinCount(),
partitionType.getFieldCount() > 0 &&
options.dynamicPartitionOverwrite(),
- newKeyComparator());
+ newKeyComparator(),
+ newStatsFileHandler());
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index 5406828d7..b8346f986 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -30,6 +30,7 @@ import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.operation.SnapshotDeletion;
import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.service.ServiceManager;
+import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TagAutoCreation;
@@ -68,6 +69,8 @@ public interface FileStore<T> extends Serializable {
IndexFileHandler newIndexFileHandler();
+ StatsFileHandler newStatsFileHandler();
+
FileStoreRead<T> newRead();
FileStoreWrite<T> newWrite(String commitUser);
diff --git a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
index 9792aca9a..f60b7d055 100644
--- a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
+++ b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
@@ -87,6 +87,7 @@ public class Snapshot {
private static final String FIELD_DELTA_RECORD_COUNT = "deltaRecordCount";
private static final String FIELD_CHANGELOG_RECORD_COUNT =
"changelogRecordCount";
private static final String FIELD_WATERMARK = "watermark";
+ private static final String FIELD_STATISTICS = "statistics";
// version of snapshot
// null for paimon <= 0.2
@@ -169,6 +170,13 @@ public class Snapshot {
@Nullable
private final Long watermark;
+ // stats file name for statistics of this table
+ // null if no stats file
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @JsonProperty(FIELD_STATISTICS)
+ @Nullable
+ private final String statistics;
+
public Snapshot(
long id,
long schemaId,
@@ -184,7 +192,8 @@ public class Snapshot {
@Nullable Long totalRecordCount,
@Nullable Long deltaRecordCount,
@Nullable Long changelogRecordCount,
- @Nullable Long watermark) {
+ @Nullable Long watermark,
+ @Nullable String statistics) {
this(
CURRENT_VERSION,
id,
@@ -201,7 +210,8 @@ public class Snapshot {
totalRecordCount,
deltaRecordCount,
changelogRecordCount,
- watermark);
+ watermark,
+ statistics);
}
@JsonCreator
@@ -218,10 +228,11 @@ public class Snapshot {
@JsonProperty(FIELD_COMMIT_KIND) CommitKind commitKind,
@JsonProperty(FIELD_TIME_MILLIS) long timeMillis,
@JsonProperty(FIELD_LOG_OFFSETS) Map<Integer, Long> logOffsets,
- @JsonProperty(FIELD_TOTAL_RECORD_COUNT) Long totalRecordCount,
- @JsonProperty(FIELD_DELTA_RECORD_COUNT) Long deltaRecordCount,
- @JsonProperty(FIELD_CHANGELOG_RECORD_COUNT) Long
changelogRecordCount,
- @JsonProperty(FIELD_WATERMARK) Long watermark) {
+ @JsonProperty(FIELD_TOTAL_RECORD_COUNT) @Nullable Long
totalRecordCount,
+ @JsonProperty(FIELD_DELTA_RECORD_COUNT) @Nullable Long
deltaRecordCount,
+ @JsonProperty(FIELD_CHANGELOG_RECORD_COUNT) @Nullable Long
changelogRecordCount,
+ @JsonProperty(FIELD_WATERMARK) @Nullable Long watermark,
+ @JsonProperty(FIELD_STATISTICS) @Nullable String statistics) {
this.version = version;
this.id = id;
this.schemaId = schemaId;
@@ -238,6 +249,7 @@ public class Snapshot {
this.deltaRecordCount = deltaRecordCount;
this.changelogRecordCount = changelogRecordCount;
this.watermark = watermark;
+ this.statistics = statistics;
}
@JsonGetter(FIELD_VERSION)
@@ -327,6 +339,12 @@ public class Snapshot {
return watermark;
}
+ /**
+ * Returns the name of the stats file referenced by this snapshot.
+ *
+ * @return the stats file name, or {@code null} if this snapshot has no stats file
+ */
+ @JsonGetter(FIELD_STATISTICS)
+ @Nullable
+ public String statistics() {
+ return statistics;
+ }
+
/**
* Return all {@link ManifestFileMeta} instances for either data or
changelog manifests in this
* snapshot.
@@ -487,6 +505,9 @@ public class Snapshot {
COMPACT,
/** Changes that clear up the whole partition and then add new
records. */
- OVERWRITE
+ OVERWRITE,
+
+ /** Collect statistics. */
+ ANALYZE
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
index 774c11f74..ac13663f7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
@@ -22,6 +22,7 @@ import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.operation.metrics.CommitMetrics;
+import org.apache.paimon.stats.Stats;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.utils.FileStorePathFactory;
@@ -85,6 +86,12 @@ public interface FileStoreCommit {
/** With metrics to measure commits. */
FileStoreCommit withMetrics(CommitMetrics metrics);
+ /**
+ * Commit new statistics. The {@link Snapshot.CommitKind} of generated
snapshot is {@link
+ * Snapshot.CommitKind#ANALYZE}.
+ */
+ void commitStatistics(Stats stats, long commitIdentifier);
+
FileStorePathFactory pathFactory();
FileIO fileIO();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 494503df1..37724b35c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -40,6 +40,8 @@ import org.apache.paimon.options.MemorySize;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.stats.Stats;
+import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.types.RowType;
@@ -116,6 +118,8 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
private CommitMetrics commitMetrics;
+ private final StatsFileHandler statsFileHandler;
+
public FileStoreCommitImpl(
FileIO fileIO,
SchemaManager schemaManager,
@@ -132,7 +136,8 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
MemorySize manifestFullCompactionSize,
int manifestMergeMinCount,
boolean dynamicPartitionOverwrite,
- @Nullable Comparator<InternalRow> keyComparator) {
+ @Nullable Comparator<InternalRow> keyComparator,
+ StatsFileHandler statsFileHandler) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.commitUser = commitUser;
@@ -150,10 +155,10 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
this.manifestMergeMinCount = manifestMergeMinCount;
this.dynamicPartitionOverwrite = dynamicPartitionOverwrite;
this.keyComparator = keyComparator;
-
this.lock = null;
this.ignoreEmptyCommit = true;
this.commitMetrics = null;
+ this.statsFileHandler = statsFileHandler;
}
@Override
@@ -248,7 +253,8 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
committable.watermark(),
committable.logOffsets(),
Snapshot.CommitKind.APPEND,
- safeLatestSnapshotId);
+ safeLatestSnapshotId,
+ null);
generatedSnapshot += 1;
}
@@ -276,7 +282,8 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
committable.watermark(),
committable.logOffsets(),
Snapshot.CommitKind.COMPACT,
- safeLatestSnapshotId);
+ safeLatestSnapshotId,
+ null);
generatedSnapshot += 1;
}
} finally {
@@ -421,6 +428,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
committable.watermark(),
committable.logOffsets(),
Snapshot.CommitKind.COMPACT,
+ null,
null);
generatedSnapshot += 1;
}
@@ -504,6 +512,21 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
return this;
}
+ /**
+ * Writes {@code stats} to a stats file through the {@link StatsFileHandler} and then calls
+ * {@code tryCommit} with {@link Snapshot.CommitKind#ANALYZE} and the written file name.
+ *
+ * <p>The commit is issued with three empty lists, a {@code null} watermark, empty log offsets
+ * and a {@code null} safe latest snapshot id.
+ *
+ * <p>NOTE(review): the stats file is written before {@code tryCommit} runs and this method has
+ * no cleanup path if the commit throws - confirm whether orphaned stats files are removed
+ * elsewhere.
+ */
+ @Override
+ public void commitStatistics(Stats stats, long commitIdentifier) {
+ String statsFileName = statsFileHandler.writeStats(stats);
+ tryCommit(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ commitIdentifier,
+ null,
+ Collections.emptyMap(),
+ Snapshot.CommitKind.ANALYZE,
+ null,
+ statsFileName);
+ }
+
@Override
public FileStorePathFactory pathFactory() {
return pathFactory;
@@ -573,7 +596,8 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
@Nullable Long watermark,
Map<Integer, Long> logOffsets,
Snapshot.CommitKind commitKind,
- Long safeLatestSnapshotId) {
+ @Nullable Long safeLatestSnapshotId,
+ @Nullable String statsFileName) {
int cnt = 0;
while (true) {
Snapshot latestSnapshot = snapshotManager.latestSnapshot();
@@ -587,7 +611,8 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
logOffsets,
commitKind,
latestSnapshot,
- safeLatestSnapshotId)) {
+ safeLatestSnapshotId,
+ statsFileName)) {
break;
}
}
@@ -650,6 +675,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
logOffsets,
Snapshot.CommitKind.OVERWRITE,
latestSnapshot,
+ null,
null)) {
break;
}
@@ -666,8 +692,9 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
@Nullable Long watermark,
Map<Integer, Long> logOffsets,
Snapshot.CommitKind commitKind,
- Snapshot latestSnapshot,
- Long safeLatestSnapshotId) {
+ @Nullable Snapshot latestSnapshot,
+ @Nullable Long safeLatestSnapshotId,
+ @Nullable String newStatsFileName) {
long newSnapshotId =
latestSnapshot == null ? Snapshot.FIRST_SNAPSHOT_ID :
latestSnapshot.id() + 1;
Path newSnapshotPath = snapshotManager.snapshotPath(newSnapshotId);
@@ -752,11 +779,28 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
newIndexManifest = indexManifest;
}
+ long latestSchemaId = schemaManager.latest().get().id();
+
+ // write new stats or inherit from the previous snapshot
+ String statsFileName = null;
+ if (newStatsFileName != null) {
+ statsFileName = newStatsFileName;
+ } else if (latestSnapshot != null) {
+ Optional<Stats> previousStatistic =
statsFileHandler.readStats(latestSnapshot);
+ if (previousStatistic.isPresent()) {
+ if (previousStatistic.get().schemaId() != latestSchemaId) {
+ LOG.warn("Schema changed, stats will not be
inherited");
+ } else {
+ statsFileName = latestSnapshot.statistics();
+ }
+ }
+ }
+
// prepare snapshot file
newSnapshot =
new Snapshot(
newSnapshotId,
- schemaManager.latest().get().id(),
+ latestSchemaId,
previousChangesListName,
newChangesListName,
changelogListName,
@@ -769,7 +813,8 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
totalRecordCount,
deltaRecordCount,
Snapshot.recordCount(changelogFiles),
- currentWatermark);
+ currentWatermark,
+ statsFileName);
} catch (Throwable e) {
// fails when preparing for commit, we should clean up
cleanUpTmpManifests(
diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/ColStats.java
b/paimon-core/src/main/java/org/apache/paimon/stats/ColStats.java
new file mode 100644
index 000000000..81a7e0a38
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/stats/ColStats.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.stats;
+
+import org.apache.paimon.data.serializer.InternalSerializers;
+import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.utils.JsonSerdeUtil;
+import org.apache.paimon.utils.OptionalUtils;
+
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+/**
+ * Col stats, supports the following stats.
+ *
+ * <ul>
+ * <li>distinctCount: the number of distinct values
+ * <li>min: the minimum value of the column
+ * <li>max: the maximum value of the column
+ * <li>nullCount: the number of nulls
+ * <li>avgLen: average column length
+ * <li>maxLen: max column length
+ * </ul>
+ *
+ * @param <T> col internal data type
+ */
+public class ColStats<T> {
+
+ private static final String FIELD_COL_ID = "colId";
+ private static final String FIELD_DISTINCT_COUNT = "distinctCount";
+ private static final String FIELD_MIN = "min";
+ private static final String FIELD_MAX = "max";
+ private static final String FIELD_NULL_COUNT = "nullCount";
+ private static final String FIELD_AVG_LEN = "avgLen";
+ private static final String FIELD_MAX_LEN = "maxLen";
+
+ @JsonProperty(FIELD_COL_ID)
+ private final int colId;
+
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @JsonProperty(FIELD_DISTINCT_COUNT)
+ private final @Nullable Long distinctCount;
+
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @JsonProperty(FIELD_MIN)
+ private @Nullable String serializedMin;
+
+ private @Nullable Comparable<T> min;
+
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @JsonProperty(FIELD_MAX)
+ private @Nullable String serializedMax;
+
+ private @Nullable Comparable<T> max;
+
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @JsonProperty(FIELD_NULL_COUNT)
+ private final @Nullable Long nullCount;
+
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @JsonProperty(FIELD_AVG_LEN)
+ private final @Nullable Long avgLen;
+
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @JsonProperty(FIELD_MAX_LEN)
+ private final @Nullable Long maxLen;
+
+ @JsonCreator
+ public ColStats(
+ @JsonProperty(FIELD_COL_ID) int colId,
+ @JsonProperty(FIELD_DISTINCT_COUNT) @Nullable Long distinctCount,
+ @JsonProperty(FIELD_MIN) @Nullable String serializedMin,
+ @JsonProperty(FIELD_MAX) @Nullable String serializedMax,
+ @JsonProperty(FIELD_NULL_COUNT) @Nullable Long nullCount,
+ @JsonProperty(FIELD_AVG_LEN) @Nullable Long avgLen,
+ @JsonProperty(FIELD_MAX_LEN) @Nullable Long maxLen) {
+ this.colId = colId;
+ this.distinctCount = distinctCount;
+ this.serializedMin = serializedMin;
+ this.serializedMax = serializedMax;
+ this.nullCount = nullCount;
+ this.avgLen = avgLen;
+ this.maxLen = maxLen;
+ }
+
+ public ColStats(
+ int colId,
+ @Nullable Long distinctCount,
+ @Nullable Comparable<T> min,
+ @Nullable Comparable<T> max,
+ @Nullable Long nullCount,
+ @Nullable Long avgLen,
+ @Nullable Long maxLen) {
+ this.colId = colId;
+ this.distinctCount = distinctCount;
+ this.min = min;
+ this.max = max;
+ this.nullCount = nullCount;
+ this.avgLen = avgLen;
+ this.maxLen = maxLen;
+ }
+
+ public int colId() {
+ return colId;
+ }
+
+ public OptionalLong distinctCount() {
+ return OptionalUtils.ofNullable(distinctCount);
+ }
+
+ public Optional<Comparable<T>> min() {
+ return Optional.ofNullable(min);
+ }
+
+ public Optional<Comparable<T>> max() {
+ return Optional.ofNullable(max);
+ }
+
+ public OptionalLong nullCount() {
+ return OptionalUtils.ofNullable(nullCount);
+ }
+
+ public OptionalLong avgLen() {
+ return OptionalUtils.ofNullable(avgLen);
+ }
+
+ public OptionalLong maxLen() {
+ return OptionalUtils.ofNullable(maxLen);
+ }
+
+ /**
+ * Fills {@code serializedMin} / {@code serializedMax} from {@code min} / {@code max}.
+ *
+ * <p>A value is only serialized when it is present and its string form has not been set yet,
+ * so existing strings are never overwritten. No serializer is created when there is nothing
+ * to serialize.
+ *
+ * @param dataType data type used to create the serializer
+ */
+ @SuppressWarnings("unchecked")
+ public void serializeFieldsToString(DataType dataType) {
+ boolean minPending = min != null && serializedMin == null;
+ boolean maxPending = max != null && serializedMax == null;
+ if (!minPending && !maxPending) {
+ return;
+ }
+
+ Serializer<T> serializer = InternalSerializers.create(dataType);
+ if (minPending) {
+ serializedMin = serializer.serializeToString((T) min);
+ }
+ if (maxPending) {
+ serializedMax = serializer.serializeToString((T) max);
+ }
+ }
+
+ /**
+ * Restores {@code min} / {@code max} from {@code serializedMin} / {@code serializedMax}.
+ *
+ * <p>A value is only deserialized when its string form is present and the in-memory value has
+ * not been set yet, so existing values are never overwritten. No serializer is created when
+ * there is nothing to deserialize.
+ *
+ * @param dataType data type used to create the serializer
+ */
+ @SuppressWarnings("unchecked")
+ public void deserializeFieldsFromString(DataType dataType) {
+ boolean minPending = serializedMin != null && min == null;
+ boolean maxPending = serializedMax != null && max == null;
+ if (!minPending && !maxPending) {
+ return;
+ }
+
+ Serializer<T> serializer = InternalSerializers.create(dataType);
+ if (minPending) {
+ min = (Comparable<T>) serializer.deserializeFromString(serializedMin);
+ }
+ if (maxPending) {
+ max = (Comparable<T>) serializer.deserializeFromString(serializedMax);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ColStats<?> colStats = (ColStats<?>) o;
+ return colId == colStats.colId
+ && Objects.equals(distinctCount, colStats.distinctCount)
+ && Objects.equals(serializedMin, colStats.serializedMin)
+ && Objects.equals(min, colStats.min)
+ && Objects.equals(serializedMax, colStats.serializedMax)
+ && Objects.equals(max, colStats.max)
+ && Objects.equals(nullCount, colStats.nullCount)
+ && Objects.equals(avgLen, colStats.avgLen)
+ && Objects.equals(maxLen, colStats.maxLen);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ colId,
+ distinctCount,
+ serializedMin,
+ min,
+ serializedMax,
+ max,
+ nullCount,
+ avgLen,
+ maxLen);
+ }
+
+ @Override
+ public String toString() {
+ return JsonSerdeUtil.toJson(this);
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/Stats.java
b/paimon-core/src/main/java/org/apache/paimon/stats/Stats.java
new file mode 100644
index 000000000..046b66a51
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/stats/Stats.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.stats;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.utils.JsonSerdeUtil;
+import org.apache.paimon.utils.OptionalUtils;
+
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * Global stats, supports the following stats.
+ *
+ * <ul>
+ * <li>mergedRecordCount: the total number of records after merge
+ * <li>mergedRecordSize: the total size of the merged records in bytes
+ * <li>colStats: column stats map
+ * </ul>
+ */
+public class Stats {
+
+ // ID of the snapshot these statistics were collected from
+ private static final String FIELD_SNAPSHOT_ID = "snapshotId";
+ // Schema ID of the snapshot these statistics were collected from
+ private static final String FIELD_SCHEMA_ID = "schemaId";
+ private static final String FIELD_MERGED_RECORD_COUNT =
"mergedRecordCount";
+ private static final String FIELD_MERGED_RECORD_SIZE = "mergedRecordSize";
+ private static final String FIELD_COL_STATS = "colStats";
+
+ @JsonProperty(FIELD_SNAPSHOT_ID)
+ private final long snapshotId;
+
+ @JsonProperty(FIELD_SCHEMA_ID)
+ private final long schemaId;
+
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @JsonProperty(FIELD_MERGED_RECORD_COUNT)
+ private final @Nullable Long mergedRecordCount;
+
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @JsonProperty(FIELD_MERGED_RECORD_SIZE)
+ private final @Nullable Long mergedRecordSize;
+
+ @JsonProperty(FIELD_COL_STATS)
+ private final Map<String, ColStats<?>> colStats;
+
+ @JsonCreator
+ public Stats(
+ @JsonProperty(FIELD_SNAPSHOT_ID) long snapshotId,
+ @JsonProperty(FIELD_SCHEMA_ID) long schemaId,
+ @JsonProperty(FIELD_MERGED_RECORD_COUNT) @Nullable Long
mergedRecordCount,
+ @JsonProperty(FIELD_MERGED_RECORD_SIZE) @Nullable Long
mergedRecordSize,
+ @JsonProperty(FIELD_COL_STATS) Map<String, ColStats<?>> colStats) {
+ this.snapshotId = snapshotId;
+ this.schemaId = schemaId;
+ this.mergedRecordCount = mergedRecordCount;
+ this.mergedRecordSize = mergedRecordSize;
+ this.colStats = colStats;
+ }
+
+ public Stats(long snapshotId, long schemaId, Long mergedRecordCount, Long
mergedRecordSize) {
+ this(snapshotId, schemaId, mergedRecordCount, mergedRecordSize,
Collections.emptyMap());
+ }
+
+ public long snapshotId() {
+ return snapshotId;
+ }
+
+ public long schemaId() {
+ return schemaId;
+ }
+
+ public OptionalLong mergedRecordCount() {
+ return OptionalUtils.ofNullable(mergedRecordCount);
+ }
+
+ public OptionalLong mergedRecordSize() {
+ return OptionalUtils.ofNullable(mergedRecordSize);
+ }
+
+ public Map<String, ColStats<?>> colStats() {
+ return colStats;
+ }
+
+ /**
+ * Serializes the min/max values of every column stats entry to their string form, using the
+ * data type of the same-named field in {@code schema}. Does nothing if there are no column
+ * stats.
+ *
+ * @param schema schema used to look up the data type of each column by name
+ * @throws RuntimeException if a column is not found in {@code schema} or serialization fails
+ */
+ public void serializeFieldsToString(TableSchema schema) {
+ try {
+ if (colStats != null) {
+ for (Map.Entry<String, ColStats<?>> entry : colStats.entrySet()) {
+ DataType type = columnType(schema, entry.getKey());
+ entry.getValue().serializeFieldsToString(type);
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to serialize fields to string", e);
+ }
+ }
+
+ /**
+ * Deserializes the min/max values of every column stats entry from their string form, using
+ * the data type of the same-named field in {@code schema}. Does nothing if there are no
+ * column stats.
+ *
+ * @param schema schema used to look up the data type of each column by name
+ * @throws RuntimeException if a column is not found in {@code schema} or deserialization
+ * fails
+ */
+ public void deserializeFieldsFromString(TableSchema schema) {
+ try {
+ if (colStats != null) {
+ for (Map.Entry<String, ColStats<?>> entry : colStats.entrySet()) {
+ DataType type = columnType(schema, entry.getKey());
+ entry.getValue().deserializeFieldsFromString(type);
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to deserialize fields from string", e);
+ }
+ }
+
+ /**
+ * Returns the data type of the first field in {@code schema} whose name equals
+ * {@code colName}.
+ *
+ * @throws IllegalStateException if no field with that name exists in {@code schema}
+ */
+ private static DataType columnType(TableSchema schema, String colName) {
+ return schema.fields().stream()
+ .filter(field -> field.name().equals(colName))
+ .findFirst()
+ .orElseThrow(
+ () ->
+ new IllegalStateException(
+ "Unable to find column '" + colName + "' in the schema"))
+ .type();
+ }
+
+ public String toJson() {
+ return JsonSerdeUtil.toJson(this);
+ }
+
+ public static Stats fromJson(String json) {
+ return JsonSerdeUtil.fromJson(json, Stats.class);
+ }
+
+ /**
+ * Reads the UTF-8 file at {@code path} and parses its JSON content into a {@link Stats}.
+ *
+ * @param fileIO file IO used to read the file
+ * @param path path of the stats file
+ * @return the parsed stats
+ * @throws RuntimeException if reading the file fails
+ */
+ public static Stats fromPath(FileIO fileIO, Path path) {
+ try {
+ String json = fileIO.readFileUtf8(path);
+ return Stats.fromJson(json);
+ } catch (IOException e) {
+ // this reads a stats file, not a snapshot: keep the message accurate for debugging
+ throw new RuntimeException("Fails to read stats from path " + path, e);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Stats stats = (Stats) o;
+ return snapshotId == stats.snapshotId
+ && schemaId == stats.schemaId
+ && Objects.equals(mergedRecordCount, stats.mergedRecordCount)
+ && Objects.equals(mergedRecordSize, stats.mergedRecordSize)
+ && Objects.equals(colStats, stats.colStats);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(snapshotId, schemaId, mergedRecordCount,
mergedRecordSize, colStats);
+ }
+
+ @Override
+ public String toString() {
+ return JsonSerdeUtil.toJson(this);
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/StatsFile.java
b/paimon-core/src/main/java/org/apache/paimon/stats/StatsFile.java
new file mode 100644
index 000000000..252f785bb
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/stats/StatsFile.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.stats;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.utils.PathFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+
+/** Reads, writes and deletes stats files, resolving file names through a {@link PathFactory}. */
+public class StatsFile {
+
+ private final FileIO fileIO;
+ private final PathFactory pathFactory;
+
+ public StatsFile(FileIO fileIO, PathFactory pathFactory) {
+ this.fileIO = fileIO;
+ this.pathFactory = pathFactory;
+ }
+
+ /**
+ * Read stats from the stats file with the given name.
+ *
+ * @param fileName name of the stats file, resolved via {@code pathFactory.toPath}
+ * @return stats
+ */
+ public Stats read(String fileName) {
+ return Stats.fromPath(fileIO, pathFactory.toPath(fileName));
+ }
+
+ /**
+ * Write stats as JSON to a new stats file whose path comes from {@code pathFactory.newPath()}.
+ *
+ * @return the written file name
+ * @throws UncheckedIOException if the file cannot be written
+ */
+ public String write(Stats stats) {
+ Path path = pathFactory.newPath();
+
+ try {
+ fileIO.writeFileUtf8(path, stats.toJson());
+ } catch (IOException e) {
+ // same wrapper type as exists(), so callers see one exception type for IO failures
+ throw new UncheckedIOException("Failed to write stats file: " + path, e);
+ }
+ return path.getName();
+ }
+
+ /** Delete the stats file with the given name via {@code fileIO.deleteQuietly}. */
+ public void delete(String fileName) {
+ fileIO.deleteQuietly(pathFactory.toPath(fileName));
+ }
+
+ /**
+ * Check whether the stats file with the given name exists.
+ *
+ * @throws UncheckedIOException if the existence check fails
+ */
+ public boolean exists(String fileName) {
+ Path path = pathFactory.toPath(fileName);
+ try {
+ return fileIO.exists(path);
+ } catch (IOException e) {
+ throw new UncheckedIOException(
+ "Failed to check existence of stats file: " + path, e);
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/stats/StatsFileHandler.java
b/paimon-core/src/main/java/org/apache/paimon/stats/StatsFileHandler.java
new file mode 100644
index 000000000..73ed6c5d4
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/stats/StatsFileHandler.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.stats;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.utils.SnapshotManager;
+
+import java.util.Optional;
+
+/**
+ * Handler of {@link StatsFile}.
+ *
+ * <p>Looks up the stats file referenced by a snapshot and passes the schema identified by {@code
+ * stats.schemaId()} to the stats' field (de)serialization methods when writing and reading.
+ */
+public class StatsFileHandler {
+
+    private final SnapshotManager snapshotManager;
+    private final SchemaManager schemaManager;
+    private final StatsFile statsFile;
+
+    public StatsFileHandler(
+            SnapshotManager snapshotManager, SchemaManager schemaManager, StatsFile statsFile) {
+        this.snapshotManager = snapshotManager;
+        this.schemaManager = schemaManager;
+        this.statsFile = statsFile;
+    }
+
+    /**
+     * Write stats to a stats file.
+     *
+     * <p>Calls {@code serializeFieldsToString} on the stats with the schema identified by {@code
+     * stats.schemaId()} before the file is written.
+     *
+     * @param stats the stats to write
+     * @return the written file name
+     */
+    public String writeStats(Stats stats) {
+        stats.serializeFieldsToString(schemaManager.schema(stats.schemaId()));
+        return statsFile.write(stats);
+    }
+
+    /**
+     * Read stats of the latest snapshot.
+     *
+     * @return stats, or empty if the latest snapshot references no stats file
+     * @throws IllegalStateException if no latest snapshot id is available
+     */
+    public Optional<Stats> readStats() {
+        Long latestSnapshotId = snapshotManager.latestSnapshotId();
+        if (latestSnapshotId == null) {
+            // It is the snapshot id that is missing here, not a schema.
+            throw new IllegalStateException("Unable to obtain the latest snapshot");
+        }
+        return readStats(latestSnapshotId);
+    }
+
+    /**
+     * Read stats of the specified snapshot.
+     *
+     * @param snapshotId id of the snapshot whose stats should be read
+     * @return stats, or empty if the snapshot references no stats file
+     */
+    public Optional<Stats> readStats(long snapshotId) {
+        return readStats(snapshotManager.snapshot(snapshotId));
+    }
+
+    /**
+     * Read stats referenced by the given snapshot.
+     *
+     * @param snapshot the snapshot whose {@code statistics()} names the stats file
+     * @return stats, or empty if {@code snapshot.statistics()} is null
+     */
+    public Optional<Stats> readStats(Snapshot snapshot) {
+        String statsFileName = snapshot.statistics();
+        if (statsFileName == null) {
+            return Optional.empty();
+        }
+        Stats stats = statsFile.read(statsFileName);
+        stats.deserializeFieldsFromString(schemaManager.schema(stats.schemaId()));
+        return Optional.of(stats);
+    }
+
+    /**
+     * Delete stats of the specified snapshot. Does nothing if the snapshot references no stats
+     * file.
+     *
+     * @param snapshotId id of the snapshot whose stats file should be deleted
+     */
+    public void deleteStats(long snapshotId) {
+        Snapshot snapshot = snapshotManager.snapshot(snapshotId);
+        String statsFileName = snapshot.statistics();
+        if (statsFileName != null) {
+            statsFile.delete(statsFileName);
+        }
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
index 17d167d72..f9aaaf67b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
@@ -56,6 +56,7 @@ public class FileStorePathFactory {
private final AtomicInteger manifestListCount;
private final AtomicInteger indexManifestCount;
private final AtomicInteger indexFileCount;
+ private final AtomicInteger statsFileCount;
public FileStorePathFactory(Path root) {
this(
@@ -78,6 +79,7 @@ public class FileStorePathFactory {
this.manifestListCount = new AtomicInteger(0);
this.indexManifestCount = new AtomicInteger(0);
this.indexFileCount = new AtomicInteger(0);
+ this.statsFileCount = new AtomicInteger(0);
}
public Path root() {
@@ -207,4 +209,23 @@ public class FileStorePathFactory {
}
};
}
+
+    /** Creates a {@link PathFactory} whose paths live under {@code <root>/statistics/}. */
+    public PathFactory statsFileFactory() {
+        return new PathFactory() {
+            @Override
+            public Path newPath() {
+                // New files are named "stats-<uuid>-<n>", where n comes from statsFileCount.
+                String fileName = "stats-" + uuid + "-" + statsFileCount.getAndIncrement();
+                return toPath(fileName);
+            }
+
+            @Override
+            public Path toPath(String fileName) {
+                String location = root + "/statistics/" + fileName;
+                return new Path(location);
+            }
+        };
+    }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index 03626a6b6..c41741c08 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -722,6 +722,7 @@ public class FileDeletionTest {
Collections.emptyMap(),
Snapshot.CommitKind.APPEND,
store.snapshotManager().latestSnapshot(),
+ null,
null);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index 10cde724a..58d961b49 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -35,8 +35,14 @@ import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.stats.ColStats;
+import org.apache.paimon.stats.Stats;
+import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.testutils.assertj.AssertionUtils;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FailingFileIO;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
import org.apache.paimon.utils.SnapshotManager;
@@ -774,6 +780,69 @@ public class FileStoreCommitTest {
assertThat(file).isEmpty();
}
+ /**
+ * Checks committing statistics and reading them back through the stats file handler:
+ * after an explicit statistics commit, after a further data commit, after a schema change,
+ * and after committing statistics that carry no column stats.
+ */
+ @Test
+ public void testWriteStats() throws Exception {
+ TestFileStore store = createStore(false, 1,
CoreOptions.ChangelogProducer.NONE);
+ StatsFileHandler statsFileHandler = store.newStatsFileHandler();
+ FileStoreCommitImpl fileStoreCommit = store.newCommit();
+ store.commitData(generateDataList(10), gen::getPartition, kv -> 0,
Collections.emptyMap());
+ Snapshot latestSnapshot = store.snapshotManager().latestSnapshot();
+
+ // Analyze and check: commit hand-built stats for the latest snapshot and expect to read
+ // back an equal Stats object.
+ // NOTE(review): Long.MAX_VALUE is the second argument of commitStatistics; presumably a
+ // commit identifier, confirm against FileStoreCommit.
+ HashMap<String, ColStats<?>> fakeColStatsMap = new HashMap<>();
+ fakeColStatsMap.put("orderId", new ColStats<>(3, 10L, 1L, 10L, 0L, 8L,
8L));
+ Stats fakeStats =
+ new Stats(
+ latestSnapshot.id(),
+ latestSnapshot.schemaId(),
+ 10L,
+ 1000L,
+ fakeColStatsMap);
+ fileStoreCommit.commitStatistics(fakeStats, Long.MAX_VALUE);
+ Optional<Stats> readStats = statsFileHandler.readStats();
+ assertThat(readStats).isPresent();
+ assertThat(readStats.get()).isEqualTo(fakeStats);
+
+ // New snapshot will inherit last snapshot's stats
+ store.commitData(generateDataList(10), gen::getPartition, kv -> 0,
Collections.emptyMap());
+ readStats = statsFileHandler.readStats();
+ assertThat(readStats).isPresent();
+ assertThat(readStats.get()).isEqualTo(fakeStats);
+
+ // When table schema is modified, new snapshot will not inherit last
+ // snapshot's stats
+ ArrayList<DataField> newFields =
+ new
ArrayList<>(TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields());
+ newFields.add(new DataField(-1, "newField", DataTypes.INT()));
+ store.mergeSchema(new RowType(false, newFields), true);
+ store.commitData(generateDataList(10), gen::getPartition, kv -> 0,
Collections.emptyMap());
+ readStats = statsFileHandler.readStats();
+ assertThat(readStats).isEmpty();
+
+ // Then we need to analyze again: commit fresh stats built from the new latest snapshot's
+ // id and schema id.
+ latestSnapshot = store.snapshotManager().latestSnapshot();
+ fakeColStatsMap = new HashMap<>();
+ fakeColStatsMap.put("orderId", new ColStats<>(3, 30L, 1L, 30L, 0L, 8L,
8L));
+ fakeStats =
+ new Stats(
+ latestSnapshot.id(),
+ latestSnapshot.schemaId(),
+ 30L,
+ 3000L,
+ fakeColStatsMap);
+ fileStoreCommit.commitStatistics(fakeStats, Long.MAX_VALUE);
+ readStats = statsFileHandler.readStats();
+ assertThat(readStats).isPresent();
+ assertThat(readStats.get()).isEqualTo(fakeStats);
+
+ // Analyze without col stats and check: the four-argument Stats constructor is used, so no
+ // column stats map is supplied.
+ latestSnapshot = store.snapshotManager().latestSnapshot();
+ fakeStats = new Stats(latestSnapshot.id(), latestSnapshot.schemaId(),
30L, 3000L);
+ fileStoreCommit.commitStatistics(fakeStats, Long.MAX_VALUE);
+ readStats = statsFileHandler.readStats();
+ assertThat(readStats).isPresent();
+ assertThat(readStats.get()).isEqualTo(fakeStats);
+ }
+
private TestFileStore createStore(boolean failing) throws Exception {
return createStore(failing, 1);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/stats/StatsFileTest.java
b/paimon-core/src/test/java/org/apache/paimon/stats/StatsFileTest.java
new file mode 100644
index 000000000..53ea463a1
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/stats/StatsFileTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.stats;
+
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.utils.PathFactory;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Round-trip test for {@link StatsFile}: write, check existence, read back, delete. */
+public class StatsFileTest {
+
+    @TempDir java.nio.file.Path tempPath;
+
+    @Test
+    public void test() throws IOException {
+        // Sample stats carrying a single column entry.
+        HashMap<String, ColStats<?>> columnStats = new HashMap<>();
+        columnStats.put("orderId", new ColStats<>(0, 10L, "111", "222", 0L, 8L, 8L));
+        Stats expected = new Stats(1L, 0L, 10L, 1000L, columnStats);
+
+        Path baseDir = new Path(tempPath.toUri());
+        StatsFile statsFile = new StatsFile(LocalFileIO.create(), pathFactoryIn(baseDir));
+
+        String writtenName = statsFile.write(expected);
+        assertThat(statsFile.exists(writtenName)).isTrue();
+        assertThat(statsFile.read(writtenName)).isEqualTo(expected);
+
+        statsFile.delete(writtenName);
+        assertThat(statsFile.exists(writtenName)).isFalse();
+    }
+
+    /** Builds a path factory that places files directly under {@code baseDir}. */
+    private static PathFactory pathFactoryIn(Path baseDir) {
+        return new PathFactory() {
+            @Override
+            public Path newPath() {
+                // A random UUID string is used as the name of each new file.
+                String randomName = UUID.randomUUID().toString();
+                return new Path(baseDir, randomName);
+            }
+
+            @Override
+            public Path toPath(String fileName) {
+                return new Path(baseDir, fileName);
+            }
+        };
+    }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
index 5e689a826..9778ba3be 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
@@ -73,6 +73,7 @@ public class SnapshotManagerTest {
null,
null,
null,
+ null,
null);
localFileIO.writeFileUtf8(snapshotManager.snapshotPath(i),
snapshot.toJson());
}
@@ -110,6 +111,7 @@ public class SnapshotManagerTest {
null,
null,
null,
+ null,
null);
localFileIO.writeFileUtf8(snapshotManager.snapshotPath(i),
snapshot.toJson());
}