This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new ca1e0ffeb3 Spark 3.1, 3.2, 3.3, 3.5: Throw better exception when 
filter expression cannot be translated in Rewrite procedure (#8605)
ca1e0ffeb3 is described below

commit ca1e0ffeb3b6399cc4c2f890bfad032b0aecef4e
Author: Xianyang Liu <[email protected]>
AuthorDate: Thu Sep 21 13:29:44 2023 +0800

    Spark 3.1, 3.2, 3.3, 3.5: Throw better exception when filter expression 
cannot be translated in Rewrite procedure (#8605)
---
 .../extensions/TestRewriteDataFilesProcedure.java   | 21 +++++++++++++++++++++
 .../datasources/SparkExpressionConverter.scala      | 12 +++++++++++-
 .../extensions/TestRewriteDataFilesProcedure.java   | 21 +++++++++++++++++++++
 .../datasources/SparkExpressionConverter.scala      | 12 +++++++++++-
 .../extensions/TestRewriteDataFilesProcedure.java   | 21 +++++++++++++++++++++
 .../TestRewritePositionDeleteFilesProcedure.java    | 21 +++++++++++++++++++++
 .../datasources/SparkExpressionConverter.scala      | 12 +++++++++++-
 .../extensions/TestRewriteDataFilesProcedure.java   | 20 ++++++++++++++++++++
 .../TestRewritePositionDeleteFilesProcedure.java    | 21 +++++++++++++++++++++
 .../datasources/SparkExpressionConverter.scala      | 12 +++++++++++-
 10 files changed, 169 insertions(+), 4 deletions(-)

diff --git 
a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
 
b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index d8c415b281..f443b6f8f1 100644
--- 
a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ 
b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -34,6 +34,7 @@ import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Test;
 
@@ -412,6 +413,26 @@ public class TestRewriteDataFilesProcedure extends 
SparkExtensionsTestBase {
         () -> sql("CALL %s.system.rewrite_data_files('')", catalogName));
   }
 
+  @Test
+  public void testRewriteWithUntranslatedOrUnconvertedFilter() {
+    createTable();
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.rewrite_data_files(table => '%s', where => 
'lower(c2) = \"fo\"')",
+                    catalogName, tableIdent))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Cannot translate Spark expression");
+
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.rewrite_data_files(table => '%s', where => 
'c2 like \"%%fo\"')",
+                    catalogName, tableIdent))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Cannot convert Spark filter");
+  }
+
   private void createTable() {
     sql("CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg", 
tableName);
   }
diff --git 
a/spark/v3.1/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
 
b/spark/v3.1/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
index 60de1cb82a..077884b1a6 100644
--- 
a/spark/v3.1/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
+++ 
b/spark/v3.1/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
@@ -31,7 +31,17 @@ object SparkExpressionConverter {
     // Currently, it is a double conversion as we are converting Spark 
expression to Spark filter
     // and then converting Spark filter to Iceberg expression.
     // But these two conversions already exist and well tested. So, we are 
going with this approach.
-    SparkFilters.convert(DataSourceStrategy.translateFilter(sparkExpression, 
supportNestedPredicatePushdown = true).get)
+    DataSourceStrategy.translateFilter(sparkExpression, 
supportNestedPredicatePushdown = true) match {
+      case Some(filter) =>
+        val converted = SparkFilters.convert(filter)
+        if (converted == null) {
+          throw new IllegalArgumentException(s"Cannot convert Spark filter: 
$filter to Iceberg expression")
+        }
+
+        converted
+      case _ =>
+        throw new IllegalArgumentException(s"Cannot translate Spark 
expression: $sparkExpression to data source filter")
+    }
   }
 
   @throws[AnalysisException]
diff --git 
a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
 
b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index d13e0967b6..ba838a4a98 100644
--- 
a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ 
b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -41,6 +41,7 @@ import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -656,6 +657,26 @@ public class TestRewriteDataFilesProcedure extends 
SparkExtensionsTestBase {
     assertEquals("Data after compaction should not change", expectedRecords, 
actualRecords);
   }
 
+  @Test
+  public void testRewriteWithUntranslatedOrUnconvertedFilter() {
+    createTable();
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.rewrite_data_files(table => '%s', where => 
'lower(c2) = \"fo\"')",
+                    catalogName, tableIdent))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Cannot translate Spark expression");
+
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.rewrite_data_files(table => '%s', where => 
'c2 like \"%%fo\"')",
+                    catalogName, tableIdent))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Cannot convert Spark filter");
+  }
+
   private void createTable() {
     sql("CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg", 
tableName);
   }
