This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b0a44c9863 [core/iceberg] Added optional snapshot summary fields to iceberg metadata (#6503)
b0a44c9863 is described below

commit b0a44c986317458738964988fc9647b4ea856452
Author: 0dunay0 <[email protected]>
AuthorDate: Sat May 23 21:22:04 2026 -0500

    [core/iceberg] Added optional snapshot summary fields to iceberg metadata (#6503)
---
 .../paimon/iceberg/IcebergCommitCallback.java      | 291 ++++++++++++++++++---
 .../paimon/iceberg/IcebergCompatibilityTest.java   |  24 +-
 .../apache/paimon/iceberg/IcebergMetadataTest.java |   9 +
 3 files changed, 291 insertions(+), 33 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
index 14ffcbf212..0489a2d0bf 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
@@ -109,6 +109,22 @@ public class IcebergCommitCallback implements 
CommitCallback, TagCallback {
 
     private static final String PUFFIN_FORMAT = "puffin";
 
+    // Snapshot summary metric keys
+    private static final String SNAPSHOT_SUMMARY_ADDED_DATA_FILES = 
"added-data-files";
+    private static final String SNAPSHOT_SUMMARY_ADDED_RECORDS = 
"added-records";
+    private static final String SNAPSHOT_SUMMARY_ADDED_FILES_SIZE = 
"added-files-size";
+    private static final String SNAPSHOT_SUMMARY_DELETED_DATA_FILES = 
"deleted-data-files";
+    private static final String SNAPSHOT_SUMMARY_DELETED_RECORDS = 
"deleted-records";
+    private static final String SNAPSHOT_SUMMARY_REMOVED_FILES_SIZE = 
"removed-files-size";
+    private static final String SNAPSHOT_SUMMARY_CHANGED_PARTITION_COUNT =
+            "changed-partition-count";
+    private static final String SNAPSHOT_SUMMARY_TOTAL_RECORDS = 
"total-records";
+    private static final String SNAPSHOT_SUMMARY_TOTAL_DATA_FILES = 
"total-data-files";
+    private static final String SNAPSHOT_SUMMARY_TOTAL_FILES_SIZE = 
"total-files-size";
+    private static final String SNAPSHOT_SUMMARY_TOTAL_DELETE_FILES = 
"total-delete-files";
+    private static final String SNAPSHOT_SUMMARY_TOTAL_POSITION_DELETES = 
"total-position-deletes";
+    private static final String SNAPSHOT_SUMMARY_TOTAL_EQUALITY_DELETES = 
"total-equality-deletes";
+
     private final FileStoreTable table;
     private final String commitUser;
 
@@ -300,45 +316,83 @@ public class IcebergCommitCallback implements 
CommitCallback, TagCallback {
 
     private void createMetadataWithoutBase(long snapshotId) throws IOException 
{
         SnapshotReader snapshotReader = 
table.newSnapshotReader().withSnapshot(snapshotId);
+        Snapshot paimonSnapshot = table.snapshotManager().snapshot(snapshotId);
         SchemaCache schemaCache = new SchemaCache();
         List<IcebergManifestEntry> dataFileEntries = new ArrayList<>();
         List<IcebergManifestEntry> dvFileEntries = new ArrayList<>();
+        SummaryMetrics metrics = new SummaryMetrics();
+        Set<BinaryRow> changedPartitions = new HashSet<>();
 
         List<DataSplit> filteredDataSplits =
                 snapshotReader.read().dataSplits().stream()
                         .filter(DataSplit::rawConvertible)
                         .collect(Collectors.toList());
         for (DataSplit dataSplit : filteredDataSplits) {
+            changedPartitions.add(dataSplit.partition());
             dataSplitToManifestEntries(
                     dataSplit, snapshotId, schemaCache, dataFileEntries, 
dvFileEntries);
+
+            for (DataFileMeta paimonFileMeta : dataSplit.dataFiles()) {
+                metrics.addedDataFiles++;
+                metrics.addedRecords += paimonFileMeta.rowCount();
+                metrics.addedFilesSize += paimonFileMeta.fileSize();
+            }
         }
 
-        List<IcebergManifestFileMeta> manifestFileMetas = new ArrayList<>();
+        List<IcebergManifestFileMeta> dataManifestFileMetas = new 
ArrayList<>();
         if (!dataFileEntries.isEmpty()) {
-            manifestFileMetas.addAll(
+            dataManifestFileMetas.addAll(
                     manifestFile.rollingWrite(dataFileEntries.iterator(), 
snapshotId));
         }
+
+        List<IcebergManifestFileMeta> dvManifestFileMetas = new ArrayList<>();
         if (!dvFileEntries.isEmpty()) {
-            manifestFileMetas.addAll(
+            dvManifestFileMetas.addAll(
                     manifestFile.rollingWrite(
                             dvFileEntries.iterator(),
                             snapshotId,
                             IcebergManifestFileMeta.Content.DELETES));
         }
 
-        String manifestListFileName = 
manifestList.writeWithoutRolling(manifestFileMetas);
+        List<IcebergManifestFileMeta> allManifestFileMetas = new ArrayList<>();
+        allManifestFileMetas.addAll(dataManifestFileMetas);
+        allManifestFileMetas.addAll(dvManifestFileMetas);
+
+        metrics.changedPartitionCount = changedPartitions.size();
+        metrics.totalDataFiles = metrics.addedDataFiles;
+        metrics.deletedDataFiles = 0;
+        metrics.deletedRecords = 0;
+        metrics.deletedFilesSize = 0;
+        metrics.totalRecords = metrics.addedRecords;
+        metrics.totalFilesSize = metrics.addedFilesSize;
+        long totalDeleteFiles = 
dvFileEntries.stream().filter(IcebergManifestEntry::isLive).count();
+        long totalPositionDeleteRecords =
+                dvFileEntries.stream()
+                        .filter(IcebergManifestEntry::isLive)
+                        .mapToLong(entry -> entry.file().recordCount())
+                        .sum();
+        metrics.totalDeleteFiles = totalDeleteFiles;
+        metrics.totalPositionDeletes = totalPositionDeleteRecords;
+        metrics.totalEqualityDeletes = 0;
+
+        String manifestListFileName = 
manifestList.writeWithoutRolling(allManifestFileMetas);
 
         int schemaId = (int) schemaCache.getLatestSchemaId();
         IcebergSchema icebergSchema = schemaCache.get(schemaId);
         List<IcebergPartitionField> partitionFields =
                 getPartitionFields(table.schema().partitionKeys(), 
icebergSchema);
+
+        IcebergSnapshotSummary snapshotSummary =
+                computeSnapshotSummary(
+                        IcebergSnapshotSummary.APPEND.operation(), 
paimonSnapshot, metrics);
+
         IcebergSnapshot snapshot =
                 new IcebergSnapshot(
                         snapshotId,
                         snapshotId,
                         null,
                         System.currentTimeMillis(),
-                        IcebergSnapshotSummary.APPEND,
+                        snapshotSummary,
                         
pathFactory.toManifestListPath(manifestListFileName).toString(),
                         schemaId,
                         null,
@@ -522,12 +576,14 @@ public class IcebergCommitCallback implements 
CommitCallback, TagCallback {
                         .filter(meta -> meta.content() == 
IcebergManifestFileMeta.Content.DELETES)
                         .collect(Collectors.toList());
 
-        Map<String, BinaryRow> removedFiles = new LinkedHashMap<>();
+        Map<String, Pair<BinaryRow, DataFileMeta>> removedFiles = new 
LinkedHashMap<>();
         Map<String, Pair<BinaryRow, DataFileMeta>> addedFiles = new 
LinkedHashMap<>();
         boolean isAddOnly = fileChangesCollector.collect(removedFiles, 
addedFiles);
-        Set<BinaryRow> modifiedPartitionsSet = new 
LinkedHashSet<>(removedFiles.values());
-        modifiedPartitionsSet.addAll(
-                
addedFiles.values().stream().map(Pair::getLeft).collect(Collectors.toList()));
+        Set<BinaryRow> modifiedPartitionsSet =
+                removedFiles.values().stream()
+                        .map(Pair::getLeft)
+                        .collect(Collectors.toCollection(LinkedHashSet::new));
+        
addedFiles.values().stream().map(Pair::getLeft).forEach(modifiedPartitionsSet::add);
         List<BinaryRow> modifiedPartitions = new 
ArrayList<>(modifiedPartitionsSet);
 
         // Note that this check may be different from `removedFiles.isEmpty()`,
@@ -535,16 +591,16 @@ public class IcebergCommitCallback implements 
CommitCallback, TagCallback {
         // In this case, if `baseMetadata` already contains this file, we 
should not add a
         // duplicate.
         List<IcebergManifestFileMeta> newDataManifestFileMetas;
-        IcebergSnapshotSummary snapshotSummary;
+        String operation;
         if (isAddOnly) {
             // Fast case. We don't need to remove files from `baseMetadata`. 
We only need to append
             // new metadata files.
             newDataManifestFileMetas = new 
ArrayList<>(baseDataManifestFileMetas);
             newDataManifestFileMetas.addAll(
                     createNewlyAddedManifestFileMetas(addedFiles, snapshotId));
-            snapshotSummary = IcebergSnapshotSummary.APPEND;
+            operation = IcebergSnapshotSummary.APPEND.operation();
         } else {
-            Pair<List<IcebergManifestFileMeta>, IcebergSnapshotSummary> result 
=
+            Pair<List<IcebergManifestFileMeta>, String> result =
                     createWithDeleteManifestFileMetas(
                             removedFiles,
                             addedFiles,
@@ -553,7 +609,7 @@ public class IcebergCommitCallback implements 
CommitCallback, TagCallback {
                             snapshotId,
                             snapshot.commitKind());
             newDataManifestFileMetas = result.getLeft();
-            snapshotSummary = result.getRight();
+            operation = result.getRight();
         }
 
         List<IcebergManifestFileMeta> newDVManifestFileMetas = new 
ArrayList<>();
@@ -577,6 +633,59 @@ public class IcebergCommitCallback implements 
CommitCallback, TagCallback {
                                         newDVManifestFileMetas.stream())
                                 .collect(Collectors.toList()));
 
+        SummaryMetrics metrics = new SummaryMetrics();
+        metrics.addedDataFiles = addedFiles.size();
+        metrics.addedRecords =
+                addedFiles.values().stream().mapToLong(p -> 
p.getRight().rowCount()).sum();
+        metrics.addedFilesSize =
+                addedFiles.values().stream().mapToLong(p -> 
p.getRight().fileSize()).sum();
+        metrics.deletedDataFiles = removedFiles.size();
+        metrics.deletedRecords =
+                removedFiles.values().stream().mapToLong(p -> 
p.getRight().rowCount()).sum();
+        metrics.deletedFilesSize =
+                removedFiles.values().stream().mapToLong(p -> 
p.getRight().fileSize()).sum();
+        metrics.changedPartitionCount = modifiedPartitionsSet.size();
+
+        IcebergSnapshot baseSnapshot = baseMetadata.currentSnapshot();
+
+        Long previousTotalRecordsValue =
+                getSummaryLong(baseSnapshot, SNAPSHOT_SUMMARY_TOTAL_RECORDS);
+        long previousTotalRecords =
+                previousTotalRecordsValue != null
+                        ? previousTotalRecordsValue
+                        : computeLiveRowCount(baseDataManifestFileMetas);
+        metrics.totalRecords =
+                Math.max(0, previousTotalRecords + metrics.addedRecords - 
metrics.deletedRecords);
+
+        Long previousTotalDataFilesValue =
+                getSummaryLong(baseSnapshot, 
SNAPSHOT_SUMMARY_TOTAL_DATA_FILES);
+        long previousTotalDataFiles =
+                previousTotalDataFilesValue != null
+                        ? previousTotalDataFilesValue
+                        : computeLiveFileCount(baseDataManifestFileMetas);
+        metrics.totalDataFiles =
+                Math.max(
+                        0,
+                        previousTotalDataFiles + metrics.addedDataFiles - 
metrics.deletedDataFiles);
+
+        Long previousTotalFilesSizeValue =
+                getSummaryLong(baseSnapshot, 
SNAPSHOT_SUMMARY_TOTAL_FILES_SIZE);
+        long previousTotalFilesSize =
+                previousTotalFilesSizeValue != null
+                        ? previousTotalFilesSizeValue
+                        : 
computeTotalFilesSizeFromManifests(baseDataManifestFileMetas);
+        metrics.totalFilesSize =
+                Math.max(
+                        0,
+                        previousTotalFilesSize + metrics.addedFilesSize - 
metrics.deletedFilesSize);
+
+        metrics.totalDeleteFiles = 
computeLiveFileCount(newDVManifestFileMetas);
+        metrics.totalPositionDeletes = 
computeLiveRowCount(newDVManifestFileMetas);
+        metrics.totalEqualityDeletes = 0;
+
+        IcebergSnapshotSummary snapshotSummary =
+                computeSnapshotSummary(operation, snapshot, metrics);
+
         // add new schemas if needed
         SchemaCache schemaCache = new SchemaCache();
         int schemaId = (int) schemaCache.getLatestSchemaId();
@@ -678,14 +787,14 @@ public class IcebergCommitCallback implements 
CommitCallback, TagCallback {
 
     private interface FileChangesCollector {
         boolean collect(
-                Map<String, BinaryRow> removedFiles,
+                Map<String, Pair<BinaryRow, DataFileMeta>> removedFiles,
                 Map<String, Pair<BinaryRow, DataFileMeta>> addedFiles)
                 throws IOException;
     }
 
     private boolean collectFileChanges(
             List<ManifestEntry> manifestEntries,
-            Map<String, BinaryRow> removedFiles,
+            Map<String, Pair<BinaryRow, DataFileMeta>> removedFiles,
             Map<String, Pair<BinaryRow, DataFileMeta>> addedFiles) {
         boolean isAddOnly = true;
         DataFilePathFactories factories = new 
DataFilePathFactories(fileStorePathFactory);
@@ -703,7 +812,7 @@ public class IcebergCommitCallback implements 
CommitCallback, TagCallback {
                 case DELETE:
                     isAddOnly = false;
                     addedFiles.remove(path);
-                    removedFiles.put(path, entry.partition());
+                    removedFiles.put(path, Pair.of(entry.partition(), 
entry.file()));
                     break;
                 default:
                     throw new UnsupportedOperationException(
@@ -715,7 +824,7 @@ public class IcebergCommitCallback implements 
CommitCallback, TagCallback {
 
     private boolean collectFileChanges(
             long snapshotId,
-            Map<String, BinaryRow> removedFiles,
+            Map<String, Pair<BinaryRow, DataFileMeta>> removedFiles,
             Map<String, Pair<BinaryRow, DataFileMeta>> addedFiles) {
         return collectFileChanges(
                 table.store()
@@ -775,16 +884,15 @@ public class IcebergCommitCallback implements 
CommitCallback, TagCallback {
                 currentSnapshotId);
     }
 
-    private Pair<List<IcebergManifestFileMeta>, IcebergSnapshotSummary>
-            createWithDeleteManifestFileMetas(
-                    Map<String, BinaryRow> removedFiles,
-                    Map<String, Pair<BinaryRow, DataFileMeta>> addedFiles,
-                    List<BinaryRow> modifiedPartitions,
-                    List<IcebergManifestFileMeta> baseManifestFileMetas,
-                    long currentSnapshotId,
-                    Snapshot.CommitKind commitKind)
-                    throws IOException {
-        IcebergSnapshotSummary snapshotSummary = IcebergSnapshotSummary.APPEND;
+    private Pair<List<IcebergManifestFileMeta>, String> 
createWithDeleteManifestFileMetas(
+            Map<String, Pair<BinaryRow, DataFileMeta>> removedFiles,
+            Map<String, Pair<BinaryRow, DataFileMeta>> addedFiles,
+            List<BinaryRow> modifiedPartitions,
+            List<IcebergManifestFileMeta> baseManifestFileMetas,
+            long currentSnapshotId,
+            Snapshot.CommitKind commitKind)
+            throws IOException {
+        String operation = IcebergSnapshotSummary.APPEND.operation();
         List<IcebergManifestFileMeta> newManifestFileMetas = new ArrayList<>();
 
         RowType partitionType = table.schema().logicalPartitionType();
@@ -837,10 +945,10 @@ public class IcebergCommitCallback implements 
CommitCallback, TagCallback {
                     newManifestFileMetas.add(fileMeta);
                 } else {
                     // some file is removed, rewrite this file meta
-                    snapshotSummary =
+                    operation =
                             commitKind == Snapshot.CommitKind.COMPACT
-                                    ? IcebergSnapshotSummary.REPLACE
-                                    : IcebergSnapshotSummary.OVERWRITE;
+                                    ? 
IcebergSnapshotSummary.REPLACE.operation()
+                                    : 
IcebergSnapshotSummary.OVERWRITE.operation();
                     List<IcebergManifestEntry> newEntries = new ArrayList<>();
                     for (IcebergManifestEntry entry : entries) {
                         if (entry.isLive()) {
@@ -867,7 +975,7 @@ public class IcebergCommitCallback implements 
CommitCallback, TagCallback {
 
         newManifestFileMetas.addAll(
                 createNewlyAddedManifestFileMetas(addedFiles, 
currentSnapshotId));
-        return Pair.of(newManifestFileMetas, snapshotSummary);
+        return Pair.of(newManifestFileMetas, operation);
     }
 
     // 
-------------------------------------------------------------------------------------
@@ -1242,6 +1350,127 @@ public class IcebergCommitCallback implements 
CommitCallback, TagCallback {
                 icebergDvEntries.iterator(), snapshotId, 
IcebergManifestFileMeta.Content.DELETES);
     }
 
+    // 
-------------------------------------------------------------------------------------
+    // Snapshot Summary Computation
+    // 
-------------------------------------------------------------------------------------
+
+    private static class SummaryMetrics {
+        long addedDataFiles;
+        long addedRecords;
+        long addedFilesSize;
+        long deletedDataFiles;
+        long deletedRecords;
+        long deletedFilesSize;
+        long changedPartitionCount;
+        long totalDataFiles;
+        long totalRecords;
+        long totalFilesSize;
+        long totalDeleteFiles;
+        long totalPositionDeletes;
+        long totalEqualityDeletes;
+    }
+
+    private IcebergSnapshotSummary computeSnapshotSummary(
+            String operation, Snapshot snapshot, SummaryMetrics metrics) {
+
+        IcebergSnapshotSummary summary = new IcebergSnapshotSummary(operation);
+
+        long addedDataFiles = Math.max(0, metrics.addedDataFiles);
+        long addedRecords = Math.max(0, metrics.addedRecords);
+        long addedFilesSize = Math.max(0, metrics.addedFilesSize);
+        long deletedDataFiles = Math.max(0, metrics.deletedDataFiles);
+        long deletedRecords = Math.max(0, metrics.deletedRecords);
+        long deletedFilesSize = Math.max(0, metrics.deletedFilesSize);
+        long changedPartitionCount = Math.max(0, 
metrics.changedPartitionCount);
+        long totalRecords = Math.max(0, metrics.totalRecords);
+        long totalDataFiles = Math.max(0, metrics.totalDataFiles);
+        long totalFilesSize = Math.max(0, metrics.totalFilesSize);
+        long totalDeleteFiles = Math.max(0, metrics.totalDeleteFiles);
+        long totalPositionDeletes = Math.max(0, metrics.totalPositionDeletes);
+        long totalEqualityDeletes = Math.max(0, metrics.totalEqualityDeletes);
+
+        summary.put(SNAPSHOT_SUMMARY_ADDED_DATA_FILES, 
Long.toString(addedDataFiles));
+        summary.put(SNAPSHOT_SUMMARY_ADDED_RECORDS, 
Long.toString(addedRecords));
+        summary.put(SNAPSHOT_SUMMARY_ADDED_FILES_SIZE, 
Long.toString(addedFilesSize));
+        summary.put(SNAPSHOT_SUMMARY_DELETED_DATA_FILES, 
Long.toString(deletedDataFiles));
+        summary.put(SNAPSHOT_SUMMARY_DELETED_RECORDS, 
Long.toString(deletedRecords));
+        summary.put(SNAPSHOT_SUMMARY_REMOVED_FILES_SIZE, 
Long.toString(deletedFilesSize));
+        summary.put(SNAPSHOT_SUMMARY_CHANGED_PARTITION_COUNT, 
Long.toString(changedPartitionCount));
+        summary.put(SNAPSHOT_SUMMARY_TOTAL_RECORDS, 
Long.toString(totalRecords));
+        summary.put(SNAPSHOT_SUMMARY_TOTAL_DATA_FILES, 
Long.toString(totalDataFiles));
+        summary.put(SNAPSHOT_SUMMARY_TOTAL_FILES_SIZE, 
Long.toString(totalFilesSize));
+        summary.put(SNAPSHOT_SUMMARY_TOTAL_DELETE_FILES, 
Long.toString(totalDeleteFiles));
+        summary.put(SNAPSHOT_SUMMARY_TOTAL_POSITION_DELETES, 
Long.toString(totalPositionDeletes));
+        summary.put(SNAPSHOT_SUMMARY_TOTAL_EQUALITY_DELETES, 
Long.toString(totalEqualityDeletes));
+
+        Map<String, String> properties = snapshot.properties();
+        if (properties != null) {
+            properties.forEach(
+                    (key, value) -> {
+                        if (value != null) {
+                            summary.put(key, value);
+                        }
+                    });
+        }
+
+        return summary;
+    }
+
+    private long computeLiveFileCount(List<IcebergManifestFileMeta> 
manifestMetas) {
+        return manifestMetas.stream()
+                .mapToLong(
+                        meta ->
+                                meta.addedFilesCount()
+                                        + meta.existingFilesCount()
+                                        - meta.deletedFilesCount())
+                .sum();
+    }
+
+    private long computeLiveRowCount(List<IcebergManifestFileMeta> 
manifestMetas) {
+        return manifestMetas.stream()
+                .mapToLong(
+                        meta ->
+                                meta.addedRowsCount()
+                                        + meta.existingRowsCount()
+                                        - meta.deletedRowsCount())
+                .sum();
+    }
+
+    @Nullable
+    private Long getSummaryLong(@Nullable IcebergSnapshot snapshot, String 
key) {
+        if (snapshot == null) {
+            return null;
+        }
+        Map<String, String> summaryMap = snapshot.summary().getSummary();
+        String value = summaryMap.get(key);
+        if (value == null) {
+            return null;
+        }
+        try {
+            return Long.parseLong(value);
+        } catch (NumberFormatException e) {
+            LOG.warn(
+                    "Unable to parse snapshot summary field {}={} as long. The 
value will be recomputed.",
+                    key,
+                    value);
+            return null;
+        }
+    }
+
+    private long 
computeTotalFilesSizeFromManifests(List<IcebergManifestFileMeta> manifestMetas)
+            throws IOException {
+        long total = 0;
+        for (IcebergManifestFileMeta meta : manifestMetas) {
+            for (IcebergManifestEntry entry :
+                    manifestFile.read(new 
Path(meta.manifestPath()).getName())) {
+                if (entry.isLive()) {
+                    total += entry.file().fileSizeInBytes();
+                }
+            }
+        }
+        return total;
+    }
+
     // 
-------------------------------------------------------------------------------------
     // Utils
     // 
-------------------------------------------------------------------------------------
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
index 2e09a61df1..71be4035cc 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
@@ -124,9 +124,29 @@ public class IcebergCompatibilityTest {
         commit.commit(1, write.prepareCommit(false, 1));
         assertThat(getIcebergResult()).containsExactlyInAnyOrder("Record(1, 
10)", "Record(2, 20)");
 
+        // Second write with overlapping keys creates a second level-0 file.
+        // The two overlapping level-0 files form a non-rawConvertible split,
+        // so Iceberg can no longer see either file. Verify that total-records
+        // in the snapshot summary tracks the Iceberg-visible count, not
+        // paimon's totalRecordCount (which includes all levels).
+        write.write(GenericRow.of(1, 11));
+        write.write(GenericRow.of(3, 30));
+        commit.commit(2, write.prepareCommit(false, 2));
+
+        long snapshotId = table.snapshotManager().latestSnapshotId();
+        List<String> icebergRecords = getIcebergResult();
+        IcebergMetadata metadata =
+                IcebergMetadata.fromPath(
+                        table.fileIO(),
+                        new Path(table.location(), "metadata/v" + snapshotId + 
".metadata.json"));
+        String totalRecords =
+                
metadata.currentSnapshot().summary().getSummary().get("total-records");
+        
assertThat(totalRecords).isEqualTo(String.valueOf(icebergRecords.size()));
+
         write.compact(BinaryRow.EMPTY_ROW, 0, true);
-        commit.commit(2, write.prepareCommit(true, 2));
-        assertThat(getIcebergResult()).containsExactlyInAnyOrder("Record(1, 
10)", "Record(2, 20)");
+        commit.commit(3, write.prepareCommit(true, 3));
+        assertThat(getIcebergResult())
+                .containsExactlyInAnyOrder("Record(1, 11)", "Record(2, 20)", 
"Record(3, 30)");
 
         write.close();
         commit.close();
diff --git 
a/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergMetadataTest.java
 
b/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergMetadataTest.java
index 91a1b5acf2..a6205dc107 100644
--- 
a/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergMetadataTest.java
+++ 
b/paimon-iceberg/src/test/java/org/apache/paimon/iceberg/IcebergMetadataTest.java
@@ -512,8 +512,17 @@ class IcebergMetadataTest {
         // Verify snapshot summary contains operation information
         assertThat(snapshot.summary().operation()).isEqualTo("append");
         
assertThat(snapshot.summary().getSummary().get("operation")).isEqualTo("append");
+
+        // Verify required fields for Redshift Spectrum compatibility
         
assertThat(snapshot.summary().getSummary().get("total-records")).isEqualTo("10");
+        
assertThat(snapshot.summary().getSummary().get("total-data-files")).isEqualTo("1");
+        
assertThat(snapshot.summary().getSummary().get("total-delete-files")).isEqualTo("0");
+        
assertThat(snapshot.summary().getSummary().get("total-position-deletes")).isEqualTo("0");
+        
assertThat(snapshot.summary().getSummary().get("total-equality-deletes")).isEqualTo("0");
+
+        // Verify change fields
         
assertThat(snapshot.summary().getSummary().get("added-data-files")).isEqualTo("1");
+        
assertThat(snapshot.summary().getSummary().get("added-records")).isEqualTo("10");
         
assertThat(snapshot.summary().getSummary().get("added-files-size")).isEqualTo("100");
     }
 

Reply via email to