This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e89f6b08e Spark 3.4: Display read metrics on Spark SQL UI (#7447)
8e89f6b08e is described below

commit 8e89f6b08e06510ac9f997cbd8dc22d761447462
Author: Karuppayya <[email protected]>
AuthorDate: Thu Jul 20 16:04:44 2023 -0700

    Spark 3.4: Display read metrics on Spark SQL UI (#7447)
---
 .../src/main/java/org/apache/iceberg/BaseScan.java |  2 +-
 .../iceberg/metrics/InMemoryMetricsReporter.java   | 38 +++++++++
 .../iceberg/spark/source/SparkBatchQueryScan.java  |  8 +-
 .../iceberg/spark/source/SparkCopyOnWriteScan.java | 13 ++--
 .../spark/source/SparkPartitioningAwareScan.java   |  8 +-
 .../org/apache/iceberg/spark/source/SparkScan.java | 51 +++++++++++-
 .../iceberg/spark/source/SparkScanBuilder.java     | 56 ++++++++++++--
 .../iceberg/spark/source/SparkStagedScan.java      |  2 +-
 .../spark/source/metrics/ScannedDataFiles.java     | 36 +++++++++
 .../spark/source/metrics/ScannedDataManifests.java | 36 +++++++++
 .../spark/source/metrics/SkippedDataFiles.java     | 36 +++++++++
 .../spark/source/metrics/SkippedDataManifests.java | 36 +++++++++
 .../spark/source/metrics/TaskScannedDataFiles.java | 47 +++++++++++
 .../source/metrics/TaskScannedDataManifests.java   | 47 +++++++++++
 .../spark/source/metrics/TaskSkippedDataFiles.java | 47 +++++++++++
 .../source/metrics/TaskSkippedDataManifests.java   | 47 +++++++++++
 .../spark/source/metrics/TaskTotalFileSize.java    | 48 ++++++++++++
 .../source/metrics/TaskTotalPlanningDuration.java  | 48 ++++++++++++
 .../spark/source/metrics/TotalFileSize.java        | 36 +++++++++
 .../source/metrics/TotalPlanningDuration.java      | 36 +++++++++
 .../iceberg/spark/source/TestSparkReadMetrics.java | 90 ++++++++++++++++++++++
 21 files changed, 746 insertions(+), 22 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseScan.java b/core/src/main/java/org/apache/iceberg/BaseScan.java
index 9db72227ac..953ad754aa 100644
--- a/core/src/main/java/org/apache/iceberg/BaseScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseScan.java
@@ -254,6 +254,6 @@ abstract class BaseScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>>
 
   @Override
   public ThisT metricsReporter(MetricsReporter reporter) {
-    return newRefinedScan(table(), schema(), context().reportWith(reporter));
+    return newRefinedScan(table, schema, context.reportWith(reporter));
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/metrics/InMemoryMetricsReporter.java b/core/src/main/java/org/apache/iceberg/metrics/InMemoryMetricsReporter.java
new file mode 100644
index 0000000000..79b446c0dd
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/metrics/InMemoryMetricsReporter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.metrics;
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class InMemoryMetricsReporter implements MetricsReporter {
+
+  private MetricsReport metricsReport;
+
+  @Override
+  public void report(MetricsReport report) {
+    this.metricsReport = report;
+  }
+
+  public ScanReport scanReport() {
+    Preconditions.checkArgument(
+        metricsReport == null || metricsReport instanceof ScanReport,
+        "Metrics report is not a scan report");
+    return (ScanReport) metricsReport;
+  }
+}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
index dd493fbc50..036c395f71 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionScanTask;
@@ -39,6 +40,7 @@ import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.ExpressionUtil;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.Projections;
+import org.apache.iceberg.metrics.ScanReport;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -73,9 +75,9 @@ class SparkBatchQueryScan extends SparkPartitioningAwareScan<PartitionScanTask>
       Scan<?, ? extends ScanTask, ? extends ScanTaskGroup<?>> scan,
       SparkReadConf readConf,
       Schema expectedSchema,
-      List<Expression> filters) {
-
-    super(spark, table, scan, readConf, expectedSchema, filters);
+      List<Expression> filters,
+      Supplier<ScanReport> scanReportSupplier) {
+    super(spark, table, scan, readConf, expectedSchema, filters, scanReportSupplier);
 
     this.snapshotId = readConf.snapshotId();
     this.startSnapshotId = readConf.startSnapshotId();
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
index d978b81e67..16eb9c51df 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.iceberg.BatchScan;
 import org.apache.iceberg.FileScanTask;
@@ -30,6 +31,7 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.metrics.ScanReport;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.SparkReadConf;
@@ -57,8 +59,9 @@ class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask>
       Table table,
       SparkReadConf readConf,
       Schema expectedSchema,
-      List<Expression> filters) {
-    this(spark, table, null, null, readConf, expectedSchema, filters);
+      List<Expression> filters,
+      Supplier<ScanReport> scanReportSupplier) {
+    this(spark, table, null, null, readConf, expectedSchema, filters, scanReportSupplier);
   }
 
   SparkCopyOnWriteScan(
@@ -68,9 +71,9 @@ class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask>
       Snapshot snapshot,
       SparkReadConf readConf,
       Schema expectedSchema,
-      List<Expression> filters) {
-
-    super(spark, table, scan, readConf, expectedSchema, filters);
+      List<Expression> filters,
+      Supplier<ScanReport> scanReportSupplier) {
+    super(spark, table, scan, readConf, expectedSchema, filters, scanReportSupplier);
 
     this.snapshot = snapshot;
 
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
index cf274f794e..6538268697 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.iceberg.PartitionField;
@@ -37,6 +38,7 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.metrics.ScanReport;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.Spark3Util;
@@ -74,9 +76,9 @@ abstract class SparkPartitioningAwareScan<T extends PartitionScanTask> extends S
       Scan<?, ? extends ScanTask, ? extends ScanTaskGroup<?>> scan,
       SparkReadConf readConf,
       Schema expectedSchema,
-      List<Expression> filters) {
-
-    super(spark, table, readConf, expectedSchema, filters);
+      List<Expression> filters,
+      Supplier<ScanReport> scanReportSupplier) {
+    super(spark, table, readConf, expectedSchema, filters, scanReportSupplier);
 
     this.scan = scan;
     this.preserveDataGrouping = readConf.preserveDataGrouping();
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index b47818ec55..65c5e04f31 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.source;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.iceberg.ScanTaskGroup;
 import org.apache.iceberg.Schema;
@@ -28,17 +29,32 @@ import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkReadConf;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.source.metrics.NumDeletes;
 import org.apache.iceberg.spark.source.metrics.NumSplits;
+import org.apache.iceberg.spark.source.metrics.ScannedDataFiles;
+import org.apache.iceberg.spark.source.metrics.ScannedDataManifests;
+import org.apache.iceberg.spark.source.metrics.SkippedDataFiles;
+import org.apache.iceberg.spark.source.metrics.SkippedDataManifests;
+import org.apache.iceberg.spark.source.metrics.TaskScannedDataFiles;
+import org.apache.iceberg.spark.source.metrics.TaskScannedDataManifests;
+import org.apache.iceberg.spark.source.metrics.TaskSkippedDataFiles;
+import org.apache.iceberg.spark.source.metrics.TaskSkippedDataManifests;
+import org.apache.iceberg.spark.source.metrics.TaskTotalFileSize;
+import org.apache.iceberg.spark.source.metrics.TaskTotalPlanningDuration;
+import org.apache.iceberg.spark.source.metrics.TotalFileSize;
+import org.apache.iceberg.spark.source.metrics.TotalPlanningDuration;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.connector.metric.CustomMetric;
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;
 import org.apache.spark.sql.connector.read.Batch;
 import org.apache.spark.sql.connector.read.Scan;
 import org.apache.spark.sql.connector.read.Statistics;
@@ -58,6 +74,7 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
   private final Schema expectedSchema;
   private final List<Expression> filterExpressions;
   private final String branch;
+  private final Supplier<ScanReport> scanReportSupplier;
 
   // lazy variables
   private StructType readSchema;
@@ -67,7 +84,8 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
       Table table,
       SparkReadConf readConf,
       Schema expectedSchema,
-      List<Expression> filters) {
+      List<Expression> filters,
+      Supplier<ScanReport> scanReportSupplier) {
     Schema snapshotSchema = SnapshotUtil.schemaFor(table, readConf.branch());
     SparkSchemaUtil.validateMetadataColumnReferences(snapshotSchema, expectedSchema);
 
@@ -78,6 +96,7 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
     this.expectedSchema = expectedSchema;
     this.filterExpressions = filters != null ? filters : Collections.emptyList();
     this.branch = readConf.branch();
+    this.scanReportSupplier = scanReportSupplier;
   }
 
   protected Table table() {
@@ -170,8 +189,36 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
         table(), branch(), Spark3Util.describe(filterExpressions), groupingKeyFieldNamesAsString);
   }
 
+  @Override
+  public CustomTaskMetric[] reportDriverMetrics() {
+    ScanReport scanReport = scanReportSupplier != null ? scanReportSupplier.get() : null;
+
+    if (scanReport == null) {
+      return new CustomTaskMetric[0];
+    }
+
+    List<CustomTaskMetric> driverMetrics = Lists.newArrayList();
+    driverMetrics.add(TaskTotalFileSize.from(scanReport));
+    driverMetrics.add(TaskTotalPlanningDuration.from(scanReport));
+    driverMetrics.add(TaskSkippedDataFiles.from(scanReport));
+    driverMetrics.add(TaskScannedDataFiles.from(scanReport));
+    driverMetrics.add(TaskSkippedDataManifests.from(scanReport));
+    driverMetrics.add(TaskScannedDataManifests.from(scanReport));
+
+    return driverMetrics.toArray(new CustomTaskMetric[0]);
+  }
+
   @Override
   public CustomMetric[] supportedCustomMetrics() {
-    return new CustomMetric[] {new NumSplits(), new NumDeletes()};
+    return new CustomMetric[] {
+      new NumSplits(),
+      new NumDeletes(),
+      new TotalFileSize(),
+      new TotalPlanningDuration(),
+      new ScannedDataManifests(),
+      new SkippedDataManifests(),
+      new ScannedDataFiles(),
+      new SkippedDataFiles()
+    };
   }
 }
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index ddeec9c494..d19368cec8 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -43,6 +43,7 @@ import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.ExpressionUtil;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.metrics.InMemoryMetricsReporter;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -91,6 +92,7 @@ public class SparkScanBuilder
   private final CaseInsensitiveStringMap options;
   private final SparkReadConf readConf;
   private final List<String> metaColumns = Lists.newArrayList();
