This is an automated email from the ASF dual-hosted git repository.
dimas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push:
new d36e88e6f feat(metrics): Evolve PolarisMetricsReporter interface with
timestamp parameter and comprehensive documentation (#3468)
d36e88e6f is described below
commit d36e88e6fcff2137eff26f5e3bf2c0632bef3d7b
Author: Anand K Sankaran <[email protected]>
AuthorDate: Fri Jan 23 08:11:52 2026 -0800
feat(metrics): Evolve PolarisMetricsReporter interface with timestamp
parameter and comprehensive documentation (#3468)
Enhance the `PolarisMetricsReporter` SPI interface by adding a timestamp
parameter to the `reportMetric()` method, enabling accurate time-series metrics
reporting to external systems.
---
CHANGELOG.md | 1 +
.../catalog/iceberg/IcebergCatalogAdapter.java | 9 +++--
.../service/reporting/DefaultMetricsReporter.java | 38 ++++++++++++++++++----
.../service/reporting/PolarisMetricsReporter.java | 32 +++++++++++++++++-
.../reporting/DefaultMetricsReporterTest.java | 10 +++---
.../org/apache/polaris/service/TestServices.java | 3 +-
6 files changed, 78 insertions(+), 15 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8aa4ed3dc..1eaec3be4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -34,6 +34,7 @@ request adding CHANGELOG notes for breaking (!) changes and
possibly other secti
### Breaking changes
- The (Before/After)CommitTableEvent has been removed.
+- The `PolarisMetricsReporter.reportMetric()` method signature has been
extended to include a `receivedTimestamp` parameter of type `java.time.Instant`.
### New Features
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
index 6c30afb9e..fb54b5572 100644
---
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
+++
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
@@ -30,6 +30,7 @@ import jakarta.inject.Inject;
import jakarta.ws.rs.core.HttpHeaders;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.SecurityContext;
+import java.time.Clock;
import java.util.EnumSet;
import java.util.Optional;
import java.util.function.Function;
@@ -105,6 +106,7 @@ public class IcebergCatalogAdapter
private final Instance<ExternalCatalogFactory> externalCatalogFactories;
private final StorageAccessConfigProvider storageAccessConfigProvider;
private final PolarisMetricsReporter metricsReporter;
+ private final Clock clock;
@Inject
public IcebergCatalogAdapter(
@@ -122,7 +124,8 @@ public class IcebergCatalogAdapter
CatalogHandlerUtils catalogHandlerUtils,
@Any Instance<ExternalCatalogFactory> externalCatalogFactories,
StorageAccessConfigProvider storageAccessConfigProvider,
- PolarisMetricsReporter metricsReporter) {
+ PolarisMetricsReporter metricsReporter,
+ Clock clock) {
this.diagnostics = diagnostics;
this.realmContext = realmContext;
this.callContext = callContext;
@@ -139,6 +142,7 @@ public class IcebergCatalogAdapter
this.externalCatalogFactories = externalCatalogFactories;
this.storageAccessConfigProvider = storageAccessConfigProvider;
this.metricsReporter = metricsReporter;
+ this.clock = clock;
}
/**
@@ -722,7 +726,8 @@ public class IcebergCatalogAdapter
Namespace ns = decodeNamespace(namespace);
TableIdentifier tableIdentifier = TableIdentifier.of(ns,
RESTUtil.decodeString(table));
- metricsReporter.reportMetric(catalogName, tableIdentifier,
reportMetricsRequest.report());
+ metricsReporter.reportMetric(
+ catalogName, tableIdentifier, reportMetricsRequest.report(),
clock.instant());
return Response.status(Response.Status.NO_CONTENT).build();
}
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/reporting/DefaultMetricsReporter.java
b/runtime/service/src/main/java/org/apache/polaris/service/reporting/DefaultMetricsReporter.java
index 5c7b4934a..eaff0219b 100644
---
a/runtime/service/src/main/java/org/apache/polaris/service/reporting/DefaultMetricsReporter.java
+++
b/runtime/service/src/main/java/org/apache/polaris/service/reporting/DefaultMetricsReporter.java
@@ -21,32 +21,56 @@ package org.apache.polaris.service.reporting;
import com.google.common.annotations.VisibleForTesting;
import io.smallrye.common.annotation.Identifier;
import jakarta.enterprise.context.ApplicationScoped;
-import org.apache.commons.lang3.function.TriConsumer;
+import java.time.Instant;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.metrics.MetricsReport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Default implementation of {@link PolarisMetricsReporter} that logs metrics
to the configured
+ * logger.
+ *
+ * <p>This implementation is selected when {@code
polaris.iceberg-metrics.reporting.type} is set to
+ * {@code "default"} (the default value).
+ *
+ * <p>By default, logging is disabled. To enable metrics logging, set the
logger level for {@code
+ * org.apache.polaris.service.reporting} to {@code INFO} in your logging
configuration.
+ *
+ * @see PolarisMetricsReporter
+ */
@ApplicationScoped
@Identifier("default")
public class DefaultMetricsReporter implements PolarisMetricsReporter {
private static final Logger LOGGER =
LoggerFactory.getLogger(DefaultMetricsReporter.class);
- private final TriConsumer<String, TableIdentifier, MetricsReport>
reportConsumer;
+ private final QuadConsumer<String, TableIdentifier, MetricsReport, Instant>
reportConsumer;
+
+ /** Functional interface for consuming metrics reports with timestamp. */
+ @FunctionalInterface
+ interface QuadConsumer<T1, T2, T3, T4> {
+ void accept(T1 t1, T2 t2, T3 t3, T4 t4);
+ }
+ /** Creates a new DefaultMetricsReporter that logs metrics to the class
logger. */
public DefaultMetricsReporter() {
this(
- (catalogName, table, metricsReport) ->
- LOGGER.info("{}.{}: {}", catalogName, table, metricsReport));
+ (catalogName, table, metricsReport, receivedTimestamp) ->
+ LOGGER.info("{}.{} (ts={}): {}", catalogName, table,
receivedTimestamp, metricsReport));
}
@VisibleForTesting
- DefaultMetricsReporter(TriConsumer<String, TableIdentifier, MetricsReport>
reportConsumer) {
+ DefaultMetricsReporter(
+ QuadConsumer<String, TableIdentifier, MetricsReport, Instant>
reportConsumer) {
this.reportConsumer = reportConsumer;
}
@Override
- public void reportMetric(String catalogName, TableIdentifier table,
MetricsReport metricsReport) {
- reportConsumer.accept(catalogName, table, metricsReport);
+ public void reportMetric(
+ String catalogName,
+ TableIdentifier table,
+ MetricsReport metricsReport,
+ Instant receivedTimestamp) {
+ reportConsumer.accept(catalogName, table, metricsReport,
receivedTimestamp);
}
}
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java
b/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java
index 7ffd84f4d..b27184d55 100644
---
a/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java
+++
b/runtime/service/src/main/java/org/apache/polaris/service/reporting/PolarisMetricsReporter.java
@@ -18,9 +18,39 @@
*/
package org.apache.polaris.service.reporting;
+import java.time.Instant;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.metrics.MetricsReport;
+/**
+ * SPI interface for reporting Iceberg metrics received by Polaris.
+ *
+ * <p>Implementations can be used to send metrics to external systems for
analysis and monitoring.
+ * Custom implementations can be annotated with appropriate {@code Quarkus}
scope and {@link
+ * io.smallrye.common.annotation.Identifier @Identifier("my-reporter-type")}
for CDI discovery.
+ *
+ * <p>The implementation to use is selected via the {@code
polaris.iceberg-metrics.reporting.type}
+ * configuration property, which defaults to {@code "default"}.
+ *
+ * <p>Implementations can inject other CDI beans for context.
+ *
+ * @see DefaultMetricsReporter
+ * @see MetricsReportingConfiguration
+ */
public interface PolarisMetricsReporter {
- public void reportMetric(String catalogName, TableIdentifier table,
MetricsReport metricsReport);
+
+ /**
+ * Reports an Iceberg metrics report for a specific table.
+ *
+ * @param catalogName the name of the catalog containing the table
+ * @param table the identifier of the table the metrics are for
+ * @param metricsReport the Iceberg metrics report (e.g., {@link
+ * org.apache.iceberg.metrics.ScanReport} or {@link
org.apache.iceberg.metrics.CommitReport})
+ * @param receivedTimestamp the timestamp when the metrics were received by
Polaris
+ */
+ void reportMetric(
+ String catalogName,
+ TableIdentifier table,
+ MetricsReport metricsReport,
+ Instant receivedTimestamp);
}
diff --git
a/runtime/service/src/test/java/org/apache/polaris/service/reporting/DefaultMetricsReporterTest.java
b/runtime/service/src/test/java/org/apache/polaris/service/reporting/DefaultMetricsReporterTest.java
index dfdde0f3e..8762c3ed7 100644
---
a/runtime/service/src/test/java/org/apache/polaris/service/reporting/DefaultMetricsReporterTest.java
+++
b/runtime/service/src/test/java/org/apache/polaris/service/reporting/DefaultMetricsReporterTest.java
@@ -21,7 +21,7 @@ package org.apache.polaris.service.reporting;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
-import org.apache.commons.lang3.function.TriConsumer;
+import java.time.Instant;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.metrics.MetricsReport;
import org.junit.jupiter.api.Test;
@@ -31,14 +31,16 @@ public class DefaultMetricsReporterTest {
@Test
void testLogging() {
@SuppressWarnings("unchecked")
- TriConsumer<String, TableIdentifier, MetricsReport> mockConsumer =
mock(TriConsumer.class);
+ DefaultMetricsReporter.QuadConsumer<String, TableIdentifier,
MetricsReport, Instant>
+ mockConsumer = mock(DefaultMetricsReporter.QuadConsumer.class);
DefaultMetricsReporter reporter = new DefaultMetricsReporter(mockConsumer);
String warehouse = "testWarehouse";
TableIdentifier table = TableIdentifier.of("testNamespace", "testTable");
MetricsReport metricsReport = mock(MetricsReport.class);
+ Instant receivedTimestamp = Instant.ofEpochMilli(1234567890L);
- reporter.reportMetric(warehouse, table, metricsReport);
+ reporter.reportMetric(warehouse, table, metricsReport, receivedTimestamp);
- verify(mockConsumer).accept(warehouse, table, metricsReport);
+ verify(mockConsumer).accept(warehouse, table, metricsReport,
receivedTimestamp);
}
}
diff --git
a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java
b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java
index 59af4b5a6..30303121e 100644
---
a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java
+++
b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java
@@ -350,7 +350,8 @@ public record TestServices(
catalogHandlerUtils,
externalCatalogFactory,
storageAccessConfigProvider,
- new DefaultMetricsReporter());
+ new DefaultMetricsReporter(),
+ Clock.systemUTC());
// Optionally wrap with event delegator
IcebergRestCatalogApiService finalRestCatalogService = catalogService;