This is an automated email from the ASF dual-hosted git repository.

szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 24b972296f Spark 3.3, 3.4: Add where filter to rewrite_position_delete_files procedure (#8289)
24b972296f is described below

commit 24b972296f89376f62011102e02536e96bcfb852
Author: Szehon Ho <[email protected]>
AuthorDate: Wed Aug 16 17:26:09 2023 -0700

    Spark 3.3, 3.4: Add where filter to rewrite_position_delete_files procedure (#8289)
---
 .../TestRewritePositionDeleteFilesProcedure.java   | 70 +++++++++++++++++++---
 .../iceberg/spark/procedures/BaseProcedure.java    | 14 +++++
 .../RewritePositionDeleteFilesProcedure.java       | 17 +++++-
 .../TestRewritePositionDeleteFilesProcedure.java   | 70 +++++++++++++++++++---
 .../iceberg/spark/procedures/BaseProcedure.java    | 14 +++++
 .../RewritePositionDeleteFilesProcedure.java       | 17 +++++-
 6 files changed, 184 insertions(+), 18 deletions(-)

diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
index 88715fd89c..0bc2bb9961 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
@@ -41,19 +41,36 @@ public class TestRewritePositionDeleteFilesProcedure extends SparkExtensionsTest
   }
 
   private void createTable() throws Exception {
+    createTable(false);
+  }
+
+  private void createTable(boolean partitioned) throws Exception {
+    String partitionStmt = partitioned ? "PARTITIONED BY (id)" : "";
     sql(
-        "CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES"
+        "CREATE TABLE %s (id bigint, data string) USING iceberg %s TBLPROPERTIES"
             + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
-        tableName);
+        tableName, partitionStmt);
 
     List<SimpleRecord> records =
         Lists.newArrayList(
             new SimpleRecord(1, "a"),
-            new SimpleRecord(2, "b"),
-            new SimpleRecord(3, "c"),
-            new SimpleRecord(4, "d"),
-            new SimpleRecord(5, "e"),
-            new SimpleRecord(6, "f"));
+            new SimpleRecord(1, "b"),
+            new SimpleRecord(1, "c"),
+            new SimpleRecord(2, "d"),
+            new SimpleRecord(2, "e"),
+            new SimpleRecord(2, "f"),
+            new SimpleRecord(3, "g"),
+            new SimpleRecord(3, "h"),
+            new SimpleRecord(3, "i"),
+            new SimpleRecord(4, "j"),
+            new SimpleRecord(4, "k"),
+            new SimpleRecord(4, "l"),
+            new SimpleRecord(5, "m"),
+            new SimpleRecord(5, "n"),
+            new SimpleRecord(5, "o"),
+            new SimpleRecord(6, "p"),
+            new SimpleRecord(6, "q"),
+            new SimpleRecord(6, "r"));
     spark
         .createDataset(records, Encoders.bean(SimpleRecord.class))
         .coalesce(1)
@@ -130,6 +147,45 @@ public class TestRewritePositionDeleteFilesProcedure extends SparkExtensionsTest
         output);
   }
 
