This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 3f2884c6d2 Spark: Fix Decimal value conversion in V2 filters (#8682)
3f2884c6d2 is described below
commit 3f2884c6d2e514f1112bd440fedfbb7d30227a96
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Fri Sep 29 12:38:36 2023 -0700
Spark: Fix Decimal value conversion in V2 filters (#8682)
---
.../org/apache/iceberg/spark/SparkV2Filters.java | 3 +++
.../apache/iceberg/spark/sql/TestFilterPushDown.java | 19 +++++++++++++++++++
.../org/apache/iceberg/spark/SparkV2Filters.java | 3 +++
.../apache/iceberg/spark/sql/TestFilterPushDown.java | 20 ++++++++++++++++++++
.../org/apache/iceberg/spark/SparkV2Filters.java | 3 +++
.../apache/iceberg/spark/sql/TestFilterPushDown.java | 20 ++++++++++++++++++++
6 files changed, 68 insertions(+)
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java
index 6d564bbd62..43c7cecfa1 100644
---
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java
@@ -49,6 +49,7 @@ import org.apache.spark.sql.connector.expressions.filter.And;
import org.apache.spark.sql.connector.expressions.filter.Not;
import org.apache.spark.sql.connector.expressions.filter.Or;
import org.apache.spark.sql.connector.expressions.filter.Predicate;
+import org.apache.spark.sql.types.Decimal;
import org.apache.spark.unsafe.types.UTF8String;
public class SparkV2Filters {
@@ -285,6 +286,8 @@ public class SparkV2Filters {
private static Object convertLiteral(Literal<?> literal) {
if (literal.value() instanceof UTF8String) {
return ((UTF8String) literal.value()).toString();
+ } else if (literal.value() instanceof Decimal) {
+ return ((Decimal) literal.value()).toJavaBigDecimal();
}
return literal.value();
}
diff --git
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
index 0ea34e187f..f1e2169af4 100644
---
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
+++
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.spark.sql;
+import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
@@ -38,6 +39,24 @@ public class TestFilterPushDown extends SparkTestBaseWithCatalog {
sql("DROP TABLE IF EXISTS tmp_view");
}
+ @Test
+ public void testFilterPushdownWithDecimalValues() {
+ sql(
+ "CREATE TABLE %s (id INT, salary DECIMAL(10, 2), dep STRING)"
+ + "USING iceberg "
+ + "PARTITIONED BY (dep)",
+ tableName);
+
+ sql("INSERT INTO %s VALUES (1, 100.01, 'd1')", tableName);
+ sql("INSERT INTO %s VALUES (2, 100.05, 'd1')", tableName);
+
+ checkFilters(
+ "dep = 'd1' AND salary > 100.03" /* query predicate */,
+ "isnotnull(salary) AND (salary > 100.03)" /* Spark post scan filter */,
+        "dep IS NOT NULL, salary IS NOT NULL, dep = 'd1', salary > 100.03" /* Iceberg scan filters */,
+ ImmutableList.of(row(2, new BigDecimal("100.05"), "d1")));
+ }
+
@Test
public void testFilterPushdownWithIdentityTransform() {
sql(
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java
index 06cb1335aa..57b9d61e38 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java
@@ -64,6 +64,7 @@ import org.apache.spark.sql.connector.expressions.filter.And;
import org.apache.spark.sql.connector.expressions.filter.Not;
import org.apache.spark.sql.connector.expressions.filter.Or;
import org.apache.spark.sql.connector.expressions.filter.Predicate;
+import org.apache.spark.sql.types.Decimal;
import org.apache.spark.unsafe.types.UTF8String;
public class SparkV2Filters {
@@ -378,6 +379,8 @@ public class SparkV2Filters {
private static Object convertLiteral(Literal<?> literal) {
if (literal.value() instanceof UTF8String) {
return ((UTF8String) literal.value()).toString();
+ } else if (literal.value() instanceof Decimal) {
+ return ((Decimal) literal.value()).toJavaBigDecimal();
}
return literal.value();
}
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
index e7401a00e8..f2ef2d4705 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.sql;
import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
import static org.apache.iceberg.PlanningMode.LOCAL;
+import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
@@ -56,6 +57,25 @@ public class TestFilterPushDown extends SparkTestBaseWithCatalog {
sql("DROP TABLE IF EXISTS tmp_view");
}
+ @Test
+ public void testFilterPushdownWithDecimalValues() {
+ sql(
+ "CREATE TABLE %s (id INT, salary DECIMAL(10, 2), dep STRING)"
+ + "USING iceberg "
+ + "PARTITIONED BY (dep)",
+ tableName);
+ configurePlanningMode(planningMode);
+
+ sql("INSERT INTO %s VALUES (1, 100.01, 'd1')", tableName);
+ sql("INSERT INTO %s VALUES (2, 100.05, 'd1')", tableName);
+
+ checkFilters(
+ "dep = 'd1' AND salary > 100.03" /* query predicate */,
+ "isnotnull(salary) AND (salary > 100.03)" /* Spark post scan filter */,
+        "dep IS NOT NULL, salary IS NOT NULL, dep = 'd1', salary > 100.03" /* Iceberg scan filters */,
+ ImmutableList.of(row(2, new BigDecimal("100.05"), "d1")));
+ }
+
@Test
public void testFilterPushdownWithIdentityTransform() {
sql(
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java
index 06cb1335aa..57b9d61e38 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java
@@ -64,6 +64,7 @@ import org.apache.spark.sql.connector.expressions.filter.And;
import org.apache.spark.sql.connector.expressions.filter.Not;
import org.apache.spark.sql.connector.expressions.filter.Or;
import org.apache.spark.sql.connector.expressions.filter.Predicate;
+import org.apache.spark.sql.types.Decimal;
import org.apache.spark.unsafe.types.UTF8String;
public class SparkV2Filters {
@@ -378,6 +379,8 @@ public class SparkV2Filters {
private static Object convertLiteral(Literal<?> literal) {
if (literal.value() instanceof UTF8String) {
return ((UTF8String) literal.value()).toString();
+ } else if (literal.value() instanceof Decimal) {
+ return ((Decimal) literal.value()).toJavaBigDecimal();
}
return literal.value();
}
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
index e7401a00e8..f2ef2d4705 100644
---
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.sql;
import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
import static org.apache.iceberg.PlanningMode.LOCAL;
+import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
@@ -56,6 +57,25 @@ public class TestFilterPushDown extends SparkTestBaseWithCatalog {
sql("DROP TABLE IF EXISTS tmp_view");
}
+ @Test
+ public void testFilterPushdownWithDecimalValues() {
+ sql(
+ "CREATE TABLE %s (id INT, salary DECIMAL(10, 2), dep STRING)"
+ + "USING iceberg "
+ + "PARTITIONED BY (dep)",
+ tableName);
+ configurePlanningMode(planningMode);
+
+ sql("INSERT INTO %s VALUES (1, 100.01, 'd1')", tableName);
+ sql("INSERT INTO %s VALUES (2, 100.05, 'd1')", tableName);
+
+ checkFilters(
+ "dep = 'd1' AND salary > 100.03" /* query predicate */,
+ "isnotnull(salary) AND (salary > 100.03)" /* Spark post scan filter */,
+        "dep IS NOT NULL, salary IS NOT NULL, dep = 'd1', salary > 100.03" /* Iceberg scan filters */,
+ ImmutableList.of(row(2, new BigDecimal("100.05"), "d1")));
+ }
+
@Test
public void testFilterPushdownWithIdentityTransform() {
sql(