This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new f536c84035 API, Core: Ability to add multiple metrics reporters to scans (#6919)
f536c84035 is described below

commit f536c840350bd5628d7c514d2a4719404c9b8ed1
Author: Karuppayya <[email protected]>
AuthorDate: Wed Mar 29 12:17:48 2023 -0700

    API, Core: Ability to add multiple metrics reporters to scans (#6919)
---
 .../java/org/apache/iceberg/BatchScanAdapter.java  |  6 +++
 api/src/main/java/org/apache/iceberg/Scan.java     | 10 +++++
 .../src/main/java/org/apache/iceberg/BaseScan.java |  6 +++
 .../main/java/org/apache/iceberg/SnapshotScan.java |  5 ++-
 .../java/org/apache/iceberg/TableScanContext.java  | 43 ++++++++++++----------
 .../iceberg/TestScanPlanningAndReporting.java      | 32 ++++++++++++++++
 6 files changed, 82 insertions(+), 20 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java b/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java
index 10e064fd75..17f12b3312 100644
--- a/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java
+++ b/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.concurrent.ExecutorService;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.metrics.MetricsReporter;
 
 /** An adapter that allows using {@link TableScan} as {@link BatchScan}. */
 class BatchScanAdapter implements BatchScan {
@@ -140,4 +141,9 @@ class BatchScanAdapter implements BatchScan {
   public long splitOpenFileCost() {
     return scan.splitOpenFileCost();
   }
+
+  @Override
+  public BatchScan metricsReporter(MetricsReporter reporter) {
+    return new BatchScanAdapter(scan.metricsReporter(reporter)); // re-wrap the delegate's result
+  }
 }
diff --git a/api/src/main/java/org/apache/iceberg/Scan.java b/api/src/main/java/org/apache/iceberg/Scan.java
index 035f22947c..deebb66f2a 100644
--- a/api/src/main/java/org/apache/iceberg/Scan.java
+++ b/api/src/main/java/org/apache/iceberg/Scan.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.concurrent.ExecutorService;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.metrics.MetricsReporter;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
 /**
@@ -171,4 +172,13 @@ public interface Scan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>> {
 
   /** Returns the split open file cost for this scan. */
   long splitOpenFileCost();
+
+  /**
+   * Create a new scan that will report scan metrics to the provided reporter in addition to
+   * reporters maintained by the scan.
+   *
+   * <p>This default implementation does not create a scan; it always throws.
+   *
+   * @param reporter a metrics reporter to receive scan metrics
+   * @return a new scan that also reports to the given reporter
+   * @throws UnsupportedOperationException if the implementing class does not override this method
+   */
+  default ThisT metricsReporter(MetricsReporter reporter) {
+    throw new UnsupportedOperationException(
+        this.getClass().getName() + " doesn't implement metricsReporter");
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/BaseScan.java b/core/src/main/java/org/apache/iceberg/BaseScan.java
index 94621a66b8..a1c92c927a 100644
--- a/core/src/main/java/org/apache/iceberg/BaseScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseScan.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorService;
 import org.apache.iceberg.expressions.Binder;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.metrics.MetricsReporter;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
@@ -250,4 +251,9 @@ abstract class BaseScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>>
 
     return schema;
   }
+
+  @Override
+  public ThisT metricsReporter(MetricsReporter reporter) {
+    return newRefinedScan(table(), schema(), context().reportWith(reporter)); // only the context changes
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotScan.java b/core/src/main/java/org/apache/iceberg/SnapshotScan.java
index b6520c2ff4..49769d9df9 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotScan.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotScan.java
@@ -27,6 +27,7 @@ import org.apache.iceberg.expressions.ExpressionUtil;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.metrics.DefaultMetricsContext;
 import org.apache.iceberg.metrics.ImmutableScanReport;
+import org.apache.iceberg.metrics.MetricsReporter;
 import org.apache.iceberg.metrics.ScanMetrics;
 import org.apache.iceberg.metrics.ScanMetricsResult;
 import org.apache.iceberg.metrics.ScanReport;
@@ -144,7 +145,9 @@ public abstract class SnapshotScan<ThisT, T extends 
ScanTask, G extends ScanTask
                   
.scanMetrics(ScanMetricsResult.fromScanMetrics(scanMetrics()))
                   .metadata(metadata)
                   .build();
-          context().metricsReporter().report(scanReport);
+          for (MetricsReporter metricsReporter : context().metricsReporters()) 
{
+            metricsReporter.report(scanReport);
+          }
         });
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/TableScanContext.java b/core/src/main/java/org/apache/iceberg/TableScanContext.java
index 6a3c7cc6e9..bf90bcefeb 100644
--- a/core/src/main/java/org/apache/iceberg/TableScanContext.java
+++ b/core/src/main/java/org/apache/iceberg/TableScanContext.java
@@ -27,7 +27,9 @@ import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.metrics.LoggingMetricsReporter;
 import org.apache.iceberg.metrics.MetricsReporter;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.util.ThreadPools;
 
 /** Context object with optional arguments for a TableScan. */
@@ -44,7 +46,7 @@ final class TableScanContext {
   private final Long toSnapshotId;
   private final ExecutorService planExecutor;
   private final boolean fromSnapshotInclusive;
-  private final MetricsReporter metricsReporter;
+  private final Collection<MetricsReporter> metricsReporters;
 
   TableScanContext() {
     this.snapshotId = null;
@@ -59,7 +61,7 @@ final class TableScanContext {
     this.toSnapshotId = null;
     this.planExecutor = null;
     this.fromSnapshotInclusive = false;
-    this.metricsReporter = LoggingMetricsReporter.instance();
+    this.metricsReporters = 
Lists.newArrayList(LoggingMetricsReporter.instance());
   }
 
   private TableScanContext(
@@ -75,7 +77,7 @@ final class TableScanContext {
       Long toSnapshotId,
       ExecutorService planExecutor,
       boolean fromSnapshotInclusive,
-      MetricsReporter metricsReporter) {
+      Collection<MetricsReporter> metricsReporters) {
     this.snapshotId = snapshotId;
     this.rowFilter = rowFilter;
     this.ignoreResiduals = ignoreResiduals;
@@ -88,7 +90,7 @@ final class TableScanContext {
     this.toSnapshotId = toSnapshotId;
     this.planExecutor = planExecutor;
     this.fromSnapshotInclusive = fromSnapshotInclusive;
-    this.metricsReporter = metricsReporter;
+    this.metricsReporters = metricsReporters;
   }
 
   Long snapshotId() {
@@ -109,7 +111,7 @@ final class TableScanContext {
         toSnapshotId,
         planExecutor,
         fromSnapshotInclusive,
-        metricsReporter);
+        metricsReporters);
   }
 
   Expression rowFilter() {
@@ -130,7 +132,7 @@ final class TableScanContext {
         toSnapshotId,
         planExecutor,
         fromSnapshotInclusive,
-        metricsReporter);
+        metricsReporters);
   }
 
   boolean ignoreResiduals() {
@@ -151,7 +153,7 @@ final class TableScanContext {
         toSnapshotId,
         planExecutor,
         fromSnapshotInclusive,
-        metricsReporter);
+        metricsReporters);
   }
 
   boolean caseSensitive() {
@@ -172,7 +174,7 @@ final class TableScanContext {
         toSnapshotId,
         planExecutor,
         fromSnapshotInclusive,
-        metricsReporter);
+        metricsReporters);
   }
 
   boolean returnColumnStats() {
@@ -193,7 +195,7 @@ final class TableScanContext {
         toSnapshotId,
         planExecutor,
         fromSnapshotInclusive,
-        metricsReporter);
+        metricsReporters);
   }
 
   Collection<String> selectedColumns() {
@@ -216,7 +218,7 @@ final class TableScanContext {
         toSnapshotId,
         planExecutor,
         fromSnapshotInclusive,
-        metricsReporter);
+        metricsReporters);
   }
 
   Schema projectedSchema() {
@@ -239,7 +241,7 @@ final class TableScanContext {
         toSnapshotId,
         planExecutor,
         fromSnapshotInclusive,
-        metricsReporter);
+        metricsReporters);
   }
 
   Map<String, String> options() {
@@ -263,7 +265,7 @@ final class TableScanContext {
         toSnapshotId,
         planExecutor,
         fromSnapshotInclusive,
-        metricsReporter);
+        metricsReporters);
   }
 
   Long fromSnapshotId() {
@@ -284,7 +286,7 @@ final class TableScanContext {
         toSnapshotId,
         planExecutor,
         false,
-        metricsReporter);
+        metricsReporters);
   }
 
   TableScanContext fromSnapshotIdInclusive(long id) {
@@ -301,7 +303,7 @@ final class TableScanContext {
         toSnapshotId,
         planExecutor,
         true,
-        metricsReporter);
+        metricsReporters);
   }
 
   boolean fromSnapshotInclusive() {
@@ -326,7 +328,7 @@ final class TableScanContext {
         id,
         planExecutor,
         fromSnapshotInclusive,
-        metricsReporter);
+        metricsReporters);
   }
 
   ExecutorService planExecutor() {
@@ -351,14 +353,17 @@ final class TableScanContext {
         toSnapshotId,
         executor,
         fromSnapshotInclusive,
-        metricsReporter);
+        metricsReporters);
   }
 
