This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new c0555c89b9 Spark 3.3: Optimize DELETEs handled using metadata (#6899)
c0555c89b9 is described below
commit c0555c89b9191c895c1f46143602e58e77a7ddbc
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Wed Feb 22 14:27:42 2023 -0800
Spark 3.3: Optimize DELETEs handled using metadata (#6899)
---
.../apache/iceberg/expressions/ExpressionUtil.java | 17 ++++++++
.../iceberg/spark/extensions/TestDelete.java | 50 ++++++++++++++++++++++
.../org/apache/iceberg/spark/SparkReadConf.java | 2 +-
.../java/org/apache/iceberg/spark/SparkUtil.java | 4 ++
.../iceberg/spark/actions/SparkZOrderStrategy.java | 3 +-
.../iceberg/spark/source/SparkScanBuilder.java | 7 +--
.../apache/iceberg/spark/source/SparkTable.java | 12 ++++--
7 files changed, 84 insertions(+), 11 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java b/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java
index 3512f274e2..c910a77640 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java
@@ -30,6 +30,7 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.transforms.Transforms;
@@ -120,6 +121,22 @@ public class ExpressionUtil {
.isEquivalentTo(Binder.bind(struct, Expressions.rewriteNot(right), caseSensitive));
}
+ /**
+ * Returns whether an expression selects whole partitions for all partition specs in a table.
+ *
+ * <p>For example, ts < '2021-03-09T10:00:00.000' selects whole partitions in an hourly spec,
+ * [hours(ts)], but does not select whole partitions in a daily spec, [days(ts)].
+ *
+ * @param expr an unbound expression
+ * @param table a table
+ * @param caseSensitive whether expression binding should be case sensitive
+ * @return true if the expression will select whole partitions in all table specs
+ */
+ public static boolean selectsPartitions(Expression expr, Table table, boolean caseSensitive) {
+ return table.specs().values().stream()
+ .allMatch(spec -> selectsPartitions(expr, spec, caseSensitive));
+ }
+
/**
* Returns whether an expression selects whole partitions for a partition spec.
*
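
For reference, a minimal sketch of calling the new table-wide overload; the table,
predicate, and partitioning below are illustrative, not taken from this commit:

    import org.apache.iceberg.Table;
    import org.apache.iceberg.expressions.Expression;
    import org.apache.iceberg.expressions.ExpressionUtil;
    import org.apache.iceberg.expressions.Expressions;

    class SelectsPartitionsExample {
      // Illustrative: true when the day-boundary predicate matches whole partitions
      // under every spec the table has ever had (e.g. days(ts) or hours(ts)), so
      // deleting the matching files implements the predicate exactly.
      static boolean deleteIsMetadataOnly(Table table) {
        Expression expr = Expressions.lessThan("ts", "2021-03-10T00:00:00.000");
        return ExpressionUtil.selectsPartitions(expr, table, true /* caseSensitive */);
      }
    }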
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
index fb0d8fdab2..0b73821c61 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
@@ -39,8 +39,10 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
@@ -59,6 +61,10 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.catalyst.plans.logical.DeleteFromIcebergTable;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.execution.datasources.v2.OptimizeMetadataOnlyDeleteFromIcebergTable;
import org.apache.spark.sql.internal.SQLConf;
import org.assertj.core.api.Assertions;
import org.junit.After;
@@ -91,6 +97,42 @@ public abstract class TestDelete extends SparkRowLevelOperationsTestBase {
sql("DROP TABLE IF EXISTS deleted_dep");
}
+ @Test
+ public void testDeleteWithoutScanningTable() throws Exception {
+ createAndInitPartitionedTable();
+
+ append(new Employee(1, "hr"), new Employee(3, "hr"));
+ append(new Employee(1, "hardware"), new Employee(2, "hardware"));
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ List<String> manifestLocations =
+ table.currentSnapshot().allManifests(table.io()).stream()
+ .map(ManifestFile::path)
+ .collect(Collectors.toList());
+
+ withUnavailableLocations(
+ manifestLocations,
+ () -> {
+ LogicalPlan parsed = parsePlan("DELETE FROM %s WHERE dep = 'hr'", tableName);
+
+ DeleteFromIcebergTable analyzed =
+     (DeleteFromIcebergTable) spark.sessionState().analyzer().execute(parsed);
+ Assert.assertTrue("Should have rewrite plan", analyzed.rewritePlan().isDefined());
+
+ DeleteFromIcebergTable optimized =
+     (DeleteFromIcebergTable) OptimizeMetadataOnlyDeleteFromIcebergTable.apply(analyzed);
+ Assert.assertTrue("Should discard rewrite plan", optimized.rewritePlan().isEmpty());
+ });
+
+ sql("DELETE FROM %s WHERE dep = 'hr'", tableName);
+
+ assertEquals(
+ "Should have expected rows",
+ ImmutableList.of(row(1, "hardware"), row(2, "hardware")),
+ sql("SELECT * FROM %s ORDER BY id", tableName));
+ }
+
@Test
public void testDeleteFileThenMetadataDelete() throws Exception {
Assume.assumeFalse("Avro does not support metadata delete", fileFormat.equals("avro"));
@@ -991,4 +1033,12 @@ public abstract class TestDelete extends SparkRowLevelOperationsTestBase {
String modeName = table.properties().getOrDefault(DELETE_MODE, DELETE_MODE_DEFAULT);
return RowLevelOperationMode.fromName(modeName);
}
+
+ private LogicalPlan parsePlan(String query, Object... args) {
+ try {
+ return spark.sessionState().sqlParser().parsePlan(String.format(query, args));
+ } catch (ParseException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
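
The test hides the snapshot's manifests to prove the optimized plan needs no file
scan. A hedged sketch of what a helper like withUnavailableLocations could look
like; the real one lives in the shared Spark test base, and this version assumes
plain local file paths and a non-throwing action:

    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.List;

    class UnavailableLocations {
      // Move each file aside so any read of it fails, run the action, then restore.
      static void withUnavailableLocations(List<String> locations, Runnable action) {
        for (String location : locations) {
          move(location, location + ".unavailable");
        }
        try {
          action.run();
        } finally {
          for (String location : locations) {
            move(location + ".unavailable", location); // restore for the real DELETE
          }
        }
      }

      private static void move(String from, String to) {
        try {
          Files.move(Paths.get(from.replaceFirst("^file:", "")),
              Paths.get(to.replaceFirst("^file:", "")));
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      }
    }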
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index a6bdb01693..a2dcfed969 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -58,7 +58,7 @@ public class SparkReadConf {
}
public boolean caseSensitive() {
- return Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive"));
+ return SparkUtil.caseSensitive(spark);
}
public boolean localityEnabled() {
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
index c461912186..4ecf458efd 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
@@ -292,4 +292,8 @@ public class SparkUtil {
public static String toColumnName(NamedReference ref) {
return DOT.join(ref.fieldNames());
}
+
+ public static boolean caseSensitive(SparkSession spark) {
+ return Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive"));
+ }
}
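
The new helper only centralizes the session lookup repeated across call sites; a
minimal usage sketch, with the local session and app name purely illustrative:

    import org.apache.iceberg.spark.SparkUtil;
    import org.apache.spark.sql.SparkSession;

    class CaseSensitiveExample {
      public static void main(String[] args) {
        SparkSession spark =
            SparkSession.builder().appName("demo").master("local").getOrCreate();
        spark.conf().set("spark.sql.caseSensitive", "true");

        // Replaces the Boolean.parseBoolean(spark.conf().get(...)) pattern above.
        boolean caseSensitive = SparkUtil.caseSensitive(spark);
        System.out.println(caseSensitive); // true
        spark.stop();
      }
    }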
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
index ed09e974b9..b936fb9217 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
@@ -37,6 +37,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.spark.SparkDistributionAndOrderingUtil;
import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkUtil;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.NestedField;
@@ -154,7 +155,7 @@ public class SparkZOrderStrategy extends SparkSortStrategy {
}
private void validateColumnsExistence(Table table, SparkSession spark, List<String> colNames) {
- boolean caseSensitive = Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive"));
+ boolean caseSensitive = SparkUtil.caseSensitive(spark);
Schema schema = table.schema();
colNames.forEach(
col -> {
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 90c8ae3f92..dcd89b15a2 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -130,7 +130,7 @@ public class SparkScanBuilder
pushableFilters.add(filter);
}
- if (expr == null || requiresSparkFiltering(expr)) {
+ if (expr == null || !ExpressionUtil.selectsPartitions(expr, table, caseSensitive)) {
postScanFilters.add(filter);
} else {
LOG.info("Evaluating completely on Iceberg side: {}", filter);
@@ -148,11 +148,6 @@ public class SparkScanBuilder
return postScanFilters.toArray(new Filter[0]);
}
- private boolean requiresSparkFiltering(Expression expr) {
- return table.specs().values().stream()
- .anyMatch(spec -> !ExpressionUtil.selectsPartitions(expr, spec, caseSensitive));
- }
-
@Override
public Filter[] pushedFilters() {
return pushedFilters;
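
The removed requiresSparkFiltering was just the negation of the new helper:
anyMatch over a negated predicate and allMatch over the plain predicate are the
same test by De Morgan's law. An illustrative sketch (names are not from the
commit):

    import java.util.List;
    import java.util.function.Predicate;

    class DeMorganStreams {
      // Old shape: Spark must re-filter if ANY spec fails the alignment test.
      static <T> boolean requiresExtraFiltering(List<T> specs, Predicate<T> selects) {
        return specs.stream().anyMatch(spec -> !selects.test(spec));
      }

      // New shape: the pruning is exact iff ALL specs pass the alignment test.
      static <T> boolean selectsAllPartitions(List<T> specs, Predicate<T> selects) {
        return specs.stream().allMatch(selects);
      }
      // For any inputs: requiresExtraFiltering(...) == !selectsAllPartitions(...).
    }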
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index 0208121f00..725c7e1e70 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -37,6 +37,7 @@ import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionUtil;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.expressions.StrictMetricsEvaluator;
@@ -50,6 +51,7 @@ import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkFilters;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.SparkSession;
@@ -274,13 +276,17 @@ public class SparkTable
}
}
- return deleteExpr == Expressions.alwaysTrue() || canDeleteUsingMetadata(deleteExpr);
+ return canDeleteUsingMetadata(deleteExpr);
}
// a metadata delete is possible iff matching files can be deleted entirely
private boolean canDeleteUsingMetadata(Expression deleteExpr) {
- boolean caseSensitive =
-     Boolean.parseBoolean(sparkSession().conf().get("spark.sql.caseSensitive"));
+ boolean caseSensitive = SparkUtil.caseSensitive(sparkSession());
+
+ if (ExpressionUtil.selectsPartitions(deleteExpr, table(), caseSensitive)) {
+ return true;
+ }
+
TableScan scan =
table()
.newScan()
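
The diff is truncated above. For orientation, a hedged sketch of the overall shape
of the updated check, with the scan-based fallback reduced to comments; this is not
the verbatim method:

    // Hedged sketch of canDeleteUsingMetadata after this change.
    private boolean canDeleteUsingMetadata(Expression deleteExpr) {
      boolean caseSensitive = SparkUtil.caseSensitive(sparkSession());

      // Fast path added by this commit: answered purely from table metadata, with
      // no manifest or data file I/O, when the predicate matches whole partitions
      // in every spec the table has ever had.
      if (ExpressionUtil.selectsPartitions(deleteExpr, table(), caseSensitive)) {
        return true;
      }

      // Fallback (continues in the truncated hunk above): plan the matching files
      // with a TableScan and accept only if strict evaluation proves each matched
      // file can be deleted in its entirety.
      return false; // placeholder for the truncated scan-based check
    }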