This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new ca1e0ffeb3 Spark 3.1, 3.2, 3.3, 3.5: Throw better exception when
filter expression cannot be translated in Rewrite procedure (#8605)
ca1e0ffeb3 is described below
commit ca1e0ffeb3b6399cc4c2f890bfad032b0aecef4e
Author: Xianyang Liu <[email protected]>
AuthorDate: Thu Sep 21 13:29:44 2023 +0800
Spark 3.1, 3.2, 3.3, 3.5: Throw better exception when filter expression
cannot be translated in Rewrite procedure (#8605)
---
.../extensions/TestRewriteDataFilesProcedure.java | 21 +++++++++++++++++++++
.../datasources/SparkExpressionConverter.scala | 12 +++++++++++-
.../extensions/TestRewriteDataFilesProcedure.java | 21 +++++++++++++++++++++
.../datasources/SparkExpressionConverter.scala | 12 +++++++++++-
.../extensions/TestRewriteDataFilesProcedure.java | 21 +++++++++++++++++++++
.../TestRewritePositionDeleteFilesProcedure.java | 21 +++++++++++++++++++++
.../datasources/SparkExpressionConverter.scala | 12 +++++++++++-
.../extensions/TestRewriteDataFilesProcedure.java | 20 ++++++++++++++++++++
.../TestRewritePositionDeleteFilesProcedure.java | 21 +++++++++++++++++++++
.../datasources/SparkExpressionConverter.scala | 12 +++++++++++-
10 files changed, 169 insertions(+), 4 deletions(-)
diff --git
a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index d8c415b281..f443b6f8f1 100644
---
a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++
b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -34,6 +34,7 @@ import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Test;
@@ -412,6 +413,26 @@ public class TestRewriteDataFilesProcedure extends
SparkExtensionsTestBase {
() -> sql("CALL %s.system.rewrite_data_files('')", catalogName));
}
+ @Test
+ public void testRewriteWithUntranslatedOrUnconvertedFilter() {
+ createTable();
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', where =>
'lower(c2) = \"fo\"')",
+ catalogName, tableIdent))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Cannot translate Spark expression");
+
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', where =>
'c2 like \"%%fo\"')",
+ catalogName, tableIdent))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Cannot convert Spark filter");
+ }
+
private void createTable() {
sql("CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg",
tableName);
}
diff --git
a/spark/v3.1/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
b/spark/v3.1/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
index 60de1cb82a..077884b1a6 100644
---
a/spark/v3.1/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
+++
b/spark/v3.1/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
@@ -31,7 +31,17 @@ object SparkExpressionConverter {
// Currently, it is a double conversion as we are converting Spark
expression to Spark filter
// and then converting Spark filter to Iceberg expression.
// But these two conversions already exist and well tested. So, we are
going with this approach.
- SparkFilters.convert(DataSourceStrategy.translateFilter(sparkExpression,
supportNestedPredicatePushdown = true).get)
+ DataSourceStrategy.translateFilter(sparkExpression,
supportNestedPredicatePushdown = true) match {
+ case Some(filter) =>
+ val converted = SparkFilters.convert(filter)
+ if (converted == null) {
+ throw new IllegalArgumentException(s"Cannot convert Spark filter:
$filter to Iceberg expression")
+ }
+
+ converted
+ case _ =>
+ throw new IllegalArgumentException(s"Cannot translate Spark
expression: $sparkExpression to data source filter")
+ }
}
@throws[AnalysisException]
diff --git
a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index d13e0967b6..ba838a4a98 100644
---
a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++
b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -41,6 +41,7 @@ import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
@@ -656,6 +657,26 @@ public class TestRewriteDataFilesProcedure extends
SparkExtensionsTestBase {
assertEquals("Data after compaction should not change", expectedRecords,
actualRecords);
}
+ @Test
+ public void testRewriteWithUntranslatedOrUnconvertedFilter() {
+ createTable();
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', where =>
'lower(c2) = \"fo\"')",
+ catalogName, tableIdent))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Cannot translate Spark expression");
+
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', where =>
'c2 like \"%%fo\"')",
+ catalogName, tableIdent))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Cannot convert Spark filter");
+ }
+
private void createTable() {
sql("CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg",
tableName);
}
diff --git
a/spark/v3.2/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
b/spark/v3.2/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
index c4b5a7c0ce..4ec46ee140 100644
---
a/spark/v3.2/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
+++
b/spark/v3.2/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
@@ -32,7 +32,17 @@ object SparkExpressionConverter {
// Currently, it is a double conversion as we are converting Spark
expression to Spark filter
// and then converting Spark filter to Iceberg expression.
// But these two conversions already exist and well tested. So, we are
going with this approach.
- SparkFilters.convert(DataSourceStrategy.translateFilter(sparkExpression,
supportNestedPredicatePushdown = true).get)
+ DataSourceStrategy.translateFilter(sparkExpression,
supportNestedPredicatePushdown = true) match {
+ case Some(filter) =>
+ val converted = SparkFilters.convert(filter)
+ if (converted == null) {
+ throw new IllegalArgumentException(s"Cannot convert Spark filter:
$filter to Iceberg expression")
+ }
+
+ converted
+ case _ =>
+ throw new IllegalArgumentException(s"Cannot translate Spark
expression: $sparkExpression to data source filter")
+ }
}
@throws[AnalysisException]
diff --git
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index 78fe78742b..0cdde158bd 100644
---
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -41,6 +41,7 @@ import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
@@ -699,6 +700,26 @@ public class TestRewriteDataFilesProcedure extends
SparkExtensionsTestBase {
assertEquals("Data after compaction should not change", expectedRecords,
actualRecords);
}
+ @Test
+ public void testRewriteWithUntranslatedOrUnconvertedFilter() {
+ createTable();
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', where =>
'lower(c2) = \"fo\"')",
+ catalogName, tableIdent))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Cannot translate Spark expression");
+
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', where =>
'c2 like \"%%fo\"')",
+ catalogName, tableIdent))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Cannot convert Spark filter");
+ }
+
private void createTable() {
sql("CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg",
tableName);
}
diff --git
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
index 0bc2bb9961..481e2f01f2 100644
---
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
+++
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
@@ -29,6 +29,7 @@ import
org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.spark.sql.Encoders;
+import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@@ -202,6 +203,26 @@ public class TestRewritePositionDeleteFilesProcedure
extends SparkExtensionsTest
catalogName, tableIdent));
}
+ @Test
+ public void testRewriteWithUntranslatedOrUnconvertedFilter() throws
Exception {
+ createTable();
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "CALL %s.system.rewrite_position_delete_files(table =>
'%s', where => 'lower(data) = \"fo\"')",
+ catalogName, tableIdent))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Cannot translate Spark expression");
+
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "CALL %s.system.rewrite_position_delete_files(table =>
'%s', where => 'data like \"%%fo\"')",
+ catalogName, tableIdent))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Cannot convert Spark filter");
+ }
+
private Map<String, String> snapshotSummary() {
return validationCatalog.loadTable(tableIdent).currentSnapshot().summary();
}
diff --git
a/spark/v3.3/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
b/spark/v3.3/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
index 9f53eae60a..7f6641e1b2 100644
---
a/spark/v3.3/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
+++
b/spark/v3.3/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
@@ -35,7 +35,17 @@ object SparkExpressionConverter {
// Currently, it is a double conversion as we are converting Spark
expression to Spark filter
// and then converting Spark filter to Iceberg expression.
// But these two conversions already exist and well tested. So, we are
going with this approach.
- SparkFilters.convert(DataSourceStrategy.translateFilter(sparkExpression,
supportNestedPredicatePushdown = true).get)
+ DataSourceStrategy.translateFilter(sparkExpression,
supportNestedPredicatePushdown = true) match {
+ case Some(filter) =>
+ val converted = SparkFilters.convert(filter)
+ if (converted == null) {
+ throw new IllegalArgumentException(s"Cannot convert Spark filter:
$filter to Iceberg expression")
+ }
+
+ converted
+ case _ =>
+ throw new IllegalArgumentException(s"Cannot translate Spark
expression: $sparkExpression to data source filter")
+ }
}
@throws[AnalysisException]
diff --git
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index 2449c20ab9..25e506a85a 100644
---
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -828,6 +828,26 @@ public class TestRewriteDataFilesProcedure extends
SparkExtensionsTestBase {
assertEquals("Data after compaction should not change", expectedRecords,
actualRecords);
}
+ @Test
+ public void testRewriteWithUntranslatedOrUnconvertedFilter() {
+ createTable();
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', where =>
'substr(encode(c2, \"utf-8\"), 2) = \"fo\"')",
+ catalogName, tableIdent))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Cannot translate Spark expression");
+
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', where =>
'substr(c2, 2) = \"fo\"')",
+ catalogName, tableIdent))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Cannot convert Spark filter");
+ }
+
private void createTable() {
sql("CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg",
tableName);
}
diff --git
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
index 0bc2bb9961..5dde5d698e 100644
---
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
+++
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
@@ -29,6 +29,7 @@ import
org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.spark.sql.Encoders;
+import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@@ -202,6 +203,26 @@ public class TestRewritePositionDeleteFilesProcedure
extends SparkExtensionsTest
catalogName, tableIdent));
}
+ @Test
+ public void testRewriteWithUntranslatedOrUnconvertedFilter() throws
Exception {
+ createTable();
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "CALL %s.system.rewrite_position_delete_files(table =>
'%s', where => 'substr(encode(data, \"utf-8\"), 2) = \"fo\"')",
+ catalogName, tableIdent))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Cannot translate Spark expression");
+
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "CALL %s.system.rewrite_position_delete_files(table =>
'%s', where => 'substr(data, 2) = \"fo\"')",
+ catalogName, tableIdent))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Cannot convert Spark filter");
+ }
+
private Map<String, String> snapshotSummary() {
return validationCatalog.loadTable(tableIdent).currentSnapshot().summary();
}
diff --git
a/spark/v3.5/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
b/spark/v3.5/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
index 4903a100f9..d6f45657be 100644
---
a/spark/v3.5/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
+++
b/spark/v3.5/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
@@ -36,7 +36,17 @@ object SparkExpressionConverter {
// Currently, it is a double conversion as we are converting Spark
expression to Spark predicate
// and then converting Spark predicate to Iceberg expression.
// But these two conversions already exist and well tested. So, we are
going with this approach.
-
SparkV2Filters.convert(DataSourceV2Strategy.translateFilterV2(sparkExpression).get)
+ DataSourceV2Strategy.translateFilterV2(sparkExpression) match {
+ case Some(filter) =>
+ val converted = SparkV2Filters.convert(filter)
+ if (converted == null) {
+ throw new IllegalArgumentException(s"Cannot convert Spark filter:
$filter to Iceberg expression")
+ }
+
+ converted
+ case _ =>
+ throw new IllegalArgumentException(s"Cannot translate Spark
expression: $sparkExpression to data source filter")
+ }
}
@throws[AnalysisException]