This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 58b3e36731 Spark 4.0: Display write metrics on SQL UI (#15468)
58b3e36731 is described below

commit 58b3e3673107e9cb474213d3b66348ceb3b206d4
Author: Manu Zhang <[email protected]>
AuthorDate: Mon Mar 2 18:27:59 2026 +0800

    Spark 4.0: Display write metrics on SQL UI (#15468)
    
    Back-port of https://github.com/apache/iceberg/pull/15104
---
 .../org/apache/iceberg/spark/SparkWriteUtil.java   | 109 ++++++++++
 .../spark/source/SparkPositionDeletesRewrite.java  |  21 ++
 .../spark/source/SparkPositionDeltaWrite.java      |  21 ++
 .../apache/iceberg/spark/source/SparkWrite.java    |  21 ++
 .../{SkippedDataFiles.java => AddedDataFiles.java} |   6 +-
 ...SkippedDataFiles.java => AddedDeleteFiles.java} |   6 +-
 ...eteFiles.java => AddedEqualityDeleteFiles.java} |   6 +-
 ...yDeleteFiles.java => AddedEqualityDeletes.java} |   6 +-
 ...yDeleteFiles.java => AddedFileSizeInBytes.java} |   6 +-
 ...eFiles.java => AddedPositionalDeleteFiles.java} |   6 +-
 ...eleteFiles.java => AddedPositionalDeletes.java} |   6 +-
 .../{TotalDataManifests.java => AddedRecords.java} |   6 +-
 .../spark/source/metrics/EqualityDeleteFiles.java  |   2 +-
 .../spark/source/metrics/IndexedDeleteFiles.java   |   2 +-
 .../source/metrics/PositionalDeleteFiles.java      |   2 +-
 ...SkippedDataFiles.java => RemovedDataFiles.java} |   6 +-
 ...ityDeleteFiles.java => RemovedDeleteFiles.java} |   6 +-
 ...eFiles.java => RemovedEqualityDeleteFiles.java} |   6 +-
 ...eleteFiles.java => RemovedEqualityDeletes.java} |   6 +-
 ...eleteFiles.java => RemovedFileSizeInBytes.java} |   6 +-
 ...iles.java => RemovedPositionalDeleteFiles.java} |   6 +-
 ...eteFiles.java => RemovedPositionalDeletes.java} |   6 +-
 .../{SkippedDataFiles.java => RemovedRecords.java} |   6 +-
 .../spark/source/metrics/ResultDataFiles.java      |   2 +-
 .../spark/source/metrics/ResultDeleteFiles.java    |   2 +-
 .../spark/source/metrics/ScannedDataManifests.java |   2 +-
 .../source/metrics/ScannedDeleteManifests.java     |   2 +-
 .../spark/source/metrics/SkippedDataFiles.java     |   2 +-
 .../spark/source/metrics/SkippedDataManifests.java |   2 +-
 .../spark/source/metrics/SkippedDeleteFiles.java   |   2 +-
 .../source/metrics/SkippedDeleteManifests.java     |   2 +-
 .../spark/source/metrics/TotalDataFileSize.java    |   2 +-
 .../{SkippedDataFiles.java => TotalDataFiles.java} |   6 +-
 .../spark/source/metrics/TotalDataManifests.java   |   2 +-
 .../spark/source/metrics/TotalDeleteFileSize.java  |   2 +-
 ...SkippedDataFiles.java => TotalDeleteFiles.java} |   6 +-
 .../spark/source/metrics/TotalDeleteManifests.java |   2 +-
 ...yDeleteFiles.java => TotalEqualityDeletes.java} |   6 +-
 ...DataFileSize.java => TotalFileSizeInBytes.java} |   4 +-
 .../source/metrics/TotalPlanningDuration.java      |   2 +-
 ...eleteFiles.java => TotalPositionalDeletes.java} |   6 +-
 .../{TotalDataManifests.java => TotalRecords.java} |   6 +-
 .../spark/source/TestSparkWriteMetrics.java        | 223 +++++++++++++++++++++
 43 files changed, 476 insertions(+), 81 deletions(-)

diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkWriteUtil.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkWriteUtil.java
index 0d68a0d8cd..86902c15e1 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkWriteUtil.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkWriteUtil.java
@@ -23,10 +23,38 @@ import static org.apache.spark.sql.connector.write.RowLevelOperation.Command.MER
 import static org.apache.spark.sql.connector.write.RowLevelOperation.Command.UPDATE;
 
 import java.util.Arrays;
+import java.util.List;
 import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.metrics.CommitMetricsResult;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CounterResult;
+import org.apache.iceberg.metrics.InMemoryMetricsReporter;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.ObjectArrays;
+import org.apache.iceberg.spark.source.metrics.AddedDataFiles;
+import org.apache.iceberg.spark.source.metrics.AddedDeleteFiles;
+import org.apache.iceberg.spark.source.metrics.AddedEqualityDeleteFiles;
+import org.apache.iceberg.spark.source.metrics.AddedEqualityDeletes;
+import org.apache.iceberg.spark.source.metrics.AddedFileSizeInBytes;
+import org.apache.iceberg.spark.source.metrics.AddedPositionalDeleteFiles;
+import org.apache.iceberg.spark.source.metrics.AddedPositionalDeletes;
+import org.apache.iceberg.spark.source.metrics.AddedRecords;
+import org.apache.iceberg.spark.source.metrics.RemovedDataFiles;
+import org.apache.iceberg.spark.source.metrics.RemovedDeleteFiles;
+import org.apache.iceberg.spark.source.metrics.RemovedEqualityDeleteFiles;
+import org.apache.iceberg.spark.source.metrics.RemovedEqualityDeletes;
+import org.apache.iceberg.spark.source.metrics.RemovedFileSizeInBytes;
+import org.apache.iceberg.spark.source.metrics.RemovedPositionalDeleteFiles;
+import org.apache.iceberg.spark.source.metrics.RemovedPositionalDeletes;
+import org.apache.iceberg.spark.source.metrics.RemovedRecords;
+import org.apache.iceberg.spark.source.metrics.TotalDataFiles;
+import org.apache.iceberg.spark.source.metrics.TotalDeleteFiles;
+import org.apache.iceberg.spark.source.metrics.TotalEqualityDeletes;
+import org.apache.iceberg.spark.source.metrics.TotalFileSizeInBytes;
+import org.apache.iceberg.spark.source.metrics.TotalPositionalDeletes;
+import org.apache.iceberg.spark.source.metrics.TotalRecords;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.SortOrderUtil;
 import org.apache.spark.sql.connector.distributions.Distribution;