+  private final InMemoryMetricsReporter metricsReporter;
 
   private Schema schema = null;
   private boolean caseSensitive;
@@ -109,6 +111,7 @@ public class SparkScanBuilder
     this.options = options;
     this.readConf = new SparkReadConf(spark, table, branch, options);
     this.caseSensitive = readConf.caseSensitive();
+    this.metricsReporter = new InMemoryMetricsReporter();
   }
 
   SparkScanBuilder(SparkSession spark, Table table, CaseInsensitiveStringMap options) {
@@ -430,7 +433,8 @@ public class SparkScanBuilder
             .newBatchScan()
             .caseSensitive(caseSensitive)
             .filter(filterExpression())
-            .project(expectedSchema);
+            .project(expectedSchema)
+            .metricsReporter(metricsReporter);
 
     if (snapshotId != null) {
       scan = scan.useSnapshot(snapshotId);
@@ -450,7 +454,14 @@ public class SparkScanBuilder
 
     scan = configureSplitPlanning(scan);
 
-    return new SparkBatchQueryScan(spark, table, scan, readConf, expectedSchema, filterExpressions);
+    return new SparkBatchQueryScan(
+        spark,
+        table,
+        scan,
+        readConf,
+        expectedSchema,
+        filterExpressions,
+        metricsReporter::scanReport);
   }
 
   private Scan buildIncrementalAppendScan(long startSnapshotId, Long endSnapshotId) {
@@ -470,7 +481,14 @@ public class SparkScanBuilder
 
     scan = configureSplitPlanning(scan);
 
-    return new SparkBatchQueryScan(spark, table, scan, readConf, expectedSchema, filterExpressions);
+    return new SparkBatchQueryScan(
+        spark,
+        table,
+        scan,
+        readConf,
+        expectedSchema,
+        filterExpressions,
+        metricsReporter::scanReport);
   }
 
   public Scan buildChangelogScan() {
@@ -573,7 +591,13 @@ public class SparkScanBuilder
 
     if (snapshot == null) {
       return new SparkBatchQueryScan(
-          spark, table, null, readConf, schemaWithMetadataColumns(), filterExpressions);
+          spark,
+          table,
+          null,
+          readConf,
+          schemaWithMetadataColumns(),
+          filterExpressions,
+          metricsReporter::scanReport);
     }
 
     // remember the current snapshot ID for commit validation
@@ -597,7 +621,13 @@ public class SparkScanBuilder
     scan = configureSplitPlanning(scan);
 
     return new SparkBatchQueryScan(
-        spark, table, scan, adjustedReadConf, expectedSchema, filterExpressions);
+        spark,
+        table,
+        scan,
+        adjustedReadConf,
+        expectedSchema,
+        filterExpressions,
+        metricsReporter::scanReport);
   }
 
   public Scan buildCopyOnWriteScan() {
@@ -605,7 +635,12 @@ public class SparkScanBuilder
 
     if (snapshot == null) {
       return new SparkCopyOnWriteScan(
-          spark, table, readConf, schemaWithMetadataColumns(), filterExpressions);
+          spark,
+          table,
+          readConf,
+          schemaWithMetadataColumns(),
+          filterExpressions,
+          metricsReporter::scanReport);
     }
 
     Schema expectedSchema = schemaWithMetadataColumns();
