This is an automated email from the ASF dual-hosted git repository.
dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 3c24706c7 IMPALA-13268: Integrate Iceberg ScanMetrics into Impala
query profiles
3c24706c7 is described below
commit 3c24706c72818a1668159a428d4f2afcadea9f27
Author: Gabor Kaszab <[email protected]>
AuthorDate: Tue Jan 28 14:32:04 2025 +0100
IMPALA-13268: Integrate Iceberg ScanMetrics into Impala query profiles
When calling planFiles() on an Iceberg table, it can give us some
metrics, like total planning time, the number of data/delete files and
manifests, how many of these could be skipped, etc.
This change integrates these metrics into the query profile, under the
"Frontend" section. These metrics are per-table, so if multiple tables
are scanned for the query there will be multiple sections in the
profile.
Note that we only have these metrics for a table if Iceberg needs to be
used for planning that table, e.g. if a predicate is pushed down to
Iceberg or if there is time travel. For tables where Iceberg was not
used in planning, the profile will contain a short note describing this.
To facilitate pairing the metrics with scans, the metrics header
references the plan node responsible for the scan. This will always be
the top level node for the scan, so it can be a SCAN node, a JOIN node
or a UNION node depending on whether the table has delete files.
Testing:
- added EE tests in iceberg-scan-metrics.tests
- added a test in PlannerTest.java that asserts on the number of
metrics; if it changes in a new Iceberg release, the test will fail
and we can update our reporting
Change-Id: I080ee8eafc459dad4d21356ac9042b72d0570219
Reviewed-on: http://gerrit.cloudera.org:8080/22501
Tested-by: Impala Public Jenkins <[email protected]>
Reviewed-by: Daniel Becker <[email protected]>
---
.../analysis/AlterTableDropPartitionStmt.java | 2 +-
.../apache/impala/planner/IcebergScanPlanner.java | 51 +++++++-
.../java/org/apache/impala/service/Frontend.java | 96 +++++++++++++++
.../org/apache/impala/service/FrontendProfile.java | 25 ++++
.../java/org/apache/impala/util/IcebergUtil.java | 17 ++-
.../org/apache/impala/planner/PlannerTest.java | 24 ++++
.../queries/QueryTest/iceberg-scan-metrics.test | 132 +++++++++++++++++++++
tests/query_test/test_iceberg.py | 28 +++++
8 files changed, 368 insertions(+), 7 deletions(-)
diff --git
a/fe/src/main/java/org/apache/impala/analysis/AlterTableDropPartitionStmt.java
b/fe/src/main/java/org/apache/impala/analysis/AlterTableDropPartitionStmt.java
index a3b40a998..3f17b7045 100644
---
a/fe/src/main/java/org/apache/impala/analysis/AlterTableDropPartitionStmt.java
+++
b/fe/src/main/java/org/apache/impala/analysis/AlterTableDropPartitionStmt.java
@@ -166,7 +166,7 @@ public class AlterTableDropPartitionStmt extends
AlterTableStmt {
}
try (CloseableIterable<FileScanTask> fileScanTasks =
IcebergUtil.planFiles(table,
- icebergPartitionExprs, null)) {
+ icebergPartitionExprs, null, null)) {
icebergFilePaths_ = new ArrayList<>();
Set<String> icebergPartitionSummary = new HashSet<>();
for (FileScanTask fileScanTask : fileScanTasks) {
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
index 83cb0b555..eec0d3331 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
@@ -43,6 +43,10 @@ import org.apache.iceberg.expressions.ExpressionUtil;
import org.apache.iceberg.expressions.ExpressionVisitors;
import org.apache.iceberg.expressions.True;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanMetricsResult;
+import org.apache.iceberg.metrics.ScanReport;
import org.apache.impala.analysis.Analyzer;
import org.apache.impala.analysis.BinaryPredicate;
import org.apache.impala.analysis.BinaryPredicate.Operator;
@@ -78,6 +82,7 @@ import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.common.Pair;
import org.apache.impala.fb.FbIcebergMetadata;
import org.apache.impala.planner.JoinNode.DistributionMode;
+import org.apache.impala.service.Frontend;
import org.apache.impala.thrift.TColumnStats;
import org.apache.impala.thrift.TIcebergPartitionTransformType;
import org.apache.impala.thrift.TQueryOptions;
@@ -93,6 +98,26 @@ import org.slf4j.LoggerFactory;
* class deals with such complexities.
*/
public class IcebergScanPlanner {
+ // TODO: This class is available in the Iceberg library from release 1.4.0.
We could
+ // drop this and use the one from Iceberg once we've done a version bump.
+ private static class InMemoryMetricsReporter implements MetricsReporter {
+ private MetricsReport metricsReport_;
+
+ @Override
+ public void report(MetricsReport report) {
+ Preconditions.checkArgument(
+ report == null || report instanceof ScanReport,
+ "Metrics report is not a scan report");
+ this.metricsReport_ = report;
+ }
+
+ public ScanMetricsResult scanMetricsResult() {
+ if (metricsReport_ == null) return null;
+
+ return ((ScanReport) metricsReport_).scanMetrics();
+ }
+ }
+
private static final Logger LOG =
LoggerFactory.getLogger(IcebergScanPlanner.class);
private Analyzer analyzer_;
@@ -137,6 +162,8 @@ public class IcebergScanPlanner {
private final long snapshotId_;
+ private final InMemoryMetricsReporter metricsReporter_ = new
InMemoryMetricsReporter();
+
public IcebergScanPlanner(Analyzer analyzer, PlannerContext ctx,
TableRef iceTblRef, List<Expr> conjuncts, MultiAggregateInfo aggInfo)
throws ImpalaException {
@@ -152,6 +179,20 @@ public class IcebergScanPlanner {
}
public PlanNode createIcebergScanPlan() throws ImpalaException {
+ PlanNode res = createPlanNodeHelper();
+ Preconditions.checkNotNull(res);
+
+ ScanMetricsResult metricsResult = metricsReporter_.scanMetricsResult();
+ Preconditions.checkState(
+ !needIcebergForPlanning() || snapshotId_ == -1 || metricsResult !=
null);
+
+ // Update the query profile with the scan metrics.
+ Frontend.addIcebergScanMetricsToProfile(res.getId().toString(),
metricsResult);
+
+ return res;
+ }
+
+ private PlanNode createPlanNodeHelper() throws ImpalaException {
if (!needIcebergForPlanning()) {
analyzer_.materializeSlots(conjuncts_);
setFileDescriptorsBasedOnFileStore();
@@ -560,9 +601,14 @@ public class IcebergScanPlanner {
TimeTravelSpec timeTravelSpec = tblRef_.getTimeTravelSpec();
+ // 'metricsReporter_' is filled when the try-with-resources releases the
FileScanTask
+ // iterable, i.e. not when the call to IcebergUtil.planFiles() returns.
try (CloseableIterable<FileScanTask> fileScanTasks =
- IcebergUtil.planFiles(getIceTable(),
- new ArrayList<>(impalaIcebergPredicateMapping_.keySet()),
timeTravelSpec)) {
+ IcebergUtil.planFiles(
+ getIceTable(),
+ new ArrayList<>(impalaIcebergPredicateMapping_.keySet()),
+ timeTravelSpec,
+ metricsReporter_)) {
long dataFilesCacheMisses = 0;
for (FileScanTask fileScanTask : fileScanTasks) {
Expression residualExpr = fileScanTask.residual();
@@ -598,6 +644,7 @@ public class IcebergScanPlanner {
"Failed to load data files for Iceberg table: %s",
getIceTable().getFullName()),
e);
}
+
updateDeleteStatistics();
}
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java
b/fe/src/main/java/org/apache/impala/service/Frontend.java
index cf0223c9a..b3237b741 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -70,6 +70,8 @@ import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
+import org.apache.iceberg.metrics.ScanMetrics;
+import org.apache.iceberg.metrics.ScanMetricsResult;
import org.apache.impala.analysis.AlterDbStmt;
import org.apache.impala.analysis.AnalysisContext;
import org.apache.impala.analysis.AnalysisContext.AnalysisResult;
@@ -2682,6 +2684,9 @@ public class Frontend {
planCtx.compilationState_.restoreState();
FrontendProfile.getCurrent().addToCounter(
EXECUTOR_GROUPS_CONSIDERED, TUnit.UNIT, 1);
+ // Clear profile children that will not be used.
+ FrontendProfile.getCurrent().clearStagedChildrenProfiles();
+
i++;
}
@@ -2701,6 +2706,8 @@ public class Frontend {
LOG.info("Selected executor group: " + group_set + ", reason: " + reason);
+ FrontendProfile.getCurrent().finalizeStagedChildrenProfiles();
+
// Transfer the profile access flag which is collected during 1st
compilation.
req.setUser_has_profile_access(planCtx.compilationState_.userHasProfileAccess());
@@ -2780,6 +2787,95 @@ public class Frontend {
FrontendProfile.getCurrent().addChildrenProfile(profile);
}
+ /**
+ * Update the query profile with Iceberg Scan metrics. The metrics are
placed into a
+ * section within the 'Frontend' section of the profile. 'nodeId' is the id
of the node
+ * that is responsible for scanning the corresponding Iceberg table: this
node may be a
+ * scan node, a join node or a union node depending on whether the table has
delete
+ * files.
+ * If 'metrics' is NULL, the query profile will be updated with a message
that planning
+ * was done without using Iceberg.
+ */
+ public static void addIcebergScanMetricsToProfile(String nodeId,
+ ScanMetricsResult metrics) {
+ TRuntimeProfileNode profile = createTRuntimeProfileNode(
+ "Iceberg Plan Metrics for Node " + nodeId);
+
+ if (metrics == null) {
+ addInfoString(profile, "Planning done without Iceberg",
+ "no Iceberg scan metrics available.");
+ } else {
+ fillProfileNodeWithIcebergScanMetrics(profile, metrics);
+ }
+
+ FrontendProfile.getCurrent().stageChildrenProfile(profile);
+ }
+
+ private static void
fillProfileNodeWithIcebergScanMetrics(TRuntimeProfileNode profile,
+ ScanMetricsResult metrics) {
+ long durationMs =
metrics.totalPlanningDuration().totalDuration().toMillis();
+ String totalDuration = PrintUtils.printTimeMs(durationMs);
+ if (totalDuration.isEmpty()) {
+ // PrintUtils.printTimeMs() returns an empty string for '0'.
+ totalDuration = "0";
+ }
+ addInfoString(profile, ScanMetrics.TOTAL_PLANNING_DURATION, totalDuration);
+
+ String resultDataFiles = Long.toString(metrics.resultDataFiles().value());
+ addInfoString(profile, ScanMetrics.RESULT_DATA_FILES, resultDataFiles);
+
+ String resultDeleteFiles =
Long.toString(metrics.resultDeleteFiles().value());
+ addInfoString(profile, ScanMetrics.RESULT_DELETE_FILES, resultDeleteFiles);
+
+ String totalDataManifests =
Long.toString(metrics.totalDataManifests().value());
+ addInfoString(profile, ScanMetrics.TOTAL_DATA_MANIFESTS,
totalDataManifests);
+
+ String totalDeleteManifests =
Long.toString(metrics.totalDeleteManifests().value());
+ addInfoString(profile, ScanMetrics.TOTAL_DELETE_MANIFESTS,
totalDeleteManifests);
+
+ String scannedDataManifests =
Long.toString(metrics.scannedDataManifests().value());
+ addInfoString(profile, ScanMetrics.SCANNED_DATA_MANIFESTS,
scannedDataManifests);
+
+ String skippedDataManifests =
Long.toString(metrics.skippedDataManifests().value());
+ addInfoString(profile, ScanMetrics.SKIPPED_DATA_MANIFESTS,
skippedDataManifests);
+
+ long totalFileSizeInBytes = metrics.totalFileSizeInBytes().value();
+ String totalFileSizePretty = PrintUtils.printBytes(totalFileSizeInBytes);
+ String totalFileSize = String.format("%s (%s)",
+ totalFileSizePretty, totalFileSizeInBytes);
+ addInfoString(profile, ScanMetrics.TOTAL_FILE_SIZE_IN_BYTES,
totalFileSize);
+
+ long totalDeleteFileSizeInBytes =
metrics.totalDeleteFileSizeInBytes().value();
+ String totalDeleteFileSizePretty =
PrintUtils.printBytes(totalDeleteFileSizeInBytes);
+ String totalDeleteFileSize = String.format("%s (%s)",
+ totalDeleteFileSizePretty, totalDeleteFileSizeInBytes);
+ addInfoString(profile, ScanMetrics.TOTAL_DELETE_FILE_SIZE_IN_BYTES,
+ totalDeleteFileSize);
+
+ String skippedDataFiles =
Long.toString(metrics.skippedDataFiles().value());
+ addInfoString(profile, ScanMetrics.SKIPPED_DATA_FILES, skippedDataFiles);
+
+ String skippedDeleteFiles =
Long.toString(metrics.skippedDeleteFiles().value());
+ addInfoString(profile, ScanMetrics.SKIPPED_DELETE_FILES,
skippedDeleteFiles);
+
+ String scannedDeleteManifests = Long.toString(
+ metrics.scannedDeleteManifests().value());
+ addInfoString(profile, ScanMetrics.SCANNED_DELETE_MANIFESTS,
scannedDeleteManifests);
+
+ String skippedDeleteManifests = Long.toString(
+ metrics.skippedDeleteManifests().value());
+ addInfoString(profile, ScanMetrics.SKIPPED_DELETE_MANIFESTS,
skippedDeleteManifests);
+
+ String indexedDeleteFiles =
Long.toString(metrics.indexedDeleteFiles().value());
+ addInfoString(profile, ScanMetrics.INDEXED_DELETE_FILES,
indexedDeleteFiles);
+
+ String equalityDeleteFiles =
Long.toString(metrics.equalityDeleteFiles().value());
+ addInfoString(profile, ScanMetrics.EQUALITY_DELETE_FILES,
equalityDeleteFiles);
+
+ String positionalDeleteFiles =
Long.toString(metrics.positionalDeleteFiles().value());
+ addInfoString(profile, ScanMetrics.POSITIONAL_DELETE_FILES,
positionalDeleteFiles);
+ }
+
private TExecRequest doCreateExecRequest(CompilerFactory compilerFactory,
PlanCtx planCtx, List<String> warnings, EventSequence timeline)
throws ImpalaException {
diff --git a/fe/src/main/java/org/apache/impala/service/FrontendProfile.java
b/fe/src/main/java/org/apache/impala/service/FrontendProfile.java
index 9deb138e8..6eec58394 100644
--- a/fe/src/main/java/org/apache/impala/service/FrontendProfile.java
+++ b/fe/src/main/java/org/apache/impala/service/FrontendProfile.java
@@ -67,6 +67,17 @@ public class FrontendProfile {
@GuardedBy("this")
private Map<String, TRuntimeProfileNode> childrenProfiles_ = new TreeMap<>();
+ /**
+ * Profile nodes to be added to the profile when it is decided that there is
no
+ * replanning. With auto-scaling, it is possible that different executor
groups are
+ * tried for the query, and a plan is generated for each. Profile
information that is
+ * only needed for the last, final plan can be collected here. This list is
cleared once
+ * a plan for an executor group is discarded. The profiles are added as
regular children
+ * for the final plan.
+ */
+ @GuardedBy("this")
+ private List<TRuntimeProfileNode> stagedChildrenProfiles_ = new
ArrayList<>();
+
FrontendProfile() {
profile_ = new TRuntimeProfileNode("Frontend",
/*num_children=*/ 0,
@@ -191,6 +202,20 @@ public class FrontendProfile {
childrenProfiles_.put(child.getName(), child);
}
+ public synchronized void stageChildrenProfile(TRuntimeProfileNode child) {
+ stagedChildrenProfiles_.add(child);
+ }
+
+ public synchronized void clearStagedChildrenProfiles() {
+ stagedChildrenProfiles_.clear();
+ }
+
+ public synchronized void finalizeStagedChildrenProfiles() {
+ for (TRuntimeProfileNode profile : stagedChildrenProfiles_) {
+ addChildrenProfile(profile);
+ }
+ }
+
private TCounter getOrCreateCounter(String name, TUnit unit) {
TCounter counter = countersByName_.get(Preconditions.checkNotNull(name));
if (counter == null) {
diff --git a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
index 6f3f0fc5e..c8f23cde2 100644
--- a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
@@ -64,6 +64,9 @@ import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.expressions.Term;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.metrics.LoggingMetricsReporter;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.MetricsReporters;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.transforms.PartitionSpecVisitor;
import org.apache.iceberg.transforms.Transforms;
@@ -631,9 +634,11 @@ public class IcebergUtil {
* Returns a list of Iceberg FileScanTask objects. These objects contain a
data file
* to be scanned and the associated delete files need to be applied.
*/
- public static CloseableIterable<FileScanTask> planFiles(FeIcebergTable table,
- List<Expression> predicates, TimeTravelSpec timeTravelSpec)
- throws TableLoadingException {
+ public static CloseableIterable<FileScanTask> planFiles(
+ FeIcebergTable table,
+ List<Expression> predicates,
+ TimeTravelSpec timeTravelSpec,
+ MetricsReporter metricsReporter) throws TableLoadingException {
if (table.snapshotId() == -1) return CloseableIterable.empty();
TableScan scan = createScanAsOf(table, timeTravelSpec);
@@ -641,6 +646,10 @@ public class IcebergUtil {
scan = scan.filter(predicate);
}
+ if (metricsReporter != null) {
+ scan = scan.metricsReporter(metricsReporter);
+ }
+
return scan.planFiles();
}
@@ -651,7 +660,7 @@ public class IcebergUtil {
FeIcebergTable table, List<Expression> predicates, TimeTravelSpec
timeTravelSpec)
throws TableLoadingException {
try (CloseableIterable<FileScanTask> fileScanTasks = planFiles(
- table, predicates, timeTravelSpec)) {
+ table, predicates, timeTravelSpec, null)) {
return new GroupedContentFiles(fileScanTasks);
} catch (IOException e) {
throw new TableLoadingException("Error during reading Iceberg manifest
files.", e);
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 38fa9536c..2193e8346 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -19,12 +19,17 @@ package org.apache.impala.planner;
import static org.junit.Assert.assertEquals;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.iceberg.metrics.ScanMetricsResult;
+
import org.apache.impala.catalog.Catalog;
import org.apache.impala.catalog.ColumnStats;
import org.apache.impala.catalog.Db;
@@ -1642,4 +1647,23 @@ public class PlannerTest extends PlannerTestBase {
ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN,
PlannerTestOption.INCLUDE_EXPLAIN_HEADER));
}
+
+ /**
+ * This test ensures that if the number of metrics in Iceberg's
ScanMetricsResult
+ * changes we're aware of it.
+ */
+ @Test
+ public void testIcebergScanMetricsResultCardinality() {
+ final int expectedMethodNumber = 16;
+
+ Method[] methods = ScanMetricsResult.class.getDeclaredMethods();
+
+ long publicMethods = java.util.Arrays.stream(methods)
+ .filter(method -> {
+ int modifiers = method.getModifiers();
+ return Modifier.isPublic(modifiers) && !Modifier.isStatic(modifiers);
+ }).count();
+
+ Assert.assertEquals(expectedMethodNumber, publicMethods);
+ }
}
diff --git
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-scan-metrics.test
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-scan-metrics.test
new file mode 100644
index 000000000..472e51777
--- /dev/null
+++
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-scan-metrics.test
@@ -0,0 +1,132 @@
+====
+---- QUERY
+select * from functional_parquet.iceberg_partitioned
+---- RUNTIME_PROFILE
+Iceberg Plan Metrics for Node 00:
+ Planning done without Iceberg: no Iceberg scan metrics available.
+====
+---- QUERY
+# Filtering on a partition column pushes the predicate down to Iceberg, so we
have metrics.
+select * from functional_parquet.iceberg_partitioned where action='download'
+---- RUNTIME_PROFILE
+Iceberg Plan Metrics for Node 00:
+row_regex:.*total-planning-duration: .+
+result-data-files: 6
+result-delete-files: 0
+total-data-manifests: 1
+total-delete-manifests: 0
+scanned-data-manifests: 1
+skipped-data-manifests: 0
+row_regex:.*total-file-size-in-bytes: .+ \(\d+\)
+row_regex:.*total-delete-file-size-in-bytes: .+ \(\d+\)
+skipped-data-files: 14
+skipped-delete-files: 0
+scanned-delete-manifests: 0
+skipped-delete-manifests: 0
+indexed-delete-files: 0
+equality-delete-files: 0
+positional-delete-files: 0
+====
+---- QUERY
+# Time travel results in planning with Iceberg, so we have metrics.
+select * from
+ functional_parquet.iceberg_v2_no_deletes FOR SYSTEM_VERSION AS OF
NO_DELETES_SNAPTHOT_ID,
+ functional_parquet.iceberg_v2_positional_delete_all_rows FOR SYSTEM_VERSION
AS OF POS_DELETE_ALL_ROWS_SNAPSHOT_ID,
+
functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files
FOR SYSTEM_VERSION AS OF NOT_ALL_DATA_FILES_HAVE_DELETE_FILES_SNAPSHOT_ID
+---- RUNTIME_PROFILE
+Iceberg Plan Metrics for Node 00:
+row_regex:.*total-planning-duration: .+
+ result-data-files: 1
+ result-delete-files: 0
+ total-data-manifests: 1
+ total-delete-manifests: 0
+ scanned-data-manifests: 1
+ skipped-data-manifests: 0
+row_regex:.*total-file-size-in-bytes: .+ \(\d+\)
+row_regex:.*total-delete-file-size-in-bytes: .+ \(\d+\)
+ skipped-data-files: 0
+ skipped-delete-files: 0
+ scanned-delete-manifests: 0
+ skipped-delete-manifests: 0
+ indexed-delete-files: 0
+ equality-delete-files: 0
+ positional-delete-files: 0
+Iceberg Plan Metrics for Node 03:
+row_regex:.*total-planning-duration: .+
+ result-data-files: 1
+ result-delete-files: 1
+ total-data-manifests: 1
+ total-delete-manifests: 1
+ scanned-data-manifests: 1
+ skipped-data-manifests: 0
+row_regex:.*total-file-size-in-bytes: .+ \(\d+\)
+row_regex:.*total-delete-file-size-in-bytes: .+ \(\d+\)
+ skipped-data-files: 0
+ skipped-delete-files: 0
+ scanned-delete-manifests: 1
+ skipped-delete-manifests: 0
+ indexed-delete-files: 1
+ equality-delete-files: 0
+ positional-delete-files: 1
+Iceberg Plan Metrics for Node 08:
+row_regex:.*total-planning-duration: .+
+ result-data-files: 4
+ result-delete-files: 2
+ total-data-manifests: 4
+ total-delete-manifests: 2
+ scanned-data-manifests: 4
+ skipped-data-manifests: 0
+row_regex:.*total-file-size-in-bytes: .+ \(\d+\)
+row_regex:.*total-delete-file-size-in-bytes: .+ \(\d+\)
+ skipped-data-files: 0
+ skipped-delete-files: 0
+ scanned-delete-manifests: 2
+ skipped-delete-manifests: 0
+ indexed-delete-files: 2
+ equality-delete-files: 0
+ positional-delete-files: 2
+====
+---- QUERY
+# No time travel for one of the tables.
+select * from
+ functional_parquet.iceberg_v2_no_deletes,
+ functional_parquet.iceberg_v2_positional_delete_all_rows FOR SYSTEM_VERSION
AS OF POS_DELETE_ALL_ROWS_SNAPSHOT_ID,
+
functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files
FOR SYSTEM_VERSION AS OF NOT_ALL_DATA_FILES_HAVE_DELETE_FILES_SNAPSHOT_ID
+---- RUNTIME_PROFILE
+Iceberg Plan Metrics for Node 00:
+ Planning done without Iceberg: no Iceberg scan metrics available.
+Iceberg Plan Metrics for Node 03:
+row_regex:.*total-planning-duration: .+
+ result-data-files: 1
+ result-delete-files: 1
+ total-data-manifests: 1
+ total-delete-manifests: 1
+ scanned-data-manifests: 1
+ skipped-data-manifests: 0
+row_regex:.*total-file-size-in-bytes: .+ \(\d+\)
+row_regex:.*total-delete-file-size-in-bytes: .+ \(\d+\)
+ skipped-data-files: 0
+ skipped-delete-files: 0
+ scanned-delete-manifests: 1
+ skipped-delete-manifests: 0
+ indexed-delete-files: 1
+ equality-delete-files: 0
+ positional-delete-files: 1
+Iceberg Plan Metrics for Node 08:
+row_regex:.*total-planning-duration: .+
+ result-data-files: 4
+ result-delete-files: 2
+ total-data-manifests: 4
+ total-delete-manifests: 2
+ scanned-data-manifests: 4
+ skipped-data-manifests: 0
+row_regex:.*total-file-size-in-bytes: .+ \(\d+\)
+row_regex:.*total-delete-file-size-in-bytes: .+ \(\d+\)
+ skipped-data-files: 0
+ skipped-delete-files: 0
+ scanned-delete-manifests: 2
+ skipped-delete-manifests: 0
+ indexed-delete-files: 2
+ equality-delete-files: 0
+ positional-delete-files: 2
+====
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index abc92996b..bed02055d 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -1493,6 +1493,34 @@ class TestIcebergTable(IcebergTestSuite):
assert snapshots[0].get_parent_id() == snapshots[2].get_parent_id()
assert snapshots[0].get_creation_time() < snapshots[2].get_creation_time()
+ def test_scan_metrics_in_profile(self, vector):
+ def get_latest_snapshot_id(fq_tbl_name):
+ query = ("select snapshot_id from {}.snapshots order by committed_at
desc"
+ .format(fq_tbl_name))
+ res = self.execute_query(query)
+ return res.data[0]
+
+ ice_db = "functional_parquet"
+
+ no_deletes = "{}.{}".format(ice_db, "iceberg_v2_no_deletes")
+ no_deletes_snapshot_id = get_latest_snapshot_id(no_deletes)
+
+ pos_delete_all_rows = "{}.{}".format(ice_db,
"iceberg_v2_positional_delete_all_rows")
+ pos_delete_all_rows_snapshot_id =
get_latest_snapshot_id(pos_delete_all_rows)
+
+ not_all_data_files_have_delete_files = "{}.{}".format(
+ ice_db, "iceberg_v2_positional_not_all_data_files_have_delete_files")
+ not_all_data_files_have_delete_files_snapshot_id = get_latest_snapshot_id(
+ not_all_data_files_have_delete_files)
+
+ self.run_test_case('QueryTest/iceberg-scan-metrics', vector,
+ test_file_vars={
+ "NO_DELETES_SNAPTHOT_ID": no_deletes_snapshot_id,
+ "POS_DELETE_ALL_ROWS_SNAPSHOT_ID": pos_delete_all_rows_snapshot_id,
+ "NOT_ALL_DATA_FILES_HAVE_DELETE_FILES_SNAPSHOT_ID":
+ not_all_data_files_have_delete_files_snapshot_id
+ })
+
class TestIcebergV2Table(IcebergTestSuite):
"""Tests related to Iceberg V2 tables."""