+  @Test
+  public void testExpireDeleteFilesFilter() throws Exception {
+    createTable(true);
+
+    sql("DELETE FROM %s WHERE id = 1 and data='a'", tableName);
+    sql("DELETE FROM %s WHERE id = 1 and data='b'", tableName);
+    sql("DELETE FROM %s WHERE id = 2 and data='d'", tableName);
+    sql("DELETE FROM %s WHERE id = 2 and data='e'", tableName);
+    sql("DELETE FROM %s WHERE id = 3 and data='g'", tableName);
+    sql("DELETE FROM %s WHERE id = 3 and data='h'", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertEquals(6, TestHelpers.deleteFiles(table).size());
+
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_position_delete_files("
+                + "table => '%s',"
+                // data filter is ignored as it cannot be applied to position deletes
+                + "where => 'id IN (1, 2) AND data=\"bar\"',"
+                + "options => map("
+                + "'rewrite-all','true'))",
+            catalogName, tableIdent);
+    table.refresh();
+
+    Map<String, String> snapshotSummary = snapshotSummary();
+    assertEquals(
+        "Should delete 4 delete files and add 2",
+        ImmutableList.of(
+            row(
+                4,
+                2,
+                Long.valueOf(snapshotSummary.get(REMOVED_FILE_SIZE_PROP)),
+                Long.valueOf(snapshotSummary.get(ADDED_FILE_SIZE_PROP)))),
+        output);
+
+    Assert.assertEquals(4, TestHelpers.deleteFiles(table).size());
+  }
+
   @Test
   public void testInvalidOption() throws Exception {
     createTable();
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
index ed0156adc5..d7c01936ee 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
@@ -24,6 +24,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.function.Function;
 import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
 import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -32,6 +33,7 @@ import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
 import org.apache.iceberg.spark.actions.SparkActions;
 import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
 import org.apache.iceberg.spark.source.SparkTable;
+import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
@@ -44,6 +46,7 @@ import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.connector.iceberg.catalog.Procedure;
 import org.apache.spark.sql.execution.CacheManager;
+import org.apache.spark.sql.execution.datasources.SparkExpressionConverter;
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.DataTypes;
@@ -160,6 +163,17 @@ abstract class BaseProcedure implements Procedure {
     cacheManager.recacheByPlan(spark, relation);
   }
 
+  protected Expression filterExpression(Identifier ident, String where) {
+    try {
+      String name = Spark3Util.quotedFullIdentifier(tableCatalog.name(), ident);
+      org.apache.spark.sql.catalyst.expressions.Expression expression =
+          SparkExpressionConverter.collectResolvedSparkExpression(spark, name, where);
+      return SparkExpressionConverter.convertToIcebergExpression(expression);
+    } catch (AnalysisException e) {
+      throw new IllegalArgumentException("Cannot parse predicates in where option: " + where, e);
+    }
+  }
+
   protected InternalRow newInternalRow(Object... values) {
     return new GenericInternalRow(values);
   }
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java
index a4a3f63ba7..3d5e45ce8b 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java
@@ -20,7 +20,9 @@ package org.apache.iceberg.spark.procedures;
 
 import java.util.Map;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewritePositionDeleteFiles;
 import org.apache.iceberg.actions.RewritePositionDeleteFiles.Result;
+import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.catalog.Identifier;
@@ -42,9 +44,11 @@ public class RewritePositionDeleteFilesProcedure extends BaseProcedure {
       ProcedureParameter.required("table", DataTypes.StringType);
   private static final ProcedureParameter OPTIONS_PARAM =
       ProcedureParameter.optional("options", STRING_MAP);
+  private static final ProcedureParameter WHERE_PARAM =
+      ProcedureParameter.optional("where", DataTypes.StringType);
 
   private static final ProcedureParameter[] PARAMETERS =
-      new ProcedureParameter[] {TABLE_PARAM, OPTIONS_PARAM};
+      new ProcedureParameter[] {TABLE_PARAM, OPTIONS_PARAM, WHERE_PARAM};
 
   private static final StructType OUTPUT_TYPE =
       new StructType(
@@ -85,11 +89,20 @@ public class RewritePositionDeleteFilesProcedure extends BaseProcedure {
     ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
     Identifier tableIdent = input.ident(TABLE_PARAM);
     Map<String, String> options = input.asStringMap(OPTIONS_PARAM, ImmutableMap.of());
+    String where = input.asString(WHERE_PARAM, null);
 
     return modifyIcebergTable(
         tableIdent,
         table -> {
-          Result result = actions().rewritePositionDeletes(table).options(options).execute();
+          RewritePositionDeleteFiles action =
+              actions().rewritePositionDeletes(table).options(options);
+
+          if (where != null) {
+            Expression whereExpression = filterExpression(tableIdent, where);
+            action = action.filter(whereExpression);
+          }
+
+          Result result = action.execute();
           return new InternalRow[] {toOutputRow(result)};
         });
   }
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
index 88715fd89c..0bc2bb9961 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
@@ -41,19 +41,36 @@ public class TestRewritePositionDeleteFilesProcedure extends SparkExtensionsTest
   }
 
   private void createTable() throws Exception {
+    createTable(false);
+  }
+
+  private void createTable(boolean partitioned) throws Exception {
+    String partitionStmt = partitioned ? "PARTITIONED BY (id)" : "";
     sql(
-        "CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES"
+        "CREATE TABLE %s (id bigint, data string) USING iceberg %s TBLPROPERTIES"
             + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
-        tableName);
+        tableName, partitionStmt);
 
     List<SimpleRecord> records =
         Lists.newArrayList(
             new SimpleRecord(1, "a"),
-            new SimpleRecord(2, "b"),
-            new SimpleRecord(3, "c"),
-            new SimpleRecord(4, "d"),
-            new SimpleRecord(5, "e"),
-            new SimpleRecord(6, "f"));
+            new SimpleRecord(1, "b"),
+            new SimpleRecord(1, "c"),
+            new SimpleRecord(2, "d"),
+            new SimpleRecord(2, "e"),
+            new SimpleRecord(2, "f"),
+            new SimpleRecord(3, "g"),
+            new SimpleRecord(3, "h"),
+            new SimpleRecord(3, "i"),
+            new SimpleRecord(4, "j"),
+            new SimpleRecord(4, "k"),
+            new SimpleRecord(4, "l"),
+            new SimpleRecord(5, "m"),
+            new SimpleRecord(5, "n"),
+            new SimpleRecord(5, "o"),
+            new SimpleRecord(6, "p"),
+            new SimpleRecord(6, "q"),
+            new SimpleRecord(6, "r"));
     spark
         .createDataset(records, Encoders.bean(SimpleRecord.class))
         .coalesce(1)
@@ -130,6 +147,45 @@ public class TestRewritePositionDeleteFilesProcedure extends SparkExtensionsTest
         output);
   }
 
