This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new c0555c89b9 Spark 3.3: Optimize DELETEs handled using metadata (#6899)
c0555c89b9 is described below

commit c0555c89b9191c895c1f46143602e58e77a7ddbc
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Wed Feb 22 14:27:42 2023 -0800

    Spark 3.3: Optimize DELETEs handled using metadata (#6899)
---
 .../apache/iceberg/expressions/ExpressionUtil.java | 17 ++++++++
 .../iceberg/spark/extensions/TestDelete.java       | 50 ++++++++++++++++++++++
 .../org/apache/iceberg/spark/SparkReadConf.java    |  2 +-
 .../java/org/apache/iceberg/spark/SparkUtil.java   |  4 ++
 .../iceberg/spark/actions/SparkZOrderStrategy.java |  3 +-
 .../iceberg/spark/source/SparkScanBuilder.java     |  7 +--
 .../apache/iceberg/spark/source/SparkTable.java    | 12 ++++--
 7 files changed, 84 insertions(+), 11 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java b/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java
index 3512f274e2..c910a77640 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java
@@ -30,6 +30,7 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.transforms.Transforms;
@@ -120,6 +121,22 @@ public class ExpressionUtil {
         .isEquivalentTo(Binder.bind(struct, Expressions.rewriteNot(right), caseSensitive));
   }
 
+  /**
+   * Returns whether an expression selects whole partitions for all partition specs in a table.
+   *
+   * <p>For example, ts &lt; '2021-03-09T10:00:00.000' selects whole partitions in an hourly spec,
+   * [hours(ts)], but does not select whole partitions in a daily spec, [days(ts)].
+   *
+   * @param expr an unbound expression
+   * @param table a table
+   * @param caseSensitive whether expression binding should be case sensitive
+   * @return true if the expression will select whole partitions in all table specs
+   */
+  public static boolean selectsPartitions(Expression expr, Table table, boolean caseSensitive) {
+    return table.specs().values().stream()
+        .allMatch(spec -> selectsPartitions(expr, spec, caseSensitive));
+  }
+
   /**
    * Returns whether an expression selects whole partitions for a partition spec.
    *
    *
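
For context, a minimal caller sketch of the new table-level overload
(not part of this commit; the helper name canDropByMetadata and the
dep = 'hr' predicate are illustrative, and `table` is any loaded
Iceberg table):

    import org.apache.iceberg.Table;
    import org.apache.iceberg.expressions.Expression;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.expressions.ExpressionUtil;

    public class SelectsPartitionsExample {
      // Hypothetical helper: true only if dep = 'hr' aligns with partition
      // boundaries under every spec in the table's history, meaning all
      // matching files can be dropped without scanning their rows.
      static boolean canDropByMetadata(Table table) {
        Expression expr = Expressions.equal("dep", "hr");
        return ExpressionUtil.selectsPartitions(expr, table, /* caseSensitive */ true);
      }
    }

With an identity spec [dep], every row in a matching partition satisfies
the predicate, so the check passes; under a spec that never isolated dep,
it fails and callers must fall back to scanning.
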
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
index fb0d8fdab2..0b73821c61 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
@@ -39,8 +39,10 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.RowLevelOperationMode;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
@@ -59,6 +61,10 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.catalyst.plans.logical.DeleteFromIcebergTable;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.execution.datasources.v2.OptimizeMetadataOnlyDeleteFromIcebergTable;
 import org.apache.spark.sql.internal.SQLConf;
 import org.assertj.core.api.Assertions;
 import org.junit.After;
@@ -91,6 +97,42 @@ public abstract class TestDelete extends SparkRowLevelOperationsTestBase {
     sql("DROP TABLE IF EXISTS deleted_dep");
   }
 
+  @Test
+  public void testDeleteWithoutScanningTable() throws Exception {
+    createAndInitPartitionedTable();
+
+    append(new Employee(1, "hr"), new Employee(3, "hr"));
+    append(new Employee(1, "hardware"), new Employee(2, "hardware"));
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    List<String> manifestLocations =
+        table.currentSnapshot().allManifests(table.io()).stream()
+            .map(ManifestFile::path)
+            .collect(Collectors.toList());
+
+    withUnavailableLocations(
+        manifestLocations,
+        () -> {
+          LogicalPlan parsed = parsePlan("DELETE FROM %s WHERE dep = 'hr'", tableName);
+
+          DeleteFromIcebergTable analyzed =
+              (DeleteFromIcebergTable) spark.sessionState().analyzer().execute(parsed);
+          Assert.assertTrue("Should have rewrite plan", analyzed.rewritePlan().isDefined());
+
+          DeleteFromIcebergTable optimized =
+              (DeleteFromIcebergTable) OptimizeMetadataOnlyDeleteFromIcebergTable.apply(analyzed);
+          Assert.assertTrue("Should discard rewrite plan", optimized.rewritePlan().isEmpty());
+        });
+
+    sql("DELETE FROM %s WHERE dep = 'hr'", tableName);
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(row(1, "hardware"), row(2, "hardware")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
   @Test
   public void testDeleteFileThenMetadataDelete() throws Exception {
     Assume.assumeFalse("Avro does not support metadata delete", fileFormat.equals("avro"));
@@ -991,4 +1033,12 @@ public abstract class TestDelete extends SparkRowLevelOperationsTestBase {
     String modeName = table.properties().getOrDefault(DELETE_MODE, DELETE_MODE_DEFAULT);
     return RowLevelOperationMode.fromName(modeName);
   }
+
+  private LogicalPlan parsePlan(String query, Object... args) {
+    try {
+      return spark.sessionState().sqlParser().parsePlan(String.format(query, args));
+    } catch (ParseException e) {
+      throw new RuntimeException(e);
+    }
+  }
 }
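
One way to observe the effect from a test (a sketch under stated
assumptions, not code from this commit): a metadata-only DELETE commits a
snapshot whose operation is "delete" and adds no data files, while a
copy-on-write DELETE that rewrites files commits an "overwrite". The
helper name wasMetadataOnlyDelete is hypothetical:

    import org.apache.iceberg.DataOperations;
    import org.apache.iceberg.Snapshot;
    import org.apache.iceberg.Table;

    public class MetadataDeleteProbe {
      // Hypothetical check: the latest snapshot removed files purely via
      // metadata if its operation is "delete" and no data files were added.
      static boolean wasMetadataOnlyDelete(Table table) {
        table.refresh();
        Snapshot current = table.currentSnapshot();
        return DataOperations.DELETE.equals(current.operation())
            && "0".equals(current.summary().getOrDefault("added-data-files", "0"));
      }
    }
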
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index a6bdb01693..a2dcfed969 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -58,7 +58,7 @@ public class SparkReadConf {
   }
 
   public boolean caseSensitive() {
-    return Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive"));
+    return SparkUtil.caseSensitive(spark);
   }
 
   public boolean localityEnabled() {
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
index c461912186..4ecf458efd 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
@@ -292,4 +292,8 @@ public class SparkUtil {
   public static String toColumnName(NamedReference ref) {
     return DOT.join(ref.fieldNames());
   }
+
+  public static boolean caseSensitive(SparkSession spark) {
+    return Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive"));
+  }
 }
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
index ed09e974b9..b936fb9217 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
@@ -37,6 +37,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.spark.SparkDistributionAndOrderingUtil;
 import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkUtil;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.types.Types.NestedField;
@@ -154,7 +155,7 @@ public class SparkZOrderStrategy extends SparkSortStrategy {
   }
 
   private void validateColumnsExistence(Table table, SparkSession spark, List<String> colNames) {
-    boolean caseSensitive = Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive"));
+    boolean caseSensitive = SparkUtil.caseSensitive(spark);
     Schema schema = table.schema();
     colNames.forEach(
         col -> {
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 90c8ae3f92..dcd89b15a2 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -130,7 +130,7 @@ public class SparkScanBuilder
           pushableFilters.add(filter);
         }
 
-        if (expr == null || requiresSparkFiltering(expr)) {
+        if (expr == null || !ExpressionUtil.selectsPartitions(expr, table, caseSensitive)) {
           postScanFilters.add(filter);
         } else {
           LOG.info("Evaluating completely on Iceberg side: {}", filter);
@@ -148,11 +148,6 @@ public class SparkScanBuilder
     return postScanFilters.toArray(new Filter[0]);
   }
 
-  private boolean requiresSparkFiltering(Expression expr) {
-    return table.specs().values().stream()
-        .anyMatch(spec -> !ExpressionUtil.selectsPartitions(expr, spec, caseSensitive));
-  }
-
   @Override
   public Filter[] pushedFilters() {
     return pushedFilters;
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index 0208121f00..725c7e1e70 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -37,6 +37,7 @@ import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.expressions.Evaluator;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionUtil;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.Projections;
 import org.apache.iceberg.expressions.StrictMetricsEvaluator;
@@ -50,6 +51,7 @@ import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkFilters;
 import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkUtil;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.spark.sql.SparkSession;
@@ -274,13 +276,17 @@ public class SparkTable
       }
     }
 
-    return deleteExpr == Expressions.alwaysTrue() || canDeleteUsingMetadata(deleteExpr);
+    return canDeleteUsingMetadata(deleteExpr);
   }
 
   // a metadata delete is possible iff matching files can be deleted entirely
   private boolean canDeleteUsingMetadata(Expression deleteExpr) {
-    boolean caseSensitive =
-        Boolean.parseBoolean(sparkSession().conf().get("spark.sql.caseSensitive"));
+    boolean caseSensitive = SparkUtil.caseSensitive(sparkSession());
+
+    if (ExpressionUtil.selectsPartitions(deleteExpr, table(), caseSensitive)) {
+      return true;
+    }
+
     TableScan scan =
         table()
             .newScan()
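
To summarize the new decision flow, a simplified standalone sketch
(hypothetical class; the real logic is SparkTable's canDeleteUsingMetadata,
whose metrics-based fallback is truncated in the hunk above):

    import org.apache.iceberg.Table;
    import org.apache.iceberg.expressions.Expression;
    import org.apache.iceberg.expressions.ExpressionUtil;

    public class MetadataDeleteCheck {
      // Try the cheap partition-alignment test first; only fall back to
      // per-file strict metrics evaluation when the predicate does not line
      // up with partition boundaries in every spec.
      static boolean canDeleteUsingMetadata(Table table, Expression expr, boolean caseSensitive) {
        if (ExpressionUtil.selectsPartitions(expr, table, caseSensitive)) {
          return true; // whole partitions match: drop files by metadata alone
        }
        // Stub standing in for the unchanged fallback, which plans a scan
        // and uses StrictMetricsEvaluator to prove that every row in each
        // matching file satisfies the delete filter.
        return strictMetricsProveAllRowsMatch(table, expr, caseSensitive);
      }

      private static boolean strictMetricsProveAllRowsMatch(
          Table table, Expression expr, boolean caseSensitive) {
        return false; // conservative default for this sketch
      }
    }

Note that the previous deleteExpr == Expressions.alwaysTrue() shortcut is
subsumed here, since an always-true predicate trivially selects every
partition under any spec.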
