[spark] Pushdown IS NULL and IS NOT NULL predicates Change-Id: Ibdd9ecb54c37c7b13ece71ca3ad763f4228344c3 Reviewed-on: http://gerrit.cloudera.org:8080/5912 Tested-by: Kudu Jenkins Reviewed-by: Will Berkeley <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/d1fb71cf Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/d1fb71cf Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/d1fb71cf Branch: refs/heads/master Commit: d1fb71cf06f52fc735c8f7fa19dfb2df7ecde529 Parents: a9d716a Author: Dan Burkert <[email protected]> Authored: Mon Feb 6 10:09:24 2017 -0800 Committer: Will Berkeley <[email protected]> Committed: Mon Feb 6 18:43:52 2017 +0000 ---------------------------------------------------------------------- .../apache/kudu/spark/kudu/DefaultSource.scala | 26 +++++++++++++++++++- .../kudu/spark/kudu/DefaultSourceTest.scala | 15 +++++++++++ 2 files changed, 40 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/d1fb71cf/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala ---------------------------------------------------------------------- diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala index 198d09f..af1d573 100644 --- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala +++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala @@ -176,6 +176,8 @@ with InsertableRelation { Array(comparisonPredicate(column, ComparisonOp.GREATER_EQUAL, prefix), comparisonPredicate(column, ComparisonOp.LESS, inf)) } + case IsNull(column) => Array(isNullPredicate(column)) + case IsNotNull(column) => Array(isNotNullPredicate(column)) case And(left, right) => filterToPredicate(left) ++ filterToPredicate(right) case _ => Array() } @@ -233,6 +235,26 @@ with InsertableRelation { } /** + * Creates a new `IS NULL` predicate for the column. 
+ * + * @param column the column name + * @return the `IS NULL` predicate + */ + private def isNullPredicate(column: String): KuduPredicate = { + KuduPredicate.newIsNullPredicate(table.getSchema.getColumn(column)) + } + + /** + * Creates a new `IS NOT NULL` predicate for the column. + * + * @param column the column name + * @return the `IS NOT NULL` predicate + */ + private def isNotNullPredicate(column: String): KuduPredicate = { + KuduPredicate.newIsNotNullPredicate(table.getSchema.getColumn(column)) + } + + /** * Writes data into an existing Kudu table. * * If the `kudu.operation` parameter is set, the data will use that operation @@ -281,7 +303,9 @@ private[spark] object KuduRelation { | LessThan(_, _) | LessThanOrEqual(_, _) | In(_, _) - | StringStartsWith(_, _) => true + | StringStartsWith(_, _) + | IsNull(_) + | IsNotNull(_) => true case And(left, right) => supportsFilter(left) && supportsFilter(right) case _ => false } http://git-wip-us.apache.org/repos/asf/kudu/blob/d1fb71cf/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala ---------------------------------------------------------------------- diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala index 5ba1138..3dbd088 100644 --- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala +++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala @@ -373,6 +373,21 @@ class DefaultSourceTest extends FunSuite with TestContext with BeforeAndAfter { } } + test("Test SparkSQL IS NULL predicate") { + var results = sqlContext.sql("SELECT key FROM " + tableName + " where c2_s IS NULL").collectAsList() + assert(results.size() == 5) + + results = sqlContext.sql("SELECT key FROM " + tableName + " where key IS NULL").collectAsList() + assert(results.isEmpty()) + } + + test("Test SparkSQL IS NOT NULL predicate") { + var
results = sqlContext.sql("SELECT key FROM " + tableName + " where c2_s IS NOT NULL").collectAsList() + assert(results.size() == 5) + + results = sqlContext.sql("SELECT key FROM " + tableName + " where key IS NOT NULL").collectAsList() + assert(results.size() == 10) + } test("Test SQL: insert into") { val insertTable = "insertintotest"
