This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b9a05c843db [enhance](iceberg) Doris Iceberg Scan Metrics Integration (#59010)
b9a05c843db is described below
commit b9a05c843dbd69362ade086fd743a4dd787e68be
Author: Socrates <[email protected]>
AuthorDate: Tue Dec 16 22:21:07 2025 +0800
[enhance](iceberg) Doris Iceberg Scan Metrics Integration (#59010)
### What problem does this PR solve?
## Overview
This change pipes Apache Iceberg scan metrics directly into Doris query
profiles so operators can inspect per-scan statistics (files, bytes,
manifests, filters, etc.) from the FE profile UI. The integration
consists of three pieces:
1. **Summary Profile Slot** – `SummaryProfile` now includes an
`Iceberg Scan Metrics` entry in the execution summary list so the FE
profile table reserves space for the Iceberg telemetry.
2. **Metrics Reporter** – a new `IcebergMetricsReporter` implementation
formats `ScanReport` data (planning time, file counters, size counters,
delete-file stats, projected columns, metadata) and appends it to the
summary entry whenever an Iceberg scan runs.
3. **Scan Integration** – `IcebergScanNode` calls
`table.newScan().metricsReporter(new IcebergMetricsReporter())`,
ensuring every Iceberg table scan emits the metrics without requiring
catalog-level configuration changes.
All metrics remain scoped to Iceberg scans; other table formats are
untouched and still populate their own runtime-profile sections as
before.
## Implementation Details
### 1. `SummaryProfile`
`fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java`
- Added the constant `ICEBERG_SCAN_METRICS = "Iceberg Scan Metrics"`.
- Inserted the key into `EXECUTION_SUMMARY_KEYS` (with indentation level
3) so the runtime profile tree displays it under the scheduling block
when present.
- No metrics text is displayed unless a scan actually reports; otherwise
the entry stays `N/A`.
### 2. `IcebergMetricsReporter`
`fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/profile/IcebergMetricsReporter.java`
- Implements `org.apache.iceberg.metrics.MetricsReporter`.
- On each `ScanReport`, it retrieves the current `SummaryProfile` from
`ConnectContext`, grabs the execution summary `RuntimeProfile`, and
appends a human-readable string for that scan.
- Metrics covered:
- Table, snapshot ID, sanitized filter text, projected column list.
- Planning time (pretty-printed duration plus operation count).
- Result/skipped data and delete file counts.
- Total file size / total delete file size (in readable units).
- Manifest counts (scanned/skipped for data and delete manifests).
- Indexed/equality/positional delete file counters.
- Selected metadata keys (currently `scan-state`, `scan-id` if present).
- When multiple Iceberg scans run in one query, each scan’s line is
appended on a new line under the same summary key.
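The filter sanitization and byte formatting described above can be sketched in isolation. This is an illustrative stand-in with hypothetical names (`MetricsFormatSketch`, `formatBytes`), not the Doris implementation; the real helpers are the reporter's whitespace-collapsing `sanitize` and `DebugUtil.printByteWithUnit`:

```java
import java.util.Locale;
import java.util.regex.Pattern;

// Standalone sketch of two formatting helpers used by the reporter.
// Names and exact rounding are illustrative, not the Doris code.
public class MetricsFormatSketch {
    private static final Pattern WHITESPACE = Pattern.compile("\\s+");

    // Collapse whitespace runs so a multi-line filter expression
    // fits on a single profile line.
    static String sanitize(String value) {
        return value == null ? "" : WHITESPACE.matcher(value).replaceAll(" ").trim();
    }

    // Byte-unit counters are printed in readable units; plain counters
    // stay raw longs (only the B/KB cases shown, for illustration).
    static String formatBytes(long bytes) {
        return bytes < 1024 ? bytes + " B"
                : String.format(Locale.ROOT, "%.3f KB", bytes / 1024.0);
    }

    public static void main(String[] args) {
        System.out.println(sanitize("id  >\n  10")); // id > 10
        System.out.println(formatBytes(1937));       // 1.892 KB
    }
}
```

With this rounding, the 1937-byte scan from the example below prints as `1.892 KB`, matching the profile output shown later in this message.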
### 3. `IcebergScanNode`
`fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java`
- Replaces `icebergTable.newScan()` with
`icebergTable.newScan().metricsReporter(new IcebergMetricsReporter())`
inside `createTableScan()`.
- This keeps catalog properties untouched and leverages Iceberg’s
per-scan API to attach reporters.
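The per-scan attachment pattern can be modeled with a toy fluent builder. These types are simplified stand-ins, not the real `org.apache.iceberg` classes; the point is only that the reporter rides on the individual scan object, so no catalog property has to change:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of Iceberg's fluent per-scan reporter attachment.
// ToyTableScan and Reporter are stand-ins, not Iceberg API.
public class PerScanReporterSketch {
    interface Reporter { void report(String scanSummary); }

    static class ToyTableScan {
        private Reporter reporter = s -> { }; // no-op unless one is attached

        // Mirrors TableScan.metricsReporter(...): the reporter is scoped to
        // this one scan object; catalog configuration stays untouched.
        ToyTableScan metricsReporter(Reporter r) { this.reporter = r; return this; }

        void planFiles() {
            // ...planning work would happen here...
            reporter.report("planned 3 data files");
        }
    }

    public static void main(String[] args) {
        List<String> profile = new ArrayList<>();
        new ToyTableScan().metricsReporter(profile::add).planFiles();
        System.out.println(profile); // [planned 3 data files]
    }
}
```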
## Example Profile Output
After running a query against `iceberg_docker.test_db.ts_identity`, the
FE profile shows:
```
Iceberg Scan Metrics:
Table Scan (iceberg_docker.test_db.ts_identity):
- table: iceberg_docker.test_db.ts_identity
- snapshot: 6315378011972705169
- filter: true
- columns: id|ts
- planning: 7ms (1 ops)
- data_files: 3
- delete_files: 0
- skipped_data_files: 0
- skipped_delete_files: 0
- total_size: 1.892 KB
- total_delete_size: 0.000
- scanned_manifests: 1
- skipped_manifests: 0
- scanned_delete_manifests: 0
- skipped_delete_manifests: 0
- indexed_delete_files: 0
- equality_delete_files: 0
- positional_delete_files: 0
```
---
.../doris/common/profile/SummaryProfile.java | 3 +
.../iceberg/profile/IcebergMetricsReporter.java | 167 +++++++++++++++++++++
.../datasource/iceberg/source/IcebergScanNode.java | 4 +-
3 files changed, 172 insertions(+), 2 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
index da86a3cef55..004f2042c32 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
@@ -132,6 +132,7 @@ public class SummaryProfile {
public static final String RPC_WORK_TIME = "RPC Work Time";
public static final String LATENCY_FROM_BE_TO_FE = "RPC Latency From BE To FE";
public static final String SPLITS_ASSIGNMENT_WEIGHT = "Splits Assignment Weight";
+ public static final String ICEBERG_SCAN_METRICS = "Iceberg Scan Metrics";
// These info will display on FE's web ui table, every one will be displayed as
// a column, so that should not
@@ -164,6 +165,7 @@ public class SummaryProfile {
GET_PARTITION_FILES_TIME,
SINK_SET_PARTITION_VALUES_TIME,
CREATE_SCAN_RANGE_TIME,
+ ICEBERG_SCAN_METRICS,
NEREIDS_DISTRIBUTE_TIME,
GET_META_VERSION_TIME,
GET_PARTITION_VERSION_TIME,
@@ -215,6 +217,7 @@ public class SummaryProfile {
.put(GET_PARTITION_FILES_TIME, 3)
.put(SINK_SET_PARTITION_VALUES_TIME, 3)
.put(CREATE_SCAN_RANGE_TIME, 2)
+ .put(ICEBERG_SCAN_METRICS, 3)
.put(GET_PARTITION_VERSION_TIME, 1)
.put(GET_PARTITION_VERSION_COUNT, 1)
.put(GET_PARTITION_VERSION_BY_HAS_DATA_COUNT, 1)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/profile/IcebergMetricsReporter.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/profile/IcebergMetricsReporter.java
new file mode 100644
index 00000000000..47629424226
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/profile/IcebergMetricsReporter.java
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.profile;
+
+import org.apache.doris.common.profile.RuntimeProfile;
+import org.apache.doris.common.profile.SummaryProfile;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import org.apache.iceberg.metrics.CounterResult;
+import org.apache.iceberg.metrics.MetricsContext;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.ScanMetricsResult;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.TimerResult;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * MetricsReporter implementation that forwards Iceberg scan metrics into Doris
+ * profiles.
+ */
+public class IcebergMetricsReporter implements MetricsReporter {
+
+ private static final Pattern WHITESPACE = Pattern.compile("\\s+");
+
+ @Override
+ public void report(MetricsReport report) {
+ if (!(report instanceof ScanReport)) {
+ return;
+ }
+
+ SummaryProfile summaryProfile = SummaryProfile.getSummaryProfile(ConnectContext.get());
+ if (summaryProfile == null) {
+ return;
+ }
+
+ RuntimeProfile executionSummary = summaryProfile.getExecutionSummary();
+ if (executionSummary == null) {
+ return;
+ }
+
+ ScanReport scanReport = (ScanReport) report;
+ ScanMetricsResult metrics = scanReport.scanMetrics();
+ if (metrics == null) {
+ return;
+ }
+
+ RuntimeProfile icebergGroup = executionSummary.getChildMap().get(SummaryProfile.ICEBERG_SCAN_METRICS);
+ if (icebergGroup == null) {
+ icebergGroup = new RuntimeProfile(SummaryProfile.ICEBERG_SCAN_METRICS);
+ executionSummary.addChild(icebergGroup, true);
+ }
+
+ RuntimeProfile scanProfile = new RuntimeProfile(buildScanProfileName(scanReport));
+ appendScanDetails(scanProfile, scanReport, metrics);
+ icebergGroup.addChild(scanProfile, true);
+ }
+
+ private String sanitize(String value) {
+ if (Strings.isNullOrEmpty(value)) {
+ return "";
+ }
+ return WHITESPACE.matcher(value).replaceAll(" ").trim();
+ }
+
+ private String buildScanProfileName(ScanReport report) {
+ return "Table Scan (" + report.tableName() + ")";
+ }
+
+ private void appendScanDetails(RuntimeProfile scanProfile, ScanReport report, ScanMetricsResult metrics) {
+ scanProfile.addInfoString("table", report.tableName());
+ scanProfile.addInfoString("snapshot", String.valueOf(report.snapshotId()));
+ String filter = sanitize(report.filter() == null ? null : report.filter().toString());
+ if (!Strings.isNullOrEmpty(filter)) {
+ scanProfile.addInfoString("filter", filter);
+ }
+ if (!report.projectedFieldNames().isEmpty()) {
+ scanProfile.addInfoString("columns", Joiner.on('|').join(report.projectedFieldNames()));
+ }
+
+ appendTimer(scanProfile, "planning", metrics.totalPlanningDuration());
+ appendCounter(scanProfile, "data_files", metrics.resultDataFiles());
+ appendCounter(scanProfile, "delete_files", metrics.resultDeleteFiles());
+ appendCounter(scanProfile, "skipped_data_files", metrics.skippedDataFiles());
+ appendCounter(scanProfile, "skipped_delete_files", metrics.skippedDeleteFiles());
+ appendCounter(scanProfile, "total_size", metrics.totalFileSizeInBytes());
+ appendCounter(scanProfile, "total_delete_size", metrics.totalDeleteFileSizeInBytes());
+ appendCounter(scanProfile, "scanned_manifests", metrics.scannedDataManifests());
+ appendCounter(scanProfile, "skipped_manifests", metrics.skippedDataManifests());
+ appendCounter(scanProfile, "scanned_delete_manifests", metrics.scannedDeleteManifests());
+ appendCounter(scanProfile, "skipped_delete_manifests", metrics.skippedDeleteManifests());
+ appendCounter(scanProfile, "indexed_delete_files", metrics.indexedDeleteFiles());
+ appendCounter(scanProfile, "equality_delete_files", metrics.equalityDeleteFiles());
+ appendCounter(scanProfile, "positional_delete_files", metrics.positionalDeleteFiles());
+
+ appendMetadata(scanProfile, report.metadata());
+ }
+
+ private void appendMetadata(RuntimeProfile scanProfile, Map<String, String> metadata) {
+ if (metadata == null || metadata.isEmpty()) {
+ return;
+ }
+ List<String> importantKeys = ImmutableList.of("scan-state", "scan-id");
+ List<String> captured = new ArrayList<>();
+ for (String key : importantKeys) {
+ if (metadata.containsKey(key)) {
+ captured.add(key + "=" + metadata.get(key));
+ }
+ }
+ if (!captured.isEmpty()) {
+ scanProfile.addInfoString("metadata", "{" + String.join(", ", captured) + "}");
+ }
+ }
+
+ private void appendTimer(RuntimeProfile scanProfile, String name, TimerResult timerResult) {
+ if (timerResult == null) {
+ return;
+ }
+ scanProfile.addInfoString(name, formatTimer(timerResult));
+ }
+
+ private void appendCounter(RuntimeProfile scanProfile, String name, CounterResult counterResult) {
+ if (counterResult == null) {
+ return;
+ }
+ scanProfile.addInfoString(name, formatCounter(counterResult));
+ }
+
+ private String formatCounter(CounterResult counterResult) {
+ long value = counterResult.value();
+ if (counterResult.unit() == MetricsContext.Unit.BYTES) {
+ return DebugUtil.printByteWithUnit(value);
+ }
+ return Long.toString(value);
+ }
+
+ private String formatTimer(TimerResult timerResult) {
+ Duration duration = timerResult.totalDuration();
+ long millis = duration.toMillis();
+ String pretty = DebugUtil.getPrettyStringMs(millis);
+ return pretty + " (" + timerResult.count() + " ops)";
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index 61b1bf9c8ab..1acafbde5df 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -38,6 +38,7 @@ import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.IcebergUtils;
+import org.apache.doris.datasource.iceberg.profile.IcebergMetricsReporter;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.planner.PlanNodeId;
@@ -327,7 +328,7 @@ public class IcebergScanNode extends FileQueryScanNode {
return icebergTableScan;
}
- TableScan scan = icebergTable.newScan();
+ TableScan scan = icebergTable.newScan().metricsReporter(new IcebergMetricsReporter());
// set snapshot
IcebergTableQueryInfo info = getSpecifiedSnapshot();
@@ -676,4 +677,3 @@ public class IcebergScanNode extends FileQueryScanNode {
return Optional.empty();
}
}
-
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]