Repository: ignite Updated Branches: refs/heads/ignite-2.5 c2a8bbc6b -> d7962f83f
http://git-wip-us.apache.org/repos/asf/ignite/blob/3c3a24e8/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationMathFuncSpec.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationMathFuncSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationMathFuncSpec.scala new file mode 100644 index 0000000..02793c9 --- /dev/null +++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationMathFuncSpec.scala @@ -0,0 +1,358 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spark + +import org.apache.ignite.Ignite +import org.apache.ignite.cache.query.SqlFieldsQuery +import org.apache.ignite.internal.IgnitionEx +import org.apache.ignite.spark.AbstractDataFrameSpec.{DEFAULT_CACHE, TEST_CONFIG_FILE, checkOptimizationResult, enclose} +import org.apache.spark.sql.ignite.IgniteSparkSession +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import java.lang.{Double â JDouble, Long â JLong} + +/** + */ +@RunWith(classOf[JUnitRunner]) +class IgniteOptimizationMathFuncSpec extends AbstractDataFrameSpec { + var igniteSession: IgniteSparkSession = _ + + describe("Supported optimized string functions") { + it("ABS") { + val df = igniteSession.sql("SELECT ABS(val) FROM numbers WHERE id = 6") + + checkOptimizationResult(df, "SELECT ABS(val) FROM numbers WHERE id is not null AND id = 6") + + val data = Tuple1(.5) + + checkQueryData(df, data) + } + + it("ACOS") { + val df = igniteSession.sql("SELECT ACOS(val) FROM numbers WHERE id = 7") + + checkOptimizationResult(df, "SELECT ACOS(val) FROM numbers WHERE id is not null AND id = 7") + + val data = Tuple1(Math.PI) + + checkQueryData(df, data) + } + + it("ASIN") { + val df = igniteSession.sql("SELECT ASIN(val) FROM numbers WHERE id = 7") + + checkOptimizationResult(df, "SELECT ASIN(val) FROM numbers WHERE id is not null AND id = 7") + + val data = Tuple1(-Math.PI/2) + + checkQueryData(df, data) + } + + it("ATAN") { + val df = igniteSession.sql("SELECT ATAN(val) FROM numbers WHERE id = 7") + + checkOptimizationResult(df, "SELECT ATAN(val) FROM numbers WHERE id is not null AND id = 7") + + val data = Tuple1(-Math.PI/4) + + checkQueryData(df, data) + } + + it("COS") { + val df = igniteSession.sql("SELECT COS(val) FROM numbers WHERE id = 1") + + checkOptimizationResult(df, "SELECT COS(val) FROM numbers WHERE id is not null AND id = 1") + + val data = Tuple1(1.0) + + checkQueryData(df, data) + } + + it("SIN") { + val df = igniteSession.sql("SELECT 
SIN(val) FROM numbers WHERE id = 1") + + checkOptimizationResult(df, "SELECT SIN(val) FROM numbers WHERE id is not null AND id = 1") + + val data = Tuple1(.0) + + checkQueryData(df, data) + } + + it("TAN") { + val df = igniteSession.sql("SELECT TAN(val) FROM numbers WHERE id = 1") + + checkOptimizationResult(df, "SELECT TAN(val) FROM numbers WHERE id is not null AND id = 1") + + val data = Tuple1(.0) + + checkQueryData(df, data) + } + + it("COSH") { + val df = igniteSession.sql("SELECT COSH(val) FROM numbers WHERE id = 1") + + checkOptimizationResult(df, "SELECT COSH(val) FROM numbers WHERE id is not null AND id = 1") + + val data = Tuple1(1.0) + + checkQueryData(df, data) + } + + it("SINH") { + val df = igniteSession.sql("SELECT SINH(val) FROM numbers WHERE id = 1") + + checkOptimizationResult(df, "SELECT SINH(val) FROM numbers WHERE id is not null AND id = 1") + + val data = Tuple1(.0) + + checkQueryData(df, data) + } + + it("TANH") { + val df = igniteSession.sql("SELECT TANH(val) FROM numbers WHERE id = 1") + + checkOptimizationResult(df, "SELECT TANH(val) FROM numbers WHERE id is not null AND id = 1") + + val data = Tuple1(.0) + + checkQueryData(df, data) + } + + it("ATAN2") { + val df = igniteSession.sql("SELECT ATAN2(val, 0.0) FROM numbers WHERE id = 1") + + checkOptimizationResult(df, "SELECT ATAN2(val, 0.0) AS \"ATAN2(val, CAST(0.0 AS DOUBLE))\" " + + "FROM numbers WHERE id is not null AND id = 1") + + val data = Tuple1(.0) + + checkQueryData(df, data) + } + + it("MOD") { + val df = igniteSession.sql("SELECT val % 9 FROM numbers WHERE id = 8") + + checkOptimizationResult(df, "SELECT val % 9.0 as \"(val % CAST(9 AS DOUBLE))\" " + + "FROM numbers WHERE id is not null AND id = 8") + + val data = Tuple1(6.0) + + checkQueryData(df, data) + } + + it("CEIL") { + val df = igniteSession.sql("SELECT CEIL(val) FROM numbers WHERE id = 2") + + checkOptimizationResult(df, "SELECT CAST(CEIL(val) AS LONG) as \"CEIL(val)\" " + + "FROM numbers WHERE id is not null AND id = 
2") + + val data = Tuple1(1) + + checkQueryData(df, data) + } + + it("ROUND") { + val df = igniteSession.sql("SELECT id, ROUND(val) FROM numbers WHERE id IN (2, 9, 10)") + + checkOptimizationResult(df, "SELECT id, ROUND(val, 0) FROM numbers WHERE id IN (2, 9, 10)") + + val data = ( + (2, 1.0), + (9, 1.0), + (10, 0.0)) + + checkQueryData(df, data) + } + + it("FLOOR") { + val df = igniteSession.sql("SELECT FLOOR(val) FROM numbers WHERE id = 2") + + checkOptimizationResult(df, "SELECT CAST(FLOOR(val) AS LONG) as \"FLOOR(val)\" FROM numbers " + + "WHERE id is not null AND id = 2") + + val data = Tuple1(0) + + checkQueryData(df, data) + } + + it("POWER") { + val df = igniteSession.sql("SELECT POWER(val, 3) FROM numbers WHERE id = 4") + + checkOptimizationResult(df, "SELECT POWER(val, 3.0) as \"POWER(val, CAST(3 AS DOUBLE))\" FROM numbers " + + "WHERE id is not null AND id = 4") + + val data = Tuple1(8.0) + + checkQueryData(df, data) + } + + it("EXP") { + val df = igniteSession.sql("SELECT id, EXP(val) FROM numbers WHERE id IN (1, 3)") + + checkOptimizationResult(df, "SELECT id, EXP(val) FROM numbers WHERE id IN (1, 3)") + + val data = ( + (1, 1), + (3, Math.E)) + + checkQueryData(df, data) + } + + it("LOG") { + val df = igniteSession.sql("SELECT LOG(val) FROM numbers WHERE id = 12") + + checkOptimizationResult(df, "SELECT LOG(val) as \"LOG(E(), val)\" FROM numbers " + + "WHERE id IS NOT NULL AND id = 12") + + val data = Tuple1(2.0) + + checkQueryData(df, data) + } + + it("LOG10") { + val df = igniteSession.sql("SELECT LOG10(val) FROM numbers WHERE id = 11") + + checkOptimizationResult(df, "SELECT LOG10(val) FROM numbers WHERE id IS NOT NULL AND id = 11") + + val data = Tuple1(2.0) + + checkQueryData(df, data) + } + + it("DEGREES") { + val df = igniteSession.sql("SELECT DEGREES(val) FROM numbers WHERE id = 13") + + checkOptimizationResult(df, "SELECT DEGREES(val) FROM numbers WHERE id IS NOT NULL AND id = 13") + + val data = Tuple1(180.0) + + checkQueryData(df, data) + } 
+ + it("RADIANS") { + val df = igniteSession.sql("SELECT RADIANS(val) FROM numbers WHERE id = 14") + + checkOptimizationResult(df, "SELECT RADIANS(val) FROM numbers WHERE id IS NOT NULL AND id = 14") + + val data = Tuple1(Math.PI) + + checkQueryData(df, data) + } + + it("BITAND") { + val df = igniteSession.sql("SELECT int_val&1 FROM numbers WHERE id = 15") + + checkOptimizationResult(df, "SELECT BITAND(int_val, 1) as \"(int_val & CAST(1 AS BIGINT))\" FROM numbers " + + "WHERE id IS NOT NULL AND id = 15") + + val data = Tuple1(1) + + checkQueryData(df, data) + } + + it("BITOR") { + val df = igniteSession.sql("SELECT int_val|1 FROM numbers WHERE id = 16") + + checkOptimizationResult(df, "SELECT BITOR(int_val, 1) as \"(int_val | CAST(1 AS BIGINt))\" FROM numbers " + + "WHERE id IS NOT NULL AND id = 16") + + val data = Tuple1(3) + + checkQueryData(df, data) + } + + it("BITXOR") { + val df = igniteSession.sql("SELECT int_val^1 FROM numbers WHERE id = 17") + + checkOptimizationResult(df, "SELECT BITXOR(int_val, 1) AS \"(int_val ^ CAST(1 AS BIGINT))\" FROM numbers " + + "WHERE id IS NOT NULL AND id = 17") + + val data = Tuple1(2) + + checkQueryData(df, data) + } + + it("RAND") { + val df = igniteSession.sql("SELECT id, RAND(1) FROM numbers WHERE id = 17") + + checkOptimizationResult(df, "SELECT id, RAND(1) FROM numbers WHERE id IS NOT NULL AND id = 17") + + val data = df.rdd.collect + + assert(data(0).getAs[JLong]("id") == 17L) + assert(data(0).getAs[JDouble]("rand(1)") != null) + } + } + + def createNumberTable(client: Ignite, cacheName: String): Unit = { + val cache = client.cache(cacheName) + + cache.query(new SqlFieldsQuery( + """ + | CREATE TABLE numbers ( + | id LONG, + | val DOUBLE, + | int_val LONG, + | PRIMARY KEY (id)) WITH "backups=1" + """.stripMargin)).getAll + + var qry = new SqlFieldsQuery("INSERT INTO numbers (id, val) values (?, ?)") + + cache.query(qry.setArgs(1L.asInstanceOf[JLong], .0.asInstanceOf[JDouble])).getAll + 
cache.query(qry.setArgs(2L.asInstanceOf[JLong], .5.asInstanceOf[JDouble])).getAll + cache.query(qry.setArgs(3L.asInstanceOf[JLong], 1.0.asInstanceOf[JDouble])).getAll + cache.query(qry.setArgs(4L.asInstanceOf[JLong], 2.0.asInstanceOf[JDouble])).getAll + cache.query(qry.setArgs(5L.asInstanceOf[JLong], 4.0.asInstanceOf[JDouble])).getAll + cache.query(qry.setArgs(6L.asInstanceOf[JLong], -0.5.asInstanceOf[JDouble])).getAll + cache.query(qry.setArgs(7L.asInstanceOf[JLong], -1.0.asInstanceOf[JDouble])).getAll + cache.query(qry.setArgs(8L.asInstanceOf[JLong], 42.0.asInstanceOf[JDouble])).getAll + cache.query(qry.setArgs(9L.asInstanceOf[JLong], .51.asInstanceOf[JDouble])).getAll + cache.query(qry.setArgs(10L.asInstanceOf[JLong], .49.asInstanceOf[JDouble])).getAll + cache.query(qry.setArgs(11L.asInstanceOf[JLong], 100.0.asInstanceOf[JDouble])).getAll + cache.query(qry.setArgs(12L.asInstanceOf[JLong], (Math.E*Math.E).asInstanceOf[JDouble])).getAll + cache.query(qry.setArgs(13L.asInstanceOf[JLong], Math.PI.asInstanceOf[JDouble])).getAll + cache.query(qry.setArgs(14L.asInstanceOf[JLong], 180.0.asInstanceOf[JDouble])).getAll + + qry = new SqlFieldsQuery("INSERT INTO numbers (id, int_val) values (?, ?)") + + cache.query(qry.setArgs(15L.asInstanceOf[JLong], 1L.asInstanceOf[JLong])).getAll + cache.query(qry.setArgs(16L.asInstanceOf[JLong], 2L.asInstanceOf[JLong])).getAll + cache.query(qry.setArgs(17L.asInstanceOf[JLong], 3L.asInstanceOf[JLong])).getAll + } + + override protected def beforeAll(): Unit = { + super.beforeAll() + + createNumberTable(client, DEFAULT_CACHE) + + val configProvider = enclose(null) (x â () â { + val cfg = IgnitionEx.loadConfiguration(TEST_CONFIG_FILE).get1() + + cfg.setClientMode(true) + + cfg.setIgniteInstanceName("client-2") + + cfg + }) + + igniteSession = IgniteSparkSession.builder() + .config(spark.sparkContext.getConf) + .igniteConfigProvider(configProvider) + .getOrCreate() + } +} 
http://git-wip-us.apache.org/repos/asf/ignite/blob/3c3a24e8/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationSpec.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationSpec.scala new file mode 100644 index 0000000..00075f4 --- /dev/null +++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationSpec.scala @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spark + +import org.apache.spark.sql.ignite.IgniteSparkSession +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import org.apache.ignite.internal.IgnitionEx +import org.apache.ignite.internal.util.IgniteUtils.resolveIgnitePath +import org.apache.ignite.spark.AbstractDataFrameSpec.{DEFAULT_CACHE, TEST_CONFIG_FILE, checkOptimizationResult, enclose} + +/** + */ +@RunWith(classOf[JUnitRunner]) +class IgniteOptimizationSpec extends AbstractDataFrameSpec { + var igniteSession: IgniteSparkSession = _ + + describe("Optimized queries") { + it("SELECT name as city_name FROM city") { + val df = igniteSession.sql("SELECT name as city_name FROM city") + + checkOptimizationResult(df, "SELECT name as city_name FROM city") + } + + it("SELECT count(*) as city_count FROM city") { + val df = igniteSession.sql("SELECT count(1) as city_count FROM city") + + checkOptimizationResult(df, "SELECT count(1) as city_count FROM city") + } + + it("SELECT count(*), city_id FROM person p GROUP BY city_id") { + val df = igniteSession.sql("SELECT city_id, count(*) FROM person GROUP BY city_id") + + checkOptimizationResult(df, "SELECT city_id, count(1) FROM person GROUP BY city_id") + + val data = ( + (1, 1), + (2, 3), + (3, 1) + ) + + checkQueryData(df, data) + } + + it("SELECT id, name FROM person WHERE id > 3 ORDER BY id") { + val df = igniteSession.sql("SELECT id, name FROM person WHERE id > 3 ORDER BY id") + + checkOptimizationResult(df, "SELECT id, name FROM person WHERE id IS NOT NULL AND id > 3 ORDER BY id") + + val data = ( + (4, "Richard Miles"), + (5, null)) + + checkQueryData(df, data) + } + + it("SELECT id, name FROM person WHERE id > 3 ORDER BY id DESC") { + val df = igniteSession.sql("SELECT id, name FROM person WHERE id > 3 ORDER BY id DESC") + + checkOptimizationResult(df, "SELECT id, name FROM person WHERE id IS NOT NULL AND id > 3 ORDER BY id DESC") + + val data = ( + (5, null), + (4, "Richard Miles")) + + 
checkQueryData(df, data, -_.getAs[Long]("id")) + } + + it("SELECT id, test_reverse(name) FROM city ORDER BY id") { + igniteSession.udf.register("test_reverse", (str: String) â str.reverse) + + val df = igniteSession.sql("SELECT id, test_reverse(name) FROM city ORDER BY id") + + checkOptimizationResult(df, "SELECT name, id FROM city") + + val data = ( + (1, "Forest Hill".reverse), + (2, "Denver".reverse), + (3, "St. Petersburg".reverse), + (4, "St. Petersburg".reverse)) + + checkQueryData(df, data) + } + + it("SELECT count(*), city_id FROM person p GROUP BY city_id HAVING count(*) > 1") { + val df = igniteSession.sql("SELECT city_id, count(*) FROM person p GROUP BY city_id HAVING count(*) > 1") + + checkOptimizationResult(df, "SELECT city_id, count(1) FROM person GROUP BY city_id HAVING count(1) > 1") + + val data = Tuple1( + (2, 3)) + + checkQueryData(df, data) + } + + it("SELECT id FROM city HAVING id > 1") { + val df = igniteSession.sql("SELECT id FROM city HAVING id > 1") + + checkOptimizationResult(df, "SELECT id FROM city WHERE id IS NOT NULL AND id > 1") + + val data = (2, 3, 4) + + checkQueryData(df, data) + } + + it("SELECT DISTINCT name FROM city ORDER BY name") { + val df = igniteSession.sql("SELECT DISTINCT name FROM city ORDER BY name") + + checkOptimizationResult(df, "SELECT name FROM city GROUP BY name ORDER BY name") + + val data = ("Denver", "Forest Hill", "St. Petersburg") + + checkQueryData(df, data) + } + + it("SELECT id, name FROM city ORDER BY id, name") { + val df = igniteSession.sql("SELECT id, name FROM city ORDER BY id, name") + + checkOptimizationResult(df, "SELECT id, name FROM city ORDER BY id, name") + + val data = ( + (1, "Forest Hill"), + (2, "Denver"), + (3, "St. Petersburg"), + (4, "St. 
Petersburg")) + + checkQueryData(df, data) + } + + it("SELECT id, name FROM city WHERE id > 1 ORDER BY id") { + val df = igniteSession.sql("SELECT id, name FROM city WHERE id > 1 ORDER BY id") + + checkOptimizationResult(df, "SELECT id, name FROM city WHERE id IS NOT NULL and id > 1 ORDER BY id") + + val data = ( + (2, "Denver"), + (3, "St. Petersburg"), + (4, "St. Petersburg")) + + checkQueryData(df, data) + } + + it("SELECT count(*) FROM city") { + val df = igniteSession.sql("SELECT count(*) FROM city") + + checkOptimizationResult(df, "SELECT count(1) FROM city") + + val data = Tuple1(4) + + checkQueryData(df, data) + } + + it("SELECT count(DISTINCT name) FROM city") { + val df = igniteSession.sql("SELECT count(DISTINCT name) FROM city") + + checkOptimizationResult(df, "SELECT count(DISTINCT name) FROM city") + + val data = Tuple1(3) + + checkQueryData(df, data) + } + + it("SELECT id FROM city LIMIT 2") { + val df = igniteSession.sql("SELECT id FROM city LIMIT 2") + + checkOptimizationResult(df, "SELECT id FROM city LIMIT 2") + + val data = (1, 2) + + checkQueryData(df, data) + } + + it("SELECT CAST(id AS STRING) FROM city") { + val df = igniteSession.sql("SELECT CAST(id AS STRING) FROM city") + + checkOptimizationResult(df, "SELECT CAST(id AS varchar) as id FROM city") + + val data = ("1", "2", "3", "4") + + checkQueryData(df, data) + } + + it("SELECT SQRT(id) FROM city WHERE id = 4 OR id = 1") { + val df = igniteSession.sql("SELECT SQRT(id) FROM city WHERE id = 4 OR id = 1") + + checkOptimizationResult(df, + "SELECT SQRT(cast(id as double)) FROM city WHERE id = 4 OR id = 1") + + val data = (1, 2) + + checkQueryData(df, data) + } + + it("SELECT CONCAT(id, \" - this is ID\") FROM city") { + val df = igniteSession.sql("SELECT CONCAT(id, \" - this is ID\") FROM city") + + checkOptimizationResult(df, + "SELECT CONCAT(cast(id AS VARCHAR), ' - this is ID') as \"CONCAT(cast(id AS STRING), - this is ID)\" " + + "FROM city") + + val data = ( + "1 - this is ID", + "2 - 
this is ID", + "3 - this is ID", + "4 - this is ID") + + checkQueryData(df, data) + } + + it("SELECT id FROM city WHERE CONCAT(id, \" - this is ID\") = \"1 - this is ID\"") { + val df = igniteSession.sql("SELECT id FROM city WHERE CONCAT(id, \" - this is ID\") = \"1 - this is ID\"") + + checkOptimizationResult(df, + "SELECT id FROM city WHERE CONCAT(CAST(id AS VARCHAR), ' - this is ID') = '1 - this is ID'") + + val data = Tuple1(1) + + checkQueryData(df, data) + } + } + + describe("Not Optimized Queries") { + it("SELECT id, name FROM json_cities") { + val citiesDataFrame = igniteSession.read.json( + resolveIgnitePath("modules/spark/src/test/resources/cities.json").getAbsolutePath) + + citiesDataFrame.createOrReplaceTempView("JSON_CITIES") + + val df = igniteSession.sql("SELECT id, name FROM json_cities") + + val data = ( + (1, "Forest Hill"), + (2, "Denver"), + (3, "St. Petersburg")) + + checkQueryData(df, data) + } + + it("SELECT id, test_reverse(name) tr FROM city WHERE test_reverse(name) = 'revneD' ORDER BY id") { + val df = igniteSession.sql("SELECT id, test_reverse(name) tr " + + "FROM city WHERE test_reverse(name) = 'revneD' ORDER BY id") + + checkOptimizationResult(df) + } + + it("SELECT id, test_reverse(name) tr FROM city WHERE test_reverse(name) = 'revneD' and id > 0 ORDER BY id") { + val df = igniteSession.sql("SELECT id, test_reverse(name) tr " + + "FROM city WHERE test_reverse(name) = 'revneD' and id > 0 ORDER BY id") + + checkOptimizationResult(df) + } + + it("SELECT id, test_reverse(name) tr FROM city ORDER BY tr") { + val df = igniteSession.sql("SELECT id, test_reverse(name) tr FROM city ORDER BY tr") + + checkOptimizationResult(df) + } + + it("SELECT count(*), test_reverse(name) tr FROM city GROUP BY test_reverse(name)") { + val df = igniteSession.sql("SELECT count(*), test_reverse(name) tr FROM city GROUP BY test_reverse(name)") + + checkOptimizationResult(df) + } + } + + override protected def beforeAll(): Unit = { + super.beforeAll() + + 
createPersonTable(client, DEFAULT_CACHE) + + createCityTable(client, DEFAULT_CACHE) + + val configProvider = enclose(null) (x â () â { + val cfg = IgnitionEx.loadConfiguration(TEST_CONFIG_FILE).get1() + + cfg.setClientMode(true) + + cfg.setIgniteInstanceName("client-2") + + cfg + }) + + igniteSession = IgniteSparkSession.builder() + .config(spark.sparkContext.getConf) + .igniteConfigProvider(configProvider) + .getOrCreate() + + igniteSession.udf.register("test_reverse", (str: String) â str.reverse) + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3c3a24e8/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationStringFuncSpec.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationStringFuncSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationStringFuncSpec.scala new file mode 100644 index 0000000..db106f2 --- /dev/null +++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationStringFuncSpec.scala @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spark + +import org.apache.ignite.Ignite +import org.apache.ignite.cache.query.SqlFieldsQuery +import org.apache.ignite.internal.IgnitionEx +import org.apache.ignite.spark.AbstractDataFrameSpec.{DEFAULT_CACHE, TEST_CONFIG_FILE, checkOptimizationResult, enclose} +import org.apache.spark.sql.ignite.IgniteSparkSession +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import java.lang.{Long â JLong} + +/** + * === Doesn't supported by Spark === + * CHAR + * DIFFERENCE + * HEXTORAW + * RAWTOHEX + * REGEXP_LIKE + * SOUNDEX + * STRINGDECODE + * STRINGENCODE + * STRINGTOUTF8 + * UTF8TOSTRING + * XMLATTR + * XMLNODE + * XMLCOMMENT + * XMLCDATA + * XMLSTARTDOC + * XMLTEXT + * TO_CHAR - The function that can format a timestamp, a number, or text. + * ====== This functions in spark master but not in release ===== + * LEFT + * RIGHT + * INSERT + * REPLACE + */ +@RunWith(classOf[JUnitRunner]) +class IgniteOptimizationStringFuncSpec extends AbstractDataFrameSpec { + var igniteSession: IgniteSparkSession = _ + + describe("Supported optimized string functions") { + it("LENGTH") { + val df = igniteSession.sql("SELECT LENGTH(str) FROM strings WHERE id <= 3") + + checkOptimizationResult(df, "SELECT CAST(LENGTH(str) AS INTEGER) as \"length(str)\" FROM strings " + + "WHERE id is not null AND id <= 3") + + val data = (3, 3, 6) + + checkQueryData(df, data) + } + + it("RTRIM") { + val df = igniteSession.sql("SELECT RTRIM(str) FROM strings WHERE id = 3") + + checkOptimizationResult(df, "SELECT RTRIM(str) FROM strings WHERE id is not null AND id = 3") + + val data = Tuple1("AAA") + + checkQueryData(df, data) + } + + it("LTRIM") { + val df = igniteSession.sql("SELECT LTRIM(str) FROM strings WHERE id = 4") + + checkOptimizationResult(df, "SELECT LTRIM(str) FROM strings WHERE id is not null AND id = 4") + + val data = Tuple1("AAA") + + checkQueryData(df, data) + } + + it("TRIM") { + val df = igniteSession.sql("SELECT TRIM(str) FROM 
strings WHERE id = 5") + + checkOptimizationResult(df, "SELECT TRIM(str) FROM strings WHERE id is not null AND id = 5") + + val data = Tuple1("AAA") + + checkQueryData(df, data) + } + + it("LOWER") { + val df = igniteSession.sql("SELECT LOWER(str) FROM strings WHERE id = 2") + + checkOptimizationResult(df, "SELECT LOWER(str) FROM strings WHERE id is not null AND id = 2") + + val data = Tuple1("aaa") + + checkQueryData(df, data) + } + + it("UPPER") { + val df = igniteSession.sql("SELECT UPPER(str) FROM strings WHERE id = 1") + + checkOptimizationResult(df, "SELECT UPPER(str) FROM strings WHERE id is not null AND id = 1") + + val data = Tuple1("AAA") + + checkQueryData(df, data) + } + + it("LOWER(RTRIM)") { + val df = igniteSession.sql("SELECT LOWER(RTRIM(str)) FROM strings WHERE id = 3") + + checkOptimizationResult(df, "SELECT LOWER(RTRIM(str)) FROM strings WHERE id is not null AND id = 3") + + val data = Tuple1("aaa") + + checkQueryData(df, data) + } + + it("LOCATE") { + val df = igniteSession.sql("SELECT LOCATE('D', str) FROM strings WHERE id = 6") + + checkOptimizationResult(df, "SELECT LOCATE('D', str, 1) FROM strings WHERE id is not null AND id = 6") + + val data = Tuple1(4) + + checkQueryData(df, data) + } + + it("LOCATE - 2") { + val df = igniteSession.sql("SELECT LOCATE('A', str) FROM strings WHERE id = 6") + + checkOptimizationResult(df, "SELECT LOCATE('A', str, 1) FROM strings WHERE id is not null AND id = 6") + + val data = Tuple1(1) + + checkQueryData(df, data) + } + + it("POSITION") { + val df = igniteSession.sql("SELECT instr(str, 'BCD') FROM strings WHERE id = 6") + + checkOptimizationResult(df, "SELECT POSITION('BCD', str) as \"instr(str, BCD)\" FROM strings " + + "WHERE id is not null AND id = 6") + + val data = Tuple1(2) + + checkQueryData(df, data) + } + + it("CONCAT") { + val df = igniteSession.sql("SELECT concat(str, 'XXX') FROM strings WHERE id = 6") + + checkOptimizationResult(df, "SELECT concat(str, 'XXX') FROM strings WHERE id is not null 
AND id = 6") + + val data = Tuple1("ABCDEFXXX") + + checkQueryData(df, data) + } + + it("RPAD") { + val df = igniteSession.sql("SELECT RPAD(str, 10, 'X') FROM strings WHERE id = 6") + + checkOptimizationResult(df, "SELECT RPAD(str, 10, 'X') FROM strings WHERE id is not null AND id = 6") + + val data = Tuple1("ABCDEFXXXX") + + checkQueryData(df, data) + } + + it("LPAD") { + val df = igniteSession.sql("SELECT LPAD(str, 10, 'X') FROM strings WHERE id = 6") + + checkOptimizationResult(df, "SELECT LPAD(str, 10, 'X') FROM strings WHERE id is not null AND id = 6") + + val data = Tuple1("XXXXABCDEF") + + checkQueryData(df, data) + } + + it("REPEAT") { + val df = igniteSession.sql("SELECT REPEAT(str, 2) FROM strings WHERE id = 6") + + checkOptimizationResult(df, "SELECT REPEAT(str, 2) FROM strings WHERE id is not null AND id = 6") + + val data = Tuple1("ABCDEFABCDEF") + + checkQueryData(df, data) + } + + it("SUBSTRING") { + val df = igniteSession.sql("SELECT SUBSTRING(str, 4, 3) FROM strings WHERE id = 6") + + checkOptimizationResult(df, "SELECT SUBSTR(str, 4, 3) as \"SUBSTRING(str, 4, 3)\" FROM strings " + + "WHERE id is not null AND id = 6") + + val data = Tuple1("DEF") + + checkQueryData(df, data) + } + + it("SPACE") { + val df = igniteSession.sql("SELECT SPACE(LENGTH(str)) FROM strings WHERE id = 1") + + checkOptimizationResult(df, "SELECT SPACE(CAST(LENGTH(str) AS INTEGER)) as \"SPACE(LENGTH(str))\" " + + "FROM strings WHERE id is not null AND id = 1") + + val data = Tuple1(" ") + + checkQueryData(df, data) + } + + it("ASCII") { + val df = igniteSession.sql("SELECT ASCII(str) FROM strings WHERE id = 7") + + checkOptimizationResult(df, "SELECT ASCII(str) FROM strings WHERE id is not null AND id = 7") + + val data = Tuple1(50) + + checkQueryData(df, data) + } + + it("REGEXP_REPLACE") { + val df = igniteSession.sql("SELECT REGEXP_REPLACE(str, '(\\\\d+)', 'num') FROM strings WHERE id = 7") + + checkOptimizationResult(df, "SELECT REGEXP_REPLACE(str, '(\\d+)', 'num') FROM 
strings " + + "WHERE id is not null AND id = 7") + + val data = Tuple1("num") + + checkQueryData(df, data) + } + + it("CONCAT_WS") { + val df = igniteSession.sql("SELECT id, CONCAT_WS(', ', str, 'after') FROM strings " + + "WHERE id >= 7 AND id <= 8") + + checkOptimizationResult(df, "SELECT id, CONCAT_WS(', ', str, 'after') FROM strings " + + "WHERE id is not null AND id >= 7 AND id <= 8") + + val data = ( + (7, "222, after"), + (8, "after")) + + checkQueryData(df, data) + } + + it("TRANSLATE") { + val df = igniteSession.sql("SELECT id, TRANSLATE(str, 'DEF', 'ABC') FROM strings WHERE id = 6") + + checkOptimizationResult(df, "SELECT id, TRANSLATE(str, 'DEF', 'ABC') FROM strings " + + "WHERE id is not null AND id = 6") + + val data = Tuple1((6, "ABCABC")) + + checkQueryData(df, data) + } + } + + def createStringTable(client: Ignite, cacheName: String): Unit = { + val cache = client.cache(cacheName) + + cache.query(new SqlFieldsQuery( + """ + | CREATE TABLE strings ( + | id LONG, + | str VARCHAR, + | PRIMARY KEY (id)) WITH "backups=1" + """.stripMargin)).getAll + + val qry = new SqlFieldsQuery("INSERT INTO strings (id, str) values (?, ?)") + + cache.query(qry.setArgs(1L.asInstanceOf[JLong], "aaa")).getAll + cache.query(qry.setArgs(2L.asInstanceOf[JLong], "AAA")).getAll + cache.query(qry.setArgs(3L.asInstanceOf[JLong], "AAA ")).getAll + cache.query(qry.setArgs(4L.asInstanceOf[JLong], " AAA")).getAll + cache.query(qry.setArgs(5L.asInstanceOf[JLong], " AAA ")).getAll + cache.query(qry.setArgs(6L.asInstanceOf[JLong], "ABCDEF")).getAll + cache.query(qry.setArgs(7L.asInstanceOf[JLong], "222")).getAll + cache.query(qry.setArgs(8L.asInstanceOf[JLong], null)).getAll + } + + override protected def beforeAll(): Unit = { + super.beforeAll() + + createStringTable(client, DEFAULT_CACHE) + + val configProvider = enclose(null) (x â () â { + val cfg = IgnitionEx.loadConfiguration(TEST_CONFIG_FILE).get1() + + cfg.setClientMode(true) + + cfg.setIgniteInstanceName("client-2") + + cfg + 
}) + + igniteSession = IgniteSparkSession.builder() + .config(spark.sparkContext.getConf) + .igniteConfigProvider(configProvider) + .getOrCreate() + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3c3a24e8/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationSystemFuncSpec.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationSystemFuncSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationSystemFuncSpec.scala new file mode 100644 index 0000000..282a45f --- /dev/null +++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteOptimizationSystemFuncSpec.scala @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spark + +import org.apache.ignite.Ignite +import org.apache.ignite.cache.query.SqlFieldsQuery +import org.apache.spark.sql.ignite.IgniteSparkSession +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import java.lang.{Double â JDouble, Long â JLong} + +import org.apache.ignite.internal.IgnitionEx +import org.apache.ignite.spark.AbstractDataFrameSpec.{DEFAULT_CACHE, TEST_CONFIG_FILE, checkOptimizationResult, enclose} + +/** + */ +@RunWith(classOf[JUnitRunner]) +class IgniteOptimizationSystemFuncSpec extends AbstractDataFrameSpec { + var igniteSession: IgniteSparkSession = _ + + describe("Supported optimized system functions") { + it("COALESCE") { + val df = igniteSession.sql("SELECT COALESCE(int_val1, int_val2) FROM numbers WHERE id IN (1, 2, 3)") + + checkOptimizationResult(df, "SELECT COALESCE(int_val1, int_val2) FROM numbers WHERE id IN (1, 2, 3)") + + val data = (1, 2, 3) + + checkQueryData(df, data) + } + + it("GREATEST") { + val df = igniteSession.sql("SELECT GREATEST(int_val1, int_val2) FROM numbers WHERE id IN (4, 5)") + + checkOptimizationResult(df, "SELECT GREATEST(int_val1, int_val2) FROM numbers WHERE id IN (4, 5)") + + val data = (4, 6) + + checkQueryData(df, data) + } + + it("LEAST") { + val df = igniteSession.sql("SELECT LEAST(int_val1, int_val2) FROM numbers WHERE id IN (4, 5)") + + checkOptimizationResult(df, "SELECT LEAST(int_val1, int_val2) FROM numbers WHERE id IN (4, 5)") + + val data = (3, 5) + + checkQueryData(df, data) + } + + it("IFNULL") { + val df = igniteSession.sql("SELECT IFNULL(int_val1, int_val2) FROM numbers WHERE id IN (1, 2, 3)") + + checkOptimizationResult(df, "SELECT COALESCE(int_val1, int_val2) as \"ifnull(numbers.`int_val1`, numbers.`int_val2`)\" FROM numbers WHERE id IN (1, 2, 3)") + + val data = (1, 2, 3) + + checkQueryData(df, data) + } + + it("NULLIF") { + val df = igniteSession.sql("SELECT id, NULLIF(int_val1, int_val2) FROM numbers WHERE id IN (6, 7)") + + 
checkOptimizationResult(df) + + val data = ( + (6, null), + (7, 8)) + + checkQueryData(df, data) + } + + it("NVL2") { + val df = igniteSession.sql("SELECT id, NVL2(int_val1, 'not null', 'null') FROM numbers WHERE id IN (1, 2, 3)") + + checkOptimizationResult(df) + + val data = ( + (1, "not null"), + (2, "null"), + (3, "not null")) + + checkQueryData(df, data) + } + } + + def createNumberTable(client: Ignite, cacheName: String): Unit = { + val cache = client.cache(cacheName) + + cache.query(new SqlFieldsQuery( + """ + | CREATE TABLE numbers ( + | id LONG, + | int_val1 LONG, + | int_val2 LONG, + | PRIMARY KEY (id)) WITH "backups=1" + """.stripMargin)).getAll + + + val qry = new SqlFieldsQuery("INSERT INTO numbers (id, int_val1, int_val2) values (?, ?, ?)") + + cache.query(qry.setArgs(1L.asInstanceOf[JLong], 1L.asInstanceOf[JLong], null)).getAll + cache.query(qry.setArgs(2L.asInstanceOf[JLong], null, 2L.asInstanceOf[JLong])).getAll + cache.query(qry.setArgs(3L.asInstanceOf[JLong], 3L.asInstanceOf[JLong], null)).getAll + cache.query(qry.setArgs(4L.asInstanceOf[JLong], 3L.asInstanceOf[JLong], 4L.asInstanceOf[JLong])).getAll + cache.query(qry.setArgs(5L.asInstanceOf[JLong], 6L.asInstanceOf[JLong], 5L.asInstanceOf[JLong])).getAll + cache.query(qry.setArgs(6L.asInstanceOf[JLong], 7L.asInstanceOf[JLong], 7L.asInstanceOf[JLong])).getAll + cache.query(qry.setArgs(7L.asInstanceOf[JLong], 8L.asInstanceOf[JLong], 9L.asInstanceOf[JLong])).getAll + } + + override protected def beforeAll(): Unit = { + super.beforeAll() + + createNumberTable(client, DEFAULT_CACHE) + + val configProvider = enclose(null) (x â () â { + val cfg = IgnitionEx.loadConfiguration(TEST_CONFIG_FILE).get1() + + cfg.setClientMode(true) + + cfg.setIgniteInstanceName("client-2") + + cfg + }) + + igniteSession = IgniteSparkSession.builder() + .config(spark.sparkContext.getConf) + .igniteConfigProvider(configProvider) + .getOrCreate() + } +}
