This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 1354fb299f Spark 3.5: Fix metrics reporting in distributed planning (#8602)
1354fb299f is described below

commit 1354fb299f327234e3fd46072d660e190234b76f
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Thu Sep 21 08:57:29 2023 -0700

    Spark 3.5: Fix metrics reporting in distributed planning (#8602)
---
 .../main/java/org/apache/iceberg/BaseTable.java    |  4 +
 ....java => ScanPlanningAndReportingTestBase.java} | 39 +++++-----
 .../org/apache/iceberg/TestCommitReporting.java    |  2 +-
 .../iceberg/TestLocalScanPlanningAndReporting.java | 28 +++++++
 .../apache/iceberg/SparkDistributedDataScan.java   | 27 ++++++-
 .../TestSparkDistributedDataScanReporting.java     | 85 ++++++++++++++++++++++
 6 files changed, 165 insertions(+), 20 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java b/core/src/main/java/org/apache/iceberg/BaseTable.java
index b9ed4f8d67..2093753bf7 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTable.java
@@ -53,6 +53,10 @@ public class BaseTable implements Table, HasTableOperations, Serializable {
     this.reporter = reporter;
   }
 
+  MetricsReporter reporter() {
+    return reporter;
+  }
+
   @Override
   public TableOperations operations() {
     return ops;
diff --git a/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java b/core/src/test/java/org/apache/iceberg/ScanPlanningAndReportingTestBase.java
similarity index 93%
rename from core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java
rename to core/src/test/java/org/apache/iceberg/ScanPlanningAndReportingTestBase.java
index 106c236f59..a8f98f82cc 100644
--- a/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java
+++ b/core/src/test/java/org/apache/iceberg/ScanPlanningAndReportingTestBase.java
@@ -36,14 +36,18 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.assertj.core.api.InstanceOfAssertFactories;
 import org.junit.Test;
 
-public class TestScanPlanningAndReporting extends TableTestBase {
+public abstract class ScanPlanningAndReportingTestBase<
+        ScanT extends Scan<ScanT, T, G>, T extends ScanTask, G extends 
ScanTaskGroup<T>>
+    extends TableTestBase {
 
   private final TestMetricsReporter reporter = new TestMetricsReporter();
 
-  public TestScanPlanningAndReporting() {
+  public ScanPlanningAndReportingTestBase() {
     super(2);
   }
 
+  protected abstract ScanT newScan(Table table);
+
   @Test
   public void noDuplicatesInScanContext() {
     TableScanContext context = TableScanContext.empty();
@@ -82,12 +86,11 @@ public class TestScanPlanningAndReporting extends TableTestBase {
     table.refresh();
 
     AtomicInteger reportedCount = new AtomicInteger();
-    TableScan tableScan =
-        table
-            .newScan()
+    ScanT tableScan =
+        newScan(table)
             .metricsReporter((MetricsReporter) -> 
reportedCount.getAndIncrement())
             .metricsReporter((MetricsReporter) -> 
reportedCount.getAndIncrement());
-    try (CloseableIterable<FileScanTask> fileScanTasks = 
tableScan.planFiles()) {
+    try (CloseableIterable<T> fileScanTasks = tableScan.planFiles()) {
       fileScanTasks.forEach(task -> {});
     }
 
@@ -113,10 +116,10 @@ public class TestScanPlanningAndReporting extends TableTestBase {
     table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
     table.newAppend().appendFile(FILE_D).commit();
     table.refresh();
-    TableScan tableScan = table.newScan();
+    ScanT tableScan = newScan(table);
 
     // should be 3 files
-    try (CloseableIterable<FileScanTask> fileScanTasks = 
tableScan.planFiles()) {
+    try (CloseableIterable<T> fileScanTasks = tableScan.planFiles()) {
       fileScanTasks.forEach(task -> {});
     }
 
@@ -180,9 +183,9 @@ public class TestScanPlanningAndReporting extends TableTestBase {
 
     
table.newAppend().appendFile(FILE_A).appendFile(FILE_B).appendFile(FILE_C).commit();
     
table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_B_DELETES).commit();
-    TableScan tableScan = table.newScan();
+    ScanT tableScan = newScan(table);
 
-    try (CloseableIterable<FileScanTask> fileScanTasks = 
tableScan.planFiles()) {
+    try (CloseableIterable<T> fileScanTasks = tableScan.planFiles()) {
       fileScanTasks.forEach(task -> {});
     }
 
@@ -218,12 +221,12 @@ public class TestScanPlanningAndReporting extends TableTestBase {
     
table.newAppend().appendFile(FILE_A).appendFile(FILE_B).appendFile(FILE_D).commit();
     table.newOverwrite().deleteFile(FILE_A).addFile(FILE_A2).commit();
     table.newAppend().appendFile(FILE_C).commit();
-    TableScan tableScan = table.newScan();
+    ScanT tableScan = newScan(table);
 
     List<FileScanTask> fileTasks = Lists.newArrayList();
-    try (CloseableIterable<FileScanTask> fileScanTasks =
+    try (CloseableIterable<T> scanTasks =
         tableScan.filter(Expressions.equal("data", "1")).planFiles()) {
-      fileScanTasks.forEach(fileTasks::add);
+      scanTasks.forEach(task -> fileTasks.add((FileScanTask) task));
     }
     assertThat(fileTasks)
         .singleElement()
@@ -259,12 +262,12 @@ public class TestScanPlanningAndReporting extends TableTestBase {
     table.newOverwrite().deleteFile(FILE_A).addFile(FILE_A2).commit();
     
table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_D2_DELETES).commit();
     
table.newRowDelta().addDeletes(FILE_B_DELETES).addDeletes(FILE_C2_DELETES).commit();
-    TableScan tableScan = table.newScan();
+    ScanT tableScan = newScan(table);
 
     List<FileScanTask> fileTasks = Lists.newArrayList();
-    try (CloseableIterable<FileScanTask> fileScanTasks =
+    try (CloseableIterable<T> scanTasks =
         tableScan.filter(Expressions.equal("data", "1")).planFiles()) {
-      fileScanTasks.forEach(fileTasks::add);
+      scanTasks.forEach(task -> fileTasks.add((FileScanTask) task));
     }
     assertThat(fileTasks)
         .singleElement()
@@ -302,9 +305,9 @@ public class TestScanPlanningAndReporting extends TableTestBase {
     table.newAppend().appendFile(FILE_A).commit();
     // FILE_A_DELETES = positionalDelete / FILE_A2_DELETES = equalityDelete
     
table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit();
-    TableScan tableScan = table.newScan();
+    ScanT tableScan = newScan(table);
 
-    try (CloseableIterable<FileScanTask> fileScanTasks =
+    try (CloseableIterable<T> fileScanTasks =
         tableScan.filter(Expressions.equal("data", "6")).planFiles()) {
       fileScanTasks.forEach(task -> {});
     }
diff --git a/core/src/test/java/org/apache/iceberg/TestCommitReporting.java b/core/src/test/java/org/apache/iceberg/TestCommitReporting.java
index 9998c47ff3..08c4ac33d6 100644
--- a/core/src/test/java/org/apache/iceberg/TestCommitReporting.java
+++ b/core/src/test/java/org/apache/iceberg/TestCommitReporting.java
@@ -21,7 +21,7 @@ package org.apache.iceberg;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.IOException;
-import org.apache.iceberg.TestScanPlanningAndReporting.TestMetricsReporter;
+import org.apache.iceberg.ScanPlanningAndReportingTestBase.TestMetricsReporter;
 import org.apache.iceberg.metrics.CommitMetricsResult;
 import org.apache.iceberg.metrics.CommitReport;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
diff --git a/core/src/test/java/org/apache/iceberg/TestLocalScanPlanningAndReporting.java b/core/src/test/java/org/apache/iceberg/TestLocalScanPlanningAndReporting.java
new file mode 100644
index 0000000000..dd8f5374f0
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestLocalScanPlanningAndReporting.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+public class TestLocalScanPlanningAndReporting
+    extends ScanPlanningAndReportingTestBase<TableScan, FileScanTask, 
CombinedScanTask> {
+
+  @Override
+  protected TableScan newScan(Table table) {
+    return table.newScan();
+  }
+}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java
index d4c2848b45..43ce2a303e 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.ClosingIterator;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.metrics.MetricsReporter;
 import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -81,7 +82,7 @@ public class SparkDistributedDataScan extends BaseDistributedDataScan {
   private Broadcast<Table> tableBroadcast = null;
 
   public SparkDistributedDataScan(SparkSession spark, Table table, 
SparkReadConf readConf) {
-    this(spark, table, readConf, table.schema(), TableScanContext.empty());
+    this(spark, table, readConf, table.schema(), newTableScanContext(table));
   }
 
   private SparkDistributedDataScan(
@@ -134,6 +135,10 @@ public class SparkDistributedDataScan extends BaseDistributedDataScan {
             .flatMap(new ReadDataManifest(tableBroadcast(), context(), 
withColumnStats));
     List<List<DataFile>> dataFileGroups = collectPartitions(dataFileRDD);
 
+    int matchingFilesCount = 
dataFileGroups.stream().mapToInt(List::size).sum();
+    int skippedFilesCount = liveFilesCount(dataManifests) - matchingFilesCount;
+    scanMetrics().skippedDataFiles().increment(skippedFilesCount);
+
     return Iterables.transform(dataFileGroups, 
CloseableIterable::withNoopClose);
   }
 
@@ -157,6 +162,9 @@ public class SparkDistributedDataScan extends BaseDistributedDataScan {
             .flatMap(new ReadDeleteManifest(tableBroadcast(), context()))
             .collect();
 
+    int skippedFilesCount = liveFilesCount(deleteManifests) - 
deleteFiles.size();
+    scanMetrics().skippedDeleteFiles().increment(skippedFilesCount);
+
     return DeleteFileIndex.builderFor(deleteFiles)
         .specsById(table().specs())
         .caseSensitive(isCaseSensitive())
@@ -193,6 +201,23 @@ public class SparkDistributedDataScan extends BaseDistributedDataScan {
     return Arrays.asList(rdd.collectPartitions(partitionIds));
   }
 
+  private int liveFilesCount(List<ManifestFile> manifests) {
+    return manifests.stream().mapToInt(this::liveFilesCount).sum();
+  }
+
+  private int liveFilesCount(ManifestFile manifest) {
+    return manifest.existingFilesCount() + manifest.addedFilesCount();
+  }
+
+  private static TableScanContext newTableScanContext(Table table) {
+    if (table instanceof BaseTable) {
+      MetricsReporter reporter = ((BaseTable) table).reporter();
+      return 
ImmutableTableScanContext.builder().metricsReporter(reporter).build();
+    } else {
+      return TableScanContext.empty();
+    }
+  }
+
   private static class ReadDataManifest implements 
FlatMapFunction<ManifestFileBean, DataFile> {
 
     private final Broadcast<Table> table;
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
new file mode 100644
index 0000000000..1ea4f990b2
--- /dev/null
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
+import static org.apache.iceberg.PlanningMode.LOCAL;
+
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.internal.SQLConf;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestSparkDistributedDataScanReporting
+    extends ScanPlanningAndReportingTestBase<BatchScan, ScanTask, 
ScanTaskGroup<ScanTask>> {
+
+  @Parameterized.Parameters(name = "dataMode = {0}, deleteMode = {1}")
+  public static Object[] parameters() {
+    return new Object[][] {
+      new Object[] {LOCAL, LOCAL},
+      new Object[] {LOCAL, DISTRIBUTED},
+      new Object[] {DISTRIBUTED, LOCAL},
+      new Object[] {DISTRIBUTED, DISTRIBUTED}
+    };
+  }
+
+  private static SparkSession spark = null;
+
+  private final PlanningMode dataMode;
+  private final PlanningMode deleteMode;
+
+  public TestSparkDistributedDataScanReporting(
+      PlanningMode dataPlanningMode, PlanningMode deletePlanningMode) {
+    this.dataMode = dataPlanningMode;
+    this.deleteMode = deletePlanningMode;
+  }
+
+  @BeforeClass
+  public static void startSpark() {
+    TestSparkDistributedDataScanReporting.spark =
+        SparkSession.builder()
+            .master("local[2]")
+            .config("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
+            .config(SQLConf.SHUFFLE_PARTITIONS().key(), "4")
+            .getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestSparkDistributedDataScanReporting.spark;
+    TestSparkDistributedDataScanReporting.spark = null;
+    currentSpark.stop();
+  }
+
+  @Override
+  protected BatchScan newScan(Table table) {
+    table
+        .updateProperties()
+        .set(TableProperties.DATA_PLANNING_MODE, dataMode.modeName())
+        .set(TableProperties.DELETE_PLANNING_MODE, deleteMode.modeName())
+        .commit();
+    SparkReadConf readConf = new SparkReadConf(spark, table, 
ImmutableMap.of());
+    return new SparkDistributedDataScan(spark, table, readConf);
+  }
+}

Reply via email to