This is an automated email from the ASF dual-hosted git repository.

dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c24706c7 IMPALA-13268: Integrate Iceberg ScanMetrics into Impala 
query profiles
3c24706c7 is described below

commit 3c24706c72818a1668159a428d4f2afcadea9f27
Author: Gabor Kaszab <[email protected]>
AuthorDate: Tue Jan 28 14:32:04 2025 +0100

    IMPALA-13268: Integrate Iceberg ScanMetrics into Impala query profiles
    
    When calling planFiles() on an Iceberg table, it can give us some
    metrics like total planning time, number of data/delete files and
    manifests, how many of these could be skipped etc.
    
    This change integrates these metrics into the query profile, under the
    "Frontend" section. These metrics are per-table, so if multiple tables
    are scanned for the query there will be multiple sections in the
    profile.
    
    Note that we only have these metrics for a table if Iceberg needs to be
    used for planning for that table, e.g. if a predicate is pushed down to
    Iceberg or if there is time travel. For tables where Iceberg was not
    used in planning, the profile will contain a short note describing this.
    
    To facilitate pairing the metrics with scans, the metrics header
    references the plan node responsible for the scan. This will always be
    the top level node for the scan, so it can be a SCAN node, a JOIN node
    or a UNION node depending on whether the table has delete files.
    
    Testing:
     - added EE tests in iceberg-scan-metrics.tests
     - added a test in PlannerTest.java that asserts on the number of
       metrics; if it changes in a new Iceberg release, the test will fail
       and we can update our reporting
    
    Change-Id: I080ee8eafc459dad4d21356ac9042b72d0570219
    Reviewed-on: http://gerrit.cloudera.org:8080/22501
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Daniel Becker <[email protected]>
---
 .../analysis/AlterTableDropPartitionStmt.java      |   2 +-
 .../apache/impala/planner/IcebergScanPlanner.java  |  51 +++++++-
 .../java/org/apache/impala/service/Frontend.java   |  96 +++++++++++++++
 .../org/apache/impala/service/FrontendProfile.java |  25 ++++
 .../java/org/apache/impala/util/IcebergUtil.java   |  17 ++-
 .../org/apache/impala/planner/PlannerTest.java     |  24 ++++
 .../queries/QueryTest/iceberg-scan-metrics.test    | 132 +++++++++++++++++++++
 tests/query_test/test_iceberg.py                   |  28 +++++
 8 files changed, 368 insertions(+), 7 deletions(-)

