This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 715c9b90f5 Core: Create and report metrics about Snapshots (#6246)
715c9b90f5 is described below

commit 715c9b90f537a215f38da5a8ddad19977222860b
Author: Eduard Tudenhöfner <[email protected]>
AuthorDate: Mon Dec 19 17:33:08 2022 +0100

    Core: Create and report metrics about Snapshots (#6246)
---
 .../apache/iceberg/metrics/MetricsReporter.java    |   2 +-
 .../main/java/org/apache/iceberg/BaseTable.java    |  20 +-
 .../java/org/apache/iceberg/BaseTransaction.java   |  34 ++-
 .../java/org/apache/iceberg/SnapshotProducer.java  |  42 +++
 .../java/org/apache/iceberg/TableScanContext.java  |   2 +-
 .../main/java/org/apache/iceberg/Transactions.java |  18 ++
 .../org/apache/iceberg/metrics/CommitMetrics.java  |  50 ++++
 .../iceberg/metrics/CommitMetricsResult.java       | 185 +++++++++++++
 .../iceberg/metrics/CommitMetricsResultParser.java | 220 +++++++++++++++
 .../org/apache/iceberg/metrics/CommitReport.java   |  29 +-
 .../apache/iceberg/metrics/CommitReportParser.java | 101 +++++++
 .../apache/iceberg/metrics/ScanReportParser.java   |   7 +-
 .../apache/iceberg/rest/RESTSessionCatalog.java    |   6 +-
 .../rest/requests/ReportMetricsRequest.java        |  12 +-
 .../rest/requests/ReportMetricsRequestParser.java  |  13 +
 .../main/java/org/apache/iceberg/util/Tasks.java   |  11 +
 .../org/apache/iceberg/TestCommitReporting.java    | 190 +++++++++++++
 .../iceberg/TestScanPlanningAndReporting.java      |  14 +-
 .../metrics/TestCommitMetricsResultParser.java     | 239 ++++++++++++++++
 .../iceberg/metrics/TestCommitReportParser.java    | 302 +++++++++++++++++++++
 .../org/apache/iceberg/rest/TestRESTCatalog.java   |   3 +-
 .../requests/TestReportMetricsRequestParser.java   |  38 ++-
 .../java/org/apache/iceberg/util/TestTasks.java    |  59 ++++
 open-api/rest-catalog-open-api.yaml                |  31 +++
 24 files changed, 1582 insertions(+), 46 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java 
b/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java
index 45e3d16bff..5fae755bbe 100644
--- a/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java
+++ b/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java
@@ -23,7 +23,7 @@ package org.apache.iceberg.metrics;
 public interface MetricsReporter {
 
   /**
-   * Indicates that a operation is done by reporting a {@link MetricsReport}. 
A {@link
+   * Indicates that an operation is done by reporting a {@link MetricsReport}. 
A {@link
    * MetricsReport} is usually directly derived from a {@link MetricsReport} 
instance.
    *
    * @param report The {@link MetricsReport} to report.
diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java 
b/core/src/main/java/org/apache/iceberg/BaseTable.java
index 0b72027d55..bfa9cb7ce5 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTable.java
@@ -44,7 +44,7 @@ public class BaseTable implements Table, HasTableOperations, 
Serializable {
   public BaseTable(TableOperations ops, String name) {
     this.ops = ops;
     this.name = name;
-    this.reporter = new LoggingMetricsReporter();
+    this.reporter = LoggingMetricsReporter.instance();
   }
 
   public BaseTable(TableOperations ops, String name, MetricsReporter reporter) 
{
@@ -171,42 +171,42 @@ public class BaseTable implements Table, 
HasTableOperations, Serializable {
 
   @Override
   public AppendFiles newAppend() {
-    return new MergeAppend(name, ops);
+    return new MergeAppend(name, ops).reportWith(reporter);
   }
 
   @Override
   public AppendFiles newFastAppend() {
-    return new FastAppend(name, ops);
+    return new FastAppend(name, ops).reportWith(reporter);
   }
 
   @Override
   public RewriteFiles newRewrite() {
-    return new BaseRewriteFiles(name, ops);
+    return new BaseRewriteFiles(name, ops).reportWith(reporter);
   }
 
   @Override
   public RewriteManifests rewriteManifests() {
-    return new BaseRewriteManifests(ops);
+    return new BaseRewriteManifests(ops).reportWith(reporter);
   }
 
   @Override
   public OverwriteFiles newOverwrite() {
-    return new BaseOverwriteFiles(name, ops);
+    return new BaseOverwriteFiles(name, ops).reportWith(reporter);
   }
 
   @Override
   public RowDelta newRowDelta() {
-    return new BaseRowDelta(name, ops);
+    return new BaseRowDelta(name, ops).reportWith(reporter);
   }
 
   @Override
   public ReplacePartitions newReplacePartitions() {
-    return new BaseReplacePartitions(name, ops);
+    return new BaseReplacePartitions(name, ops).reportWith(reporter);
   }
 
   @Override
   public DeleteFiles newDelete() {
-    return new StreamingDelete(name, ops);
+    return new StreamingDelete(name, ops).reportWith(reporter);
   }
 
   @Override
@@ -226,7 +226,7 @@ public class BaseTable implements Table, 
HasTableOperations, Serializable {
 
   @Override
   public Transaction newTransaction() {
-    return Transactions.newTransaction(name, ops);
+    return Transactions.newTransaction(name, ops, reporter);
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java 
b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
index 241738feda..f1832f2787 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
@@ -39,6 +39,8 @@ import 
org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.metrics.LoggingMetricsReporter;
+import org.apache.iceberg.metrics.MetricsReporter;
 import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -71,9 +73,19 @@ public class BaseTransaction implements Transaction {
   private TableMetadata base;
   private TableMetadata current;
   private boolean hasLastOpCommitted;
+  private final MetricsReporter reporter;
 
   BaseTransaction(
       String tableName, TableOperations ops, TransactionType type, 
TableMetadata start) {
+    this(tableName, ops, type, start, LoggingMetricsReporter.instance());
+  }
+
+  BaseTransaction(
+      String tableName,
+      TableOperations ops,
+      TransactionType type,
+      TableMetadata start,
+      MetricsReporter reporter) {
     this.tableName = tableName;
     this.ops = ops;
     this.transactionTable = new TransactionTable();
@@ -84,6 +96,7 @@ public class BaseTransaction implements Transaction {
     this.base = ops.current();
     this.type = type;
     this.hasLastOpCommitted = true;
+    this.reporter = reporter;
   }
 
   @Override
@@ -148,7 +161,7 @@ public class BaseTransaction implements Transaction {
   @Override
   public AppendFiles newAppend() {
     checkLastOperationCommitted("AppendFiles");
-    AppendFiles append = new MergeAppend(tableName, transactionOps);
+    AppendFiles append = new MergeAppend(tableName, 
transactionOps).reportWith(reporter);
     append.deleteWith(enqueueDelete);
     updates.add(append);
     return append;
@@ -157,7 +170,7 @@ public class BaseTransaction implements Transaction {
   @Override
   public AppendFiles newFastAppend() {
     checkLastOperationCommitted("AppendFiles");
-    AppendFiles append = new FastAppend(tableName, transactionOps);
+    AppendFiles append = new FastAppend(tableName, 
transactionOps).reportWith(reporter);
     updates.add(append);
     return append;
   }
@@ -165,7 +178,7 @@ public class BaseTransaction implements Transaction {
   @Override
   public RewriteFiles newRewrite() {
     checkLastOperationCommitted("RewriteFiles");
-    RewriteFiles rewrite = new BaseRewriteFiles(tableName, transactionOps);
+    RewriteFiles rewrite = new BaseRewriteFiles(tableName, 
transactionOps).reportWith(reporter);
     rewrite.deleteWith(enqueueDelete);
     updates.add(rewrite);
     return rewrite;
@@ -174,7 +187,7 @@ public class BaseTransaction implements Transaction {
   @Override
   public RewriteManifests rewriteManifests() {
     checkLastOperationCommitted("RewriteManifests");
-    RewriteManifests rewrite = new BaseRewriteManifests(transactionOps);
+    RewriteManifests rewrite = new 
BaseRewriteManifests(transactionOps).reportWith(reporter);
     rewrite.deleteWith(enqueueDelete);
     updates.add(rewrite);
     return rewrite;
@@ -183,7 +196,8 @@ public class BaseTransaction implements Transaction {
   @Override
   public OverwriteFiles newOverwrite() {
     checkLastOperationCommitted("OverwriteFiles");
-    OverwriteFiles overwrite = new BaseOverwriteFiles(tableName, 
transactionOps);
+    OverwriteFiles overwrite =
+        new BaseOverwriteFiles(tableName, transactionOps).reportWith(reporter);
     overwrite.deleteWith(enqueueDelete);
     updates.add(overwrite);
     return overwrite;
@@ -192,7 +206,7 @@ public class BaseTransaction implements Transaction {
   @Override
   public RowDelta newRowDelta() {
     checkLastOperationCommitted("RowDelta");
-    RowDelta delta = new BaseRowDelta(tableName, transactionOps);
+    RowDelta delta = new BaseRowDelta(tableName, 
transactionOps).reportWith(reporter);
     delta.deleteWith(enqueueDelete);
     updates.add(delta);
     return delta;
@@ -201,7 +215,8 @@ public class BaseTransaction implements Transaction {
   @Override
   public ReplacePartitions newReplacePartitions() {
     checkLastOperationCommitted("ReplacePartitions");
-    ReplacePartitions replacePartitions = new BaseReplacePartitions(tableName, 
transactionOps);
+    ReplacePartitions replacePartitions =
+        new BaseReplacePartitions(tableName, 
transactionOps).reportWith(reporter);
     replacePartitions.deleteWith(enqueueDelete);
     updates.add(replacePartitions);
     return replacePartitions;
@@ -210,7 +225,7 @@ public class BaseTransaction implements Transaction {
   @Override
   public DeleteFiles newDelete() {
     checkLastOperationCommitted("DeleteFiles");
-    DeleteFiles delete = new StreamingDelete(tableName, transactionOps);
+    DeleteFiles delete = new StreamingDelete(tableName, 
transactionOps).reportWith(reporter);
     delete.deleteWith(enqueueDelete);
     updates.add(delete);
     return delete;
@@ -235,7 +250,8 @@ public class BaseTransaction implements Transaction {
 
   CherryPickOperation cherryPick() {
     checkLastOperationCommitted("CherryPick");
-    CherryPickOperation cherrypick = new CherryPickOperation(tableName, 
transactionOps);
+    CherryPickOperation cherrypick =
+        new CherryPickOperation(tableName, 
transactionOps).reportWith(reporter);
     updates.add(cherrypick);
     return cherrypick;
   }
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java 
b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 0e8732ea63..f11cad4e79 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -39,11 +39,19 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
+import org.apache.iceberg.events.CreateSnapshotEvent;
 import org.apache.iceberg.events.Listeners;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.metrics.CommitMetrics;
+import org.apache.iceberg.metrics.CommitMetricsResult;
+import org.apache.iceberg.metrics.DefaultMetricsContext;
+import org.apache.iceberg.metrics.ImmutableCommitReport;
+import org.apache.iceberg.metrics.LoggingMetricsReporter;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.Timer.Timed;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -76,6 +84,7 @@ abstract class SnapshotProducer<ThisT> implements 
SnapshotUpdate<ThisT> {
   private final AtomicInteger manifestCount = new AtomicInteger(0);
   private final AtomicInteger attempt = new AtomicInteger(0);
   private final List<String> manifestLists = Lists.newArrayList();
+  private MetricsReporter reporter = LoggingMetricsReporter.instance();
   private volatile Long snapshotId = null;
   private TableMetadata base;
   private boolean stageOnly = false;
@@ -83,6 +92,7 @@ abstract class SnapshotProducer<ThisT> implements 
SnapshotUpdate<ThisT> {
 
   private ExecutorService workerPool = ThreadPools.getWorkerPool();
   private String targetBranch = SnapshotRef.MAIN_BRANCH;
+  private CommitMetrics commitMetrics;
 
   protected SnapshotProducer(TableOperations ops) {
     this.ops = ops;
@@ -112,6 +122,19 @@ abstract class SnapshotProducer<ThisT> implements 
SnapshotUpdate<ThisT> {
     return self();
   }
 
+  protected CommitMetrics commitMetrics() {
+    if (commitMetrics == null) {
+      this.commitMetrics = CommitMetrics.of(new DefaultMetricsContext());
+    }
+
+    return commitMetrics;
+  }
+
+  protected ThisT reportWith(MetricsReporter newReporter) {
+    this.reporter = newReporter;
+    return self();
+  }
+
   /**
    * * A setter for the target branch on which snapshot producer operation 
should be performed
    *
@@ -328,6 +351,7 @@ abstract class SnapshotProducer<ThisT> implements 
SnapshotUpdate<ThisT> {
   public void commit() {
     // this is always set to the latest commit attempt's snapshot id.
     AtomicLong newSnapshotId = new AtomicLong(-1L);
+    Timed totalDuration = commitMetrics().totalDuration().start();
     try {
       Tasks.foreach(ops)
           .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, 
COMMIT_NUM_RETRIES_DEFAULT))
@@ -337,6 +361,7 @@ abstract class SnapshotProducer<ThisT> implements 
SnapshotUpdate<ThisT> {
               base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, 
COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
               2.0 /* exponential */)
           .onlyRetryOn(CommitFailedException.class)
+          .countAttempts(commitMetrics().attempts())
           .run(
               taskOps -> {
                 Snapshot newSnapshot = apply();
@@ -397,6 +422,8 @@ abstract class SnapshotProducer<ThisT> implements 
SnapshotUpdate<ThisT> {
           "Failed to load committed table metadata or during cleanup, skipping 
further cleanup", e);
     }
 
+    totalDuration.stop();
+
     try {
       notifyListeners();
     } catch (Throwable e) {
@@ -409,6 +436,21 @@ abstract class SnapshotProducer<ThisT> implements 
SnapshotUpdate<ThisT> {
       Object event = updateEvent();
       if (event != null) {
         Listeners.notifyAll(event);
+
+        if (event instanceof CreateSnapshotEvent) {
+          CreateSnapshotEvent createSnapshotEvent = (CreateSnapshotEvent) 
event;
+
+          reporter.report(
+              ImmutableCommitReport.builder()
+                  .tableName(createSnapshotEvent.tableName())
+                  .snapshotId(createSnapshotEvent.snapshotId())
+                  .operation(createSnapshotEvent.operation())
+                  .sequenceNumber(createSnapshotEvent.sequenceNumber())
+                  .metadata(EnvironmentContext.get())
+                  .commitMetrics(
+                      CommitMetricsResult.from(commitMetrics(), 
createSnapshotEvent.summary()))
+                  .build());
+        }
       }
     } catch (RuntimeException e) {
       LOG.warn("Failed to notify listeners", e);
diff --git a/core/src/main/java/org/apache/iceberg/TableScanContext.java 
b/core/src/main/java/org/apache/iceberg/TableScanContext.java
index e484da6f15..6a3c7cc6e9 100644
--- a/core/src/main/java/org/apache/iceberg/TableScanContext.java
+++ b/core/src/main/java/org/apache/iceberg/TableScanContext.java
@@ -59,7 +59,7 @@ final class TableScanContext {
     this.toSnapshotId = null;
     this.planExecutor = null;
     this.fromSnapshotInclusive = false;
-    this.metricsReporter = new LoggingMetricsReporter();
+    this.metricsReporter = LoggingMetricsReporter.instance();
   }
 
   private TableScanContext(
diff --git a/core/src/main/java/org/apache/iceberg/Transactions.java 
b/core/src/main/java/org/apache/iceberg/Transactions.java
index 32d3dedfe8..7afed0573a 100644
--- a/core/src/main/java/org/apache/iceberg/Transactions.java
+++ b/core/src/main/java/org/apache/iceberg/Transactions.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg;
 
 import org.apache.iceberg.BaseTransaction.TransactionType;
+import org.apache.iceberg.metrics.MetricsReporter;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 
 public final class Transactions {
@@ -34,6 +35,11 @@ public final class Transactions {
     return new BaseTransaction(tableName, ops, TransactionType.REPLACE_TABLE, 
start);
   }
 
+  public static Transaction replaceTableTransaction(
+      String tableName, TableOperations ops, TableMetadata start, 
MetricsReporter reporter) {
+    return new BaseTransaction(tableName, ops, TransactionType.REPLACE_TABLE, 
start, reporter);
+  }
+
   public static Transaction createTableTransaction(
       String tableName, TableOperations ops, TableMetadata start) {
     Preconditions.checkArgument(
@@ -41,7 +47,19 @@ public final class Transactions {
     return new BaseTransaction(tableName, ops, TransactionType.CREATE_TABLE, 
start);
   }
 
+  public static Transaction createTableTransaction(
+      String tableName, TableOperations ops, TableMetadata start, 
MetricsReporter reporter) {
+    Preconditions.checkArgument(
+        ops.current() == null, "Cannot start create table transaction: table 
already exists");
+    return new BaseTransaction(tableName, ops, TransactionType.CREATE_TABLE, 
start, reporter);
+  }
+
   public static Transaction newTransaction(String tableName, TableOperations 
ops) {
     return new BaseTransaction(tableName, ops, TransactionType.SIMPLE, 
ops.refresh());
   }
+
+  public static Transaction newTransaction(
+      String tableName, TableOperations ops, MetricsReporter reporter) {
+    return new BaseTransaction(tableName, ops, TransactionType.SIMPLE, 
ops.refresh(), reporter);
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/metrics/CommitMetrics.java 
b/core/src/main/java/org/apache/iceberg/metrics/CommitMetrics.java
new file mode 100644
index 0000000000..26a662d62e
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/metrics/CommitMetrics.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.metrics;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.metrics.MetricsContext.Unit;
+import org.immutables.value.Value;
+
+/** Carries all metrics for a particular commit */
+@Value.Immutable
+public abstract class CommitMetrics {
+  public static final String TOTAL_DURATION = "total-duration";
+  public static final String ATTEMPTS = "attempts";
+
+  public static CommitMetrics noop() {
+    return CommitMetrics.of(MetricsContext.nullMetrics());
+  }
+
+  public abstract MetricsContext metricsContext();
+
+  @Value.Derived
+  public Timer totalDuration() {
+    return metricsContext().timer(TOTAL_DURATION, TimeUnit.NANOSECONDS);
+  }
+
+  @Value.Derived
+  public Counter attempts() {
+    return metricsContext().counter(ATTEMPTS, Unit.COUNT);
+  }
+
+  public static CommitMetrics of(MetricsContext metricsContext) {
+    return 
ImmutableCommitMetrics.builder().metricsContext(metricsContext).build();
+  }
+}
diff --git 
a/core/src/main/java/org/apache/iceberg/metrics/CommitMetricsResult.java 
b/core/src/main/java/org/apache/iceberg/metrics/CommitMetricsResult.java
new file mode 100644
index 0000000000..1cb23b174a
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/metrics/CommitMetricsResult.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.metrics;
+
+import edu.umd.cs.findbugs.annotations.Nullable;
+import java.util.Map;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.metrics.MetricsContext.Unit;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.immutables.value.Value;
+
+/** A serializable version of {@link CommitMetrics} that carries its results. 
*/
+@Value.Immutable
+public interface CommitMetricsResult {
+  String ADDED_DATA_FILES = "added-data-files";
+  String REMOVED_DATA_FILES = "removed-data-files";
+  String TOTAL_DATA_FILES = "total-data-files";
+  String ADDED_DELETE_FILES = "added-delete-files";
+  String ADDED_EQ_DELETE_FILES = "added-equality-delete-files";
+  String ADDED_POS_DELETE_FILES = "added-positional-delete-files";
+  String REMOVED_POS_DELETE_FILES = "removed-positional-delete-files";
+  String REMOVED_EQ_DELETE_FILES = "removed-equality-delete-files";
+  String REMOVED_DELETE_FILES = "removed-delete-files";
+  String TOTAL_DELETE_FILES = "total-delete-files";
+  String ADDED_RECORDS = "added-records";
+  String REMOVED_RECORDS = "removed-records";
+  String TOTAL_RECORDS = "total-records";
+  String ADDED_FILE_SIZE_BYTES = "added-files-size-bytes";
+  String REMOVED_FILE_SIZE_BYTES = "removed-files-size-bytes";
+  String TOTAL_FILE_SIZE_BYTES = "total-files-size-bytes";
+  String ADDED_POS_DELETES = "added-position-deletes";
+  String REMOVED_POS_DELETES = "removed-positional-deletes";
+  String TOTAL_POS_DELETES = "total-positional-deletes";
+  String ADDED_EQ_DELETES = "added-equality-deletes";
+  String REMOVED_EQ_DELETES = "removed-equality-deletes";
+  String TOTAL_EQ_DELETES = "total-equality-deletes";
+
+  @Nullable
+  TimerResult totalDuration();
+
+  @Nullable
+  CounterResult attempts();
+
+  @Nullable
+  CounterResult addedDataFiles();
+
+  @Nullable
+  CounterResult removedDataFiles();
+
+  @Nullable
+  CounterResult totalDataFiles();
+
+  @Nullable
+  CounterResult addedDeleteFiles();
+
+  @Nullable
+  CounterResult addedEqualityDeleteFiles();
+
+  @Nullable
+  CounterResult addedPositionalDeleteFiles();
+
+  @Nullable
+  CounterResult removedDeleteFiles();
+
+  @Nullable
+  CounterResult removedEqualityDeleteFiles();
+
+  @Nullable
+  CounterResult removedPositionalDeleteFiles();
+
+  @Nullable
+  CounterResult totalDeleteFiles();
+
+  @Nullable
+  CounterResult addedRecords();
+
+  @Nullable
+  CounterResult removedRecords();
+
+  @Nullable
+  CounterResult totalRecords();
+
+  @Nullable
+  CounterResult addedFilesSizeInBytes();
+
+  @Nullable
+  CounterResult removedFilesSizeInBytes();
+
+  @Nullable
+  CounterResult totalFilesSizeInBytes();
+
+  @Nullable
+  CounterResult addedPositionalDeletes();
+
+  @Nullable
+  CounterResult removedPositionalDeletes();
+
+  @Nullable
+  CounterResult totalPositionalDeletes();
+
+  @Nullable
+  CounterResult addedEqualityDeletes();
+
+  @Nullable
+  CounterResult removedEqualityDeletes();
+
+  @Nullable
+  CounterResult totalEqualityDeletes();
+
+  static CommitMetricsResult from(
+      CommitMetrics commitMetrics, Map<String, String> snapshotSummary) {
+    Preconditions.checkArgument(null != commitMetrics, "Invalid commit 
metrics: null");
+    Preconditions.checkArgument(null != snapshotSummary, "Invalid snapshot 
summary: null");
+    return ImmutableCommitMetricsResult.builder()
+        .attempts(CounterResult.fromCounter(commitMetrics.attempts()))
+        .totalDuration(TimerResult.fromTimer(commitMetrics.totalDuration()))
+        .addedDataFiles(counterFrom(snapshotSummary, 
SnapshotSummary.ADDED_FILES_PROP))
+        .removedDataFiles(counterFrom(snapshotSummary, 
SnapshotSummary.DELETED_FILES_PROP))
+        .totalDataFiles(counterFrom(snapshotSummary, 
SnapshotSummary.TOTAL_DATA_FILES_PROP))
+        .addedDeleteFiles(counterFrom(snapshotSummary, 
SnapshotSummary.ADDED_DELETE_FILES_PROP))
+        .addedPositionalDeleteFiles(
+            counterFrom(snapshotSummary, 
SnapshotSummary.ADD_POS_DELETE_FILES_PROP))
+        .addedEqualityDeleteFiles(
+            counterFrom(snapshotSummary, 
SnapshotSummary.ADD_EQ_DELETE_FILES_PROP))
+        .removedDeleteFiles(counterFrom(snapshotSummary, 
SnapshotSummary.REMOVED_DELETE_FILES_PROP))
+        .removedEqualityDeleteFiles(
+            counterFrom(snapshotSummary, 
SnapshotSummary.REMOVED_EQ_DELETE_FILES_PROP))
+        .removedPositionalDeleteFiles(
+            counterFrom(snapshotSummary, 
SnapshotSummary.REMOVED_POS_DELETE_FILES_PROP))
+        .totalDeleteFiles(counterFrom(snapshotSummary, 
SnapshotSummary.TOTAL_DELETE_FILES_PROP))
+        .addedRecords(counterFrom(snapshotSummary, 
SnapshotSummary.ADDED_RECORDS_PROP))
+        .removedRecords(counterFrom(snapshotSummary, 
SnapshotSummary.DELETED_RECORDS_PROP))
+        .totalRecords(counterFrom(snapshotSummary, 
SnapshotSummary.TOTAL_RECORDS_PROP))
+        .addedFilesSizeInBytes(
+            counterFrom(snapshotSummary, SnapshotSummary.ADDED_FILE_SIZE_PROP, 
Unit.BYTES))
+        .removedFilesSizeInBytes(
+            counterFrom(snapshotSummary, 
SnapshotSummary.REMOVED_FILE_SIZE_PROP, Unit.BYTES))
+        .totalFilesSizeInBytes(
+            counterFrom(snapshotSummary, SnapshotSummary.TOTAL_FILE_SIZE_PROP, 
Unit.BYTES))
+        .addedPositionalDeletes(
+            counterFrom(snapshotSummary, 
SnapshotSummary.ADDED_POS_DELETES_PROP))
+        .removedPositionalDeletes(
+            counterFrom(snapshotSummary, 
SnapshotSummary.REMOVED_POS_DELETES_PROP))
+        .totalPositionalDeletes(
+            counterFrom(snapshotSummary, 
SnapshotSummary.TOTAL_POS_DELETES_PROP))
+        .addedEqualityDeletes(counterFrom(snapshotSummary, 
SnapshotSummary.ADDED_EQ_DELETES_PROP))
+        .removedEqualityDeletes(
+            counterFrom(snapshotSummary, 
SnapshotSummary.REMOVED_EQ_DELETES_PROP))
+        .totalEqualityDeletes(counterFrom(snapshotSummary, 
SnapshotSummary.TOTAL_EQ_DELETES_PROP))
+        .build();
+  }
+
+  static CounterResult counterFrom(Map<String, String> snapshotSummary, String 
metricName) {
+    return counterFrom(snapshotSummary, metricName, Unit.COUNT);
+  }
+
+  static CounterResult counterFrom(
+      Map<String, String> snapshotSummary, String metricName, Unit unit) {
+    if (!snapshotSummary.containsKey(metricName)) {
+      return null;
+    }
+
+    try {
+      return CounterResult.of(unit, 
Long.parseLong(snapshotSummary.get(metricName)));
+    } catch (NumberFormatException e) {
+      return null;
+    }
+  }
+}
diff --git 
a/core/src/main/java/org/apache/iceberg/metrics/CommitMetricsResultParser.java 
b/core/src/main/java/org/apache/iceberg/metrics/CommitMetricsResultParser.java
new file mode 100644
index 0000000000..d4fd883c43
--- /dev/null
+++ 
b/core/src/main/java/org/apache/iceberg/metrics/CommitMetricsResultParser.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.metrics;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.JsonUtil;
+
+/** JSON serialization and deserialization of {@link CommitMetricsResult}. */
+class CommitMetricsResultParser {
+  private CommitMetricsResultParser() {}
+
+  /**
+   * Serializes the given metrics to a JSON string.
+   *
+   * @param metrics the metrics to serialize, must not be null
+   * @return the JSON representation, generated with the pretty flag set to false
+   */
+  static String toJson(CommitMetricsResult metrics) {
+    return toJson(metrics, false);
+  }
+
+  /**
+   * Serializes the given metrics to a JSON string.
+   *
+   * @param metrics the metrics to serialize, must not be null
+   * @param pretty forwarded to {@link JsonUtil#generate}; presumably enables pretty-printing
+   * @return the JSON representation
+   */
+  static String toJson(CommitMetricsResult metrics, boolean pretty) {
+    return JsonUtil.generate(gen -> toJson(metrics, gen), pretty);
+  }
+
+  /**
+   * Writes the given metrics as a JSON object. Metrics whose accessor returns null are left out of
+   * the output; the remaining ones are written in a fixed order.
+   *
+   * @param metrics the metrics to serialize, must not be null
+   * @param gen the generator to write to
+   * @throws IOException if writing to the generator fails
+   * @throws IllegalArgumentException if metrics is null
+   */
+  static void toJson(CommitMetricsResult metrics, JsonGenerator gen) throws IOException {
+    Preconditions.checkArgument(null != metrics, "Invalid commit metrics: null");
+
+    gen.writeStartObject();
+
+    // the total duration is the only timer, so it is handled inline instead of via writeCounter
+    if (null != metrics.totalDuration()) {
+      gen.writeFieldName(CommitMetrics.TOTAL_DURATION);
+      TimerResultParser.toJson(metrics.totalDuration(), gen);
+    }
+
+    // the order of the calls below defines the field order of the produced JSON
+    writeCounter(CommitMetrics.ATTEMPTS, metrics.attempts(), gen);
+    writeCounter(CommitMetricsResult.ADDED_DATA_FILES, metrics.addedDataFiles(), gen);
+    writeCounter(CommitMetricsResult.REMOVED_DATA_FILES, metrics.removedDataFiles(), gen);
+    writeCounter(CommitMetricsResult.TOTAL_DATA_FILES, metrics.totalDataFiles(), gen);
+    writeCounter(CommitMetricsResult.ADDED_DELETE_FILES, metrics.addedDeleteFiles(), gen);
+    writeCounter(
+        CommitMetricsResult.ADDED_EQ_DELETE_FILES, metrics.addedEqualityDeleteFiles(), gen);
+    writeCounter(
+        CommitMetricsResult.ADDED_POS_DELETE_FILES, metrics.addedPositionalDeleteFiles(), gen);
+    writeCounter(CommitMetricsResult.REMOVED_DELETE_FILES, metrics.removedDeleteFiles(), gen);
+    writeCounter(
+        CommitMetricsResult.REMOVED_POS_DELETE_FILES, metrics.removedPositionalDeleteFiles(), gen);
+    writeCounter(
+        CommitMetricsResult.REMOVED_EQ_DELETE_FILES, metrics.removedEqualityDeleteFiles(), gen);
+    writeCounter(CommitMetricsResult.TOTAL_DELETE_FILES, metrics.totalDeleteFiles(), gen);
+    writeCounter(CommitMetricsResult.ADDED_RECORDS, metrics.addedRecords(), gen);
+    writeCounter(CommitMetricsResult.REMOVED_RECORDS, metrics.removedRecords(), gen);
+    writeCounter(CommitMetricsResult.TOTAL_RECORDS, metrics.totalRecords(), gen);
+    writeCounter(
+        CommitMetricsResult.ADDED_FILE_SIZE_BYTES, metrics.addedFilesSizeInBytes(), gen);
+    writeCounter(
+        CommitMetricsResult.REMOVED_FILE_SIZE_BYTES, metrics.removedFilesSizeInBytes(), gen);
+    writeCounter(
+        CommitMetricsResult.TOTAL_FILE_SIZE_BYTES, metrics.totalFilesSizeInBytes(), gen);
+    writeCounter(CommitMetricsResult.ADDED_POS_DELETES, metrics.addedPositionalDeletes(), gen);
+    writeCounter(
+        CommitMetricsResult.REMOVED_POS_DELETES, metrics.removedPositionalDeletes(), gen);
+    writeCounter(CommitMetricsResult.TOTAL_POS_DELETES, metrics.totalPositionalDeletes(), gen);
+    writeCounter(CommitMetricsResult.ADDED_EQ_DELETES, metrics.addedEqualityDeletes(), gen);
+    writeCounter(CommitMetricsResult.REMOVED_EQ_DELETES, metrics.removedEqualityDeletes(), gen);
+    writeCounter(CommitMetricsResult.TOTAL_EQ_DELETES, metrics.totalEqualityDeletes(), gen);
+
+    gen.writeEndObject();
+  }
+
+  /** Writes the counter under the given field name, or nothing at all if the counter is null. */
+  private static void writeCounter(String name, CounterResult counter, JsonGenerator gen)
+      throws IOException {
+    if (null != counter) {
+      gen.writeFieldName(name);
+      CounterResultParser.toJson(counter, gen);
+    }
+  }
+
+  /**
+   * Parses commit metrics from a JSON string.
+   *
+   * @param json the JSON string to parse
+   * @return the parsed metrics
+   */
+  static CommitMetricsResult fromJson(String json) {
+    return JsonUtil.parse(json, CommitMetricsResultParser::fromJson);
+  }
+
+  /**
+   * Parses commit metrics from a JSON object node. Each metric is looked up by its field name via
+   * {@link TimerResultParser} or {@link CounterResultParser}.
+   *
+   * <p>NOTE(review): absent fields presumably come back as null from those parsers, mirroring what
+   * {@code toJson} omits -- confirm against the parser implementations.
+   *
+   * @param json the JSON node to parse, must be a non-null object node
+   * @return the parsed metrics
+   * @throws IllegalArgumentException if json is null or not an object
+   */
+  static CommitMetricsResult fromJson(JsonNode json) {
+    Preconditions.checkArgument(null != json, "Cannot parse commit metrics from null object");
+    Preconditions.checkArgument(
+        json.isObject(), "Cannot parse commit metrics from non-object: %s", json);
+
+    return ImmutableCommitMetricsResult.builder()
+        .attempts(CounterResultParser.fromJson(CommitMetrics.ATTEMPTS, json))
+        .totalDuration(TimerResultParser.fromJson(CommitMetrics.TOTAL_DURATION, json))
+        .addedDataFiles(CounterResultParser.fromJson(CommitMetricsResult.ADDED_DATA_FILES, json))
+        .removedDataFiles(
+            CounterResultParser.fromJson(CommitMetricsResult.REMOVED_DATA_FILES, json))
+        .totalDataFiles(CounterResultParser.fromJson(CommitMetricsResult.TOTAL_DATA_FILES, json))
+        .addedDeleteFiles(
+            CounterResultParser.fromJson(CommitMetricsResult.ADDED_DELETE_FILES, json))
+        .addedEqualityDeleteFiles(
+            CounterResultParser.fromJson(CommitMetricsResult.ADDED_EQ_DELETE_FILES, json))
+        .addedPositionalDeleteFiles(
+            CounterResultParser.fromJson(CommitMetricsResult.ADDED_POS_DELETE_FILES, json))
+        .removedEqualityDeleteFiles(
+            CounterResultParser.fromJson(CommitMetricsResult.REMOVED_EQ_DELETE_FILES, json))
+        .removedPositionalDeleteFiles(
+            CounterResultParser.fromJson(CommitMetricsResult.REMOVED_POS_DELETE_FILES, json))
+        .removedDeleteFiles(
+            CounterResultParser.fromJson(CommitMetricsResult.REMOVED_DELETE_FILES, json))
+        .totalDeleteFiles(
+            CounterResultParser.fromJson(CommitMetricsResult.TOTAL_DELETE_FILES, json))
+        .addedRecords(CounterResultParser.fromJson(CommitMetricsResult.ADDED_RECORDS, json))
+        .removedRecords(CounterResultParser.fromJson(CommitMetricsResult.REMOVED_RECORDS, json))
+        .totalRecords(CounterResultParser.fromJson(CommitMetricsResult.TOTAL_RECORDS, json))
+        .addedFilesSizeInBytes(
+            CounterResultParser.fromJson(CommitMetricsResult.ADDED_FILE_SIZE_BYTES, json))
+        .removedFilesSizeInBytes(
+            CounterResultParser.fromJson(CommitMetricsResult.REMOVED_FILE_SIZE_BYTES, json))
+        .totalFilesSizeInBytes(
+            CounterResultParser.fromJson(CommitMetricsResult.TOTAL_FILE_SIZE_BYTES, json))
+        .addedPositionalDeletes(
+            CounterResultParser.fromJson(CommitMetricsResult.ADDED_POS_DELETES, json))
+        .removedPositionalDeletes(
+            CounterResultParser.fromJson(CommitMetricsResult.REMOVED_POS_DELETES, json))
+        .totalPositionalDeletes(
+            CounterResultParser.fromJson(CommitMetricsResult.TOTAL_POS_DELETES, json))
+        .addedEqualityDeletes(
+            CounterResultParser.fromJson(CommitMetricsResult.ADDED_EQ_DELETES, json))
+        .removedEqualityDeletes(
+            CounterResultParser.fromJson(CommitMetricsResult.REMOVED_EQ_DELETES, json))
+        .totalEqualityDeletes(
+            CounterResultParser.fromJson(CommitMetricsResult.TOTAL_EQ_DELETES, json))
+        .build();
+  }
+}
diff --git a/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java 
b/core/src/main/java/org/apache/iceberg/metrics/CommitReport.java
similarity index 66%
copy from api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java
copy to core/src/main/java/org/apache/iceberg/metrics/CommitReport.java
index 45e3d16bff..bbe032eb27 100644
--- a/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java
+++ b/core/src/main/java/org/apache/iceberg/metrics/CommitReport.java
@@ -18,15 +18,22 @@
  */
 package org.apache.iceberg.metrics;
 
-/** This interface defines the basic API for reporting metrics for operations 
to a Table. */
-@FunctionalInterface
-public interface MetricsReporter {
-
-  /**
-   * Indicates that a operation is done by reporting a {@link MetricsReport}. 
A {@link
-   * MetricsReport} is usually directly derived from a {@link MetricsReport} 
instance.
-   *
-   * @param report The {@link MetricsReport} to report.
-   */
-  void report(MetricsReport report);
+import java.util.Map;
+import org.immutables.value.Value;
+
+/** A commit report that contains all relevant information from a Snapshot. */
+@Value.Immutable
+public interface CommitReport extends MetricsReport {
+
+  /** Returns the name of the table the commit was made to. */
+  String tableName();
+
+  /** Returns the ID of the snapshot this report describes. */
+  long snapshotId();
+
+  /** Returns the sequence number of the snapshot this report describes. */
+  long sequenceNumber();
+
+  /** Returns the operation of the snapshot as a string. */
+  String operation();
+
+  /** Returns the metrics that were collected for the commit. */
+  CommitMetricsResult commitMetrics();
+
+  /** Returns additional key/value metadata attached to this report. */
+  Map<String, String> metadata();
 }
diff --git 
a/core/src/main/java/org/apache/iceberg/metrics/CommitReportParser.java 
b/core/src/main/java/org/apache/iceberg/metrics/CommitReportParser.java
new file mode 100644
index 0000000000..b6b3f71250
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/metrics/CommitReportParser.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.metrics;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.JsonUtil;
+
+/** Converts {@link CommitReport} instances to and from their JSON representation. */
+public class CommitReportParser {
+  private static final String TABLE_NAME = "table-name";
+  private static final String SNAPSHOT_ID = "snapshot-id";
+  private static final String SEQUENCE_NUMBER = "sequence-number";
+  private static final String OPERATION = "operation";
+  private static final String METRICS = "metrics";
+  private static final String METADATA = "metadata";
+
+  private CommitReportParser() {}
+
+  /**
+   * Serializes the report to a JSON string, with the pretty flag set to false.
+   *
+   * @param commitReport The {@link CommitReport} to serialize
+   * @return The JSON representation of the report
+   */
+  public static String toJson(CommitReport commitReport) {
+    boolean pretty = false;
+    return toJson(commitReport, pretty);
+  }
+
+  /**
+   * Serializes the report to a JSON string.
+   *
+   * @param commitReport The {@link CommitReport} to serialize
+   * @param pretty Forwarded as-is to {@link JsonUtil#generate}
+   * @return The JSON representation of the report
+   */
+  public static String toJson(CommitReport commitReport, boolean pretty) {
+    return JsonUtil.generate(generator -> toJson(commitReport, generator), pretty);
+  }
+
+  /**
+   * Writes the report as a complete JSON object to the given generator.
+   *
+   * @param commitReport The {@link CommitReport} to serialize
+   * @param gen The {@link JsonGenerator} to use
+   * @throws IOException If an error occurs while serializing
+   */
+  public static void toJson(CommitReport commitReport, JsonGenerator gen) throws IOException {
+    Preconditions.checkArgument(commitReport != null, "Invalid commit report: null");
+
+    gen.writeStartObject();
+    toJsonWithoutStartEnd(commitReport, gen);
+    gen.writeEndObject();
+  }
+
+  /**
+   * Writes only the fields of the {@link CommitReport}, leaving the enclosing start/end object to
+   * the caller. This is mainly used by {@link
+   * org.apache.iceberg.rest.requests.ReportMetricsRequestParser}.
+   *
+   * @param commitReport The {@link CommitReport} to serialize
+   * @param gen The {@link JsonGenerator} to use
+   * @throws IOException If an error occurs while serializing
+   */
+  public static void toJsonWithoutStartEnd(CommitReport commitReport, JsonGenerator gen)
+      throws IOException {
+    Preconditions.checkArgument(commitReport != null, "Invalid commit report: null");
+
+    gen.writeStringField(TABLE_NAME, commitReport.tableName());
+    gen.writeNumberField(SNAPSHOT_ID, commitReport.snapshotId());
+    gen.writeNumberField(SEQUENCE_NUMBER, commitReport.sequenceNumber());
+    gen.writeStringField(OPERATION, commitReport.operation());
+
+    gen.writeFieldName(METRICS);
+    CommitMetricsResultParser.toJson(commitReport.commitMetrics(), gen);
+
+    // an empty metadata map is left out of the output entirely
+    if (commitReport.metadata().isEmpty()) {
+      return;
+    }
+
+    JsonUtil.writeStringMap(METADATA, commitReport.metadata(), gen);
+  }
+
+  /**
+   * Parses a report from a JSON string.
+   *
+   * @param json The JSON string to parse
+   * @return The parsed {@link CommitReport}
+   */
+  public static CommitReport fromJson(String json) {
+    return JsonUtil.parse(json, node -> fromJson(node));
+  }
+
+  /**
+   * Parses a report from a JSON object node. The metadata field is optional and only read when
+   * present.
+   *
+   * @param json The JSON node to parse, must be a non-null object node
+   * @return The parsed {@link CommitReport}
+   */
+  public static CommitReport fromJson(JsonNode json) {
+    Preconditions.checkArgument(json != null, "Cannot parse commit report from null object");
+    Preconditions.checkArgument(
+        json.isObject(), "Cannot parse commit report from non-object: %s", json);
+
+    // fields are read in the same order in which they are written
+    ImmutableCommitReport.Builder report = ImmutableCommitReport.builder();
+    report.tableName(JsonUtil.getString(TABLE_NAME, json));
+    report.snapshotId(JsonUtil.getLong(SNAPSHOT_ID, json));
+    report.sequenceNumber(JsonUtil.getLong(SEQUENCE_NUMBER, json));
+    report.operation(JsonUtil.getString(OPERATION, json));
+    report.commitMetrics(CommitMetricsResultParser.fromJson(JsonUtil.get(METRICS, json)));
+
+    if (json.has(METADATA)) {
+      report.metadata(JsonUtil.getStringMap(METADATA, json));
+    }
+
+    return report.build();
+  }
+}
diff --git 
a/core/src/main/java/org/apache/iceberg/metrics/ScanReportParser.java 
b/core/src/main/java/org/apache/iceberg/metrics/ScanReportParser.java
index 32b43af395..cdbe19fad2 100644
--- a/core/src/main/java/org/apache/iceberg/metrics/ScanReportParser.java
+++ b/core/src/main/java/org/apache/iceberg/metrics/ScanReportParser.java
@@ -22,7 +22,6 @@ import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.JsonNode;
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.ExpressionParser;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -83,11 +82,7 @@ public class ScanReportParser {
     ScanMetricsResultParser.toJson(scanReport.scanMetrics(), gen);
 
     if (!scanReport.metadata().isEmpty()) {
-      gen.writeObjectFieldStart(METADATA);
-      for (Map.Entry<String, String> entry : scanReport.metadata().entrySet()) 
{
-        gen.writeStringField(entry.getKey(), entry.getValue());
-      }
-      gen.writeEndObject();
+      JsonUtil.writeStringMap(METADATA, scanReport.metadata(), gen);
     }
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java 
b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
index d4757c4ed5..d52dd372c9 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
@@ -601,7 +601,8 @@ public class RESTSessionCatalog extends BaseSessionCatalog
               createChanges(meta),
               meta);
 
-      return Transactions.createTableTransaction(fullName, ops, meta);
+      return Transactions.createTableTransaction(
+          fullName, ops, meta, report -> reportMetrics(ident, report, 
session::headers));
     }
 
     @Override
@@ -651,7 +652,8 @@ public class RESTSessionCatalog extends BaseSessionCatalog
               changes.build(),
               base);
 
-      return Transactions.replaceTableTransaction(fullName, ops, replacement);
+      return Transactions.replaceTableTransaction(
+          fullName, ops, replacement, report -> reportMetrics(ident, report, 
session::headers));
     }
 
     @Override
diff --git 
a/core/src/main/java/org/apache/iceberg/rest/requests/ReportMetricsRequest.java 
b/core/src/main/java/org/apache/iceberg/rest/requests/ReportMetricsRequest.java
index 963cb1a4f0..c85a6e9add 100644
--- 
a/core/src/main/java/org/apache/iceberg/rest/requests/ReportMetricsRequest.java
+++ 
b/core/src/main/java/org/apache/iceberg/rest/requests/ReportMetricsRequest.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.rest.requests;
 
 import java.util.Locale;
+import org.apache.iceberg.metrics.CommitReport;
 import org.apache.iceberg.metrics.MetricsReport;
 import org.apache.iceberg.metrics.ScanReport;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -29,7 +30,8 @@ import org.immutables.value.Value;
 public interface ReportMetricsRequest extends RESTRequest {
 
   enum ReportType {
-    SCAN_REPORT;
+    SCAN_REPORT,
+    COMMIT_REPORT;
 
     static ReportType fromString(String reportType) {
       Preconditions.checkArgument(null != reportType, "Invalid report type: 
null");
@@ -52,7 +54,13 @@ public interface ReportMetricsRequest extends RESTRequest {
   }
 
   static ReportMetricsRequest of(MetricsReport report) {
-    ReportType reportType = report instanceof ScanReport ? 
ReportType.SCAN_REPORT : null;
+    ReportType reportType = null;
+    if (report instanceof ScanReport) {
+      reportType = ReportType.SCAN_REPORT;
+    } else if (report instanceof CommitReport) {
+      reportType = ReportType.COMMIT_REPORT;
+    }
+
     Preconditions.checkArgument(
         null != reportType, "Unsupported report type: %s", 
report.getClass().getName());
 
diff --git 
a/core/src/main/java/org/apache/iceberg/rest/requests/ReportMetricsRequestParser.java
 
b/core/src/main/java/org/apache/iceberg/rest/requests/ReportMetricsRequestParser.java
index f180da09cd..e6d98cdac3 100644
--- 
a/core/src/main/java/org/apache/iceberg/rest/requests/ReportMetricsRequestParser.java
+++ 
b/core/src/main/java/org/apache/iceberg/rest/requests/ReportMetricsRequestParser.java
@@ -22,6 +22,8 @@ import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.JsonNode;
 import java.io.IOException;
 import java.util.Locale;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
 import org.apache.iceberg.metrics.ScanReport;
 import org.apache.iceberg.metrics.ScanReportParser;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -53,6 +55,10 @@ public class ReportMetricsRequestParser {
       ScanReportParser.toJsonWithoutStartEnd((ScanReport) request.report(), 
gen);
     }
 
+    if (ReportType.COMMIT_REPORT == request.reportType()) {
+      CommitReportParser.toJsonWithoutStartEnd((CommitReport) 
request.report(), gen);
+    }
+
     gen.writeEndObject();
   }
 
@@ -81,6 +87,13 @@ public class ReportMetricsRequestParser {
           .build();
     }
 
+    if (ReportType.COMMIT_REPORT == type) {
+      return ImmutableReportMetricsRequest.builder()
+          .reportType(type)
+          .report(CommitReportParser.fromJson(json))
+          .build();
+    }
+
     throw new IllegalArgumentException(String.format("Cannot build metrics 
request from %s", json));
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/util/Tasks.java 
b/core/src/main/java/org/apache/iceberg/util/Tasks.java
index dfd4ab9984..e420145c8d 100644
--- a/core/src/main/java/org/apache/iceberg/util/Tasks.java
+++ b/core/src/main/java/org/apache/iceberg/util/Tasks.java
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Predicate;
 import java.util.stream.Stream;
+import org.apache.iceberg.metrics.Counter;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -86,6 +87,7 @@ public class Tasks {
     private long maxSleepTimeMs = 600000; // 10 minutes
     private long maxDurationMs = 600000; // 10 minutes
     private double scaleFactor = 2.0; // exponential
+    private Counter attemptsCounter;
 
     public Builder(Iterable<I> items) {
       this.items = items;
@@ -173,6 +175,11 @@ public class Tasks {
       return this;
     }
 
+    /**
+     * Sets the counter used to track attempts. When a counter is set, the retry loop increments it
+     * once at the start of each attempt; when none is set, attempts are not counted.
+     *
+     * @param counter the counter to increment per attempt
+     * @return this builder
+     */
+    public Builder<I> countAttempts(Counter counter) {
+      attemptsCounter = counter;
+      return this;
+    }
+
     public Builder<I> exponentialBackoff(
         long backoffMinSleepTimeMs,
         long backoffMaxSleepTimeMs,
@@ -398,6 +405,10 @@ public class Tasks {
       int attempt = 0;
       while (true) {
         attempt += 1;
+        if (null != attemptsCounter) {
+          attemptsCounter.increment();
+        }
+
         try {
           task.run(item);
           break;
diff --git a/core/src/test/java/org/apache/iceberg/TestCommitReporting.java 
b/core/src/test/java/org/apache/iceberg/TestCommitReporting.java
new file mode 100644
index 0000000000..9998c47ff3
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestCommitReporting.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import org.apache.iceberg.TestScanPlanningAndReporting.TestMetricsReporter;
+import org.apache.iceberg.metrics.CommitMetricsResult;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.junit.Test;
+
+/**
+ * Verifies that committing changes to a table hands a {@link CommitReport} with the expected
+ * snapshot details and metrics to the configured metrics reporter.
+ */
+public class TestCommitReporting extends TableTestBase {
+
+  // collects every report so that tests can inspect the most recent one
+  private final TestMetricsReporter reporter = new TestMetricsReporter();
+
+  public TestCommitReporting() {
+    super(2);
+  }
+
+  /** Appends two data files, deletes them again and checks the report of each commit. */
+  @Test
+  public void addAndDeleteDataFiles() {
+    String tableName = "add-and-delete-data-files";
+    Table table =
+        TestTables.create(
+            tableDir, tableName, SCHEMA, SPEC, SortOrder.unsorted(), formatVersion, reporter);
+    table.newAppend().appendFile(FILE_A).appendFile(FILE_D).commit();
+
+    CommitReport report = reporter.lastCommitReport();
+    assertThat(report).isNotNull();
+    assertThat(report.operation()).isEqualTo("append");
+    assertThat(report.snapshotId()).isEqualTo(1L);
+    assertThat(report.sequenceNumber()).isEqualTo(1L);
+    assertThat(report.tableName()).isEqualTo(tableName);
+
+    CommitMetricsResult metrics = report.commitMetrics();
+    assertThat(metrics.addedDataFiles().value()).isEqualTo(2L);
+    assertThat(metrics.totalDataFiles().value()).isEqualTo(2L);
+
+    assertThat(metrics.addedRecords().value()).isEqualTo(2L);
+    assertThat(metrics.totalRecords().value()).isEqualTo(2L);
+
+    assertThat(metrics.addedFilesSizeInBytes().value()).isEqualTo(20L);
+    assertThat(metrics.totalFilesSizeInBytes().value()).isEqualTo(20L);
+
+    // now remove those 2 data files
+    table.newDelete().deleteFile(FILE_A).deleteFile(FILE_D).commit();
+    report = reporter.lastCommitReport();
+    assertThat(report).isNotNull();
+    assertThat(report.operation()).isEqualTo("delete");
+    assertThat(report.snapshotId()).isEqualTo(2L);
+    assertThat(report.sequenceNumber()).isEqualTo(2L);
+    assertThat(report.tableName()).isEqualTo(tableName);
+
+    metrics = report.commitMetrics();
+    assertThat(metrics.removedDataFiles().value()).isEqualTo(2L);
+    // the data file total must drop back to zero once both appended files are removed
+    assertThat(metrics.totalDataFiles().value()).isEqualTo(0L);
+    assertThat(metrics.totalDeleteFiles().value()).isEqualTo(0L);
+
+    assertThat(metrics.removedRecords().value()).isEqualTo(2L);
+    assertThat(metrics.totalRecords().value()).isEqualTo(0L);
+
+    assertThat(metrics.removedFilesSizeInBytes().value()).isEqualTo(20L);
+    assertThat(metrics.totalFilesSizeInBytes().value()).isEqualTo(0L);
+  }
+
+  /** Adds three delete files, rewrites them away and checks the report of each commit. */
+  @Test
+  public void addAndDeleteDeleteFiles() {
+    String tableName = "add-and-delete-delete-files";
+    Table table =
+        TestTables.create(
+            tableDir, tableName, SCHEMA, SPEC, SortOrder.unsorted(), formatVersion, reporter);
+
+    // 2 positional + 1 equality
+    table
+        .newRowDelta()
+        .addDeletes(FILE_A_DELETES)
+        .addDeletes(FILE_B_DELETES)
+        .addDeletes(FILE_C2_DELETES)
+        .commit();
+
+    CommitReport report = reporter.lastCommitReport();
+    assertThat(report).isNotNull();
+    assertThat(report.operation()).isEqualTo("overwrite");
+    assertThat(report.snapshotId()).isEqualTo(1L);
+    assertThat(report.sequenceNumber()).isEqualTo(1L);
+    assertThat(report.tableName()).isEqualTo(tableName);
+
+    CommitMetricsResult metrics = report.commitMetrics();
+    assertThat(metrics.addedDeleteFiles().value()).isEqualTo(3L);
+    assertThat(metrics.totalDeleteFiles().value()).isEqualTo(3L);
+    assertThat(metrics.addedPositionalDeleteFiles().value()).isEqualTo(2L);
+    assertThat(metrics.addedEqualityDeleteFiles().value()).isEqualTo(1L);
+
+    assertThat(metrics.addedPositionalDeletes().value()).isEqualTo(2L);
+    assertThat(metrics.totalPositionalDeletes().value()).isEqualTo(2L);
+
+    assertThat(metrics.addedEqualityDeletes().value()).isEqualTo(1L);
+    assertThat(metrics.totalEqualityDeletes().value()).isEqualTo(1L);
+
+    assertThat(metrics.addedFilesSizeInBytes().value()).isEqualTo(30L);
+    assertThat(metrics.totalFilesSizeInBytes().value()).isEqualTo(30L);
+
+    // now remove those 2 positional + 1 equality delete files
+    table
+        .newRewrite()
+        .rewriteFiles(
+            ImmutableSet.of(),
+            ImmutableSet.of(FILE_A_DELETES, FILE_B_DELETES, FILE_C2_DELETES),
+            ImmutableSet.of(),
+            ImmutableSet.of())
+        .commit();
+
+    report = reporter.lastCommitReport();
+    assertThat(report).isNotNull();
+    assertThat(report.operation()).isEqualTo("replace");
+    assertThat(report.snapshotId()).isEqualTo(2L);
+    assertThat(report.sequenceNumber()).isEqualTo(2L);
+    assertThat(report.tableName()).isEqualTo(tableName);
+
+    metrics = report.commitMetrics();
+    assertThat(metrics.removedDeleteFiles().value()).isEqualTo(3L);
+    assertThat(metrics.totalDeleteFiles().value()).isEqualTo(0L);
+    assertThat(metrics.removedPositionalDeleteFiles().value()).isEqualTo(2L);
+    assertThat(metrics.removedEqualityDeleteFiles().value()).isEqualTo(1L);
+
+    assertThat(metrics.removedPositionalDeletes().value()).isEqualTo(2L);
+    assertThat(metrics.totalPositionalDeletes().value()).isEqualTo(0L);
+
+    assertThat(metrics.removedEqualityDeletes().value()).isEqualTo(1L);
+    assertThat(metrics.totalEqualityDeletes().value()).isEqualTo(0L);
+
+    assertThat(metrics.removedFilesSizeInBytes().value()).isEqualTo(30L);
+    assertThat(metrics.totalFilesSizeInBytes().value()).isEqualTo(0L);
+  }
+
+  /**
+   * Appends two files in separate commits and then rewrites the resulting manifests into one.
+   *
+   * <p>NOTE(review): the assertions below expect the last report to still be the one of the second
+   * append (snapshot 2), i.e. the manifest rewrite itself is not expected to produce a newer
+   * report here -- confirm that this is intended.
+   */
+  @Test
+  public void addAndDeleteManifests() throws IOException {
+    String tableName = "add-and-delete-manifests";
+    Table table =
+        TestTables.create(
+            tableDir, tableName, SCHEMA, SPEC, SortOrder.unsorted(), formatVersion, reporter);
+
+    table.newAppend().appendFile(FILE_A).commit();
+    Snapshot snap1 = table.currentSnapshot();
+    table.newAppend().appendFile(FILE_B).commit();
+    Snapshot snap2 = table.currentSnapshot();
+
+    // a single manifest that carries both files as existing entries
+    ManifestFile newManifest =
+        writeManifest(
+            "manifest-file.avro",
+            manifestEntry(ManifestEntry.Status.EXISTING, snap1.snapshotId(), FILE_A),
+            manifestEntry(ManifestEntry.Status.EXISTING, snap2.snapshotId(), FILE_B));
+
+    RewriteManifests rewriteManifests = table.rewriteManifests();
+    for (ManifestFile manifest : snap2.dataManifests(table.io())) {
+      rewriteManifests.deleteManifest(manifest);
+    }
+
+    rewriteManifests.addManifest(newManifest).commit();
+
+    CommitReport report = reporter.lastCommitReport();
+    assertThat(report).isNotNull();
+    assertThat(report.operation()).isEqualTo("append");
+    assertThat(report.snapshotId()).isEqualTo(2L);
+    assertThat(report.sequenceNumber()).isEqualTo(2L);
+    assertThat(report.tableName()).isEqualTo(tableName);
+
+    CommitMetricsResult metrics = report.commitMetrics();
+    assertThat(metrics.addedDataFiles().value()).isEqualTo(1L);
+    assertThat(metrics.addedRecords().value()).isEqualTo(1L);
+    assertThat(metrics.addedFilesSizeInBytes().value()).isEqualTo(10L);
+  }
+}
diff --git 
a/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java 
b/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java
index e39ed13cac..6284e08d45 100644
--- a/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java
+++ b/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java
@@ -25,6 +25,7 @@ import java.time.Duration;
 import java.util.List;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.metrics.CommitReport;
 import org.apache.iceberg.metrics.LoggingMetricsReporter;
 import org.apache.iceberg.metrics.MetricsReport;
 import org.apache.iceberg.metrics.MetricsReporter;
@@ -245,10 +246,10 @@ public class TestScanPlanningAndReporting extends 
TableTestBase {
     assertThat(result.positionalDeleteFiles().value()).isEqualTo(1);
   }
 
-  private static class TestMetricsReporter implements MetricsReporter {
+  static class TestMetricsReporter implements MetricsReporter {
     private final List<MetricsReport> reports = Lists.newArrayList();
     // this is mainly so that we see scan reports being logged during tests
-    private final LoggingMetricsReporter delegate = new 
LoggingMetricsReporter();
+    private final LoggingMetricsReporter delegate = 
LoggingMetricsReporter.instance();
 
     @Override
     public void report(MetricsReport report) {
@@ -260,7 +261,16 @@ public class TestScanPlanningAndReporting extends 
TableTestBase {
       if (reports.isEmpty()) {
         return null;
       }
+
       return (ScanReport) reports.get(reports.size() - 1);
     }
+
+    public CommitReport lastCommitReport() {
+      if (reports.isEmpty()) {
+        return null;
+      }
+
+      return (CommitReport) reports.get(reports.size() - 1);
+    }
   }
 }
diff --git 
a/core/src/test/java/org/apache/iceberg/metrics/TestCommitMetricsResultParser.java
 
b/core/src/test/java/org/apache/iceberg/metrics/TestCommitMetricsResultParser.java
new file mode 100644
index 0000000000..5ba3aecbdc
--- /dev/null
+++ 
b/core/src/test/java/org/apache/iceberg/metrics/TestCommitMetricsResultParser.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.metrics;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+
+public class TestCommitMetricsResultParser {
+
+  @Test
+  public void nullMetrics() {
+    assertThatThrownBy(() -> CommitMetricsResultParser.fromJson((JsonNode) 
null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse commit metrics from null object");
+
+    assertThatThrownBy(() -> CommitMetricsResultParser.toJson(null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid commit metrics: null");
+
+    assertThatThrownBy(() -> CommitMetricsResult.from(null, ImmutableMap.of()))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid commit metrics: null");
+
+    assertThatThrownBy(
+            () -> CommitMetricsResult.from(CommitMetrics.of(new 
DefaultMetricsContext()), null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid snapshot summary: null");
+  }
+
+  /** A non-numeric summary value is expected to yield a {@code null} counter result. */
+  @Test
+  public void invalidNumberInSnapshotSummary() {
+    CommitMetrics metrics = CommitMetrics.of(new DefaultMetricsContext());
+    Map<String, String> summary = ImmutableMap.of(SnapshotSummary.ADDED_FILES_PROP, "xyz");
+
+    assertThat(CommitMetricsResult.from(metrics, summary).addedDataFiles()).isNull();
+  }
+
+  /**
+   * Builds a {@link CommitMetricsResult} from recorded commit metrics and a fully populated
+   * snapshot summary, checks every accessor, and verifies that the pretty-printed JSON matches the
+   * expected document and parses back to an equal result.
+   */
+  @SuppressWarnings("MethodLength")
+  @Test
+  public void roundTripSerde() {
+    CommitMetrics metrics = CommitMetrics.of(new DefaultMetricsContext());
+    metrics.totalDuration().record(100, TimeUnit.SECONDS);
+    metrics.attempts().increment(4);
+
+    // every summary property carries a distinct value so that swapped mappings are detected
+    Map<String, String> summary =
+        ImmutableMap.<String, String>builder()
+            .put(SnapshotSummary.ADDED_FILES_PROP, "1")
+            .put(SnapshotSummary.DELETED_FILES_PROP, "2")
+            .put(SnapshotSummary.TOTAL_DATA_FILES_PROP, "3")
+            .put(SnapshotSummary.ADDED_DELETE_FILES_PROP, "4")
+            .put(SnapshotSummary.ADD_EQ_DELETE_FILES_PROP, "5")
+            .put(SnapshotSummary.ADD_POS_DELETE_FILES_PROP, "6")
+            .put(SnapshotSummary.REMOVED_POS_DELETE_FILES_PROP, "7")
+            .put(SnapshotSummary.REMOVED_EQ_DELETE_FILES_PROP, "8")
+            .put(SnapshotSummary.REMOVED_DELETE_FILES_PROP, "9")
+            .put(SnapshotSummary.TOTAL_DELETE_FILES_PROP, "10")
+            .put(SnapshotSummary.ADDED_RECORDS_PROP, "11")
+            .put(SnapshotSummary.DELETED_RECORDS_PROP, "12")
+            .put(SnapshotSummary.TOTAL_RECORDS_PROP, "13")
+            .put(SnapshotSummary.ADDED_FILE_SIZE_PROP, "14")
+            .put(SnapshotSummary.REMOVED_FILE_SIZE_PROP, "15")
+            .put(SnapshotSummary.TOTAL_FILE_SIZE_PROP, "16")
+            .put(SnapshotSummary.ADDED_POS_DELETES_PROP, "17")
+            .put(SnapshotSummary.ADDED_EQ_DELETES_PROP, "18")
+            .put(SnapshotSummary.REMOVED_POS_DELETES_PROP, "19")
+            .put(SnapshotSummary.REMOVED_EQ_DELETES_PROP, "20")
+            .put(SnapshotSummary.TOTAL_POS_DELETES_PROP, "21")
+            .put(SnapshotSummary.TOTAL_EQ_DELETES_PROP, "22")
+            .build();
+
+    CommitMetricsResult actual = CommitMetricsResult.from(metrics, summary);
+
+    // values recorded directly on the commit metrics
+    assertThat(actual.attempts().value()).isEqualTo(4L);
+    assertThat(actual.totalDuration().totalDuration()).isEqualTo(Duration.ofSeconds(100));
+
+    // values derived from the snapshot summary
+    assertThat(actual.addedDataFiles().value()).isEqualTo(1L);
+    assertThat(actual.removedDataFiles().value()).isEqualTo(2L);
+    assertThat(actual.totalDataFiles().value()).isEqualTo(3L);
+    assertThat(actual.addedDeleteFiles().value()).isEqualTo(4L);
+    assertThat(actual.addedEqualityDeleteFiles().value()).isEqualTo(5L);
+    assertThat(actual.addedPositionalDeleteFiles().value()).isEqualTo(6L);
+    assertThat(actual.removedPositionalDeleteFiles().value()).isEqualTo(7L);
+    assertThat(actual.removedEqualityDeleteFiles().value()).isEqualTo(8L);
+    assertThat(actual.removedDeleteFiles().value()).isEqualTo(9L);
+    assertThat(actual.totalDeleteFiles().value()).isEqualTo(10L);
+    assertThat(actual.addedRecords().value()).isEqualTo(11L);
+    assertThat(actual.removedRecords().value()).isEqualTo(12L);
+    assertThat(actual.totalRecords().value()).isEqualTo(13L);
+    assertThat(actual.addedFilesSizeInBytes().value()).isEqualTo(14L);
+    assertThat(actual.removedFilesSizeInBytes().value()).isEqualTo(15L);
+    assertThat(actual.totalFilesSizeInBytes().value()).isEqualTo(16L);
+    assertThat(actual.addedPositionalDeletes().value()).isEqualTo(17L);
+    assertThat(actual.addedEqualityDeletes().value()).isEqualTo(18L);
+    assertThat(actual.removedPositionalDeletes().value()).isEqualTo(19L);
+    assertThat(actual.removedEqualityDeletes().value()).isEqualTo(20L);
+    assertThat(actual.totalPositionalDeletes().value()).isEqualTo(21L);
+    assertThat(actual.totalEqualityDeletes().value()).isEqualTo(22L);
+
+    String expectedJson =
+        "{\n"
+            + "  \"total-duration\" : {\n"
+            + "    \"count\" : 1,\n"
+            + "    \"time-unit\" : \"nanoseconds\",\n"
+            + "    \"total-duration\" : 100000000000\n"
+            + "  },\n"
+            + "  \"attempts\" : {\n"
+            + "    \"unit\" : \"count\",\n"
+            + "    \"value\" : 4\n"
+            + "  },\n"
+            + "  \"added-data-files\" : {\n"
+            + "    \"unit\" : \"count\",\n"
+            + "    \"value\" : 1\n"
+            + "  },\n"
+            + "  \"removed-data-files\" : {\n"
+            + "    \"unit\" : \"count\",\n"
+            + "    \"value\" : 2\n"
+            + "  },\n"
+            + "  \"total-data-files\" : {\n"
+            + "    \"unit\" : \"count\",\n"
+            + "    \"value\" : 3\n"
+            + "  },\n"
+            + "  \"added-delete-files\" : {\n"
+            + "    \"unit\" : \"count\",\n"
+            + "    \"value\" : 4\n"
+            + "  },\n"
+            + "  \"added-equality-delete-files\" : {\n"
+            + "    \"unit\" : \"count\",\n"
+            + "    \"value\" : 5\n"
+            + "  },\n"
+            + "  \"added-positional-delete-files\" : {\n"
+            + "    \"unit\" : \"count\",\n"
+            + "    \"value\" : 6\n"
+            + "  },\n"
+            + "  \"removed-delete-files\" : {\n"
+            + "    \"unit\" : \"count\",\n"
+            + "    \"value\" : 9\n"
+            + "  },\n"
+            + "  \"removed-positional-delete-files\" : {\n"
+            + "    \"unit\" : \"count\",\n"
+            + "    \"value\" : 7\n"
+            + "  },\n"
+            + "  \"removed-equality-delete-files\" : {\n"
+            + "    \"unit\" : \"count\",\n"
+            + "    \"value\" : 8\n"
+            + "  },\n"
+            + "  \"total-delete-files\" : {\n"
+            + "    \"unit\" : \"count\",\n"
+            + "    \"value\" : 10\n"
+            + "  },\n"
+            + "  \"added-records\" : {\n"
+            + "    \"unit\" : \"count\",\n"
+            + "    \"value\" : 11\n"
+            + "  },\n"
+            + "  \"removed-records\" : {\n"
+            + "    \"unit\" : \"count\",\n"
+            + "    \"value\" : 12\n"
+            + "  },\n"
+            + "  \"total-records\" : {\n"
+            + "    \"unit\" : \"count\",\n"
+            + "    \"value\" : 13\n"
+            + "  },\n"
+            + "  \"added-files-size-bytes\" : {\n"
+            + "    \"unit\" : \"bytes\",\n"
+            + "    \"value\" : 14\n"
+            + "  },\n"
+            + "  \"removed-files-size-bytes\" : {\n"
+            + "    \"unit\" : \"bytes\",\n"
+            + "    \"value\" : 15\n"
+            + "  },\n"
+            + "  \"total-files-size-bytes\" : {\n"
+            + "    \"unit\" : \"bytes\",\n"
+            + "    \"value\" : 16\n"
+            + "  },\n"
+            + "  \"added-position-deletes\" : {\n"
+            + "    \"unit\" : \"count\",\n"
+            + "    \"value\" : 17\n"
+            + "  },\n"
+            + "  \"removed-positional-deletes\" : {\n"
+            + "    \"unit\" : \"count\",\n"
+            + "    \"value\" : 19\n"
+            + "  },\n"
+            + "  \"total-positional-deletes\" : {\n"
+            + "    \"unit\" : \"count\",\n"
+            + "    \"value\" : 21\n"
+            + "  },\n"
+            + "  \"added-equality-deletes\" : {\n"
+            + "    \"unit\" : \"count\",\n"
+            + "    \"value\" : 18\n"
+            + "  },\n"
+            + "  \"removed-equality-deletes\" : {\n"
+            + "    \"unit\" : \"count\",\n"
+            + "    \"value\" : 20\n"
+            + "  },\n"
+            + "  \"total-equality-deletes\" : {\n"
+            + "    \"unit\" : \"count\",\n"
+            + "    \"value\" : 22\n"
+            + "  }\n"
+            + "}";
+
+    // serialize pretty-printed, then make sure parsing restores an equal result
+    String prettyJson = CommitMetricsResultParser.toJson(actual, true);
+    assertThat(CommitMetricsResultParser.fromJson(prettyJson)).isEqualTo(actual);
+    assertThat(prettyJson).isEqualTo(expectedJson);
+  }
+
+  /**
+   * A result built from no-op commit metrics and an empty snapshot summary is expected to
+   * serialize to an empty JSON object and to parse back to an empty {@link CommitMetricsResult}.
+   */
+  @Test
+  public void roundTripSerdeNoopCommitMetrics() {
+    CommitMetricsResult commitMetricsResult =
+        CommitMetricsResult.from(CommitMetrics.noop(), SnapshotSummary.builder().build());
+    String expectedJson = "{ }";
+
+    // no console output here: the assertion failure message already shows the actual JSON
+    String json = CommitMetricsResultParser.toJson(commitMetricsResult, true);
+    assertThat(json).isEqualTo(expectedJson);
+    assertThat(CommitMetricsResultParser.fromJson(json))
+        .isEqualTo(ImmutableCommitMetricsResult.builder().build());
+  }
+}
diff --git 
a/core/src/test/java/org/apache/iceberg/metrics/TestCommitReportParser.java 
b/core/src/test/java/org/apache/iceberg/metrics/TestCommitReportParser.java
new file mode 100644
index 0000000000..6c2d9edd36
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/metrics/TestCommitReportParser.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.metrics;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+public class TestCommitReportParser {
+
+  @Test
+  public void nullCommitReport() {
+    Assertions.assertThatThrownBy(() -> CommitReportParser.fromJson((JsonNode) 
null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse commit report from null object");
+
+    Assertions.assertThatThrownBy(() -> CommitReportParser.toJson(null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid commit report: null");
+  }
+
+  @Test
+  public void missingFields() {
+    Assertions.assertThatThrownBy(() -> CommitReportParser.fromJson("{}"))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse missing string: table-name");
+
+    Assertions.assertThatThrownBy(
+            () -> 
CommitReportParser.fromJson("{\"table-name\":\"roundTripTableName\"}"))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse missing long: snapshot-id");
+
+    Assertions.assertThatThrownBy(
+            () ->
+                CommitReportParser.fromJson(
+                    
"{\"table-name\":\"roundTripTableName\",\"snapshot-id\":23}"))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse missing long: sequence-number");
+
+    Assertions.assertThatThrownBy(
+            () ->
+                CommitReportParser.fromJson(
+                    
"{\"table-name\":\"roundTripTableName\",\"snapshot-id\":23,\"sequence-number\":24}"))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse missing string: operation");
+
+    Assertions.assertThatThrownBy(
+            () ->
+                CommitReportParser.fromJson(
+                    
"{\"table-name\":\"roundTripTableName\",\"snapshot-id\":23,\"sequence-number\":24,
 \"operation\": \"DELETE\"}"))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse missing field: metrics");
+  }
+
+  @Test
+  public void invalidTableName() {
+    Assertions.assertThatThrownBy(() -> 
CommitReportParser.fromJson("{\"table-name\":23}"))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse to a string value: table-name: 23");
+  }
+
+  @Test
+  public void invalidSnapshotId() {
+    Assertions.assertThatThrownBy(
+            () ->
+                CommitReportParser.fromJson(
+                    
"{\"table-name\":\"roundTripTableName\",\"snapshot-id\":\"invalid\"}"))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse to a long value: snapshot-id: \"invalid\"");
+  }
+
+  @SuppressWarnings("MethodLength")
+  @Test
+  public void roundTripSerde() {
+    CommitMetrics commitMetrics = CommitMetrics.of(new 
DefaultMetricsContext());
+    commitMetrics.totalDuration().record(100, TimeUnit.SECONDS);
+    commitMetrics.attempts().increment(4);
+    Map<String, String> snapshotSummary =
+        com.google.common.collect.ImmutableMap.<String, String>builder()
+            .put(SnapshotSummary.ADDED_FILES_PROP, "1")
+            .put(SnapshotSummary.DELETED_FILES_PROP, "2")
+            .put(SnapshotSummary.TOTAL_DATA_FILES_PROP, "3")
+            .put(SnapshotSummary.ADDED_DELETE_FILES_PROP, "4")
+            .put(SnapshotSummary.ADD_EQ_DELETE_FILES_PROP, "5")
+            .put(SnapshotSummary.ADD_POS_DELETE_FILES_PROP, "6")
+            .put(SnapshotSummary.REMOVED_POS_DELETE_FILES_PROP, "7")
+            .put(SnapshotSummary.REMOVED_EQ_DELETE_FILES_PROP, "8")
+            .put(SnapshotSummary.REMOVED_DELETE_FILES_PROP, "9")
+            .put(SnapshotSummary.TOTAL_DELETE_FILES_PROP, "10")
+            .put(SnapshotSummary.ADDED_RECORDS_PROP, "11")
+            .put(SnapshotSummary.DELETED_RECORDS_PROP, "12")
+            .put(SnapshotSummary.TOTAL_RECORDS_PROP, "13")
+            .put(SnapshotSummary.ADDED_FILE_SIZE_PROP, "14")
+            .put(SnapshotSummary.REMOVED_FILE_SIZE_PROP, "15")
+            .put(SnapshotSummary.TOTAL_FILE_SIZE_PROP, "16")
+            .put(SnapshotSummary.ADDED_POS_DELETES_PROP, "17")
+            .put(SnapshotSummary.ADDED_EQ_DELETES_PROP, "18")
+            .put(SnapshotSummary.REMOVED_POS_DELETES_PROP, "19")
+            .put(SnapshotSummary.REMOVED_EQ_DELETES_PROP, "20")
+            .put(SnapshotSummary.TOTAL_POS_DELETES_PROP, "21")
+            .put(SnapshotSummary.TOTAL_EQ_DELETES_PROP, "22")
+            .build();
+
+    String tableName = "roundTripTableName";
+    CommitReport commitReport =
+        ImmutableCommitReport.builder()
+            .tableName(tableName)
+            .snapshotId(23L)
+            .operation("DELETE")
+            .sequenceNumber(4L)
+            .commitMetrics(CommitMetricsResult.from(commitMetrics, 
snapshotSummary))
+            .build();
+
+    String expectedJson =
+        "{\n"
+            + "  \"table-name\" : \"roundTripTableName\",\n"
+            + "  \"snapshot-id\" : 23,\n"
+            + "  \"sequence-number\" : 4,\n"
+            + "  \"operation\" : \"DELETE\",\n"
+            + "  \"metrics\" : {\n"
+            + "    \"total-duration\" : {\n"
+            + "      \"count\" : 1,\n"
+            + "      \"time-unit\" : \"nanoseconds\",\n"
+            + "      \"total-duration\" : 100000000000\n"
+            + "    },\n"
+            + "    \"attempts\" : {\n"
+            + "      \"unit\" : \"count\",\n"
+            + "      \"value\" : 4\n"
+            + "    },\n"
+            + "    \"added-data-files\" : {\n"
+            + "      \"unit\" : \"count\",\n"
+            + "      \"value\" : 1\n"
+            + "    },\n"
+            + "    \"removed-data-files\" : {\n"
+            + "      \"unit\" : \"count\",\n"
+            + "      \"value\" : 2\n"
+            + "    },\n"
+            + "    \"total-data-files\" : {\n"
+            + "      \"unit\" : \"count\",\n"
+            + "      \"value\" : 3\n"
+            + "    },\n"
+            + "    \"added-delete-files\" : {\n"
+            + "      \"unit\" : \"count\",\n"
+            + "      \"value\" : 4\n"
+            + "    },\n"
+            + "    \"added-equality-delete-files\" : {\n"
+            + "      \"unit\" : \"count\",\n"
+            + "      \"value\" : 5\n"
+            + "    },\n"
+            + "    \"added-positional-delete-files\" : {\n"
+            + "      \"unit\" : \"count\",\n"
+            + "      \"value\" : 6\n"
+            + "    },\n"
+            + "    \"removed-delete-files\" : {\n"
+            + "      \"unit\" : \"count\",\n"
+            + "      \"value\" : 9\n"
+            + "    },\n"
+            + "    \"removed-positional-delete-files\" : {\n"
+            + "      \"unit\" : \"count\",\n"
+            + "      \"value\" : 7\n"
+            + "    },\n"
+            + "    \"removed-equality-delete-files\" : {\n"
+            + "      \"unit\" : \"count\",\n"
+            + "      \"value\" : 8\n"
+            + "    },\n"
+            + "    \"total-delete-files\" : {\n"
+            + "      \"unit\" : \"count\",\n"
+            + "      \"value\" : 10\n"
+            + "    },\n"
+            + "    \"added-records\" : {\n"
+            + "      \"unit\" : \"count\",\n"
+            + "      \"value\" : 11\n"
+            + "    },\n"
+            + "    \"removed-records\" : {\n"
+            + "      \"unit\" : \"count\",\n"
+            + "      \"value\" : 12\n"
+            + "    },\n"
+            + "    \"total-records\" : {\n"
+            + "      \"unit\" : \"count\",\n"
+            + "      \"value\" : 13\n"
+            + "    },\n"
+            + "    \"added-files-size-bytes\" : {\n"
+            + "      \"unit\" : \"bytes\",\n"
+            + "      \"value\" : 14\n"
+            + "    },\n"
+            + "    \"removed-files-size-bytes\" : {\n"
+            + "      \"unit\" : \"bytes\",\n"
+            + "      \"value\" : 15\n"
+            + "    },\n"
+            + "    \"total-files-size-bytes\" : {\n"
+            + "      \"unit\" : \"bytes\",\n"
+            + "      \"value\" : 16\n"
+            + "    },\n"
+            + "    \"added-position-deletes\" : {\n"
+            + "      \"unit\" : \"count\",\n"
+            + "      \"value\" : 17\n"
+            + "    },\n"
+            + "    \"removed-positional-deletes\" : {\n"
+            + "      \"unit\" : \"count\",\n"
+            + "      \"value\" : 19\n"
+            + "    },\n"
+            + "    \"total-positional-deletes\" : {\n"
+            + "      \"unit\" : \"count\",\n"
+            + "      \"value\" : 21\n"
+            + "    },\n"
+            + "    \"added-equality-deletes\" : {\n"
+            + "      \"unit\" : \"count\",\n"
+            + "      \"value\" : 18\n"
+            + "    },\n"
+            + "    \"removed-equality-deletes\" : {\n"
+            + "      \"unit\" : \"count\",\n"
+            + "      \"value\" : 20\n"
+            + "    },\n"
+            + "    \"total-equality-deletes\" : {\n"
+            + "      \"unit\" : \"count\",\n"
+            + "      \"value\" : 22\n"
+            + "    }\n"
+            + "  }\n"
+            + "}";
+
+    String json = CommitReportParser.toJson(commitReport, true);
+    
Assertions.assertThat(CommitReportParser.fromJson(json)).isEqualTo(commitReport);
+    Assertions.assertThat(json).isEqualTo(expectedJson);
+  }
+
+  /**
+   * A report whose metrics come from no-op commit metrics and an empty summary is expected to
+   * serialize with an empty "metrics" object and to survive a JSON round trip unchanged.
+   */
+  @Test
+  public void roundTripSerdeWithNoopMetrics() {
+    CommitMetricsResult emptyMetrics =
+        CommitMetricsResult.from(CommitMetrics.noop(), ImmutableMap.of());
+    CommitReport report =
+        ImmutableCommitReport.builder()
+            .tableName("roundTripTableName")
+            .snapshotId(23L)
+            .operation("DELETE")
+            .sequenceNumber(4L)
+            .commitMetrics(emptyMetrics)
+            .build();
+
+    String expectedJson =
+        "{\n"
+            + "  \"table-name\" : \"roundTripTableName\",\n"
+            + "  \"snapshot-id\" : 23,\n"
+            + "  \"sequence-number\" : 4,\n"
+            + "  \"operation\" : \"DELETE\",\n"
+            + "  \"metrics\" : { }\n"
+            + "}";
+
+    String prettyJson = CommitReportParser.toJson(report, true);
+    Assertions.assertThat(CommitReportParser.fromJson(prettyJson)).isEqualTo(report);
+    Assertions.assertThat(prettyJson).isEqualTo(expectedJson);
+  }
+
+  /**
+   * A report carrying additional metadata entries is expected to serialize them under a
+   * "metadata" object and to survive a JSON round trip unchanged.
+   */
+  @Test
+  public void roundTripSerdeWithMetadata() {
+    CommitMetricsResult emptyMetrics =
+        CommitMetricsResult.from(CommitMetrics.noop(), ImmutableMap.of());
+    CommitReport report =
+        ImmutableCommitReport.builder()
+            .tableName("roundTripTableName")
+            .snapshotId(23L)
+            .operation("DELETE")
+            .sequenceNumber(4L)
+            .commitMetrics(emptyMetrics)
+            .metadata(ImmutableMap.of("k1", "v1", "k2", "v2"))
+            .build();
+
+    String expectedJson =
+        "{\n"
+            + "  \"table-name\" : \"roundTripTableName\",\n"
+            + "  \"snapshot-id\" : 23,\n"
+            + "  \"sequence-number\" : 4,\n"
+            + "  \"operation\" : \"DELETE\",\n"
+            + "  \"metrics\" : { },\n"
+            + "  \"metadata\" : {\n"
+            + "    \"k1\" : \"v1\",\n"
+            + "    \"k2\" : \"v2\"\n"
+            + "  }\n"
+            + "}";
+
+    String prettyJson = CommitReportParser.toJson(report, true);
+    Assertions.assertThat(CommitReportParser.fromJson(prettyJson)).isEqualTo(report);
+    Assertions.assertThat(prettyJson).isEqualTo(expectedJson);
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java 
b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
index 2d1133e9c4..9f8306b35d 100644
--- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
+++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
@@ -1121,7 +1121,8 @@ public class TestRESTCatalog extends 
CatalogTests<RESTCatalog> {
     }
 
     // counter of custom metrics reporter should have been increased
-    assertThat(CustomMetricsReporter.COUNTER.get()).isEqualTo(1);
+    // 1x for commit metrics / 1x for scan metrics
+    assertThat(CustomMetricsReporter.COUNTER.get()).isEqualTo(2);
   }
 
   public static class CustomMetricsReporter implements MetricsReporter {
diff --git 
a/core/src/test/java/org/apache/iceberg/rest/requests/TestReportMetricsRequestParser.java
 
b/core/src/test/java/org/apache/iceberg/rest/requests/TestReportMetricsRequestParser.java
index a97bdfe3bc..70a219e791 100644
--- 
a/core/src/test/java/org/apache/iceberg/rest/requests/TestReportMetricsRequestParser.java
+++ 
b/core/src/test/java/org/apache/iceberg/rest/requests/TestReportMetricsRequestParser.java
@@ -20,11 +20,16 @@ package org.apache.iceberg.rest.requests;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.metrics.CommitMetrics;
+import org.apache.iceberg.metrics.CommitMetricsResult;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.ImmutableCommitReport;
 import org.apache.iceberg.metrics.ImmutableScanReport;
 import org.apache.iceberg.metrics.MetricsReport;
 import org.apache.iceberg.metrics.ScanMetrics;
 import org.apache.iceberg.metrics.ScanMetricsResult;
 import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.assertj.core.api.Assertions;
 import org.junit.Test;
 
@@ -78,7 +83,7 @@ public class TestReportMetricsRequestParser {
   }
 
   @Test
-  public void roundTripSerde() {
+  public void roundTripSerdeWithScanReport() {
     String tableName = "roundTripTableName";
     ScanReport scanReport =
         ImmutableScanReport.builder()
@@ -111,4 +116,35 @@ public class TestReportMetricsRequestParser {
     Assertions.assertThat(ReportMetricsRequestParser.fromJson(json).report())
         .isEqualTo(metricsRequest.report());
   }
+
+  /**
+   * Wraps a {@link CommitReport} in a {@link ReportMetricsRequest} and verifies that the JSON
+   * carries the "commit-report" report type and parses back to an equal report.
+   */
+  @Test
+  public void roundTripSerdeWithCommitReport() {
+    CommitMetricsResult emptyMetrics =
+        CommitMetricsResult.from(CommitMetrics.noop(), ImmutableMap.of());
+    CommitReport report =
+        ImmutableCommitReport.builder()
+            .tableName("roundTripTableName")
+            .snapshotId(23L)
+            .sequenceNumber(4L)
+            .operation("DELETE")
+            .commitMetrics(emptyMetrics)
+            .build();
+    ReportMetricsRequest request = ReportMetricsRequest.of(report);
+
+    String expectedJson =
+        "{\n"
+            + "  \"report-type\" : \"commit-report\",\n"
+            + "  \"table-name\" : \"roundTripTableName\",\n"
+            + "  \"snapshot-id\" : 23,\n"
+            + "  \"sequence-number\" : 4,\n"
+            + "  \"operation\" : \"DELETE\",\n"
+            + "  \"metrics\" : { }\n"
+            + "}";
+
+    String prettyJson = ReportMetricsRequestParser.toJson(request, true);
+    Assertions.assertThat(prettyJson).isEqualTo(expectedJson);
+
+    Assertions.assertThat(ReportMetricsRequestParser.fromJson(prettyJson).report())
+        .isEqualTo(request.report());
+  }
 }
diff --git a/core/src/test/java/org/apache/iceberg/util/TestTasks.java 
b/core/src/test/java/org/apache/iceberg/util/TestTasks.java
new file mode 100644
index 0000000000..f337a3500d
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/util/TestTasks.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.util;
+
+import java.util.stream.IntStream;
+import org.apache.iceberg.metrics.Counter;
+import org.apache.iceberg.metrics.DefaultMetricsContext;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+public class TestTasks {
+
+  @Test
+  public void attemptCounterIsIncreasedOnRetries() {
+    Counter counter = new DefaultMetricsContext().counter("counter");
+
+    final int retries = 10;
+
+    Tasks.foreach(IntStream.range(0, 10))
+        .countAttempts(counter)
+        .exponentialBackoff(0, 0, 5000, 0)
+        .retry(retries)
+        .onlyRetryOn(RuntimeException.class)
+        .run(
+            x -> {
+              // don't throw on the last retry
+              if (counter.value() <= retries) {
+                throw new RuntimeException();
+              }
+            });
+
+    Assertions.assertThat(counter.value()).isEqualTo(retries + 1);
+  }
+
+  @Test
+  public void attemptCounterIsIncreasedWithoutRetries() {
+    Counter counter = new DefaultMetricsContext().counter("counter");
+
+    Tasks.foreach(IntStream.range(0, 10)).countAttempts(counter).run(x -> {});
+
+    Assertions.assertThat(counter.value()).isEqualTo(1L);
+  }
+}
diff --git a/open-api/rest-catalog-open-api.yaml 
b/open-api/rest-catalog-open-api.yaml
index 1d810f4a07..c8c3b09c21 100644
--- a/open-api/rest-catalog-open-api.yaml
+++ b/open-api/rest-catalog-open-api.yaml
@@ -1895,6 +1895,7 @@ components:
     ReportMetricsRequest:
       anyOf:
         - $ref: '#/components/schemas/ScanReport'
+        - $ref: '#/components/schemas/CommitReport'
       required:
         - report-type
       properties:
@@ -1931,6 +1932,36 @@ components:
             type: string
         metrics:
           $ref: '#/components/schemas/Metrics'
+        metadata:
+          type: object
+          additionalProperties:
+            type: string
+
+    # Commit report schema: the five fields listed under `required` must be present,
+    # while `metadata` is an optional map with string values.
+    CommitReport:
+      type: object
+      required:
+        - table-name
+        - snapshot-id
+        - sequence-number
+        - operation
+        - metrics
+      properties:
+        table-name:
+          type: string
+        snapshot-id:
+          type: integer
+          format: int64
+        sequence-number:
+          type: integer
+          format: int64
+        operation:
+          type: string
+        metrics:
+          $ref: '#/components/schemas/Metrics'
+        metadata:
+          type: object
+          additionalProperties:
+            type: string
 
 
   #############################


Reply via email to