This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 20b2179bff Core: Make metrics reporting asynchronous (#13507)
20b2179bff is described below
commit 20b2179bff881a3f2c3ebb3fbc850fdf87cce80f
Author: Anoop Johnson <[email protected]>
AuthorDate: Fri Jul 11 12:29:35 2025 -0700
Core: Make metrics reporting asynchronous (#13507)
---
.../apache/iceberg/rest/RESTMetricsReporter.java | 32 +++++++++++++++-------
1 file changed, 22 insertions(+), 10 deletions(-)
diff --git
a/core/src/main/java/org/apache/iceberg/rest/RESTMetricsReporter.java
b/core/src/main/java/org/apache/iceberg/rest/RESTMetricsReporter.java
index bb3f6e683b..4658076e91 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTMetricsReporter.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTMetricsReporter.java
@@ -19,10 +19,13 @@
package org.apache.iceberg.rest;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import org.apache.iceberg.metrics.MetricsReport;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.rest.requests.ReportMetricsRequest;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,6 +36,9 @@ import org.slf4j.LoggerFactory;
class RESTMetricsReporter implements MetricsReporter {
private static final Logger LOG =
LoggerFactory.getLogger(RESTMetricsReporter.class);
+ private static final ExecutorService METRICS_EXECUTOR =
+ ThreadPools.newExitingWorkerPool("rest-metrics-reporter", 1);
+
private final RESTClient client;
private final String metricsEndpoint;
private final Supplier<Map<String, String>> headers;
@@ -51,15 +57,21 @@ class RESTMetricsReporter implements MetricsReporter {
return;
}
- try {
- client.post(
- metricsEndpoint,
- ReportMetricsRequest.of(report),
- null,
- headers,
- ErrorHandlers.defaultErrorHandler());
- } catch (Exception e) {
- LOG.warn("Failed to report metrics to REST endpoint {}",
metricsEndpoint, e);
- }
+ Tasks.range(1)
+ .executeWith(METRICS_EXECUTOR)
+ .suppressFailureWhenFinished()
+ .onFailure(
+ (item, exception) ->
+ LOG.warn(
+ "Failed to report metrics to REST endpoint {}",
metricsEndpoint, exception))
+ .run(
+ item -> {
+ client.post(
+ metricsEndpoint,
+ ReportMetricsRequest.of(report),
+ null,
+ headers,
+ ErrorHandlers.defaultErrorHandler());
+ });
}
}