This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch 1.4.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
commit 10367c380098c2e06a49521a33681ac7f6c64b2c Author: Anton Okolnychyi <[email protected]> AuthorDate: Fri Sep 29 12:38:36 2023 -0700 Spark: Fix Decimal value conversion in V2 filters (#8682) --- .../org/apache/iceberg/spark/SparkV2Filters.java | 3 +++ .../apache/iceberg/spark/sql/TestFilterPushDown.java | 19 +++++++++++++++++++ .../org/apache/iceberg/spark/SparkV2Filters.java | 3 +++ .../apache/iceberg/spark/sql/TestFilterPushDown.java | 20 ++++++++++++++++++++ .../org/apache/iceberg/spark/SparkV2Filters.java | 3 +++ .../apache/iceberg/spark/sql/TestFilterPushDown.java | 20 ++++++++++++++++++++ 6 files changed, 68 insertions(+) diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java index 6d564bbd62..43c7cecfa1 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java @@ -49,6 +49,7 @@ import org.apache.spark.sql.connector.expressions.filter.And; import org.apache.spark.sql.connector.expressions.filter.Not; import org.apache.spark.sql.connector.expressions.filter.Or; import org.apache.spark.sql.connector.expressions.filter.Predicate; +import org.apache.spark.sql.types.Decimal; import org.apache.spark.unsafe.types.UTF8String; public class SparkV2Filters { @@ -285,6 +286,8 @@ public class SparkV2Filters { private static Object convertLiteral(Literal<?> literal) { if (literal.value() instanceof UTF8String) { return ((UTF8String) literal.value()).toString(); + } else if (literal.value() instanceof Decimal) { + return ((Decimal) literal.value()).toJavaBigDecimal(); } return literal.value(); } diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java index 0ea34e187f..f1e2169af4 100644 --- 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.spark.sql; +import java.math.BigDecimal; import java.sql.Timestamp; import java.time.Instant; import java.util.List; @@ -38,6 +39,24 @@ public class TestFilterPushDown extends SparkTestBaseWithCatalog { sql("DROP TABLE IF EXISTS tmp_view"); } + @Test + public void testFilterPushdownWithDecimalValues() { + sql( + "CREATE TABLE %s (id INT, salary DECIMAL(10, 2), dep STRING)" + + "USING iceberg " + + "PARTITIONED BY (dep)", + tableName); + + sql("INSERT INTO %s VALUES (1, 100.01, 'd1')", tableName); + sql("INSERT INTO %s VALUES (2, 100.05, 'd1')", tableName); + + checkFilters( + "dep = 'd1' AND salary > 100.03" /* query predicate */, + "isnotnull(salary) AND (salary > 100.03)" /* Spark post scan filter */, + "dep IS NOT NULL, salary IS NOT NULL, dep = 'd1', salary > 100.03" /* Iceberg scan filters */, + ImmutableList.of(row(2, new BigDecimal("100.05"), "d1"))); + } + @Test public void testFilterPushdownWithIdentityTransform() { sql( diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java index 06cb1335aa..57b9d61e38 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java @@ -64,6 +64,7 @@ import org.apache.spark.sql.connector.expressions.filter.And; import org.apache.spark.sql.connector.expressions.filter.Not; import org.apache.spark.sql.connector.expressions.filter.Or; import org.apache.spark.sql.connector.expressions.filter.Predicate; +import org.apache.spark.sql.types.Decimal; import org.apache.spark.unsafe.types.UTF8String; public class SparkV2Filters { @@ -378,6 +379,8 @@ public class SparkV2Filters { private static 
Object convertLiteral(Literal<?> literal) { if (literal.value() instanceof UTF8String) { return ((UTF8String) literal.value()).toString(); + } else if (literal.value() instanceof Decimal) { + return ((Decimal) literal.value()).toJavaBigDecimal(); } return literal.value(); } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java index e7401a00e8..f2ef2d4705 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java @@ -21,6 +21,7 @@ package org.apache.iceberg.spark.sql; import static org.apache.iceberg.PlanningMode.DISTRIBUTED; import static org.apache.iceberg.PlanningMode.LOCAL; +import java.math.BigDecimal; import java.sql.Timestamp; import java.time.Instant; import java.util.List; @@ -56,6 +57,25 @@ public class TestFilterPushDown extends SparkTestBaseWithCatalog { sql("DROP TABLE IF EXISTS tmp_view"); } + @Test + public void testFilterPushdownWithDecimalValues() { + sql( + "CREATE TABLE %s (id INT, salary DECIMAL(10, 2), dep STRING)" + + "USING iceberg " + + "PARTITIONED BY (dep)", + tableName); + configurePlanningMode(planningMode); + + sql("INSERT INTO %s VALUES (1, 100.01, 'd1')", tableName); + sql("INSERT INTO %s VALUES (2, 100.05, 'd1')", tableName); + + checkFilters( + "dep = 'd1' AND salary > 100.03" /* query predicate */, + "isnotnull(salary) AND (salary > 100.03)" /* Spark post scan filter */, + "dep IS NOT NULL, salary IS NOT NULL, dep = 'd1', salary > 100.03" /* Iceberg scan filters */, + ImmutableList.of(row(2, new BigDecimal("100.05"), "d1"))); + } + @Test public void testFilterPushdownWithIdentityTransform() { sql( diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java index 
06cb1335aa..57b9d61e38 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkV2Filters.java @@ -64,6 +64,7 @@ import org.apache.spark.sql.connector.expressions.filter.And; import org.apache.spark.sql.connector.expressions.filter.Not; import org.apache.spark.sql.connector.expressions.filter.Or; import org.apache.spark.sql.connector.expressions.filter.Predicate; +import org.apache.spark.sql.types.Decimal; import org.apache.spark.unsafe.types.UTF8String; public class SparkV2Filters { @@ -378,6 +379,8 @@ public class SparkV2Filters { private static Object convertLiteral(Literal<?> literal) { if (literal.value() instanceof UTF8String) { return ((UTF8String) literal.value()).toString(); + } else if (literal.value() instanceof Decimal) { + return ((Decimal) literal.value()).toJavaBigDecimal(); } return literal.value(); } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java index e7401a00e8..f2ef2d4705 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java @@ -21,6 +21,7 @@ package org.apache.iceberg.spark.sql; import static org.apache.iceberg.PlanningMode.DISTRIBUTED; import static org.apache.iceberg.PlanningMode.LOCAL; +import java.math.BigDecimal; import java.sql.Timestamp; import java.time.Instant; import java.util.List; @@ -56,6 +57,25 @@ public class TestFilterPushDown extends SparkTestBaseWithCatalog { sql("DROP TABLE IF EXISTS tmp_view"); } + @Test + public void testFilterPushdownWithDecimalValues() { + sql( + "CREATE TABLE %s (id INT, salary DECIMAL(10, 2), dep STRING)" + + "USING iceberg " + + "PARTITIONED BY (dep)", + tableName); + configurePlanningMode(planningMode); + + sql("INSERT INTO %s VALUES (1, 
100.01, 'd1')", tableName); + sql("INSERT INTO %s VALUES (2, 100.05, 'd1')", tableName); + + checkFilters( + "dep = 'd1' AND salary > 100.03" /* query predicate */, + "isnotnull(salary) AND (salary > 100.03)" /* Spark post scan filter */, + "dep IS NOT NULL, salary IS NOT NULL, dep = 'd1', salary > 100.03" /* Iceberg scan filters */, + ImmutableList.of(row(2, new BigDecimal("100.05"), "d1"))); + } + @Test public void testFilterPushdownWithIdentityTransform() { sql(
