This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 1354fb299f Spark 3.5: Fix metrics reporting in distributed planning
(#8602)
1354fb299f is described below
commit 1354fb299f327234e3fd46072d660e190234b76f
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Thu Sep 21 08:57:29 2023 -0700
Spark 3.5: Fix metrics reporting in distributed planning (#8602)
---
.../main/java/org/apache/iceberg/BaseTable.java | 4 +
....java => ScanPlanningAndReportingTestBase.java} | 39 +++++-----
.../org/apache/iceberg/TestCommitReporting.java | 2 +-
.../iceberg/TestLocalScanPlanningAndReporting.java | 28 +++++++
.../apache/iceberg/SparkDistributedDataScan.java | 27 ++++++-
.../TestSparkDistributedDataScanReporting.java | 85 ++++++++++++++++++++++
6 files changed, 165 insertions(+), 20 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java
b/core/src/main/java/org/apache/iceberg/BaseTable.java
index b9ed4f8d67..2093753bf7 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTable.java
@@ -53,6 +53,10 @@ public class BaseTable implements Table, HasTableOperations,
Serializable {
this.reporter = reporter;
}
+ MetricsReporter reporter() {
+ return reporter;
+ }
+
@Override
public TableOperations operations() {
return ops;
diff --git
a/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java
b/core/src/test/java/org/apache/iceberg/ScanPlanningAndReportingTestBase.java
similarity index 93%
rename from
core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java
rename to
core/src/test/java/org/apache/iceberg/ScanPlanningAndReportingTestBase.java
index 106c236f59..a8f98f82cc 100644
--- a/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java
+++
b/core/src/test/java/org/apache/iceberg/ScanPlanningAndReportingTestBase.java
@@ -36,14 +36,18 @@ import
org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.Test;
-public class TestScanPlanningAndReporting extends TableTestBase {
+public abstract class ScanPlanningAndReportingTestBase<
+ ScanT extends Scan<ScanT, T, G>, T extends ScanTask, G extends
ScanTaskGroup<T>>
+ extends TableTestBase {
private final TestMetricsReporter reporter = new TestMetricsReporter();
- public TestScanPlanningAndReporting() {
+ public ScanPlanningAndReportingTestBase() {
super(2);
}
+ protected abstract ScanT newScan(Table table);
+
@Test
public void noDuplicatesInScanContext() {
TableScanContext context = TableScanContext.empty();
@@ -82,12 +86,11 @@ public class TestScanPlanningAndReporting extends
TableTestBase {
table.refresh();
AtomicInteger reportedCount = new AtomicInteger();
- TableScan tableScan =
- table
- .newScan()
+ ScanT tableScan =
+ newScan(table)
.metricsReporter((MetricsReporter) ->
reportedCount.getAndIncrement())
.metricsReporter((MetricsReporter) ->
reportedCount.getAndIncrement());
- try (CloseableIterable<FileScanTask> fileScanTasks =
tableScan.planFiles()) {
+ try (CloseableIterable<T> fileScanTasks = tableScan.planFiles()) {
fileScanTasks.forEach(task -> {});
}
@@ -113,10 +116,10 @@ public class TestScanPlanningAndReporting extends
TableTestBase {
table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
table.newAppend().appendFile(FILE_D).commit();
table.refresh();
- TableScan tableScan = table.newScan();
+ ScanT tableScan = newScan(table);
// should be 3 files
- try (CloseableIterable<FileScanTask> fileScanTasks =
tableScan.planFiles()) {
+ try (CloseableIterable<T> fileScanTasks = tableScan.planFiles()) {
fileScanTasks.forEach(task -> {});
}
@@ -180,9 +183,9 @@ public class TestScanPlanningAndReporting extends
TableTestBase {
table.newAppend().appendFile(FILE_A).appendFile(FILE_B).appendFile(FILE_C).commit();
table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_B_DELETES).commit();
- TableScan tableScan = table.newScan();
+ ScanT tableScan = newScan(table);
- try (CloseableIterable<FileScanTask> fileScanTasks =
tableScan.planFiles()) {
+ try (CloseableIterable<T> fileScanTasks = tableScan.planFiles()) {
fileScanTasks.forEach(task -> {});
}
@@ -218,12 +221,12 @@ public class TestScanPlanningAndReporting extends
TableTestBase {
table.newAppend().appendFile(FILE_A).appendFile(FILE_B).appendFile(FILE_D).commit();
table.newOverwrite().deleteFile(FILE_A).addFile(FILE_A2).commit();
table.newAppend().appendFile(FILE_C).commit();
- TableScan tableScan = table.newScan();
+ ScanT tableScan = newScan(table);
List<FileScanTask> fileTasks = Lists.newArrayList();
- try (CloseableIterable<FileScanTask> fileScanTasks =
+ try (CloseableIterable<T> scanTasks =
tableScan.filter(Expressions.equal("data", "1")).planFiles()) {
- fileScanTasks.forEach(fileTasks::add);
+ scanTasks.forEach(task -> fileTasks.add((FileScanTask) task));
}
assertThat(fileTasks)
.singleElement()
@@ -259,12 +262,12 @@ public class TestScanPlanningAndReporting extends
TableTestBase {
table.newOverwrite().deleteFile(FILE_A).addFile(FILE_A2).commit();
table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_D2_DELETES).commit();
table.newRowDelta().addDeletes(FILE_B_DELETES).addDeletes(FILE_C2_DELETES).commit();
- TableScan tableScan = table.newScan();
+ ScanT tableScan = newScan(table);
List<FileScanTask> fileTasks = Lists.newArrayList();
- try (CloseableIterable<FileScanTask> fileScanTasks =
+ try (CloseableIterable<T> scanTasks =
tableScan.filter(Expressions.equal("data", "1")).planFiles()) {
- fileScanTasks.forEach(fileTasks::add);
+ scanTasks.forEach(task -> fileTasks.add((FileScanTask) task));
}
assertThat(fileTasks)
.singleElement()
@@ -302,9 +305,9 @@ public class TestScanPlanningAndReporting extends
TableTestBase {
table.newAppend().appendFile(FILE_A).commit();
// FILE_A_DELETES = positionalDelete / FILE_A2_DELETES = equalityDelete
table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit();
- TableScan tableScan = table.newScan();
+ ScanT tableScan = newScan(table);
- try (CloseableIterable<FileScanTask> fileScanTasks =
+ try (CloseableIterable<T> fileScanTasks =
tableScan.filter(Expressions.equal("data", "6")).planFiles()) {
fileScanTasks.forEach(task -> {});
}
diff --git a/core/src/test/java/org/apache/iceberg/TestCommitReporting.java
b/core/src/test/java/org/apache/iceberg/TestCommitReporting.java
index 9998c47ff3..08c4ac33d6 100644
--- a/core/src/test/java/org/apache/iceberg/TestCommitReporting.java
+++ b/core/src/test/java/org/apache/iceberg/TestCommitReporting.java
@@ -21,7 +21,7 @@ package org.apache.iceberg;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
-import org.apache.iceberg.TestScanPlanningAndReporting.TestMetricsReporter;
+import org.apache.iceberg.ScanPlanningAndReportingTestBase.TestMetricsReporter;
import org.apache.iceberg.metrics.CommitMetricsResult;
import org.apache.iceberg.metrics.CommitReport;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
diff --git
a/core/src/test/java/org/apache/iceberg/TestLocalScanPlanningAndReporting.java
b/core/src/test/java/org/apache/iceberg/TestLocalScanPlanningAndReporting.java
new file mode 100644
index 0000000000..dd8f5374f0
--- /dev/null
+++
b/core/src/test/java/org/apache/iceberg/TestLocalScanPlanningAndReporting.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+public class TestLocalScanPlanningAndReporting
+ extends ScanPlanningAndReportingTestBase<TableScan, FileScanTask,
CombinedScanTask> {
+
+ @Override
+ protected TableScan newScan(Table table) {
+ return table.newScan();
+ }
+}
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java
index d4c2848b45..43ce2a303e 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.ClosingIterator;
import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -81,7 +82,7 @@ public class SparkDistributedDataScan extends
BaseDistributedDataScan {
private Broadcast<Table> tableBroadcast = null;
public SparkDistributedDataScan(SparkSession spark, Table table,
SparkReadConf readConf) {
- this(spark, table, readConf, table.schema(), TableScanContext.empty());
+ this(spark, table, readConf, table.schema(), newTableScanContext(table));
}
private SparkDistributedDataScan(
@@ -134,6 +135,10 @@ public class SparkDistributedDataScan extends
BaseDistributedDataScan {
.flatMap(new ReadDataManifest(tableBroadcast(), context(),
withColumnStats));
List<List<DataFile>> dataFileGroups = collectPartitions(dataFileRDD);
+ int matchingFilesCount =
dataFileGroups.stream().mapToInt(List::size).sum();
+ int skippedFilesCount = liveFilesCount(dataManifests) - matchingFilesCount;
+ scanMetrics().skippedDataFiles().increment(skippedFilesCount);
+
return Iterables.transform(dataFileGroups,
CloseableIterable::withNoopClose);
}
@@ -157,6 +162,9 @@ public class SparkDistributedDataScan extends
BaseDistributedDataScan {
.flatMap(new ReadDeleteManifest(tableBroadcast(), context()))
.collect();
+ int skippedFilesCount = liveFilesCount(deleteManifests) -
deleteFiles.size();
+ scanMetrics().skippedDeleteFiles().increment(skippedFilesCount);
+
return DeleteFileIndex.builderFor(deleteFiles)
.specsById(table().specs())
.caseSensitive(isCaseSensitive())
@@ -193,6 +201,23 @@ public class SparkDistributedDataScan extends
BaseDistributedDataScan {
return Arrays.asList(rdd.collectPartitions(partitionIds));
}
+ private int liveFilesCount(List<ManifestFile> manifests) {
+ return manifests.stream().mapToInt(this::liveFilesCount).sum();
+ }
+
+ private int liveFilesCount(ManifestFile manifest) {
+ return manifest.existingFilesCount() + manifest.addedFilesCount();
+ }
+
+ private static TableScanContext newTableScanContext(Table table) {
+ if (table instanceof BaseTable) {
+ MetricsReporter reporter = ((BaseTable) table).reporter();
+ return
ImmutableTableScanContext.builder().metricsReporter(reporter).build();
+ } else {
+ return TableScanContext.empty();
+ }
+ }
+
private static class ReadDataManifest implements
FlatMapFunction<ManifestFileBean, DataFile> {
private final Broadcast<Table> table;
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
new file mode 100644
index 0000000000..1ea4f990b2
--- /dev/null
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
+import static org.apache.iceberg.PlanningMode.LOCAL;
+
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.internal.SQLConf;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestSparkDistributedDataScanReporting
+ extends ScanPlanningAndReportingTestBase<BatchScan, ScanTask,
ScanTaskGroup<ScanTask>> {
+
+ @Parameterized.Parameters(name = "dataMode = {0}, deleteMode = {1}")
+ public static Object[] parameters() {
+ return new Object[][] {
+ new Object[] {LOCAL, LOCAL},
+ new Object[] {LOCAL, DISTRIBUTED},
+ new Object[] {DISTRIBUTED, LOCAL},
+ new Object[] {DISTRIBUTED, DISTRIBUTED}
+ };
+ }
+
+ private static SparkSession spark = null;
+
+ private final PlanningMode dataMode;
+ private final PlanningMode deleteMode;
+
+ public TestSparkDistributedDataScanReporting(
+ PlanningMode dataPlanningMode, PlanningMode deletePlanningMode) {
+ this.dataMode = dataPlanningMode;
+ this.deleteMode = deletePlanningMode;
+ }
+
+ @BeforeClass
+ public static void startSpark() {
+ TestSparkDistributedDataScanReporting.spark =
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
+ .config(SQLConf.SHUFFLE_PARTITIONS().key(), "4")
+ .getOrCreate();
+ }
+
+ @AfterClass
+ public static void stopSpark() {
+ SparkSession currentSpark = TestSparkDistributedDataScanReporting.spark;
+ TestSparkDistributedDataScanReporting.spark = null;
+ currentSpark.stop();
+ }
+
+ @Override
+ protected BatchScan newScan(Table table) {
+ table
+ .updateProperties()
+ .set(TableProperties.DATA_PLANNING_MODE, dataMode.modeName())
+ .set(TableProperties.DELETE_PLANNING_MODE, deleteMode.modeName())
+ .commit();
+ SparkReadConf readConf = new SparkReadConf(spark, table,
ImmutableMap.of());
+ return new SparkDistributedDataScan(spark, table, readConf);
+ }
+}