This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new f536c84035 API, Core: Ability to add multiple metrics reporters to scans (#6919)
f536c84035 is described below
commit f536c840350bd5628d7c514d2a4719404c9b8ed1
Author: Karuppayya <[email protected]>
AuthorDate: Wed Mar 29 12:17:48 2023 -0700
API, Core: Ability to add multiple metrics reporters to scans (#6919)
---
.../java/org/apache/iceberg/BatchScanAdapter.java | 6 +++
api/src/main/java/org/apache/iceberg/Scan.java | 10 +++++
.../src/main/java/org/apache/iceberg/BaseScan.java | 6 +++
.../main/java/org/apache/iceberg/SnapshotScan.java | 5 ++-
.../java/org/apache/iceberg/TableScanContext.java | 43 ++++++++++++----------
.../iceberg/TestScanPlanningAndReporting.java | 32 ++++++++++++++++
6 files changed, 82 insertions(+), 20 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java b/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java
index 10e064fd75..17f12b3312 100644
--- a/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java
+++ b/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java
@@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.concurrent.ExecutorService;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.metrics.MetricsReporter;
/** An adapter that allows using {@link TableScan} as {@link BatchScan}. */
class BatchScanAdapter implements BatchScan {
@@ -140,4 +141,9 @@ class BatchScanAdapter implements BatchScan {
public long splitOpenFileCost() {
return scan.splitOpenFileCost();
}
+
+ @Override
+ public BatchScan metricsReporter(MetricsReporter reporter) {
+ return new BatchScanAdapter(scan.metricsReporter(reporter));
+ }
}
diff --git a/api/src/main/java/org/apache/iceberg/Scan.java b/api/src/main/java/org/apache/iceberg/Scan.java
index 035f22947c..deebb66f2a 100644
--- a/api/src/main/java/org/apache/iceberg/Scan.java
+++ b/api/src/main/java/org/apache/iceberg/Scan.java
@@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.concurrent.ExecutorService;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
/**
@@ -171,4 +172,13 @@ public interface Scan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>> {
/** Returns the split open file cost for this scan. */
long splitOpenFileCost();
+
+ /**
+ * Create a new scan that will report scan metrics to the provided reporter in addition to
+ * reporters maintained by the scan.
+ */
+ default ThisT metricsReporter(MetricsReporter reporter) {
+ throw new UnsupportedOperationException(
+ this.getClass().getName() + " doesn't implement metricsReporter");
+ }
}
diff --git a/core/src/main/java/org/apache/iceberg/BaseScan.java b/core/src/main/java/org/apache/iceberg/BaseScan.java
index 94621a66b8..a1c92c927a 100644
--- a/core/src/main/java/org/apache/iceberg/BaseScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseScan.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorService;
import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
@@ -250,4 +251,9 @@ abstract class BaseScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>>
return schema;
}
+
+ @Override
+ public ThisT metricsReporter(MetricsReporter reporter) {
+ return newRefinedScan(table(), schema(), context().reportWith(reporter));
+ }
}
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotScan.java b/core/src/main/java/org/apache/iceberg/SnapshotScan.java
index b6520c2ff4..49769d9df9 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotScan.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotScan.java
@@ -27,6 +27,7 @@ import org.apache.iceberg.expressions.ExpressionUtil;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.metrics.DefaultMetricsContext;
import org.apache.iceberg.metrics.ImmutableScanReport;
+import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.metrics.ScanMetrics;
import org.apache.iceberg.metrics.ScanMetricsResult;
import org.apache.iceberg.metrics.ScanReport;
@@ -144,7 +145,9 @@ public abstract class SnapshotScan<ThisT, T extends ScanTask, G extends ScanTask
.scanMetrics(ScanMetricsResult.fromScanMetrics(scanMetrics()))
.metadata(metadata)
.build();
- context().metricsReporter().report(scanReport);
+ for (MetricsReporter metricsReporter : context().metricsReporters()) {
+ metricsReporter.report(scanReport);
+ }
});
}
diff --git a/core/src/main/java/org/apache/iceberg/TableScanContext.java b/core/src/main/java/org/apache/iceberg/TableScanContext.java
index 6a3c7cc6e9..bf90bcefeb 100644
--- a/core/src/main/java/org/apache/iceberg/TableScanContext.java
+++ b/core/src/main/java/org/apache/iceberg/TableScanContext.java
@@ -27,7 +27,9 @@ import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.metrics.LoggingMetricsReporter;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.ThreadPools;
/** Context object with optional arguments for a TableScan. */
@@ -44,7 +46,7 @@ final class TableScanContext {
private final Long toSnapshotId;
private final ExecutorService planExecutor;
private final boolean fromSnapshotInclusive;
- private final MetricsReporter metricsReporter;
+ private final Collection<MetricsReporter> metricsReporters;
TableScanContext() {
this.snapshotId = null;
@@ -59,7 +61,7 @@ final class TableScanContext {
this.toSnapshotId = null;
this.planExecutor = null;
this.fromSnapshotInclusive = false;
- this.metricsReporter = LoggingMetricsReporter.instance();
+ this.metricsReporters = Lists.newArrayList(LoggingMetricsReporter.instance());
}
private TableScanContext(
@@ -75,7 +77,7 @@ final class TableScanContext {
Long toSnapshotId,
ExecutorService planExecutor,
boolean fromSnapshotInclusive,
- MetricsReporter metricsReporter) {
+ Collection<MetricsReporter> metricsReporters) {
this.snapshotId = snapshotId;
this.rowFilter = rowFilter;
this.ignoreResiduals = ignoreResiduals;
@@ -88,7 +90,7 @@ final class TableScanContext {
this.toSnapshotId = toSnapshotId;
this.planExecutor = planExecutor;
this.fromSnapshotInclusive = fromSnapshotInclusive;
- this.metricsReporter = metricsReporter;
+ this.metricsReporters = metricsReporters;
}
Long snapshotId() {
@@ -109,7 +111,7 @@ final class TableScanContext {
toSnapshotId,
planExecutor,
fromSnapshotInclusive,
- metricsReporter);
+ metricsReporters);
}
Expression rowFilter() {
@@ -130,7 +132,7 @@ final class TableScanContext {
toSnapshotId,
planExecutor,
fromSnapshotInclusive,
- metricsReporter);
+ metricsReporters);
}
boolean ignoreResiduals() {
@@ -151,7 +153,7 @@ final class TableScanContext {
toSnapshotId,
planExecutor,
fromSnapshotInclusive,
- metricsReporter);
+ metricsReporters);
}
boolean caseSensitive() {
@@ -172,7 +174,7 @@ final class TableScanContext {
toSnapshotId,
planExecutor,
fromSnapshotInclusive,
- metricsReporter);
+ metricsReporters);
}
boolean returnColumnStats() {
@@ -193,7 +195,7 @@ final class TableScanContext {
toSnapshotId,
planExecutor,
fromSnapshotInclusive,
- metricsReporter);
+ metricsReporters);
}
Collection<String> selectedColumns() {
@@ -216,7 +218,7 @@ final class TableScanContext {
toSnapshotId,
planExecutor,
fromSnapshotInclusive,
- metricsReporter);
+ metricsReporters);
}
Schema projectedSchema() {
@@ -239,7 +241,7 @@ final class TableScanContext {
toSnapshotId,
planExecutor,
fromSnapshotInclusive,
- metricsReporter);
+ metricsReporters);
}
Map<String, String> options() {
@@ -263,7 +265,7 @@ final class TableScanContext {
toSnapshotId,
planExecutor,
fromSnapshotInclusive,
- metricsReporter);
+ metricsReporters);
}
Long fromSnapshotId() {
@@ -284,7 +286,7 @@ final class TableScanContext {
toSnapshotId,
planExecutor,
false,
- metricsReporter);
+ metricsReporters);
}
TableScanContext fromSnapshotIdInclusive(long id) {
@@ -301,7 +303,7 @@ final class TableScanContext {
toSnapshotId,
planExecutor,
true,
- metricsReporter);
+ metricsReporters);
}
boolean fromSnapshotInclusive() {
@@ -326,7 +328,7 @@ final class TableScanContext {
id,
planExecutor,
fromSnapshotInclusive,
- metricsReporter);
+ metricsReporters);
}
ExecutorService planExecutor() {
@@ -351,14 +353,17 @@ final class TableScanContext {
toSnapshotId,
executor,
fromSnapshotInclusive,
- metricsReporter);
+ metricsReporters);
}
- MetricsReporter metricsReporter() {
- return metricsReporter;
+ Collection<MetricsReporter> metricsReporters() {
+ return metricsReporters;
}
TableScanContext reportWith(MetricsReporter reporter) {
+ ImmutableList.Builder<MetricsReporter> builder = ImmutableList.builder();
+ builder.addAll(metricsReporters);
+ builder.add(reporter);
return new TableScanContext(
snapshotId,
rowFilter,
@@ -372,6 +377,6 @@ final class TableScanContext {
toSnapshotId,
planExecutor,
fromSnapshotInclusive,
- reporter);
+ builder.build());
}
}
diff --git a/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java b/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java
index 6284e08d45..83e9353a59 100644
--- a/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java
+++ b/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java
@@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.metrics.CommitReport;
@@ -42,6 +43,37 @@ public class TestScanPlanningAndReporting extends TableTestBase {
super(2);
}
+ @Test
+ public void scanningWithMultipleReporters() throws IOException {
+ String tableName = "scan-with-multiple-reporters";
+ Table table =
+ TestTables.create(
+ tableDir, tableName, SCHEMA, SPEC, SortOrder.unsorted(), formatVersion, reporter);
+ table.newAppend().appendFile(FILE_A).commit();
+ table.refresh();
+
+ AtomicInteger reportedCount = new AtomicInteger();
+ TableScan tableScan =
+ table
+ .newScan()
+ .metricsReporter((MetricsReporter) -> reportedCount.getAndIncrement())
+ .metricsReporter((MetricsReporter) -> reportedCount.getAndIncrement());
+ try (CloseableIterable<FileScanTask> fileScanTasks = tableScan.planFiles()) {
+ fileScanTasks.forEach(task -> {});
+ }
+
+ // verify if metrics are reported to default reporter
+ ScanReport scanReport = reporter.lastReport();
+ assertThat(scanReport).isNotNull();
+ assertThat(scanReport.tableName()).isEqualTo(tableName);
+ assertThat(scanReport.snapshotId()).isEqualTo(1L);
+ ScanMetricsResult result = scanReport.scanMetrics();
+ assertThat(result.totalPlanningDuration().totalDuration()).isGreaterThan(Duration.ZERO);
+ assertThat(result.resultDataFiles().value()).isEqualTo(1);
+
+ assertThat(reportedCount.get()).isEqualTo(2);
+ }
+
@Test
public void scanningWithMultipleDataManifests() throws IOException {
String tableName = "multiple-data-manifests";