This is an automated email from the ASF dual-hosted git repository.
fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new b6a3c69cef [#8899] feat(iceberg): Supports Jdbc Iceberg metrics store (#9010)
b6a3c69cef is described below
commit b6a3c69cefb99c512354d67dd458311806a895a4
Author: roryqi <[email protected]>
AuthorDate: Fri Nov 14 14:46:55 2025 +0800
[#8899] feat(iceberg): Supports Jdbc Iceberg metrics store (#9010)
### What changes were proposed in this pull request?
Supports Jdbc Iceberg metrics store
### Why are the changes needed?
Fix: #8899
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added IT.
---
.../apache/gravitino/storage/TestSQLScripts.java | 5 +
docs/iceberg-rest-service.md | 11 +
iceberg/iceberg-rest-server/build.gradle.kts | 3 +
.../iceberg/service/metrics/DummyMetricsStore.java | 3 +-
.../service/metrics/IcebergMetricsManager.java | 58 ++-
.../service/metrics/IcebergMetricsStore.java | 6 +-
.../iceberg/service/metrics/JDBCMetricsStore.java | 279 ++++++++++++++
.../service/rest/IcebergTableOperations.java | 3 +-
.../service/metrics/MemoryMetricsStore.java | 3 +-
.../service/metrics/TestIcebergMetricsManager.java | 13 +-
.../service/metrics/TestJdbcMetricsStore.java | 412 +++++++++++++++++++++
scripts/h2/iceberg-metrics-schema-1.1.0-h2.sql | 90 +++++
.../mysql/iceberg-metrics-schema-1.1.0-mysql.sql | 90 +++++
.../iceberg-metrics-schema-1.1.0-postgresql.sql | 169 +++++++++
14 files changed, 1123 insertions(+), 22 deletions(-)
diff --git
a/core/src/test/java/org/apache/gravitino/storage/TestSQLScripts.java
b/core/src/test/java/org/apache/gravitino/storage/TestSQLScripts.java
index 92f02fcf90..efe5f7e934 100644
--- a/core/src/test/java/org/apache/gravitino/storage/TestSQLScripts.java
+++ b/core/src/test/java/org/apache/gravitino/storage/TestSQLScripts.java
@@ -92,10 +92,13 @@ public class TestSQLScripts extends BaseIT {
Pattern.compile("schema-([\\d.]+)-" + jdbcBackend.toLowerCase() +
"\\.sql");
Pattern upgradePattern =
Pattern.compile("upgrade-([\\d.]+)-to-([\\d.]+)-" +
jdbcBackend.toLowerCase() + "\\.sql");
+ Pattern metricsPattern =
+ Pattern.compile("iceberg-metrics-schema-([\\d.]+)-" +
jdbcBackend.toLowerCase() + "\\.sql");
for (File scriptFile : scriptFiles) {
Matcher schemaMatcher = schemaPattern.matcher(scriptFile.getName());
Matcher upgradeMatcher = upgradePattern.matcher(scriptFile.getName());
+ Matcher metricsMatcher = metricsPattern.matcher(scriptFile.getName());
if (schemaMatcher.matches()) {
String version = schemaMatcher.group(1);
@@ -116,6 +119,8 @@ public class TestSQLScripts extends BaseIT {
Assertions.assertDoesNotThrow(
() -> executeSQLScript(conn, scriptFile),
"Failed to execute upgrade script" + " in file " +
scriptFile.getName());
+ } else if (metricsMatcher.matches()) {
+ // ignore iceberg metrics scripts for now
} else {
Assertions.fail("Unrecognized script file name: " +
scriptFile.getName());
}
diff --git a/docs/iceberg-rest-service.md b/docs/iceberg-rest-service.md
index 7d361177ef..835989a407 100644
--- a/docs/iceberg-rest-service.md
+++ b/docs/iceberg-rest-service.md
@@ -425,6 +425,17 @@ Gravitino provides a pluggable metrics store interface to store and delete Icebe
| `gravitino.iceberg-rest.metricsStoreRetainDays` | The days to retain Iceberg metrics in store, the value not greater than 0 means retain forever. | -1 | No | 0.4.0 |
| `gravitino.iceberg-rest.metricsQueueCapacity` | The size of queue to store metrics temporally before storing to the persistent storage. Metrics will be dropped when queue is full. | 1000 | No | 0.4.0 |
+If you want to use JDBC as the metrics store, set `gravitino.iceberg-rest.metricsStore` to `jdbc` and configure the following properties to connect to the database.
+You should initialize the database with the SQL scripts under the `scripts` directory.
+You must download the corresponding JDBC driver to the `iceberg-rest-server/libs` directory.
+
+| Configuration item                                  | Description                                                                                                                                            | Default value | Required | Since Version |
+|-----------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|----------|---------------|
+| `gravitino.iceberg-rest.jdbc-metrics.url`           | The JDBC connection address, such as `jdbc:postgresql://127.0.0.1:5432/database` for PostgreSQL, or `jdbc:mysql://127.0.0.1:3306/database` for MySQL.  | (none)        | Yes      | 1.1.0         |
+| `gravitino.iceberg-rest.jdbc-metrics.jdbc-user`     | The username of the JDBC connection.                                                                                                                   | (none)        | No       | 1.1.0         |
+| `gravitino.iceberg-rest.jdbc-metrics.jdbc-password` | The password of the JDBC connection.                                                                                                                   | (none)        | No       | 1.1.0         |
+| `gravitino.iceberg-rest.jdbc-metrics.jdbc-driver`   | The JDBC driver class name, `com.mysql.jdbc.Driver` or `com.mysql.cj.jdbc.Driver` for MySQL, or `org.postgresql.Driver` for PostgreSQL.                | (none)        | Yes      | 1.1.0         |
+
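[Editor's note] For reference, putting these settings together in the Iceberg REST server configuration file might look like the sketch below. The property names follow the table above; the retain days, database name, and credentials are illustrative placeholders, not values taken from this patch.

    gravitino.iceberg-rest.metricsStore = jdbc
    gravitino.iceberg-rest.metricsStoreRetainDays = 7
    gravitino.iceberg-rest.jdbc-metrics.url = jdbc:mysql://127.0.0.1:3306/iceberg_metrics
    gravitino.iceberg-rest.jdbc-metrics.jdbc-user = gravitino
    gravitino.iceberg-rest.jdbc-metrics.jdbc-password = gravitino
    gravitino.iceberg-rest.jdbc-metrics.jdbc-driver = com.mysql.cj.jdbc.Driver

With such a setup, incoming metrics reports are written to the `commit_metrics_report` and `scan_metrics_report` tables created by the corresponding SQL script under `scripts`.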
### Iceberg table metadata cache configuration
Gravitino features a pluggable cache system for updating or retrieving table
metadata in the cache. It validates the location of table metadata against the
catalog backend to ensure the correctness of cached data.
diff --git a/iceberg/iceberg-rest-server/build.gradle.kts
b/iceberg/iceberg-rest-server/build.gradle.kts
index 5cd60a4c8c..345dc21e5b 100644
--- a/iceberg/iceberg-rest-server/build.gradle.kts
+++ b/iceberg/iceberg-rest-server/build.gradle.kts
@@ -80,6 +80,9 @@ dependencies {
exclude("org.rocksdb")
}
+ testImplementation(libs.h2db)
+ testImplementation(libs.mysql.driver)
+ testImplementation(libs.postgresql.driver)
testImplementation(libs.iceberg.aws.bundle)
testImplementation(libs.iceberg.gcp.bundle)
testImplementation(libs.iceberg.azure.bundle) {
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/metrics/DummyMetricsStore.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/metrics/DummyMetricsStore.java
index 7bc482677f..5fe24f4bdc 100644
---
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/metrics/DummyMetricsStore.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/metrics/DummyMetricsStore.java
@@ -20,6 +20,7 @@ package org.apache.gravitino.iceberg.service.metrics;
import java.time.Instant;
import java.util.Map;
+import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.metrics.MetricsReport;
public class DummyMetricsStore implements IcebergMetricsStore {
@@ -29,7 +30,7 @@ public class DummyMetricsStore implements IcebergMetricsStore
{
public void init(Map<String, String> properties) {}
@Override
- public void recordMetric(MetricsReport metricsReport) {}
+ public void recordMetric(String catalog, Namespace namespace, MetricsReport
metricsReport) {}
@Override
public void close() {}
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/metrics/IcebergMetricsManager.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/metrics/IcebergMetricsManager.java
index c4fc719841..67b7353838 100644
---
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/metrics/IcebergMetricsManager.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/metrics/IcebergMetricsManager.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.gravitino.iceberg.service.IcebergRestUtils;
+import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.metrics.MetricsReport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,13 +45,15 @@ public class IcebergMetricsManager {
private static final ImmutableMap<String, String>
ICEBERG_METRICS_STORE_NAMES =
ImmutableMap.of(
DummyMetricsStore.ICEBERG_METRICS_STORE_DUMMY_NAME,
- DummyMetricsStore.class.getCanonicalName());
+ DummyMetricsStore.class.getCanonicalName(),
+ JDBCMetricsStore.ICEBERG_METRICS_STORE_JDBC_NAME,
+ JDBCMetricsStore.class.getCanonicalName());
private final IcebergMetricsFormatter icebergMetricsFormatter;
private final IcebergMetricsStore icebergMetricsStore;
private final int retainDays;
- private BlockingQueue<MetricsReport> queue;
+ private BlockingQueue<MetricsReportWrapper> queue;
private Thread metricsWriterThread;
private volatile boolean isClosed = false;
private Optional<ScheduledExecutorService> metricsCleanerExecutor =
Optional.empty();
@@ -111,16 +114,19 @@ public class IcebergMetricsManager {
/**
* Records a metrics report by adding it to the processing queue.
*
+ * @param catalogName the catalog name of the metrics report
+ * @param namespace the namespace of the metrics report
* @param metricsReport the metrics report to record
* @return true if the metric was successfully queued, false if it was
rejected (manager closed or
* queue full)
*/
- public boolean recordMetric(MetricsReport metricsReport) {
+ public boolean recordMetric(
+ String catalogName, Namespace namespace, MetricsReport metricsReport) {
if (isClosed) {
logMetrics("Drop Iceberg metrics because Iceberg Metrics Manager is
closed.", metricsReport);
return false;
}
- if (!queue.offer(metricsReport)) {
+ if (!queue.offer(new MetricsReportWrapper(catalogName, namespace,
metricsReport))) {
logMetrics("Drop Iceberg metrics because metrics queue is full.",
metricsReport);
return false;
}
@@ -156,21 +162,25 @@ public class IcebergMetricsManager {
private void writeMetrics() {
while (!Thread.currentThread().isInterrupted()) {
- MetricsReport metricsReport;
+ MetricsReportWrapper metricsReport;
try {
metricsReport = queue.take();
} catch (InterruptedException e) {
LOG.warn("Iceberg Metrics writer thread is interrupted.");
break;
}
- if (metricsReport != null) {
- doRecordMetric(metricsReport);
- }
+
+ doRecordMetric(
+ metricsReport.getCatalog(),
+ metricsReport.getNamespace(),
+ metricsReport.getMetricsReport());
}
- MetricsReport metricsReport = queue.poll();
+ MetricsReportWrapper metricsReport = queue.poll();
while (metricsReport != null) {
- logMetrics("Drop Iceberg metrics because it's time to close metrics
store.", metricsReport);
+ logMetrics(
+ "Drop Iceberg metrics because it's time to close metrics store.",
+ metricsReport.getMetricsReport());
metricsReport = queue.poll();
}
}
@@ -196,11 +206,35 @@ public class IcebergMetricsManager {
LOG.info("{} {}.", message,
icebergMetricsFormatter.toPrintableString(metricsReport));
}
- private void doRecordMetric(MetricsReport metricsReport) {
+ private void doRecordMetric(String catalog, Namespace namespace,
MetricsReport metricsReport) {
try {
- icebergMetricsStore.recordMetric(metricsReport);
+ icebergMetricsStore.recordMetric(catalog, namespace, metricsReport);
} catch (Exception e) {
LOG.warn("Write Iceberg metrics failed.", e);
}
}
+
+ private static class MetricsReportWrapper {
+ private final String catalog;
+ private final Namespace namespace;
+ private final MetricsReport metricsReport;
+
+ public MetricsReportWrapper(String catalog, Namespace namespace,
MetricsReport metricsReport) {
+ this.catalog = catalog;
+ this.namespace = namespace;
+ this.metricsReport = metricsReport;
+ }
+
+ public Namespace getNamespace() {
+ return namespace;
+ }
+
+ public MetricsReport getMetricsReport() {
+ return metricsReport;
+ }
+
+ public String getCatalog() {
+ return catalog;
+ }
+ }
}
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/metrics/IcebergMetricsStore.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/metrics/IcebergMetricsStore.java
index 2145045c75..9d6004ce98 100644
---
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/metrics/IcebergMetricsStore.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/metrics/IcebergMetricsStore.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.iceberg.service.metrics;
import java.io.IOException;
import java.time.Instant;
import java.util.Map;
+import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.metrics.MetricsReport;
/** A store API to save Apache Iceberg metrics. */
@@ -37,10 +38,13 @@ public interface IcebergMetricsStore {
/**
* Record metrics report.
*
+ * @param catalog the catalog name
+ * @param namespace the namespace of the table
* @param metricsReport the metrics to be saved
* @throws IOException if IO error happens
*/
- void recordMetric(MetricsReport metricsReport) throws IOException;
+ void recordMetric(String catalog, Namespace namespace, MetricsReport
metricsReport)
+ throws IOException;
/**
* Clean the expired Iceberg metrics
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/metrics/JDBCMetricsStore.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/metrics/JDBCMetricsStore.java
new file mode 100644
index 0000000000..c54c965d6b
--- /dev/null
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/metrics/JDBCMetricsStore.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.iceberg.service.metrics;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.time.Instant;
+import java.util.Map;
+import java.util.function.Consumer;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergPropertiesUtils;
+import org.apache.gravitino.json.JsonUtils;
+import org.apache.gravitino.utils.MapUtils;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.jdbc.JdbcClientPool;
+import org.apache.iceberg.jdbc.UncheckedInterruptedException;
+import org.apache.iceberg.jdbc.UncheckedSQLException;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CounterResult;
+import org.apache.iceberg.metrics.MetricsReport;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.TimerResult;
+
+public class JDBCMetricsStore implements IcebergMetricsStore {
+ public static final String ICEBERG_METRICS_STORE_JDBC_NAME = "jdbc";
+ private static final String URI = "uri";
+ private static final String INSERT_COMMIT_REPORT_METRICS_SQL =
+ "INSERT INTO commit_metrics_report ("
+ + "timestamp, namespace, table_name, snapshot_id, sequence_number,
operation,"
+ + "added_data_files, removed_data_files, total_data_files,"
+ + "added_delete_files, added_equality_delete_files,
added_positional_delete_files, "
+ + "removed_delete_files, removed_equality_delete_files,
removed_positional_delete_files, total_delete_files,"
+ + "added_records, removed_records, total_records,"
+ + "added_files_size_in_bytes, removed_files_size_in_bytes,
total_files_size_in_bytes,"
+ + "added_positional_deletes, removed_positional_deletes,
total_positional_deletes,"
+ + "added_equality_deletes, removed_equality_deletes,
total_equality_deletes,"
+ + "manifests_created, manifests_replaced, manifests_kept,
manifest_entries_processed,"
+ + "added_dvs, removed_dvs,"
+ + "total_duration_ms, attempts, metadata"
+ + ") VALUES "
+ + "(?, ?, ?, ?, ?, ?,"
+ + " ?, ?, ?,"
+ + " ?, ?, ?,"
+ + " ?, ?, ?, ?,"
+ + " ?, ?, ?,"
+ + " ?, ?, ?,"
+ + " ?, ?, ?,"
+ + " ?, ?, ?,"
+ + " ?, ?, ?, ?,"
+ + " ?, ?,"
+ + " ?, ?, ?);";
+
+ private static final String INSERT_SCAN_REPORT_METRICS_SQL =
+ "INSERT INTO scan_metrics_report ("
+ + "timestamp, namespace, table_name, snapshot_id, schema_id, "
+ + "filter, metadata, projected_field_ids, projected_field_names, "
+ + "equality_delete_files, indexed_delete_files,
positional_delete_files, "
+ + "result_data_files, result_delete_files, "
+ + "scanned_data_manifests, scanned_delete_manifests, "
+ + "skipped_data_files, skipped_data_manifests, "
+ + "skipped_delete_files, skipped_delete_manifests, "
+ + "total_data_manifests, total_delete_file_size_in_bytes, "
+ + "total_delete_manifests, total_file_size_in_bytes,"
+ + "total_planning_duration) VALUES "
+ + "(?, ?, ?, ?, ?,"
+ + "?, ?, ?, ?,"
+ + "?, ? ,?,"
+ + "?, ?,"
+ + "?, ?,"
+ + "?, ?,"
+ + "?, ?,"
+ + "?, ?,"
+ + "?, ?,"
+ + "?);";
+
+ private static final String DELETE_EXPIRED_SCAN_METRICS_SQL =
+ "DELETE FROM scan_metrics_report WHERE timestamp < ?; ";
+
+ private static final String DELETE_EXPIRED_COMMIT_METRICS_SQL =
+ "DELETE FROM commit_metrics_report WHERE timestamp < ?;";
+
+ @VisibleForTesting JdbcClientPool connections;
+
+ @Override
+ public void init(Map<String, String> properties) throws IOException {
+ initProperties(properties);
+ checkMetricsReportTableExists();
+ }
+
+ private void checkMetricsReportTableExists() {
+ try {
+ Preconditions.checkArgument(
+ connections.run(
+ conn -> {
+ DatabaseMetaData dbMeta = conn.getMetaData();
+ ResultSet commitReportTableExists =
+ dbMeta.getTables(
+ null /* catalog name */,
+ null /* schemaPattern */,
+ "commit_metrics_report" /* tableNamePattern */,
+ null /* types */);
+ ResultSet scanReportTableExists =
+ dbMeta.getTables(
+ null /* catalog name */,
+ null /* schemaPattern */,
+ "scan_metrics_report" /* tableNamePattern */,
+ null /* types */);
+
+ return commitReportTableExists.next() &&
scanReportTableExists.next();
+ }),
+ "JDBC metrics store tables do not exist. You should use the sql
scripts under directory `scripts` to initialize the database");
+ } catch (SQLException | InterruptedException exception) {
+ throw new RuntimeException(exception);
+ }
+ }
+
+ @VisibleForTesting
+ void initProperties(Map<String, String> properties) {
+ Map<String, String> actualProps = MapUtils.getPrefixMap(properties,
"jdbc-metrics.");
+ String uri = actualProps.get(URI);
+ Preconditions.checkArgument(uri != null, "JDBC metrics store requires a
\"%s\" property", URI);
+
+ connections =
+ new JdbcClientPool(uri,
IcebergPropertiesUtils.toIcebergCatalogProperties(actualProps));
+ }
+
+ @Override
+ public void recordMetric(String catalog, Namespace namespace, MetricsReport
metricsReport)
+ throws IOException {
+ if (metricsReport instanceof CommitReport) {
+ CommitReport commitReport = (CommitReport) metricsReport;
+ execute(
+ INSERT_COMMIT_REPORT_METRICS_SQL,
+ Instant.now().toEpochMilli(),
+ String.format("%s.%s", catalog, namespace.toString()),
+ commitReport.tableName(),
+ commitReport.snapshotId(),
+ commitReport.sequenceNumber(),
+ commitReport.operation(),
+ getCounterResult(commitReport.commitMetrics().addedDataFiles()),
+ getCounterResult(commitReport.commitMetrics().removedDataFiles()),
+ getCounterResult(commitReport.commitMetrics().totalDataFiles()),
+ getCounterResult(commitReport.commitMetrics().addedDeleteFiles()),
+
getCounterResult(commitReport.commitMetrics().addedEqualityDeleteFiles()),
+
getCounterResult(commitReport.commitMetrics().addedPositionalDeleteFiles()),
+ getCounterResult(commitReport.commitMetrics().removedDeleteFiles()),
+
getCounterResult(commitReport.commitMetrics().removedEqualityDeleteFiles()),
+
getCounterResult(commitReport.commitMetrics().removedPositionalDeleteFiles()),
+ getCounterResult(commitReport.commitMetrics().totalDeleteFiles()),
+ getCounterResult(commitReport.commitMetrics().addedRecords()),
+ getCounterResult(commitReport.commitMetrics().removedRecords()),
+ getCounterResult(commitReport.commitMetrics().totalRecords()),
+
getCounterResult(commitReport.commitMetrics().addedFilesSizeInBytes()),
+
getCounterResult(commitReport.commitMetrics().removedFilesSizeInBytes()),
+
getCounterResult(commitReport.commitMetrics().totalFilesSizeInBytes()),
+
getCounterResult(commitReport.commitMetrics().addedPositionalDeletes()),
+
getCounterResult(commitReport.commitMetrics().removedPositionalDeletes()),
+
getCounterResult(commitReport.commitMetrics().totalPositionalDeletes()),
+
getCounterResult(commitReport.commitMetrics().addedEqualityDeleteFiles()),
+
getCounterResult(commitReport.commitMetrics().removedEqualityDeleteFiles()),
+
getCounterResult(commitReport.commitMetrics().totalEqualityDeletes()),
+ getCounterResult(commitReport.commitMetrics().manifestsCreated()),
+ getCounterResult(commitReport.commitMetrics().manifestsReplaced()),
+ getCounterResult(commitReport.commitMetrics().manifestsKept()),
+
getCounterResult(commitReport.commitMetrics().manifestEntriesProcessed()),
+ getCounterResult(commitReport.commitMetrics().addedDVs()),
+ getCounterResult(commitReport.commitMetrics().removedDVs()),
+ getTimerResult(commitReport.commitMetrics().totalDuration()),
+ getCounterResult(commitReport.commitMetrics().attempts()),
+
JsonUtils.objectMapper().writeValueAsString(commitReport.metadata()));
+ } else if (metricsReport instanceof ScanReport) {
+ ScanReport scanReport = (ScanReport) metricsReport;
+ execute(
+ INSERT_SCAN_REPORT_METRICS_SQL,
+ Instant.now().toEpochMilli(),
+ String.format("%s.%s", catalog, namespace.toString()),
+ scanReport.tableName(),
+ scanReport.snapshotId(),
+ scanReport.schemaId(),
+ scanReport.filter().toString(),
+ JsonUtils.objectMapper().writeValueAsString(scanReport.metadata()),
+
JsonUtils.objectMapper().writeValueAsString(scanReport.projectedFieldIds()),
+
JsonUtils.objectMapper().writeValueAsString(scanReport.projectedFieldNames().toString()),
+ getCounterResult(scanReport.scanMetrics().equalityDeleteFiles()),
+ getCounterResult(scanReport.scanMetrics().indexedDeleteFiles()),
+ getCounterResult(scanReport.scanMetrics().positionalDeleteFiles()),
+ getCounterResult(scanReport.scanMetrics().resultDataFiles()),
+ getCounterResult(scanReport.scanMetrics().resultDeleteFiles()),
+ getCounterResult(scanReport.scanMetrics().scannedDataManifests()),
+ getCounterResult(scanReport.scanMetrics().scannedDeleteManifests()),
+ getCounterResult(scanReport.scanMetrics().skippedDataFiles()),
+ getCounterResult(scanReport.scanMetrics().skippedDataManifests()),
+ getCounterResult(scanReport.scanMetrics().skippedDeleteFiles()),
+ getCounterResult(scanReport.scanMetrics().skippedDeleteManifests()),
+ getCounterResult(scanReport.scanMetrics().totalDataManifests()),
+
getCounterResult(scanReport.scanMetrics().totalDeleteFileSizeInBytes()),
+ getCounterResult(scanReport.scanMetrics().totalDeleteManifests()),
+ getCounterResult(scanReport.scanMetrics().totalFileSizeInBytes()),
+ getTimerResult(scanReport.scanMetrics().totalPlanningDuration()));
+ }
+ }
+
+ @Override
+ public void clean(Instant expireTime) throws IOException {
+ execute(DELETE_EXPIRED_SCAN_METRICS_SQL, expireTime.toEpochMilli());
+ execute(DELETE_EXPIRED_COMMIT_METRICS_SQL, expireTime.toEpochMilli());
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (connections != null) {
+ connections.close();
+ }
+ }
+
+ @VisibleForTesting
+ static long getCounterResult(CounterResult result) {
+ return result != null ? result.value() : 0L;
+ }
+
+ @VisibleForTesting
+ static long getTimerResult(TimerResult result) {
+ return result != null ? result.count() : 0L;
+ }
+
+ @VisibleForTesting
+ int execute(String sql, Object... args) {
+ return execute(err -> {}, sql, args);
+ }
+
+ private int execute(Consumer<SQLException> sqlErrorHandler, String sql,
Object... args) {
+ try {
+ return connections.run(
+ conn -> {
+ try (PreparedStatement preparedStatement =
conn.prepareStatement(sql)) {
+ for (int pos = 0; pos < args.length; pos += 1) {
+ if (args[pos] instanceof Long) {
+ preparedStatement.setLong(pos + 1, (Long) args[pos]);
+ } else if (args[pos] instanceof String) {
+ preparedStatement.setString(pos + 1, (String) args[pos]);
+ } else if (args[pos] instanceof Integer) {
+ preparedStatement.setInt(pos + 1, (Integer) args[pos]);
+ } else {
+ throw new IllegalArgumentException(
+ "Unsupported argument type: " + args[pos].getClass());
+ }
+ }
+
+ return preparedStatement.executeUpdate();
+ }
+ });
+ } catch (SQLException e) {
+ sqlErrorHandler.accept(e);
+ throw new UncheckedSQLException(e, "Failed to execute: %s", sql);
+ } catch (InterruptedException e) {
+ throw new UncheckedInterruptedException(e, "Interrupted in SQL command");
+ }
+ }
+}
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java
index 40b5660812..e42d318790 100644
---
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java
@@ -310,7 +310,8 @@ public class IcebergTableOperations {
return Utils.doAs(
httpRequest,
() -> {
- boolean accepted =
icebergMetricsManager.recordMetric(request.report());
+ boolean accepted =
+ icebergMetricsManager.recordMetric(catalogName, icebergNS,
request.report());
if (accepted) {
return IcebergRestUtils.noContent();
} else {
diff --git
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/metrics/MemoryMetricsStore.java
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/metrics/MemoryMetricsStore.java
index c30a839a7b..b4107612e1 100644
---
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/metrics/MemoryMetricsStore.java
+++
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/metrics/MemoryMetricsStore.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.iceberg.service.metrics;
import java.time.Instant;
import java.util.Map;
+import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.metrics.MetricsReport;
/** Store Iceberg metrics in memory, used for test */
@@ -35,7 +36,7 @@ public class MemoryMetricsStore implements
IcebergMetricsStore {
}
@Override
- public void recordMetric(MetricsReport metricsReport) {
+ public void recordMetric(String catalog, Namespace namespace, MetricsReport
metricsReport) {
this.metricsReport = metricsReport;
this.recordTime = Instant.now();
}
diff --git
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/metrics/TestIcebergMetricsManager.java
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/metrics/TestIcebergMetricsManager.java
index 2d9cc94c76..8f7c4dfb17 100644
---
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/metrics/TestIcebergMetricsManager.java
+++
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/metrics/TestIcebergMetricsManager.java
@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.gravitino.iceberg.common.IcebergConfig;
+import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.metrics.ImmutableCommitMetricsResult;
import org.apache.iceberg.metrics.ImmutableCommitReport;
import org.apache.iceberg.metrics.MetricsReport;
@@ -64,7 +65,7 @@ public class TestIcebergMetricsManager {
icebergMetricsManager.start();
MetricsReport metricsReport = createMetricsReport();
- icebergMetricsManager.recordMetric(metricsReport);
+ icebergMetricsManager.recordMetric("a", Namespace.of("a"), metricsReport);
Assertions.assertDoesNotThrow(
() -> (DummyMetricsStore)
icebergMetricsManager.getIcebergMetricsStore());
icebergMetricsManager.close();
@@ -90,7 +91,7 @@ public class TestIcebergMetricsManager {
icebergMetricsManager.start();
MetricsReport metricsReport = createMetricsReport();
- icebergMetricsManager.recordMetric(metricsReport);
+ icebergMetricsManager.recordMetric("a", Namespace.of("a"), metricsReport);
MemoryMetricsStore memoryMetricsStore =
(MemoryMetricsStore) icebergMetricsManager.getIcebergMetricsStore();
Assertions.assertEquals(metricsReport,
tryGetIcebergMetrics(memoryMetricsStore));
@@ -106,7 +107,7 @@ public class TestIcebergMetricsManager {
icebergMetricsManager.start();
MetricsReport metricsReport = createMetricsReport();
- boolean result = icebergMetricsManager.recordMetric(metricsReport);
+ boolean result = icebergMetricsManager.recordMetric("a",
Namespace.of("a"), metricsReport);
Assertions.assertTrue(result, "Recording metric should return true when
successful");
icebergMetricsManager.close();
@@ -120,7 +121,7 @@ public class TestIcebergMetricsManager {
icebergMetricsManager.close();
MetricsReport metricsReport = createMetricsReport();
- boolean result = icebergMetricsManager.recordMetric(metricsReport);
+ boolean result = icebergMetricsManager.recordMetric("a",
Namespace.of("a"), metricsReport);
Assertions.assertFalse(result, "Recording metric should return false when
manager is closed");
}
@@ -137,11 +138,11 @@ public class TestIcebergMetricsManager {
MetricsReport metricsReport2 = createMetricsReport();
// First metric should succeed
- boolean result1 = icebergMetricsManager.recordMetric(metricsReport1);
+ boolean result1 = icebergMetricsManager.recordMetric("a",
Namespace.of("a"), metricsReport1);
Assertions.assertTrue(result1, "First metric should be queued
successfully");
// Second metric should fail because queue is full
- boolean result2 = icebergMetricsManager.recordMetric(metricsReport2);
+ boolean result2 = icebergMetricsManager.recordMetric("a",
Namespace.of("a"), metricsReport2);
Assertions.assertFalse(result2, "Second metric should fail when queue is
full");
icebergMetricsManager.close();
diff --git
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/metrics/TestJdbcMetricsStore.java
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/metrics/TestJdbcMetricsStore.java
new file mode 100644
index 0000000000..7529c4bc1c
--- /dev/null
+++
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/metrics/TestJdbcMetricsStore.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.iceberg.service.metrics;
+
+import static
org.apache.gravitino.iceberg.service.metrics.JDBCMetricsStore.getCounterResult;
+import static
org.apache.gravitino.iceberg.service.metrics.JDBCMetricsStore.getTimerResult;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import java.io.File;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
+import org.apache.gravitino.integration.test.container.ContainerSuite;
+import org.apache.gravitino.integration.test.util.TestDatabaseName;
+import org.apache.gravitino.json.JsonUtils;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.metrics.CommitMetrics;
+import org.apache.iceberg.metrics.CommitMetricsResult;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.DefaultMetricsContext;
+import org.apache.iceberg.metrics.ImmutableCommitReport;
+import org.apache.iceberg.metrics.ImmutableScanReport;
+import org.apache.iceberg.metrics.ScanMetrics;
+import org.apache.iceberg.metrics.ScanMetricsResult;
+import org.apache.iceberg.metrics.ScanReport;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+@Tag("gravitino-docker-test")
+public class TestJdbcMetricsStore {
+
+ private static final String CURRENT_SCRIPT_VERSION = "1.1.0";
+ private static final Map<String, Map<String, String>> dbProperties =
Maps.newHashMap();
+
+ @BeforeAll
+ public static void beforeAll() {
+ ContainerSuite containerSuite = ContainerSuite.getInstance();
+ containerSuite.startMySQLContainer(TestDatabaseName.MYSQL_JDBC_BACKEND);
+ containerSuite.startPostgreSQLContainer(TestDatabaseName.PG_JDBC_BACKEND);
+ // Prepare test configurations
+ Map<String, String> h2Properties = Maps.newHashMap();
+ h2Properties.put("jdbc-metrics.jdbc-driver", "org.h2.Driver");
+ h2Properties.put("jdbc-metrics.uri",
"jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1;MODE=MYSQL");
+ dbProperties.put("h2", h2Properties);
+
+ Map<String, String> mysqlProperties = Maps.newHashMap();
+ mysqlProperties.put("jdbc-metrics.jdbc-driver",
"com.mysql.cj.jdbc.Driver");
+ mysqlProperties.put(
+ "jdbc-metrics.uri",
+
containerSuite.getMySQLContainer().getJdbcUrl(TestDatabaseName.MYSQL_JDBC_BACKEND));
+ mysqlProperties.put("jdbc-metrics.jdbc-user",
containerSuite.getMySQLContainer().getUsername());
+ mysqlProperties.put(
+ "jdbc-metrics.jdbc-password",
containerSuite.getMySQLContainer().getPassword());
+ dbProperties.put("mysql", mysqlProperties);
+ Map<String, String> postgresqlProperties = Maps.newHashMap();
+ postgresqlProperties.put("jdbc-metrics.jdbc-driver",
"org.postgresql.Driver");
+ postgresqlProperties.put(
+ "jdbc-metrics.uri",
containerSuite.getPostgreSQLContainer().getJdbcUrl());
+ postgresqlProperties.put(
+ "jdbc-metrics.jdbc-user",
containerSuite.getPostgreSQLContainer().getUsername());
+ postgresqlProperties.put(
+ "jdbc-metrics.jdbc-password",
containerSuite.getPostgreSQLContainer().getPassword());
+ dbProperties.put("postgresql", postgresqlProperties);
+ }
+
+ @Test
+ public void testJdbcMetricsStore() throws Exception {
+ CommitMetrics commitMetrics = CommitMetrics.of(new
DefaultMetricsContext());
+ commitMetrics.totalDuration().record(100, TimeUnit.SECONDS);
+ commitMetrics.attempts().increment(4);
+ Map<String, String> snapshotSummary =
+ ImmutableMap.<String, String>builder()
+ .put(SnapshotSummary.ADDED_FILES_PROP, "1")
+ .put(SnapshotSummary.DELETED_FILES_PROP, "2")
+ .put(SnapshotSummary.TOTAL_DATA_FILES_PROP, "3")
+ .put(SnapshotSummary.ADDED_DELETE_FILES_PROP, "4")
+ .put(SnapshotSummary.ADD_EQ_DELETE_FILES_PROP, "5")
+ .put(SnapshotSummary.ADD_POS_DELETE_FILES_PROP, "6")
+ .put(SnapshotSummary.REMOVED_POS_DELETE_FILES_PROP, "7")
+ .put(SnapshotSummary.REMOVED_EQ_DELETE_FILES_PROP, "8")
+ .put(SnapshotSummary.REMOVED_DELETE_FILES_PROP, "9")
+ .put(SnapshotSummary.TOTAL_DELETE_FILES_PROP, "10")
+ .put(SnapshotSummary.ADDED_RECORDS_PROP, "11")
+ .put(SnapshotSummary.DELETED_RECORDS_PROP, "12")
+ .put(SnapshotSummary.TOTAL_RECORDS_PROP, "13")
+ .put(SnapshotSummary.ADDED_FILE_SIZE_PROP, "14")
+ .put(SnapshotSummary.REMOVED_FILE_SIZE_PROP, "15")
+ .put(SnapshotSummary.TOTAL_FILE_SIZE_PROP, "16")
+ .put(SnapshotSummary.ADDED_POS_DELETES_PROP, "17")
+ .put(SnapshotSummary.ADDED_EQ_DELETES_PROP, "18")
+ .put(SnapshotSummary.REMOVED_POS_DELETES_PROP, "19")
+ .put(SnapshotSummary.REMOVED_EQ_DELETES_PROP, "20")
+ .put(SnapshotSummary.TOTAL_POS_DELETES_PROP, "21")
+ .put(SnapshotSummary.TOTAL_EQ_DELETES_PROP, "22")
+ .build();
+
+ String tableName = "tableName";
+ CommitReport commitReport =
+ ImmutableCommitReport.builder()
+ .tableName(tableName)
+ .snapshotId(23L)
+ .operation("DELETE")
+ .sequenceNumber(4L)
+ .commitMetrics(CommitMetricsResult.from(commitMetrics,
snapshotSummary))
+ .build();
+
+ ScanMetrics scanMetrics = ScanMetrics.of(new DefaultMetricsContext());
+ scanMetrics.totalPlanningDuration().record(10, TimeUnit.MINUTES);
+ scanMetrics.resultDataFiles().increment(5L);
+ scanMetrics.resultDeleteFiles().increment(5L);
+ scanMetrics.scannedDataManifests().increment(5L);
+ scanMetrics.totalFileSizeInBytes().increment(1024L);
+ scanMetrics.totalDataManifests().increment(5L);
+
+ int schemaId = 4;
+ List<Integer> fieldIds = Arrays.asList(1, 2);
+ List<String> fieldNames = Arrays.asList("c1", "c2");
+
+ ScanReport scanReport =
+ ImmutableScanReport.builder()
+ .tableName(tableName)
+ .snapshotId(23L)
+ .filter(Expressions.alwaysTrue())
+ .schemaId(schemaId)
+ .projectedFieldIds(fieldIds)
+ .projectedFieldNames(fieldNames)
+ .scanMetrics(ScanMetricsResult.fromScanMetrics(scanMetrics))
+ .build();
+
+ for (Map.Entry<String, Map<String, String>> entry :
dbProperties.entrySet()) {
+ JDBCMetricsStore metricsStore = new JDBCMetricsStore();
+ metricsStore.initProperties(entry.getValue());
+
+ String gravitinoHome = System.getenv("GRAVITINO_ROOT_DIR");
+ String mysqlContent =
+ FileUtils.readFileToString(
+ new File(
+ gravitinoHome
+ + String.format(
+ "/scripts/%s/iceberg-metrics-schema-%s-%s.sql",
+ entry.getKey(), CURRENT_SCRIPT_VERSION,
entry.getKey())),
+ "UTF-8");
+
+ String[] sqls =
+ Arrays.stream(mysqlContent.split(";"))
+ .map(String::trim)
+ .filter(s -> !s.isEmpty())
+ .toArray(String[]::new);
+ // Init the store to create tables
+ for (String sql : sqls) {
+ metricsStore.execute(sql);
+ }
+
+ metricsStore.recordMetric("a", Namespace.of("a"), commitReport);
+ metricsStore.recordMetric("a", Namespace.of("b"), scanReport);
+
+ String countSql = "SELECT COUNT(*) AS total FROM commit_metrics_report";
+ Integer count = metricsStore.connections.run(getTotal(countSql));
+ String selectCommitReportSql = "SELECT * FROM commit_metrics_report";
+ metricsStore.connections.run(validateCommitReport(selectCommitReportSql,
commitReport));
+
+ Assertions.assertEquals(1, count);
+ String countSql2 = "SELECT COUNT(*) AS total FROM scan_metrics_report";
+ count = metricsStore.connections.run(getTotal(countSql2));
+ Assertions.assertEquals(1, count);
+ String selectScanReportSql = "SELECT * FROM scan_metrics_report";
+ metricsStore.connections.run(validateScanReport(selectScanReportSql,
scanReport));
+
+ metricsStore.clean(Instant.now());
+
+ count = metricsStore.connections.run(getTotal(countSql));
+ Assertions.assertEquals(0, count);
+ count = metricsStore.connections.run(getTotal(countSql2));
+ Assertions.assertEquals(0, count);
+
+ metricsStore.close();
+ }
+ }
+
+ private static ClientPool.Action<Integer, Connection, SQLException>
getTotal(String countSql2) {
+ return conn -> {
+ try (var stmt = conn.createStatement();
+ var rs = stmt.executeQuery(countSql2)) {
+ if (rs.next()) {
+ return rs.getInt("total");
+ }
+ return 0;
+ }
+ };
+ }
+
+ private static ClientPool.Action<Void, Connection, SQLException>
validateCommitReport(
+ String sql, CommitReport commitReport) {
+ return conn -> {
+ try (var stmt = conn.createStatement()) {
+ var rs = stmt.executeQuery(sql);
+ if (rs.next()) {
+ Assertions.assertEquals("a.a", rs.getString("namespace"));
+ Assertions.assertEquals(commitReport.tableName(),
rs.getString("table_name"));
+ Assertions.assertEquals(commitReport.snapshotId(),
rs.getLong("snapshot_id"));
+ Assertions.assertEquals(commitReport.sequenceNumber(),
rs.getLong("sequence_number"));
+ Assertions.assertEquals(commitReport.operation(),
rs.getString("operation"));
+ Assertions.assertEquals(
+ getCounterResult(commitReport.commitMetrics().addedDataFiles()),
+ rs.getLong("added_data_files"));
+ Assertions.assertEquals(
+
getCounterResult(commitReport.commitMetrics().removedDataFiles()),
+ rs.getLong("removed_data_files"));
+ Assertions.assertEquals(
+ getCounterResult(commitReport.commitMetrics().totalDataFiles()),
+ rs.getLong("total_data_files"));
+ Assertions.assertEquals(
+
getCounterResult(commitReport.commitMetrics().addedDeleteFiles()),
+ rs.getLong("added_delete_files"));
+ Assertions.assertEquals(
+
getCounterResult(commitReport.commitMetrics().addedEqualityDeleteFiles()),
+ rs.getLong("added_equality_delete_files"));
+ Assertions.assertEquals(
+
getCounterResult(commitReport.commitMetrics().addedPositionalDeleteFiles()),
+ rs.getLong("added_positional_delete_files"));
+ Assertions.assertEquals(
+
getCounterResult(commitReport.commitMetrics().removedDeleteFiles()),
+ rs.getLong("removed_delete_files"));
+ Assertions.assertEquals(
+
getCounterResult(commitReport.commitMetrics().removedEqualityDeleteFiles()),
+ rs.getLong("removed_equality_delete_files"));
+ Assertions.assertEquals(
+
getCounterResult(commitReport.commitMetrics().removedPositionalDeleteFiles()),
+ rs.getLong("removed_positional_delete_files"));
+ Assertions.assertEquals(
+
getCounterResult(commitReport.commitMetrics().totalDeleteFiles()),
+ rs.getLong("total_delete_files"));
+ Assertions.assertEquals(
+ getCounterResult(commitReport.commitMetrics().addedRecords()),
+ rs.getLong("added_records"));
+ Assertions.assertEquals(
+ getCounterResult(commitReport.commitMetrics().removedRecords()),
+ rs.getLong("removed_records"));
+ Assertions.assertEquals(
+ getCounterResult(commitReport.commitMetrics().totalRecords()),
+ rs.getLong("total_records"));
+ Assertions.assertEquals(
+
getCounterResult(commitReport.commitMetrics().addedFilesSizeInBytes()),
+ rs.getLong("added_files_size_in_bytes"));
+ Assertions.assertEquals(
+
getCounterResult(commitReport.commitMetrics().removedFilesSizeInBytes()),
+ rs.getLong("removed_files_size_in_bytes"));
+ Assertions.assertEquals(
+
getCounterResult(commitReport.commitMetrics().totalFilesSizeInBytes()),
+ rs.getLong("total_files_size_in_bytes"));
+ Assertions.assertEquals(
+
getCounterResult(commitReport.commitMetrics().addedPositionalDeletes()),
+ rs.getLong("added_positional_deletes"));
+ Assertions.assertEquals(
+
getCounterResult(commitReport.commitMetrics().removedPositionalDeletes()),
+ rs.getLong("removed_positional_deletes"));
+ Assertions.assertEquals(
+
getCounterResult(commitReport.commitMetrics().totalPositionalDeletes()),
+ rs.getLong("total_positional_deletes"));
+ Assertions.assertEquals(
+
getCounterResult(commitReport.commitMetrics().addedEqualityDeleteFiles()),
+ rs.getLong("added_equality_deletes"));
+ Assertions.assertEquals(
+
getCounterResult(commitReport.commitMetrics().removedEqualityDeleteFiles()),
+ rs.getLong("removed_equality_deletes"));
+ Assertions.assertEquals(
+
getCounterResult(commitReport.commitMetrics().totalEqualityDeletes()),
+ rs.getLong("total_equality_deletes"));
+ Assertions.assertEquals(
+
getCounterResult(commitReport.commitMetrics().manifestsCreated()),
+ rs.getLong("manifests_created"));
+ Assertions.assertEquals(
+
getCounterResult(commitReport.commitMetrics().manifestsReplaced()),
+ rs.getLong("manifests_replaced"));
+ Assertions.assertEquals(
+ getCounterResult(commitReport.commitMetrics().manifestsKept()),
+ rs.getLong("manifests_kept"));
+ Assertions.assertEquals(
+
getCounterResult(commitReport.commitMetrics().manifestEntriesProcessed()),
+ rs.getLong("manifest_entries_processed"));
+ Assertions.assertEquals(
+ getCounterResult(commitReport.commitMetrics().addedDVs()),
rs.getLong("added_dvs"));
+ Assertions.assertEquals(
+ getCounterResult(commitReport.commitMetrics().removedDVs()),
+ rs.getLong("removed_dvs"));
+ Assertions.assertEquals(
+ getTimerResult(commitReport.commitMetrics().totalDuration()),
+ rs.getLong("total_duration_ms"));
+ Assertions.assertEquals(
+ getCounterResult(commitReport.commitMetrics().attempts()),
rs.getLong("attempts"));
+ Assertions.assertEquals(
+
JsonUtils.objectMapper().writeValueAsString(commitReport.metadata()),
+ rs.getString("metadata"));
+
+ } else {
+ Assertions.fail("No data found in commit_metrics_report");
+ }
+ } catch (Exception e) {
+ Assertions.fail("Exception occurred while validating commit report: "
+ e.getMessage());
+ }
+ return null;
+ };
+ }
+
+ private static ClientPool.Action<Void, Connection, SQLException>
validateScanReport(
+ String sql, ScanReport scanReport) {
+ return conn -> {
+ try (var stmt = conn.createStatement()) {
+ var rs = stmt.executeQuery(sql);
+ if (rs.next()) {
+ Assertions.assertEquals(scanReport.tableName(),
rs.getString("table_name"));
+ Assertions.assertEquals(scanReport.snapshotId(),
rs.getLong("snapshot_id"));
+ Assertions.assertEquals(scanReport.schemaId(),
rs.getInt("schema_id"));
+
+ Assertions.assertEquals(scanReport.filter().toString(),
rs.getString("filter"));
+ Assertions.assertEquals(
+
JsonUtils.objectMapper().writeValueAsString(scanReport.metadata()),
+ rs.getString("metadata"));
+ Assertions.assertEquals(
+
JsonUtils.objectMapper().writeValueAsString(scanReport.projectedFieldIds()),
+ rs.getString("projected_field_ids"));
+ Assertions.assertEquals(
+ JsonUtils.objectMapper()
+
.writeValueAsString(scanReport.projectedFieldNames().toString()),
+ rs.getString("projected_field_names"));
+
+ Assertions.assertEquals(
+ getCounterResult(scanReport.scanMetrics().equalityDeleteFiles()),
+ rs.getLong("equality_delete_files"));
+ Assertions.assertEquals(
+ getCounterResult(scanReport.scanMetrics().indexedDeleteFiles()),
+ rs.getLong("indexed_delete_files"));
+ Assertions.assertEquals(
+
getCounterResult(scanReport.scanMetrics().positionalDeleteFiles()),
+ rs.getLong("positional_delete_files"));
+ Assertions.assertEquals(
+ getCounterResult(scanReport.scanMetrics().resultDataFiles()),
+ rs.getLong("result_data_files"));
+ Assertions.assertEquals(
+ getCounterResult(scanReport.scanMetrics().resultDeleteFiles()),
+ rs.getLong("result_delete_files"));
+ Assertions.assertEquals(
+
getCounterResult(scanReport.scanMetrics().scannedDataManifests()),
+ rs.getLong("scanned_data_manifests"));
+ Assertions.assertEquals(
+
getCounterResult(scanReport.scanMetrics().scannedDeleteManifests()),
+ rs.getLong("scanned_delete_manifests"));
+ Assertions.assertEquals(
+ getCounterResult(scanReport.scanMetrics().skippedDataFiles()),
+ rs.getLong("skipped_data_files"));
+ Assertions.assertEquals(
+
getCounterResult(scanReport.scanMetrics().skippedDataManifests()),
+ rs.getLong("skipped_data_manifests"));
+ Assertions.assertEquals(
+ getCounterResult(scanReport.scanMetrics().skippedDeleteFiles()),
+ rs.getLong("skipped_delete_files"));
+ Assertions.assertEquals(
+
getCounterResult(scanReport.scanMetrics().skippedDeleteManifests()),
+ rs.getLong("skipped_delete_manifests"));
+ Assertions.assertEquals(
+ getCounterResult(scanReport.scanMetrics().totalDataManifests()),
+ rs.getLong("total_data_manifests"));
+ Assertions.assertEquals(
+
getCounterResult(scanReport.scanMetrics().totalDeleteFileSizeInBytes()),
+ rs.getLong("total_delete_file_size_in_bytes"));
+ Assertions.assertEquals(
+
getCounterResult(scanReport.scanMetrics().totalDeleteManifests()),
+ rs.getLong("total_delete_manifests"));
+ Assertions.assertEquals(
+
getCounterResult(scanReport.scanMetrics().totalFileSizeInBytes()),
+ rs.getLong("total_file_size_in_bytes"));
+ Assertions.assertEquals(
+ getTimerResult(scanReport.scanMetrics().totalPlanningDuration()),
+ rs.getLong("total_planning_duration"));
+
+ } else {
+ Assertions.fail("No data found in scan_metrics_report");
+ }
+ } catch (Exception e) {
+ Assertions.fail("Exception occurred while validating scan report: " +
e.getMessage());
+ }
+ return null;
+ };
+ }
+}
diff --git a/scripts/h2/iceberg-metrics-schema-1.1.0-h2.sql
b/scripts/h2/iceberg-metrics-schema-1.1.0-h2.sql
new file mode 100644
index 0000000000..bad0ac2a56
--- /dev/null
+++ b/scripts/h2/iceberg-metrics-schema-1.1.0-h2.sql
@@ -0,0 +1,90 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"). You may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+--
+
+CREATE TABLE commit_metrics_report (
+ id BIGINT AUTO_INCREMENT PRIMARY KEY,
+ timestamp BIGINT NOT NULL COMMENT 'Timestamp in milliseconds',
+ namespace VARCHAR(1024) NOT NULL COMMENT 'Namespace of the table',
+ table_name VARCHAR(255) NOT NULL COMMENT 'Table name',
+ snapshot_id BIGINT NOT NULL COMMENT 'Snapshot identifier',
+ sequence_number BIGINT NOT NULL COMMENT 'Sequence number',
+ operation VARCHAR(50) NOT NULL COMMENT 'Operation type (APPEND, OVERWRITE,
etc)',
+ added_data_files BIGINT DEFAULT 0 COMMENT 'Number of added data files',
+ removed_data_files BIGINT DEFAULT 0 COMMENT 'Number of removed data files',
+ total_data_files BIGINT DEFAULT 0 COMMENT 'Total number of data files',
+ added_delete_files BIGINT DEFAULT 0 COMMENT 'Number of added delete files',
+ added_equality_delete_files BIGINT DEFAULT 0 COMMENT 'Number of added
equality delete files',
+ added_positional_delete_files BIGINT DEFAULT 0 COMMENT 'Number of added
positional delete files',
+ removed_delete_files BIGINT DEFAULT 0 COMMENT 'Number of removed delete
files',
+ removed_equality_delete_files BIGINT DEFAULT 0 COMMENT 'Number of removed
equality delete files',
+ removed_positional_delete_files BIGINT DEFAULT 0 COMMENT 'Number of
removed positional delete files',
+ total_delete_files BIGINT DEFAULT 0 COMMENT 'Total number of delete files',
+ added_records BIGINT DEFAULT 0 COMMENT 'Number of added records',
+ removed_records BIGINT DEFAULT 0 COMMENT 'Number of removed records',
+ total_records BIGINT DEFAULT 0 COMMENT 'Total number of records',
+ added_files_size_in_bytes BIGINT DEFAULT 0 COMMENT 'Size of added files in
bytes',
+ removed_files_size_in_bytes BIGINT DEFAULT 0 COMMENT 'Size of removed
files in bytes',
+ total_files_size_in_bytes BIGINT DEFAULT 0 COMMENT 'Total file size in
bytes',
+ added_positional_deletes BIGINT DEFAULT 0 COMMENT 'Number of added
positional deletes',
+ removed_positional_deletes BIGINT DEFAULT 0 COMMENT 'Number of removed
positional deletes',
+ total_positional_deletes BIGINT DEFAULT 0 COMMENT 'Total number of
positional deletes',
+ added_equality_deletes BIGINT DEFAULT 0 COMMENT 'Number of added equality
deletes',
+ removed_equality_deletes BIGINT DEFAULT 0 COMMENT 'Number of removed
equality deletes',
+ total_equality_deletes BIGINT DEFAULT 0 COMMENT 'Total number of equality
deletes',
+ manifests_created BIGINT DEFAULT 0 COMMENT 'Number of manifests created',
+ manifests_replaced BIGINT DEFAULT 0 COMMENT 'Number of manifests replaced',
+ manifests_kept BIGINT DEFAULT 0 COMMENT 'Number of manifests kept',
+ manifest_entries_processed BIGINT DEFAULT 0 COMMENT 'Number of manifest
entries processed',
+ added_dvs BIGINT DEFAULT 0 COMMENT 'Number of added delete vectors',
+ removed_dvs BIGINT DEFAULT 0 COMMENT 'Number of removed delete vectors',
+ total_duration_ms BIGINT DEFAULT 0 COMMENT 'Total operation duration in
milliseconds',
+ attempts BIGINT DEFAULT 1 COMMENT 'Number of attempts',
+ metadata CLOB COMMENT 'Additional metadata in JSON format',
+ KEY `idx_commit_report` (`timestamp`, `namespace`, `table_name`)
+) ENGINE = InnoDB;
+
+CREATE TABLE scan_metrics_report (
+ id BIGINT AUTO_INCREMENT PRIMARY KEY,
+ timestamp BIGINT NOT NULL COMMENT 'Timestamp in milliseconds',
+ namespace VARCHAR(1024) NOT NULL COMMENT 'Namespace of the table',
+ table_name VARCHAR(255) NOT NULL COMMENT 'Table name',
+ snapshot_id BIGINT COMMENT 'Snapshot identifier',
+ schema_id INT COMMENT 'Schema identifier',
+ filter CLOB COMMENT 'Filter condition applied during scan',
+ metadata CLOB COMMENT 'Additional metadata in JSON format',
+ projected_field_ids CLOB COMMENT 'List of projected field IDs',
+ projected_field_names CLOB COMMENT 'List of projected field names',
+ equality_delete_files BIGINT DEFAULT 0 COMMENT 'Number of equality delete
files',
+ indexed_delete_files BIGINT DEFAULT 0 COMMENT 'Number of indexed delete
files',
+ positional_delete_files BIGINT DEFAULT 0 COMMENT 'Number of positional
delete files',
+ result_data_files BIGINT DEFAULT 0 COMMENT 'Number of data files
processed',
+ result_delete_files BIGINT DEFAULT 0 COMMENT 'Number of delete files
processed',
+ scanned_data_manifests BIGINT DEFAULT 0 COMMENT 'Number of data manifests
scanned',
+ scanned_delete_manifests BIGINT DEFAULT 0 COMMENT 'Number of delete
manifests scanned',
+ skipped_data_files BIGINT DEFAULT 0 COMMENT 'Number of data files skipped',
+ skipped_data_manifests BIGINT DEFAULT 0 COMMENT 'Number of data manifests
skipped',
+ skipped_delete_files BIGINT DEFAULT 0 COMMENT 'Number of delete files
skipped',
+ skipped_delete_manifests BIGINT DEFAULT 0 COMMENT 'Number of delete
manifests skipped',
+ total_data_manifests BIGINT DEFAULT 0 COMMENT 'Total number of data
manifests',
+ total_delete_file_size_in_bytes BIGINT DEFAULT 0 COMMENT 'Total size of
delete files in bytes',
+ total_delete_manifests BIGINT DEFAULT 0 COMMENT 'Total number of delete
manifests',
+ total_file_size_in_bytes BIGINT DEFAULT 0 COMMENT 'Total file size in
bytes',
+ total_planning_duration BIGINT DEFAULT 0 COMMENT 'Total planning duration
in milliseconds',
+ KEY `idx_scan_report` (`timestamp`, `namespace`, `table_name`)
+) ENGINE = InnoDB;
diff --git a/scripts/mysql/iceberg-metrics-schema-1.1.0-mysql.sql
b/scripts/mysql/iceberg-metrics-schema-1.1.0-mysql.sql
new file mode 100644
index 0000000000..1d7714a672
--- /dev/null
+++ b/scripts/mysql/iceberg-metrics-schema-1.1.0-mysql.sql
@@ -0,0 +1,90 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"). You may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+--
+
+CREATE TABLE commit_metrics_report (
+ id BIGINT AUTO_INCREMENT PRIMARY KEY,
+ timestamp BIGINT NOT NULL COMMENT 'Timestamp in milliseconds',
+ namespace VARCHAR(1024) NOT NULL COMMENT 'Namespace of the table',
+ table_name VARCHAR(255) NOT NULL COMMENT 'Table name',
+ snapshot_id BIGINT NOT NULL COMMENT 'Snapshot identifier',
+ sequence_number BIGINT NOT NULL COMMENT 'Sequence number',
+ operation VARCHAR(50) NOT NULL COMMENT 'Operation type (APPEND, OVERWRITE,
etc)',
+ added_data_files BIGINT DEFAULT 0 COMMENT 'Number of added data files',
+ removed_data_files BIGINT DEFAULT 0 COMMENT 'Number of removed data files',
+ total_data_files BIGINT DEFAULT 0 COMMENT 'Total number of data files',
+ added_delete_files BIGINT DEFAULT 0 COMMENT 'Number of added delete files',
+ added_equality_delete_files BIGINT DEFAULT 0 COMMENT 'Number of added
equality delete files',
+ added_positional_delete_files BIGINT DEFAULT 0 COMMENT 'Number of added
positional delete files',
+ removed_delete_files BIGINT DEFAULT 0 COMMENT 'Number of removed delete
files',
+ removed_equality_delete_files BIGINT DEFAULT 0 COMMENT 'Number of removed
equality delete files',
+ removed_positional_delete_files BIGINT DEFAULT 0 COMMENT 'Number of
removed positional delete files',
+ total_delete_files BIGINT DEFAULT 0 COMMENT 'Total number of delete files',
+ added_records BIGINT DEFAULT 0 COMMENT 'Number of added records',
+ removed_records BIGINT DEFAULT 0 COMMENT 'Number of removed records',
+ total_records BIGINT DEFAULT 0 COMMENT 'Total number of records',
+ added_files_size_in_bytes BIGINT DEFAULT 0 COMMENT 'Size of added files in
bytes',
+ removed_files_size_in_bytes BIGINT DEFAULT 0 COMMENT 'Size of removed
files in bytes',
+ total_files_size_in_bytes BIGINT DEFAULT 0 COMMENT 'Total file size in
bytes',
+ added_positional_deletes BIGINT DEFAULT 0 COMMENT 'Number of added
positional deletes',
+ removed_positional_deletes BIGINT DEFAULT 0 COMMENT 'Number of removed
positional deletes',
+ total_positional_deletes BIGINT DEFAULT 0 COMMENT 'Total number of
positional deletes',
+ added_equality_deletes BIGINT DEFAULT 0 COMMENT 'Number of added equality
deletes',
+ removed_equality_deletes BIGINT DEFAULT 0 COMMENT 'Number of removed
equality deletes',
+ total_equality_deletes BIGINT DEFAULT 0 COMMENT 'Total number of equality
deletes',
+ manifests_created BIGINT DEFAULT 0 COMMENT 'Number of manifests created',
+ manifests_replaced BIGINT DEFAULT 0 COMMENT 'Number of manifests replaced',
+ manifests_kept BIGINT DEFAULT 0 COMMENT 'Number of manifests kept',
+ manifest_entries_processed BIGINT DEFAULT 0 COMMENT 'Number of manifest
entries processed',
+ added_dvs BIGINT DEFAULT 0 COMMENT 'Number of added delete vectors',
+ removed_dvs BIGINT DEFAULT 0 COMMENT 'Number of removed delete vectors',
+ total_duration_ms BIGINT DEFAULT 0 COMMENT 'Total operation duration in
milliseconds',
+ attempts BIGINT DEFAULT 1 COMMENT 'Number of attempts',
+ metadata TEXT COMMENT 'Additional metadata in JSON format',
+ KEY `idx_commit_report` (`timestamp`, `namespace`(255), `table_name`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'Table for
storing commit metrics information';
+
+CREATE TABLE scan_metrics_report (
+ id BIGINT AUTO_INCREMENT PRIMARY KEY,
+ timestamp BIGINT NOT NULL COMMENT 'Timestamp in milliseconds',
+ namespace VARCHAR(1024) NOT NULL COMMENT 'Namespace of the table',
+ table_name VARCHAR(255) NOT NULL COMMENT 'Table name',
+ snapshot_id BIGINT COMMENT 'Snapshot identifier',
+ schema_id BIGINT COMMENT 'Schema identifier',
+ filter TEXT COMMENT 'Filter condition applied during scan',
+ metadata TEXT COMMENT 'Additional metadata in JSON format',
+ projected_field_ids TEXT COMMENT 'List of projected field IDs',
+ projected_field_names TEXT COMMENT 'List of projected field names',
+ equality_delete_files BIGINT DEFAULT 0 COMMENT 'Number of equality delete
files',
+ indexed_delete_files BIGINT DEFAULT 0 COMMENT 'Number of indexed delete
files',
+ positional_delete_files BIGINT DEFAULT 0 COMMENT 'Number of positional
delete files',
+ result_data_files BIGINT DEFAULT 0 COMMENT 'Number of data files
processed',
+ result_delete_files BIGINT DEFAULT 0 COMMENT 'Number of delete files
processed',
+ scanned_data_manifests BIGINT DEFAULT 0 COMMENT 'Number of data manifests
scanned',
+ scanned_delete_manifests BIGINT DEFAULT 0 COMMENT 'Number of delete
manifests scanned',
+ skipped_data_files BIGINT DEFAULT 0 COMMENT 'Number of data files skipped',
+ skipped_data_manifests BIGINT DEFAULT 0 COMMENT 'Number of data manifests
skipped',
+ skipped_delete_files BIGINT DEFAULT 0 COMMENT 'Number of delete files
skipped',
+ skipped_delete_manifests BIGINT DEFAULT 0 COMMENT 'Number of delete
manifests skipped',
+ total_data_manifests BIGINT DEFAULT 0 COMMENT 'Total number of data
manifests',
+ total_delete_file_size_in_bytes BIGINT DEFAULT 0 COMMENT 'Total size of
delete files in bytes',
+ total_delete_manifests BIGINT DEFAULT 0 COMMENT 'Total number of delete
manifests',
+ total_file_size_in_bytes BIGINT DEFAULT 0 COMMENT 'Total file size in
bytes',
+ total_planning_duration BIGINT DEFAULT 0 COMMENT 'Total planning duration
in milliseconds',
+ KEY `idx_scan_report` (`timestamp`, `namespace`(255), `table_name`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'Table for
storing scan metrics information';
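
For reference, once the MySQL script above has been applied, the stored commit
metrics can be inspected with plain SQL. The query below is only an
illustrative sketch, not part of the patch: the table and column names come
from the schema above, while the 24-hour window and the row limit are
assumptions.

-- Recent commit reports, newest first; `timestamp` is stored as epoch milliseconds
SELECT namespace, table_name, operation, total_duration_ms, attempts
FROM commit_metrics_report
WHERE `timestamp` > (UNIX_TIMESTAMP(NOW()) - 86400) * 1000
ORDER BY `timestamp` DESC
LIMIT 20;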
diff --git a/scripts/postgresql/iceberg-metrics-schema-1.1.0-postgresql.sql
b/scripts/postgresql/iceberg-metrics-schema-1.1.0-postgresql.sql
new file mode 100644
index 0000000000..14963a63a0
--- /dev/null
+++ b/scripts/postgresql/iceberg-metrics-schema-1.1.0-postgresql.sql
@@ -0,0 +1,169 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"). You may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+--
+
+-- Note: Database and schema creation is not included in this script. Please
create the database and
+-- schema before running this script. For example, in psql:
+-- CREATE DATABASE example_db;
+-- \c example_db
+-- CREATE SCHEMA example_schema;
+-- SET search_path TO example_schema;
+
+
+CREATE TABLE commit_metrics_report (
+ id BIGSERIAL PRIMARY KEY,
+ timestamp BIGINT NOT NULL,
+ namespace VARCHAR(1024) NOT NULL,
+ table_name VARCHAR(255) NOT NULL,
+ snapshot_id BIGINT NOT NULL,
+ sequence_number BIGINT NOT NULL,
+ operation VARCHAR(50) NOT NULL,
+ added_data_files BIGINT DEFAULT 0,
+ removed_data_files BIGINT DEFAULT 0,
+ total_data_files BIGINT DEFAULT 0,
+ added_delete_files BIGINT DEFAULT 0,
+ added_equality_delete_files BIGINT DEFAULT 0,
+ added_positional_delete_files BIGINT DEFAULT 0,
+ removed_delete_files BIGINT DEFAULT 0,
+ removed_equality_delete_files BIGINT DEFAULT 0,
+ removed_positional_delete_files BIGINT DEFAULT 0,
+ total_delete_files BIGINT DEFAULT 0,
+ added_records BIGINT DEFAULT 0,
+ removed_records BIGINT DEFAULT 0,
+ total_records BIGINT DEFAULT 0,
+ added_files_size_in_bytes BIGINT DEFAULT 0,
+ removed_files_size_in_bytes BIGINT DEFAULT 0,
+ total_files_size_in_bytes BIGINT DEFAULT 0,
+ added_positional_deletes BIGINT DEFAULT 0,
+ removed_positional_deletes BIGINT DEFAULT 0,
+ total_positional_deletes BIGINT DEFAULT 0,
+ added_equality_deletes BIGINT DEFAULT 0,
+ removed_equality_deletes BIGINT DEFAULT 0,
+ total_equality_deletes BIGINT DEFAULT 0,
+ manifests_created BIGINT DEFAULT 0,
+ manifests_replaced BIGINT DEFAULT 0,
+ manifests_kept BIGINT DEFAULT 0,
+ manifest_entries_processed BIGINT DEFAULT 0,
+ added_dvs BIGINT DEFAULT 0,
+ removed_dvs BIGINT DEFAULT 0,
+ total_duration_ms BIGINT DEFAULT 0,
+ attempts BIGINT DEFAULT 1,
+ metadata TEXT
+);
+
+CREATE INDEX idx_commit_report ON commit_metrics_report (timestamp, namespace,
table_name);
+
+COMMENT ON TABLE commit_metrics_report IS 'Table for storing commit metrics
information';
+COMMENT ON COLUMN commit_metrics_report.timestamp IS 'Timestamp in
milliseconds';
+COMMENT ON COLUMN commit_metrics_report.namespace IS 'Namespace of the table';
+COMMENT ON COLUMN commit_metrics_report.table_name IS 'Table name';
+COMMENT ON COLUMN commit_metrics_report.snapshot_id IS 'Snapshot identifier';
+COMMENT ON COLUMN commit_metrics_report.operation IS 'Operation type (APPEND,
OVERWRITE, etc)';
+COMMENT ON COLUMN commit_metrics_report.metadata IS 'Additional metadata in
JSON format';
+COMMENT ON COLUMN commit_metrics_report.total_duration_ms IS 'Total operation
duration in milliseconds';
+COMMENT ON COLUMN commit_metrics_report.attempts IS 'Number of attempts';
+COMMENT ON COLUMN commit_metrics_report.added_data_files IS 'Number of added
data files';
+COMMENT ON COLUMN commit_metrics_report.removed_data_files IS 'Number of
removed data files';
+COMMENT ON COLUMN commit_metrics_report.total_data_files IS 'Total number of
data files';
+COMMENT ON COLUMN commit_metrics_report.added_delete_files IS 'Number of added
delete files';
+COMMENT ON COLUMN commit_metrics_report.added_equality_delete_files IS 'Number
of added equality delete files';
+COMMENT ON COLUMN commit_metrics_report.added_positional_delete_files IS
'Number of added positional delete files';
+COMMENT ON COLUMN commit_metrics_report.removed_delete_files IS 'Number of
removed delete files';
+COMMENT ON COLUMN commit_metrics_report.removed_equality_delete_files IS
'Number of removed equality delete files';
+COMMENT ON COLUMN commit_metrics_report.removed_positional_delete_files IS
'Number of removed positional delete files';
+COMMENT ON COLUMN commit_metrics_report.total_delete_files IS 'Total number of
delete files';
+COMMENT ON COLUMN commit_metrics_report.added_records IS 'Number of added
records';
+COMMENT ON COLUMN commit_metrics_report.removed_records IS 'Number of removed
records';
+COMMENT ON COLUMN commit_metrics_report.total_records IS 'Total number of
records';
+COMMENT ON COLUMN commit_metrics_report.added_files_size_in_bytes IS 'Size of
added files in bytes';
+COMMENT ON COLUMN commit_metrics_report.removed_files_size_in_bytes IS 'Size
of removed files in bytes';
+COMMENT ON COLUMN commit_metrics_report.total_files_size_in_bytes IS 'Total
file size in bytes';
+COMMENT ON COLUMN commit_metrics_report.added_positional_deletes IS 'Number of
added positional deletes';
+COMMENT ON COLUMN commit_metrics_report.removed_positional_deletes IS 'Number
of removed positional deletes';
+COMMENT ON COLUMN commit_metrics_report.total_positional_deletes IS 'Total
number of positional deletes';
+COMMENT ON COLUMN commit_metrics_report.added_equality_deletes IS 'Number of
added equality deletes';
+COMMENT ON COLUMN commit_metrics_report.removed_equality_deletes IS 'Number of
removed equality deletes';
+COMMENT ON COLUMN commit_metrics_report.total_equality_deletes IS 'Total
number of equality deletes';
+COMMENT ON COLUMN commit_metrics_report.manifests_created IS 'Number of
manifests created';
+COMMENT ON COLUMN commit_metrics_report.manifests_replaced IS 'Number of
manifests replaced';
+COMMENT ON COLUMN commit_metrics_report.manifests_kept IS 'Number of manifests
kept';
+COMMENT ON COLUMN commit_metrics_report.manifest_entries_processed IS 'Number
of manifest entries processed';
+COMMENT ON COLUMN commit_metrics_report.added_dvs IS 'Number of added delete
vectors';
+COMMENT ON COLUMN commit_metrics_report.removed_dvs IS 'Number of removed
delete vectors';
+COMMENT ON COLUMN commit_metrics_report.total_duration_ms IS 'Total operation
duration in milliseconds';
+COMMENT ON COLUMN commit_metrics_report.attempts IS 'Number of attempts';
+COMMENT ON COLUMN commit_metrics_report.metadata IS 'Additional metadata in
JSON format';
+
+
+CREATE TABLE scan_metrics_report (
+ id BIGSERIAL PRIMARY KEY,
+ timestamp BIGINT NOT NULL,
+ namespace VARCHAR(1024) NOT NULL,
+ table_name VARCHAR(255) NOT NULL,
+ snapshot_id BIGINT,
+ schema_id BIGINT,
+ filter TEXT,
+ metadata TEXT,
+ projected_field_ids TEXT,
+ projected_field_names TEXT,
+ equality_delete_files BIGINT DEFAULT 0,
+ indexed_delete_files BIGINT DEFAULT 0,
+ positional_delete_files BIGINT DEFAULT 0,
+ result_data_files BIGINT DEFAULT 0,
+ result_delete_files BIGINT DEFAULT 0,
+ scanned_data_manifests BIGINT DEFAULT 0,
+ scanned_delete_manifests BIGINT DEFAULT 0,
+ skipped_data_files BIGINT DEFAULT 0,
+ skipped_data_manifests BIGINT DEFAULT 0,
+ skipped_delete_files BIGINT DEFAULT 0,
+ skipped_delete_manifests BIGINT DEFAULT 0,
+ total_data_manifests BIGINT DEFAULT 0,
+ total_delete_file_size_in_bytes BIGINT DEFAULT 0,
+ total_delete_manifests BIGINT DEFAULT 0,
+ total_file_size_in_bytes BIGINT DEFAULT 0,
+ total_planning_duration BIGINT DEFAULT 0
+);
+
+CREATE INDEX idx_scan_report ON scan_metrics_report (timestamp, namespace,
table_name);
+
+COMMENT ON TABLE scan_metrics_report IS 'Table for storing scan metrics
information';
+COMMENT ON COLUMN scan_metrics_report.timestamp IS 'Timestamp in milliseconds';
+COMMENT ON COLUMN scan_metrics_report.namespace IS 'Namespace of the table';
+COMMENT ON COLUMN scan_metrics_report.table_name IS 'Table name';
+COMMENT ON COLUMN scan_metrics_report.snapshot_id IS 'Snapshot identifier';
+COMMENT ON COLUMN scan_metrics_report.schema_id IS 'Schema identifier';
+COMMENT ON COLUMN scan_metrics_report.filter IS 'Filter condition applied
during scan';
+COMMENT ON COLUMN scan_metrics_report.metadata IS 'Additional metadata in JSON
format';
+COMMENT ON COLUMN scan_metrics_report.projected_field_ids IS 'List of
projected field IDs';
+COMMENT ON COLUMN scan_metrics_report.projected_field_names IS 'List of
projected field names';
+COMMENT ON COLUMN scan_metrics_report.equality_delete_files IS 'Number of
equality delete files';
+COMMENT ON COLUMN scan_metrics_report.indexed_delete_files IS 'Number of
indexed delete files';
+COMMENT ON COLUMN scan_metrics_report.positional_delete_files IS 'Number of
positional delete files';
+COMMENT ON COLUMN scan_metrics_report.result_data_files IS 'Number of data
files processed';
+COMMENT ON COLUMN scan_metrics_report.result_delete_files IS 'Number of delete
files processed';
+COMMENT ON COLUMN scan_metrics_report.scanned_data_manifests IS 'Number of
data manifests scanned';
+COMMENT ON COLUMN scan_metrics_report.scanned_delete_manifests IS 'Number of
delete manifests scanned';
+COMMENT ON COLUMN scan_metrics_report.skipped_data_files IS 'Number of data
files skipped';
+COMMENT ON COLUMN scan_metrics_report.skipped_data_manifests IS 'Number of
data manifests skipped';
+COMMENT ON COLUMN scan_metrics_report.skipped_delete_files IS 'Number of
delete files skipped';
+COMMENT ON COLUMN scan_metrics_report.skipped_delete_manifests IS 'Number of
delete manifests skipped';
+COMMENT ON COLUMN scan_metrics_report.total_data_manifests IS 'Total number of
data manifests';
+COMMENT ON COLUMN scan_metrics_report.total_delete_file_size_in_bytes IS
'Total size of delete files in bytes';
+COMMENT ON COLUMN scan_metrics_report.total_delete_manifests IS 'Total number
of delete manifests';
+COMMENT ON COLUMN scan_metrics_report.total_file_size_in_bytes IS 'Total file
size in bytes';
+COMMENT ON COLUMN scan_metrics_report.total_planning_duration IS 'Total
planning duration in milliseconds';
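
Since the metrics tables only grow, old rows will eventually need to be pruned.
The statements below are a minimal manual cleanup sketch against the PostgreSQL
schema above and are not part of the patch; the 30-day cutoff is an assumption,
and if the service's own retention handling is configured, running these by
hand is unnecessary.

-- timestamp is stored as epoch milliseconds; delete rows older than ~30 days
DELETE FROM commit_metrics_report
WHERE timestamp < (EXTRACT(EPOCH FROM now()) - 30 * 86400) * 1000;

DELETE FROM scan_metrics_report
WHERE timestamp < (EXTRACT(EPOCH FROM now()) - 30 * 86400) * 1000;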