diff --git 
a/spark/v3.2/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
 
b/spark/v3.2/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
index c4b5a7c0ce..4ec46ee140 100644
--- 
a/spark/v3.2/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
+++ 
b/spark/v3.2/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
@@ -32,7 +32,17 @@ object SparkExpressionConverter {
     // Currently, it is a double conversion as we are converting Spark 
expression to Spark filter
     // and then converting Spark filter to Iceberg expression.
     // But these two conversions already exist and well tested. So, we are 
going with this approach.
-    SparkFilters.convert(DataSourceStrategy.translateFilter(sparkExpression, 
supportNestedPredicatePushdown = true).get)
+    DataSourceStrategy.translateFilter(sparkExpression, 
supportNestedPredicatePushdown = true) match {
+      case Some(filter) =>
+        val converted = SparkFilters.convert(filter)
+        if (converted == null) {
+          throw new IllegalArgumentException(s"Cannot convert Spark filter: 
$filter to Iceberg expression")
+        }
+
+        converted
+      case _ =>
+        throw new IllegalArgumentException(s"Cannot translate Spark 
expression: $sparkExpression to data source filter")
+    }
   }
 
   @throws[AnalysisException]
diff --git 
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
 
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index 78fe78742b..0cdde158bd 100644
--- 
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ 
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -41,6 +41,7 @@ import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -699,6 +700,26 @@ public class TestRewriteDataFilesProcedure extends 
SparkExtensionsTestBase {
     assertEquals("Data after compaction should not change", expectedRecords, 
actualRecords);
   }
 
+  @Test
+  public void testRewriteWithUntranslatedOrUnconvertedFilter() {
+    createTable();
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.rewrite_data_files(table => '%s', where => 
'lower(c2) = \"fo\"')",
+                    catalogName, tableIdent))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Cannot translate Spark expression");
+
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.rewrite_data_files(table => '%s', where => 
'c2 like \"%%fo\"')",
+                    catalogName, tableIdent))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Cannot convert Spark filter");
+  }
+
   private void createTable() {
     sql("CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg", 
tableName);
   }
diff --git 
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
 
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
index 0bc2bb9961..481e2f01f2 100644
--- 
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
+++ 
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
@@ -29,6 +29,7 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.data.TestHelpers;
 import org.apache.iceberg.spark.source.SimpleRecord;
 import org.apache.spark.sql.Encoders;
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
@@ -202,6 +203,26 @@ public class TestRewritePositionDeleteFilesProcedure 
extends SparkExtensionsTest
                 catalogName, tableIdent));
   }
 
+  @Test
+  public void testRewriteWithUntranslatedOrUnconvertedFilter() throws 
Exception {
+    createTable();
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.rewrite_position_delete_files(table => 
'%s', where => 'lower(data) = \"fo\"')",
+                    catalogName, tableIdent))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Cannot translate Spark expression");
+
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.rewrite_position_delete_files(table => 
'%s', where => 'data like \"%%fo\"')",
+                    catalogName, tableIdent))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Cannot convert Spark filter");
+  }
+
   private Map<String, String> snapshotSummary() {
     return validationCatalog.loadTable(tableIdent).currentSnapshot().summary();
   }
diff --git 
a/spark/v3.3/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
 
