obelix74 commented on code in PR #3385:
URL: https://github.com/apache/polaris/pull/3385#discussion_r2673593539
########## persistence/relational-jdbc/src/main/resources/postgres/schema-v4.sql: ########## @@ -0,0 +1,194 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"). You may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. +-- + +-- Changes from v3: +-- * Added `scan_metrics_report` table for scan metrics as first-class entities +-- * Added `commit_metrics_report` table for commit metrics as first-class entities + +CREATE SCHEMA IF NOT EXISTS POLARIS_SCHEMA; +SET search_path TO POLARIS_SCHEMA; + +CREATE TABLE IF NOT EXISTS version ( + version_key TEXT PRIMARY KEY, + version_value INTEGER NOT NULL +); +INSERT INTO version (version_key, version_value) +VALUES ('version', 4) +ON CONFLICT (version_key) DO UPDATE +SET version_value = EXCLUDED.version_value; +COMMENT ON TABLE version IS 'the version of the JDBC schema in use'; + +-- Include all tables from v3 +-- (entities, grant_records, principal_authentication_data, policy_mapping_record, events) +-- These are assumed to already exist from v3 migration + +-- Scan Metrics Report Entity Table +CREATE TABLE IF NOT EXISTS scan_metrics_report ( + report_id TEXT NOT NULL, + realm_id TEXT NOT NULL, + catalog_id TEXT NOT NULL, + catalog_name TEXT NOT NULL, + namespace TEXT NOT NULL, + table_name TEXT NOT NULL, + + -- Report metadata + timestamp_ms BIGINT NOT NULL, + principal_name TEXT, + request_id TEXT, + + -- Trace correlation + otel_trace_id TEXT, + otel_span_id TEXT, + report_trace_id TEXT, + + -- Scan context + snapshot_id BIGINT, + schema_id INTEGER, + filter_expression TEXT, + projected_field_ids TEXT, + projected_field_names TEXT, + + -- Scan metrics + result_data_files BIGINT DEFAULT 0, + result_delete_files BIGINT DEFAULT 0, + total_file_size_bytes BIGINT DEFAULT 0, + total_data_manifests BIGINT DEFAULT 0, + total_delete_manifests BIGINT DEFAULT 0, + scanned_data_manifests BIGINT DEFAULT 0, + scanned_delete_manifests BIGINT DEFAULT 0, + skipped_data_manifests BIGINT DEFAULT 0, + skipped_delete_manifests BIGINT DEFAULT 0, + skipped_data_files BIGINT DEFAULT 0, + skipped_delete_files BIGINT DEFAULT 0, + total_planning_duration_ms BIGINT DEFAULT 0, + + -- Equality/positional delete metrics + equality_delete_files BIGINT DEFAULT 0, + positional_delete_files BIGINT DEFAULT 0, + indexed_delete_files BIGINT DEFAULT 0, + total_delete_file_size_bytes BIGINT DEFAULT 0, + + -- Additional metadata (for extensibility) + metadata JSONB DEFAULT '{}'::JSONB, + + PRIMARY KEY (report_id) +); + +COMMENT ON TABLE scan_metrics_report IS 'Scan metrics reports as first-class entities'; +COMMENT ON COLUMN scan_metrics_report.report_id IS 'Unique identifier for the report'; +COMMENT ON COLUMN scan_metrics_report.realm_id IS 'Realm ID for multi-tenancy'; +COMMENT ON COLUMN scan_metrics_report.catalog_id IS 'Catalog ID'; +COMMENT ON COLUMN scan_metrics_report.otel_trace_id IS 
'OpenTelemetry trace ID from HTTP headers'; +COMMENT ON COLUMN scan_metrics_report.report_trace_id IS 'Trace ID from report metadata'; + +-- Indexes for scan_metrics_report Review Comment: Index Strategy: Indexes are designed around common query patterns: time-range queries (timestamp_ms DESC), table-specific queries (composite on catalog/namespace/table), and trace correlation (otel_trace_id with partial index to skip NULLs). The realm_id index supports multi-tenant queries without requiring it in every composite index. ########## persistence/relational-jdbc/src/main/resources/postgres/schema-v4.sql: ########## @@ -0,0 +1,194 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"). You may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. +-- + +-- Changes from v3: +-- * Added `scan_metrics_report` table for scan metrics as first-class entities +-- * Added `commit_metrics_report` table for commit metrics as first-class entities + +CREATE SCHEMA IF NOT EXISTS POLARIS_SCHEMA; +SET search_path TO POLARIS_SCHEMA; + +CREATE TABLE IF NOT EXISTS version ( + version_key TEXT PRIMARY KEY, + version_value INTEGER NOT NULL +); +INSERT INTO version (version_key, version_value) +VALUES ('version', 4) +ON CONFLICT (version_key) DO UPDATE +SET version_value = EXCLUDED.version_value; +COMMENT ON TABLE version IS 'the version of the JDBC schema in use'; + +-- Include all tables from v3 +-- (entities, grant_records, principal_authentication_data, policy_mapping_record, events) +-- These are assumed to already exist from v3 migration + +-- Scan Metrics Report Entity Table +CREATE TABLE IF NOT EXISTS scan_metrics_report ( Review Comment: Dual Storage Strategy: We use dedicated scan_metrics_report and commit_metrics_report tables rather than storing metrics as JSON blobs in the existing events table. This enables efficient SQL queries for analytics (e.g., "find slow table scans") and proper indexing on metrics fields, at the cost of a more rigid schema. The metadata JSONB column provides extensibility for future metric fields without requiring schema migrations. ########## runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java: ########## @@ -433,6 +436,57 @@ public void closeTaskExecutor(@Disposes @Identifier("task-executor") ManagedExec @ApplicationScoped public PolarisMetricsReporter metricsReporter( Review Comment: CDI-based Reporter Selection: The @Identifier annotation pattern allows new reporter implementations to be added without modifying this factory. The composite reporter is assembled dynamically from the targets config, with safeguards against infinite recursion (ignoring "composite" as a target) and graceful fallback to default if no valid targets are resolved. 
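
For illustration, a minimal sketch of the selection pattern that comment describes, assuming a targets() accessor on MetricsReportingConfiguration, a "default" identifier for the fallback reporter, and a CompositeMetricsReporter wrapper; those names are assumptions for the example, not the PR's exact API. Only PolarisMetricsReporter and the @Identifier lookup pattern come from the change itself.

import io.smallrye.common.annotation.Identifier;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Any;
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.Produces;
import java.util.ArrayList;
import java.util.List;

@ApplicationScoped
public class MetricsReporterSelectionSketch {

  @Produces
  @ApplicationScoped
  public PolarisMetricsReporter metricsReporter(
      MetricsReportingConfiguration config,
      @Any Instance<PolarisMetricsReporter> reporters) {

    List<PolarisMetricsReporter> resolved = new ArrayList<>();
    for (String target : config.targets()) { // assumed accessor; actual config shape may differ
      if ("composite".equalsIgnoreCase(target)) {
        continue; // never select the composite itself, avoiding infinite recursion
      }
      Instance<PolarisMetricsReporter> candidate =
          reporters.select(Identifier.Literal.of(target)); // CDI lookup by @Identifier value
      if (candidate.isResolvable()) {
        resolved.add(candidate.get());
      } // unknown target names are simply skipped
    }
    if (resolved.isEmpty()) {
      // graceful fallback when no valid targets resolve
      return reporters.select(Identifier.Literal.of("default")).get();
    }
    return resolved.size() == 1
        ? resolved.get(0)
        : new CompositeMetricsReporter(resolved); // hypothetical wrapper that fans out to all targets
  }
}

With this shape, adding a new reporter means adding one @Identifier-annotated bean and naming it in the targets config; the factory method itself does not change.
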
########## runtime/service/src/main/java/org/apache/polaris/service/reporting/MetricsReportCleanupService.java: ########## @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.reporting; + +import io.quarkus.runtime.Startup; +import io.quarkus.scheduler.Scheduled; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.persistence.BasePersistence; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.apache.polaris.persistence.relational.jdbc.JdbcBasePersistenceImpl; +import org.apache.polaris.service.context.RealmContextConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Scheduled service that cleans up old metrics reports based on the configured retention policy. + * + * <p>This service runs periodically and deletes metrics reports that are older than the configured + * retention period. It only operates when the persistence backend is relational-jdbc. + * + * <p>Configuration example: + * + * <pre> + * polaris: + * iceberg-metrics: + * reporting: + * type: persistence + * retention: + * enabled: true + * retention-period: P30D # 30 days + * cleanup-interval: PT6H # every 6 hours + * </pre> + */ +@ApplicationScoped +@Startup +public class MetricsReportCleanupService { + + private static final Logger LOGGER = LoggerFactory.getLogger(MetricsReportCleanupService.class); + + private final MetricsReportingConfiguration config; + private final MetaStoreManagerFactory metaStoreManagerFactory; + private final RealmContextConfiguration realmContextConfiguration; + private final AtomicBoolean running = new AtomicBoolean(false); + + @Inject + public MetricsReportCleanupService( + MetricsReportingConfiguration config, + MetaStoreManagerFactory metaStoreManagerFactory, + RealmContextConfiguration realmContextConfiguration) { + this.config = config; + this.metaStoreManagerFactory = metaStoreManagerFactory; + this.realmContextConfiguration = realmContextConfiguration; + + if (config.retention().enabled()) { + LOGGER.info( + "Metrics report cleanup enabled with retention period: {}, cleanup interval: {}", + config.retention().retentionPeriod(), + config.retention().cleanupInterval()); + } else { + LOGGER.debug("Metrics report cleanup is disabled"); + } + } + + /** + * Scheduled cleanup job that runs at the configured interval. The actual interval is configured + * via the retention.cleanup-interval property. 
+ */ + @Scheduled(every = "${polaris.iceberg-metrics.reporting.retention.cleanup-interval:6h}") Review Comment: The cleanup interval is injected via config expression ${polaris.iceberg-metrics.reporting.retention.cleanup-interval:6h}. This runs in a Quarkus scheduler thread, not the request thread, so the realm context must be explicitly created (line 143) rather than injected. ########## runtime/service/src/main/java/org/apache/polaris/service/reporting/PersistingMetricsReporter.java: ########## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.reporting; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.quarkus.security.identity.SecurityIdentity; +import io.smallrye.common.annotation.Identifier; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Instance; +import jakarta.inject.Inject; +import java.security.Principal; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.metrics.CommitReport; +import org.apache.iceberg.metrics.MetricsReport; +import org.apache.iceberg.metrics.ScanReport; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.persistence.BasePersistence; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.apache.polaris.persistence.relational.jdbc.JdbcBasePersistenceImpl; +import org.apache.polaris.persistence.relational.jdbc.models.MetricsReportConverter; +import org.apache.polaris.persistence.relational.jdbc.models.ModelCommitMetricsReport; +import org.apache.polaris.persistence.relational.jdbc.models.ModelScanMetricsReport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A metrics reporter that persists scan and commit reports as first-class entities in the database. + * This provides better queryability and analytics capabilities compared to storing metrics as + * generic events. + * + * <p>To enable this reporter, set the configuration: + * + * <pre> + * polaris: + * iceberg-metrics: + * reporting: + * type: persistence + * </pre> + * + * <p>Note: This reporter requires the relational-jdbc persistence backend. If a different + * persistence backend is configured, metrics will be logged but not persisted. 
+ */ +@ApplicationScoped +@Identifier("persistence") +public class PersistingMetricsReporter implements PolarisMetricsReporter { + + private static final Logger LOGGER = LoggerFactory.getLogger(PersistingMetricsReporter.class); + + private final MetaStoreManagerFactory metaStoreManagerFactory; + private final RealmContext realmContext; + private final Instance<SecurityIdentity> securityIdentityInstance; + + @Inject + public PersistingMetricsReporter( + MetaStoreManagerFactory metaStoreManagerFactory, + RealmContext realmContext, + Instance<SecurityIdentity> securityIdentityInstance) { + this.metaStoreManagerFactory = metaStoreManagerFactory; + this.realmContext = realmContext; + this.securityIdentityInstance = securityIdentityInstance; + } + + @Override + public void reportMetric(String catalogName, TableIdentifier table, MetricsReport metricsReport) { + try { + String realmId = realmContext.getRealmIdentifier(); + String catalogId = catalogName; // Using catalog name as ID for now + String namespace = table.namespace().toString(); + + // Extract principal name from security context + String principalName = extractPrincipalName(); + String requestId = null; + + // Extract OpenTelemetry trace context from the current span Review Comment: Capturing the OpenTelemetry trace/span IDs enables correlating metrics reports with the HTTP request that triggered them. This is essential for debugging slow scans or failed commits by joining metrics data with distributed traces. Span.current() returns the active span from the thread-local context propagated through the Quarkus request handling. ########## runtime/service/src/main/java/org/apache/polaris/service/reporting/PersistingMetricsReporter.java: ########## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.service.reporting; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.quarkus.security.identity.SecurityIdentity; +import io.smallrye.common.annotation.Identifier; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Instance; +import jakarta.inject.Inject; +import java.security.Principal; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.metrics.CommitReport; +import org.apache.iceberg.metrics.MetricsReport; +import org.apache.iceberg.metrics.ScanReport; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.persistence.BasePersistence; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.apache.polaris.persistence.relational.jdbc.JdbcBasePersistenceImpl; +import org.apache.polaris.persistence.relational.jdbc.models.MetricsReportConverter; +import org.apache.polaris.persistence.relational.jdbc.models.ModelCommitMetricsReport; +import org.apache.polaris.persistence.relational.jdbc.models.ModelScanMetricsReport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A metrics reporter that persists scan and commit reports as first-class entities in the database. + * This provides better queryability and analytics capabilities compared to storing metrics as + * generic events. + * + * <p>To enable this reporter, set the configuration: + * + * <pre> + * polaris: + * iceberg-metrics: + * reporting: + * type: persistence + * </pre> + * + * <p>Note: This reporter requires the relational-jdbc persistence backend. If a different + * persistence backend is configured, metrics will be logged but not persisted. + */ +@ApplicationScoped +@Identifier("persistence") +public class PersistingMetricsReporter implements PolarisMetricsReporter { + + private static final Logger LOGGER = LoggerFactory.getLogger(PersistingMetricsReporter.class); + + private final MetaStoreManagerFactory metaStoreManagerFactory; + private final RealmContext realmContext; + private final Instance<SecurityIdentity> securityIdentityInstance; + + @Inject + public PersistingMetricsReporter( + MetaStoreManagerFactory metaStoreManagerFactory, + RealmContext realmContext, + Instance<SecurityIdentity> securityIdentityInstance) { + this.metaStoreManagerFactory = metaStoreManagerFactory; + this.realmContext = realmContext; + this.securityIdentityInstance = securityIdentityInstance; + } + + @Override + public void reportMetric(String catalogName, TableIdentifier table, MetricsReport metricsReport) { + try { + String realmId = realmContext.getRealmIdentifier(); + String catalogId = catalogName; // Using catalog name as ID for now + String namespace = table.namespace().toString(); + + // Extract principal name from security context + String principalName = extractPrincipalName(); + String requestId = null; + + // Extract OpenTelemetry trace context from the current span + String otelTraceId = null; + String otelSpanId = null; + Span currentSpan = Span.current(); + if (currentSpan != null) { + SpanContext spanContext = currentSpan.getSpanContext(); + if (spanContext != null && spanContext.isValid()) { + otelTraceId = spanContext.getTraceId(); + otelSpanId = spanContext.getSpanId(); + LOGGER.trace( + "Captured OpenTelemetry context: traceId={}, spanId={}", otelTraceId, otelSpanId); + } + } + + // Get the persistence session for the current realm + BasePersistence session = 
metaStoreManagerFactory.getOrCreateSession(realmContext); + + // Check if the session is a JdbcBasePersistenceImpl (supports metrics persistence) + if (!(session instanceof JdbcBasePersistenceImpl jdbcPersistence)) { Review Comment: Backend Compatibility Guard: This explicit check for JdbcBasePersistenceImpl ensures the persistence reporter degrades gracefully when running with a non-JDBC backend (e.g., in-memory for testing). The warning log makes misconfiguration obvious in production logs. ########## persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/MetricsReportConverter.java: ########## @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.relational.jdbc.models; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import jakarta.annotation.Nullable; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.iceberg.metrics.CommitMetricsResult; +import org.apache.iceberg.metrics.CommitReport; +import org.apache.iceberg.metrics.CounterResult; +import org.apache.iceberg.metrics.ScanMetricsResult; +import org.apache.iceberg.metrics.ScanReport; +import org.apache.iceberg.metrics.TimerResult; + +/** + * Converter utility class for transforming Iceberg metrics reports into persistence model classes. + */ +public final class MetricsReportConverter { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private MetricsReportConverter() { + // Utility class + } + + /** + * Converts an Iceberg ScanReport to a ModelScanMetricsReport. 
+ * + * @param scanReport the Iceberg scan report + * @param realmId the realm ID for multi-tenancy + * @param catalogId the catalog ID + * @param catalogName the catalog name + * @param namespace the namespace (dot-separated) + * @param principalName the principal who initiated the scan (optional) + * @param requestId the request ID (optional) + * @param otelTraceId OpenTelemetry trace ID (optional) + * @param otelSpanId OpenTelemetry span ID (optional) + * @return the converted ModelScanMetricsReport + */ + public static ModelScanMetricsReport fromScanReport( + ScanReport scanReport, + String realmId, + String catalogId, + String catalogName, + String namespace, + @Nullable String principalName, + @Nullable String requestId, + @Nullable String otelTraceId, + @Nullable String otelSpanId) { + + String reportId = UUID.randomUUID().toString(); + long timestampMs = System.currentTimeMillis(); + + ScanMetricsResult metrics = scanReport.scanMetrics(); + + ImmutableModelScanMetricsReport.Builder builder = + ImmutableModelScanMetricsReport.builder() + .reportId(reportId) + .realmId(realmId) + .catalogId(catalogId) + .catalogName(catalogName) + .namespace(namespace) + .tableName(scanReport.tableName()) + .timestampMs(timestampMs) + .principalName(principalName) + .requestId(requestId) + .otelTraceId(otelTraceId) + .otelSpanId(otelSpanId) + .snapshotId(scanReport.snapshotId()) + .schemaId(scanReport.schemaId()) + .filterExpression(scanReport.filter() != null ? scanReport.filter().toString() : null) + .projectedFieldIds(formatIntegerList(scanReport.projectedFieldIds())) + .projectedFieldNames(formatStringList(scanReport.projectedFieldNames())); + + // Extract metrics values + if (metrics != null) { + builder + .resultDataFiles(getCounterValue(metrics.resultDataFiles())) + .resultDeleteFiles(getCounterValue(metrics.resultDeleteFiles())) + .totalFileSizeBytes(getCounterValue(metrics.totalFileSizeInBytes())) + .totalDataManifests(getCounterValue(metrics.totalDataManifests())) + .totalDeleteManifests(getCounterValue(metrics.totalDeleteManifests())) + .scannedDataManifests(getCounterValue(metrics.scannedDataManifests())) + .scannedDeleteManifests(getCounterValue(metrics.scannedDeleteManifests())) + .skippedDataManifests(getCounterValue(metrics.skippedDataManifests())) + .skippedDeleteManifests(getCounterValue(metrics.skippedDeleteManifests())) + .skippedDataFiles(getCounterValue(metrics.skippedDataFiles())) + .skippedDeleteFiles(getCounterValue(metrics.skippedDeleteFiles())) + .totalPlanningDurationMs(getTimerValueMs(metrics.totalPlanningDuration())) + .equalityDeleteFiles(getCounterValue(metrics.equalityDeleteFiles())) + .positionalDeleteFiles(getCounterValue(metrics.positionalDeleteFiles())) + .indexedDeleteFiles(getCounterValue(metrics.indexedDeleteFiles())) + .totalDeleteFileSizeBytes(getCounterValue(metrics.totalDeleteFileSizeInBytes())); + } else { + builder + .resultDataFiles(0L) + .resultDeleteFiles(0L) + .totalFileSizeBytes(0L) + .totalDataManifests(0L) + .totalDeleteManifests(0L) + .scannedDataManifests(0L) + .scannedDeleteManifests(0L) + .skippedDataManifests(0L) + .skippedDeleteManifests(0L) + .skippedDataFiles(0L) + .skippedDeleteFiles(0L) + .totalPlanningDurationMs(0L) + .equalityDeleteFiles(0L) + .positionalDeleteFiles(0L) + .indexedDeleteFiles(0L) + .totalDeleteFileSizeBytes(0L); + } + + // Store additional metadata as JSON + Map<String, String> metadata = scanReport.metadata(); + if (metadata != null && !metadata.isEmpty()) { + builder.metadata(toJson(metadata)); + } + + return 
builder.build(); + } + + /** + * Converts an Iceberg CommitReport to a ModelCommitMetricsReport. + * + * @param commitReport the Iceberg commit report + * @param realmId the realm ID for multi-tenancy + * @param catalogId the catalog ID + * @param catalogName the catalog name + * @param namespace the namespace (dot-separated) + * @param principalName the principal who initiated the commit (optional) + * @param requestId the request ID (optional) + * @param otelTraceId OpenTelemetry trace ID (optional) + * @param otelSpanId OpenTelemetry span ID (optional) + * @return the converted ModelCommitMetricsReport + */ + public static ModelCommitMetricsReport fromCommitReport( + CommitReport commitReport, + String realmId, + String catalogId, + String catalogName, + String namespace, + @Nullable String principalName, + @Nullable String requestId, + @Nullable String otelTraceId, + @Nullable String otelSpanId) { + + String reportId = UUID.randomUUID().toString(); + long timestampMs = System.currentTimeMillis(); + + CommitMetricsResult metrics = commitReport.commitMetrics(); + + ImmutableModelCommitMetricsReport.Builder builder = + ImmutableModelCommitMetricsReport.builder() + .reportId(reportId) + .realmId(realmId) + .catalogId(catalogId) + .catalogName(catalogName) + .namespace(namespace) + .tableName(commitReport.tableName()) + .timestampMs(timestampMs) + .principalName(principalName) + .requestId(requestId) + .otelTraceId(otelTraceId) + .otelSpanId(otelSpanId) + .snapshotId(commitReport.snapshotId()) + .sequenceNumber(commitReport.sequenceNumber()) + .operation(commitReport.operation() != null ? commitReport.operation() : "UNKNOWN"); Review Comment: Iceberg's CommitReport.operation() can be null in some edge cases (e.g., failed commits before operation is determined), but our DB schema defines operation TEXT NOT NULL. We default to "UNKNOWN" to satisfy the constraint while making these cases visible in analytics queries. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
