Refine feature selection in Spark integration
Project: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/commit/1347de98 Tree: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/tree/1347de98 Diff: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/diff/1347de98 Branch: refs/heads/JIRA-22/pr-385 Commit: 1347de985ea6f8028c9d381f8827882ad39ad3a7 Parents: aa7d529 Author: amaya <[email protected]> Authored: Wed Sep 28 14:22:05 2016 +0900 Committer: amaya <[email protected]> Committed: Wed Sep 28 14:22:05 2016 +0900 ---------------------------------------------------------------------- .../org/apache/spark/sql/hive/HivemallOps.scala | 9 +- .../spark/sql/hive/HivemallOpsSuite.scala | 94 ++++++++++++++------ .../org/apache/spark/sql/hive/HivemallOps.scala | 8 +- .../spark/sql/hive/HivemallOpsSuite.scala | 89 ++++++++++++------ 4 files changed, 138 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1347de98/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala ---------------------------------------------------------------------- diff --git a/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala b/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala index 41a4065..255f697 100644 --- a/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala +++ b/spark/spark-1.6/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala @@ -1006,9 +1006,9 @@ object HivemallOps { * @see hivemall.ftvec.selection.ChiSquareUDF * @group ftvec.selection */ - def chi2(exprs: Column*): Column = { + def chi2(observed: Column, expected: Column): Column = { HiveGenericUDF(new HiveFunctionWrapper( - "hivemall.ftvec.selection.ChiSquareUDF"), exprs.map(_.expr)) + "hivemall.ftvec.selection.ChiSquareUDF"), Seq(observed.expr, expected.expr)) } /** @@ -1087,10 +1087,9 
@@ object HivemallOps { * @see hivemall.tools.array.SelectKBestUDF * @group tools.array */ - @scala.annotation.varargs - def select_k_best(exprs: Column*): Column = { + def select_k_best(X: Column, importanceList: Column, k: Column): Column = { HiveGenericUDF(new HiveFunctionWrapper( - "hivemall.tools.array.SelectKBestUDF"), exprs.map(_.expr)) + "hivemall.tools.array.SelectKBestUDF"), Seq(X.expr, importanceList.expr, k.expr)) } /** http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1347de98/spark/spark-1.6/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala ---------------------------------------------------------------------- diff --git a/spark/spark-1.6/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala b/spark/spark-1.6/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala index e118257..cce22ce 100644 --- a/spark/spark-1.6/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala +++ b/spark/spark-1.6/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala @@ -17,13 +17,14 @@ package org.apache.spark.sql.hive -import org.apache.spark.sql.{Column, Row} import org.apache.spark.sql.hive.HivemallOps._ import org.apache.spark.sql.hive.HivemallUtils._ import org.apache.spark.sql.types._ +import org.apache.spark.sql.{Column, Row} import org.apache.spark.test.HivemallQueryTest import org.apache.spark.test.TestDoubleWrapper._ import org.apache.spark.test.TestUtils._ +import org.scalatest.Matchers._ final class HivemallOpsSuite extends HivemallQueryTest { @@ -188,18 +189,32 @@ final class HivemallOpsSuite extends HivemallQueryTest { test("ftvec.selection - chi2") { import hiveContext.implicits._ - - val df = Seq(Seq( - Seq(250.29999999999998, 170.90000000000003, 73.2, 12.199999999999996), - Seq(296.8, 138.50000000000003, 212.99999999999997, 66.3), - Seq(329.3999999999999, 148.7, 277.59999999999997, 101.29999999999998)) -> Seq( - Seq(292.1666753739119, 152.70000455081467, 187.93333893418327, 59.93333511948589), 
- Seq(292.1666753739119, 152.70000455081467, 187.93333893418327, 59.93333511948589), - Seq(292.1666753739119, 152.70000455081467, 187.93333893418327, 59.93333511948589))).toDF("arg0", "arg1") - - assert(df.select(chi2(df("arg0"), df("arg1"))).collect.toSet === - Set(Row(Row(Seq(10.817820878493995, 3.5944990176817315, 116.16984746363957, 67.24482558215503), - Seq(0.004476514990225833, 0.16575416718561453, 0d, 2.55351295663786e-15))))) + implicit val doubleEquality = org.scalactic.TolerantNumerics.tolerantDoubleEquality(1e-5) + + // see also hivemall.ftvec.selection.ChiSquareUDFTest + val df = Seq( + Seq( + Seq(250.29999999999998, 170.90000000000003, 73.2, 12.199999999999996), + Seq(296.8, 138.50000000000003, 212.99999999999997, 66.3), + Seq(329.3999999999999, 148.7, 277.59999999999997, 101.29999999999998) + ) -> Seq( + Seq(292.1666753739119, 152.70000455081467, 187.93333893418327, 59.93333511948589), + Seq(292.1666753739119, 152.70000455081467, 187.93333893418327, 59.93333511948589), + Seq(292.1666753739119, 152.70000455081467, 187.93333893418327, 59.93333511948589))) + .toDF("arg0", "arg1") + + val result = df.select(chi2(df("arg0"), df("arg1"))).collect + result should have length 1 + val chi2Val = result.head.getAs[Row](0).getAs[Seq[Double]](0) + val pVal = result.head.getAs[Row](0).getAs[Seq[Double]](1) + + (chi2Val, Seq(10.81782088, 3.59449902, 116.16984746, 67.24482759)) + .zipped + .foreach((actual, expected) => actual shouldEqual expected) + + (pVal, Seq(4.47651499e-03, 1.65754167e-01, 5.94344354e-26, 2.50017968e-15)) + .zipped + .foreach((actual, expected) => actual shouldEqual expected) } test("ftvec.conv - quantify") { @@ -352,13 +367,11 @@ final class HivemallOpsSuite extends HivemallQueryTest { test("tools.array - select_k_best") { import hiveContext.implicits._ - val data = Seq(Tuple1(Seq(0, 1, 3)), Tuple1(Seq(2, 4, 1)), Tuple1(Seq(5, 4, 9))) - val importance = Seq(3, 1, 2) - val k = 2 - val df = data.toDF("features") + val data = Seq(Seq(0, 1, 3), 
Seq(2, 4, 1), Seq(5, 4, 9)) + val df = data.map(d => (d, Seq(3, 1, 2), 2)).toDF("features", "importance_list", "k") - assert(df.select(select_k_best(df("features"), importance, k)).collect.toSeq === - data.map(s => Row(Seq(s._1(0).toDouble, s._1(2).toDouble)))) + df.select(select_k_best(df("features"), df("importance_list"), df("k"))).collect shouldEqual + data.map(s => Row(Seq(s(0).toDouble, s(2).toDouble))) } test("misc - sigmoid") { @@ -560,7 +573,31 @@ final class HivemallOpsSuite extends HivemallQueryTest { test("user-defined aggregators for ftvec.selection") { import hiveContext.implicits._ + implicit val doubleEquality = org.scalactic.TolerantNumerics.tolerantDoubleEquality(1e-5) + + // see also hivemall.ftvec.selection.SignalNoiseRatioUDAFTest + // binary class + // +-----------------+-------+ + // | features | class | + // +-----------------+-------+ + // | 5.1,3.5,1.4,0.2 | 0 | + // | 4.9,3.0,1.4,0.2 | 0 | + // | 4.7,3.2,1.3,0.2 | 0 | + // | 7.0,3.2,4.7,1.4 | 1 | + // | 6.4,3.2,4.5,1.5 | 1 | + // | 6.9,3.1,4.9,1.5 | 1 | + // +-----------------+-------+ + val df0 = Seq( + (1, Seq(5.1, 3.5, 1.4, 0.2), Seq(1, 0)), (1, Seq(4.9, 3.0, 1.4, 0.2), Seq(1, 0)), + (1, Seq(4.7, 3.2, 1.3, 0.2), Seq(1, 0)), (1, Seq(7.0, 3.2, 4.7, 1.4), Seq(0, 1)), + (1, Seq(6.4, 3.2, 4.5, 1.5), Seq(0, 1)), (1, Seq(6.9, 3.1, 4.9, 1.5), Seq(0, 1))) + .toDF("c0", "arg0", "arg1") + val row0 = df0.groupby($"c0").snr("arg0", "arg1").collect + (row0(0).getAs[Seq[Double]](1), Seq(4.38425236, 0.26390002, 15.83984511, 26.87005769)) + .zipped + .foreach((actual, expected) => actual shouldEqual expected) + // multiple class // +-----------------+-------+ // | features | class | // +-----------------+-------+ @@ -571,14 +608,15 @@ final class HivemallOpsSuite extends HivemallQueryTest { // | 6.3,3.3,6.0,2.5 | 2 | // | 5.8,2.7,5.1,1.9 | 2 | // +-----------------+-------+ - val df0 = Seq( + val df1 = Seq( (1, Seq(5.1, 3.5, 1.4, 0.2), Seq(1, 0, 0)), (1, Seq(4.9, 3.0, 1.4, 0.2), Seq(1, 0, 0)), (1, 
Seq(7.0, 3.2, 4.7, 1.4), Seq(0, 1, 0)), (1, Seq(6.4, 3.2, 4.5, 1.5), Seq(0, 1, 0)), (1, Seq(6.3, 3.3, 6.0, 2.5), Seq(0, 0, 1)), (1, Seq(5.8, 2.7, 5.1, 1.9), Seq(0, 0, 1))) - .toDF.as("c0", "arg0", "arg1") - val row0 = df0.groupby($"c0").snr("arg0", "arg1").collect - assert(row0(0).getAs[Seq[Double]](1) === - Seq(8.431818181818192, 1.3212121212121217, 42.94949494949499, 33.80952380952378)) + .toDF("c0", "arg0", "arg1") + val row1 = df1.groupby($"c0").snr("arg0", "arg1").collect + (row1(0).getAs[Seq[Double]](1), Seq(8.43181818, 1.32121212, 42.94949495, 33.80952381)) + .zipped + .foreach((actual, expected) => actual shouldEqual expected) } test("user-defined aggregators for tools.matrix") { @@ -586,8 +624,10 @@ final class HivemallOpsSuite extends HivemallQueryTest { // | 1 2 3 |T | 5 6 7 | // | 3 4 5 | * | 7 8 9 | - val df0 = Seq((1, Seq(1, 2, 3), Seq(5, 6, 7)), (1, Seq(3, 4, 5), Seq(7, 8, 9))).toDF.as("c0", "arg0", "arg1") - val row0 = df0.groupby($"c0").transpose_and_dot("arg0", "arg1").collect - assert(row0(0).getAs[Seq[Double]](1) === Seq(Seq(26.0, 30.0, 34.0), Seq(38.0, 44.0, 50.0), Seq(50.0, 58.0, 66.0))) + val df0 = Seq((1, Seq(1, 2, 3), Seq(5, 6, 7)), (1, Seq(3, 4, 5), Seq(7, 8, 9))) + .toDF("c0", "arg0", "arg1") + + df0.groupby($"c0").transpose_and_dot("arg0", "arg1").collect() shouldEqual + Seq(Row(1, Seq(Seq(26.0, 30.0, 34.0), Seq(38.0, 44.0, 50.0), Seq(50.0, 58.0, 66.0)))) } } http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1347de98/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala ---------------------------------------------------------------------- diff --git a/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala b/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala index f12992e..628c2ea 100644 --- a/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala +++ b/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala @@ -1252,10 
+1252,10 @@ object HivemallOps { * @see hivemall.ftvec.selection.ChiSquareUDF * @group ftvec.selection */ - def chi2(exprs: Column*): Column = withExpr { + def chi2(observed: Column, expected: Column): Column = withExpr { HiveGenericUDF("chi2", new HiveFunctionWrapper("hivemall.ftvec.selection.ChiSquareUDF"), - exprs.map(_.expr)) + Seq(observed.expr, expected.expr)) } /** @@ -1341,10 +1341,10 @@ object HivemallOps { * @see hivemall.tools.array.SelectKBestUDF * @group tools.array */ - def select_k_best(exprs: Column*): Column = withExpr { + def select_k_best(X: Column, importanceList: Column, k: Column): Column = withExpr { HiveGenericUDF("select_k_best", new HiveFunctionWrapper("hivemall.tools.array.SelectKBestUDF"), - exprs.map(_.expr)) + Seq(X.expr, importanceList.expr, k.expr)) } /** http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1347de98/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala ---------------------------------------------------------------------- diff --git a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala index d750916..2e18280 100644 --- a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala +++ b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala @@ -17,14 +17,13 @@ package org.apache.spark.sql.hive -import org.apache.spark.sql.{AnalysisException, Column, Row} -import org.apache.spark.sql.functions import org.apache.spark.sql.hive.HivemallOps._ import org.apache.spark.sql.hive.HivemallUtils._ import org.apache.spark.sql.types._ -import org.apache.spark.test.HivemallFeatureQueryTest +import org.apache.spark.sql.{AnalysisException, Column, Row, functions} import org.apache.spark.test.TestDoubleWrapper._ -import org.apache.spark.test.{TestUtils, VectorQueryTest} +import org.apache.spark.test.{HivemallFeatureQueryTest, TestUtils, 
VectorQueryTest} +import org.scalatest.Matchers._ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { @@ -189,18 +188,32 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { test("ftvec.selection - chi2") { import hiveContext.implicits._ + implicit val doubleEquality = org.scalactic.TolerantNumerics.tolerantDoubleEquality(1e-5) - val df = Seq(Seq( - Seq(250.29999999999998, 170.90000000000003, 73.2, 12.199999999999996), - Seq(296.8, 138.50000000000003, 212.99999999999997, 66.3), - Seq(329.3999999999999, 148.7, 277.59999999999997, 101.29999999999998)) -> Seq( - Seq(292.1666753739119, 152.70000455081467, 187.93333893418327, 59.93333511948589), - Seq(292.1666753739119, 152.70000455081467, 187.93333893418327, 59.93333511948589), - Seq(292.1666753739119, 152.70000455081467, 187.93333893418327, 59.93333511948589))).toDF("arg0", "arg1") + // see also hivemall.ftvec.selection.ChiSquareUDFTest + val df = Seq( + Seq( + Seq(250.29999999999998, 170.90000000000003, 73.2, 12.199999999999996), + Seq(296.8, 138.50000000000003, 212.99999999999997, 66.3), + Seq(329.3999999999999, 148.7, 277.59999999999997, 101.29999999999998) + ) -> Seq( + Seq(292.1666753739119, 152.70000455081467, 187.93333893418327, 59.93333511948589), + Seq(292.1666753739119, 152.70000455081467, 187.93333893418327, 59.93333511948589), + Seq(292.1666753739119, 152.70000455081467, 187.93333893418327, 59.93333511948589))) + .toDF("arg0", "arg1") + + val result = df.select(chi2(df("arg0"), df("arg1"))).collect + result should have length 1 + val chi2Val = result.head.getAs[Row](0).getAs[Seq[Double]](0) + val pVal = result.head.getAs[Row](0).getAs[Seq[Double]](1) + + (chi2Val, Seq(10.81782088, 3.59449902, 116.16984746, 67.24482759)) + .zipped + .foreach((actual, expected) => actual shouldEqual expected) - assert(df.select(chi2(df("arg0"), df("arg1"))).collect.toSet === - Set(Row(Row(Seq(10.817820878493995, 3.5944990176817315, 116.16984746363957, 67.24482558215503), - 
Seq(0.004476514990225833, 0.16575416718561453, 0d, 2.55351295663786e-15))))) + (pVal, Seq(4.47651499e-03, 1.65754167e-01, 5.94344354e-26, 2.50017968e-15)) + .zipped + .foreach((actual, expected) => actual shouldEqual expected) } test("ftvec.conv - quantify") { @@ -378,12 +391,10 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { import hiveContext.implicits._ val data = Seq(Seq(0, 1, 3), Seq(2, 4, 1), Seq(5, 4, 9)) - val importance = Seq(3, 1, 2) - val k = 2 - val df = data.toDF("features") + val df = data.map(d => (d, Seq(3, 1, 2), 2)).toDF("features", "importance_list", "k") - assert(df.select(select_k_best(df("features"), importance, k)).collect.toSeq === - data.map(s => Row(Seq(s(0).toDouble, s(2).toDouble)))) + df.select(select_k_best(df("features"), df("importance_list"), df("k"))).collect shouldEqual + data.map(s => Row(Seq(s(0).toDouble, s(2).toDouble))) } test("misc - sigmoid") { @@ -678,7 +689,31 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { test("user-defined aggregators for ftvec.selection") { import hiveContext.implicits._ + implicit val doubleEquality = org.scalactic.TolerantNumerics.tolerantDoubleEquality(1e-5) + // see also hivemall.ftvec.selection.SignalNoiseRatioUDAFTest + // binary class + // +-----------------+-------+ + // | features | class | + // +-----------------+-------+ + // | 5.1,3.5,1.4,0.2 | 0 | + // | 4.9,3.0,1.4,0.2 | 0 | + // | 4.7,3.2,1.3,0.2 | 0 | + // | 7.0,3.2,4.7,1.4 | 1 | + // | 6.4,3.2,4.5,1.5 | 1 | + // | 6.9,3.1,4.9,1.5 | 1 | + // +-----------------+-------+ + val df0 = Seq( + (1, Seq(5.1, 3.5, 1.4, 0.2), Seq(1, 0)), (1, Seq(4.9, 3.0, 1.4, 0.2), Seq(1, 0)), + (1, Seq(4.7, 3.2, 1.3, 0.2), Seq(1, 0)), (1, Seq(7.0, 3.2, 4.7, 1.4), Seq(0, 1)), + (1, Seq(6.4, 3.2, 4.5, 1.5), Seq(0, 1)), (1, Seq(6.9, 3.1, 4.9, 1.5), Seq(0, 1))) + .toDF("c0", "arg0", "arg1") + val row0 = df0.groupby($"c0").snr("arg0", "arg1").collect + (row0(0).getAs[Seq[Double]](1), Seq(4.38425236, 
0.26390002, 15.83984511, 26.87005769)) + .zipped + .foreach((actual, expected) => actual shouldEqual expected) + + // multiple class // +-----------------+-------+ // | features | class | // +-----------------+-------+ @@ -689,14 +724,15 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { // | 6.3,3.3,6.0,2.5 | 2 | // | 5.8,2.7,5.1,1.9 | 2 | // +-----------------+-------+ - val df0 = Seq( + val df1 = Seq( (1, Seq(5.1, 3.5, 1.4, 0.2), Seq(1, 0, 0)), (1, Seq(4.9, 3.0, 1.4, 0.2), Seq(1, 0, 0)), (1, Seq(7.0, 3.2, 4.7, 1.4), Seq(0, 1, 0)), (1, Seq(6.4, 3.2, 4.5, 1.5), Seq(0, 1, 0)), (1, Seq(6.3, 3.3, 6.0, 2.5), Seq(0, 0, 1)), (1, Seq(5.8, 2.7, 5.1, 1.9), Seq(0, 0, 1))) - .toDF.as("c0", "arg0", "arg1") - val row0 = df0.groupby($"c0").snr("arg0", "arg1").collect - assert(row0(0).getAs[Seq[Double]](1) === - Seq(8.431818181818192, 1.3212121212121217, 42.94949494949499, 33.80952380952378)) + .toDF("c0", "arg0", "arg1") + val row1 = df1.groupby($"c0").snr("arg0", "arg1").collect + (row1(0).getAs[Seq[Double]](1), Seq(8.43181818, 1.32121212, 42.94949495, 33.80952381)) + .zipped + .foreach((actual, expected) => actual shouldEqual expected) } test("user-defined aggregators for tools.matrix") { @@ -705,8 +741,9 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { // | 1 2 3 |T | 5 6 7 | // | 3 4 5 | * | 7 8 9 | val df0 = Seq((1, Seq(1, 2, 3), Seq(5, 6, 7)), (1, Seq(3, 4, 5), Seq(7, 8, 9))).toDF.as("c0", "arg0", "arg1") - val row0 = df0.groupby($"c0").transpose_and_dot("arg0", "arg1").collect - assert(row0(0).getAs[Seq[Double]](1) === Seq(Seq(26.0, 30.0, 34.0), Seq(38.0, 44.0, 50.0), Seq(50.0, 58.0, 66.0))) + + df0.groupby($"c0").transpose_and_dot("arg0", "arg1").collect() shouldEqual + Seq(Row(1, Seq(Seq(26.0, 30.0, 34.0), Seq(38.0, 44.0, 50.0), Seq(50.0, 58.0, 66.0)))) } }
