PHOENIX-2328 "Unsupported filter" errors when using Spark DataFrame API
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2a4c3c97 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2a4c3c97 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2a4c3c97 Branch: refs/heads/txn Commit: 2a4c3c974f9bcd70dadfd2a3913539d1417eb2e2 Parents: b68521a Author: Josh Mahonin <[email protected]> Authored: Sun Oct 18 11:43:38 2015 -0400 Committer: Josh Mahonin <[email protected]> Committed: Sun Oct 18 11:44:43 2015 -0400 ---------------------------------------------------------------------- .../apache/phoenix/spark/PhoenixSparkIT.scala | 45 ++++++++++++++++++++ .../apache/phoenix/spark/PhoenixRelation.scala | 10 ++++- 2 files changed, 54 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a4c3c97/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala ---------------------------------------------------------------------- diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala index f610d44..7f97cc7 100644 --- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala +++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala @@ -466,6 +466,51 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll { df.saveToPhoenix("TABLE2", zkUrl = Some(quorumAddress)) } + + test("Ensure Dataframe supports LIKE and IN filters (PHOENIX-2328)") { + val sqlContext = new SQLContext(sc) + val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TABLE1", + "zkUrl" -> quorumAddress)) + + // Prefix match + val res1 = df.filter("COL1 like 'test_row_%'") + res1.count() shouldEqual 2 + + // Suffix match + val res2 = df.filter("COL1 like '%_1'") + res2.count() shouldEqual 1 + res2.first.getString(1) shouldEqual "test_row_1" + + // Infix match + val res3 = df.filter("COL1 like '%_row_%'") + res3.count() shouldEqual 2 + + // Not like, match none + val res4 = df.filter("COL1 not like '%_row_%'") + res4.count() shouldEqual 0 + + // Not like, match all + val res5 = df.filter("COL1 not like '%_wor_%'") + res5.count() shouldEqual 2 + + // "IN", match all + val res6 = df.filter("COL1 in ('test_row_1', 'test_row_2')") + res6.count() shouldEqual 2 + + // "IN", match none + val res7 = df.filter("COL1 in ('foo', 'bar')") + res7.count() shouldEqual 0 + + // AND (and not again) + val res8 = df.filter("COL1 like '%_row_%' AND COL1 not like '%_1'") + res8.count() shouldEqual 1 + res8.first.getString(1) shouldEqual "test_row_2" + + // OR + val res9 = df.filter("COL1 like '%_1' OR COL1 like '%_2'") + res9.count() shouldEqual 2 + } + // We can load the type, but it defaults to Spark's default (precision 38, scale 10) ignore("Can load decimal types with accurate precision and scale (PHOENIX-2288)") { http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a4c3c97/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala ---------------------------------------------------------------------- diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala index eb347f6..3b660f9 100644 --- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala +++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala @@ -68,11 +68,16 @@ case class PhoenixRelation(tableName: String, zkUrl: String)(@transient val sqlC var i = 0 filters.foreach(f => { + // Assume conjunction for multiple filters, unless otherwise specified if (i > 0) { filter.append(" AND") } f match { + // Spark 1.3.1+ supported filters + case And(leftFilter, rightFilter) => filter.append(buildFilter(Array(leftFilter, rightFilter))) + case Or(leftFilter, rightFilter) => filter.append(buildFilter(Array(leftFilter)) + " OR " + buildFilter(Array(rightFilter))) + case Not(aFilter) => filter.append(" NOT " + buildFilter(Array(aFilter))) case EqualTo(attr, value) => filter.append(s" $attr = ${compileValue(value)}") case GreaterThan(attr, value) => filter.append(s" $attr > ${compileValue(value)}") case GreaterThanOrEqual(attr, value) => filter.append(s" $attr >= ${compileValue(value)}") @@ -80,7 +85,10 @@ case class PhoenixRelation(tableName: String, zkUrl: String)(@transient val sqlC case LessThanOrEqual(attr, value) => filter.append(s" $attr <= ${compileValue(value)}") case IsNull(attr) => filter.append(s" $attr IS NULL") case IsNotNull(attr) => filter.append(s" $attr IS NOT NULL") - case _ => throw new Exception("Unsupported filter") + case In(attr, values) => filter.append(s" $attr IN ${values.map(compileValue).mkString("(", ",", ")")}") + case StringStartsWith(attr, value) => filter.append(s" $attr LIKE ${compileValue(value + "%")}") + case StringEndsWith(attr, value) => filter.append(s" $attr LIKE ${compileValue("%" + value)}") + case StringContains(attr, value) => filter.append(s" $attr LIKE ${compileValue("%" + value + "%")}") } i = i + 1