@@ -36,6 +64,8 @@ import org.apache.spark.sql.connector.expressions.Expressions;
 import org.apache.spark.sql.connector.expressions.NamedReference;
 import org.apache.spark.sql.connector.expressions.SortDirection;
 import org.apache.spark.sql.connector.expressions.SortOrder;
+import org.apache.spark.sql.connector.metric.CustomMetric;
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;
 import org.apache.spark.sql.connector.write.RowLevelOperation.Command;
 
 /**
@@ -256,4 +286,83 @@ public class SparkWriteUtil {
   private static SortOrder sort(Expression expr) {
     return Expressions.sort(expr, SortDirection.ASCENDING);
   }
+
+  public static CustomMetric[] supportedCustomMetrics() {
+    return new CustomMetric[] {
+      new AddedDataFiles(),
+      new AddedDeleteFiles(),
+      new AddedEqualityDeletes(),
+      new AddedEqualityDeleteFiles(),
+      new AddedFileSizeInBytes(),
+      new AddedPositionalDeletes(),
+      new AddedPositionalDeleteFiles(),
+      new AddedRecords(),
+      new RemovedDataFiles(),
+      new RemovedDeleteFiles(),
+      new RemovedRecords(),
+      new RemovedEqualityDeleteFiles(),
+      new RemovedEqualityDeletes(),
+      new RemovedFileSizeInBytes(),
+      new RemovedPositionalDeleteFiles(),
+      new RemovedPositionalDeletes(),
+      new TotalDataFiles(),
+      new TotalDeleteFiles(),
+      new TotalEqualityDeletes(),
+      new TotalFileSizeInBytes(),
+      new TotalPositionalDeletes(),
+      new TotalRecords()
+    };
+  }
+
+  public static CustomTaskMetric[] customTaskMetrics(InMemoryMetricsReporter metricsReporter) {
+    List<CustomTaskMetric> metrics = Lists.newArrayList();
+    if (metricsReporter != null) {
+      CommitReport commitReport = metricsReporter.commitReport();
+      if (commitReport != null) {
+        CommitMetricsResult result = commitReport.commitMetrics();
+        addValue(new AddedDataFiles(), result.addedDataFiles(), metrics);
+        addValue(new AddedDeleteFiles(), result.addedDeleteFiles(), metrics);
+        addValue(new AddedEqualityDeletes(), result.addedEqualityDeletes(), metrics);
+        addValue(new AddedEqualityDeleteFiles(), result.addedEqualityDeleteFiles(), metrics);
+        addValue(new AddedFileSizeInBytes(), result.addedFilesSizeInBytes(), metrics);
+        addValue(new AddedPositionalDeletes(), result.addedPositionalDeletes(), metrics);
+        addValue(new AddedPositionalDeleteFiles(), result.addedPositionalDeleteFiles(), metrics);
+        addValue(new AddedRecords(), result.addedRecords(), metrics);
+        addValue(new RemovedDataFiles(), result.removedDataFiles(), metrics);
+        addValue(new RemovedDeleteFiles(), result.removedDeleteFiles(), metrics);
+        addValue(new RemovedRecords(), result.removedRecords(), metrics);
+        addValue(new RemovedEqualityDeleteFiles(), result.removedEqualityDeleteFiles(), metrics);
+        addValue(new RemovedEqualityDeletes(), result.removedEqualityDeletes(), metrics);
+        addValue(new RemovedFileSizeInBytes(), result.removedFilesSizeInBytes(), metrics);
+        addValue(
+            new RemovedPositionalDeleteFiles(), result.removedPositionalDeleteFiles(), metrics);
+        addValue(new RemovedPositionalDeletes(), result.removedPositionalDeletes(), metrics);
+        addValue(new TotalDataFiles(), result.totalDataFiles(), metrics);
+        addValue(new TotalDeleteFiles(), result.totalDeleteFiles(), metrics);
+        addValue(new TotalEqualityDeletes(), result.totalEqualityDeletes(), metrics);
+        addValue(new TotalFileSizeInBytes(), result.totalFilesSizeInBytes(), metrics);
+        addValue(new TotalPositionalDeletes(), result.totalPositionalDeletes(), metrics);
+        addValue(new TotalRecords(), result.totalRecords(), metrics);
+      }
+    }
+    return metrics.toArray(new CustomTaskMetric[0]);
+  }
+
+  private static void addValue(
+      CustomMetric metric, CounterResult result, List<CustomTaskMetric> taskMetrics) {
+    if (result != null) {
+      taskMetrics.add(
+          new CustomTaskMetric() {
+            @Override
+            public String name() {
+              return metric.name();
+            }
+
+            @Override
+            public long value() {
+              return result.value();
+            }
+          });
+    }
+  }
 }
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
index 0ec7084bfd..3bbea7b5c9 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.MetadataColumns;
@@ -37,16 +38,20 @@ import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.io.PartitioningDVWriter;
+import org.apache.iceberg.metrics.InMemoryMetricsReporter;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.PositionDeletesRewriteCoordinator;
 import org.apache.iceberg.spark.ScanTaskSetManager;
 import org.apache.iceberg.spark.SparkWriteConf;
+import org.apache.iceberg.spark.SparkWriteUtil;
 import org.apache.iceberg.util.DeleteFileSet;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.metric.CustomMetric;
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;
 import org.apache.spark.sql.connector.write.BatchWrite;
 import org.apache.spark.sql.connector.write.DataWriter;
 import org.apache.spark.sql.connector.write.DataWriterFactory;
@@ -80,6 +85,7 @@ public class SparkPositionDeletesRewrite implements Write {
   private final int specId;
   private final StructLike partition;
   private final Map<String, String> writeProperties;
+  private InMemoryMetricsReporter metricsReporter;
 
   /**
    * Constructs a {@link SparkPositionDeletesRewrite}.
@@ -114,6 +120,11 @@ public class SparkPositionDeletesRewrite implements Write {
     this.specId = specId;
     this.partition = partition;
     this.writeProperties = writeConf.writeProperties();
+
+    if (this.table instanceof BaseTable) {
+      this.metricsReporter = new InMemoryMetricsReporter();
+      ((BaseTable) this.table).combineMetricsReporter(metricsReporter);
+    }
   }
 
   @Override
@@ -121,6 +132,16 @@ public class SparkPositionDeletesRewrite implements Write {
     return new PositionDeleteBatchWrite();
   }
 
+  @Override
+  public CustomTaskMetric[] reportDriverMetrics() {
+    return SparkWriteUtil.customTaskMetrics(metricsReporter);
+  }
+
+  @Override
+  public CustomMetric[] supportedCustomMetrics() {
+    return SparkWriteUtil.supportedCustomMetrics();
+  }
+
   /** {@link BatchWrite} class for rewriting position deletes files from Spark 
*/
   class PositionDeleteBatchWrite implements BatchWrite {
 
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index d072397dc6..a1cb31bd37 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -30,6 +30,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.function.Function;
+import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
@@ -66,12 +67,14 @@ import org.apache.iceberg.io.PartitioningDVWriter;
 import org.apache.iceberg.io.PartitioningWriter;
 import org.apache.iceberg.io.PositionDeltaWriter;
 import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.metrics.InMemoryMetricsReporter;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkWriteConf;
 import org.apache.iceberg.spark.SparkWriteRequirements;
+import org.apache.iceberg.spark.SparkWriteUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.CharSequenceSet;
 import org.apache.iceberg.util.DeleteFileSet;
@@ -83,6 +86,8 @@ import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.JoinedRow;
 import org.apache.spark.sql.connector.distributions.Distribution;
 import org.apache.spark.sql.connector.expressions.SortOrder;
+import org.apache.spark.sql.connector.metric.CustomMetric;
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;
 import org.apache.spark.sql.connector.write.DeltaBatchWrite;
 import org.apache.spark.sql.connector.write.DeltaWrite;
 import org.apache.spark.sql.connector.write.DeltaWriter;
@@ -116,6 +121,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
   private final Map<String, String> writeProperties;
 
   private boolean cleanupOnAbort = false;
+  private InMemoryMetricsReporter metricsReporter;
 
   SparkPositionDeltaWrite(
       SparkSession spark,
@@ -139,6 +145,11 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
     this.writeRequirements = writeConf.positionDeltaRequirements(command);
     this.context = new Context(dataSchema, writeConf, info, writeRequirements);
     this.writeProperties = writeConf.writeProperties();
+
+    if (this.table instanceof BaseTable) {
+      this.metricsReporter = new InMemoryMetricsReporter();
+      ((BaseTable) this.table).combineMetricsReporter(metricsReporter);
+    }
   }
 
   @Override
@@ -172,6 +183,16 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
     return new PositionDeltaBatchWrite();
   }
 
