This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b9a05c843db [enhance](iceberg) Doris Iceberg Scan Metrics Integration (#59010)
b9a05c843db is described below

commit b9a05c843dbd69362ade086fd743a4dd787e68be
Author: Socrates <[email protected]>
AuthorDate: Tue Dec 16 22:21:07 2025 +0800

    [enhance](iceberg) Doris Iceberg Scan Metrics Integration (#59010)
    
    ### What problem does this PR solve?
    
    ## Overview
    
    This change pipes Apache Iceberg scan metrics directly into Doris query
    profiles so operators can inspect per-scan statistics (files, bytes,
    manifests, filters, etc.) from the FE profile UI. The integration
    consists of three pieces:
    
    1. **Summary Profile Slot** – `SummaryProfile` now includes an
    `Iceberg Scan Metrics` entry in the execution summary list so the FE
    profile table reserves space for the Iceberg telemetry.
    2. **Metrics Reporter** – a new `IcebergMetricsReporter` implementation
    formats `ScanReport` data (planning time, file counters, size counters,
    delete-file stats, projected columns, metadata) and appends it to the
    summary entry whenever an Iceberg scan runs.
    3. **Scan Integration** – `IcebergScanNode` calls
    `table.newScan().metricsReporter(new IcebergMetricsReporter())`,
    ensuring every Iceberg table scan emits the metrics without requiring
    catalog-level configuration changes.
    
    All metrics remain scoped to Iceberg scans; other table formats are
    untouched and still populate their own runtime-profile sections as
    before.
    
    ## Implementation Details
    
    ### 1. `SummaryProfile`
    
    `fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java`
    - Added the constant `ICEBERG_SCAN_METRICS = "Iceberg Scan Metrics"`.
    - Inserted the key into `EXECUTION_SUMMARY_KEYS` (with indentation level
    3) so the runtime profile tree displays it under the scheduling block
    when present.
    - The entry shows `N/A` until an Iceberg scan actually reports
    metrics.
    
    ### 2. `IcebergMetricsReporter`
    
    `fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/profile/IcebergMetricsReporter.java`
    - Implements `org.apache.iceberg.metrics.MetricsReporter`.
    - On each `ScanReport`, it retrieves the current `SummaryProfile` from
    `ConnectContext`, gets the execution summary `RuntimeProfile`, and
    adds a child profile for that scan with one info string per metric.
    - Metrics covered:
      - Table, snapshot ID, sanitized filter text, projected column list.
      - Planning time (pretty-printed duration plus operation count).
      - Result/skipped data and delete file counts.
      - Total file size / total delete file size (in readable units).
      - Manifest counts (scanned/skipped for data and delete manifests).
      - Indexed/equality/positional delete file counters.
      - Selected metadata keys (currently `scan-state`, `scan-id` if present).
    - When multiple Iceberg scans run in one query, each scan gets its own
    `Table Scan (<table>)` child profile under the shared
    `Iceberg Scan Metrics` group.
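    
    The filter sanitizing and planning-time formatting described above can
    be sketched in isolation. This is a minimal, self-contained
    illustration, not the Doris class: `ScanLineFormat` is a hypothetical
    name, and the plain `<millis>ms` rendering is an assumption standing in
    for `DebugUtil.getPrettyStringMs`, whose exact output may differ.
    
    ```java
    import java.time.Duration;
    import java.util.regex.Pattern;
    
    // Hypothetical stand-in for illustration; not part of Doris or Iceberg.
    final class ScanLineFormat {
        // Matches any run of whitespace, including newlines and tabs.
        private static final Pattern WHITESPACE = Pattern.compile("\\s+");
    
        private ScanLineFormat() {
        }
    
        // Collapse each whitespace run to one space so a multi-line filter
        // expression fits on a single profile line. Null or empty input
        // yields an empty string.
        static String sanitize(String value) {
            if (value == null || value.isEmpty()) {
                return "";
            }
            return WHITESPACE.matcher(value).replaceAll(" ").trim();
        }
    
        // Render a timer as "<millis>ms (<count> ops)".
        // Assumption: plain milliseconds instead of Doris's pretty printer.
        static String formatTimer(Duration total, long count) {
            return total.toMillis() + "ms (" + count + " ops)";
        }
    }
    ```
    
    With these helpers, `ScanLineFormat.formatTimer(Duration.ofMillis(7), 1)`
    yields `7ms (1 ops)`, the same shape as the `planning` line in the
    example profile output.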
    
    ### 3. `IcebergScanNode`
    
    `fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java`
    - Replaces `icebergTable.newScan()` with
    `icebergTable.newScan().metricsReporter(new IcebergMetricsReporter())`
    inside `createTableScan()`.
    - This keeps catalog properties untouched and leverages Iceberg’s
    per-scan API to attach reporters.
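    
    The per-scan attachment pattern, together with the grouping of several
    scans under one summary key, might look roughly like the sketch below.
    All types here (`DemoScan`, `DemoProfile`) are hypothetical stand-ins
    written for illustration; they are not Iceberg or Doris classes and
    only mimic the shape of `newScan().metricsReporter(...)`.
    
    ```java
    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.function.Consumer;
    
    // Hypothetical scan: the reporter is attached per scan, not per catalog.
    final class DemoScan {
        private final String table;
        private final Consumer<String> reporter;
    
        DemoScan(String table) {
            this(table, name -> { });
        }
    
        private DemoScan(String table, Consumer<String> reporter) {
            this.table = table;
            this.reporter = reporter;
        }
    
        // Returns a new scan carrying the reporter; the original is unchanged.
        DemoScan metricsReporter(Consumer<String> newReporter) {
            return new DemoScan(table, newReporter);
        }
    
        // Planning finishes by handing the table name to the reporter.
        void plan() {
            reporter.accept(table);
        }
    }
    
    // Hypothetical profile: one group per summary key, one entry per scan.
    final class DemoProfile {
        private final Map<String, List<String>> groups = new LinkedHashMap<>();
    
        // Create the group on first use, then append the scan entry.
        void addScan(String group, String scanName) {
            groups.computeIfAbsent(group, k -> new ArrayList<>()).add(scanName);
        }
    
        List<String> scans(String group) {
            return groups.getOrDefault(group, List.of());
        }
    }
    ```
    
    Two scans that each call `metricsReporter(...)` with a callback into
    the same `DemoProfile` end up as two entries under one group, in
    planning order, without any catalog-level setting being involved.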
    
    ## Example Profile Output
    
    After running a query against `iceberg_docker.test_db.ts_identity`, the
    FE profile shows:
    
    ```
      Iceberg Scan Metrics:
        Table Scan (iceberg_docker.test_db.ts_identity):
           - table: iceberg_docker.test_db.ts_identity
           - snapshot: 6315378011972705169
           - filter: true
           - columns: id|ts
           - planning: 7ms (1 ops)
           - data_files: 3
           - delete_files: 0
           - skipped_data_files: 0
           - skipped_delete_files: 0
           - total_size: 1.892 KB
           - total_delete_size: 0.000
           - scanned_manifests: 1
           - skipped_manifests: 0
           - scanned_delete_manifests: 0
           - skipped_delete_manifests: 0
           - indexed_delete_files: 0
           - equality_delete_files: 0
           - positional_delete_files: 0
    ```
---
 .../doris/common/profile/SummaryProfile.java       |   3 +
 .../iceberg/profile/IcebergMetricsReporter.java    | 167 +++++++++++++++++++++
 .../datasource/iceberg/source/IcebergScanNode.java |   4 +-
 3 files changed, 172 insertions(+), 2 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
index da86a3cef55..004f2042c32 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
@@ -132,6 +132,7 @@ public class SummaryProfile {
     public static final String RPC_WORK_TIME = "RPC Work Time";
     public static final String LATENCY_FROM_BE_TO_FE = "RPC Latency From BE To FE";
     public static final String SPLITS_ASSIGNMENT_WEIGHT = "Splits Assignment Weight";
+    public static final String ICEBERG_SCAN_METRICS = "Iceberg Scan Metrics";
 
     // These info will display on FE's web ui table, every one will be displayed as
     // a column, so that should not
@@ -164,6 +165,7 @@ public class SummaryProfile {
             GET_PARTITION_FILES_TIME,
             SINK_SET_PARTITION_VALUES_TIME,
             CREATE_SCAN_RANGE_TIME,
+            ICEBERG_SCAN_METRICS,
             NEREIDS_DISTRIBUTE_TIME,
             GET_META_VERSION_TIME,
             GET_PARTITION_VERSION_TIME,
@@ -215,6 +217,7 @@ public class SummaryProfile {
             .put(GET_PARTITION_FILES_TIME, 3)
             .put(SINK_SET_PARTITION_VALUES_TIME, 3)
             .put(CREATE_SCAN_RANGE_TIME, 2)
+            .put(ICEBERG_SCAN_METRICS, 3)
             .put(GET_PARTITION_VERSION_TIME, 1)
             .put(GET_PARTITION_VERSION_COUNT, 1)
             .put(GET_PARTITION_VERSION_BY_HAS_DATA_COUNT, 1)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/profile/IcebergMetricsReporter.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/profile/IcebergMetricsReporter.java
new file mode 100644
index 00000000000..47629424226
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/profile/IcebergMetricsReporter.java
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.profile;
+
+import org.apache.doris.common.profile.RuntimeProfile;
+import org.apache.doris.common.profile.SummaryProfile;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import org.apache.iceberg.metrics.CounterResult;
+import org.apache.iceberg.metrics.MetricsContext;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanMetricsResult;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.TimerResult;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * MetricsReporter implementation that forwards Iceberg scan metrics into Doris
+ * profiles.
+ */
+public class IcebergMetricsReporter implements MetricsReporter {
+
+    private static final Pattern WHITESPACE = Pattern.compile("\\s+");
+
+    @Override
+    public void report(MetricsReport report) {
+        if (!(report instanceof ScanReport)) {
+            return;
+        }
+
+        SummaryProfile summaryProfile = SummaryProfile.getSummaryProfile(ConnectContext.get());
+        if (summaryProfile == null) {
+            return;
+        }
+
+        RuntimeProfile executionSummary = summaryProfile.getExecutionSummary();
+        if (executionSummary == null) {
+            return;
+        }
+
+        ScanReport scanReport = (ScanReport) report;
+        ScanMetricsResult metrics = scanReport.scanMetrics();
+        if (metrics == null) {
+            return;
+        }
+
+        RuntimeProfile icebergGroup = executionSummary.getChildMap().get(SummaryProfile.ICEBERG_SCAN_METRICS);
+        if (icebergGroup == null) {
+            icebergGroup = new RuntimeProfile(SummaryProfile.ICEBERG_SCAN_METRICS);
+            executionSummary.addChild(icebergGroup, true);
+        }
+
+        RuntimeProfile scanProfile = new RuntimeProfile(buildScanProfileName(scanReport));
+        appendScanDetails(scanProfile, scanReport, metrics);
+        icebergGroup.addChild(scanProfile, true);
+    }
+
+    private String sanitize(String value) {
+        if (Strings.isNullOrEmpty(value)) {
+            return "";
+        }
+        return WHITESPACE.matcher(value).replaceAll(" ").trim();
+    }
+
+    private String buildScanProfileName(ScanReport report) {
+        return "Table Scan (" + report.tableName() + ")";
+    }
+
+    private void appendScanDetails(RuntimeProfile scanProfile, ScanReport report, ScanMetricsResult metrics) {
+        scanProfile.addInfoString("table", report.tableName());
+        scanProfile.addInfoString("snapshot", String.valueOf(report.snapshotId()));
+        String filter = sanitize(report.filter() == null ? null : report.filter().toString());
+        if (!Strings.isNullOrEmpty(filter)) {
+            scanProfile.addInfoString("filter", filter);
+        }
+        if (!report.projectedFieldNames().isEmpty()) {
+            scanProfile.addInfoString("columns", Joiner.on('|').join(report.projectedFieldNames()));
+        }
+
+        appendTimer(scanProfile, "planning", metrics.totalPlanningDuration());
+        appendCounter(scanProfile, "data_files", metrics.resultDataFiles());
+        appendCounter(scanProfile, "delete_files", metrics.resultDeleteFiles());
+        appendCounter(scanProfile, "skipped_data_files", metrics.skippedDataFiles());
+        appendCounter(scanProfile, "skipped_delete_files", metrics.skippedDeleteFiles());
+        appendCounter(scanProfile, "total_size", metrics.totalFileSizeInBytes());
+        appendCounter(scanProfile, "total_delete_size", metrics.totalDeleteFileSizeInBytes());
+        appendCounter(scanProfile, "scanned_manifests", metrics.scannedDataManifests());
+        appendCounter(scanProfile, "skipped_manifests", metrics.skippedDataManifests());
+        appendCounter(scanProfile, "scanned_delete_manifests", metrics.scannedDeleteManifests());
+        appendCounter(scanProfile, "skipped_delete_manifests", metrics.skippedDeleteManifests());
+        appendCounter(scanProfile, "indexed_delete_files", metrics.indexedDeleteFiles());
+        appendCounter(scanProfile, "equality_delete_files", metrics.equalityDeleteFiles());
+        appendCounter(scanProfile, "positional_delete_files", metrics.positionalDeleteFiles());
+
+        appendMetadata(scanProfile, report.metadata());
+    }
+
+    private void appendMetadata(RuntimeProfile scanProfile, Map<String, String> metadata) {
+        if (metadata == null || metadata.isEmpty()) {
+            return;
+        }
+        List<String> importantKeys = ImmutableList.of("scan-state", "scan-id");
+        List<String> captured = new ArrayList<>();
+        for (String key : importantKeys) {
+            if (metadata.containsKey(key)) {
+                captured.add(key + "=" + metadata.get(key));
+            }
+        }
+        if (!captured.isEmpty()) {
+            scanProfile.addInfoString("metadata", "{" + String.join(", ", captured) + "}");
+        }
+    }
+
+    private void appendTimer(RuntimeProfile scanProfile, String name, TimerResult timerResult) {
+        if (timerResult == null) {
+            return;
+        }
+        scanProfile.addInfoString(name, formatTimer(timerResult));
+    }
+
+    private void appendCounter(RuntimeProfile scanProfile, String name, CounterResult counterResult) {
+        if (counterResult == null) {
+            return;
+        }
+        scanProfile.addInfoString(name, formatCounter(counterResult));
+    }
+
+    private String formatCounter(CounterResult counterResult) {
+        long value = counterResult.value();
+        if (counterResult.unit() == MetricsContext.Unit.BYTES) {
+            return DebugUtil.printByteWithUnit(value);
+        }
+        return Long.toString(value);
+    }
+
+    private String formatTimer(TimerResult timerResult) {
+        Duration duration = timerResult.totalDuration();
+        long millis = duration.toMillis();
+        String pretty = DebugUtil.getPrettyStringMs(millis);
+        return pretty + " (" + timerResult.count() + " ops)";
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index 61b1bf9c8ab..1acafbde5df 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -38,6 +38,7 @@ import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
 import org.apache.doris.datasource.iceberg.IcebergExternalTable;
 import org.apache.doris.datasource.iceberg.IcebergUtils;
+import org.apache.doris.datasource.iceberg.profile.IcebergMetricsReporter;
 import org.apache.doris.datasource.property.storage.StorageProperties;
 import org.apache.doris.nereids.exceptions.NotSupportedException;
 import org.apache.doris.planner.PlanNodeId;
@@ -327,7 +328,7 @@ public class IcebergScanNode extends FileQueryScanNode {
             return icebergTableScan;
         }
 
-        TableScan scan = icebergTable.newScan();
+        TableScan scan = icebergTable.newScan().metricsReporter(new IcebergMetricsReporter());
 
         // set snapshot
         IcebergTableQueryInfo info = getSpecifiedSnapshot();
@@ -676,4 +677,3 @@ public class IcebergScanNode extends FileQueryScanNode {
         return Optional.empty();
     }
 }
-