-  MetricsReporter metricsReporter() {
-    return metricsReporter;
+  Collection<MetricsReporter> metricsReporters() {
+    return metricsReporters; // the stored collection itself, not a copy
   }
 
   TableScanContext reportWith(MetricsReporter reporter) {
+    ImmutableList.Builder<MetricsReporter> builder = ImmutableList.builder();
+    builder.addAll(metricsReporters);
+    builder.add(reporter);
     return new TableScanContext(
         snapshotId,
         rowFilter,
@@ -372,6 +377,6 @@ final class TableScanContext {
         toSnapshotId,
         planExecutor,
         fromSnapshotInclusive,
-        reporter);
+        builder.build());
   }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java b/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java
index 6284e08d45..83e9353a59 100644
--- a/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java
+++ b/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java
@@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.metrics.CommitReport;
@@ -42,6 +43,37 @@ public class TestScanPlanningAndReporting extends TableTestBase {
     super(2);
   }
 
+  @Test
+  public void scanningWithMultipleReporters() throws IOException {
+    String tableName = "scan-with-multiple-reporters";
+    Table table =
+        TestTables.create(
+            tableDir, tableName, SCHEMA, SPEC, SortOrder.unsorted(), formatVersion, reporter);
+    table.newAppend().appendFile(FILE_A).commit();
+    table.refresh();
+
+    // each additional reporter bumps the shared counter once per report it receives;
+    // the lambda argument is the report being delivered, so it is named accordingly
+    AtomicInteger reportedCount = new AtomicInteger();
+    TableScan tableScan =
+        table
+            .newScan()
+            .metricsReporter(report -> reportedCount.getAndIncrement())
+            .metricsReporter(report -> reportedCount.getAndIncrement());
+    try (CloseableIterable<FileScanTask> fileScanTasks = tableScan.planFiles()) {
+      // drain the iterable so planning runs to completion before it is closed
+      fileScanTasks.forEach(task -> {});
+    }
+
+    // verify that metrics are still reported to the reporter passed to TestTables.create
+    ScanReport scanReport = reporter.lastReport();
+    assertThat(scanReport).isNotNull();
+    assertThat(scanReport.tableName()).isEqualTo(tableName);
+    assertThat(scanReport.snapshotId()).isEqualTo(1L);
+    ScanMetricsResult result = scanReport.scanMetrics();
+    assertThat(result.totalPlanningDuration().totalDuration()).isGreaterThan(Duration.ZERO);
+    assertThat(result.resultDataFiles().value()).isEqualTo(1);
+
+    // verify that both reporters added on the scan were invoked exactly once each
+    assertThat(reportedCount.get()).isEqualTo(2);
+  }
+
   @Test
   public void scanningWithMultipleDataManifests() throws IOException {
     String tableName = "multiple-data-manifests";

Reply via email to