This is an automated email from the ASF dual-hosted git repository. etudenhoefner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push: new 4f06b0911e Core: Add commit metrics for rewriting manifests (#12630) 4f06b0911e is described below commit 4f06b0911e870ad4ef329e77d68ae5ebe9135961 Author: Eduard Tudenhoefner <etudenhoef...@gmail.com> AuthorDate: Mon Mar 24 13:46:55 2025 +0100 Core: Add commit metrics for rewriting manifests (#12630) --- .../org/apache/iceberg/BaseRewriteManifests.java | 15 ++++------ .../java/org/apache/iceberg/SnapshotSummary.java | 4 +++ .../iceberg/metrics/CommitMetricsResult.java | 33 ++++++++++++++++++++++ .../iceberg/metrics/CommitMetricsResultParser.java | 29 ++++++++++++++++++- .../org/apache/iceberg/TestCommitReporting.java | 27 ++++++++---------- .../org/apache/iceberg/TestSnapshotSummary.java | 25 ++++++++++++++++ .../metrics/TestCommitMetricsResultParser.java | 24 ++++++++++++++++ 7 files changed, 131 insertions(+), 26 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java index 6808c82f39..a86013fefc 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java +++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java @@ -48,11 +48,6 @@ import org.apache.iceberg.util.Tasks; public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests> implements RewriteManifests { - private static final String KEPT_MANIFESTS_COUNT = "manifests-kept"; - private static final String CREATED_MANIFESTS_COUNT = "manifests-created"; - private static final String REPLACED_MANIFESTS_COUNT = "manifests-replaced"; - private static final String PROCESSED_ENTRY_COUNT = "entries-processed"; - private final String tableName; private final Map<Integer, PartitionSpec> specsById; private final long manifestTargetSizeBytes; @@ -103,12 +98,14 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests> protected Map<String, String> summary() { int createdManifestsCount = newManifests.size() + addedManifests.size() + rewrittenAddedManifests.size(); - summaryBuilder.set(CREATED_MANIFESTS_COUNT, String.valueOf(createdManifestsCount)); - summaryBuilder.set(KEPT_MANIFESTS_COUNT, String.valueOf(keptManifests.size())); summaryBuilder.set( - REPLACED_MANIFESTS_COUNT, + SnapshotSummary.CREATED_MANIFESTS_COUNT, String.valueOf(createdManifestsCount)); + summaryBuilder.set(SnapshotSummary.KEPT_MANIFESTS_COUNT, String.valueOf(keptManifests.size())); + summaryBuilder.set( + SnapshotSummary.REPLACED_MANIFESTS_COUNT, String.valueOf(rewrittenManifests.size() + deletedManifests.size())); - summaryBuilder.set(PROCESSED_ENTRY_COUNT, String.valueOf(entryCount.get())); + summaryBuilder.set( + SnapshotSummary.PROCESSED_MANIFEST_ENTRY_COUNT, String.valueOf(entryCount.get())); summaryBuilder.setPartitionSummaryLimit( 0); // do not include partition summaries because data did not change return summaryBuilder.build(); diff --git a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java index 6043424cd7..392fad623f 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java @@ -62,6 +62,10 @@ public class SnapshotSummary { public static final String SOURCE_SNAPSHOT_ID_PROP = "source-snapshot-id"; public static final String REPLACE_PARTITIONS_PROP = "replace-partitions"; public static final String EXTRA_METADATA_PREFIX = "snapshot-property."; + public static final String CREATED_MANIFESTS_COUNT = "manifests-created"; + public static final String REPLACED_MANIFESTS_COUNT = "manifests-replaced"; + public static final String KEPT_MANIFESTS_COUNT = "manifests-kept"; + public static final String PROCESSED_MANIFEST_ENTRY_COUNT = "entries-processed"; public static final MapJoiner MAP_JOINER = Joiner.on(",").withKeyValueSeparator("="); diff --git a/core/src/main/java/org/apache/iceberg/metrics/CommitMetricsResult.java b/core/src/main/java/org/apache/iceberg/metrics/CommitMetricsResult.java index 7a87172708..ec0ced62bf 100644 --- a/core/src/main/java/org/apache/iceberg/metrics/CommitMetricsResult.java +++ b/core/src/main/java/org/apache/iceberg/metrics/CommitMetricsResult.java @@ -52,6 +52,10 @@ public interface CommitMetricsResult { String ADDED_EQ_DELETES = "added-equality-deletes"; String REMOVED_EQ_DELETES = "removed-equality-deletes"; String TOTAL_EQ_DELETES = "total-equality-deletes"; + String KEPT_MANIFESTS_COUNT = "manifests-kept"; + String CREATED_MANIFESTS_COUNT = "manifests-created"; + String REPLACED_MANIFESTS_COUNT = "manifests-replaced"; + String PROCESSED_MANIFEST_ENTRY_COUNT = "manifest-entries-processed"; @Nullable TimerResult totalDuration(); @@ -137,6 +141,30 @@ public interface CommitMetricsResult { @Nullable CounterResult totalEqualityDeletes(); + @Nullable + @Value.Default + default CounterResult manifestsCreated() { + return null; + } + + @Nullable + @Value.Default + default CounterResult manifestsReplaced() { + return null; + } + + @Nullable + @Value.Default + default CounterResult manifestsKept() { + return null; + } + + @Nullable + @Value.Default + default CounterResult manifestEntriesProcessed() { + return null; + } + static CommitMetricsResult from( CommitMetrics commitMetrics, Map<String, String> snapshotSummary) { Preconditions.checkArgument(null != commitMetrics, "Invalid commit metrics: null"); @@ -179,6 +207,11 @@ public interface CommitMetricsResult { .removedEqualityDeletes( counterFrom(snapshotSummary, SnapshotSummary.REMOVED_EQ_DELETES_PROP)) .totalEqualityDeletes(counterFrom(snapshotSummary, SnapshotSummary.TOTAL_EQ_DELETES_PROP)) + .manifestsCreated(counterFrom(snapshotSummary, SnapshotSummary.CREATED_MANIFESTS_COUNT)) + .manifestsReplaced(counterFrom(snapshotSummary, SnapshotSummary.REPLACED_MANIFESTS_COUNT)) + .manifestsKept(counterFrom(snapshotSummary, SnapshotSummary.KEPT_MANIFESTS_COUNT)) + .manifestEntriesProcessed( + counterFrom(snapshotSummary, SnapshotSummary.PROCESSED_MANIFEST_ENTRY_COUNT)) .build(); } diff --git a/core/src/main/java/org/apache/iceberg/metrics/CommitMetricsResultParser.java b/core/src/main/java/org/apache/iceberg/metrics/CommitMetricsResultParser.java index 2c45581ba5..7ffc96f7f9 100644 --- a/core/src/main/java/org/apache/iceberg/metrics/CommitMetricsResultParser.java +++ b/core/src/main/java/org/apache/iceberg/metrics/CommitMetricsResultParser.java @@ -35,7 +35,7 @@ class CommitMetricsResultParser { return JsonUtil.generate(gen -> toJson(metrics, gen), pretty); } - @SuppressWarnings("checkstyle:CyclomaticComplexity") + @SuppressWarnings({"checkstyle:CyclomaticComplexity", "MethodLength"}) static void toJson(CommitMetricsResult metrics, JsonGenerator gen) throws IOException { Preconditions.checkArgument(null != metrics, "Invalid commit metrics: null"); @@ -171,6 +171,26 @@ class CommitMetricsResultParser { CounterResultParser.toJson(metrics.totalEqualityDeletes(), gen); } + if (null != metrics.manifestsCreated()) { + gen.writeFieldName(CommitMetricsResult.CREATED_MANIFESTS_COUNT); + CounterResultParser.toJson(metrics.manifestsCreated(), gen); + } + + if (null != metrics.manifestsReplaced()) { + gen.writeFieldName(CommitMetricsResult.REPLACED_MANIFESTS_COUNT); + CounterResultParser.toJson(metrics.manifestsReplaced(), gen); + } + + if (null != metrics.manifestsKept()) { + gen.writeFieldName(CommitMetricsResult.KEPT_MANIFESTS_COUNT); + CounterResultParser.toJson(metrics.manifestsKept(), gen); + } + + if (null != metrics.manifestEntriesProcessed()) { + gen.writeFieldName(CommitMetricsResult.PROCESSED_MANIFEST_ENTRY_COUNT); + CounterResultParser.toJson(metrics.manifestEntriesProcessed(), gen); + } + gen.writeEndObject(); } @@ -227,6 +247,13 @@ class CommitMetricsResultParser { CounterResultParser.fromJson(CommitMetricsResult.REMOVED_EQ_DELETES, json)) .totalEqualityDeletes( CounterResultParser.fromJson(CommitMetricsResult.TOTAL_EQ_DELETES, json)) + .manifestsCreated( + CounterResultParser.fromJson(CommitMetricsResult.CREATED_MANIFESTS_COUNT, json)) + .manifestsReplaced( + CounterResultParser.fromJson(CommitMetricsResult.REPLACED_MANIFESTS_COUNT, json)) + .manifestsKept(CounterResultParser.fromJson(CommitMetricsResult.KEPT_MANIFESTS_COUNT, json)) + .manifestEntriesProcessed( + CounterResultParser.fromJson(CommitMetricsResult.PROCESSED_MANIFEST_ENTRY_COUNT, json)) .build(); } } diff --git a/core/src/test/java/org/apache/iceberg/TestCommitReporting.java b/core/src/test/java/org/apache/iceberg/TestCommitReporting.java index d018612a7a..d446f45c8b 100644 --- a/core/src/test/java/org/apache/iceberg/TestCommitReporting.java +++ b/core/src/test/java/org/apache/iceberg/TestCommitReporting.java @@ -20,7 +20,6 @@ package org.apache.iceberg; import static org.assertj.core.api.Assertions.assertThat; -import java.io.IOException; import java.util.Arrays; import java.util.List; import org.apache.iceberg.ScanPlanningAndReportingTestBase.TestMetricsReporter; @@ -170,29 +169,16 @@ public class TestCommitReporting extends TestBase { } @TestTemplate - public void addAndDeleteManifests() throws IOException { + public void addAndDeleteManifests() { String tableName = "add-and-delete-manifests"; Table table = TestTables.create( tableDir, tableName, SCHEMA, SPEC, SortOrder.unsorted(), formatVersion, reporter); table.newAppend().appendFile(FILE_A).commit(); - Snapshot snap1 = table.currentSnapshot(); table.newAppend().appendFile(FILE_B).commit(); - Snapshot snap2 = table.currentSnapshot(); - ManifestFile newManifest = - writeManifest( - "manifest-file.avro", - manifestEntry(ManifestEntry.Status.EXISTING, snap1.snapshotId(), FILE_A), - manifestEntry(ManifestEntry.Status.EXISTING, snap2.snapshotId(), FILE_B)); - - RewriteManifests rewriteManifests = table.rewriteManifests(); - for (ManifestFile manifest : snap2.dataManifests(table.io())) { - rewriteManifests.deleteManifest(manifest); - } - - rewriteManifests.addManifest(newManifest).commit(); + table.rewriteManifests().clusterBy(file -> "file").rewriteIf(ignored -> true).commit(); CommitReport report = reporter.lastCommitReport(); assertThat(report).isNotNull(); @@ -200,5 +186,14 @@ public class TestCommitReporting extends TestBase { assertThat(report.snapshotId()).isEqualTo(3L); assertThat(report.sequenceNumber()).isEqualTo(3L); assertThat(report.tableName()).isEqualTo(tableName); + + CommitMetricsResult metrics = report.commitMetrics(); + assertThat(metrics.totalDataFiles().value()).isEqualTo(2L); + assertThat(metrics.totalRecords().value()).isEqualTo(2L); + assertThat(metrics.totalFilesSizeInBytes().value()).isEqualTo(20L); + assertThat(metrics.manifestsCreated().value()).isEqualTo(1L); + assertThat(metrics.manifestsKept().value()).isEqualTo(0L); + assertThat(metrics.manifestsReplaced().value()).isEqualTo(2L); + assertThat(metrics.manifestEntriesProcessed().value()).isEqualTo(2L); } } diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotSummary.java b/core/src/test/java/org/apache/iceberg/TestSnapshotSummary.java index 9c67e766a9..c76ca45b53 100644 --- a/core/src/test/java/org/apache/iceberg/TestSnapshotSummary.java +++ b/core/src/test/java/org/apache/iceberg/TestSnapshotSummary.java @@ -430,4 +430,29 @@ public class TestSnapshotSummary extends TestBase { .containsEntry(SnapshotSummary.TOTAL_RECORDS_PROP, "0") .containsEntry(SnapshotSummary.CHANGED_PARTITION_COUNT_PROP, "2"); } + + @TestTemplate + public void rewriteManifestsWithDuplicateFiles() { + assertThat(listManifestFiles()).isEmpty(); + + table.newAppend().appendFile(FILE_A).commit(); + table.newAppend().appendFile(FILE_B).commit(); + table.newAppend().appendFile(FILE_C).commit(); + + table.rewriteManifests().clusterBy(file -> "file").rewriteIf(ignored -> true).commit(); + + assertThat(table.currentSnapshot().summary()) + .hasSize(12) + .containsEntry(SnapshotSummary.CHANGED_PARTITION_COUNT_PROP, "0") + .containsEntry(SnapshotSummary.TOTAL_DATA_FILES_PROP, "3") + .containsEntry(SnapshotSummary.TOTAL_DELETE_FILES_PROP, "0") + .containsEntry(SnapshotSummary.TOTAL_EQ_DELETES_PROP, "0") + .containsEntry(SnapshotSummary.TOTAL_POS_DELETES_PROP, "0") + .containsEntry(SnapshotSummary.TOTAL_FILE_SIZE_PROP, "30") + .containsEntry(SnapshotSummary.TOTAL_RECORDS_PROP, "3") + .containsEntry(SnapshotSummary.PROCESSED_MANIFEST_ENTRY_COUNT, "3") + .containsEntry(SnapshotSummary.CREATED_MANIFESTS_COUNT, "1") + .containsEntry(SnapshotSummary.KEPT_MANIFESTS_COUNT, "0") + .containsEntry(SnapshotSummary.REPLACED_MANIFESTS_COUNT, "3"); + } } diff --git a/core/src/test/java/org/apache/iceberg/metrics/TestCommitMetricsResultParser.java b/core/src/test/java/org/apache/iceberg/metrics/TestCommitMetricsResultParser.java index 1b51066cf1..e2e40a8be1 100644 --- a/core/src/test/java/org/apache/iceberg/metrics/TestCommitMetricsResultParser.java +++ b/core/src/test/java/org/apache/iceberg/metrics/TestCommitMetricsResultParser.java @@ -92,6 +92,10 @@ public class TestCommitMetricsResultParser { .put(SnapshotSummary.REMOVED_EQ_DELETES_PROP, "20") .put(SnapshotSummary.TOTAL_POS_DELETES_PROP, "21") .put(SnapshotSummary.TOTAL_EQ_DELETES_PROP, "22") + .put(SnapshotSummary.CREATED_MANIFESTS_COUNT, "10") + .put(SnapshotSummary.REPLACED_MANIFESTS_COUNT, "4") + .put(SnapshotSummary.KEPT_MANIFESTS_COUNT, "6") + .put(SnapshotSummary.PROCESSED_MANIFEST_ENTRY_COUNT, "20") .build(); CommitMetricsResult result = CommitMetricsResult.from(commitMetrics, snapshotSummary); @@ -121,6 +125,10 @@ public class TestCommitMetricsResultParser { assertThat(result.removedEqualityDeletes().value()).isEqualTo(20L); assertThat(result.totalPositionalDeletes().value()).isEqualTo(21L); assertThat(result.totalEqualityDeletes().value()).isEqualTo(22L); + assertThat(result.manifestsCreated().value()).isEqualTo(10L); + assertThat(result.manifestsReplaced().value()).isEqualTo(4L); + assertThat(result.manifestsKept().value()).isEqualTo(6L); + assertThat(result.manifestEntriesProcessed().value()).isEqualTo(20L); String expectedJson = "{\n" @@ -228,6 +236,22 @@ public class TestCommitMetricsResultParser { + " \"total-equality-deletes\" : {\n" + " \"unit\" : \"count\",\n" + " \"value\" : 22\n" + + " },\n" + + " \"manifests-created\" : {\n" + + " \"unit\" : \"count\",\n" + + " \"value\" : 10\n" + + " },\n" + + " \"manifests-replaced\" : {\n" + + " \"unit\" : \"count\",\n" + + " \"value\" : 4\n" + + " },\n" + + " \"manifests-kept\" : {\n" + + " \"unit\" : \"count\",\n" + + " \"value\" : 6\n" + + " },\n" + + " \"manifest-entries-processed\" : {\n" + + " \"unit\" : \"count\",\n" + + " \"value\" : 20\n" + " }\n" + "}";