b/spark/v3.3/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
index 9f53eae60a..7f6641e1b2 100644
--- 
a/spark/v3.3/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
+++ 
b/spark/v3.3/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
@@ -35,7 +35,17 @@ object SparkExpressionConverter {
     // Currently, it is a double conversion as we are converting Spark 
expression to Spark filter
     // and then converting Spark filter to Iceberg expression.
     // But these two conversions already exist and well tested. So, we are 
going with this approach.
-    SparkFilters.convert(DataSourceStrategy.translateFilter(sparkExpression, 
supportNestedPredicatePushdown = true).get)
+    DataSourceStrategy.translateFilter(sparkExpression, 
supportNestedPredicatePushdown = true) match {
+      case Some(filter) =>
+        val converted = SparkFilters.convert(filter)
+        if (converted == null) {
+          throw new IllegalArgumentException(s"Cannot convert Spark filter: 
$filter to Iceberg expression")
+        }
+
+        converted
+      case _ =>
+        throw new IllegalArgumentException(s"Cannot translate Spark 
expression: $sparkExpression to data source filter")
+    }
   }
 
   @throws[AnalysisException]
diff --git 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index 2449c20ab9..25e506a85a 100644
--- 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -828,6 +828,26 @@ public class TestRewriteDataFilesProcedure extends 
SparkExtensionsTestBase {
     assertEquals("Data after compaction should not change", expectedRecords, 
actualRecords);
   }
 
+  @Test
+  public void testRewriteWithUntranslatedOrUnconvertedFilter() {
+    createTable();
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.rewrite_data_files(table => '%s', where => 
'substr(encode(c2, \"utf-8\"), 2) = \"fo\"')",
+                    catalogName, tableIdent))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Cannot translate Spark expression");
+
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.rewrite_data_files(table => '%s', where => 
'substr(c2, 2) = \"fo\"')",
+                    catalogName, tableIdent))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Cannot convert Spark filter");
+  }
+
   private void createTable() {
     sql("CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg", 
tableName);
   }
diff --git 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
index 0bc2bb9961..5dde5d698e 100644
--- 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
+++ 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
@@ -29,6 +29,7 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.data.TestHelpers;
 import org.apache.iceberg.spark.source.SimpleRecord;
 import org.apache.spark.sql.Encoders;
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
@@ -202,6 +203,26 @@ public class TestRewritePositionDeleteFilesProcedure 
extends SparkExtensionsTest
                 catalogName, tableIdent));
   }
 
+  @Test
+  public void testRewriteWithUntranslatedOrUnconvertedFilter() throws 
Exception {
+    createTable();
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.rewrite_position_delete_files(table => 
'%s', where => 'substr(encode(data, \"utf-8\"), 2) = \"fo\"')",
+                    catalogName, tableIdent))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Cannot translate Spark expression");
+
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.rewrite_position_delete_files(table => 
'%s', where => 'substr(data, 2) = \"fo\"')",
+                    catalogName, tableIdent))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Cannot convert Spark filter");
+  }
+
   private Map<String, String> snapshotSummary() {
     return validationCatalog.loadTable(tableIdent).currentSnapshot().summary();
   }
diff --git 
a/spark/v3.5/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
 
b/spark/v3.5/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
index 4903a100f9..d6f45657be 100644
--- 
a/spark/v3.5/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
+++ 
b/spark/v3.5/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
@@ -36,7 +36,17 @@ object SparkExpressionConverter {
     // Currently, it is a double conversion as we are converting Spark 
expression to Spark predicate
     // and then converting Spark predicate to Iceberg expression.
     // But these two conversions already exist and well tested. So, we are 
going with this approach.
-    
SparkV2Filters.convert(DataSourceV2Strategy.translateFilterV2(sparkExpression).get)
+    DataSourceV2Strategy.translateFilterV2(sparkExpression) match {
+      case Some(filter) =>
+        val converted = SparkV2Filters.convert(filter)
+        if (converted == null) {
+          throw new IllegalArgumentException(s"Cannot convert Spark filter: 
$filter to Iceberg expression")
+        }
+
+        converted
+      case _ =>
+        throw new IllegalArgumentException(s"Cannot translate Spark 
expression: $sparkExpression to data source filter")
+    }
   }
 
   @throws[AnalysisException]

Reply via email to