This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 715c9b90f5 Core: Create and report metrics about Snapshots (#6246)
715c9b90f5 is described below
commit 715c9b90f537a215f38da5a8ddad19977222860b
Author: Eduard Tudenhöfner <[email protected]>
AuthorDate: Mon Dec 19 17:33:08 2022 +0100
Core: Create and report metrics about Snapshots (#6246)
---
.../apache/iceberg/metrics/MetricsReporter.java | 2 +-
.../main/java/org/apache/iceberg/BaseTable.java | 20 +-
.../java/org/apache/iceberg/BaseTransaction.java | 34 ++-
.../java/org/apache/iceberg/SnapshotProducer.java | 42 +++
.../java/org/apache/iceberg/TableScanContext.java | 2 +-
.../main/java/org/apache/iceberg/Transactions.java | 18 ++
.../org/apache/iceberg/metrics/CommitMetrics.java | 50 ++++
.../iceberg/metrics/CommitMetricsResult.java | 185 +++++++++++++
.../iceberg/metrics/CommitMetricsResultParser.java | 220 +++++++++++++++
.../org/apache/iceberg/metrics/CommitReport.java | 29 +-
.../apache/iceberg/metrics/CommitReportParser.java | 101 +++++++
.../apache/iceberg/metrics/ScanReportParser.java | 7 +-
.../apache/iceberg/rest/RESTSessionCatalog.java | 6 +-
.../rest/requests/ReportMetricsRequest.java | 12 +-
.../rest/requests/ReportMetricsRequestParser.java | 13 +
.../main/java/org/apache/iceberg/util/Tasks.java | 11 +
.../org/apache/iceberg/TestCommitReporting.java | 190 +++++++++++++
.../iceberg/TestScanPlanningAndReporting.java | 14 +-
.../metrics/TestCommitMetricsResultParser.java | 239 ++++++++++++++++
.../iceberg/metrics/TestCommitReportParser.java | 302 +++++++++++++++++++++
.../org/apache/iceberg/rest/TestRESTCatalog.java | 3 +-
.../requests/TestReportMetricsRequestParser.java | 38 ++-
.../java/org/apache/iceberg/util/TestTasks.java | 59 ++++
open-api/rest-catalog-open-api.yaml | 31 +++
24 files changed, 1582 insertions(+), 46 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java
b/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java
index 45e3d16bff..5fae755bbe 100644
--- a/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java
+++ b/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java
@@ -23,7 +23,7 @@ package org.apache.iceberg.metrics;
public interface MetricsReporter {
/**
- * Indicates that a operation is done by reporting a {@link MetricsReport}.
A {@link
+ * Indicates that an operation is done by reporting a {@link MetricsReport}.
A {@link
* MetricsReport} is usually directly derived from a {@link MetricsReport}
instance.
*
* @param report The {@link MetricsReport} to report.
diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java
b/core/src/main/java/org/apache/iceberg/BaseTable.java
index 0b72027d55..bfa9cb7ce5 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTable.java
@@ -44,7 +44,7 @@ public class BaseTable implements Table, HasTableOperations,
Serializable {
public BaseTable(TableOperations ops, String name) {
this.ops = ops;
this.name = name;
- this.reporter = new LoggingMetricsReporter();
+ this.reporter = LoggingMetricsReporter.instance();
}
public BaseTable(TableOperations ops, String name, MetricsReporter reporter)
{
@@ -171,42 +171,42 @@ public class BaseTable implements Table,
HasTableOperations, Serializable {
@Override
public AppendFiles newAppend() {
- return new MergeAppend(name, ops);
+ return new MergeAppend(name, ops).reportWith(reporter);
}
@Override
public AppendFiles newFastAppend() {
- return new FastAppend(name, ops);
+ return new FastAppend(name, ops).reportWith(reporter);
}
@Override
public RewriteFiles newRewrite() {
- return new BaseRewriteFiles(name, ops);
+ return new BaseRewriteFiles(name, ops).reportWith(reporter);
}
@Override
public RewriteManifests rewriteManifests() {
- return new BaseRewriteManifests(ops);
+ return new BaseRewriteManifests(ops).reportWith(reporter);
}
@Override
public OverwriteFiles newOverwrite() {
- return new BaseOverwriteFiles(name, ops);
+ return new BaseOverwriteFiles(name, ops).reportWith(reporter);
}
@Override
public RowDelta newRowDelta() {
- return new BaseRowDelta(name, ops);
+ return new BaseRowDelta(name, ops).reportWith(reporter);
}
@Override
public ReplacePartitions newReplacePartitions() {
- return new BaseReplacePartitions(name, ops);
+ return new BaseReplacePartitions(name, ops).reportWith(reporter);
}
@Override
public DeleteFiles newDelete() {
- return new StreamingDelete(name, ops);
+ return new StreamingDelete(name, ops).reportWith(reporter);
}
@Override
@@ -226,7 +226,7 @@ public class BaseTable implements Table,
HasTableOperations, Serializable {
@Override
public Transaction newTransaction() {
- return Transactions.newTransaction(name, ops);
+ return Transactions.newTransaction(name, ops, reporter);
}
@Override
diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
index 241738feda..f1832f2787 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
@@ -39,6 +39,8 @@ import
org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.metrics.LoggingMetricsReporter;
+import org.apache.iceberg.metrics.MetricsReporter;
import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -71,9 +73,19 @@ public class BaseTransaction implements Transaction {
private TableMetadata base;
private TableMetadata current;
private boolean hasLastOpCommitted;
+ private final MetricsReporter reporter;
BaseTransaction(
String tableName, TableOperations ops, TransactionType type,
TableMetadata start) {
+ this(tableName, ops, type, start, LoggingMetricsReporter.instance());
+ }
+
+ BaseTransaction(
+ String tableName,
+ TableOperations ops,
+ TransactionType type,
+ TableMetadata start,
+ MetricsReporter reporter) {
this.tableName = tableName;
this.ops = ops;
this.transactionTable = new TransactionTable();
@@ -84,6 +96,7 @@ public class BaseTransaction implements Transaction {
this.base = ops.current();
this.type = type;
this.hasLastOpCommitted = true;
+ this.reporter = reporter;
}
@Override
@@ -148,7 +161,7 @@ public class BaseTransaction implements Transaction {
@Override
public AppendFiles newAppend() {
checkLastOperationCommitted("AppendFiles");
- AppendFiles append = new MergeAppend(tableName, transactionOps);
+ AppendFiles append = new MergeAppend(tableName,
transactionOps).reportWith(reporter);
append.deleteWith(enqueueDelete);
updates.add(append);
return append;
@@ -157,7 +170,7 @@ public class BaseTransaction implements Transaction {
@Override
public AppendFiles newFastAppend() {
checkLastOperationCommitted("AppendFiles");
- AppendFiles append = new FastAppend(tableName, transactionOps);
+ AppendFiles append = new FastAppend(tableName,
transactionOps).reportWith(reporter);
updates.add(append);
return append;
}
@@ -165,7 +178,7 @@ public class BaseTransaction implements Transaction {
@Override
public RewriteFiles newRewrite() {
checkLastOperationCommitted("RewriteFiles");
- RewriteFiles rewrite = new BaseRewriteFiles(tableName, transactionOps);
+ RewriteFiles rewrite = new BaseRewriteFiles(tableName,
transactionOps).reportWith(reporter);
rewrite.deleteWith(enqueueDelete);
updates.add(rewrite);
return rewrite;
@@ -174,7 +187,7 @@ public class BaseTransaction implements Transaction {
@Override
public RewriteManifests rewriteManifests() {
checkLastOperationCommitted("RewriteManifests");
- RewriteManifests rewrite = new BaseRewriteManifests(transactionOps);
+ RewriteManifests rewrite = new
BaseRewriteManifests(transactionOps).reportWith(reporter);
rewrite.deleteWith(enqueueDelete);
updates.add(rewrite);
return rewrite;
@@ -183,7 +196,8 @@ public class BaseTransaction implements Transaction {
@Override
public OverwriteFiles newOverwrite() {
checkLastOperationCommitted("OverwriteFiles");
- OverwriteFiles overwrite = new BaseOverwriteFiles(tableName,
transactionOps);
+ OverwriteFiles overwrite =
+ new BaseOverwriteFiles(tableName, transactionOps).reportWith(reporter);
overwrite.deleteWith(enqueueDelete);
updates.add(overwrite);
return overwrite;
@@ -192,7 +206,7 @@ public class BaseTransaction implements Transaction {
@Override
public RowDelta newRowDelta() {
checkLastOperationCommitted("RowDelta");
- RowDelta delta = new BaseRowDelta(tableName, transactionOps);
+ RowDelta delta = new BaseRowDelta(tableName,
transactionOps).reportWith(reporter);
delta.deleteWith(enqueueDelete);
updates.add(delta);
return delta;
@@ -201,7 +215,8 @@ public class BaseTransaction implements Transaction {
@Override
public ReplacePartitions newReplacePartitions() {
checkLastOperationCommitted("ReplacePartitions");
- ReplacePartitions replacePartitions = new BaseReplacePartitions(tableName,
transactionOps);
+ ReplacePartitions replacePartitions =
+ new BaseReplacePartitions(tableName,
transactionOps).reportWith(reporter);
replacePartitions.deleteWith(enqueueDelete);
updates.add(replacePartitions);
return replacePartitions;
@@ -210,7 +225,7 @@ public class BaseTransaction implements Transaction {
@Override
public DeleteFiles newDelete() {
checkLastOperationCommitted("DeleteFiles");
- DeleteFiles delete = new StreamingDelete(tableName, transactionOps);
+ DeleteFiles delete = new StreamingDelete(tableName,
transactionOps).reportWith(reporter);
delete.deleteWith(enqueueDelete);
updates.add(delete);
return delete;
@@ -235,7 +250,8 @@ public class BaseTransaction implements Transaction {
CherryPickOperation cherryPick() {
checkLastOperationCommitted("CherryPick");
- CherryPickOperation cherrypick = new CherryPickOperation(tableName,
transactionOps);
+ CherryPickOperation cherrypick =
+ new CherryPickOperation(tableName,
transactionOps).reportWith(reporter);
updates.add(cherrypick);
return cherrypick;
}
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 0e8732ea63..f11cad4e79 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -39,11 +39,19 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
+import org.apache.iceberg.events.CreateSnapshotEvent;
import org.apache.iceberg.events.Listeners;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.metrics.CommitMetrics;
+import org.apache.iceberg.metrics.CommitMetricsResult;
+import org.apache.iceberg.metrics.DefaultMetricsContext;
+import org.apache.iceberg.metrics.ImmutableCommitReport;
+import org.apache.iceberg.metrics.LoggingMetricsReporter;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.metrics.Timer.Timed;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -76,6 +84,7 @@ abstract class SnapshotProducer<ThisT> implements
SnapshotUpdate<ThisT> {
private final AtomicInteger manifestCount = new AtomicInteger(0);
private final AtomicInteger attempt = new AtomicInteger(0);
private final List<String> manifestLists = Lists.newArrayList();
+ private MetricsReporter reporter = LoggingMetricsReporter.instance();
private volatile Long snapshotId = null;
private TableMetadata base;
private boolean stageOnly = false;
@@ -83,6 +92,7 @@ abstract class SnapshotProducer<ThisT> implements
SnapshotUpdate<ThisT> {
private ExecutorService workerPool = ThreadPools.getWorkerPool();
private String targetBranch = SnapshotRef.MAIN_BRANCH;
+ private CommitMetrics commitMetrics;
protected SnapshotProducer(TableOperations ops) {
this.ops = ops;
@@ -112,6 +122,19 @@ abstract class SnapshotProducer<ThisT> implements
SnapshotUpdate<ThisT> {
return self();
}
+ protected CommitMetrics commitMetrics() {
+ if (commitMetrics == null) {
+ this.commitMetrics = CommitMetrics.of(new DefaultMetricsContext());
+ }
+
+ return commitMetrics;
+ }
+
+ protected ThisT reportWith(MetricsReporter newReporter) {
+ this.reporter = newReporter;
+ return self();
+ }
+
/**
* * A setter for the target branch on which snapshot producer operation
should be performed
*
@@ -328,6 +351,7 @@ abstract class SnapshotProducer<ThisT> implements
SnapshotUpdate<ThisT> {
public void commit() {
// this is always set to the latest commit attempt's snapshot id.
AtomicLong newSnapshotId = new AtomicLong(-1L);
+ Timed totalDuration = commitMetrics().totalDuration().start();
try {
Tasks.foreach(ops)
.retry(base.propertyAsInt(COMMIT_NUM_RETRIES,
COMMIT_NUM_RETRIES_DEFAULT))
@@ -337,6 +361,7 @@ abstract class SnapshotProducer<ThisT> implements
SnapshotUpdate<ThisT> {
base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS,
COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
2.0 /* exponential */)
.onlyRetryOn(CommitFailedException.class)
+ .countAttempts(commitMetrics().attempts())
.run(
taskOps -> {
Snapshot newSnapshot = apply();
@@ -397,6 +422,8 @@ abstract class SnapshotProducer<ThisT> implements
SnapshotUpdate<ThisT> {
"Failed to load committed table metadata or during cleanup, skipping
further cleanup", e);
}
+ totalDuration.stop();
+
try {
notifyListeners();
} catch (Throwable e) {
@@ -409,6 +436,21 @@ abstract class SnapshotProducer<ThisT> implements
SnapshotUpdate<ThisT> {
Object event = updateEvent();
if (event != null) {
Listeners.notifyAll(event);
+
+ if (event instanceof CreateSnapshotEvent) {
+ CreateSnapshotEvent createSnapshotEvent = (CreateSnapshotEvent)
event;
+
+ reporter.report(
+ ImmutableCommitReport.builder()
+ .tableName(createSnapshotEvent.tableName())
+ .snapshotId(createSnapshotEvent.snapshotId())
+ .operation(createSnapshotEvent.operation())
+ .sequenceNumber(createSnapshotEvent.sequenceNumber())
+ .metadata(EnvironmentContext.get())
+ .commitMetrics(
+ CommitMetricsResult.from(commitMetrics(),
createSnapshotEvent.summary()))
+ .build());
+ }
}
} catch (RuntimeException e) {
LOG.warn("Failed to notify listeners", e);
diff --git a/core/src/main/java/org/apache/iceberg/TableScanContext.java
b/core/src/main/java/org/apache/iceberg/TableScanContext.java
index e484da6f15..6a3c7cc6e9 100644
--- a/core/src/main/java/org/apache/iceberg/TableScanContext.java
+++ b/core/src/main/java/org/apache/iceberg/TableScanContext.java
@@ -59,7 +59,7 @@ final class TableScanContext {
this.toSnapshotId = null;
this.planExecutor = null;
this.fromSnapshotInclusive = false;
- this.metricsReporter = new LoggingMetricsReporter();
+ this.metricsReporter = LoggingMetricsReporter.instance();
}
private TableScanContext(
diff --git a/core/src/main/java/org/apache/iceberg/Transactions.java
b/core/src/main/java/org/apache/iceberg/Transactions.java
index 32d3dedfe8..7afed0573a 100644
--- a/core/src/main/java/org/apache/iceberg/Transactions.java
+++ b/core/src/main/java/org/apache/iceberg/Transactions.java
@@ -19,6 +19,7 @@
package org.apache.iceberg;
import org.apache.iceberg.BaseTransaction.TransactionType;
+import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
public final class Transactions {
@@ -34,6 +35,11 @@ public final class Transactions {
return new BaseTransaction(tableName, ops, TransactionType.REPLACE_TABLE,
start);
}
+ public static Transaction replaceTableTransaction(
+ String tableName, TableOperations ops, TableMetadata start,
MetricsReporter reporter) {
+ return new BaseTransaction(tableName, ops, TransactionType.REPLACE_TABLE,
start, reporter);
+ }
+
public static Transaction createTableTransaction(
String tableName, TableOperations ops, TableMetadata start) {
Preconditions.checkArgument(
@@ -41,7 +47,19 @@ public final class Transactions {
return new BaseTransaction(tableName, ops, TransactionType.CREATE_TABLE,
start);
}
+ public static Transaction createTableTransaction(
+ String tableName, TableOperations ops, TableMetadata start,
MetricsReporter reporter) {
+ Preconditions.checkArgument(
+ ops.current() == null, "Cannot start create table transaction: table
already exists");
+ return new BaseTransaction(tableName, ops, TransactionType.CREATE_TABLE,
start, reporter);
+ }
+
public static Transaction newTransaction(String tableName, TableOperations
ops) {
return new BaseTransaction(tableName, ops, TransactionType.SIMPLE,
ops.refresh());
}
+
+ public static Transaction newTransaction(
+ String tableName, TableOperations ops, MetricsReporter reporter) {
+ return new BaseTransaction(tableName, ops, TransactionType.SIMPLE,
ops.refresh(), reporter);
+ }
}
diff --git a/core/src/main/java/org/apache/iceberg/metrics/CommitMetrics.java
b/core/src/main/java/org/apache/iceberg/metrics/CommitMetrics.java
new file mode 100644
index 0000000000..26a662d62e
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/metrics/CommitMetrics.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.metrics;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.metrics.MetricsContext.Unit;
+import org.immutables.value.Value;
+
+/** Carries all metrics for a particular commit */
[email protected]
+public abstract class CommitMetrics {
+ public static final String TOTAL_DURATION = "total-duration";
+ public static final String ATTEMPTS = "attempts";
+
+ public static CommitMetrics noop() {
+ return CommitMetrics.of(MetricsContext.nullMetrics());
+ }
+
+ public abstract MetricsContext metricsContext();
+
+ @Value.Derived
+ public Timer totalDuration() {
+ return metricsContext().timer(TOTAL_DURATION, TimeUnit.NANOSECONDS);
+ }
+
+ @Value.Derived
+ public Counter attempts() {
+ return metricsContext().counter(ATTEMPTS, Unit.COUNT);
+ }
+
+ public static CommitMetrics of(MetricsContext metricsContext) {
+ return
ImmutableCommitMetrics.builder().metricsContext(metricsContext).build();
+ }
+}
diff --git
a/core/src/main/java/org/apache/iceberg/metrics/CommitMetricsResult.java
b/core/src/main/java/org/apache/iceberg/metrics/CommitMetricsResult.java
new file mode 100644
index 0000000000..1cb23b174a
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/metrics/CommitMetricsResult.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.metrics;
+
+import edu.umd.cs.findbugs.annotations.Nullable;
+import java.util.Map;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.metrics.MetricsContext.Unit;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.immutables.value.Value;
+
+/** A serializable version of {@link CommitMetrics} that carries its results.
*/
[email protected]
+public interface CommitMetricsResult {
+ String ADDED_DATA_FILES = "added-data-files";
+ String REMOVED_DATA_FILES = "removed-data-files";
+ String TOTAL_DATA_FILES = "total-data-files";
+ String ADDED_DELETE_FILES = "added-delete-files";
+ String ADDED_EQ_DELETE_FILES = "added-equality-delete-files";
+ String ADDED_POS_DELETE_FILES = "added-positional-delete-files";
+ String REMOVED_POS_DELETE_FILES = "removed-positional-delete-files";
+ String REMOVED_EQ_DELETE_FILES = "removed-equality-delete-files";
+ String REMOVED_DELETE_FILES = "removed-delete-files";
+ String TOTAL_DELETE_FILES = "total-delete-files";
+ String ADDED_RECORDS = "added-records";
+ String REMOVED_RECORDS = "removed-records";
+ String TOTAL_RECORDS = "total-records";
+ String ADDED_FILE_SIZE_BYTES = "added-files-size-bytes";
+ String REMOVED_FILE_SIZE_BYTES = "removed-files-size-bytes";
+ String TOTAL_FILE_SIZE_BYTES = "total-files-size-bytes";
+ String ADDED_POS_DELETES = "added-position-deletes";
+ String REMOVED_POS_DELETES = "removed-positional-deletes";
+ String TOTAL_POS_DELETES = "total-positional-deletes";
+ String ADDED_EQ_DELETES = "added-equality-deletes";
+ String REMOVED_EQ_DELETES = "removed-equality-deletes";
+ String TOTAL_EQ_DELETES = "total-equality-deletes";
+
+ @Nullable
+ TimerResult totalDuration();
+
+ @Nullable
+ CounterResult attempts();
+
+ @Nullable
+ CounterResult addedDataFiles();
+
+ @Nullable
+ CounterResult removedDataFiles();
+
+ @Nullable
+ CounterResult totalDataFiles();
+
+ @Nullable
+ CounterResult addedDeleteFiles();
+
+ @Nullable
+ CounterResult addedEqualityDeleteFiles();
+
+ @Nullable
+ CounterResult addedPositionalDeleteFiles();
+
+ @Nullable
+ CounterResult removedDeleteFiles();
+
+ @Nullable
+ CounterResult removedEqualityDeleteFiles();
+
+ @Nullable
+ CounterResult removedPositionalDeleteFiles();
+
+ @Nullable
+ CounterResult totalDeleteFiles();
+
+ @Nullable
+ CounterResult addedRecords();
+
+ @Nullable
+ CounterResult removedRecords();
+
+ @Nullable
+ CounterResult totalRecords();
+
+ @Nullable
+ CounterResult addedFilesSizeInBytes();
+
+ @Nullable
+ CounterResult removedFilesSizeInBytes();
+
+ @Nullable
+ CounterResult totalFilesSizeInBytes();
+
+ @Nullable
+ CounterResult addedPositionalDeletes();
+
+ @Nullable
+ CounterResult removedPositionalDeletes();
+
+ @Nullable
+ CounterResult totalPositionalDeletes();
+
+ @Nullable
+ CounterResult addedEqualityDeletes();
+
+ @Nullable
+ CounterResult removedEqualityDeletes();
+
+ @Nullable
+ CounterResult totalEqualityDeletes();
+
+ static CommitMetricsResult from(
+ CommitMetrics commitMetrics, Map<String, String> snapshotSummary) {
+ Preconditions.checkArgument(null != commitMetrics, "Invalid commit
metrics: null");
+ Preconditions.checkArgument(null != snapshotSummary, "Invalid snapshot
summary: null");
+ return ImmutableCommitMetricsResult.builder()
+ .attempts(CounterResult.fromCounter(commitMetrics.attempts()))
+ .totalDuration(TimerResult.fromTimer(commitMetrics.totalDuration()))
+ .addedDataFiles(counterFrom(snapshotSummary,
SnapshotSummary.ADDED_FILES_PROP))
+ .removedDataFiles(counterFrom(snapshotSummary,
SnapshotSummary.DELETED_FILES_PROP))
+ .totalDataFiles(counterFrom(snapshotSummary,
SnapshotSummary.TOTAL_DATA_FILES_PROP))
+ .addedDeleteFiles(counterFrom(snapshotSummary,
SnapshotSummary.ADDED_DELETE_FILES_PROP))
+ .addedPositionalDeleteFiles(
+ counterFrom(snapshotSummary,
SnapshotSummary.ADD_POS_DELETE_FILES_PROP))
+ .addedEqualityDeleteFiles(
+ counterFrom(snapshotSummary,
SnapshotSummary.ADD_EQ_DELETE_FILES_PROP))
+ .removedDeleteFiles(counterFrom(snapshotSummary,
SnapshotSummary.REMOVED_DELETE_FILES_PROP))
+ .removedEqualityDeleteFiles(
+ counterFrom(snapshotSummary,
SnapshotSummary.REMOVED_EQ_DELETE_FILES_PROP))
+ .removedPositionalDeleteFiles(
+ counterFrom(snapshotSummary,
SnapshotSummary.REMOVED_POS_DELETE_FILES_PROP))
+ .totalDeleteFiles(counterFrom(snapshotSummary,
SnapshotSummary.TOTAL_DELETE_FILES_PROP))
+ .addedRecords(counterFrom(snapshotSummary,
SnapshotSummary.ADDED_RECORDS_PROP))
+ .removedRecords(counterFrom(snapshotSummary,
SnapshotSummary.DELETED_RECORDS_PROP))
+ .totalRecords(counterFrom(snapshotSummary,
SnapshotSummary.TOTAL_RECORDS_PROP))
+ .addedFilesSizeInBytes(
+ counterFrom(snapshotSummary, SnapshotSummary.ADDED_FILE_SIZE_PROP,
Unit.BYTES))
+ .removedFilesSizeInBytes(
+ counterFrom(snapshotSummary,
SnapshotSummary.REMOVED_FILE_SIZE_PROP, Unit.BYTES))
+ .totalFilesSizeInBytes(
+ counterFrom(snapshotSummary, SnapshotSummary.TOTAL_FILE_SIZE_PROP,
Unit.BYTES))
+ .addedPositionalDeletes(
+ counterFrom(snapshotSummary,
SnapshotSummary.ADDED_POS_DELETES_PROP))
+ .removedPositionalDeletes(
+ counterFrom(snapshotSummary,
SnapshotSummary.REMOVED_POS_DELETES_PROP))
+ .totalPositionalDeletes(
+ counterFrom(snapshotSummary,
SnapshotSummary.TOTAL_POS_DELETES_PROP))
+ .addedEqualityDeletes(counterFrom(snapshotSummary,
SnapshotSummary.ADDED_EQ_DELETES_PROP))
+ .removedEqualityDeletes(
+ counterFrom(snapshotSummary,
SnapshotSummary.REMOVED_EQ_DELETES_PROP))
+ .totalEqualityDeletes(counterFrom(snapshotSummary,
SnapshotSummary.TOTAL_EQ_DELETES_PROP))
+ .build();
+ }
+
+ static CounterResult counterFrom(Map<String, String> snapshotSummary, String
metricName) {
+ return counterFrom(snapshotSummary, metricName, Unit.COUNT);
+ }
+
+ static CounterResult counterFrom(
+ Map<String, String> snapshotSummary, String metricName, Unit unit) {
+ if (!snapshotSummary.containsKey(metricName)) {
+ return null;
+ }
+
+ try {
+ return CounterResult.of(unit,
Long.parseLong(snapshotSummary.get(metricName)));
+ } catch (NumberFormatException e) {
+ return null;
+ }
+ }
+}
diff --git
a/core/src/main/java/org/apache/iceberg/metrics/CommitMetricsResultParser.java
b/core/src/main/java/org/apache/iceberg/metrics/CommitMetricsResultParser.java
new file mode 100644
index 0000000000..d4fd883c43
--- /dev/null
+++
b/core/src/main/java/org/apache/iceberg/metrics/CommitMetricsResultParser.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.metrics;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.JsonUtil;
+
+class CommitMetricsResultParser {
+ private CommitMetricsResultParser() {}
+
+ static String toJson(CommitMetricsResult metrics) {
+ return toJson(metrics, false);
+ }
+
+ static String toJson(CommitMetricsResult metrics, boolean pretty) {
+ return JsonUtil.generate(gen -> toJson(metrics, gen), pretty);
+ }
+
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
+ static void toJson(CommitMetricsResult metrics, JsonGenerator gen) throws
IOException {
+ Preconditions.checkArgument(null != metrics, "Invalid commit metrics:
null");
+
+ gen.writeStartObject();
+
+ if (null != metrics.totalDuration()) {
+ gen.writeFieldName(CommitMetrics.TOTAL_DURATION);
+ TimerResultParser.toJson(metrics.totalDuration(), gen);
+ }
+
+ if (null != metrics.attempts()) {
+ gen.writeFieldName(CommitMetrics.ATTEMPTS);
+ CounterResultParser.toJson(metrics.attempts(), gen);
+ }
+
+ if (null != metrics.addedDataFiles()) {
+ gen.writeFieldName(CommitMetricsResult.ADDED_DATA_FILES);
+ CounterResultParser.toJson(metrics.addedDataFiles(), gen);
+ }
+
+ if (null != metrics.removedDataFiles()) {
+ gen.writeFieldName(CommitMetricsResult.REMOVED_DATA_FILES);
+ CounterResultParser.toJson(metrics.removedDataFiles(), gen);
+ }
+
+ if (null != metrics.totalDataFiles()) {
+ gen.writeFieldName(CommitMetricsResult.TOTAL_DATA_FILES);
+ CounterResultParser.toJson(metrics.totalDataFiles(), gen);
+ }
+
+ if (null != metrics.addedDeleteFiles()) {
+ gen.writeFieldName(CommitMetricsResult.ADDED_DELETE_FILES);
+ CounterResultParser.toJson(metrics.addedDeleteFiles(), gen);
+ }
+
+ if (null != metrics.addedEqualityDeleteFiles()) {
+ gen.writeFieldName(CommitMetricsResult.ADDED_EQ_DELETE_FILES);
+ CounterResultParser.toJson(metrics.addedEqualityDeleteFiles(), gen);
+ }
+
+ if (null != metrics.addedPositionalDeleteFiles()) {
+ gen.writeFieldName(CommitMetricsResult.ADDED_POS_DELETE_FILES);
+ CounterResultParser.toJson(metrics.addedPositionalDeleteFiles(), gen);
+ }
+
+ if (null != metrics.removedDeleteFiles()) {
+ gen.writeFieldName(CommitMetricsResult.REMOVED_DELETE_FILES);
+ CounterResultParser.toJson(metrics.removedDeleteFiles(), gen);
+ }
+
+ if (null != metrics.removedPositionalDeleteFiles()) {
+ gen.writeFieldName(CommitMetricsResult.REMOVED_POS_DELETE_FILES);
+ CounterResultParser.toJson(metrics.removedPositionalDeleteFiles(), gen);
+ }
+
+ if (null != metrics.removedEqualityDeleteFiles()) {
+ gen.writeFieldName(CommitMetricsResult.REMOVED_EQ_DELETE_FILES);
+ CounterResultParser.toJson(metrics.removedEqualityDeleteFiles(), gen);
+ }
+
+ if (null != metrics.totalDeleteFiles()) {
+ gen.writeFieldName(CommitMetricsResult.TOTAL_DELETE_FILES);
+ CounterResultParser.toJson(metrics.totalDeleteFiles(), gen);
+ }
+
+ if (null != metrics.addedRecords()) {
+ gen.writeFieldName(CommitMetricsResult.ADDED_RECORDS);
+ CounterResultParser.toJson(metrics.addedRecords(), gen);
+ }
+
+ if (null != metrics.removedRecords()) {
+ gen.writeFieldName(CommitMetricsResult.REMOVED_RECORDS);
+ CounterResultParser.toJson(metrics.removedRecords(), gen);
+ }
+
+ if (null != metrics.totalRecords()) {
+ gen.writeFieldName(CommitMetricsResult.TOTAL_RECORDS);
+ CounterResultParser.toJson(metrics.totalRecords(), gen);
+ }
+
+ if (null != metrics.addedFilesSizeInBytes()) {
+ gen.writeFieldName(CommitMetricsResult.ADDED_FILE_SIZE_BYTES);
+ CounterResultParser.toJson(metrics.addedFilesSizeInBytes(), gen);
+ }
+
+ if (null != metrics.removedFilesSizeInBytes()) {
+ gen.writeFieldName(CommitMetricsResult.REMOVED_FILE_SIZE_BYTES);
+ CounterResultParser.toJson(metrics.removedFilesSizeInBytes(), gen);
+ }
+
+ if (null != metrics.totalFilesSizeInBytes()) {
+ gen.writeFieldName(CommitMetricsResult.TOTAL_FILE_SIZE_BYTES);
+ CounterResultParser.toJson(metrics.totalFilesSizeInBytes(), gen);
+ }
+
+ if (null != metrics.addedPositionalDeletes()) {
+ gen.writeFieldName(CommitMetricsResult.ADDED_POS_DELETES);
+ CounterResultParser.toJson(metrics.addedPositionalDeletes(), gen);
+ }
+
+ if (null != metrics.removedPositionalDeletes()) {
+ gen.writeFieldName(CommitMetricsResult.REMOVED_POS_DELETES);
+ CounterResultParser.toJson(metrics.removedPositionalDeletes(), gen);
+ }
+
+ if (null != metrics.totalPositionalDeletes()) {
+ gen.writeFieldName(CommitMetricsResult.TOTAL_POS_DELETES);
+ CounterResultParser.toJson(metrics.totalPositionalDeletes(), gen);
+ }
+
+ if (null != metrics.addedEqualityDeletes()) {
+ gen.writeFieldName(CommitMetricsResult.ADDED_EQ_DELETES);
+ CounterResultParser.toJson(metrics.addedEqualityDeletes(), gen);
+ }
+
+ if (null != metrics.removedEqualityDeletes()) {
+ gen.writeFieldName(CommitMetricsResult.REMOVED_EQ_DELETES);
+ CounterResultParser.toJson(metrics.removedEqualityDeletes(), gen);
+ }
+
+ if (null != metrics.totalEqualityDeletes()) {
+ gen.writeFieldName(CommitMetricsResult.TOTAL_EQ_DELETES);
+ CounterResultParser.toJson(metrics.totalEqualityDeletes(), gen);
+ }
+
+ gen.writeEndObject();
+ }
+
+ static CommitMetricsResult fromJson(String json) {
+ return JsonUtil.parse(json, CommitMetricsResultParser::fromJson);
+ }
+
+ static CommitMetricsResult fromJson(JsonNode json) {
+ Preconditions.checkArgument(null != json, "Cannot parse commit metrics
from null object");
+ Preconditions.checkArgument(
+ json.isObject(), "Cannot parse commit metrics from non-object: %s",
json);
+
+ return ImmutableCommitMetricsResult.builder()
+ .attempts(CounterResultParser.fromJson(CommitMetrics.ATTEMPTS, json))
+
.totalDuration(TimerResultParser.fromJson(CommitMetrics.TOTAL_DURATION, json))
+
.addedDataFiles(CounterResultParser.fromJson(CommitMetricsResult.ADDED_DATA_FILES,
json))
+ .removedDataFiles(
+
CounterResultParser.fromJson(CommitMetricsResult.REMOVED_DATA_FILES, json))
+
.totalDataFiles(CounterResultParser.fromJson(CommitMetricsResult.TOTAL_DATA_FILES,
json))
+ .addedDeleteFiles(
+
CounterResultParser.fromJson(CommitMetricsResult.ADDED_DELETE_FILES, json))
+ .addedEqualityDeleteFiles(
+
CounterResultParser.fromJson(CommitMetricsResult.ADDED_EQ_DELETE_FILES, json))
+ .addedPositionalDeleteFiles(
+
CounterResultParser.fromJson(CommitMetricsResult.ADDED_POS_DELETE_FILES, json))
+ .removedEqualityDeleteFiles(
+
CounterResultParser.fromJson(CommitMetricsResult.REMOVED_EQ_DELETE_FILES, json))
+ .removedPositionalDeleteFiles(
+
CounterResultParser.fromJson(CommitMetricsResult.REMOVED_POS_DELETE_FILES,
json))
+ .removedDeleteFiles(
+
CounterResultParser.fromJson(CommitMetricsResult.REMOVED_DELETE_FILES, json))
+ .totalDeleteFiles(
+
CounterResultParser.fromJson(CommitMetricsResult.TOTAL_DELETE_FILES, json))
+
.addedRecords(CounterResultParser.fromJson(CommitMetricsResult.ADDED_RECORDS,
json))
+
.removedRecords(CounterResultParser.fromJson(CommitMetricsResult.REMOVED_RECORDS,
json))
+
.totalRecords(CounterResultParser.fromJson(CommitMetricsResult.TOTAL_RECORDS,
json))
+ .addedFilesSizeInBytes(
+
CounterResultParser.fromJson(CommitMetricsResult.ADDED_FILE_SIZE_BYTES, json))
+ .removedFilesSizeInBytes(
+
CounterResultParser.fromJson(CommitMetricsResult.REMOVED_FILE_SIZE_BYTES, json))
+ .totalFilesSizeInBytes(
+
CounterResultParser.fromJson(CommitMetricsResult.TOTAL_FILE_SIZE_BYTES, json))
+ .addedPositionalDeletes(
+
CounterResultParser.fromJson(CommitMetricsResult.ADDED_POS_DELETES, json))
+ .removedPositionalDeletes(
+
CounterResultParser.fromJson(CommitMetricsResult.REMOVED_POS_DELETES, json))
+ .totalPositionalDeletes(
+
CounterResultParser.fromJson(CommitMetricsResult.TOTAL_POS_DELETES, json))
+ .addedEqualityDeletes(
+ CounterResultParser.fromJson(CommitMetricsResult.ADDED_EQ_DELETES,
json))
+ .removedEqualityDeletes(
+
CounterResultParser.fromJson(CommitMetricsResult.REMOVED_EQ_DELETES, json))
+ .totalEqualityDeletes(
+ CounterResultParser.fromJson(CommitMetricsResult.TOTAL_EQ_DELETES,
json))
+ .build();
+ }
+}
diff --git a/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java
b/core/src/main/java/org/apache/iceberg/metrics/CommitReport.java
similarity index 66%
copy from api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java
copy to core/src/main/java/org/apache/iceberg/metrics/CommitReport.java
index 45e3d16bff..bbe032eb27 100644
--- a/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java
+++ b/core/src/main/java/org/apache/iceberg/metrics/CommitReport.java
@@ -18,15 +18,22 @@
*/
package org.apache.iceberg.metrics;
-/** This interface defines the basic API for reporting metrics for operations
to a Table. */
-@FunctionalInterface
-public interface MetricsReporter {
-
- /**
- * Indicates that a operation is done by reporting a {@link MetricsReport}.
A {@link
- * MetricsReport} is usually directly derived from a {@link MetricsReport}
instance.
- *
- * @param report The {@link MetricsReport} to report.
- */
- void report(MetricsReport report);
+import java.util.Map;
+import org.immutables.value.Value;
+
+/** A commit report that contains all relevant information from a Snapshot. */
[email protected]
+public interface CommitReport extends MetricsReport {
+
+ String tableName();
+
+ long snapshotId();
+
+ long sequenceNumber();
+
+ String operation();
+
+ CommitMetricsResult commitMetrics();
+
+ Map<String, String> metadata();
}
diff --git
a/core/src/main/java/org/apache/iceberg/metrics/CommitReportParser.java
b/core/src/main/java/org/apache/iceberg/metrics/CommitReportParser.java
new file mode 100644
index 0000000000..b6b3f71250
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/metrics/CommitReportParser.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.metrics;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.JsonUtil;
+
+public class CommitReportParser {
+ private static final String TABLE_NAME = "table-name";
+ private static final String SNAPSHOT_ID = "snapshot-id";
+ private static final String SEQUENCE_NUMBER = "sequence-number";
+ private static final String OPERATION = "operation";
+ private static final String METRICS = "metrics";
+ private static final String METADATA = "metadata";
+
+ private CommitReportParser() {}
+
+ public static String toJson(CommitReport commitReport) {
+ return toJson(commitReport, false);
+ }
+
+ public static String toJson(CommitReport commitReport, boolean pretty) {
+ return JsonUtil.generate(gen -> toJson(commitReport, gen), pretty);
+ }
+
+ public static void toJson(CommitReport commitReport, JsonGenerator gen)
throws IOException {
+ Preconditions.checkArgument(null != commitReport, "Invalid commit report:
null");
+
+ gen.writeStartObject();
+ toJsonWithoutStartEnd(commitReport, gen);
+ gen.writeEndObject();
+ }
+
+ /**
+ * This serializes the {@link CommitReport} without writing a start/end
object and is mainly used
+ * by {@link org.apache.iceberg.rest.requests.ReportMetricsRequestParser}.
+ *
+ * @param commitReport The {@link CommitReport} to serialize
+ * @param gen The {@link JsonGenerator} to use
+ * @throws IOException If an error occurs while serializing
+ */
+ public static void toJsonWithoutStartEnd(CommitReport commitReport,
JsonGenerator gen)
+ throws IOException {
+ Preconditions.checkArgument(null != commitReport, "Invalid commit report:
null");
+
+ gen.writeStringField(TABLE_NAME, commitReport.tableName());
+ gen.writeNumberField(SNAPSHOT_ID, commitReport.snapshotId());
+ gen.writeNumberField(SEQUENCE_NUMBER, commitReport.sequenceNumber());
+ gen.writeStringField(OPERATION, commitReport.operation());
+
+ gen.writeFieldName(METRICS);
+ CommitMetricsResultParser.toJson(commitReport.commitMetrics(), gen);
+
+ if (!commitReport.metadata().isEmpty()) {
+ JsonUtil.writeStringMap(METADATA, commitReport.metadata(), gen);
+ }
+ }
+
+ public static CommitReport fromJson(String json) {
+ return JsonUtil.parse(json, CommitReportParser::fromJson);
+ }
+
+ public static CommitReport fromJson(JsonNode json) {
+ Preconditions.checkArgument(null != json, "Cannot parse commit report from
null object");
+ Preconditions.checkArgument(
+ json.isObject(), "Cannot parse commit report from non-object: %s",
json);
+
+ ImmutableCommitReport.Builder builder =
+ ImmutableCommitReport.builder()
+ .tableName(JsonUtil.getString(TABLE_NAME, json))
+ .snapshotId(JsonUtil.getLong(SNAPSHOT_ID, json))
+ .sequenceNumber(JsonUtil.getLong(SEQUENCE_NUMBER, json))
+ .operation(JsonUtil.getString(OPERATION, json))
+
.commitMetrics(CommitMetricsResultParser.fromJson(JsonUtil.get(METRICS, json)));
+
+ if (json.has(METADATA)) {
+ builder.metadata(JsonUtil.getStringMap(METADATA, json));
+ }
+
+ return builder.build();
+ }
+}
diff --git
a/core/src/main/java/org/apache/iceberg/metrics/ScanReportParser.java
b/core/src/main/java/org/apache/iceberg/metrics/ScanReportParser.java
index 32b43af395..cdbe19fad2 100644
--- a/core/src/main/java/org/apache/iceberg/metrics/ScanReportParser.java
+++ b/core/src/main/java/org/apache/iceberg/metrics/ScanReportParser.java
@@ -22,7 +22,6 @@ import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import java.util.List;
-import java.util.Map;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.ExpressionParser;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -83,11 +82,7 @@ public class ScanReportParser {
ScanMetricsResultParser.toJson(scanReport.scanMetrics(), gen);
if (!scanReport.metadata().isEmpty()) {
- gen.writeObjectFieldStart(METADATA);
- for (Map.Entry<String, String> entry : scanReport.metadata().entrySet())
{
- gen.writeStringField(entry.getKey(), entry.getValue());
- }
- gen.writeEndObject();
+ JsonUtil.writeStringMap(METADATA, scanReport.metadata(), gen);
}
}
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
index d4757c4ed5..d52dd372c9 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
@@ -601,7 +601,8 @@ public class RESTSessionCatalog extends BaseSessionCatalog
createChanges(meta),
meta);
- return Transactions.createTableTransaction(fullName, ops, meta);
+ return Transactions.createTableTransaction(
+ fullName, ops, meta, report -> reportMetrics(ident, report,
session::headers));
}
@Override
@@ -651,7 +652,8 @@ public class RESTSessionCatalog extends BaseSessionCatalog
changes.build(),
base);
- return Transactions.replaceTableTransaction(fullName, ops, replacement);
+ return Transactions.replaceTableTransaction(
+ fullName, ops, replacement, report -> reportMetrics(ident, report,
session::headers));
}
@Override
diff --git
a/core/src/main/java/org/apache/iceberg/rest/requests/ReportMetricsRequest.java
b/core/src/main/java/org/apache/iceberg/rest/requests/ReportMetricsRequest.java
index 963cb1a4f0..c85a6e9add 100644
---
a/core/src/main/java/org/apache/iceberg/rest/requests/ReportMetricsRequest.java
+++
b/core/src/main/java/org/apache/iceberg/rest/requests/ReportMetricsRequest.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.rest.requests;
import java.util.Locale;
+import org.apache.iceberg.metrics.CommitReport;
import org.apache.iceberg.metrics.MetricsReport;
import org.apache.iceberg.metrics.ScanReport;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -29,7 +30,8 @@ import org.immutables.value.Value;
public interface ReportMetricsRequest extends RESTRequest {
enum ReportType {
- SCAN_REPORT;
+ SCAN_REPORT,
+ COMMIT_REPORT;
static ReportType fromString(String reportType) {
Preconditions.checkArgument(null != reportType, "Invalid report type:
null");
@@ -52,7 +54,13 @@ public interface ReportMetricsRequest extends RESTRequest {
}
static ReportMetricsRequest of(MetricsReport report) {
- ReportType reportType = report instanceof ScanReport ?
ReportType.SCAN_REPORT : null;
+ ReportType reportType = null;
+ if (report instanceof ScanReport) {
+ reportType = ReportType.SCAN_REPORT;
+ } else if (report instanceof CommitReport) {
+ reportType = ReportType.COMMIT_REPORT;
+ }
+
Preconditions.checkArgument(
null != reportType, "Unsupported report type: %s",
report.getClass().getName());
diff --git
a/core/src/main/java/org/apache/iceberg/rest/requests/ReportMetricsRequestParser.java
b/core/src/main/java/org/apache/iceberg/rest/requests/ReportMetricsRequestParser.java
index f180da09cd..e6d98cdac3 100644
---
a/core/src/main/java/org/apache/iceberg/rest/requests/ReportMetricsRequestParser.java
+++
b/core/src/main/java/org/apache/iceberg/rest/requests/ReportMetricsRequestParser.java
@@ -22,6 +22,8 @@ import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import java.util.Locale;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.CommitReportParser;
import org.apache.iceberg.metrics.ScanReport;
import org.apache.iceberg.metrics.ScanReportParser;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -53,6 +55,10 @@ public class ReportMetricsRequestParser {
ScanReportParser.toJsonWithoutStartEnd((ScanReport) request.report(),
gen);
}
+ if (ReportType.COMMIT_REPORT == request.reportType()) {
+ CommitReportParser.toJsonWithoutStartEnd((CommitReport)
request.report(), gen);
+ }
+
gen.writeEndObject();
}
@@ -81,6 +87,13 @@ public class ReportMetricsRequestParser {
.build();
}
+ if (ReportType.COMMIT_REPORT == type) {
+ return ImmutableReportMetricsRequest.builder()
+ .reportType(type)
+ .report(CommitReportParser.fromJson(json))
+ .build();
+ }
+
throw new IllegalArgumentException(String.format("Cannot build metrics
request from %s", json));
}
}
diff --git a/core/src/main/java/org/apache/iceberg/util/Tasks.java
b/core/src/main/java/org/apache/iceberg/util/Tasks.java
index dfd4ab9984..e420145c8d 100644
--- a/core/src/main/java/org/apache/iceberg/util/Tasks.java
+++ b/core/src/main/java/org/apache/iceberg/util/Tasks.java
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.stream.Stream;
+import org.apache.iceberg.metrics.Counter;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,6 +87,7 @@ public class Tasks {
private long maxSleepTimeMs = 600000; // 10 minutes
private long maxDurationMs = 600000; // 10 minutes
private double scaleFactor = 2.0; // exponential
+ private Counter attemptsCounter;
public Builder(Iterable<I> items) {
this.items = items;
@@ -173,6 +175,11 @@ public class Tasks {
return this;
}
+ public Builder<I> countAttempts(Counter counter) {
+ this.attemptsCounter = counter;
+ return this;
+ }
+
public Builder<I> exponentialBackoff(
long backoffMinSleepTimeMs,
long backoffMaxSleepTimeMs,
@@ -398,6 +405,10 @@ public class Tasks {
int attempt = 0;
while (true) {
attempt += 1;
+ if (null != attemptsCounter) {
+ attemptsCounter.increment();
+ }
+
try {
task.run(item);
break;
diff --git a/core/src/test/java/org/apache/iceberg/TestCommitReporting.java
b/core/src/test/java/org/apache/iceberg/TestCommitReporting.java
new file mode 100644
index 0000000000..9998c47ff3
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestCommitReporting.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import org.apache.iceberg.TestScanPlanningAndReporting.TestMetricsReporter;
+import org.apache.iceberg.metrics.CommitMetricsResult;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.junit.Test;
+
+public class TestCommitReporting extends TableTestBase {
+
+ private final TestMetricsReporter reporter = new TestMetricsReporter();
+
+ public TestCommitReporting() {
+ super(2);
+ }
+
+ @Test
+ public void addAndDeleteDataFiles() {
+ String tableName = "add-and-delete-data-files";
+ Table table =
+ TestTables.create(
+ tableDir, tableName, SCHEMA, SPEC, SortOrder.unsorted(),
formatVersion, reporter);
+ table.newAppend().appendFile(FILE_A).appendFile(FILE_D).commit();
+
+ CommitReport report = reporter.lastCommitReport();
+ assertThat(report).isNotNull();
+ assertThat(report.operation()).isEqualTo("append");
+ assertThat(report.snapshotId()).isEqualTo(1L);
+ assertThat(report.sequenceNumber()).isEqualTo(1L);
+ assertThat(report.tableName()).isEqualTo(tableName);
+
+ CommitMetricsResult metrics = report.commitMetrics();
+ assertThat(metrics.addedDataFiles().value()).isEqualTo(2L);
+ assertThat(metrics.totalDataFiles().value()).isEqualTo(2L);
+
+ assertThat(metrics.addedRecords().value()).isEqualTo(2L);
+ assertThat(metrics.totalRecords().value()).isEqualTo(2L);
+
+ assertThat(metrics.addedFilesSizeInBytes().value()).isEqualTo(20L);
+ assertThat(metrics.totalFilesSizeInBytes().value()).isEqualTo(20L);
+
+ // now remove those 2 data files
+ table.newDelete().deleteFile(FILE_A).deleteFile(FILE_D).commit();
+ report = reporter.lastCommitReport();
+ assertThat(report).isNotNull();
+ assertThat(report.operation()).isEqualTo("delete");
+ assertThat(report.snapshotId()).isEqualTo(2L);
+ assertThat(report.sequenceNumber()).isEqualTo(2L);
+ assertThat(report.tableName()).isEqualTo(tableName);
+
+ metrics = report.commitMetrics();
+ assertThat(metrics.removedDataFiles().value()).isEqualTo(2L);
+ assertThat(metrics.totalDeleteFiles().value()).isEqualTo(0L);
+
+ assertThat(metrics.removedRecords().value()).isEqualTo(2L);
+ assertThat(metrics.totalRecords().value()).isEqualTo(0L);
+
+ assertThat(metrics.removedFilesSizeInBytes().value()).isEqualTo(20L);
+ assertThat(metrics.totalFilesSizeInBytes().value()).isEqualTo(0L);
+ }
+
+ @Test
+ public void addAndDeleteDeleteFiles() {
+ String tableName = "add-and-delete-delete-files";
+ Table table =
+ TestTables.create(
+ tableDir, tableName, SCHEMA, SPEC, SortOrder.unsorted(),
formatVersion, reporter);
+
+ // 2 positional + 1 equality
+ table
+ .newRowDelta()
+ .addDeletes(FILE_A_DELETES)
+ .addDeletes(FILE_B_DELETES)
+ .addDeletes(FILE_C2_DELETES)
+ .commit();
+
+ CommitReport report = reporter.lastCommitReport();
+ assertThat(report).isNotNull();
+ assertThat(report.operation()).isEqualTo("overwrite");
+ assertThat(report.snapshotId()).isEqualTo(1L);
+ assertThat(report.sequenceNumber()).isEqualTo(1L);
+ assertThat(report.tableName()).isEqualTo(tableName);
+
+ CommitMetricsResult metrics = report.commitMetrics();
+ assertThat(metrics.addedDeleteFiles().value()).isEqualTo(3L);
+ assertThat(metrics.totalDeleteFiles().value()).isEqualTo(3L);
+ assertThat(metrics.addedPositionalDeleteFiles().value()).isEqualTo(2L);
+ assertThat(metrics.addedEqualityDeleteFiles().value()).isEqualTo(1L);
+
+ assertThat(metrics.addedPositionalDeletes().value()).isEqualTo(2L);
+ assertThat(metrics.totalPositionalDeletes().value()).isEqualTo(2L);
+
+ assertThat(metrics.addedEqualityDeletes().value()).isEqualTo(1L);
+ assertThat(metrics.totalEqualityDeletes().value()).isEqualTo(1L);
+
+ assertThat(metrics.addedFilesSizeInBytes().value()).isEqualTo(30L);
+ assertThat(metrics.totalFilesSizeInBytes().value()).isEqualTo(30L);
+
+ // now remove those 2 positional + 1 equality delete files
+ table
+ .newRewrite()
+ .rewriteFiles(
+ ImmutableSet.of(),
+ ImmutableSet.of(FILE_A_DELETES, FILE_B_DELETES, FILE_C2_DELETES),
+ ImmutableSet.of(),
+ ImmutableSet.of())
+ .commit();
+
+ report = reporter.lastCommitReport();
+ assertThat(report).isNotNull();
+ assertThat(report.operation()).isEqualTo("replace");
+ assertThat(report.snapshotId()).isEqualTo(2L);
+ assertThat(report.sequenceNumber()).isEqualTo(2L);
+ assertThat(report.tableName()).isEqualTo(tableName);
+
+ metrics = report.commitMetrics();
+ assertThat(metrics.removedDeleteFiles().value()).isEqualTo(3L);
+ assertThat(metrics.totalDeleteFiles().value()).isEqualTo(0L);
+ assertThat(metrics.removedPositionalDeleteFiles().value()).isEqualTo(2L);
+ assertThat(metrics.removedEqualityDeleteFiles().value()).isEqualTo(1L);
+
+ assertThat(metrics.removedPositionalDeletes().value()).isEqualTo(2L);
+ assertThat(metrics.totalPositionalDeletes().value()).isEqualTo(0L);
+
+ assertThat(metrics.removedEqualityDeletes().value()).isEqualTo(1L);
+ assertThat(metrics.totalEqualityDeletes().value()).isEqualTo(0L);
+
+ assertThat(metrics.removedFilesSizeInBytes().value()).isEqualTo(30L);
+ assertThat(metrics.totalFilesSizeInBytes().value()).isEqualTo(0L);
+ }
+
+ @Test
+ public void addAndDeleteManifests() throws IOException {
+ String tableName = "add-and-delete-manifests";
+ Table table =
+ TestTables.create(
+ tableDir, tableName, SCHEMA, SPEC, SortOrder.unsorted(),
formatVersion, reporter);
+
+ table.newAppend().appendFile(FILE_A).commit();
+ Snapshot snap1 = table.currentSnapshot();
+ table.newAppend().appendFile(FILE_B).commit();
+ Snapshot snap2 = table.currentSnapshot();
+
+ ManifestFile newManifest =
+ writeManifest(
+ "manifest-file.avro",
+ manifestEntry(ManifestEntry.Status.EXISTING, snap1.snapshotId(),
FILE_A),
+ manifestEntry(ManifestEntry.Status.EXISTING, snap2.snapshotId(),
FILE_B));
+
+ RewriteManifests rewriteManifests = table.rewriteManifests();
+ for (ManifestFile manifest : snap2.dataManifests(table.io())) {
+ rewriteManifests.deleteManifest(manifest);
+ }
+
+ rewriteManifests.addManifest(newManifest).commit();
+
+ CommitReport report = reporter.lastCommitReport();
+ assertThat(report).isNotNull();
+ assertThat(report.operation()).isEqualTo("append");
+ assertThat(report.snapshotId()).isEqualTo(2L);
+ assertThat(report.sequenceNumber()).isEqualTo(2L);
+ assertThat(report.tableName()).isEqualTo(tableName);
+
+ CommitMetricsResult metrics = report.commitMetrics();
+ assertThat(metrics.addedDataFiles().value()).isEqualTo(1L);
+ assertThat(metrics.addedRecords().value()).isEqualTo(1L);
+ assertThat(metrics.addedFilesSizeInBytes().value()).isEqualTo(10L);
+ }
+}
diff --git
a/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java
b/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java
index e39ed13cac..6284e08d45 100644
--- a/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java
+++ b/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java
@@ -25,6 +25,7 @@ import java.time.Duration;
import java.util.List;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.metrics.CommitReport;
import org.apache.iceberg.metrics.LoggingMetricsReporter;
import org.apache.iceberg.metrics.MetricsReport;
import org.apache.iceberg.metrics.MetricsReporter;
@@ -245,10 +246,10 @@ public class TestScanPlanningAndReporting extends
TableTestBase {
assertThat(result.positionalDeleteFiles().value()).isEqualTo(1);
}
- private static class TestMetricsReporter implements MetricsReporter {
+ static class TestMetricsReporter implements MetricsReporter {
private final List<MetricsReport> reports = Lists.newArrayList();
// this is mainly so that we see scan reports being logged during tests
- private final LoggingMetricsReporter delegate = new
LoggingMetricsReporter();
+ private final LoggingMetricsReporter delegate =
LoggingMetricsReporter.instance();
@Override
public void report(MetricsReport report) {
@@ -260,7 +261,16 @@ public class TestScanPlanningAndReporting extends
TableTestBase {
if (reports.isEmpty()) {
return null;
}
+
return (ScanReport) reports.get(reports.size() - 1);
}
+
+ public CommitReport lastCommitReport() {
+ if (reports.isEmpty()) {
+ return null;
+ }
+
+ return (CommitReport) reports.get(reports.size() - 1);
+ }
}
}
diff --git
a/core/src/test/java/org/apache/iceberg/metrics/TestCommitMetricsResultParser.java
b/core/src/test/java/org/apache/iceberg/metrics/TestCommitMetricsResultParser.java
new file mode 100644
index 0000000000..5ba3aecbdc
--- /dev/null
+++
b/core/src/test/java/org/apache/iceberg/metrics/TestCommitMetricsResultParser.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.metrics;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+
+public class TestCommitMetricsResultParser {
+
+ @Test
+ public void nullMetrics() {
+ assertThatThrownBy(() -> CommitMetricsResultParser.fromJson((JsonNode)
null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot parse commit metrics from null object");
+
+ assertThatThrownBy(() -> CommitMetricsResultParser.toJson(null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid commit metrics: null");
+
+ assertThatThrownBy(() -> CommitMetricsResult.from(null, ImmutableMap.of()))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid commit metrics: null");
+
+ assertThatThrownBy(
+ () -> CommitMetricsResult.from(CommitMetrics.of(new
DefaultMetricsContext()), null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid snapshot summary: null");
+ }
+
+ @Test
+ public void invalidNumberInSnapshotSummary() {
+ CommitMetricsResult result =
+ CommitMetricsResult.from(
+ CommitMetrics.of(new DefaultMetricsContext()),
+ ImmutableMap.of(SnapshotSummary.ADDED_FILES_PROP, "xyz"));
+ assertThat(result.addedDataFiles()).isNull();
+ }
+
+ @SuppressWarnings("MethodLength")
+ @Test
+ public void roundTripSerde() {
+ CommitMetrics commitMetrics = CommitMetrics.of(new
DefaultMetricsContext());
+ commitMetrics.totalDuration().record(100, TimeUnit.SECONDS);
+ commitMetrics.attempts().increment(4);
+ Map<String, String> snapshotSummary =
+ ImmutableMap.<String, String>builder()
+ .put(SnapshotSummary.ADDED_FILES_PROP, "1")
+ .put(SnapshotSummary.DELETED_FILES_PROP, "2")
+ .put(SnapshotSummary.TOTAL_DATA_FILES_PROP, "3")
+ .put(SnapshotSummary.ADDED_DELETE_FILES_PROP, "4")
+ .put(SnapshotSummary.ADD_EQ_DELETE_FILES_PROP, "5")
+ .put(SnapshotSummary.ADD_POS_DELETE_FILES_PROP, "6")
+ .put(SnapshotSummary.REMOVED_POS_DELETE_FILES_PROP, "7")
+ .put(SnapshotSummary.REMOVED_EQ_DELETE_FILES_PROP, "8")
+ .put(SnapshotSummary.REMOVED_DELETE_FILES_PROP, "9")
+ .put(SnapshotSummary.TOTAL_DELETE_FILES_PROP, "10")
+ .put(SnapshotSummary.ADDED_RECORDS_PROP, "11")
+ .put(SnapshotSummary.DELETED_RECORDS_PROP, "12")
+ .put(SnapshotSummary.TOTAL_RECORDS_PROP, "13")
+ .put(SnapshotSummary.ADDED_FILE_SIZE_PROP, "14")
+ .put(SnapshotSummary.REMOVED_FILE_SIZE_PROP, "15")
+ .put(SnapshotSummary.TOTAL_FILE_SIZE_PROP, "16")
+ .put(SnapshotSummary.ADDED_POS_DELETES_PROP, "17")
+ .put(SnapshotSummary.ADDED_EQ_DELETES_PROP, "18")
+ .put(SnapshotSummary.REMOVED_POS_DELETES_PROP, "19")
+ .put(SnapshotSummary.REMOVED_EQ_DELETES_PROP, "20")
+ .put(SnapshotSummary.TOTAL_POS_DELETES_PROP, "21")
+ .put(SnapshotSummary.TOTAL_EQ_DELETES_PROP, "22")
+ .build();
+
+ CommitMetricsResult result = CommitMetricsResult.from(commitMetrics,
snapshotSummary);
+ assertThat(result.attempts().value()).isEqualTo(4L);
+
assertThat(result.totalDuration().totalDuration()).isEqualTo(Duration.ofSeconds(100));
+ assertThat(result.addedDataFiles().value()).isEqualTo(1L);
+ assertThat(result.removedDataFiles().value()).isEqualTo(2L);
+ assertThat(result.totalDataFiles().value()).isEqualTo(3L);
+ assertThat(result.addedDeleteFiles().value()).isEqualTo(4L);
+ assertThat(result.addedEqualityDeleteFiles().value()).isEqualTo(5L);
+ assertThat(result.addedPositionalDeleteFiles().value()).isEqualTo(6L);
+ assertThat(result.removedPositionalDeleteFiles().value()).isEqualTo(7L);
+ assertThat(result.removedEqualityDeleteFiles().value()).isEqualTo(8L);
+ assertThat(result.removedDeleteFiles().value()).isEqualTo(9L);
+ assertThat(result.totalDeleteFiles().value()).isEqualTo(10L);
+ assertThat(result.addedRecords().value()).isEqualTo(11L);
+ assertThat(result.removedRecords().value()).isEqualTo(12L);
+ assertThat(result.totalRecords().value()).isEqualTo(13L);
+ assertThat(result.addedFilesSizeInBytes().value()).isEqualTo(14L);
+ assertThat(result.removedFilesSizeInBytes().value()).isEqualTo(15L);
+ assertThat(result.totalFilesSizeInBytes().value()).isEqualTo(16L);
+ assertThat(result.addedPositionalDeletes().value()).isEqualTo(17L);
+ assertThat(result.addedEqualityDeletes().value()).isEqualTo(18L);
+ assertThat(result.removedPositionalDeletes().value()).isEqualTo(19L);
+ assertThat(result.removedEqualityDeletes().value()).isEqualTo(20L);
+ assertThat(result.totalPositionalDeletes().value()).isEqualTo(21L);
+ assertThat(result.totalEqualityDeletes().value()).isEqualTo(22L);
+
+ String expectedJson =
+ "{\n"
+ + " \"total-duration\" : {\n"
+ + " \"count\" : 1,\n"
+ + " \"time-unit\" : \"nanoseconds\",\n"
+ + " \"total-duration\" : 100000000000\n"
+ + " },\n"
+ + " \"attempts\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 4\n"
+ + " },\n"
+ + " \"added-data-files\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 1\n"
+ + " },\n"
+ + " \"removed-data-files\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 2\n"
+ + " },\n"
+ + " \"total-data-files\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 3\n"
+ + " },\n"
+ + " \"added-delete-files\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 4\n"
+ + " },\n"
+ + " \"added-equality-delete-files\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 5\n"
+ + " },\n"
+ + " \"added-positional-delete-files\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 6\n"
+ + " },\n"
+ + " \"removed-delete-files\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 9\n"
+ + " },\n"
+ + " \"removed-positional-delete-files\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 7\n"
+ + " },\n"
+ + " \"removed-equality-delete-files\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 8\n"
+ + " },\n"
+ + " \"total-delete-files\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 10\n"
+ + " },\n"
+ + " \"added-records\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 11\n"
+ + " },\n"
+ + " \"removed-records\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 12\n"
+ + " },\n"
+ + " \"total-records\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 13\n"
+ + " },\n"
+ + " \"added-files-size-bytes\" : {\n"
+ + " \"unit\" : \"bytes\",\n"
+ + " \"value\" : 14\n"
+ + " },\n"
+ + " \"removed-files-size-bytes\" : {\n"
+ + " \"unit\" : \"bytes\",\n"
+ + " \"value\" : 15\n"
+ + " },\n"
+ + " \"total-files-size-bytes\" : {\n"
+ + " \"unit\" : \"bytes\",\n"
+ + " \"value\" : 16\n"
+ + " },\n"
+ + " \"added-position-deletes\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 17\n"
+ + " },\n"
+ + " \"removed-positional-deletes\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 19\n"
+ + " },\n"
+ + " \"total-positional-deletes\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 21\n"
+ + " },\n"
+ + " \"added-equality-deletes\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 18\n"
+ + " },\n"
+ + " \"removed-equality-deletes\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 20\n"
+ + " },\n"
+ + " \"total-equality-deletes\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 22\n"
+ + " }\n"
+ + "}";
+
+ String json = CommitMetricsResultParser.toJson(result, true);
+ assertThat(CommitMetricsResultParser.fromJson(json)).isEqualTo(result);
+ assertThat(json).isEqualTo(expectedJson);
+ }
+
+ @Test
+ public void roundTripSerdeNoopCommitMetrics() {
+ CommitMetricsResult commitMetricsResult =
+ CommitMetricsResult.from(CommitMetrics.noop(),
SnapshotSummary.builder().build());
+ String expectedJson = "{ }";
+
+ String json = CommitMetricsResultParser.toJson(commitMetricsResult, true);
+ System.out.println(json);
+ assertThat(json).isEqualTo(expectedJson);
+ assertThat(CommitMetricsResultParser.fromJson(json))
+ .isEqualTo(ImmutableCommitMetricsResult.builder().build());
+ }
+}
diff --git
a/core/src/test/java/org/apache/iceberg/metrics/TestCommitReportParser.java
b/core/src/test/java/org/apache/iceberg/metrics/TestCommitReportParser.java
new file mode 100644
index 0000000000..6c2d9edd36
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/metrics/TestCommitReportParser.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.metrics;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+public class TestCommitReportParser {
+
+ @Test
+ public void nullCommitReport() {
+ Assertions.assertThatThrownBy(() -> CommitReportParser.fromJson((JsonNode)
null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot parse commit report from null object");
+
+ Assertions.assertThatThrownBy(() -> CommitReportParser.toJson(null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid commit report: null");
+ }
+
+ @Test
+ public void missingFields() {
+ Assertions.assertThatThrownBy(() -> CommitReportParser.fromJson("{}"))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot parse missing string: table-name");
+
+ Assertions.assertThatThrownBy(
+ () ->
CommitReportParser.fromJson("{\"table-name\":\"roundTripTableName\"}"))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot parse missing long: snapshot-id");
+
+ Assertions.assertThatThrownBy(
+ () ->
+ CommitReportParser.fromJson(
+
"{\"table-name\":\"roundTripTableName\",\"snapshot-id\":23}"))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot parse missing long: sequence-number");
+
+ Assertions.assertThatThrownBy(
+ () ->
+ CommitReportParser.fromJson(
+
"{\"table-name\":\"roundTripTableName\",\"snapshot-id\":23,\"sequence-number\":24}"))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot parse missing string: operation");
+
+ Assertions.assertThatThrownBy(
+ () ->
+ CommitReportParser.fromJson(
+
"{\"table-name\":\"roundTripTableName\",\"snapshot-id\":23,\"sequence-number\":24,
\"operation\": \"DELETE\"}"))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot parse missing field: metrics");
+ }
+
+ @Test
+ public void invalidTableName() {
+ Assertions.assertThatThrownBy(() ->
CommitReportParser.fromJson("{\"table-name\":23}"))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot parse to a string value: table-name: 23");
+ }
+
+ @Test
+ public void invalidSnapshotId() {
+ Assertions.assertThatThrownBy(
+ () ->
+ CommitReportParser.fromJson(
+
"{\"table-name\":\"roundTripTableName\",\"snapshot-id\":\"invalid\"}"))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot parse to a long value: snapshot-id: \"invalid\"");
+ }
+
+ @SuppressWarnings("MethodLength")
+ @Test
+ public void roundTripSerde() {
+ CommitMetrics commitMetrics = CommitMetrics.of(new
DefaultMetricsContext());
+ commitMetrics.totalDuration().record(100, TimeUnit.SECONDS);
+ commitMetrics.attempts().increment(4);
+ Map<String, String> snapshotSummary =
+ com.google.common.collect.ImmutableMap.<String, String>builder()
+ .put(SnapshotSummary.ADDED_FILES_PROP, "1")
+ .put(SnapshotSummary.DELETED_FILES_PROP, "2")
+ .put(SnapshotSummary.TOTAL_DATA_FILES_PROP, "3")
+ .put(SnapshotSummary.ADDED_DELETE_FILES_PROP, "4")
+ .put(SnapshotSummary.ADD_EQ_DELETE_FILES_PROP, "5")
+ .put(SnapshotSummary.ADD_POS_DELETE_FILES_PROP, "6")
+ .put(SnapshotSummary.REMOVED_POS_DELETE_FILES_PROP, "7")
+ .put(SnapshotSummary.REMOVED_EQ_DELETE_FILES_PROP, "8")
+ .put(SnapshotSummary.REMOVED_DELETE_FILES_PROP, "9")
+ .put(SnapshotSummary.TOTAL_DELETE_FILES_PROP, "10")
+ .put(SnapshotSummary.ADDED_RECORDS_PROP, "11")
+ .put(SnapshotSummary.DELETED_RECORDS_PROP, "12")
+ .put(SnapshotSummary.TOTAL_RECORDS_PROP, "13")
+ .put(SnapshotSummary.ADDED_FILE_SIZE_PROP, "14")
+ .put(SnapshotSummary.REMOVED_FILE_SIZE_PROP, "15")
+ .put(SnapshotSummary.TOTAL_FILE_SIZE_PROP, "16")
+ .put(SnapshotSummary.ADDED_POS_DELETES_PROP, "17")
+ .put(SnapshotSummary.ADDED_EQ_DELETES_PROP, "18")
+ .put(SnapshotSummary.REMOVED_POS_DELETES_PROP, "19")
+ .put(SnapshotSummary.REMOVED_EQ_DELETES_PROP, "20")
+ .put(SnapshotSummary.TOTAL_POS_DELETES_PROP, "21")
+ .put(SnapshotSummary.TOTAL_EQ_DELETES_PROP, "22")
+ .build();
+
+ String tableName = "roundTripTableName";
+ CommitReport commitReport =
+ ImmutableCommitReport.builder()
+ .tableName(tableName)
+ .snapshotId(23L)
+ .operation("DELETE")
+ .sequenceNumber(4L)
+ .commitMetrics(CommitMetricsResult.from(commitMetrics,
snapshotSummary))
+ .build();
+
+ String expectedJson =
+ "{\n"
+ + " \"table-name\" : \"roundTripTableName\",\n"
+ + " \"snapshot-id\" : 23,\n"
+ + " \"sequence-number\" : 4,\n"
+ + " \"operation\" : \"DELETE\",\n"
+ + " \"metrics\" : {\n"
+ + " \"total-duration\" : {\n"
+ + " \"count\" : 1,\n"
+ + " \"time-unit\" : \"nanoseconds\",\n"
+ + " \"total-duration\" : 100000000000\n"
+ + " },\n"
+ + " \"attempts\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 4\n"
+ + " },\n"
+ + " \"added-data-files\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 1\n"
+ + " },\n"
+ + " \"removed-data-files\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 2\n"
+ + " },\n"
+ + " \"total-data-files\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 3\n"
+ + " },\n"
+ + " \"added-delete-files\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 4\n"
+ + " },\n"
+ + " \"added-equality-delete-files\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 5\n"
+ + " },\n"
+ + " \"added-positional-delete-files\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 6\n"
+ + " },\n"
+ + " \"removed-delete-files\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 9\n"
+ + " },\n"
+ + " \"removed-positional-delete-files\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 7\n"
+ + " },\n"
+ + " \"removed-equality-delete-files\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 8\n"
+ + " },\n"
+ + " \"total-delete-files\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 10\n"
+ + " },\n"
+ + " \"added-records\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 11\n"
+ + " },\n"
+ + " \"removed-records\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 12\n"
+ + " },\n"
+ + " \"total-records\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 13\n"
+ + " },\n"
+ + " \"added-files-size-bytes\" : {\n"
+ + " \"unit\" : \"bytes\",\n"
+ + " \"value\" : 14\n"
+ + " },\n"
+ + " \"removed-files-size-bytes\" : {\n"
+ + " \"unit\" : \"bytes\",\n"
+ + " \"value\" : 15\n"
+ + " },\n"
+ + " \"total-files-size-bytes\" : {\n"
+ + " \"unit\" : \"bytes\",\n"
+ + " \"value\" : 16\n"
+ + " },\n"
+ + " \"added-position-deletes\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 17\n"
+ + " },\n"
+ + " \"removed-positional-deletes\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 19\n"
+ + " },\n"
+ + " \"total-positional-deletes\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 21\n"
+ + " },\n"
+ + " \"added-equality-deletes\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 18\n"
+ + " },\n"
+ + " \"removed-equality-deletes\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 20\n"
+ + " },\n"
+ + " \"total-equality-deletes\" : {\n"
+ + " \"unit\" : \"count\",\n"
+ + " \"value\" : 22\n"
+ + " }\n"
+ + " }\n"
+ + "}";
+
+ String json = CommitReportParser.toJson(commitReport, true);
+
Assertions.assertThat(CommitReportParser.fromJson(json)).isEqualTo(commitReport);
+ Assertions.assertThat(json).isEqualTo(expectedJson);
+ }
+
+ @Test
+ public void roundTripSerdeWithNoopMetrics() {
+ String tableName = "roundTripTableName";
+ CommitReport commitReport =
+ ImmutableCommitReport.builder()
+ .tableName(tableName)
+ .snapshotId(23L)
+ .operation("DELETE")
+ .sequenceNumber(4L)
+ .commitMetrics(CommitMetricsResult.from(CommitMetrics.noop(),
ImmutableMap.of()))
+ .build();
+
+ String expectedJson =
+ "{\n"
+ + " \"table-name\" : \"roundTripTableName\",\n"
+ + " \"snapshot-id\" : 23,\n"
+ + " \"sequence-number\" : 4,\n"
+ + " \"operation\" : \"DELETE\",\n"
+ + " \"metrics\" : { }\n"
+ + "}";
+
+ String json = CommitReportParser.toJson(commitReport, true);
+
Assertions.assertThat(CommitReportParser.fromJson(json)).isEqualTo(commitReport);
+ Assertions.assertThat(json).isEqualTo(expectedJson);
+ }
+
+ @Test
+ public void roundTripSerdeWithMetadata() {
+ String tableName = "roundTripTableName";
+ CommitReport commitReport =
+ ImmutableCommitReport.builder()
+ .tableName(tableName)
+ .snapshotId(23L)
+ .operation("DELETE")
+ .sequenceNumber(4L)
+ .commitMetrics(CommitMetricsResult.from(CommitMetrics.noop(),
ImmutableMap.of()))
+ .metadata(ImmutableMap.of("k1", "v1", "k2", "v2"))
+ .build();
+
+ String expectedJson =
+ "{\n"
+ + " \"table-name\" : \"roundTripTableName\",\n"
+ + " \"snapshot-id\" : 23,\n"
+ + " \"sequence-number\" : 4,\n"
+ + " \"operation\" : \"DELETE\",\n"
+ + " \"metrics\" : { },\n"
+ + " \"metadata\" : {\n"
+ + " \"k1\" : \"v1\",\n"
+ + " \"k2\" : \"v2\"\n"
+ + " }\n"
+ + "}";
+
+ String json = CommitReportParser.toJson(commitReport, true);
+
Assertions.assertThat(CommitReportParser.fromJson(json)).isEqualTo(commitReport);
+ Assertions.assertThat(json).isEqualTo(expectedJson);
+ }
+}
diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
index 2d1133e9c4..9f8306b35d 100644
--- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
+++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
@@ -1121,7 +1121,8 @@ public class TestRESTCatalog extends
CatalogTests<RESTCatalog> {
}
// counter of custom metrics reporter should have been increased
- assertThat(CustomMetricsReporter.COUNTER.get()).isEqualTo(1);
+ // 1x for commit metrics / 1x for scan metrics
+ assertThat(CustomMetricsReporter.COUNTER.get()).isEqualTo(2);
}
public static class CustomMetricsReporter implements MetricsReporter {
diff --git
a/core/src/test/java/org/apache/iceberg/rest/requests/TestReportMetricsRequestParser.java
b/core/src/test/java/org/apache/iceberg/rest/requests/TestReportMetricsRequestParser.java
index a97bdfe3bc..70a219e791 100644
---
a/core/src/test/java/org/apache/iceberg/rest/requests/TestReportMetricsRequestParser.java
+++
b/core/src/test/java/org/apache/iceberg/rest/requests/TestReportMetricsRequestParser.java
@@ -20,11 +20,16 @@ package org.apache.iceberg.rest.requests;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.metrics.CommitMetrics;
+import org.apache.iceberg.metrics.CommitMetricsResult;
+import org.apache.iceberg.metrics.CommitReport;
+import org.apache.iceberg.metrics.ImmutableCommitReport;
import org.apache.iceberg.metrics.ImmutableScanReport;
import org.apache.iceberg.metrics.MetricsReport;
import org.apache.iceberg.metrics.ScanMetrics;
import org.apache.iceberg.metrics.ScanMetricsResult;
import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.assertj.core.api.Assertions;
import org.junit.Test;
@@ -78,7 +83,7 @@ public class TestReportMetricsRequestParser {
}
@Test
- public void roundTripSerde() {
+ public void roundTripSerdeWithScanReport() {
String tableName = "roundTripTableName";
ScanReport scanReport =
ImmutableScanReport.builder()
@@ -111,4 +116,35 @@ public class TestReportMetricsRequestParser {
Assertions.assertThat(ReportMetricsRequestParser.fromJson(json).report())
.isEqualTo(metricsRequest.report());
}
+
+ @Test
+ public void roundTripSerdeWithCommitReport() {
+ String tableName = "roundTripTableName";
+ CommitReport commitReport =
+ ImmutableCommitReport.builder()
+ .tableName(tableName)
+ .snapshotId(23L)
+ .sequenceNumber(4L)
+ .operation("DELETE")
+ .commitMetrics(CommitMetricsResult.from(CommitMetrics.noop(),
ImmutableMap.of()))
+ .build();
+
+ String expectedJson =
+ "{\n"
+ + " \"report-type\" : \"commit-report\",\n"
+ + " \"table-name\" : \"roundTripTableName\",\n"
+ + " \"snapshot-id\" : 23,\n"
+ + " \"sequence-number\" : 4,\n"
+ + " \"operation\" : \"DELETE\",\n"
+ + " \"metrics\" : { }\n"
+ + "}";
+
+ ReportMetricsRequest metricsRequest =
ReportMetricsRequest.of(commitReport);
+
+ String json = ReportMetricsRequestParser.toJson(metricsRequest, true);
+ Assertions.assertThat(json).isEqualTo(expectedJson);
+
+ Assertions.assertThat(ReportMetricsRequestParser.fromJson(json).report())
+ .isEqualTo(metricsRequest.report());
+ }
}
diff --git a/core/src/test/java/org/apache/iceberg/util/TestTasks.java
b/core/src/test/java/org/apache/iceberg/util/TestTasks.java
new file mode 100644
index 0000000000..f337a3500d
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/util/TestTasks.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.util;
+
+import java.util.stream.IntStream;
+import org.apache.iceberg.metrics.Counter;
+import org.apache.iceberg.metrics.DefaultMetricsContext;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+public class TestTasks {
+
+ @Test
+ public void attemptCounterIsIncreasedOnRetries() {
+ Counter counter = new DefaultMetricsContext().counter("counter");
+
+ final int retries = 10;
+
+ Tasks.foreach(IntStream.range(0, 10))
+ .countAttempts(counter)
+ .exponentialBackoff(0, 0, 5000, 0)
+ .retry(retries)
+ .onlyRetryOn(RuntimeException.class)
+ .run(
+ x -> {
+ // don't throw on the last retry
+ if (counter.value() <= retries) {
+ throw new RuntimeException();
+ }
+ });
+
+ Assertions.assertThat(counter.value()).isEqualTo(retries + 1);
+ }
+
+ @Test
+ public void attemptCounterIsIncreasedWithoutRetries() {
+ Counter counter = new DefaultMetricsContext().counter("counter");
+
+ Tasks.foreach(IntStream.range(0, 10)).countAttempts(counter).run(x -> {});
+
+ Assertions.assertThat(counter.value()).isEqualTo(1L);
+ }
+}
diff --git a/open-api/rest-catalog-open-api.yaml
b/open-api/rest-catalog-open-api.yaml
index 1d810f4a07..c8c3b09c21 100644
--- a/open-api/rest-catalog-open-api.yaml
+++ b/open-api/rest-catalog-open-api.yaml
@@ -1895,6 +1895,7 @@ components:
ReportMetricsRequest:
anyOf:
- $ref: '#/components/schemas/ScanReport'
+ - $ref: '#/components/schemas/CommitReport'
required:
- report-type
properties:
@@ -1931,6 +1932,36 @@ components:
type: string
metrics:
$ref: '#/components/schemas/Metrics'
+ metadata:
+ type: object
+ additionalProperties:
+ type: string
+
+ CommitReport:
+ type: object
+ required:
+ - table-name
+ - snapshot-id
+ - sequence-number
+ - operation
+ - metrics
+ properties:
+ table-name:
+ type: string
+ snapshot-id:
+ type: integer
+ format: int64
+ sequence-number:
+ type: integer
+ format: int64
+ operation:
+ type: string
+ metrics:
+ $ref: '#/components/schemas/Metrics'
+ metadata:
+ type: object
+ additionalProperties:
+ type: string
#############################