+  @Override
+  public CustomMetric[] supportedCustomMetrics() {
+    return SparkWriteUtil.supportedCustomMetrics();
+  }
+
+  @Override
+  public CustomTaskMetric[] reportDriverMetrics() {
+    return SparkWriteUtil.customTaskMetrics(metricsReporter);
+  }
+
   private class PositionDeltaBatchWrite implements DeltaBatchWrite {
 
     @Override
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index c9a94090ef..5f81689f41 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
@@ -53,12 +54,14 @@ import org.apache.iceberg.io.FileWriter;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.io.PartitioningWriter;
 import org.apache.iceberg.io.RollingDataWriter;
+import org.apache.iceberg.metrics.InMemoryMetricsReporter;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.FileRewriteCoordinator;
 import org.apache.iceberg.spark.SparkWriteConf;
 import org.apache.iceberg.spark.SparkWriteRequirements;
+import org.apache.iceberg.spark.SparkWriteUtil;
 import org.apache.iceberg.util.ContentFileUtil;
 import org.apache.iceberg.util.DataFileSet;
 import org.apache.iceberg.util.DeleteFileSet;
@@ -72,6 +75,8 @@ import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.JoinedRow;
 import org.apache.spark.sql.connector.distributions.Distribution;
 import org.apache.spark.sql.connector.expressions.SortOrder;
+import org.apache.spark.sql.connector.metric.CustomMetric;
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;
 import org.apache.spark.sql.connector.write.BatchWrite;
 import org.apache.spark.sql.connector.write.DataWriter;
 import org.apache.spark.sql.connector.write.DataWriterFactory;
@@ -108,6 +113,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
   private final Map<String, String> writeProperties;
 
   private boolean cleanupOnAbort = false;
+  private InMemoryMetricsReporter metricsReporter;
 
   SparkWrite(
       SparkSession spark,
@@ -135,6 +141,11 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
     this.writeRequirements = writeRequirements;
     this.outputSpecId = writeConf.outputSpecId();
     this.writeProperties = writeConf.writeProperties();
+
+    if (this.table instanceof BaseTable) {
+      this.metricsReporter = new InMemoryMetricsReporter();
+      ((BaseTable) this.table).combineMetricsReporter(metricsReporter);
+    }
   }
 
   @Override
@@ -163,6 +174,11 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
     return size;
   }
 
+  @Override
+  public CustomMetric[] supportedCustomMetrics() {
+    return SparkWriteUtil.supportedCustomMetrics();
+  }
+
   BatchWrite asBatchAppend() {
     return new BatchAppend();
   }
@@ -265,6 +281,11 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
     return files;
   }
 