diff --git 
a/fe/src/main/java/org/apache/impala/analysis/AlterTableDropPartitionStmt.java 
b/fe/src/main/java/org/apache/impala/analysis/AlterTableDropPartitionStmt.java
index a3b40a998..3f17b7045 100644
--- 
a/fe/src/main/java/org/apache/impala/analysis/AlterTableDropPartitionStmt.java
+++ 
b/fe/src/main/java/org/apache/impala/analysis/AlterTableDropPartitionStmt.java
@@ -166,7 +166,7 @@ public class AlterTableDropPartitionStmt extends 
AlterTableStmt {
     }
 
     try (CloseableIterable<FileScanTask> fileScanTasks = 
IcebergUtil.planFiles(table,
-        icebergPartitionExprs, null)) {
+        icebergPartitionExprs, null, null)) {
       icebergFilePaths_ = new ArrayList<>();
       Set<String> icebergPartitionSummary = new HashSet<>();
       for (FileScanTask fileScanTask : fileScanTasks) {
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java 
b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
index 83cb0b555..eec0d3331 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
@@ -43,6 +43,10 @@ import org.apache.iceberg.expressions.ExpressionUtil;
 import org.apache.iceberg.expressions.ExpressionVisitors;
 import org.apache.iceberg.expressions.True;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanMetricsResult;
+import org.apache.iceberg.metrics.ScanReport;
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.BinaryPredicate;
 import org.apache.impala.analysis.BinaryPredicate.Operator;
@@ -78,6 +82,7 @@ import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.fb.FbIcebergMetadata;
 import org.apache.impala.planner.JoinNode.DistributionMode;
+import org.apache.impala.service.Frontend;
 import org.apache.impala.thrift.TColumnStats;
 import org.apache.impala.thrift.TIcebergPartitionTransformType;
 import org.apache.impala.thrift.TQueryOptions;
@@ -93,6 +98,26 @@ import org.slf4j.LoggerFactory;
  * class deals with such complexities.
  */
 public class IcebergScanPlanner {
+  // TODO: This class is available in the Iceberg library from release 1.4.0. 
We could
+  // drop this and use the one from Iceberg once we've done a version bump.
+  private static class InMemoryMetricsReporter implements MetricsReporter {
+    private MetricsReport metricsReport_;
+
+    @Override
+    public void report(MetricsReport report) {
+      Preconditions.checkArgument(
+          report == null || report instanceof ScanReport,
+          "Metrics report is not a scan report");
+      this.metricsReport_ = report;
+    }
+
+    public ScanMetricsResult scanMetricsResult() {
+      if (metricsReport_ == null) return null;
+
+      return ((ScanReport) metricsReport_).scanMetrics();
+    }
+  }
+
   private static final Logger LOG = 
LoggerFactory.getLogger(IcebergScanPlanner.class);
 
   private Analyzer analyzer_;
@@ -137,6 +162,8 @@ public class IcebergScanPlanner {
 
   private final long snapshotId_;
 
+  private final InMemoryMetricsReporter metricsReporter_ = new 
InMemoryMetricsReporter();
+
   public IcebergScanPlanner(Analyzer analyzer, PlannerContext ctx,
       TableRef iceTblRef, List<Expr> conjuncts, MultiAggregateInfo aggInfo)
       throws ImpalaException {
@@ -152,6 +179,20 @@ public class IcebergScanPlanner {
   }
 
   public PlanNode createIcebergScanPlan() throws ImpalaException {
+    PlanNode res = createPlanNodeHelper();
+    Preconditions.checkNotNull(res);
+
+    ScanMetricsResult metricsResult = metricsReporter_.scanMetricsResult();
+    Preconditions.checkState(
+        !needIcebergForPlanning() || snapshotId_ == -1 || metricsResult != 
null);
+
+    // Update the query profile with the scan metrics.
+    Frontend.addIcebergScanMetricsToProfile(res.getId().toString(), 
metricsResult);
+
+    return res;
+  }
+
+  private PlanNode createPlanNodeHelper() throws ImpalaException {
     if (!needIcebergForPlanning()) {
       analyzer_.materializeSlots(conjuncts_);
       setFileDescriptorsBasedOnFileStore();
@@ -560,9 +601,14 @@ public class IcebergScanPlanner {
 
     TimeTravelSpec timeTravelSpec = tblRef_.getTimeTravelSpec();
 
+    // 'metricsReporter_' is filled when the try-with-resources releases the 
FileScanTask
+    // iterable, i.e. not when the call to IcebergUtil.planFiles() returns.
     try (CloseableIterable<FileScanTask> fileScanTasks =
-        IcebergUtil.planFiles(getIceTable(),
-            new ArrayList<>(impalaIcebergPredicateMapping_.keySet()), 
timeTravelSpec)) {
+        IcebergUtil.planFiles(
+            getIceTable(),
+            new ArrayList<>(impalaIcebergPredicateMapping_.keySet()),
+            timeTravelSpec,
+            metricsReporter_)) {
       long dataFilesCacheMisses = 0;
       for (FileScanTask fileScanTask : fileScanTasks) {
         Expression residualExpr = fileScanTask.residual();
@@ -598,6 +644,7 @@ public class IcebergScanPlanner {
           "Failed to load data files for Iceberg table: %s", 
getIceTable().getFullName()),
           e);
     }
+
     updateDeleteStatistics();
   }
 
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java 
b/fe/src/main/java/org/apache/impala/service/Frontend.java
index cf0223c9a..b3237b741 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -70,6 +70,8 @@ import org.apache.iceberg.HistoryEntry;
 import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.metrics.ScanMetrics;
+import org.apache.iceberg.metrics.ScanMetricsResult;
 import org.apache.impala.analysis.AlterDbStmt;
 import org.apache.impala.analysis.AnalysisContext;
 import org.apache.impala.analysis.AnalysisContext.AnalysisResult;
@@ -2682,6 +2684,9 @@ public class Frontend {
       planCtx.compilationState_.restoreState();
       FrontendProfile.getCurrent().addToCounter(
           EXECUTOR_GROUPS_CONSIDERED, TUnit.UNIT, 1);
+      // Clear profile children that will not be used.
+      FrontendProfile.getCurrent().clearStagedChildrenProfiles();
+
       i++;
     }
 
@@ -2701,6 +2706,8 @@ public class Frontend {
 
     LOG.info("Selected executor group: " + group_set + ", reason: " + reason);
 
+    FrontendProfile.getCurrent().finalizeStagedChildrenProfiles();
+
     // Transfer the profile access flag which is collected during 1st 
compilation.
     
req.setUser_has_profile_access(planCtx.compilationState_.userHasProfileAccess());
 
@@ -2780,6 +2787,95 @@ public class Frontend {
     FrontendProfile.getCurrent().addChildrenProfile(profile);
   }
 
+  /**
+   * Update the query profile with Iceberg Scan metrics. The metrics are 
placed into a
+   * section within the 'Frontend' section of the profile. 'nodeId' is the id 
of the node
+   * that is responsible for scanning the corresponding Iceberg table: this 
node may be a
+   * scan node, a join node or a union node depending on whether the table has 
delete
+   * files.
+   * If 'metrics' is NULL, the query profile will be updated with a message 
that planning
+   * was done without using Iceberg.
+   */
+  public static void addIcebergScanMetricsToProfile(String nodeId,
+      ScanMetricsResult metrics) {
+    TRuntimeProfileNode profile = createTRuntimeProfileNode(
+        "Iceberg Plan Metrics for Node " + nodeId);
+
+    if (metrics == null) {
+      addInfoString(profile, "Planning done without Iceberg",
+          "no Iceberg scan metrics available.");
+    } else {
+      fillProfileNodeWithIcebergScanMetrics(profile, metrics);
+    }
+
+    FrontendProfile.getCurrent().stageChildrenProfile(profile);
+  }
+
+  private static void 
fillProfileNodeWithIcebergScanMetrics(TRuntimeProfileNode profile,
+      ScanMetricsResult metrics) {
+    long durationMs = 
metrics.totalPlanningDuration().totalDuration().toMillis();
+    String totalDuration = PrintUtils.printTimeMs(durationMs);
+    if (totalDuration.isEmpty()) {
+      // PrintUtils.printTimeMs() returns an empty string for '0'.
+      totalDuration = "0";
+    }
+    addInfoString(profile, ScanMetrics.TOTAL_PLANNING_DURATION, totalDuration);
+
+    String resultDataFiles = Long.toString(metrics.resultDataFiles().value());
+    addInfoString(profile, ScanMetrics.RESULT_DATA_FILES, resultDataFiles);
+
+    String resultDeleteFiles = 
Long.toString(metrics.resultDeleteFiles().value());
+    addInfoString(profile, ScanMetrics.RESULT_DELETE_FILES, resultDeleteFiles);
+
+    String totalDataManifests = 
Long.toString(metrics.totalDataManifests().value());
+    addInfoString(profile, ScanMetrics.TOTAL_DATA_MANIFESTS, 
totalDataManifests);
+
+    String totalDeleteManifests = 
Long.toString(metrics.totalDeleteManifests().value());
+    addInfoString(profile, ScanMetrics.TOTAL_DELETE_MANIFESTS, 
totalDeleteManifests);
+
+    String scannedDataManifests = 
Long.toString(metrics.scannedDataManifests().value());
+    addInfoString(profile, ScanMetrics.SCANNED_DATA_MANIFESTS, 
scannedDataManifests);
+
+    String skippedDataManifests = 
Long.toString(metrics.skippedDataManifests().value());
+    addInfoString(profile, ScanMetrics.SKIPPED_DATA_MANIFESTS, 
skippedDataManifests);
+
+    long totalFileSizeInBytes = metrics.totalFileSizeInBytes().value();
+    String totalFileSizePretty = PrintUtils.printBytes(totalFileSizeInBytes);
+    String totalFileSize = String.format("%s (%s)",
+        totalFileSizePretty, totalFileSizeInBytes);
+    addInfoString(profile, ScanMetrics.TOTAL_FILE_SIZE_IN_BYTES, 
totalFileSize);
+
+    long totalDeleteFileSizeInBytes = 
metrics.totalDeleteFileSizeInBytes().value();
+    String totalDeleteFileSizePretty = 
PrintUtils.printBytes(totalDeleteFileSizeInBytes);
+    String totalDeleteFileSize = String.format("%s (%s)",
+        totalDeleteFileSizePretty, totalDeleteFileSizeInBytes);
+    addInfoString(profile, ScanMetrics.TOTAL_DELETE_FILE_SIZE_IN_BYTES,
+        totalDeleteFileSize);
+
+    String skippedDataFiles = 
Long.toString(metrics.skippedDataFiles().value());
+    addInfoString(profile, ScanMetrics.SKIPPED_DATA_FILES, skippedDataFiles);
+
+    String skippedDeleteFiles = 
Long.toString(metrics.skippedDeleteFiles().value());
+    addInfoString(profile, ScanMetrics.SKIPPED_DELETE_FILES, 
skippedDeleteFiles);
+
+    String scannedDeleteManifests = Long.toString(
+        metrics.scannedDeleteManifests().value());
+    addInfoString(profile, ScanMetrics.SCANNED_DELETE_MANIFESTS, 
scannedDeleteManifests);
+
+    String skippedDeleteManifests = Long.toString(
+        metrics.skippedDeleteManifests().value());
+    addInfoString(profile, ScanMetrics.SKIPPED_DELETE_MANIFESTS, 
skippedDeleteManifests);
+
+    String indexedDeleteFiles = 
Long.toString(metrics.indexedDeleteFiles().value());
+    addInfoString(profile, ScanMetrics.INDEXED_DELETE_FILES, 
indexedDeleteFiles);
+
+    String equalityDeleteFiles = 
Long.toString(metrics.equalityDeleteFiles().value());
+    addInfoString(profile, ScanMetrics.EQUALITY_DELETE_FILES, 
equalityDeleteFiles);
+
+    String positionalDeleteFiles = 
Long.toString(metrics.positionalDeleteFiles().value());
+    addInfoString(profile, ScanMetrics.POSITIONAL_DELETE_FILES, 
positionalDeleteFiles);
+  }
+
   private TExecRequest doCreateExecRequest(CompilerFactory compilerFactory,
       PlanCtx planCtx, List<String> warnings, EventSequence timeline)
       throws ImpalaException {
diff --git a/fe/src/main/java/org/apache/impala/service/FrontendProfile.java 
b/fe/src/main/java/org/apache/impala/service/FrontendProfile.java
index 9deb138e8..6eec58394 100644
--- a/fe/src/main/java/org/apache/impala/service/FrontendProfile.java
+++ b/fe/src/main/java/org/apache/impala/service/FrontendProfile.java
@@ -67,6 +67,17 @@ public class FrontendProfile {
   @GuardedBy("this")
   private Map<String, TRuntimeProfileNode> childrenProfiles_ = new TreeMap<>();
 
+  /**
+   * Profile nodes to be added to the profile when it is decided that there is 
no
+   * replanning. With auto-scaling, it is possible that different executor 
groups are
+   * tried for the query, and a plan is generated for each. Profile 
information that is
+   * only needed for the last, final plan can be collected here. This list is 
cleared once
+   * a plan for an executor group is discarded. The profiles are added as 
regular children
+   * for the final plan.
+   */
+  @GuardedBy("this")
+  private List<TRuntimeProfileNode> stagedChildrenProfiles_ = new 
ArrayList<>();
+
   FrontendProfile() {
     profile_ = new TRuntimeProfileNode("Frontend",
         /*num_children=*/ 0,
@@ -191,6 +202,20 @@ public class FrontendProfile {
     childrenProfiles_.put(child.getName(), child);
   }
 
+  public synchronized void stageChildrenProfile(TRuntimeProfileNode child) {
+    stagedChildrenProfiles_.add(child);
+  }
+
+  public synchronized void clearStagedChildrenProfiles() {
+    stagedChildrenProfiles_.clear();
+  }
+
+  public synchronized void finalizeStagedChildrenProfiles() {
+    for (TRuntimeProfileNode profile : stagedChildrenProfiles_) {
+      addChildrenProfile(profile);
+    }
+  }
+
   private TCounter getOrCreateCounter(String name, TUnit unit) {
     TCounter counter = countersByName_.get(Preconditions.checkNotNull(name));
     if (counter == null) {
diff --git a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java 
b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
index 6f3f0fc5e..c8f23cde2 100644
--- a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
@@ -64,6 +64,9 @@ import org.apache.iceberg.expressions.Literal;
 import org.apache.iceberg.expressions.Term;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.metrics.LoggingMetricsReporter;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.MetricsReporters;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.transforms.PartitionSpecVisitor;
 import org.apache.iceberg.transforms.Transforms;
@@ -631,9 +634,11 @@ public class IcebergUtil {
    * Returns a list of Iceberg FileScanTask objects. These objects contain a 
data file
    * to be scanned and the associated delete files need to be applied.
    */
-  public static CloseableIterable<FileScanTask> planFiles(FeIcebergTable table,
-      List<Expression> predicates, TimeTravelSpec timeTravelSpec)
-        throws TableLoadingException {
+  public static CloseableIterable<FileScanTask> planFiles(
+      FeIcebergTable table,
+      List<Expression> predicates,
+      TimeTravelSpec timeTravelSpec,
+      MetricsReporter metricsReporter) throws TableLoadingException {
     if (table.snapshotId() == -1) return CloseableIterable.empty();
 
     TableScan scan = createScanAsOf(table, timeTravelSpec);
@@ -641,6 +646,10 @@ public class IcebergUtil {
       scan = scan.filter(predicate);
     }
 
+    if (metricsReporter != null) {
+      scan = scan.metricsReporter(metricsReporter);
+    }
+
     return scan.planFiles();
   }
 
@@ -651,7 +660,7 @@ public class IcebergUtil {
       FeIcebergTable table, List<Expression> predicates, TimeTravelSpec 
timeTravelSpec)
         throws TableLoadingException {
     try (CloseableIterable<FileScanTask> fileScanTasks = planFiles(
-        table, predicates, timeTravelSpec)) {
+        table, predicates, timeTravelSpec, null)) {
       return new GroupedContentFiles(fileScanTasks);
     } catch (IOException e) {
       throw new TableLoadingException("Error during reading Iceberg manifest 
files.", e);
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java 
b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 38fa9536c..2193e8346 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -19,12 +19,17 @@ package org.apache.impala.planner;
 
 import static org.junit.Assert.assertEquals;
 
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.iceberg.metrics.ScanMetricsResult;
+
 import org.apache.impala.catalog.Catalog;
 import org.apache.impala.catalog.ColumnStats;
 import org.apache.impala.catalog.Db;
@@ -1642,4 +1647,23 @@ public class PlannerTest extends PlannerTestBase {
         ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN,
             PlannerTestOption.INCLUDE_EXPLAIN_HEADER));
   }
+
+  /**
+   * This test ensures that if the number of metrics in Iceberg's 
ScanMetricsResult
+   * changes we're aware of it.
+   */
+  @Test
+  public void testIcebergScanMetricsResultCardinality() {
+    final int expectedMethodNumber = 16;
+
+    Method[] methods = ScanMetricsResult.class.getDeclaredMethods();
+
+    long publicMethods = java.util.Arrays.stream(methods)
+      .filter(method -> {
+        int modifiers = method.getModifiers();
+        return Modifier.isPublic(modifiers) && !Modifier.isStatic(modifiers);
+    }).count();
+
+    Assert.assertEquals(expectedMethodNumber, publicMethods);
+  }
 }
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-scan-metrics.test
 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-scan-metrics.test
new file mode 100644
index 000000000..472e51777
--- /dev/null
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-scan-metrics.test
@@ -0,0 +1,132 @@
+====
+---- QUERY
+select * from functional_parquet.iceberg_partitioned
+---- RUNTIME_PROFILE
+Iceberg Plan Metrics for Node 00:
+        Planning done without Iceberg: no Iceberg scan metrics available.
+====
+---- QUERY
+# Filtering on a partition column pushes the predicate down to Iceberg, so we 
have metrics.
+select * from functional_parquet.iceberg_partitioned where action='download'
+---- RUNTIME_PROFILE
+Iceberg Plan Metrics for Node 00:
+row_regex:.*total-planning-duration: .+
+result-data-files: 6
+result-delete-files: 0
+total-data-manifests: 1
+total-delete-manifests: 0
+scanned-data-manifests: 1
+skipped-data-manifests: 0
+row_regex:.*total-file-size-in-bytes: .+ \(\d+\)
+row_regex:.*total-delete-file-size-in-bytes: .+ \(\d+\)
+skipped-data-files: 14
+skipped-delete-files: 0
+scanned-delete-manifests: 0
+skipped-delete-manifests: 0
+indexed-delete-files: 0
+equality-delete-files: 0
+positional-delete-files: 0
+====
+---- QUERY
+# Time travel results in planning with Iceberg, so we have metrics.
+select * from
+  functional_parquet.iceberg_v2_no_deletes FOR SYSTEM_VERSION AS OF 
NO_DELETES_SNAPTHOT_ID,
+  functional_parquet.iceberg_v2_positional_delete_all_rows FOR SYSTEM_VERSION 
AS OF POS_DELETE_ALL_ROWS_SNAPSHOT_ID,
+  
functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files 
FOR SYSTEM_VERSION AS OF NOT_ALL_DATA_FILES_HAVE_DELETE_FILES_SNAPSHOT_ID
+---- RUNTIME_PROFILE
+Iceberg Plan Metrics for Node 00:
+row_regex:.*total-planning-duration: .+
+  result-data-files: 1
+  result-delete-files: 0
+  total-data-manifests: 1
+  total-delete-manifests: 0
+  scanned-data-manifests: 1
+  skipped-data-manifests: 0
+row_regex:.*total-file-size-in-bytes: .+ \(\d+\)
+row_regex:.*total-delete-file-size-in-bytes: .+ \(\d+\)
+  skipped-data-files: 0
+  skipped-delete-files: 0
+  scanned-delete-manifests: 0
+  skipped-delete-manifests: 0
+  indexed-delete-files: 0
+  equality-delete-files: 0
+  positional-delete-files: 0
+Iceberg Plan Metrics for Node 03:
+row_regex:.*total-planning-duration: .+
+  result-data-files: 1
+  result-delete-files: 1
+  total-data-manifests: 1
+  total-delete-manifests: 1
+  scanned-data-manifests: 1
+  skipped-data-manifests: 0
+row_regex:.*total-file-size-in-bytes: .+ \(\d+\)
+row_regex:.*total-delete-file-size-in-bytes: .+ \(\d+\)
+  skipped-data-files: 0
+  skipped-delete-files: 0
+  scanned-delete-manifests: 1
+  skipped-delete-manifests: 0
+  indexed-delete-files: 1
+  equality-delete-files: 0
+  positional-delete-files: 1
+Iceberg Plan Metrics for Node 08:
+row_regex:.*total-planning-duration: .+
+  result-data-files: 4
+  result-delete-files: 2
+  total-data-manifests: 4
+  total-delete-manifests: 2
+  scanned-data-manifests: 4
+  skipped-data-manifests: 0
+row_regex:.*total-file-size-in-bytes: .+ \(\d+\)
+row_regex:.*total-delete-file-size-in-bytes: .+ \(\d+\)
+  skipped-data-files: 0
+  skipped-delete-files: 0
+  scanned-delete-manifests: 2
+  skipped-delete-manifests: 0
+  indexed-delete-files: 2
+  equality-delete-files: 0
+  positional-delete-files: 2
+====
+---- QUERY
+# No time travel for one of the tables.
+select * from
+  functional_parquet.iceberg_v2_no_deletes,
+  functional_parquet.iceberg_v2_positional_delete_all_rows FOR SYSTEM_VERSION 
AS OF POS_DELETE_ALL_ROWS_SNAPSHOT_ID,
+  
functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files 
FOR SYSTEM_VERSION AS OF NOT_ALL_DATA_FILES_HAVE_DELETE_FILES_SNAPSHOT_ID
+---- RUNTIME_PROFILE
+Iceberg Plan Metrics for Node 00:
+        Planning done without Iceberg: no Iceberg scan metrics available.
+Iceberg Plan Metrics for Node 03:
+row_regex:.*total-planning-duration: .+
+  result-data-files: 1
+  result-delete-files: 1
+  total-data-manifests: 1
+  total-delete-manifests: 1
+  scanned-data-manifests: 1
+  skipped-data-manifests: 0
+row_regex:.*total-file-size-in-bytes: .+ \(\d+\)
+row_regex:.*total-delete-file-size-in-bytes: .+ \(\d+\)
+  skipped-data-files: 0
+  skipped-delete-files: 0
+  scanned-delete-manifests: 1
+  skipped-delete-manifests: 0
+  indexed-delete-files: 1
+  equality-delete-files: 0
+  positional-delete-files: 1
+Iceberg Plan Metrics for Node 08:
+row_regex:.*total-planning-duration: .+
+  result-data-files: 4
+  result-delete-files: 2
+  total-data-manifests: 4
+  total-delete-manifests: 2
+  scanned-data-manifests: 4
+  skipped-data-manifests: 0
+row_regex:.*total-file-size-in-bytes: .+ \(\d+\)
+row_regex:.*total-delete-file-size-in-bytes: .+ \(\d+\)
+  skipped-data-files: 0
+  skipped-delete-files: 0
+  scanned-delete-manifests: 2
+  skipped-delete-manifests: 0
+  indexed-delete-files: 2
+  equality-delete-files: 0
+  positional-delete-files: 2
+====
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index abc92996b..bed02055d 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -1493,6 +1493,34 @@ class TestIcebergTable(IcebergTestSuite):
     assert snapshots[0].get_parent_id() == snapshots[2].get_parent_id()
     assert snapshots[0].get_creation_time() < snapshots[2].get_creation_time()
 
+  def test_scan_metrics_in_profile(self, vector):
+    def get_latest_snapshot_id(fq_tbl_name):
+        query = ("select snapshot_id from {}.snapshots order by committed_at 
desc"
+            .format(fq_tbl_name))
+        res = self.execute_query(query)
+        return res.data[0]
+
+    ice_db = "functional_parquet"
+
+    no_deletes = "{}.{}".format(ice_db, "iceberg_v2_no_deletes")
+    no_deletes_snapshot_id = get_latest_snapshot_id(no_deletes)
+
+    pos_delete_all_rows = "{}.{}".format(ice_db, 
"iceberg_v2_positional_delete_all_rows")
+    pos_delete_all_rows_snapshot_id = 
get_latest_snapshot_id(pos_delete_all_rows)
+
+    not_all_data_files_have_delete_files = "{}.{}".format(
+        ice_db, "iceberg_v2_positional_not_all_data_files_have_delete_files")
+    not_all_data_files_have_delete_files_snapshot_id = get_latest_snapshot_id(
+        not_all_data_files_have_delete_files)
+
+    self.run_test_case('QueryTest/iceberg-scan-metrics', vector,
+        test_file_vars={
+            "NO_DELETES_SNAPTHOT_ID": no_deletes_snapshot_id,
+            "POS_DELETE_ALL_ROWS_SNAPSHOT_ID": pos_delete_all_rows_snapshot_id,
+            "NOT_ALL_DATA_FILES_HAVE_DELETE_FILES_SNAPSHOT_ID":
+                 not_all_data_files_have_delete_files_snapshot_id
+                       })
+
 
 class TestIcebergV2Table(IcebergTestSuite):
   """Tests related to Iceberg V2 tables."""

Reply via email to