This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 09a5dbc816 Spark 3.4: Throw better exception when filter expression
cannot be translated in Rewrite procedure (#8394)
09a5dbc816 is described below
commit 09a5dbc8166489d5e58eeb17cc96217bc75c55b9
Author: Xianyang Liu <[email protected]>
AuthorDate: Wed Sep 20 14:35:03 2023 +0800
Spark 3.4: Throw better exception when filter expression cannot be
translated in Rewrite procedure (#8394)
---
.../extensions/TestRewriteDataFilesProcedure.java | 20 ++++++++++++++++++++
.../TestRewritePositionDeleteFilesProcedure.java | 21 +++++++++++++++++++++
.../datasources/SparkExpressionConverter.scala | 12 +++++++++++-
3 files changed, 52 insertions(+), 1 deletion(-)
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index 2449c20ab9..25e506a85a 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -828,6 +828,26 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
assertEquals("Data after compaction should not change", expectedRecords,
actualRecords);
}
+ /**
+ * Verifies that rewrite_data_files rejects a `where` filter that cannot be turned into an Iceberg
+ * expression, raising IllegalArgumentException with a message that identifies the failing stage:
+ * either the Spark expression could not be translated to a data source filter, or the translated
+ * filter could not be converted to an Iceberg expression.
+ */
+ @Test
+ public void testRewriteWithUntranslatedOrUnconvertedFilter() {
+ createTable();
+ // First stage: expected to fail with "Cannot translate Spark expression". Presumably Spark
+ // cannot translate substr over encode(...) into a data source filter; confirm against Spark.
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', where =>
'substr(encode(c2, \"utf-8\"), 2) = \"fo\"')",
+ catalogName, tableIdent))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Cannot translate Spark expression");
+
+ // Second stage: expected to translate to a Spark filter but fail the Iceberg conversion
+ // ("Cannot convert Spark filter"), i.e. the Spark-to-Iceberg filter conversion yields no result.
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', where =>
'substr(c2, 2) = \"fo\"')",
+ catalogName, tableIdent))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Cannot convert Spark filter");
+ }
+
/** Creates the Iceberg table {@code tableName} with columns c1 int, c2 string and c3 string. */
private void createTable() {
sql("CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg",
tableName);
}
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
index 0bc2bb9961..5dde5d698e 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.spark.sql.Encoders;
+import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@@ -202,6 +203,26 @@ public class TestRewritePositionDeleteFilesProcedure extends SparkExtensionsTest
catalogName, tableIdent));
}
+ /**
+ * Verifies that rewrite_position_delete_files rejects a `where` filter that cannot be turned into
+ * an Iceberg expression, raising IllegalArgumentException with a message that identifies the
+ * failing stage: Spark-expression translation or Spark-filter-to-Iceberg conversion.
+ */
+ @Test
+ public void testRewriteWithUntranslatedOrUnconvertedFilter() throws
Exception {
+ // NOTE(review): assumes this class's createTable() defines a string column named `data`;
+ // its definition is not visible in this patch.
+ createTable();
+ // First stage: expected to fail with "Cannot translate Spark expression".
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "CALL %s.system.rewrite_position_delete_files(table =>
'%s', where => 'substr(encode(data, \"utf-8\"), 2) = \"fo\"')",
+ catalogName, tableIdent))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Cannot translate Spark expression");
+
+ // Second stage: expected to translate to a Spark filter but fail the Iceberg conversion
+ // ("Cannot convert Spark filter").
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "CALL %s.system.rewrite_position_delete_files(table =>
'%s', where => 'substr(data, 2) = \"fo\"')",
+ catalogName, tableIdent))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Cannot convert Spark filter");
+ }
+
/** Returns the summary map of the current snapshot of {@code tableIdent}, loaded via validationCatalog. */
private Map<String, String> snapshotSummary() {
return validationCatalog.loadTable(tableIdent).currentSnapshot().summary();
}
diff --git a/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala b/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
index 4903a100f9..d6f45657be 100644
--- a/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
+++ b/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
@@ -36,7 +36,17 @@ object SparkExpressionConverter {
// Currently, it is a double conversion as we are converting Spark
expression to Spark predicate
// and then converting Spark predicate to Iceberg expression.
// But these two conversions already exist and well tested. So, we are
going with this approach.
-
SparkV2Filters.convert(DataSourceV2Strategy.translateFilterV2(sparkExpression).get)
+ DataSourceV2Strategy.translateFilterV2(sparkExpression) match {
+ case Some(filter) =>
+ val converted = SparkV2Filters.convert(filter)
+ if (converted == null) {
+ throw new IllegalArgumentException(s"Cannot convert Spark filter:
$filter to Iceberg expression")
+ }
+
+ converted
+ case _ =>
+ throw new IllegalArgumentException(s"Cannot translate Spark
expression: $sparkExpression to data source filter")
+ }
}
@throws[AnalysisException]