+  @Override
+  public CustomTaskMetric[] reportDriverMetrics() {
+    return SparkWriteUtil.customTaskMetrics(metricsReporter);
+  }
+
   @Override
   public String toString() {
     return String.format("IcebergWrite(table=%s, format=%s)", table, format);
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedDataFiles.java
similarity index 87%
copy from spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
copy to spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedDataFiles.java
index 7fd1742531..70df1ca2ca 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedDataFiles.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
 
 import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
-public class SkippedDataFiles extends CustomSumMetric {
+public class AddedDataFiles extends CustomSumMetric {
 
-  static final String NAME = "skippedDataFiles";
+  public static final String NAME = "addedDataFiles";
 
   @Override
   public String name() {
@@ -31,6 +31,6 @@ public class SkippedDataFiles extends CustomSumMetric {
 
   @Override
   public String description() {
-    return "number of skipped data files";
+    return "number of added data files";
   }
 }
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedDeleteFiles.java
similarity index 86%
copy from spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
copy to spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedDeleteFiles.java
index 7fd1742531..381e912407 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedDeleteFiles.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
 
 import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
-public class SkippedDataFiles extends CustomSumMetric {
+public class AddedDeleteFiles extends CustomSumMetric {
 
-  static final String NAME = "skippedDataFiles";
+  public static final String NAME = "addedDeleteFiles";
 
   @Override
   public String name() {
@@ -31,6 +31,6 @@ public class SkippedDataFiles extends CustomSumMetric {
 
   @Override
   public String description() {
-    return "number of skipped data files";
+    return "number of added delete files";
   }
 }
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedEqualityDeleteFiles.java
similarity index 85%
copy from spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
copy to spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedEqualityDeleteFiles.java
index 754145f7d2..bd653803fa 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedEqualityDeleteFiles.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
 
 import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
-public class EqualityDeleteFiles extends CustomSumMetric {
+public class AddedEqualityDeleteFiles extends CustomSumMetric {
 
-  static final String NAME = "equalityDeleteFiles";
+  public static final String NAME = "addedEqualityDeleteFiles";
 
   @Override
   public String name() {
@@ -31,6 +31,6 @@ public class EqualityDeleteFiles extends CustomSumMetric {
 
   @Override
   public String description() {
-    return "number of equality delete files";
+    return "number of added equality delete files";
   }
 }
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedEqualityDeletes.java
similarity index 85%
copy from spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
copy to spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedEqualityDeletes.java
index 754145f7d2..c5ee8c952d 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedEqualityDeletes.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
 
 import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
-public class EqualityDeleteFiles extends CustomSumMetric {
+public class AddedEqualityDeletes extends CustomSumMetric {
 
-  static final String NAME = "equalityDeleteFiles";
+  public static final String NAME = "addedEqualityDeletes";
 
   @Override
   public String name() {
@@ -31,6 +31,6 @@ public class EqualityDeleteFiles extends CustomSumMetric {
 
   @Override
   public String description() {
-    return "number of equality delete files";
+    return "number of added equality deletes records";
   }
 }
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedFileSizeInBytes.java
similarity index 86%
copy from spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
copy to spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedFileSizeInBytes.java
index 754145f7d2..8e5a16dbf2 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedFileSizeInBytes.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
 
 import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
-public class EqualityDeleteFiles extends CustomSumMetric {
+public class AddedFileSizeInBytes extends CustomSumMetric {
 
-  static final String NAME = "equalityDeleteFiles";
+  public static final String NAME = "addedFileSizeInBytes";
 
   @Override
   public String name() {
@@ -31,6 +31,6 @@ public class EqualityDeleteFiles extends CustomSumMetric {
 
   @Override
   public String description() {
-    return "number of equality delete files";
+    return "total size of added files (bytes)";
   }
 }
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedPositionalDeleteFiles.java
similarity index 84%
copy from spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
copy to spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedPositionalDeleteFiles.java
index 754145f7d2..d9d5501a80 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedPositionalDeleteFiles.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
 
 import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
-public class EqualityDeleteFiles extends CustomSumMetric {
+public class AddedPositionalDeleteFiles extends CustomSumMetric {
 
-  static final String NAME = "equalityDeleteFiles";
+  public static final String NAME = "addedPositionalDeleteFiles";
 
   @Override
   public String name() {
@@ -31,6 +31,6 @@ public class EqualityDeleteFiles extends CustomSumMetric {
 
   @Override
   public String description() {
-    return "number of equality delete files";
+    return "number of added positional delete files";
   }
 }
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedPositionalDeletes.java
similarity index 85%
copy from spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
copy to spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedPositionalDeletes.java
index 754145f7d2..bd8995eac6 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedPositionalDeletes.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
 
 import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
-public class EqualityDeleteFiles extends CustomSumMetric {
+public class AddedPositionalDeletes extends CustomSumMetric {
 
-  static final String NAME = "equalityDeleteFiles";
+  public static final String NAME = "addedPositionalDeletes";
 
   @Override
   public String name() {
@@ -31,6 +31,6 @@ public class EqualityDeleteFiles extends CustomSumMetric {
 
   @Override
   public String description() {
-    return "number of equality delete files";
+    return "number of added positional deletes records";
   }
 }
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataManifests.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedRecords.java
similarity index 87%
copy from spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataManifests.java
copy to spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedRecords.java
index de8f04be77..240ec5b34b 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataManifests.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/AddedRecords.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
 
 import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
-public class TotalDataManifests extends CustomSumMetric {
+public class AddedRecords extends CustomSumMetric {
 
-  static final String NAME = "totalDataManifest";
+  public static final String NAME = "addedRecords";
 
   @Override
   public String name() {
@@ -31,6 +31,6 @@ public class TotalDataManifests extends CustomSumMetric {
 
   @Override
   public String description() {
-    return "total data manifests";
+    return "number of added records";
   }
 }
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
index 754145f7d2..4c258c01c9 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
@@ -22,7 +22,7 @@ import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
 public class EqualityDeleteFiles extends CustomSumMetric {
 
-  static final String NAME = "equalityDeleteFiles";
+  public static final String NAME = "equalityDeleteFiles";
 
   @Override
   public String name() {
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/IndexedDeleteFiles.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/IndexedDeleteFiles.java
index 7fc5b9066c..93dd410b2a 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/IndexedDeleteFiles.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/IndexedDeleteFiles.java
@@ -22,7 +22,7 @@ import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
 public class IndexedDeleteFiles extends CustomSumMetric {
 
-  static final String NAME = "indexedDeleteFiles";
+  public static final String NAME = "indexedDeleteFiles";
 
   @Override
   public String name() {
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/PositionalDeleteFiles.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/PositionalDeleteFiles.java
index 5de75776ea..f362d33535 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/PositionalDeleteFiles.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/PositionalDeleteFiles.java
@@ -22,7 +22,7 @@ import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
 public class PositionalDeleteFiles extends CustomSumMetric {
 
-  static final String NAME = "positionalDeleteFiles";
+  public static final String NAME = "positionalDeleteFiles";
 
   @Override
   public String name() {
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedDataFiles.java
similarity index 86%
copy from 
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
copy to 
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedDataFiles.java
index 7fd1742531..96e21e96c2 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedDataFiles.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
 
 import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
-public class SkippedDataFiles extends CustomSumMetric {
+public class RemovedDataFiles extends CustomSumMetric {
 
-  static final String NAME = "skippedDataFiles";
+  public static final String NAME = "removedDataFiles";
 
   @Override
   public String name() {
@@ -31,6 +31,6 @@ public class SkippedDataFiles extends CustomSumMetric {
 
   @Override
   public String description() {
-    return "number of skipped data files";
+    return "number of removed data files";
   }
 }
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedDeleteFiles.java
similarity index 86%
copy from 
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
copy to 
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedDeleteFiles.java
index 754145f7d2..9e1267592f 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedDeleteFiles.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
 
 import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
-public class EqualityDeleteFiles extends CustomSumMetric {
+public class RemovedDeleteFiles extends CustomSumMetric {
 
-  static final String NAME = "equalityDeleteFiles";
+  public static final String NAME = "removedDeleteFiles";
 
   @Override
   public String name() {
@@ -31,6 +31,6 @@ public class EqualityDeleteFiles extends CustomSumMetric {
 
   @Override
   public String description() {
-    return "number of equality delete files";
+    return "number of removed delete files";
   }
 }
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedEqualityDeleteFiles.java
similarity index 84%
copy from 
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
copy to 
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedEqualityDeleteFiles.java
index 754145f7d2..07c0de3325 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedEqualityDeleteFiles.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
 
 import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
-public class EqualityDeleteFiles extends CustomSumMetric {
+public class RemovedEqualityDeleteFiles extends CustomSumMetric {
 
-  static final String NAME = "equalityDeleteFiles";
+  public static final String NAME = "removedEqualityDeleteFiles";
 
   @Override
   public String name() {
@@ -31,6 +31,6 @@ public class EqualityDeleteFiles extends CustomSumMetric {
 
   @Override
   public String description() {
-    return "number of equality delete files";
+    return "number of removed equality delete files";
   }
 }
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedEqualityDeletes.java
similarity index 85%
copy from 
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
copy to 
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedEqualityDeletes.java
index 754145f7d2..2ad7f5a344 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedEqualityDeletes.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
 
 import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
-public class EqualityDeleteFiles extends CustomSumMetric {
+public class RemovedEqualityDeletes extends CustomSumMetric {
 
-  static final String NAME = "equalityDeleteFiles";
+  public static final String NAME = "removedEqualityDeletes";
 
   @Override
   public String name() {
@@ -31,6 +31,6 @@ public class EqualityDeleteFiles extends CustomSumMetric {
 
   @Override
   public String description() {
-    return "number of equality delete files";
+    return "number of removed equality deletes records";
   }
 }
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedFileSizeInBytes.java
similarity index 85%
copy from 
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
copy to 
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedFileSizeInBytes.java
index 754145f7d2..b7ec8b5195 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedFileSizeInBytes.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
 
 import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
-public class EqualityDeleteFiles extends CustomSumMetric {
+public class RemovedFileSizeInBytes extends CustomSumMetric {
 
-  static final String NAME = "equalityDeleteFiles";
+  public static final String NAME = "removedFileSizeInBytes";
 
   @Override
   public String name() {
@@ -31,6 +31,6 @@ public class EqualityDeleteFiles extends CustomSumMetric {
 
   @Override
   public String description() {
-    return "number of equality delete files";
+    return "total size of removed files (bytes)";
   }
 }
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedPositionalDeleteFiles.java
similarity index 84%
copy from 
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
copy to 
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedPositionalDeleteFiles.java
index 754145f7d2..d01529753b 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedPositionalDeleteFiles.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
 
 import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
-public class EqualityDeleteFiles extends CustomSumMetric {
+public class RemovedPositionalDeleteFiles extends CustomSumMetric {
 
-  static final String NAME = "equalityDeleteFiles";
+  public static final String NAME = "removedPositionalDeleteFiles";
 
   @Override
   public String name() {
@@ -31,6 +31,6 @@ public class EqualityDeleteFiles extends CustomSumMetric {
 
   @Override
   public String description() {
-    return "number of equality delete files";
+    return "number of removed positional delete files";
   }
 }
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedPositionalDeletes.java
similarity index 84%
copy from 
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
copy to 
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedPositionalDeletes.java
index 754145f7d2..b99892b211 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedPositionalDeletes.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
 
 import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
-public class EqualityDeleteFiles extends CustomSumMetric {
+public class RemovedPositionalDeletes extends CustomSumMetric {
 
-  static final String NAME = "equalityDeleteFiles";
+  public static final String NAME = "removedPositionalDeletes";
 
   @Override
   public String name() {
@@ -31,6 +31,6 @@ public class EqualityDeleteFiles extends CustomSumMetric {
 
   @Override
   public String description() {
-    return "number of equality delete files";
+    return "number of removed positional deletes records";
   }
 }
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedRecords.java
similarity index 87%
copy from 
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
copy to 
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedRecords.java
index 7fd1742531..b186aff8e0 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/RemovedRecords.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
 
 import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
-public class SkippedDataFiles extends CustomSumMetric {
+public class RemovedRecords extends CustomSumMetric {
 
-  static final String NAME = "skippedDataFiles";
+  public static final String NAME = "removedRecords";
 
   @Override
   public String name() {
@@ -31,6 +31,6 @@ public class SkippedDataFiles extends CustomSumMetric {
 
   @Override
   public String description() {
-    return "number of skipped data files";
+    return "number of removed records";
   }
 }
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ResultDataFiles.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ResultDataFiles.java
index 21959cbf6c..af75b746e4 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ResultDataFiles.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ResultDataFiles.java
@@ -22,7 +22,7 @@ import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
 public class ResultDataFiles extends CustomSumMetric {
 
-  static final String NAME = "resultDataFiles";
+  public static final String NAME = "resultDataFiles";
 
   @Override
   public String name() {
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ResultDeleteFiles.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ResultDeleteFiles.java
index 9c6ad2ca32..54d7afac81 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ResultDeleteFiles.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ResultDeleteFiles.java
@@ -22,7 +22,7 @@ import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
 public class ResultDeleteFiles extends CustomSumMetric {
 
-  static final String NAME = "resultDeleteFiles";
+  public static final String NAME = "resultDeleteFiles";
 
   @Override
   public String name() {
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDataManifests.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDataManifests.java
index a167904280..f52efec55b 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDataManifests.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDataManifests.java
@@ -22,7 +22,7 @@ import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
 public class ScannedDataManifests extends CustomSumMetric {
 
-  static final String NAME = "scannedDataManifests";
+  public static final String NAME = "scannedDataManifests";
 
   @Override
   public String name() {
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDeleteManifests.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDeleteManifests.java
index 1fa006b7b1..8f11eac2f2 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDeleteManifests.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDeleteManifests.java
@@ -22,7 +22,7 @@ import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
 public class ScannedDeleteManifests extends CustomSumMetric {
 
-  static final String NAME = "scannedDeleteManifests";
+  public static final String NAME = "scannedDeleteManifests";
 
   @Override
   public String name() {
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
index 7fd1742531..0e57eb31ea 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
@@ -22,7 +22,7 @@ import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
 public class SkippedDataFiles extends CustomSumMetric {
 
-  static final String NAME = "skippedDataFiles";
+  public static final String NAME = "skippedDataFiles";
 
   @Override
   public String name() {
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataManifests.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataManifests.java
index b0eaeb5d87..a02644643b 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataManifests.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataManifests.java
@@ -22,7 +22,7 @@ import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
 public class SkippedDataManifests extends CustomSumMetric {
 
-  static final String NAME = "skippedDataManifests";
+  public static final String NAME = "skippedDataManifests";
 
   @Override
   public String name() {
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDeleteFiles.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDeleteFiles.java
index 70597be671..517415a519 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDeleteFiles.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDeleteFiles.java
@@ -22,7 +22,7 @@ import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
 public class SkippedDeleteFiles extends CustomSumMetric {
 
-  static final String NAME = "skippedDeleteFiles";
+  public static final String NAME = "skippedDeleteFiles";
 
   @Override
   public String name() {
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDeleteManifests.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDeleteManifests.java
index 0336170b45..c76aa28834 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDeleteManifests.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDeleteManifests.java
@@ -22,7 +22,7 @@ import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
 public class SkippedDeleteManifests extends CustomSumMetric {
 
-  static final String NAME = "skippedDeleteManifests";
+  public static final String NAME = "skippedDeleteManifests";
 
   @Override
   public String name() {
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataFileSize.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataFileSize.java
index b1ff8a4636..2f93dcabb0 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataFileSize.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataFileSize.java
@@ -22,7 +22,7 @@ import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
 public class TotalDataFileSize extends CustomSumMetric {
 
-  static final String NAME = "totalDataFileSize";
+  public static final String NAME = "totalDataFileSize";
 
   @Override
   public String name() {
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataFiles.java
similarity index 87%
copy from 
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
copy to 
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataFiles.java
index 7fd1742531..a18126db7a 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataFiles.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
 
 import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
-public class SkippedDataFiles extends CustomSumMetric {
+public class TotalDataFiles extends CustomSumMetric {
 
-  static final String NAME = "skippedDataFiles";
+  public static final String NAME = "totalDataFiles";
 
   @Override
   public String name() {
@@ -31,6 +31,6 @@ public class SkippedDataFiles extends CustomSumMetric {
 
   @Override
   public String description() {
-    return "number of skipped data files";
+    return "total number of data files";
   }
 }
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataManifests.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataManifests.java
index de8f04be77..33a0656dcf 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataManifests.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataManifests.java
@@ -22,7 +22,7 @@ import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
 public class TotalDataManifests extends CustomSumMetric {
 
-  static final String NAME = "totalDataManifest";
+  public static final String NAME = "totalDataManifest";
 
   @Override
   public String name() {
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteFileSize.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteFileSize.java
index da43033252..d374a23df1 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteFileSize.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteFileSize.java
@@ -22,7 +22,7 @@ import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
 public class TotalDeleteFileSize extends CustomSumMetric {
 
-  static final String NAME = "totalDeleteFileSize";
+  public static final String NAME = "totalDeleteFileSize";
 
   @Override
   public String name() {
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteFiles.java
similarity index 86%
copy from 
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
copy to 
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteFiles.java
index 7fd1742531..66e9416253 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteFiles.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
 
 import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
-public class SkippedDataFiles extends CustomSumMetric {
+public class TotalDeleteFiles extends CustomSumMetric {
 
-  static final String NAME = "skippedDataFiles";
+  public static final String NAME = "totalDeleteFiles";
 
   @Override
   public String name() {
@@ -31,6 +31,6 @@ public class SkippedDataFiles extends CustomSumMetric {
 
   @Override
   public String description() {
-    return "number of skipped data files";
+    return "total number of delete files";
   }
 }
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteManifests.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteManifests.java
index 7442dfdb6f..58ac739ea3 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteManifests.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDeleteManifests.java
@@ -22,7 +22,7 @@ import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
 public class TotalDeleteManifests extends CustomSumMetric {
 
-  static final String NAME = "totalDeleteManifests";
+  public static final String NAME = "totalDeleteManifests";
 
   @Override
   public String name() {
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalEqualityDeletes.java
similarity index 85%
copy from 
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
copy to 
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalEqualityDeletes.java
index 754145f7d2..fff664cc9b 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalEqualityDeletes.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
 
 import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
-public class EqualityDeleteFiles extends CustomSumMetric {
+public class TotalEqualityDeletes extends CustomSumMetric {
 
-  static final String NAME = "equalityDeleteFiles";
+  public static final String NAME = "totalEqualityDeletes";
 
   @Override
   public String name() {
@@ -31,6 +31,6 @@ public class EqualityDeleteFiles extends CustomSumMetric {
 
   @Override
   public String description() {
-    return "number of equality delete files";
+    return "total number of equality deletes records";
   }
 }
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataFileSize.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalFileSizeInBytes.java
similarity index 89%
copy from 
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataFileSize.java
copy to 
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalFileSizeInBytes.java
index b1ff8a4636..ec97bfec55 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataFileSize.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalFileSizeInBytes.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
 
 import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
-public class TotalDataFileSize extends CustomSumMetric {
+public class TotalFileSizeInBytes extends CustomSumMetric {
 
-  static final String NAME = "totalDataFileSize";
+  public static final String NAME = "totalFileSizeInBytes";
 
   @Override
   public String name() {
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalPlanningDuration.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalPlanningDuration.java
index 8b66eeac40..f1051bd928 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalPlanningDuration.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalPlanningDuration.java
@@ -22,7 +22,7 @@ import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
 public class TotalPlanningDuration extends CustomSumMetric {
 
-  static final String NAME = "totalPlanningDuration";
+  public static final String NAME = "totalPlanningDuration";
 
   @Override
   public String name() {
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalPositionalDeletes.java
similarity index 85%
copy from 
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
copy to 
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalPositionalDeletes.java
index 754145f7d2..da339688a3 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/EqualityDeleteFiles.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalPositionalDeletes.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
 
 import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
-public class EqualityDeleteFiles extends CustomSumMetric {
+public class TotalPositionalDeletes extends CustomSumMetric {
 
-  static final String NAME = "equalityDeleteFiles";
+  public static final String NAME = "totalPositionalDeletes";
 
   @Override
   public String name() {
@@ -31,6 +31,6 @@ public class EqualityDeleteFiles extends CustomSumMetric {
 
   @Override
   public String description() {
-    return "number of equality delete files";
+    return "total number of positional deletes records";
   }
 }
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataManifests.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalRecords.java
similarity index 87%
copy from 
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataManifests.java
copy to 
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalRecords.java
index de8f04be77..3cc1123343 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataManifests.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalRecords.java
@@ -20,9 +20,9 @@ package org.apache.iceberg.spark.source.metrics;
 
 import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
-public class TotalDataManifests extends CustomSumMetric {
+public class TotalRecords extends CustomSumMetric {
 
-  static final String NAME = "totalDataManifest";
+  public static final String NAME = "totalRecords";
 
   @Override
   public String name() {
@@ -31,6 +31,6 @@ public class TotalDataManifests extends CustomSumMetric {
 
   @Override
   public String description() {
-    return "total data manifests";
+    return "total number of records";
   }
 }
diff --git 
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriteMetrics.java
 
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriteMetrics.java
new file mode 100644
index 0000000000..661aac1372
--- /dev/null
+++ 
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriteMetrics.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Map;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.spark.TestBaseWithCatalog;
+import org.apache.iceberg.spark.source.metrics.AddedDataFiles;
+import org.apache.iceberg.spark.source.metrics.AddedDeleteFiles;
+import org.apache.iceberg.spark.source.metrics.AddedEqualityDeleteFiles;
+import org.apache.iceberg.spark.source.metrics.AddedEqualityDeletes;
+import org.apache.iceberg.spark.source.metrics.AddedFileSizeInBytes;
+import org.apache.iceberg.spark.source.metrics.AddedPositionalDeleteFiles;
+import org.apache.iceberg.spark.source.metrics.AddedPositionalDeletes;
+import org.apache.iceberg.spark.source.metrics.AddedRecords;
+import org.apache.iceberg.spark.source.metrics.RemovedDataFiles;
+import org.apache.iceberg.spark.source.metrics.RemovedDeleteFiles;
+import org.apache.iceberg.spark.source.metrics.RemovedEqualityDeleteFiles;
+import org.apache.iceberg.spark.source.metrics.RemovedEqualityDeletes;
+import org.apache.iceberg.spark.source.metrics.RemovedFileSizeInBytes;
+import org.apache.iceberg.spark.source.metrics.RemovedPositionalDeleteFiles;
+import org.apache.iceberg.spark.source.metrics.RemovedPositionalDeletes;
+import org.apache.iceberg.spark.source.metrics.RemovedRecords;
+import org.apache.iceberg.spark.source.metrics.TotalDataFiles;
+import org.apache.iceberg.spark.source.metrics.TotalDeleteFiles;
+import org.apache.iceberg.spark.source.metrics.TotalEqualityDeletes;
+import org.apache.iceberg.spark.source.metrics.TotalFileSizeInBytes;
+import org.apache.iceberg.spark.source.metrics.TotalPositionalDeletes;
+import org.apache.iceberg.spark.source.metrics.TotalRecords;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.execution.SparkPlan;
+import org.apache.spark.sql.execution.metric.SQLMetric;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import scala.jdk.javaapi.CollectionConverters;
+
+/**
+ * Checks the write metrics found on the executed Spark plan after an INSERT and after a
+ * merge-on-read DELETE against an Iceberg table.
+ */
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestSparkWriteMetrics extends TestBaseWithCatalog {
+
+  /** Issues {@code DROP TABLE IF EXISTS} for the test table after every test. */
+  @AfterEach
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  /** Inserts 1000 rows and checks the added/total metrics; all other metrics must be zero. */
+  @TestTemplate
+  public void writeMetrics() {
+    sql("CREATE TABLE %s (id BIGINT) USING iceberg", tableName);
+
+    String insertSql = String.format("INSERT INTO %s SELECT id FROM range(1000)", tableName);
+    Dataset<Row> result = spark.sql(insertSql);
+    result.collect();
+
+    // The node carrying the write metrics is not necessarily the plan root (the plan structure
+    // can be complex, e.g. AdaptiveSparkPlanExec), so search the tree. findMetrics inspects the
+    // root first, so no separate root check is needed.
+    SparkPlan plan = result.queryExecution().executedPlan();
+    Map<String, SQLMetric> metricsMap = findMetrics(plan, AddedDataFiles.NAME);
+
+    assertThat(metricsMap).isNotNull();
+    assertThat(metricsMap)
+        .hasEntrySatisfying(AddedDataFiles.NAME, metric -> assertThat(metric.value()).isEqualTo(2));
+    assertThat(metricsMap)
+        .hasEntrySatisfying(
+            AddedRecords.NAME, metric -> assertThat(metric.value()).isEqualTo(1000));
+    assertThat(metricsMap)
+        .hasEntrySatisfying(
+            AddedFileSizeInBytes.NAME, metric -> assertThat(metric.value()).isGreaterThan(0));
+    assertThat(metricsMap)
+        .hasEntrySatisfying(TotalDataFiles.NAME, metric -> assertThat(metric.value()).isEqualTo(2));
+    assertThat(metricsMap)
+        .hasEntrySatisfying(
+            TotalRecords.NAME, metric -> assertThat(metric.value()).isEqualTo(1000));
+    assertThat(metricsMap)
+        .hasEntrySatisfying(
+            TotalFileSizeInBytes.NAME, metric -> assertThat(metric.value()).isGreaterThan(0));
+
+    // Verify other metrics are 0
+    assertZeroMetrics(
+        metricsMap,
+        AddedDeleteFiles.NAME,
+        AddedEqualityDeleteFiles.NAME,
+        AddedPositionalDeleteFiles.NAME,
+        AddedEqualityDeletes.NAME,
+        AddedPositionalDeletes.NAME,
+        RemovedDataFiles.NAME,
+        RemovedDeleteFiles.NAME,
+        RemovedEqualityDeleteFiles.NAME,
+        RemovedPositionalDeleteFiles.NAME,
+        RemovedEqualityDeletes.NAME,
+        RemovedPositionalDeletes.NAME,
+        RemovedRecords.NAME,
+        RemovedFileSizeInBytes.NAME,
+        TotalDeleteFiles.NAME,
+        TotalEqualityDeletes.NAME,
+        TotalPositionalDeletes.NAME);
+  }
+
+  /**
+   * Deletes one row from a merge-on-read table holding 100 rows in a single data file and checks
+   * the positional-delete and total metrics; all other metrics must be zero.
+   */
+  @TestTemplate
+  public void deleteMetrics() throws NoSuchTableException {
+    sql(
+        "CREATE TABLE %s (id BIGINT) USING iceberg TBLPROPERTIES ('write.delete.mode'='merge-on-read')",
+        tableName);
+
+    // coalesce(1) so the append goes through a single partition; the assertions below expect
+    // exactly one data file.
+    spark.range(100).coalesce(1).writeTo(tableName).append();
+
+    String deleteSql = String.format("DELETE FROM %s WHERE id = 1", tableName);
+    Dataset<Row> result = spark.sql(deleteSql);
+    result.collect();
+
+    SparkPlan plan = result.queryExecution().executedPlan();
+
+    Map<String, SQLMetric> metricsMap = findMetrics(plan, AddedPositionalDeleteFiles.NAME);
+
+    assertThat(metricsMap).isNotNull();
+
+    assertThat(metricsMap)
+        .hasEntrySatisfying(
+            AddedPositionalDeleteFiles.NAME, metric -> assertThat(metric.value()).isEqualTo(1));
+    assertThat(metricsMap)
+        .hasEntrySatisfying(
+            RemovedDataFiles.NAME, metric -> assertThat(metric.value()).isEqualTo(0));
+    assertThat(metricsMap)
+        .hasEntrySatisfying(
+            AddedPositionalDeletes.NAME, metric -> assertThat(metric.value()).isEqualTo(1));
+    assertThat(metricsMap)
+        .hasEntrySatisfying(
+            TotalDeleteFiles.NAME, metric -> assertThat(metric.value()).isEqualTo(1));
+    assertThat(metricsMap)
+        .hasEntrySatisfying(
+            TotalPositionalDeletes.NAME, metric -> assertThat(metric.value()).isEqualTo(1));
+    assertThat(metricsMap)
+        .hasEntrySatisfying(
+            AddedDeleteFiles.NAME, metric -> assertThat(metric.value()).isEqualTo(1));
+    assertThat(metricsMap)
+        .hasEntrySatisfying(
+            AddedFileSizeInBytes.NAME, metric -> assertThat(metric.value()).isGreaterThan(0));
+    assertThat(metricsMap)
+        .hasEntrySatisfying(TotalDataFiles.NAME, metric -> assertThat(metric.value()).isEqualTo(1));
+    assertThat(metricsMap)
+        .hasEntrySatisfying(TotalRecords.NAME, metric -> assertThat(metric.value()).isEqualTo(100));
+    assertThat(metricsMap)
+        .hasEntrySatisfying(
+            TotalFileSizeInBytes.NAME, metric -> assertThat(metric.value()).isGreaterThan(0));
+
+    // Verify other metrics are 0
+    assertZeroMetrics(
+        metricsMap,
+        AddedDataFiles.NAME,
+        AddedEqualityDeleteFiles.NAME,
+        AddedEqualityDeletes.NAME,
+        AddedRecords.NAME,
+        RemovedDeleteFiles.NAME,
+        RemovedEqualityDeleteFiles.NAME,
+        RemovedPositionalDeleteFiles.NAME,
+        RemovedEqualityDeletes.NAME,
+        RemovedPositionalDeletes.NAME,
+        RemovedRecords.NAME,
+        RemovedFileSizeInBytes.NAME,
+        TotalEqualityDeletes.NAME);
+  }
+
+  /**
+   * Asserts that each named metric is present in {@code metricsMap} with a value of 0.
+   *
+   * @param metricsMap metrics of the plan node under test
+   * @param metricNames names of the metrics expected to be zero
+   */
+  private void assertZeroMetrics(Map<String, SQLMetric> metricsMap, String... metricNames) {
+    for (String metric : metricNames) {
+      // as(metric) puts the metric name into the failure message
+      assertThat(metricsMap)
+          .hasEntrySatisfying(metric, m -> assertThat(m.value()).as(metric).isEqualTo(0));
+    }
+  }
+
+  /**
+   * Searches the plan tree for the first node whose metrics contain {@code metricName}.
+   *
+   * <p>The node itself is checked first, then its children recursively, then those of its inner
+   * children that are {@link SparkPlan} instances.
+   *
+   * @param plan root of the (sub)plan to search
+   * @param metricName metric key to look for
+   * @return the metrics map of the first matching node, or {@code null} if no node has the metric
+   */
+  private Map<String, SQLMetric> findMetrics(SparkPlan plan, String metricName) {
+    Map<String, SQLMetric> metrics = CollectionConverters.asJava(plan.metrics());
+    if (metrics.containsKey(metricName)) {
+      return metrics;
+    }
+
+    for (SparkPlan child : CollectionConverters.asJava(plan.children())) {
+      Map<String, SQLMetric> result = findMetrics(child, metricName);
+      if (result != null) {
+        return result;
+      }
+    }
+
+    // Inner children are not part of children(); only SparkPlan instances are searched.
+    for (Object child : CollectionConverters.asJava(plan.innerChildren())) {
+      if (child instanceof SparkPlan) {
+        Map<String, SQLMetric> result = findMetrics((SparkPlan) child, metricName);
+        if (result != null) {
+          return result;
+        }
+      }
+    }
+
+    return null;
+  }
+}

Reply via email to