This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b0a44c9863 [core/iceberg] Added optional snapshot summary fields to
iceberg metadata (#6503)
b0a44c9863 is described below
commit b0a44c986317458738964988fc9647b4ea856452
Author: 0dunay0 <[email protected]>
AuthorDate: Sat May 23 21:22:04 2026 -0500
[core/iceberg] Added optional snapshot summary fields to iceberg metadata
(#6503)
---
.../paimon/iceberg/IcebergCommitCallback.java | 291 ++++++++++++++++++---
.../paimon/iceberg/IcebergCompatibilityTest.java | 24 +-
.../apache/paimon/iceberg/IcebergMetadataTest.java | 9 +
3 files changed, 291 insertions(+), 33 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
index 14ffcbf212..0489a2d0bf 100644
---
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
@@ -109,6 +109,22 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
private static final String PUFFIN_FORMAT = "puffin";
+ // Snapshot summary metric keys
+ private static final String SNAPSHOT_SUMMARY_ADDED_DATA_FILES =
"added-data-files";
+ private static final String SNAPSHOT_SUMMARY_ADDED_RECORDS =
"added-records";
+ private static final String SNAPSHOT_SUMMARY_ADDED_FILES_SIZE =
"added-files-size";
+ private static final String SNAPSHOT_SUMMARY_DELETED_DATA_FILES =
"deleted-data-files";
+ private static final String SNAPSHOT_SUMMARY_DELETED_RECORDS =
"deleted-records";
+ private static final String SNAPSHOT_SUMMARY_REMOVED_FILES_SIZE =
"removed-files-size";
+ private static final String SNAPSHOT_SUMMARY_CHANGED_PARTITION_COUNT =
+ "changed-partition-count";
+ private static final String SNAPSHOT_SUMMARY_TOTAL_RECORDS =
"total-records";
+ private static final String SNAPSHOT_SUMMARY_TOTAL_DATA_FILES =
"total-data-files";
+ private static final String SNAPSHOT_SUMMARY_TOTAL_FILES_SIZE =
"total-files-size";
+ private static final String SNAPSHOT_SUMMARY_TOTAL_DELETE_FILES =
"total-delete-files";
+ private static final String SNAPSHOT_SUMMARY_TOTAL_POSITION_DELETES =
"total-position-deletes";
+ private static final String SNAPSHOT_SUMMARY_TOTAL_EQUALITY_DELETES =
"total-equality-deletes";
+
private final FileStoreTable table;
private final String commitUser;
@@ -300,45 +316,83 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
private void createMetadataWithoutBase(long snapshotId) throws IOException
{
SnapshotReader snapshotReader =
table.newSnapshotReader().withSnapshot(snapshotId);
+ Snapshot paimonSnapshot = table.snapshotManager().snapshot(snapshotId);
SchemaCache schemaCache = new SchemaCache();
List<IcebergManifestEntry> dataFileEntries = new ArrayList<>();
List<IcebergManifestEntry> dvFileEntries = new ArrayList<>();
+ SummaryMetrics metrics = new SummaryMetrics();
+ Set<BinaryRow> changedPartitions = new HashSet<>();
List<DataSplit> filteredDataSplits =
snapshotReader.read().dataSplits().stream()
.filter(DataSplit::rawConvertible)
.collect(Collectors.toList());
for (DataSplit dataSplit : filteredDataSplits) {
+ changedPartitions.add(dataSplit.partition());
dataSplitToManifestEntries(
dataSplit, snapshotId, schemaCache, dataFileEntries,
dvFileEntries);
+
+ for (DataFileMeta paimonFileMeta : dataSplit.dataFiles()) {
+ metrics.addedDataFiles++;
+ metrics.addedRecords += paimonFileMeta.rowCount();
+ metrics.addedFilesSize += paimonFileMeta.fileSize();
+ }
}
- List<IcebergManifestFileMeta> manifestFileMetas = new ArrayList<>();
+ List<IcebergManifestFileMeta> dataManifestFileMetas = new
ArrayList<>();
if (!dataFileEntries.isEmpty()) {
- manifestFileMetas.addAll(
+ dataManifestFileMetas.addAll(
manifestFile.rollingWrite(dataFileEntries.iterator(),
snapshotId));
}
+
+ List<IcebergManifestFileMeta> dvManifestFileMetas = new ArrayList<>();
if (!dvFileEntries.isEmpty()) {
- manifestFileMetas.addAll(
+ dvManifestFileMetas.addAll(
manifestFile.rollingWrite(
dvFileEntries.iterator(),
snapshotId,
IcebergManifestFileMeta.Content.DELETES));
}
- String manifestListFileName =
manifestList.writeWithoutRolling(manifestFileMetas);
+ List<IcebergManifestFileMeta> allManifestFileMetas = new ArrayList<>();
+ allManifestFileMetas.addAll(dataManifestFileMetas);
+ allManifestFileMetas.addAll(dvManifestFileMetas);
+
+ metrics.changedPartitionCount = changedPartitions.size();
+ metrics.totalDataFiles = metrics.addedDataFiles;
+ metrics.deletedDataFiles = 0;
+ metrics.deletedRecords = 0;
+ metrics.deletedFilesSize = 0;
+ metrics.totalRecords = metrics.addedRecords;
+ metrics.totalFilesSize = metrics.addedFilesSize;
+ long totalDeleteFiles =
dvFileEntries.stream().filter(IcebergManifestEntry::isLive).count();
+ long totalPositionDeleteRecords =
+ dvFileEntries.stream()
+ .filter(IcebergManifestEntry::isLive)
+ .mapToLong(entry -> entry.file().recordCount())
+ .sum();
+ metrics.totalDeleteFiles = totalDeleteFiles;
+ metrics.totalPositionDeletes = totalPositionDeleteRecords;
+ metrics.totalEqualityDeletes = 0;
+
+ String manifestListFileName =
manifestList.writeWithoutRolling(allManifestFileMetas);
int schemaId = (int) schemaCache.getLatestSchemaId();
IcebergSchema icebergSchema = schemaCache.get(schemaId);
List<IcebergPartitionField> partitionFields =
getPartitionFields(table.schema().partitionKeys(),
icebergSchema);
+
+ IcebergSnapshotSummary snapshotSummary =
+ computeSnapshotSummary(
+ IcebergSnapshotSummary.APPEND.operation(),
paimonSnapshot, metrics);
+
IcebergSnapshot snapshot =
new IcebergSnapshot(
snapshotId,
snapshotId,
null,
System.currentTimeMillis(),
- IcebergSnapshotSummary.APPEND,
+ snapshotSummary,
pathFactory.toManifestListPath(manifestListFileName).toString(),
schemaId,
null,
@@ -522,12 +576,14 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
.filter(meta -> meta.content() ==
IcebergManifestFileMeta.Content.DELETES)
.collect(Collectors.toList());
- Map<String, BinaryRow> removedFiles = new LinkedHashMap<>();
+ Map<String, Pair<BinaryRow, DataFileMeta>> removedFiles = new
LinkedHashMap<>();
Map<String, Pair<BinaryRow, DataFileMeta>> addedFiles = new
LinkedHashMap<>();
boolean isAddOnly = fileChangesCollector.collect(removedFiles,
addedFiles);
- Set<BinaryRow> modifiedPartitionsSet = new
LinkedHashSet<>(removedFiles.values());
- modifiedPartitionsSet.addAll(
-
addedFiles.values().stream().map(Pair::getLeft).collect(Collectors.toList()));
+ Set<BinaryRow> modifiedPartitionsSet =
+ removedFiles.values().stream()
+ .map(Pair::getLeft)
+ .collect(Collectors.toCollection(LinkedHashSet::new));
+
addedFiles.values().stream().map(Pair::getLeft).forEach(modifiedPartitionsSet::add);
List<BinaryRow> modifiedPartitions = new
ArrayList<>(modifiedPartitionsSet);
// Note that this check may be different from `removedFiles.isEmpty()`,
@@ -535,16 +591,16 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
// In this case, if `baseMetadata` already contains this file, we
should not add a
// duplicate.
List<IcebergManifestFileMeta> newDataManifestFileMetas;
- IcebergSnapshotSummary snapshotSummary;
+ String operation;
if (isAddOnly) {
// Fast case. We don't need to remove files from `baseMetadata`.
We only need to append
// new metadata files.
newDataManifestFileMetas = new
ArrayList<>(baseDataManifestFileMetas);
newDataManifestFileMetas.addAll(
createNewlyAddedManifestFileMetas(addedFiles, snapshotId));
- snapshotSummary = IcebergSnapshotSummary.APPEND;
+ operation = IcebergSnapshotSummary.APPEND.operation();
} else {
- Pair<List<IcebergManifestFileMeta>, IcebergSnapshotSummary> result
=
+ Pair<List<IcebergManifestFileMeta>, String> result =
createWithDeleteManifestFileMetas(
removedFiles,
addedFiles,
@@ -553,7 +609,7 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
snapshotId,
snapshot.commitKind());
newDataManifestFileMetas = result.getLeft();
- snapshotSummary = result.getRight();
+ operation = result.getRight();
}
List<IcebergManifestFileMeta> newDVManifestFileMetas = new
ArrayList<>();
@@ -577,6 +633,59 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
newDVManifestFileMetas.stream())
.collect(Collectors.toList()));
+ SummaryMetrics metrics = new SummaryMetrics();
+ metrics.addedDataFiles = addedFiles.size();
+ metrics.addedRecords =
+ addedFiles.values().stream().mapToLong(p ->
p.getRight().rowCount()).sum();
+ metrics.addedFilesSize =
+ addedFiles.values().stream().mapToLong(p ->
p.getRight().fileSize()).sum();
+ metrics.deletedDataFiles = removedFiles.size();
+ metrics.deletedRecords =
+ removedFiles.values().stream().mapToLong(p ->
p.getRight().rowCount()).sum();
+ metrics.deletedFilesSize =
+ removedFiles.values().stream().mapToLong(p ->
p.getRight().fileSize()).sum();
+ metrics.changedPartitionCount = modifiedPartitionsSet.size();
+
+ IcebergSnapshot baseSnapshot = baseMetadata.currentSnapshot();
+
+ Long previousTotalRecordsValue =
+ getSummaryLong(baseSnapshot, SNAPSHOT_SUMMARY_TOTAL_RECORDS);
+ long previousTotalRecords =
+ previousTotalRecordsValue != null
+ ? previousTotalRecordsValue
+ : computeLiveRowCount(baseDataManifestFileMetas);
+ metrics.totalRecords =
+ Math.max(0, previousTotalRecords + metrics.addedRecords -
metrics.deletedRecords);
+
+ Long previousTotalDataFilesValue =
+ getSummaryLong(baseSnapshot,
SNAPSHOT_SUMMARY_TOTAL_DATA_FILES);
+ long previousTotalDataFiles =
+ previousTotalDataFilesValue != null
+ ? previousTotalDataFilesValue
+ : computeLiveFileCount(baseDataManifestFileMetas);
+ metrics.totalDataFiles =
+ Math.max(
+ 0,
+ previousTotalDataFiles + metrics.addedDataFiles -
metrics.deletedDataFiles);
+
+ Long previousTotalFilesSizeValue =
+ getSummaryLong(baseSnapshot,
SNAPSHOT_SUMMARY_TOTAL_FILES_SIZE);
+ long previousTotalFilesSize =
+ previousTotalFilesSizeValue != null
+ ? previousTotalFilesSizeValue
+ :
computeTotalFilesSizeFromManifests(baseDataManifestFileMetas);
+ metrics.totalFilesSize =
+ Math.max(
+ 0,
+ previousTotalFilesSize + metrics.addedFilesSize -
metrics.deletedFilesSize);
+
+ metrics.totalDeleteFiles =
computeLiveFileCount(newDVManifestFileMetas);
+ metrics.totalPositionDeletes =
computeLiveRowCount(newDVManifestFileMetas);
+ metrics.totalEqualityDeletes = 0;
+
+ IcebergSnapshotSummary snapshotSummary =
+ computeSnapshotSummary(operation, snapshot, metrics);
+
// add new schemas if needed
SchemaCache schemaCache = new SchemaCache();
int schemaId = (int) schemaCache.getLatestSchemaId();
@@ -678,14 +787,14 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
private interface FileChangesCollector {
boolean collect(
- Map<String, BinaryRow> removedFiles,
+ Map<String, Pair<BinaryRow, DataFileMeta>> removedFiles,
Map<String, Pair<BinaryRow, DataFileMeta>> addedFiles)
throws IOException;
}
private boolean collectFileChanges(
List<ManifestEntry> manifestEntries,
- Map<String, BinaryRow> removedFiles,
+ Map<String, Pair<BinaryRow, DataFileMeta>> removedFiles,
Map<String, Pair<BinaryRow, DataFileMeta>> addedFiles) {
boolean isAddOnly = true;
DataFilePathFactories factories = new
DataFilePathFactories(fileStorePathFactory);
@@ -703,7 +812,7 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
case DELETE:
isAddOnly = false;
addedFiles.remove(path);
- removedFiles.put(path, entry.partition());
+ removedFiles.put(path, Pair.of(entry.partition(),
entry.file()));
break;
default:
throw new UnsupportedOperationException(
@@ -715,7 +824,7 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
private boolean collectFileChanges(
long snapshotId,
- Map<String, BinaryRow> removedFiles,
+ Map<String, Pair<BinaryRow, DataFileMeta>> removedFiles,
Map<String, Pair<BinaryRow, DataFileMeta>> addedFiles) {
return collectFileChanges(
table.store()
@@ -775,16 +884,15 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
currentSnapshotId);
}
- private Pair<List<IcebergManifestFileMeta>, IcebergSnapshotSummary>
- createWithDeleteManifestFileMetas(
- Map<String, BinaryRow> removedFiles,
- Map<String, Pair<BinaryRow, DataFileMeta>> addedFiles,
- List<BinaryRow> modifiedPartitions,
- List<IcebergManifestFileMeta> baseManifestFileMetas,
- long currentSnapshotId,
- Snapshot.CommitKind commitKind)
- throws IOException {
- IcebergSnapshotSummary snapshotSummary = IcebergSnapshotSummary.APPEND;
+ private Pair<List<IcebergManifestFileMeta>, String>
createWithDeleteManifestFileMetas(
+ Map<String, Pair<BinaryRow, DataFileMeta>> removedFiles,
+ Map<String, Pair<BinaryRow, DataFileMeta>> addedFiles,
+ List<BinaryRow> modifiedPartitions,
+ List<IcebergManifestFileMeta> baseManifestFileMetas,
+ long currentSnapshotId,
+ Snapshot.CommitKind commitKind)
+ throws IOException {
+ String operation = IcebergSnapshotSummary.APPEND.operation();
List<IcebergManifestFileMeta> newManifestFileMetas = new ArrayList<>();
RowType partitionType = table.schema().logicalPartitionType();
@@ -837,10 +945,10 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
newManifestFileMetas.add(fileMeta);
} else {
// some file is removed, rewrite this file meta
- snapshotSummary =
+ operation =
commitKind == Snapshot.CommitKind.COMPACT
- ? IcebergSnapshotSummary.REPLACE
- : IcebergSnapshotSummary.OVERWRITE;
+ ?
IcebergSnapshotSummary.REPLACE.operation()
+ :
IcebergSnapshotSummary.OVERWRITE.operation();
List<IcebergManifestEntry> newEntries = new ArrayList<>();
for (IcebergManifestEntry entry : entries) {
if (entry.isLive()) {
@@ -867,7 +975,7 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
newManifestFileMetas.addAll(
createNewlyAddedManifestFileMetas(addedFiles,
currentSnapshotId));
- return Pair.of(newManifestFileMetas, snapshotSummary);
+ return Pair.of(newManifestFileMetas, operation);
}
//
-------------------------------------------------------------------------------------
@@ -1242,6 +1350,127 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
icebergDvEntries.iterator(), snapshotId,
IcebergManifestFileMeta.Content.DELETES);
}
+ //
-------------------------------------------------------------------------------------
+ // Snapshot Summary Computation
+ //
-------------------------------------------------------------------------------------
+
+ private static class SummaryMetrics {
+ long addedDataFiles;
+ long addedRecords;
+ long addedFilesSize;
+ long deletedDataFiles;
+ long deletedRecords;
+ long deletedFilesSize;
+ long changedPartitionCount;
+ long totalDataFiles;
+ long totalRecords;
+ long totalFilesSize;
+ long totalDeleteFiles;
+ long totalPositionDeletes;
+ long totalEqualityDeletes;
+ }
+
+ private IcebergSnapshotSummary computeSnapshotSummary(
+ String operation, Snapshot snapshot, SummaryMetrics metrics) {
+
+ IcebergSnapshotSummary summary = new IcebergSnapshotSummary(operation);
+
+ long addedDataFiles = Math.max(0, metrics.addedDataFiles);
+ long addedRecords = Math.max(0, metrics.addedRecords);
+ long addedFilesSize = Math.max(0, metrics.addedFilesSize);
+ long deletedDataFiles = Math.max(0, metrics.deletedDataFiles);
+ long deletedRecords = Math.max(0, metrics.deletedRecords);
+ long deletedFilesSize = Math.max(0, metrics.deletedFilesSize);
+ long changedPartitionCount = Math.max(0,
metrics.changedPartitionCount);
+ long totalRecords = Math.max(0, metrics.totalRecords);
+ long totalDataFiles = Math.max(0, metrics.totalDataFiles);
+ long totalFilesSize = Math.max(0, metrics.totalFilesSize);
+ long totalDeleteFiles = Math.max(0, metrics.totalDeleteFiles);
+ long totalPositionDeletes = Math.max(0, metrics.totalPositionDeletes);
+ long totalEqualityDeletes = Math.max(0, metrics.totalEqualityDeletes);
+
+ summary.put(SNAPSHOT_SUMMARY_ADDED_DATA_FILES,
Long.toString(addedDataFiles));
+ summary.put(SNAPSHOT_SUMMARY_ADDED_RECORDS,
Long.toString(addedRecords));
+ summary.put(SNAPSHOT_SUMMARY_ADDED_FILES_SIZE,
Long.toString(addedFilesSize));
+ summary.put(SNAPSHOT_SUMMARY_DELETED_DATA_FILES,
Long.toString(deletedDataFiles));
+ summary.put(SNAPSHOT_SUMMARY_DELETED_RECORDS,
Long.toString(deletedRecords));
+ summary.put(SNAPSHOT_SUMMARY_REMOVED_FILES_SIZE,
Long.toString(deletedFilesSize));
+ summary.put(SNAPSHOT_SUMMARY_CHANGED_PARTITION_COUNT,
Long.toString(changedPartitionCount));
+ summary.put(SNAPSHOT_SUMMARY_TOTAL_RECORDS,
Long.toString(totalRecords));
+ summary.put(SNAPSHOT_SUMMARY_TOTAL_DATA_FILES,
Long.toString(totalDataFiles));
+ summary.put(SNAPSHOT_SUMMARY_TOTAL_FILES_SIZE,
Long.toString(totalFilesSize));
+ summary.put(SNAPSHOT_SUMMARY_TOTAL_DELETE_FILES,
Long.toString(totalDeleteFiles));
+ summary.put(SNAPSHOT_SUMMARY_TOTAL_POSITION_DELETES,
Long.toString(totalPositionDeletes));
+ summary.put(SNAPSHOT_SUMMARY_TOTAL_EQUALITY_DELETES,
Long.toString(totalEqualityDeletes));
+
+ Map<String, String> properties = snapshot.properties();
+ if (properties != null) {
+ properties.forEach(
+ (key, value) -> {
+ if (value != null) {
+ summary.put(key, value);
+ }
+ });
+ }
+
+ return summary;
+ }
+
+ private long computeLiveFileCount(List<IcebergManifestFileMeta>
manifestMetas) {
+ return manifestMetas.stream()
+ .mapToLong(
+ meta ->
+ meta.addedFilesCount()
+ + meta.existingFilesCount()
+ - meta.deletedFilesCount())
+ .sum();
+ }
+
+ private long computeLiveRowCount(List<IcebergManifestFileMeta>
manifestMetas) {
+ return manifestMetas.stream()
+ .mapToLong(
+ meta ->
+ meta.addedRowsCount()
+ + meta.existingRowsCount()
+ - meta.deletedRowsCount())
+ .sum();
+ }
+
+ @Nullable
+ private Long getSummaryLong(@Nullable IcebergSnapshot snapshot, String
key) {
+ if (snapshot == null) {
+ return null;
+ }
+ Map<String, String> summaryMap = snapshot.summary().getSummary();
+ String value = summaryMap.get(key);
+ if (value == null) {
+ return null;
+ }
+ try {
+ return Long.parseLong(value);
+ } catch (NumberFormatException e) {
+ LOG.warn(
+ "Unable to parse snapshot summary field {}={} as long. The
value will be recomputed.",
+ key,
+ value);
+ return null;
+ }
+ }
+
+ private long
computeTotalFilesSizeFromManifests(List<IcebergManifestFileMeta> manifestMetas)
+ throws IOException {
+ long total = 0;
+ for (IcebergManifestFileMeta meta : manifestMetas) {
+ for (IcebergManifestEntry entry :
+ manifestFile.read(new
Path(meta.manifestPath()).getName())) {
+ if (entry.isLive()) {
+ total += entry.file().fileSizeInBytes();
+ }
+ }
+ }
+ return total;
+ }
+
//
-------------------------------------------------------------------------------------
// Utils
//
-------------------------------------------------------------------------------------
diff --git
a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
index 2e09a61df1..71be4035cc 100644
---
a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
@@ -124,9 +124,29 @@ public class IcebergCompatibilityTest {
commit.commit(1, write.prepareCommit(false, 1));
assertThat(getIcebergResult()).containsExactlyInAnyOrder("Record(1,
10)", "Record(2, 20)");
+ // Second write with overlapping keys creates a second level-0 file.
+ // The two overlapping level-0 files form a non-rawConvertible split,
+ // so Iceberg can no longer see either file. Verify that total-records
+ // in the snapshot summary tracks the Iceberg-visible count, not
+ // paimon's totalRecordCount (which includes all levels).
+ write.write(GenericRow.of(1, 11));
+ write.write(GenericRow.of(3, 30));
+ commit.commit(2, write.prepareCommit(false, 2));
+
+ long snapshotId = table.snapshotManager().latestSnapshotId();
+ List<String> icebergRecords = getIcebergResult();
+ IcebergMetadata metadata =
+ IcebergMetadata.fromPath(
+ table.fileIO(),
+ new Path(table.location(), "metadata/v" + snapshotId +
".metadata.json"));
+ String totalRecords =
+
metadata.currentSnapshot().summary().getSummary().get("total-records");
+
assertThat(totalRecords).isEqualTo(String.valueOf(icebergRecords.size()));
+
write.compact(BinaryRow.EMPTY_ROW, 0, true);
- commit.commit(2, write.prepareCommit(true, 2));
- assertThat(getIcebergResult()).containsExactlyInAnyOrder("Record(1,
10)", "Record(2, 20)");
+ commit.commit(3, write.prepareCommit(true, 3));
+ assertThat(getIcebergResult())
+ .containsExactlyInAnyOrder("Record(1, 11)", "Record(2, 20)",
"Record(3, 30)");
write.close();
commit.close();
diff --git
a/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergMetadataTest.java
b/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergMetadataTest.java
index 91a1b5acf2..a6205dc107 100644
---
a/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergMetadataTest.java
+++
b/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergMetadataTest.java
@@ -512,8 +512,17 @@ class IcebergMetadataTest {
// Verify snapshot summary contains operation information
assertThat(snapshot.summary().operation()).isEqualTo("append");
assertThat(snapshot.summary().getSummary().get("operation")).isEqualTo("append");
+
+ // Verify required fields for Redshift Spectrum compatibility
assertThat(snapshot.summary().getSummary().get("total-records")).isEqualTo("10");
+
assertThat(snapshot.summary().getSummary().get("total-data-files")).isEqualTo("1");
+
assertThat(snapshot.summary().getSummary().get("total-delete-files")).isEqualTo("0");
+
assertThat(snapshot.summary().getSummary().get("total-position-deletes")).isEqualTo("0");
+
assertThat(snapshot.summary().getSummary().get("total-equality-deletes")).isEqualTo("0");
+
+ // Verify change fields
assertThat(snapshot.summary().getSummary().get("added-data-files")).isEqualTo("1");
+
assertThat(snapshot.summary().getSummary().get("added-records")).isEqualTo("10");
assertThat(snapshot.summary().getSummary().get("added-files-size")).isEqualTo("100");
}