This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 2817dd4af9 Spark 3.4: Push down system functions by V2 filters for 
rewriting DataFiles and PositionDeleteFiles (#8560)
2817dd4af9 is described below

commit 2817dd4af905b5c18eb6aaf84a36a08fa012dc55
Author: Hongyue/Steve Zhang <[email protected]>
AuthorDate: Mon Sep 18 13:14:53 2023 -0700

    Spark 3.4: Push down system functions by V2 filters for rewriting DataFiles 
and PositionDeleteFiles (#8560)
---
 .../extensions/TestRewriteDataFilesProcedure.java  | 78 +++++++++++++++++++++-
 .../procedures/RewriteDataFilesProcedure.java      | 20 ++----
 .../datasources/SparkExpressionConverter.scala     |  9 +--
 .../spark/actions/TestRewriteDataFilesAction.java  | 28 +++++++-
 4 files changed, 114 insertions(+), 21 deletions(-)

diff --git 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index 8013967181..2449c20ab9 100644
--- 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -35,6 +35,7 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.ExtendedParser;
 import org.apache.iceberg.spark.SparkCatalogConfig;
 import org.apache.iceberg.spark.SparkTableCache;
+import org.apache.iceberg.spark.SystemFunctionPushDownHelper;
 import org.apache.iceberg.spark.source.ThreeColumnRecord;
 import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Dataset;
@@ -395,6 +396,38 @@ public class TestRewriteDataFilesProcedure extends 
SparkExtensionsTestBase {
     assertEquals("Data after compaction should not change", expectedRecords, 
actualRecords);
   }
 
+  @Test
+  public void testRewriteDataFilesWithFilterOnOnBucketExpression() {
+    // currently spark session catalog only resolve to v1 functions instead of 
desired v2 functions
+    // 
https://github.com/apache/spark/blob/branch-3.4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L2070-L2083
+    
Assume.assumeFalse(catalogName.equals(SparkCatalogConfig.SPARK.catalogName()));
+    createBucketPartitionTable();
+    // create 5 files for each partition (c2 = 'foo' and c2 = 'bar')
+    insertData(10);
+    List<Object[]> expectedRecords = currentData();
+
+    // select only 5 files for compaction (files in the partition c2 = 'bar')
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_data_files(table => '%s',"
+                + " where => '%s.system.bucket(2, c2) = 0')",
+            catalogName, tableIdent, catalogName);
+
+    assertEquals(
+        "Action should rewrite 5 data files from single matching partition"
+            + "(containing bucket(c2) = 0) and add 1 data files",
+        row(5, 1),
+        row(output.get(0)[0], output.get(0)[1]));
+    // verify rewritten bytes separately
+    assertThat(output.get(0)).hasSize(4);
+    assertThat(output.get(0)[2])
+        .isInstanceOf(Long.class)
+        
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
+
+    List<Object[]> actualRecords = currentData();
+    assertEquals("Data after compaction should not change", expectedRecords, 
actualRecords);
+  }
+
   @Test
   public void testRewriteDataFilesWithInFilterOnPartitionTable() {
     createPartitionTable();
@@ -480,7 +513,6 @@ public class TestRewriteDataFilesProcedure extends 
SparkExtensionsTestBase {
     sql(
         "CALL %s.system.rewrite_data_files(table => '%s'," + " where => 'c2 
like \"%s\"')",
         catalogName, tableIdent, "car%");
-
     // TODO: Enable when org.apache.iceberg.spark.SparkFilters have 
implementations for
     // StringEndsWith & StringContains
     // StringEndsWith
@@ -491,6 +523,39 @@ public class TestRewriteDataFilesProcedure extends 
SparkExtensionsTestBase {
     //     " where => 'c2 like \"%s\"')", catalogName, tableIdent, "%car%");
   }
 
+  @Test
+  public void testRewriteDataFilesWithPossibleV2Filters() {
+    // currently spark session catalog only resolve to v1 functions instead of 
desired v2 functions
+    // 
https://github.com/apache/spark/blob/branch-3.4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L2070-L2083
+    
Assume.assumeFalse(catalogName.equals(SparkCatalogConfig.SPARK.catalogName()));
+
+    SystemFunctionPushDownHelper.createPartitionedTable(spark, tableName, 
"id");
+    sql(
+        "CALL %s.system.rewrite_data_files(table => '%s',"
+            + " where => '%s.system.bucket(2, data) >= 0')",
+        catalogName, tableIdent, catalogName);
+    sql(
+        "CALL %s.system.rewrite_data_files(table => '%s',"
+            + " where => '%s.system.truncate(4, id) >= 1')",
+        catalogName, tableIdent, catalogName);
+    sql(
+        "CALL %s.system.rewrite_data_files(table => '%s',"
+            + " where => '%s.system.years(ts) >= 1')",
+        catalogName, tableIdent, catalogName);
+    sql(
+        "CALL %s.system.rewrite_data_files(table => '%s',"
+            + " where => '%s.system.months(ts) >= 1')",
+        catalogName, tableIdent, catalogName);
+    sql(
+        "CALL %s.system.rewrite_data_files(table => '%s',"
+            + " where => '%s.system.days(ts) >= date(\"2023-01-01\")')",
+        catalogName, tableIdent, catalogName);
+    sql(
+        "CALL %s.system.rewrite_data_files(table => '%s',"
+            + " where => '%s.system.hours(ts) >= 1')",
+        catalogName, tableIdent, catalogName);
+  }
+
   @Test
   public void testRewriteDataFilesWithInvalidInputs() {
     createTable();
@@ -778,6 +843,17 @@ public class TestRewriteDataFilesProcedure extends 
SparkExtensionsTestBase {
         TableProperties.WRITE_DISTRIBUTION_MODE_NONE);
   }
 
+  private void createBucketPartitionTable() {
+    sql(
+        "CREATE TABLE %s (c1 int, c2 string, c3 string) "
+            + "USING iceberg "
+            + "PARTITIONED BY (bucket(2, c2)) "
+            + "TBLPROPERTIES ('%s' '%s')",
+        tableName,
+        TableProperties.WRITE_DISTRIBUTION_MODE,
+        TableProperties.WRITE_DISTRIBUTION_MODE_NONE);
+  }
+
   private void insertData(int filesCount) {
     insertData(tableName, filesCount);
   }
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
index 07e3f6232c..14246b47ac 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
@@ -24,20 +24,17 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.NamedReference;
 import org.apache.iceberg.expressions.Zorder;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.ExtendedParser;
-import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
-import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.Expression;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
-import org.apache.spark.sql.execution.datasources.SparkExpressionConverter;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.StructField;
@@ -103,8 +100,6 @@ class RewriteDataFilesProcedure extends BaseProcedure {
     return modifyIcebergTable(
         tableIdent,
         table -> {
-          String quotedFullIdentifier =
-              Spark3Util.quotedFullIdentifier(tableCatalog().name(), 
tableIdent);
           RewriteDataFiles action = actions().rewriteDataFiles(table);
 
           String strategy = args.isNullAt(1) ? null : args.getString(1);
@@ -120,7 +115,7 @@ class RewriteDataFilesProcedure extends BaseProcedure {
 
           String where = args.isNullAt(4) ? null : args.getString(4);
 
-          action = checkAndApplyFilter(action, where, quotedFullIdentifier);
+          action = checkAndApplyFilter(action, where, tableIdent);
 
           RewriteDataFiles.Result result = action.execute();
 
@@ -129,15 +124,10 @@ class RewriteDataFilesProcedure extends BaseProcedure {
   }
 
   private RewriteDataFiles checkAndApplyFilter(
-      RewriteDataFiles action, String where, String tableName) {
+      RewriteDataFiles action, String where, Identifier ident) {
     if (where != null) {
-      try {
-        Expression expression =
-            SparkExpressionConverter.collectResolvedSparkExpression(spark(), 
tableName, where);
-        return 
action.filter(SparkExpressionConverter.convertToIcebergExpression(expression));
-      } catch (AnalysisException e) {
-        throw new IllegalArgumentException("Cannot parse predicates in where 
option: " + where, e);
-      }
+      Expression expression = filterExpression(ident, where);
+      return action.filter(expression);
     }
     return action;
   }
diff --git 
a/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
 
b/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
index 9f53eae60a..4903a100f9 100644
--- 
a/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
+++ 
b/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
@@ -19,7 +19,7 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import org.apache.iceberg.spark.SparkFilters
+import org.apache.iceberg.spark.SparkV2Filters
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -28,14 +28,15 @@ import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.plans.logical.Filter
 import org.apache.spark.sql.catalyst.plans.logical.LeafNode
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy
 
 object SparkExpressionConverter {
 
   def convertToIcebergExpression(sparkExpression: Expression): 
org.apache.iceberg.expressions.Expression = {
-    // Currently, it is a double conversion as we are converting Spark 
expression to Spark filter
-    // and then converting Spark filter to Iceberg expression.
+    // Currently, it is a double conversion as we are converting Spark 
expression to Spark predicate
+    // and then converting Spark predicate to Iceberg expression.
     // But these two conversions already exist and well tested. So, we are 
going with this approach.
-    SparkFilters.convert(DataSourceStrategy.translateFilter(sparkExpression, 
supportNestedPredicatePushdown = true).get)
+    
SparkV2Filters.convert(DataSourceV2Strategy.translateFilterV2(sparkExpression).get)
   }
 
   @throws[AnalysisException]
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 1581e58364..bfffa65acc 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -223,6 +223,32 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     assertEquals("Rows must match", expectedRecords, actualRecords);
   }
 
+  @Test
+  public void testBinPackWithFilterOnBucketExpression() {
+    Table table = createTablePartitioned(4, 2);
+
+    shouldHaveFiles(table, 8);
+    List<Object[]> expectedRecords = currentData();
+    long dataSizeBefore = testDataSize(table);
+
+    Result result =
+        basicRewrite(table)
+            .filter(Expressions.equal("c1", 1))
+            .filter(Expressions.equal(Expressions.bucket("c2", 2), 0))
+            .execute();
+
+    assertThat(result)
+        .extracting(Result::rewrittenDataFilesCount, 
Result::addedDataFilesCount)
+        .as("Action should rewrite 2 data files into 1 data file")
+        .contains(2, 1);
+    
assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore);
+
+    shouldHaveFiles(table, 7);
+
+    List<Object[]> actualRecords = currentData();
+    assertEquals("Rows must match", expectedRecords, actualRecords);
+  }
+
   @Test
   public void testBinPackAfterPartitionChange() {
     Table table = createTable();
@@ -260,7 +286,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
   }
 
   @Test
-  public void testBinPackWithDeletes() throws Exception {
+  public void testBinPackWithDeletes() {
     Table table = createTablePartitioned(4, 2);
     table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
     shouldHaveFiles(table, 8);

Reply via email to