This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 8e89f6b08e Spark 3.4: Display read metrics on Spark SQL UI (#7447)
8e89f6b08e is described below
commit 8e89f6b08e06510ac9f997cbd8dc22d761447462
Author: Karuppayya <[email protected]>
AuthorDate: Thu Jul 20 16:04:44 2023 -0700
Spark 3.4: Display read metrics on Spark SQL UI (#7447)
---
.../src/main/java/org/apache/iceberg/BaseScan.java | 2 +-
.../iceberg/metrics/InMemoryMetricsReporter.java | 38 +++++++++
.../iceberg/spark/source/SparkBatchQueryScan.java | 8 +-
.../iceberg/spark/source/SparkCopyOnWriteScan.java | 13 ++--
.../spark/source/SparkPartitioningAwareScan.java | 8 +-
.../org/apache/iceberg/spark/source/SparkScan.java | 51 +++++++++++-
.../iceberg/spark/source/SparkScanBuilder.java | 56 ++++++++++++--
.../iceberg/spark/source/SparkStagedScan.java | 2 +-
.../spark/source/metrics/ScannedDataFiles.java | 36 +++++++++
.../spark/source/metrics/ScannedDataManifests.java | 36 +++++++++
.../spark/source/metrics/SkippedDataFiles.java | 36 +++++++++
.../spark/source/metrics/SkippedDataManifests.java | 36 +++++++++
.../spark/source/metrics/TaskScannedDataFiles.java | 47 +++++++++++
.../source/metrics/TaskScannedDataManifests.java | 47 +++++++++++
.../spark/source/metrics/TaskSkippedDataFiles.java | 47 +++++++++++
.../source/metrics/TaskSkippedDataManifests.java | 47 +++++++++++
.../spark/source/metrics/TaskTotalFileSize.java | 48 ++++++++++++
.../source/metrics/TaskTotalPlanningDuration.java | 48 ++++++++++++
.../spark/source/metrics/TotalFileSize.java | 36 +++++++++
.../source/metrics/TotalPlanningDuration.java | 36 +++++++++
.../iceberg/spark/source/TestSparkReadMetrics.java | 90 ++++++++++++++++++++++
21 files changed, 746 insertions(+), 22 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/BaseScan.java
b/core/src/main/java/org/apache/iceberg/BaseScan.java
index 9db72227ac..953ad754aa 100644
--- a/core/src/main/java/org/apache/iceberg/BaseScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseScan.java
@@ -254,6 +254,6 @@ abstract class BaseScan<ThisT, T extends ScanTask, G
extends ScanTaskGroup<T>>
@Override
public ThisT metricsReporter(MetricsReporter reporter) {
- return newRefinedScan(table(), schema(), context().reportWith(reporter));
+ return newRefinedScan(table, schema, context.reportWith(reporter));
}
}
diff --git
a/core/src/main/java/org/apache/iceberg/metrics/InMemoryMetricsReporter.java
b/core/src/main/java/org/apache/iceberg/metrics/InMemoryMetricsReporter.java
new file mode 100644
index 0000000000..79b446c0dd
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/metrics/InMemoryMetricsReporter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.metrics;
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class InMemoryMetricsReporter implements MetricsReporter {
+
+ private MetricsReport metricsReport;
+
+ @Override
+ public void report(MetricsReport report) {
+ this.metricsReport = report;
+ }
+
+ public ScanReport scanReport() {
+ Preconditions.checkArgument(
+ metricsReport == null || metricsReport instanceof ScanReport,
+ "Metrics report is not a scan report");
+ return (ScanReport) metricsReport;
+ }
+}
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
index dd493fbc50..036c395f71 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionScanTask;
@@ -39,6 +40,7 @@ import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.ExpressionUtil;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Projections;
+import org.apache.iceberg.metrics.ScanReport;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -73,9 +75,9 @@ class SparkBatchQueryScan extends
SparkPartitioningAwareScan<PartitionScanTask>
Scan<?, ? extends ScanTask, ? extends ScanTaskGroup<?>> scan,
SparkReadConf readConf,
Schema expectedSchema,
- List<Expression> filters) {
-
- super(spark, table, scan, readConf, expectedSchema, filters);
+ List<Expression> filters,
+ Supplier<ScanReport> scanReportSupplier) {
+ super(spark, table, scan, readConf, expectedSchema, filters, scanReportSupplier);
this.snapshotId = readConf.snapshotId();
this.startSnapshotId = readConf.startSnapshotId();
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
index d978b81e67..16eb9c51df 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.iceberg.BatchScan;
import org.apache.iceberg.FileScanTask;
@@ -30,6 +31,7 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.metrics.ScanReport;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkReadConf;
@@ -57,8 +59,9 @@ class SparkCopyOnWriteScan extends
SparkPartitioningAwareScan<FileScanTask>
Table table,
SparkReadConf readConf,
Schema expectedSchema,
- List<Expression> filters) {
- this(spark, table, null, null, readConf, expectedSchema, filters);
+ List<Expression> filters,
+ Supplier<ScanReport> scanReportSupplier) {
+ this(spark, table, null, null, readConf, expectedSchema, filters, scanReportSupplier);
}
SparkCopyOnWriteScan(
@@ -68,9 +71,9 @@ class SparkCopyOnWriteScan extends
SparkPartitioningAwareScan<FileScanTask>
Snapshot snapshot,
SparkReadConf readConf,
Schema expectedSchema,
- List<Expression> filters) {
-
- super(spark, table, scan, readConf, expectedSchema, filters);
+ List<Expression> filters,
+ Supplier<ScanReport> scanReportSupplier) {
+ super(spark, table, scan, readConf, expectedSchema, filters, scanReportSupplier);
this.snapshot = snapshot;
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
index cf274f794e..6538268697 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
@@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.iceberg.PartitionField;
@@ -37,6 +38,7 @@ import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.metrics.ScanReport;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.Spark3Util;
@@ -74,9 +76,9 @@ abstract class SparkPartitioningAwareScan<T extends
PartitionScanTask> extends S
Scan<?, ? extends ScanTask, ? extends ScanTaskGroup<?>> scan,
SparkReadConf readConf,
Schema expectedSchema,
- List<Expression> filters) {
-
- super(spark, table, readConf, expectedSchema, filters);
+ List<Expression> filters,
+ Supplier<ScanReport> scanReportSupplier) {
+ super(spark, table, readConf, expectedSchema, filters, scanReportSupplier);
this.scan = scan;
this.preserveDataGrouping = readConf.preserveDataGrouping();
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index b47818ec55..65c5e04f31 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.source;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.Schema;
@@ -28,17 +29,32 @@ import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.source.metrics.NumDeletes;
import org.apache.iceberg.spark.source.metrics.NumSplits;
+import org.apache.iceberg.spark.source.metrics.ScannedDataFiles;
+import org.apache.iceberg.spark.source.metrics.ScannedDataManifests;
+import org.apache.iceberg.spark.source.metrics.SkippedDataFiles;
+import org.apache.iceberg.spark.source.metrics.SkippedDataManifests;
+import org.apache.iceberg.spark.source.metrics.TaskScannedDataFiles;
+import org.apache.iceberg.spark.source.metrics.TaskScannedDataManifests;
+import org.apache.iceberg.spark.source.metrics.TaskSkippedDataFiles;
+import org.apache.iceberg.spark.source.metrics.TaskSkippedDataManifests;
+import org.apache.iceberg.spark.source.metrics.TaskTotalFileSize;
+import org.apache.iceberg.spark.source.metrics.TaskTotalPlanningDuration;
+import org.apache.iceberg.spark.source.metrics.TotalFileSize;
+import org.apache.iceberg.spark.source.metrics.TotalPlanningDuration;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.metric.CustomMetric;
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;
import org.apache.spark.sql.connector.read.Batch;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.Statistics;
@@ -58,6 +74,7 @@ abstract class SparkScan implements Scan,
SupportsReportStatistics {
private final Schema expectedSchema;
private final List<Expression> filterExpressions;
private final String branch;
+ private final Supplier<ScanReport> scanReportSupplier;
// lazy variables
private StructType readSchema;
@@ -67,7 +84,8 @@ abstract class SparkScan implements Scan,
SupportsReportStatistics {
Table table,
SparkReadConf readConf,
Schema expectedSchema,
- List<Expression> filters) {
+ List<Expression> filters,
+ Supplier<ScanReport> scanReportSupplier) {
Schema snapshotSchema = SnapshotUtil.schemaFor(table, readConf.branch());
SparkSchemaUtil.validateMetadataColumnReferences(snapshotSchema,
expectedSchema);
@@ -78,6 +96,7 @@ abstract class SparkScan implements Scan,
SupportsReportStatistics {
this.expectedSchema = expectedSchema;
this.filterExpressions = filters != null ? filters :
Collections.emptyList();
this.branch = readConf.branch();
+ this.scanReportSupplier = scanReportSupplier;
}
protected Table table() {
@@ -170,8 +189,36 @@ abstract class SparkScan implements Scan,
SupportsReportStatistics {
table(), branch(), Spark3Util.describe(filterExpressions),
groupingKeyFieldNamesAsString);
}
+ @Override
+ public CustomTaskMetric[] reportDriverMetrics() {
+ ScanReport scanReport = scanReportSupplier != null ? scanReportSupplier.get() : null;
+
+ if (scanReport == null) {
+ return new CustomTaskMetric[0];
+ }
+
+ List<CustomTaskMetric> driverMetrics = Lists.newArrayList();
+ driverMetrics.add(TaskTotalFileSize.from(scanReport));
+ driverMetrics.add(TaskTotalPlanningDuration.from(scanReport));
+ driverMetrics.add(TaskSkippedDataFiles.from(scanReport));
+ driverMetrics.add(TaskScannedDataFiles.from(scanReport));
+ driverMetrics.add(TaskSkippedDataManifests.from(scanReport));
+ driverMetrics.add(TaskScannedDataManifests.from(scanReport));
+
+ return driverMetrics.toArray(new CustomTaskMetric[0]);
+ }
+
@Override
public CustomMetric[] supportedCustomMetrics() {
- return new CustomMetric[] {new NumSplits(), new NumDeletes()};
+ return new CustomMetric[] {
+ new NumSplits(),
+ new NumDeletes(),
+ new TotalFileSize(),
+ new TotalPlanningDuration(),
+ new ScannedDataManifests(),
+ new SkippedDataManifests(),
+ new ScannedDataFiles(),
+ new SkippedDataFiles()
+ };
}
}
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index ddeec9c494..d19368cec8 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -43,6 +43,7 @@ import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.ExpressionUtil;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.metrics.InMemoryMetricsReporter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -91,6 +92,7 @@ public class SparkScanBuilder
private final CaseInsensitiveStringMap options;
private final SparkReadConf readConf;
private final List<String> metaColumns = Lists.newArrayList();
+ private final InMemoryMetricsReporter metricsReporter;
private Schema schema = null;
private boolean caseSensitive;
@@ -109,6 +111,7 @@ public class SparkScanBuilder
this.options = options;
this.readConf = new SparkReadConf(spark, table, branch, options);
this.caseSensitive = readConf.caseSensitive();
+ this.metricsReporter = new InMemoryMetricsReporter();
}
SparkScanBuilder(SparkSession spark, Table table, CaseInsensitiveStringMap
options) {
@@ -430,7 +433,8 @@ public class SparkScanBuilder
.newBatchScan()
.caseSensitive(caseSensitive)
.filter(filterExpression())
- .project(expectedSchema);
+ .project(expectedSchema)
+ .metricsReporter(metricsReporter);
if (snapshotId != null) {
scan = scan.useSnapshot(snapshotId);
@@ -450,7 +454,14 @@ public class SparkScanBuilder
scan = configureSplitPlanning(scan);
- return new SparkBatchQueryScan(spark, table, scan, readConf, expectedSchema, filterExpressions);
+ return new SparkBatchQueryScan(
+ spark,
+ table,
+ scan,
+ readConf,
+ expectedSchema,
+ filterExpressions,
+ metricsReporter::scanReport);
}
private Scan buildIncrementalAppendScan(long startSnapshotId, Long
endSnapshotId) {
@@ -470,7 +481,14 @@ public class SparkScanBuilder
scan = configureSplitPlanning(scan);
- return new SparkBatchQueryScan(spark, table, scan, readConf, expectedSchema, filterExpressions);
+ return new SparkBatchQueryScan(
+ spark,
+ table,
+ scan,
+ readConf,
+ expectedSchema,
+ filterExpressions,
+ metricsReporter::scanReport);
}
public Scan buildChangelogScan() {
@@ -573,7 +591,13 @@ public class SparkScanBuilder
if (snapshot == null) {
return new SparkBatchQueryScan(
- spark, table, null, readConf, schemaWithMetadataColumns(), filterExpressions);
+ spark,
+ table,
+ null,
+ readConf,
+ schemaWithMetadataColumns(),
+ filterExpressions,
+ metricsReporter::scanReport);
}
// remember the current snapshot ID for commit validation
@@ -597,7 +621,13 @@ public class SparkScanBuilder
scan = configureSplitPlanning(scan);
return new SparkBatchQueryScan(
- spark, table, scan, adjustedReadConf, expectedSchema, filterExpressions);
+ spark,
+ table,
+ scan,
+ adjustedReadConf,
+ expectedSchema,
+ filterExpressions,
+ metricsReporter::scanReport);
}
public Scan buildCopyOnWriteScan() {
@@ -605,7 +635,12 @@ public class SparkScanBuilder
if (snapshot == null) {
return new SparkCopyOnWriteScan(
- spark, table, readConf, schemaWithMetadataColumns(), filterExpressions);
+ spark,
+ table,
+ readConf,
+ schemaWithMetadataColumns(),
+ filterExpressions,
+ metricsReporter::scanReport);
}
Schema expectedSchema = schemaWithMetadataColumns();
@@ -622,7 +657,14 @@ public class SparkScanBuilder
scan = configureSplitPlanning(scan);
return new SparkCopyOnWriteScan(
- spark, table, scan, snapshot, readConf, expectedSchema, filterExpressions);
+ spark,
+ table,
+ scan,
+ snapshot,
+ readConf,
+ expectedSchema,
+ filterExpressions,
+ metricsReporter::scanReport);
}
private <T extends org.apache.iceberg.Scan<T, ?, ?>> T
configureSplitPlanning(T scan) {
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
index 89b184c91c..0290bf7e84 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
@@ -40,7 +40,7 @@ class SparkStagedScan extends SparkScan {
private List<ScanTaskGroup<ScanTask>> taskGroups = null; // lazy cache of
tasks
SparkStagedScan(SparkSession spark, Table table, SparkReadConf readConf) {
- super(spark, table, readConf, table.schema(), ImmutableList.of());
+ super(spark, table, readConf, table.schema(), ImmutableList.of(), null);
this.taskSetId = readConf.scanTaskSetId();
this.splitSize = readConf.splitSize();
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDataFiles.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDataFiles.java
new file mode 100644
index 0000000000..f453872fdc
--- /dev/null
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDataFiles.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source.metrics;
+
+import org.apache.spark.sql.connector.metric.CustomSumMetric;
+
+public class ScannedDataFiles extends CustomSumMetric {
+
+ static final String NAME = "scannedDataFiles";
+
+ @Override
+ public String name() {
+ return NAME;
+ }
+
+ @Override
+ public String description() {
+ return "number of scanned data files";
+ }
+}
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDataManifests.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDataManifests.java
new file mode 100644
index 0000000000..a167904280
--- /dev/null
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDataManifests.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source.metrics;
+
+import org.apache.spark.sql.connector.metric.CustomSumMetric;
+
+public class ScannedDataManifests extends CustomSumMetric {
+
+ static final String NAME = "scannedDataManifests";
+
+ @Override
+ public String name() {
+ return NAME;
+ }
+
+ @Override
+ public String description() {
+ return "number of scanned data manifests";
+ }
+}
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
new file mode 100644
index 0000000000..7fd1742531
--- /dev/null
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source.metrics;
+
+import org.apache.spark.sql.connector.metric.CustomSumMetric;
+
+public class SkippedDataFiles extends CustomSumMetric {
+
+ static final String NAME = "skippedDataFiles";
+
+ @Override
+ public String name() {
+ return NAME;
+ }
+
+ @Override
+ public String description() {
+ return "number of skipped data files";
+ }
+}
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataManifests.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataManifests.java
new file mode 100644
index 0000000000..b0eaeb5d87
--- /dev/null
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataManifests.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source.metrics;
+
+import org.apache.spark.sql.connector.metric.CustomSumMetric;
+
+public class SkippedDataManifests extends CustomSumMetric {
+
+ static final String NAME = "skippedDataManifests";
+
+ @Override
+ public String name() {
+ return NAME;
+ }
+
+ @Override
+ public String description() {
+ return "number of skipped data manifests";
+ }
+}
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskScannedDataFiles.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskScannedDataFiles.java
new file mode 100644
index 0000000000..d9a527da08
--- /dev/null
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskScannedDataFiles.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source.metrics;
+
+import org.apache.iceberg.metrics.CounterResult;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;
+
+public class TaskScannedDataFiles implements CustomTaskMetric {
+ private final long value;
+
+ private TaskScannedDataFiles(long value) {
+ this.value = value;
+ }
+
+ @Override
+ public String name() {
+ return ScannedDataFiles.NAME;
+ }
+
+ @Override
+ public long value() {
+ return value;
+ }
+
+ public static TaskScannedDataFiles from(ScanReport scanReport) {
+ CounterResult counter = scanReport.scanMetrics().resultDataFiles();
+ long value = counter != null ? counter.value() : 0L;
+ return new TaskScannedDataFiles(value);
+ }
+}
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskScannedDataManifests.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskScannedDataManifests.java
new file mode 100644
index 0000000000..09dd033991
--- /dev/null
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskScannedDataManifests.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source.metrics;
+
+import org.apache.iceberg.metrics.CounterResult;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;
+
+public class TaskScannedDataManifests implements CustomTaskMetric {
+ private final long value;
+
+ private TaskScannedDataManifests(long value) {
+ this.value = value;
+ }
+
+ @Override
+ public String name() {
+ return ScannedDataManifests.NAME;
+ }
+
+ @Override
+ public long value() {
+ return value;
+ }
+
+ public static TaskScannedDataManifests from(ScanReport scanReport) {
+ CounterResult counter = scanReport.scanMetrics().scannedDataManifests();
+ long value = counter != null ? counter.value() : 0L;
+ return new TaskScannedDataManifests(value);
+ }
+}
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskSkippedDataFiles.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskSkippedDataFiles.java
new file mode 100644
index 0000000000..5165f9a311
--- /dev/null
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskSkippedDataFiles.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source.metrics;
+
+import org.apache.iceberg.metrics.CounterResult;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;
+
+public class TaskSkippedDataFiles implements CustomTaskMetric {
+ private final long value;
+
+ private TaskSkippedDataFiles(long value) {
+ this.value = value;
+ }
+
+ @Override
+ public String name() {
+ return SkippedDataFiles.NAME;
+ }
+
+ @Override
+ public long value() {
+ return value;
+ }
+
+ public static TaskSkippedDataFiles from(ScanReport scanReport) {
+ CounterResult counter = scanReport.scanMetrics().skippedDataFiles();
+ long value = counter != null ? counter.value() : 0L;
+ return new TaskSkippedDataFiles(value);
+ }
+}
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskSkippedDataManifests.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskSkippedDataManifests.java
new file mode 100644
index 0000000000..86fef8c411
--- /dev/null
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskSkippedDataManifests.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source.metrics;
+
+import org.apache.iceberg.metrics.CounterResult;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;
+
+/**
+ * A {@link CustomTaskMetric} holding the skipped-data-manifests counter value read from a {@link
+ * ScanReport}. It is reported under the name {@link SkippedDataManifests#NAME}.
+ */
+public class TaskSkippedDataManifests implements CustomTaskMetric {
+  private final long value;
+
+  private TaskSkippedDataManifests(long value) {
+    this.value = value;
+  }
+
+  @Override
+  public String name() {
+    return SkippedDataManifests.NAME;
+  }
+
+  @Override
+  public long value() {
+    return value;
+  }
+
+  /**
+   * Builds the metric from the scan metrics of the given report.
+   *
+   * @param scanReport report whose {@code skippedDataManifests} counter is read
+   * @return a metric holding the counter value, or 0 when the report has no such counter
+   */
+  public static TaskSkippedDataManifests from(ScanReport scanReport) {
+    CounterResult skippedManifests = scanReport.scanMetrics().skippedDataManifests();
+
+    // Start from the "counter absent" default and overwrite it only when a counter exists.
+    long count = 0L;
+    if (skippedManifests != null) {
+      count = skippedManifests.value();
+    }
+
+    return new TaskSkippedDataManifests(count);
+  }
+}
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskTotalFileSize.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskTotalFileSize.java
new file mode 100644
index 0000000000..c300d835e7
--- /dev/null
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskTotalFileSize.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source.metrics;
+
+import org.apache.iceberg.metrics.CounterResult;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;
+
+/**
+ * A {@link CustomTaskMetric} holding the {@code totalFileSizeInBytes} counter value read from a
+ * {@link ScanReport}. It is reported under the name {@link TotalFileSize#NAME}.
+ */
+public class TaskTotalFileSize implements CustomTaskMetric {
+
+  private final long value;
+
+  private TaskTotalFileSize(long value) {
+    this.value = value;
+  }
+
+  /**
+   * Builds the metric from the scan metrics of the given report.
+   *
+   * @param scanReport report whose {@code totalFileSizeInBytes} counter is read
+   * @return a metric holding the counter value, or 0 when the report has no such counter
+   */
+  public static TaskTotalFileSize from(ScanReport scanReport) {
+    CounterResult sizeCounter = scanReport.scanMetrics().totalFileSizeInBytes();
+    return new TaskTotalFileSize(sizeCounter == null ? 0L : sizeCounter.value());
+  }
+
+  @Override
+  public String name() {
+    return TotalFileSize.NAME;
+  }
+
+  @Override
+  public long value() {
+    return value;
+  }
+}
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskTotalPlanningDuration.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskTotalPlanningDuration.java
new file mode 100644
index 0000000000..32ac6fde8b
--- /dev/null
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskTotalPlanningDuration.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source.metrics;
+
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.TimerResult;
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;
+
+/**
+ * A {@link CustomTaskMetric} holding the total planning duration, in milliseconds, read from a
+ * {@link ScanReport}. It is reported under the name {@link TotalPlanningDuration#NAME}.
+ */
+public class TaskTotalPlanningDuration implements CustomTaskMetric {
+
+  private final long value;
+
+  private TaskTotalPlanningDuration(long value) {
+    this.value = value;
+  }
+
+  @Override
+  public String name() {
+    return TotalPlanningDuration.NAME;
+  }
+
+  @Override
+  public long value() {
+    return value;
+  }
+
+  /**
+   * Builds the metric from the scan metrics of the given report.
+   *
+   * @param scanReport report whose {@code totalPlanningDuration} timer is read
+   * @return a metric holding the timer's total duration converted to milliseconds, or 0 when the
+   *     report has no such timer
+   */
+  public static TaskTotalPlanningDuration from(ScanReport scanReport) {
+    TimerResult timerResult = scanReport.scanMetrics().totalPlanningDuration();
+    // Fall back to 0, like the sibling counter-based task metrics, instead of a negative
+    // sentinel: this value is reported under a summing metric, where -1 would be added up and
+    // shown as if it were a measured duration.
+    long value = timerResult != null ? timerResult.totalDuration().toMillis() : 0L;
+    return new TaskTotalPlanningDuration(value);
+  }
+}
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalFileSize.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalFileSize.java
new file mode 100644
index 0000000000..994626e54f
--- /dev/null
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalFileSize.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source.metrics;
+
+import org.apache.spark.sql.connector.metric.CustomSumMetric;
+
+/** A {@link CustomSumMetric} registered under the name {@code totalFileSize}. */
+public class TotalFileSize extends CustomSumMetric {
+
+  static final String NAME = "totalFileSize";
+
+  /** Returns the description text for this metric. */
+  @Override
+  public String description() {
+    return "total file size (bytes)";
+  }
+
+  /** Returns {@link #NAME}. */
+  @Override
+  public String name() {
+    return NAME;
+  }
+}
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalPlanningDuration.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalPlanningDuration.java
new file mode 100644
index 0000000000..8b66eeac40
--- /dev/null
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalPlanningDuration.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source.metrics;
+
+import org.apache.spark.sql.connector.metric.CustomSumMetric;
+
+/** A {@link CustomSumMetric} registered under the name {@code totalPlanningDuration}. */
+public class TotalPlanningDuration extends CustomSumMetric {
+
+  static final String NAME = "totalPlanningDuration";
+
+  /** Returns the description text for this metric. */
+  @Override
+  public String description() {
+    return "total planning duration (ms)";
+  }
+
+  /** Returns {@link #NAME}. */
+  @Override
+  public String name() {
+    return NAME;
+  }
+}
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadMetrics.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadMetrics.java
new file mode 100644
index 0000000000..7b943372d1
--- /dev/null
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadMetrics.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import static scala.collection.JavaConverters.seqAsJavaListConverter;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.execution.SparkPlan;
+import org.apache.spark.sql.execution.metric.SQLMetric;
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Test;
+import scala.collection.JavaConverters;
+
+/**
+ * Checks the read metrics exposed on the leaf node of the executed Spark plan after querying an
+ * Iceberg table, for both format version 1 and format version 2 tables.
+ */
+public class TestSparkReadMetrics extends SparkTestBaseWithCatalog {
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testReadMetricsForV1Table() throws NoSuchTableException {
+    // The table must really be created as format version 1; otherwise this test only
+    // duplicates the V2 test.
+    assertReadMetrics("1");
+  }
+
+  @Test
+  public void testReadMetricsForV2Table() throws NoSuchTableException {
+    assertReadMetrics("2");
+  }
+
+  /**
+   * Creates the test table with the given format version, appends two batches, runs a filtered
+   * query and asserts the metrics reported by the first leaf of the executed plan.
+   *
+   * @param formatVersion value written to the {@code format-version} table property
+   * @throws NoSuchTableException if an append cannot find the table
+   */
+  private void assertReadMetrics(String formatVersion) throws NoSuchTableException {
+    sql(
+        "CREATE TABLE %s (id BIGINT) USING iceberg TBLPROPERTIES ('format-version'='%s')",
+        tableName, formatVersion);
+
+    // Two separate single-partition appends with disjoint id ranges; the assertions below
+    // expect the filter to keep one data file and skip the other.
+    spark.range(10000).coalesce(1).writeTo(tableName).append();
+    spark.range(10001, 20000).coalesce(1).writeTo(tableName).append();
+
+    Dataset<Row> df = spark.sql(String.format("select * from %s where id < 10000", tableName));
+    df.collect();
+
+    List<SparkPlan> sparkPlans =
+        seqAsJavaListConverter(df.queryExecution().executedPlan().collectLeaves()).asJava();
+    Map<String, SQLMetric> metricsMap =
+        JavaConverters.mapAsJavaMapConverter(sparkPlans.get(0).metrics()).asJava();
+
+    Assertions.assertThat(metricsMap.get("skippedDataFiles").value()).isEqualTo(1);
+    Assertions.assertThat(metricsMap.get("scannedDataManifests").value()).isEqualTo(2);
+    Assertions.assertThat(metricsMap.get("scannedDataFiles").value()).isEqualTo(1);
+    Assertions.assertThat(metricsMap.get("skippedDataManifests").value()).isEqualTo(0);
+    Assertions.assertThat(metricsMap.get("totalFileSize").value()).isNotEqualTo(0);
+    Assertions.assertThat(metricsMap.get("totalPlanningDuration").value()).isNotEqualTo(0);
+  }
+}