flyrain commented on code in PR #3385:
URL: https://github.com/apache/polaris/pull/3385#discussion_r2832010250


##########
runtime/admin/src/main/java/org/apache/polaris/admintool/BootstrapMetricsCommand.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.admintool;
+
+import io.smallrye.common.annotation.Identifier;
+import jakarta.inject.Inject;
+import java.util.List;
+import org.apache.polaris.core.persistence.metrics.MetricsSchemaBootstrap;
+import picocli.CommandLine;
+
+/**
+ * CLI command to bootstrap the metrics schema independently from the entity schema.
+ *
+ * <p>This command allows operators to add metrics persistence support to an existing Polaris
+ * deployment without re-bootstrapping the entity schema. It supports both fresh installation and
+ * upgrading from an older schema version to a newer one.
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * polaris-admin bootstrap-metrics -r my-realm
+ * polaris-admin bootstrap-metrics -r realm1 -r realm2
+ * polaris-admin bootstrap-metrics -r my-realm --version 2

Review Comment:
   How does this work with this option?
   ```
   @CommandLine.Option(
       names = {"--include-metrics"},
       description = "Include metrics schema tables during bootstrap.")
   boolean includeMetrics;
   ```



##########
polaris-core/src/main/java/org/apache/polaris/core/persistence/metrics/MetricsSchemaBootstrap.java:
##########
@@ -0,0 +1,154 @@
+package org.apache.polaris.core.persistence.metrics;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * Service Provider Interface (SPI) for bootstrapping the metrics schema.
+ *
+ * <p>This interface enables different persistence backends (JDBC, NoSQL, custom) to implement
+ * metrics schema initialization in a way appropriate for their storage model. The metrics schema is
+ * separate from the entity schema and can be bootstrapped independently.
+ *
+ * <p>Implementations should be idempotent - calling {@link #bootstrap(String)} multiple times on
+ * the same realm should have no effect after the first successful call.
+ *
+ * <h3>Dependency Injection</h3>
+ *
+ * <p>This interface is designed to be injected via CDI (Contexts and Dependency Injection). The
+ * deployment module should provide a {@code @Produces} method that creates the appropriate
+ * implementation based on the configured persistence backend.
+ *
+ * <h3>Usage</h3>
+ *
+ * <p>The metrics schema can be bootstrapped:
+ *
+ * <ul>
+ *   <li>During initial realm bootstrap with the {@code --include-metrics} flag
+ *   <li>Independently via the {@code bootstrap-metrics} CLI command
+ *   <li>Programmatically by injecting this interface and calling {@link #bootstrap(String)}
+ * </ul>
+ *
+ * <p><b>Note:</b> This SPI is currently experimental. The API may change in future releases.
+ *
+ * @see MetricsPersistence
+ */
+@Beta
+public interface MetricsSchemaBootstrap {

Review Comment:
   I'd suggest following the same structure as `metastoreFactory`. In that case, this interface isn't needed; the bootstrap method would be part of `MetricsMetastoreFactory`.
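
To make the suggestion concrete, here is a rough sketch of how bootstrap could live on the factory itself. Only the name `MetricsMetastoreFactory` comes from the comment above; the method names, the `MetricsStore` type, and the in-memory implementation are hypothetical and only illustrate the shape, not the actual Polaris API.

```java
import java.util.HashMap;
import java.util.Map;

public class MetricsFactorySketch {

  /** Hypothetical stand-in for a per-realm metrics persistence handle. */
  interface MetricsStore {
    String realmId();
  }

  /** Hypothetical factory shape: bootstrap is a factory method, not a separate SPI. */
  interface MetricsMetastoreFactory {
    /** Creates the metrics schema for the realm; repeated calls are no-ops. */
    void bootstrap(String realmId);

    /** Returns the store for a realm that has already been bootstrapped. */
    MetricsStore getOrCreate(String realmId);
  }

  /** In-memory illustration only; a real backend would issue DDL instead. */
  static class InMemoryFactory implements MetricsMetastoreFactory {
    private final Map<String, MetricsStore> stores = new HashMap<>();
    int schemaCreations = 0;

    @Override
    public void bootstrap(String realmId) {
      if (!stores.containsKey(realmId)) {
        schemaCreations++;
        stores.put(realmId, () -> realmId);
      }
    }

    @Override
    public MetricsStore getOrCreate(String realmId) {
      MetricsStore store = stores.get(realmId);
      if (store == null) {
        throw new IllegalStateException("Realm not bootstrapped: " + realmId);
      }
      return store;
    }
  }

  public static void main(String[] args) {
    InMemoryFactory factory = new InMemoryFactory();
    factory.bootstrap("realm1");
    factory.bootstrap("realm1"); // second call changes nothing
    System.out.println(factory.schemaCreations);
    System.out.println(factory.getOrCreate("realm1").realmId());
  }
}
```

With this shape, the CLI command would only need the factory injected, and a separate `NOOP` bootstrap constant would not be required.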



##########
polaris-core/src/main/java/org/apache/polaris/core/persistence/metrics/MetricsSchemaBootstrap.java:
##########
@@ -0,0 +1,154 @@
+package org.apache.polaris.core.persistence.metrics;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * Service Provider Interface (SPI) for bootstrapping the metrics schema.
+ *
+ * <p>This interface enables different persistence backends (JDBC, NoSQL, custom) to implement
+ * metrics schema initialization in a way appropriate for their storage model. The metrics schema is
+ * separate from the entity schema and can be bootstrapped independently.
+ *
+ * <p>Implementations should be idempotent - calling {@link #bootstrap(String)} multiple times on
+ * the same realm should have no effect after the first successful call.
+ *
+ * <h3>Dependency Injection</h3>
+ *
+ * <p>This interface is designed to be injected via CDI (Contexts and Dependency Injection). The
+ * deployment module should provide a {@code @Produces} method that creates the appropriate
+ * implementation based on the configured persistence backend.
+ *
+ * <h3>Usage</h3>
+ *
+ * <p>The metrics schema can be bootstrapped:
+ *
+ * <ul>
+ *   <li>During initial realm bootstrap with the {@code --include-metrics} flag
+ *   <li>Independently via the {@code bootstrap-metrics} CLI command
+ *   <li>Programmatically by injecting this interface and calling {@link #bootstrap(String)}
+ * </ul>
+ *
+ * <p><b>Note:</b> This SPI is currently experimental. The API may change in future releases.
+ *
+ * @see MetricsPersistence
+ */
+@Beta
+public interface MetricsSchemaBootstrap {
+
+  /**
+   * A no-op implementation for backends that don't support metrics schema bootstrap.
+   *
+   * <p>This implementation always reports the schema as bootstrapped and does nothing when {@link
+   * #bootstrap(String)} is called.
+   */
+  MetricsSchemaBootstrap NOOP =

Review Comment:
   This is not needed anymore.



##########
persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcMetricsPersistence.java:
##########
@@ -0,0 +1,481 @@
+package org.apache.polaris.persistence.relational.jdbc;
+
+import static org.apache.polaris.persistence.relational.jdbc.QueryGenerator.PreparedQuery;
+
+import jakarta.annotation.Nonnull;
+import jakarta.annotation.Nullable;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.polaris.core.persistence.metrics.CommitMetricsRecord;
+import org.apache.polaris.core.persistence.metrics.MetricsPersistence;
+import org.apache.polaris.core.persistence.metrics.MetricsQueryCriteria;
+import org.apache.polaris.core.persistence.metrics.ReportIdToken;
+import org.apache.polaris.core.persistence.metrics.ScanMetricsRecord;
+import org.apache.polaris.core.persistence.pagination.Page;
+import org.apache.polaris.core.persistence.pagination.PageToken;
+import org.apache.polaris.core.persistence.pagination.Token;
+import org.apache.polaris.persistence.relational.jdbc.models.ModelCommitMetricsReport;
+import org.apache.polaris.persistence.relational.jdbc.models.ModelCommitMetricsReportConverter;
+import org.apache.polaris.persistence.relational.jdbc.models.ModelScanMetricsReport;
+import org.apache.polaris.persistence.relational.jdbc.models.ModelScanMetricsReportConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JDBC implementation of {@link MetricsPersistence}.
+ *
+ * <p>This class provides direct JDBC persistence for metrics reports, converting between SPI record
+ * types ({@link ScanMetricsRecord}, {@link CommitMetricsRecord}) and JDBC model types ({@link
+ * ModelScanMetricsReport}, {@link ModelCommitMetricsReport}).
+ *
+ * <p>This implementation assumes that metrics tables exist. The producer ({@link
+ * JdbcMetricsPersistenceProducer}) checks for metrics table availability at startup and returns
+ * {@link MetricsPersistence#NOOP} if tables are not present.
+ */
+public class JdbcMetricsPersistence implements MetricsPersistence {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(JdbcMetricsPersistence.class);
+
+  private final DatasourceOperations datasourceOperations;
+  private final String realmId;
+
+  /**
+   * Creates a new JdbcMetricsPersistence instance.
+   *
+   * @param datasourceOperations the datasource operations for JDBC access
+   * @param realmId the realm ID for multi-tenancy
+   */
+  public JdbcMetricsPersistence(DatasourceOperations datasourceOperations, String realmId) {
+    this.datasourceOperations = datasourceOperations;
+    this.realmId = realmId;
+  }
+
+  @Override
+  public void writeScanReport(@Nonnull ScanMetricsRecord record) {
+    ModelScanMetricsReport model = SpiModelConverter.toModelScanReport(record, realmId);

Review Comment:
   How do we pass fields like `principal_name`, `request_id`, `otel_trace_id`, `otel_span_id`, and `report_trace_id`?



##########
runtime/admin/src/main/java/org/apache/polaris/admintool/BootstrapMetricsCommand.java:
##########
@@ -0,0 +1,120 @@
+package org.apache.polaris.admintool;
+
+import io.smallrye.common.annotation.Identifier;
+import jakarta.inject.Inject;
+import java.util.List;
+import org.apache.polaris.core.persistence.metrics.MetricsSchemaBootstrap;
+import picocli.CommandLine;
+
+/**
+ * CLI command to bootstrap the metrics schema independently from the entity schema.
+ *
+ * <p>This command allows operators to add metrics persistence support to an existing Polaris
+ * deployment without re-bootstrapping the entity schema. It supports both fresh installation and
+ * upgrading from an older schema version to a newer one.
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * polaris-admin bootstrap-metrics -r my-realm
+ * polaris-admin bootstrap-metrics -r realm1 -r realm2
+ * polaris-admin bootstrap-metrics -r my-realm --version 2
+ * }</pre>
+ */
+@CommandLine.Command(
+    name = "bootstrap-metrics",
+    mixinStandardHelpOptions = true,
+    description = "Bootstraps or upgrades the metrics schema for existing realms.")
+public class BootstrapMetricsCommand extends BaseCommand {
+
+  @Inject
+  @Identifier("relational-jdbc")

Review Comment:
   Does this mean only JDBC works with this bootstrap command?
   
   In the class `BaseMetaStoreCommand`, we didn't specify the identifier:
   ```
   @Inject protected MetaStoreManagerFactory metaStoreManagerFactory;
   ```



##########
persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelScanMetricsReportConverter.java:
##########
@@ -0,0 +1,74 @@
+package org.apache.polaris.persistence.relational.jdbc.models;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import org.apache.polaris.persistence.relational.jdbc.DatabaseType;
+
+/**
+ * Converter for reading ModelScanMetricsReport from database result sets. This class is needed
+ * because the Immutables-generated class cannot be instantiated without required fields.
+ */
+public class ModelScanMetricsReportConverter implements Converter<ModelScanMetricsReport> {

Review Comment:
   Same here



##########
persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelCommitMetricsReportConverter.java:
##########
@@ -0,0 +1,77 @@
+package org.apache.polaris.persistence.relational.jdbc.models;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import org.apache.polaris.persistence.relational.jdbc.DatabaseType;
+
+/**
+ * Converter for reading ModelCommitMetricsReport from database result sets. This class is needed
+ * because the Immutables-generated class cannot be instantiated without required fields.
+ */
+public class ModelCommitMetricsReportConverter implements Converter<ModelCommitMetricsReport> {

Review Comment:
   I think this class is not needed, as we already have a dummy instance of `ModelCommitMetricsReport` named `CONVERTER`, which can be used to invoke the default method `fromResultSet()`.
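
A minimal sketch of the pattern being referred to, assuming the model interface itself extends the converter and supplies the conversion as a default method. The names here (`fromRow`, `CommitReportModel`, `Impl`) are hypothetical, and a `Map` stands in for `java.sql.ResultSet` so the example runs without a database; the real `Converter` signature may differ.

```java
import java.util.Map;

public class DummyConverterSketch {

  /** Hypothetical converter contract; a Map replaces ResultSet for illustration. */
  interface Converter<T> {
    T fromRow(Map<String, Object> row);
  }

  /** Hypothetical model that doubles as its own converter via a default method. */
  interface CommitReportModel extends Converter<CommitReportModel> {
    String reportId();

    long snapshotId();

    @Override
    default CommitReportModel fromRow(Map<String, Object> row) {
      // The receiver's own field values are ignored; only the row is read.
      return new Impl(
          (String) row.get("report_id"), ((Number) row.get("snapshot_id")).longValue());
    }
  }

  /** Stand-in for the generated immutable implementation. */
  record Impl(String reportId, long snapshotId) implements CommitReportModel {}

  /** Dummy instance with placeholder values, used only to reach fromRow(). */
  static final CommitReportModel CONVERTER = new Impl("", 0L);

  public static void main(String[] args) {
    CommitReportModel model =
        CONVERTER.fromRow(Map.of("report_id", "r-1", "snapshot_id", 42L));
    System.out.println(model);
  }
}
```

Because the dummy instance satisfies the required fields with placeholders, no separate converter class is needed just to call the conversion method.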



##########
runtime/service/src/main/java/org/apache/polaris/service/reporting/PersistingMetricsReporter.java:
##########
@@ -0,0 +1,180 @@
+package org.apache.polaris.service.reporting;
+
+import io.smallrye.common.annotation.Identifier;
+import jakarta.enterprise.context.RequestScoped;
+import jakarta.inject.Inject;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisBaseEntity;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntityCore;
+import org.apache.polaris.core.entity.PolarisEntitySubType;
+import org.apache.polaris.core.entity.PolarisEntityType;
+import org.apache.polaris.core.metrics.iceberg.MetricsRecordConverter;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.core.persistence.dao.entity.EntityResult;
+import org.apache.polaris.core.persistence.metrics.CommitMetricsRecord;
+import org.apache.polaris.core.persistence.metrics.MetricsPersistence;
+import org.apache.polaris.core.persistence.metrics.ScanMetricsRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of {@link PolarisMetricsReporter} that persists metrics to the configured {@link
+ * MetricsPersistence} backend.
+ *
+ * <p>This reporter is selected when {@code polaris.iceberg-metrics.reporting.type} is set to {@code
+ * "persisting"}.
+ *
+ * <p>The reporter looks up the catalog entity by name to obtain the catalog ID, then uses {@link
+ * MetricsRecordConverter} to convert Iceberg metrics reports to SPI records before persisting them.
+ *
+ * @see PolarisMetricsReporter
+ * @see MetricsPersistence
+ * @see MetricsRecordConverter
+ */
+@RequestScoped
+@Identifier("persisting")
+public class PersistingMetricsReporter implements PolarisMetricsReporter {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PersistingMetricsReporter.class);
+
+  private final CallContext callContext;
+  private final PolarisMetaStoreManager metaStoreManager;
+  private final MetricsPersistence metricsPersistence;
+
+  @Inject
+  public PersistingMetricsReporter(
+      CallContext callContext,
+      PolarisMetaStoreManager metaStoreManager,
+      MetricsPersistence metricsPersistence) {
+    this.callContext = callContext;
+    this.metaStoreManager = metaStoreManager;
+    this.metricsPersistence = metricsPersistence;
+  }
+
+  @Override
+  public void reportMetric(
+      String catalogName,
+      TableIdentifier table,
+      MetricsReport metricsReport,
+      Instant receivedTimestamp) {
+
+    // Look up the catalog entity to get the catalog ID
+    EntityResult catalogResult =

Review Comment:
   Another solution is to pass all the needed information from the caller (`IcebergCatalogHandler`), so that we don't even need a `metaStoreManager` here.



##########
persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcMetricsPersistence.java:
##########
@@ -0,0 +1,481 @@
+package org.apache.polaris.persistence.relational.jdbc;
+
+import static org.apache.polaris.persistence.relational.jdbc.QueryGenerator.PreparedQuery;
+
+import jakarta.annotation.Nonnull;
+import jakarta.annotation.Nullable;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.polaris.core.persistence.metrics.CommitMetricsRecord;
+import org.apache.polaris.core.persistence.metrics.MetricsPersistence;
+import org.apache.polaris.core.persistence.metrics.MetricsQueryCriteria;
+import org.apache.polaris.core.persistence.metrics.ReportIdToken;
+import org.apache.polaris.core.persistence.metrics.ScanMetricsRecord;
+import org.apache.polaris.core.persistence.pagination.Page;
+import org.apache.polaris.core.persistence.pagination.PageToken;
+import org.apache.polaris.core.persistence.pagination.Token;
+import org.apache.polaris.persistence.relational.jdbc.models.ModelCommitMetricsReport;
+import org.apache.polaris.persistence.relational.jdbc.models.ModelCommitMetricsReportConverter;
+import org.apache.polaris.persistence.relational.jdbc.models.ModelScanMetricsReport;
+import org.apache.polaris.persistence.relational.jdbc.models.ModelScanMetricsReportConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JDBC implementation of {@link MetricsPersistence}.
+ *
+ * <p>This class provides direct JDBC persistence for metrics reports, converting between SPI record
+ * types ({@link ScanMetricsRecord}, {@link CommitMetricsRecord}) and JDBC model types ({@link
+ * ModelScanMetricsReport}, {@link ModelCommitMetricsReport}).
+ *
+ * <p>This implementation assumes that metrics tables exist. The producer ({@link
+ * JdbcMetricsPersistenceProducer}) checks for metrics table availability at startup and returns
+ * {@link MetricsPersistence#NOOP} if tables are not present.
+ */
+public class JdbcMetricsPersistence implements MetricsPersistence {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(JdbcMetricsPersistence.class);
+
+  private final DatasourceOperations datasourceOperations;
+  private final String realmId;
+
+  /**
+   * Creates a new JdbcMetricsPersistence instance.
+   *
+   * @param datasourceOperations the datasource operations for JDBC access
+   * @param realmId the realm ID for multi-tenancy
+   */
+  public JdbcMetricsPersistence(DatasourceOperations datasourceOperations, String realmId) {
+    this.datasourceOperations = datasourceOperations;
+    this.realmId = realmId;
+  }
+
+  @Override
+  public void writeScanReport(@Nonnull ScanMetricsRecord record) {
+    ModelScanMetricsReport model = SpiModelConverter.toModelScanReport(record, realmId);
+    writeScanMetricsReport(model);
+  }
+
+  @Override
+  public void writeCommitReport(@Nonnull CommitMetricsRecord record) {
+    ModelCommitMetricsReport model = SpiModelConverter.toModelCommitReport(record, realmId);
+    writeCommitMetricsReport(model);
+  }
+
+  @Override
+  @Nonnull
+  public Page<ScanMetricsRecord> queryScanReports(
+      @Nonnull MetricsQueryCriteria criteria, @Nonnull PageToken pageToken) {
+    // catalogId and tableId are required for queries
+    if (criteria.catalogId().isEmpty() || criteria.tableId().isEmpty()) {
+      return Page.fromItems(List.of());
+    }
+
+    int limit = pageToken.pageSize().orElse(100);
+    Long startTimeMs = criteria.startTime().map(t -> t.toEpochMilli()).orElse(null);
+    Long endTimeMs = criteria.endTime().map(t -> t.toEpochMilli()).orElse(null);
+
+    // Extract cursor from page token if present
+    String lastReportId =
+        pageToken.valueAs(ReportIdToken.class).map(ReportIdToken::reportId).orElse(null);

Review Comment:
   The report ID is a UUID. How does pagination work in that case?
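
To illustrate the concern: if the report ID is a random (version 4) UUID, ordering by it alone is stable but unrelated to time, so pages would not come back in chronological order and newly written rows can land before the cursor. One common alternative is keyset pagination on a composite `(timestamp, report_id)` cursor. The sketch below is an in-memory illustration under that assumption; the `Report` and `Cursor` types and the column choice are hypothetical, not what the PR implements.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class KeysetPaginationSketch {

  /** Hypothetical row: only the two columns that form the cursor. */
  record Report(long timestampMs, String reportId) {}

  /** Hypothetical composite cursor; null means "start from the beginning". */
  record Cursor(long timestampMs, String reportId) {}

  /** Total order: timestamp first, report ID as the tie-breaker. */
  static final Comparator<Report> ORDER =
      Comparator.comparingLong(Report::timestampMs).thenComparing(Report::reportId);

  /**
   * In-memory stand-in for a query of the form:
   * WHERE (ts, id) > (:ts, :id) ORDER BY ts, id LIMIT :limit
   */
  static List<Report> page(List<Report> all, Cursor after, int limit) {
    List<Report> sorted = new ArrayList<>(all);
    sorted.sort(ORDER);
    List<Report> out = new ArrayList<>();
    for (Report r : sorted) {
      boolean pastCursor =
          after == null
              || r.timestampMs() > after.timestampMs()
              || (r.timestampMs() == after.timestampMs()
                  && r.reportId().compareTo(after.reportId()) > 0);
      if (pastCursor && out.size() < limit) {
        out.add(r);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Report> rows =
        List.of(new Report(2000L, "a1"), new Report(1000L, "f9"), new Report(2000L, "0c"));
    List<Report> first = page(rows, null, 2);
    Report last = first.get(first.size() - 1);
    List<Report> second = page(rows, new Cursor(last.timestampMs(), last.reportId()), 2);
    System.out.println(first);
    System.out.println(second);
  }
}
```

The ID only breaks ties between rows with the same timestamp, so the cursor stays unique while pages remain time-ordered.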



##########
runtime/service/src/main/java/org/apache/polaris/service/reporting/PersistingMetricsReporter.java:
##########
@@ -0,0 +1,180 @@
+package org.apache.polaris.service.reporting;
+
+import io.smallrye.common.annotation.Identifier;
+import jakarta.enterprise.context.RequestScoped;
+import jakarta.inject.Inject;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisBaseEntity;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntityCore;
+import org.apache.polaris.core.entity.PolarisEntitySubType;
+import org.apache.polaris.core.entity.PolarisEntityType;
+import org.apache.polaris.core.metrics.iceberg.MetricsRecordConverter;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.core.persistence.dao.entity.EntityResult;
+import org.apache.polaris.core.persistence.metrics.CommitMetricsRecord;
+import org.apache.polaris.core.persistence.metrics.MetricsPersistence;
+import org.apache.polaris.core.persistence.metrics.ScanMetricsRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of {@link PolarisMetricsReporter} that persists metrics to the configured {@link
+ * MetricsPersistence} backend.
+ *
+ * <p>This reporter is selected when {@code polaris.iceberg-metrics.reporting.type} is set to {@code
+ * "persisting"}.
+ *
+ * <p>The reporter looks up the catalog entity by name to obtain the catalog ID, then uses {@link
+ * MetricsRecordConverter} to convert Iceberg metrics reports to SPI records before persisting them.
+ *
+ * @see PolarisMetricsReporter
+ * @see MetricsPersistence
+ * @see MetricsRecordConverter
+ */
+@RequestScoped
+@Identifier("persisting")
+public class PersistingMetricsReporter implements PolarisMetricsReporter {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PersistingMetricsReporter.class);
+
+  private final CallContext callContext;
+  private final PolarisMetaStoreManager metaStoreManager;
+  private final MetricsPersistence metricsPersistence;
+
+  @Inject
+  public PersistingMetricsReporter(
+      CallContext callContext,
+      PolarisMetaStoreManager metaStoreManager,
+      MetricsPersistence metricsPersistence) {
+    this.callContext = callContext;
+    this.metaStoreManager = metaStoreManager;
+    this.metricsPersistence = metricsPersistence;
+  }
+
+  @Override
+  public void reportMetric(
+      String catalogName,
+      TableIdentifier table,
+      MetricsReport metricsReport,
+      Instant receivedTimestamp) {
+
+    // Look up the catalog entity to get the catalog ID
+    EntityResult catalogResult =

Review Comment:
   For each metrics event, there is 1 catalog lookup + 1 lookup per namespace level + 1 table lookup, all sequential. I think we should avoid that by using the resolver to resolve the whole path.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