+  @Test
+  public void testExpireDeleteFilesFilter() throws Exception {
+    createTable(true);
+
+    sql("DELETE FROM %s WHERE id = 1 and data='a'", tableName);
+    sql("DELETE FROM %s WHERE id = 1 and data='b'", tableName);
+    sql("DELETE FROM %s WHERE id = 2 and data='d'", tableName);
+    sql("DELETE FROM %s WHERE id = 2 and data='e'", tableName);
+    sql("DELETE FROM %s WHERE id = 3 and data='g'", tableName);
+    sql("DELETE FROM %s WHERE id = 3 and data='h'", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertEquals(6, TestHelpers.deleteFiles(table).size());
+
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_position_delete_files("
+                + "table => '%s',"
+                // data filter is ignored as it cannot be applied to position deletes
+                + "where => 'id IN (1, 2) AND data=\"bar\"',"
+                + "options => map("
+                + "'rewrite-all','true'))",
+            catalogName, tableIdent);
+    table.refresh();
+
+    Map<String, String> snapshotSummary = snapshotSummary();
+    assertEquals(
+        "Should delete 4 delete files and add 2",
+        ImmutableList.of(
+            row(
+                4,
+                2,
+                Long.valueOf(snapshotSummary.get(REMOVED_FILE_SIZE_PROP)),
+                Long.valueOf(snapshotSummary.get(ADDED_FILE_SIZE_PROP)))),
+        output);
+
+    Assert.assertEquals(4, TestHelpers.deleteFiles(table).size());
+  }
+
   @Test
   public void testInvalidOption() throws Exception {
     createTable();
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
index ed0156adc5..d7c01936ee 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
@@ -24,6 +24,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.function.Function;
 import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
 import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -32,6 +33,7 @@ import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
 import org.apache.iceberg.spark.actions.SparkActions;
 import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
 import org.apache.iceberg.spark.source.SparkTable;
+import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
@@ -44,6 +46,7 @@ import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.connector.iceberg.catalog.Procedure;
 import org.apache.spark.sql.execution.CacheManager;
+import org.apache.spark.sql.execution.datasources.SparkExpressionConverter;
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.DataTypes;
@@ -160,6 +163,17 @@ abstract class BaseProcedure implements Procedure {
     cacheManager.recacheByPlan(spark, relation);
   }
 
+  protected Expression filterExpression(Identifier ident, String where) {
+    try {
+      String name = Spark3Util.quotedFullIdentifier(tableCatalog.name(), ident);
+      org.apache.spark.sql.catalyst.expressions.Expression expression =
+          SparkExpressionConverter.collectResolvedSparkExpression(spark, name, where);
+      return SparkExpressionConverter.convertToIcebergExpression(expression);
+    } catch (AnalysisException e) {
+      throw new IllegalArgumentException("Cannot parse predicates in where option: " + where, e);
+    }
+  }
+
   protected InternalRow newInternalRow(Object... values) {
     return new GenericInternalRow(values);
   }
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java
index a4a3f63ba7..3d5e45ce8b 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java
@@ -20,7 +20,9 @@ package org.apache.iceberg.spark.procedures;
 
 import java.util.Map;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewritePositionDeleteFiles;
 import org.apache.iceberg.actions.RewritePositionDeleteFiles.Result;
+import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.catalog.Identifier;
@@ -42,9 +44,11 @@ public class RewritePositionDeleteFilesProcedure extends BaseProcedure {
       ProcedureParameter.required("table", DataTypes.StringType);
   private static final ProcedureParameter OPTIONS_PARAM =
       ProcedureParameter.optional("options", STRING_MAP);
+  private static final ProcedureParameter WHERE_PARAM =
+      ProcedureParameter.optional("where", DataTypes.StringType);
 
   private static final ProcedureParameter[] PARAMETERS =
-      new ProcedureParameter[] {TABLE_PARAM, OPTIONS_PARAM};
+      new ProcedureParameter[] {TABLE_PARAM, OPTIONS_PARAM, WHERE_PARAM};
 
   private static final StructType OUTPUT_TYPE =
       new StructType(
@@ -85,11 +89,20 @@ public class RewritePositionDeleteFilesProcedure extends BaseProcedure {
     ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
     Identifier tableIdent = input.ident(TABLE_PARAM);
     Map<String, String> options = input.asStringMap(OPTIONS_PARAM, ImmutableMap.of());
+    String where = input.asString(WHERE_PARAM, null);
 
     return modifyIcebergTable(
         tableIdent,
         table -> {
-          Result result = actions().rewritePositionDeletes(table).options(options).execute();
+          RewritePositionDeleteFiles action =
+              actions().rewritePositionDeletes(table).options(options);
+
+          if (where != null) {
+            Expression whereExpression = filterExpression(tableIdent, where);
+            action = action.filter(whereExpression);
+          }
+
+          Result result = action.execute();
           return new InternalRow[] {toOutputRow(result)};
         });
   }

Reply via email to