@@ -622,7 +657,14 @@ public class SparkScanBuilder
     scan = configureSplitPlanning(scan);
 
     return new SparkCopyOnWriteScan(
-        spark, table, scan, snapshot, readConf, expectedSchema, filterExpressions);
+        spark,
+        table,
+        scan,
+        snapshot,
+        readConf,
+        expectedSchema,
+        filterExpressions,
+        metricsReporter::scanReport);
   }
 
   private <T extends org.apache.iceberg.Scan<T, ?, ?>> T configureSplitPlanning(T scan) {
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
index 89b184c91c..0290bf7e84 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
@@ -40,7 +40,7 @@ class SparkStagedScan extends SparkScan {
   private List<ScanTaskGroup<ScanTask>> taskGroups = null; // lazy cache of tasks
 
   SparkStagedScan(SparkSession spark, Table table, SparkReadConf readConf) {
-    super(spark, table, readConf, table.schema(), ImmutableList.of());
+    super(spark, table, readConf, table.schema(), ImmutableList.of(), null);
 
     this.taskSetId = readConf.scanTaskSetId();
     this.splitSize = readConf.splitSize();
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDataFiles.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDataFiles.java
new file mode 100644
index 0000000000..f453872fdc
--- /dev/null
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDataFiles.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source.metrics;
+
+import org.apache.spark.sql.connector.metric.CustomSumMetric;
+
+public class ScannedDataFiles extends CustomSumMetric {
+
+  static final String NAME = "scannedDataFiles";
+
+  @Override
+  public String name() {
+    return NAME;
+  }
+
+  @Override
+  public String description() {
+    return "number of scanned data files";
+  }
+}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDataManifests.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDataManifests.java
new file mode 100644
index 0000000000..a167904280
--- /dev/null
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDataManifests.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source.metrics;
+
+import org.apache.spark.sql.connector.metric.CustomSumMetric;
+
+public class ScannedDataManifests extends CustomSumMetric {
+
+  static final String NAME = "scannedDataManifests";
+
+  @Override
+  public String name() {
+    return NAME;
+  }
+
+  @Override
+  public String description() {
+    return "number of scanned data manifests";
+  }
+}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
new file mode 100644
index 0000000000..7fd1742531
--- /dev/null
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source.metrics;
+
+import org.apache.spark.sql.connector.metric.CustomSumMetric;
+
+public class SkippedDataFiles extends CustomSumMetric {
+
+  static final String NAME = "skippedDataFiles";
+
+  @Override
+  public String name() {
+    return NAME;
+  }
+
+  @Override
+  public String description() {
+    return "number of skipped data files";
+  }
+}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataManifests.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataManifests.java
new file mode 100644
index 0000000000..b0eaeb5d87
--- /dev/null
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataManifests.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source.metrics;
+
+import org.apache.spark.sql.connector.metric.CustomSumMetric;
+
+public class SkippedDataManifests extends CustomSumMetric {
+
+  static final String NAME = "skippedDataManifests";
+
+  @Override
+  public String name() {
+    return NAME;
+  }
+
+  @Override
+  public String description() {
+    return "number of skipped data manifests";
+  }
+}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskScannedDataFiles.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskScannedDataFiles.java
new file mode 100644
index 0000000000..d9a527da08
--- /dev/null
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskScannedDataFiles.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source.metrics;
+
+import org.apache.iceberg.metrics.CounterResult;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;
+
+public class TaskScannedDataFiles implements CustomTaskMetric {
+  private final long value;
+
+  private TaskScannedDataFiles(long value) {
+    this.value = value;
+  }
+
+  @Override
+  public String name() {
+    return ScannedDataFiles.NAME;
+  }
+
+  @Override
+  public long value() {
+    return value;
+  }
+
+  public static TaskScannedDataFiles from(ScanReport scanReport) {
+    CounterResult counter = scanReport.scanMetrics().resultDataFiles();
+    long value = counter != null ? counter.value() : 0L;
+    return new TaskScannedDataFiles(value);
+  }
+}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskScannedDataManifests.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskScannedDataManifests.java
new file mode 100644
index 0000000000..09dd033991
--- /dev/null
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskScannedDataManifests.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source.metrics;
+
+import org.apache.iceberg.metrics.CounterResult;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;
+
+public class TaskScannedDataManifests implements CustomTaskMetric {
+  private final long value;
+
+  private TaskScannedDataManifests(long value) {
+    this.value = value;
+  }
+
+  @Override
+  public String name() {
+    return ScannedDataManifests.NAME;
+  }
+
+  @Override
+  public long value() {
+    return value;
+  }
+
+  public static TaskScannedDataManifests from(ScanReport scanReport) {
+    CounterResult counter = scanReport.scanMetrics().scannedDataManifests();
+    long value = counter != null ? counter.value() : 0L;
+    return new TaskScannedDataManifests(value);
+  }
+}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskSkippedDataFiles.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskSkippedDataFiles.java
new file mode 100644
index 0000000000..5165f9a311
--- /dev/null
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskSkippedDataFiles.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source.metrics;
+
+import org.apache.iceberg.metrics.CounterResult;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;
+
+/**
+ * Task metric that carries the skipped data file count taken from a {@link ScanReport}.
+ *
+ * <p>The metric is published under {@code SkippedDataFiles.NAME}.
+ */
+public class TaskSkippedDataFiles implements CustomTaskMetric {
+  private final long value;
+
+  private TaskSkippedDataFiles(long value) {
+    this.value = value;
+  }
+
+  @Override
+  public String name() {
+    return SkippedDataFiles.NAME;
+  }
+
+  @Override
+  public long value() {
+    return value;
+  }
+
+  /**
+   * Creates the metric from the skipped data files counter of the given report.
+   *
+   * @param scanReport report whose scan metrics are read
+   * @return a metric holding the counter value, or 0 when the report has no such counter
+   */
+  public static TaskSkippedDataFiles from(ScanReport scanReport) {
+    CounterResult skippedFiles = scanReport.scanMetrics().skippedDataFiles();
+    if (skippedFiles == null) {
+      return new TaskSkippedDataFiles(0L);
+    }
+
+    return new TaskSkippedDataFiles(skippedFiles.value());
+  }
+}
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskSkippedDataManifests.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskSkippedDataManifests.java
new file mode 100644
index 0000000000..86fef8c411
--- /dev/null
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskSkippedDataManifests.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source.metrics;
+
+import org.apache.iceberg.metrics.CounterResult;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;
+
+/**
+ * Task metric that carries the skipped data manifest count taken from a {@link ScanReport}.
+ *
+ * <p>The metric is published under {@code SkippedDataManifests.NAME}.
+ */
+public class TaskSkippedDataManifests implements CustomTaskMetric {
+  private final long value;
+
+  private TaskSkippedDataManifests(long value) {
+    this.value = value;
+  }
+
+  @Override
+  public String name() {
+    return SkippedDataManifests.NAME;
+  }
+
+  @Override
+  public long value() {
+    return value;
+  }
+
+  /**
+   * Creates the metric from the skipped data manifests counter of the given report.
+   *
+   * @param scanReport report whose scan metrics are read
+   * @return a metric holding the counter value, or 0 when the report has no such counter
+   */
+  public static TaskSkippedDataManifests from(ScanReport scanReport) {
+    CounterResult skippedManifests = scanReport.scanMetrics().skippedDataManifests();
+    if (skippedManifests == null) {
+      return new TaskSkippedDataManifests(0L);
+    }
+
+    return new TaskSkippedDataManifests(skippedManifests.value());
+  }
+}
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskTotalFileSize.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskTotalFileSize.java
new file mode 100644
index 0000000000..c300d835e7
--- /dev/null
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskTotalFileSize.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source.metrics;
+
+import org.apache.iceberg.metrics.CounterResult;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;
+
+/**
+ * Task metric that carries the total file size in bytes taken from a {@link ScanReport}.
+ *
+ * <p>The metric is published under {@code TotalFileSize.NAME}.
+ */
+public class TaskTotalFileSize implements CustomTaskMetric {
+
+  private final long value;
+
+  private TaskTotalFileSize(long value) {
+    this.value = value;
+  }
+
+  @Override
+  public String name() {
+    return TotalFileSize.NAME;
+  }
+
+  @Override
+  public long value() {
+    return value;
+  }
+
+  /**
+   * Creates the metric from the total file size counter of the given report.
+   *
+   * @param scanReport report whose scan metrics are read
+   * @return a metric holding the counter value, or 0 when the report has no such counter
+   */
+  public static TaskTotalFileSize from(ScanReport scanReport) {
+    CounterResult fileSizeInBytes = scanReport.scanMetrics().totalFileSizeInBytes();
+    if (fileSizeInBytes == null) {
+      return new TaskTotalFileSize(0L);
+    }
+
+    return new TaskTotalFileSize(fileSizeInBytes.value());
+  }
+}
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskTotalPlanningDuration.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskTotalPlanningDuration.java
new file mode 100644
index 0000000000..32ac6fde8b
--- /dev/null
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskTotalPlanningDuration.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source.metrics;
+
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.TimerResult;
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;
+
+/**
+ * Task metric that carries the total planning duration, in milliseconds, taken from a {@link
+ * ScanReport}.
+ *
+ * <p>The metric is published under {@code TotalPlanningDuration.NAME}.
+ */
+public class TaskTotalPlanningDuration implements CustomTaskMetric {
+
+  private final long value;
+
+  private TaskTotalPlanningDuration(long value) {
+    this.value = value;
+  }
+
+  @Override
+  public String name() {
+    return TotalPlanningDuration.NAME;
+  }
+
+  @Override
+  public long value() {
+    return value;
+  }
+
+  /**
+   * Creates the metric from the total planning duration timer of the given report.
+   *
+   * @param scanReport report whose scan metrics are read
+   * @return a metric holding the timer's total duration in milliseconds, or 0 when the report has
+   *     no such timer
+   */
+  public static TaskTotalPlanningDuration from(ScanReport scanReport) {
+    TimerResult timerResult = scanReport.scanMetrics().totalPlanningDuration();
+    // Fall back to 0 like the counter-based task metrics: this value feeds a sum metric, so a
+    // negative sentinel would be reported as a negative duration.
+    long value = timerResult != null ? timerResult.totalDuration().toMillis() : 0L;
+    return new TaskTotalPlanningDuration(value);
+  }
+}
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalFileSize.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalFileSize.java
new file mode 100644
index 0000000000..994626e54f
--- /dev/null
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalFileSize.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source.metrics;
+
+import org.apache.spark.sql.connector.metric.CustomSumMetric;
+
+/** Sum metric registered under the name {@code totalFileSize}, described in bytes. */
+public class TotalFileSize extends CustomSumMetric {
+
+  /** Metric name; package-private so the matching task metric can report under it. */
+  static final String NAME = "totalFileSize";
+
+  /** Returns the human-readable description of this metric. */
+  @Override
+  public String description() {
+    return "total file size (bytes)";
+  }
+
+  /** Returns {@link #NAME}. */
+  @Override
+  public String name() {
+    return NAME;
+  }
+}
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalPlanningDuration.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalPlanningDuration.java
new file mode 100644
index 0000000000..8b66eeac40
--- /dev/null
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalPlanningDuration.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source.metrics;
+
+import org.apache.spark.sql.connector.metric.CustomSumMetric;
+
+/** Sum metric registered under the name {@code totalPlanningDuration}, described in ms. */
+public class TotalPlanningDuration extends CustomSumMetric {
+
+  /** Metric name; package-private so the matching task metric can report under it. */
+  static final String NAME = "totalPlanningDuration";
+
+  /** Returns the human-readable description of this metric. */
+  @Override
+  public String description() {
+    return "total planning duration (ms)";
+  }
+
+  /** Returns {@link #NAME}. */
+  @Override
+  public String name() {
+    return NAME;
+  }
+}
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadMetrics.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadMetrics.java
new file mode 100644
index 0000000000..7b943372d1
--- /dev/null
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadMetrics.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import static scala.collection.JavaConverters.seqAsJavaListConverter;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.execution.SparkPlan;
+import org.apache.spark.sql.execution.metric.SQLMetric;
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Test;
+import scala.collection.JavaConverters;
+
+/**
+ * Checks that the scan metrics are exposed as SQL metrics on the first leaf node of the executed
+ * Spark plan, for both format version 1 and format version 2 tables.
+ */
+public class TestSparkReadMetrics extends SparkTestBaseWithCatalog {
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testReadMetricsForV1Table() throws NoSuchTableException {
+    // the table must really be format version 1, otherwise this duplicates the V2 test
+    sql(
+        "CREATE TABLE %s (id BIGINT) USING iceberg TBLPROPERTIES ('format-version'='1')",
+        tableName);
+
+    assertReadMetrics();
+  }
+
+  @Test
+  public void testReadMetricsForV2Table() throws NoSuchTableException {
+    sql(
+        "CREATE TABLE %s (id BIGINT) USING iceberg TBLPROPERTIES ('format-version'='2')",
+        tableName);
+
+    assertReadMetrics();
+  }
+
+  /**
+   * Appends two single-partition batches to the table, runs a filtered query that only matches
+   * ids from the first batch, and verifies the metrics reported on the first leaf of the plan.
+   */
+  private void assertReadMetrics() throws NoSuchTableException {
+    spark.range(10000).coalesce(1).writeTo(tableName).append();
+    spark.range(10001, 20000).coalesce(1).writeTo(tableName).append();
+
+    Dataset<Row> df = spark.sql(String.format("select * from %s where id < 10000", tableName));
+    // metrics are only populated once the query has actually run
+    df.collect();
+
+    List<SparkPlan> sparkPlans =
+        seqAsJavaListConverter(df.queryExecution().executedPlan().collectLeaves()).asJava();
+    Map<String, SQLMetric> metricsMap =
+        JavaConverters.mapAsJavaMapConverter(sparkPlans.get(0).metrics()).asJava();
+
+    // expectation: each append contributes one data file and one manifest, and the filter lets
+    // the second file be skipped
+    Assertions.assertThat(metricsMap.get("skippedDataFiles").value()).isEqualTo(1);
+    Assertions.assertThat(metricsMap.get("scannedDataManifests").value()).isEqualTo(2);
+    Assertions.assertThat(metricsMap.get("scannedDataFiles").value()).isEqualTo(1);
+    Assertions.assertThat(metricsMap.get("skippedDataManifests").value()).isEqualTo(0);
+    Assertions.assertThat(metricsMap.get("totalFileSize").value()).isNotEqualTo(0);
+    Assertions.assertThat(metricsMap.get("totalPlanningDuration").value()).isNotEqualTo(0);
+  }
+}


Reply via email to