This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 4f06b0911e Core: Add commit metrics for rewriting manifests (#12630)
4f06b0911e is described below

commit 4f06b0911e870ad4ef329e77d68ae5ebe9135961
Author: Eduard Tudenhoefner <etudenhoef...@gmail.com>
AuthorDate: Mon Mar 24 13:46:55 2025 +0100

    Core: Add commit metrics for rewriting manifests (#12630)
---
 .../org/apache/iceberg/BaseRewriteManifests.java   | 15 ++++------
 .../java/org/apache/iceberg/SnapshotSummary.java   |  4 +++
 .../iceberg/metrics/CommitMetricsResult.java       | 33 ++++++++++++++++++++++
 .../iceberg/metrics/CommitMetricsResultParser.java | 29 ++++++++++++++++++-
 .../org/apache/iceberg/TestCommitReporting.java    | 27 ++++++++----------
 .../org/apache/iceberg/TestSnapshotSummary.java    | 25 ++++++++++++++++
 .../metrics/TestCommitMetricsResultParser.java     | 24 ++++++++++++++++
 7 files changed, 131 insertions(+), 26 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
index 6808c82f39..a86013fefc 100644
--- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
+++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
@@ -48,11 +48,6 @@ import org.apache.iceberg.util.Tasks;
 
 public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests>
     implements RewriteManifests {
-  private static final String KEPT_MANIFESTS_COUNT = "manifests-kept";
-  private static final String CREATED_MANIFESTS_COUNT = "manifests-created";
-  private static final String REPLACED_MANIFESTS_COUNT = "manifests-replaced";
-  private static final String PROCESSED_ENTRY_COUNT = "entries-processed";
-
   private final String tableName;
   private final Map<Integer, PartitionSpec> specsById;
   private final long manifestTargetSizeBytes;
@@ -103,12 +98,14 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests>
   protected Map<String, String> summary() {
     int createdManifestsCount =
         newManifests.size() + addedManifests.size() + rewrittenAddedManifests.size();
-    summaryBuilder.set(CREATED_MANIFESTS_COUNT, String.valueOf(createdManifestsCount));
-    summaryBuilder.set(KEPT_MANIFESTS_COUNT, String.valueOf(keptManifests.size()));
     summaryBuilder.set(
-        REPLACED_MANIFESTS_COUNT,
+        SnapshotSummary.CREATED_MANIFESTS_COUNT, String.valueOf(createdManifestsCount));
+    summaryBuilder.set(SnapshotSummary.KEPT_MANIFESTS_COUNT, String.valueOf(keptManifests.size()));
+    summaryBuilder.set(
+        SnapshotSummary.REPLACED_MANIFESTS_COUNT,
         String.valueOf(rewrittenManifests.size() + deletedManifests.size()));
-    summaryBuilder.set(PROCESSED_ENTRY_COUNT, String.valueOf(entryCount.get()));
+    summaryBuilder.set(
+        SnapshotSummary.PROCESSED_MANIFEST_ENTRY_COUNT, String.valueOf(entryCount.get()));
     summaryBuilder.setPartitionSummaryLimit(
         0); // do not include partition summaries because data did not change
     return summaryBuilder.build();
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java
index 6043424cd7..392fad623f 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java
@@ -62,6 +62,10 @@ public class SnapshotSummary {
   public static final String SOURCE_SNAPSHOT_ID_PROP = "source-snapshot-id";
   public static final String REPLACE_PARTITIONS_PROP = "replace-partitions";
   public static final String EXTRA_METADATA_PREFIX = "snapshot-property.";
+  public static final String CREATED_MANIFESTS_COUNT = "manifests-created";
+  public static final String REPLACED_MANIFESTS_COUNT = "manifests-replaced";
+  public static final String KEPT_MANIFESTS_COUNT = "manifests-kept";
+  public static final String PROCESSED_MANIFEST_ENTRY_COUNT = "entries-processed";
 
   public static final MapJoiner MAP_JOINER = Joiner.on(",").withKeyValueSeparator("=");
 
diff --git a/core/src/main/java/org/apache/iceberg/metrics/CommitMetricsResult.java b/core/src/main/java/org/apache/iceberg/metrics/CommitMetricsResult.java
index 7a87172708..ec0ced62bf 100644
--- a/core/src/main/java/org/apache/iceberg/metrics/CommitMetricsResult.java
+++ b/core/src/main/java/org/apache/iceberg/metrics/CommitMetricsResult.java
@@ -52,6 +52,10 @@ public interface CommitMetricsResult {
   String ADDED_EQ_DELETES = "added-equality-deletes";
   String REMOVED_EQ_DELETES = "removed-equality-deletes";
   String TOTAL_EQ_DELETES = "total-equality-deletes";
+  String KEPT_MANIFESTS_COUNT = "manifests-kept";
+  String CREATED_MANIFESTS_COUNT = "manifests-created";
+  String REPLACED_MANIFESTS_COUNT = "manifests-replaced";
+  String PROCESSED_MANIFEST_ENTRY_COUNT = "manifest-entries-processed";
 
   @Nullable
   TimerResult totalDuration();
@@ -137,6 +141,30 @@ public interface CommitMetricsResult {
   @Nullable
   CounterResult totalEqualityDeletes();
 
+  @Nullable
+  @Value.Default
+  default CounterResult manifestsCreated() {
+    return null;
+  }
+
+  @Nullable
+  @Value.Default
+  default CounterResult manifestsReplaced() {
+    return null;
+  }
+
+  @Nullable
+  @Value.Default
+  default CounterResult manifestsKept() {
+    return null;
+  }
+
+  @Nullable
+  @Value.Default
+  default CounterResult manifestEntriesProcessed() {
+    return null;
+  }
+
   static CommitMetricsResult from(
       CommitMetrics commitMetrics, Map<String, String> snapshotSummary) {
     Preconditions.checkArgument(null != commitMetrics, "Invalid commit metrics: null");
@@ -179,6 +207,11 @@ public interface CommitMetricsResult {
         .removedEqualityDeletes(
             counterFrom(snapshotSummary, SnapshotSummary.REMOVED_EQ_DELETES_PROP))
         .totalEqualityDeletes(counterFrom(snapshotSummary, SnapshotSummary.TOTAL_EQ_DELETES_PROP))
+        .manifestsCreated(counterFrom(snapshotSummary, SnapshotSummary.CREATED_MANIFESTS_COUNT))
+        .manifestsReplaced(counterFrom(snapshotSummary, SnapshotSummary.REPLACED_MANIFESTS_COUNT))
+        .manifestsKept(counterFrom(snapshotSummary, SnapshotSummary.KEPT_MANIFESTS_COUNT))
+        .manifestEntriesProcessed(
+            counterFrom(snapshotSummary, SnapshotSummary.PROCESSED_MANIFEST_ENTRY_COUNT))
         .build();
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/metrics/CommitMetricsResultParser.java b/core/src/main/java/org/apache/iceberg/metrics/CommitMetricsResultParser.java
index 2c45581ba5..7ffc96f7f9 100644
--- a/core/src/main/java/org/apache/iceberg/metrics/CommitMetricsResultParser.java
+++ b/core/src/main/java/org/apache/iceberg/metrics/CommitMetricsResultParser.java
@@ -35,7 +35,7 @@ class CommitMetricsResultParser {
     return JsonUtil.generate(gen -> toJson(metrics, gen), pretty);
   }
 
-  @SuppressWarnings("checkstyle:CyclomaticComplexity")
+  @SuppressWarnings({"checkstyle:CyclomaticComplexity", "MethodLength"})
   static void toJson(CommitMetricsResult metrics, JsonGenerator gen) throws IOException {
     Preconditions.checkArgument(null != metrics, "Invalid commit metrics: null");
 
@@ -171,6 +171,26 @@ class CommitMetricsResultParser {
       CounterResultParser.toJson(metrics.totalEqualityDeletes(), gen);
     }
 
+    if (null != metrics.manifestsCreated()) {
+      gen.writeFieldName(CommitMetricsResult.CREATED_MANIFESTS_COUNT);
+      CounterResultParser.toJson(metrics.manifestsCreated(), gen);
+    }
+
+    if (null != metrics.manifestsReplaced()) {
+      gen.writeFieldName(CommitMetricsResult.REPLACED_MANIFESTS_COUNT);
+      CounterResultParser.toJson(metrics.manifestsReplaced(), gen);
+    }
+
+    if (null != metrics.manifestsKept()) {
+      gen.writeFieldName(CommitMetricsResult.KEPT_MANIFESTS_COUNT);
+      CounterResultParser.toJson(metrics.manifestsKept(), gen);
+    }
+
+    if (null != metrics.manifestEntriesProcessed()) {
+      gen.writeFieldName(CommitMetricsResult.PROCESSED_MANIFEST_ENTRY_COUNT);
+      CounterResultParser.toJson(metrics.manifestEntriesProcessed(), gen);
+    }
+
     gen.writeEndObject();
   }
 
@@ -227,6 +247,13 @@ class CommitMetricsResultParser {
             CounterResultParser.fromJson(CommitMetricsResult.REMOVED_EQ_DELETES, json))
         .totalEqualityDeletes(
             CounterResultParser.fromJson(CommitMetricsResult.TOTAL_EQ_DELETES, json))
+        .manifestsCreated(
+            CounterResultParser.fromJson(CommitMetricsResult.CREATED_MANIFESTS_COUNT, json))
+        .manifestsReplaced(
+            CounterResultParser.fromJson(CommitMetricsResult.REPLACED_MANIFESTS_COUNT, json))
+        .manifestsKept(CounterResultParser.fromJson(CommitMetricsResult.KEPT_MANIFESTS_COUNT, json))
+        .manifestEntriesProcessed(
+            CounterResultParser.fromJson(CommitMetricsResult.PROCESSED_MANIFEST_ENTRY_COUNT, json))
         .build();
   }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestCommitReporting.java b/core/src/test/java/org/apache/iceberg/TestCommitReporting.java
index d018612a7a..d446f45c8b 100644
--- a/core/src/test/java/org/apache/iceberg/TestCommitReporting.java
+++ b/core/src/test/java/org/apache/iceberg/TestCommitReporting.java
@@ -20,7 +20,6 @@ package org.apache.iceberg;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.iceberg.ScanPlanningAndReportingTestBase.TestMetricsReporter;
@@ -170,29 +169,16 @@ public class TestCommitReporting extends TestBase {
   }
 
   @TestTemplate
-  public void addAndDeleteManifests() throws IOException {
+  public void addAndDeleteManifests() {
     String tableName = "add-and-delete-manifests";
     Table table =
         TestTables.create(
             tableDir, tableName, SCHEMA, SPEC, SortOrder.unsorted(), formatVersion, reporter);
 
     table.newAppend().appendFile(FILE_A).commit();
-    Snapshot snap1 = table.currentSnapshot();
     table.newAppend().appendFile(FILE_B).commit();
-    Snapshot snap2 = table.currentSnapshot();
 
-    ManifestFile newManifest =
-        writeManifest(
-            "manifest-file.avro",
-            manifestEntry(ManifestEntry.Status.EXISTING, snap1.snapshotId(), FILE_A),
-            manifestEntry(ManifestEntry.Status.EXISTING, snap2.snapshotId(), FILE_B));
-
-    RewriteManifests rewriteManifests = table.rewriteManifests();
-    for (ManifestFile manifest : snap2.dataManifests(table.io())) {
-      rewriteManifests.deleteManifest(manifest);
-    }
-
-    rewriteManifests.addManifest(newManifest).commit();
+    table.rewriteManifests().clusterBy(file -> "file").rewriteIf(ignored -> true).commit();
 
     CommitReport report = reporter.lastCommitReport();
     assertThat(report).isNotNull();
@@ -200,5 +186,14 @@ public class TestCommitReporting extends TestBase {
     assertThat(report.snapshotId()).isEqualTo(3L);
     assertThat(report.sequenceNumber()).isEqualTo(3L);
     assertThat(report.tableName()).isEqualTo(tableName);
+
+    CommitMetricsResult metrics = report.commitMetrics();
+    assertThat(metrics.totalDataFiles().value()).isEqualTo(2L);
+    assertThat(metrics.totalRecords().value()).isEqualTo(2L);
+    assertThat(metrics.totalFilesSizeInBytes().value()).isEqualTo(20L);
+    assertThat(metrics.manifestsCreated().value()).isEqualTo(1L);
+    assertThat(metrics.manifestsKept().value()).isEqualTo(0L);
+    assertThat(metrics.manifestsReplaced().value()).isEqualTo(2L);
+    assertThat(metrics.manifestEntriesProcessed().value()).isEqualTo(2L);
   }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotSummary.java b/core/src/test/java/org/apache/iceberg/TestSnapshotSummary.java
index 9c67e766a9..c76ca45b53 100644
--- a/core/src/test/java/org/apache/iceberg/TestSnapshotSummary.java
+++ b/core/src/test/java/org/apache/iceberg/TestSnapshotSummary.java
@@ -430,4 +430,29 @@ public class TestSnapshotSummary extends TestBase {
         .containsEntry(SnapshotSummary.TOTAL_RECORDS_PROP, "0")
         .containsEntry(SnapshotSummary.CHANGED_PARTITION_COUNT_PROP, "2");
   }
+
+  @TestTemplate
+  public void rewriteManifestsWithDuplicateFiles() {
+    assertThat(listManifestFiles()).isEmpty();
+
+    table.newAppend().appendFile(FILE_A).commit();
+    table.newAppend().appendFile(FILE_B).commit();
+    table.newAppend().appendFile(FILE_C).commit();
+
+    table.rewriteManifests().clusterBy(file -> "file").rewriteIf(ignored -> true).commit();
+
+    assertThat(table.currentSnapshot().summary())
+        .hasSize(12)
+        .containsEntry(SnapshotSummary.CHANGED_PARTITION_COUNT_PROP, "0")
+        .containsEntry(SnapshotSummary.TOTAL_DATA_FILES_PROP, "3")
+        .containsEntry(SnapshotSummary.TOTAL_DELETE_FILES_PROP, "0")
+        .containsEntry(SnapshotSummary.TOTAL_EQ_DELETES_PROP, "0")
+        .containsEntry(SnapshotSummary.TOTAL_POS_DELETES_PROP, "0")
+        .containsEntry(SnapshotSummary.TOTAL_FILE_SIZE_PROP, "30")
+        .containsEntry(SnapshotSummary.TOTAL_RECORDS_PROP, "3")
+        .containsEntry(SnapshotSummary.PROCESSED_MANIFEST_ENTRY_COUNT, "3")
+        .containsEntry(SnapshotSummary.CREATED_MANIFESTS_COUNT, "1")
+        .containsEntry(SnapshotSummary.KEPT_MANIFESTS_COUNT, "0")
+        .containsEntry(SnapshotSummary.REPLACED_MANIFESTS_COUNT, "3");
+  }
 }
diff --git a/core/src/test/java/org/apache/iceberg/metrics/TestCommitMetricsResultParser.java b/core/src/test/java/org/apache/iceberg/metrics/TestCommitMetricsResultParser.java
index 1b51066cf1..e2e40a8be1 100644
--- a/core/src/test/java/org/apache/iceberg/metrics/TestCommitMetricsResultParser.java
+++ b/core/src/test/java/org/apache/iceberg/metrics/TestCommitMetricsResultParser.java
@@ -92,6 +92,10 @@ public class TestCommitMetricsResultParser {
             .put(SnapshotSummary.REMOVED_EQ_DELETES_PROP, "20")
             .put(SnapshotSummary.TOTAL_POS_DELETES_PROP, "21")
             .put(SnapshotSummary.TOTAL_EQ_DELETES_PROP, "22")
+            .put(SnapshotSummary.CREATED_MANIFESTS_COUNT, "10")
+            .put(SnapshotSummary.REPLACED_MANIFESTS_COUNT, "4")
+            .put(SnapshotSummary.KEPT_MANIFESTS_COUNT, "6")
+            .put(SnapshotSummary.PROCESSED_MANIFEST_ENTRY_COUNT, "20")
             .build();
 
     CommitMetricsResult result = CommitMetricsResult.from(commitMetrics, snapshotSummary);
@@ -121,6 +125,10 @@ public class TestCommitMetricsResultParser {
     assertThat(result.removedEqualityDeletes().value()).isEqualTo(20L);
     assertThat(result.totalPositionalDeletes().value()).isEqualTo(21L);
     assertThat(result.totalEqualityDeletes().value()).isEqualTo(22L);
+    assertThat(result.manifestsCreated().value()).isEqualTo(10L);
+    assertThat(result.manifestsReplaced().value()).isEqualTo(4L);
+    assertThat(result.manifestsKept().value()).isEqualTo(6L);
+    assertThat(result.manifestEntriesProcessed().value()).isEqualTo(20L);
 
     String expectedJson =
         "{\n"
@@ -228,6 +236,22 @@ public class TestCommitMetricsResultParser {
             + "  \"total-equality-deletes\" : {\n"
             + "    \"unit\" : \"count\",\n"
             + "    \"value\" : 22\n"
+            + "  },\n"
+            + "  \"manifests-created\" : {\n"
+            + "    \"unit\" : \"count\",\n"
+            + "    \"value\" : 10\n"
+            + "  },\n"
+            + "  \"manifests-replaced\" : {\n"
+            + "    \"unit\" : \"count\",\n"
+            + "    \"value\" : 4\n"
+            + "  },\n"
+            + "  \"manifests-kept\" : {\n"
+            + "    \"unit\" : \"count\",\n"
+            + "    \"value\" : 6\n"
+            + "  },\n"
+            + "  \"manifest-entries-processed\" : {\n"
+            + "    \"unit\" : \"count\",\n"
+            + "    \"value\" : 20\n"
             + "  }\n"
             + "}";
 

Reply via email to