Repository: incubator-hivemall Updated Branches: refs/heads/master b90999664 -> 79f92f4f2
Close #26: [HIVEMALL-35] Remove unnecessary implicit conversions in HivemallUtils Project: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/commit/79f92f4f Tree: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/tree/79f92f4f Diff: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/diff/79f92f4f Branch: refs/heads/master Commit: 79f92f4f2c6458ccf4133744a1654a46cbbb56cc Parents: b909996 Author: Takeshi YAMAMURO <[email protected]> Authored: Thu Jan 26 17:58:27 2017 +0900 Committer: Takeshi YAMAMURO <[email protected]> Committed: Thu Jan 26 17:58:27 2017 +0900 ---------------------------------------------------------------------- .../knn/similarity/Distance2SimilarityUDF.java | 2 +- .../hivemall/tools/RegressionDatagen.scala | 12 +- .../org/apache/spark/sql/hive/HivemallOps.scala | 16 +- .../apache/spark/sql/hive/HivemallUtils.scala | 109 +++---- .../apache/spark/sql/hive/HiveUdfSuite.scala | 2 +- .../spark/sql/hive/HivemallOpsSuite.scala | 317 +++++++++---------- .../spark/sql/hive/ModelMixingSuite.scala | 29 +- .../apache/spark/sql/hive/XGBoostSuite.scala | 7 +- .../sql/hive/benchmark/MiscBenchmark.scala | 13 +- .../hive/test/HivemallFeatureQueryTest.scala | 112 +++++++ .../streaming/HivemallFeatureOpsSuite.scala | 4 +- .../spark/test/HivemallFeatureQueryTest.scala | 101 ------ .../scala/org/apache/spark/test/TestUtils.scala | 12 +- 13 files changed, 359 insertions(+), 377 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/79f92f4f/core/src/main/java/hivemall/knn/similarity/Distance2SimilarityUDF.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/hivemall/knn/similarity/Distance2SimilarityUDF.java b/core/src/main/java/hivemall/knn/similarity/Distance2SimilarityUDF.java index 43c42a7..9186b80 100644 --- a/core/src/main/java/hivemall/knn/similarity/Distance2SimilarityUDF.java +++ b/core/src/main/java/hivemall/knn/similarity/Distance2SimilarityUDF.java @@ -50,7 +50,7 @@ public final class Distance2SimilarityUDF extends GenericUDF { @Override public FloatWritable evaluate(DeferredObject[] arguments) throws HiveException { - float d = PrimitiveObjectInspectorUtils.getFloat(arguments[0], distanceOI); + float d = PrimitiveObjectInspectorUtils.getFloat(arguments[0].get(), distanceOI); float sim = 1.f / (1.f + d); return new FloatWritable(sim); } http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/79f92f4f/spark/spark-2.0/src/main/scala/hivemall/tools/RegressionDatagen.scala ---------------------------------------------------------------------- diff --git a/spark/spark-2.0/src/main/scala/hivemall/tools/RegressionDatagen.scala b/spark/spark-2.0/src/main/scala/hivemall/tools/RegressionDatagen.scala index 01664f4..72a5c83 100644 --- a/spark/spark-2.0/src/main/scala/hivemall/tools/RegressionDatagen.scala +++ b/spark/spark-2.0/src/main/scala/hivemall/tools/RegressionDatagen.scala @@ -19,8 +19,8 @@ package hivemall.tools import org.apache.spark.sql.{DataFrame, Row, SQLContext} +import org.apache.spark.sql.functions._ import org.apache.spark.sql.hive.HivemallOps._ -import org.apache.spark.sql.hive.HivemallUtils._ import org.apache.spark.sql.types._ object RegressionDatagen { @@ -57,10 +57,10 @@ object RegressionDatagen { ) import sc.implicits._ df.lr_datagen( - s"-n_examples $n_examples -n_features $n_features -n_dims $n_dims -prob_one $prob_one" - + (if (dense) " -dense" else "") - + (if (sort) " -sort" else "") - + (if (cl) " -cl" else "")) - .select($"label".cast(DoubleType).as("label"), $"features") + lit(s"-n_examples $n_examples -n_features $n_features -n_dims $n_dims -prob_one $prob_one" + + (if (dense) " -dense" else "") + + (if (sort) " -sort" else "") + + (if (cl) " -cl" else "")) + ).select($"label".cast(DoubleType).as("label"), $"features") } } http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/79f92f4f/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala ---------------------------------------------------------------------- diff --git a/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala b/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala index f233a2a..8fa4831 100644 --- a/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala +++ b/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala @@ -20,8 +20,6 @@ package org.apache.spark.sql.hive import java.util.UUID -import scala.collection.JavaConverters._ - import org.apache.spark.annotation.Experimental import org.apache.spark.internal.Logging import org.apache.spark.ml.feature.HivemallFeature @@ -57,8 +55,6 @@ import org.apache.spark.unsafe.types.UTF8String * @groupname misc */ final class HivemallOps(df: DataFrame) extends Logging { - import HivemallOps._ - import HivemallUtils._ /** * @see hivemall.regression.AdaDeltaUDTF @@ -687,10 +683,14 @@ final class HivemallOps(df: DataFrame) extends Logging { * Amplifies and shuffle data inside partitions. * @group ftvec.amplify */ - def part_amplify(xtimes: Int): DataFrame = { + def part_amplify(xtimes: Column): DataFrame = { + val xtimesInt = xtimes.expr match { + case Literal(v: Any, IntegerType) => v.asInstanceOf[Int] + case e => throw new AnalysisException("`xtimes` must be integer, however " + e) + } val rdd = df.rdd.mapPartitions({ iter => val elems = iter.flatMap{ row => - Seq.fill[Row](xtimes)(row) + Seq.fill[Row](xtimesInt)(row) } // Need to check how this shuffling affects results scala.util.Random.shuffle(elems) @@ -792,7 +792,7 @@ final class HivemallOps(df: DataFrame) extends Logging { */ def each_top_k(k: Int, group: String, score: String, args: String*) : DataFrame = withTypedPlan { - val clusterDf = df.repartition(group).sortWithinPartitions(group) + val clusterDf = df.repartition(df(group)).sortWithinPartitions(group) val childrenAttributes = clusterDf.logicalPlan.output val generator = Generate( EachTopK( @@ -881,7 +881,7 @@ final class HivemallOps(df: DataFrame) extends Logging { @inline private[this] def toHivemallFeatureDf(exprs: Column*): Seq[Column] = { df.select(exprs: _*).queryExecution.analyzed.schema.zip(exprs).map { - case (StructField(_, _: VectorUDT, _, _), c) => to_hivemall_features(c) + case (StructField(_, _: VectorUDT, _, _), c) => HivemallUtils.to_hivemall_features(c) case (_, c) => c } } http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/79f92f4f/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallUtils.scala ---------------------------------------------------------------------- diff --git a/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallUtils.scala b/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallUtils.scala index b7b7071..056d6d6 100644 --- a/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallUtils.scala +++ b/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallUtils.scala @@ -19,8 +19,7 @@ package org.apache.spark.sql.hive import org.apache.spark.ml.linalg.{BLAS, DenseVector, SparseVector, Vector, Vectors} -import org.apache.spark.sql.{Column, DataFrame, Row} -import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.expressions.UserDefinedFunction import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ @@ -31,19 +30,40 @@ object HivemallUtils { private[this] val maxDims = 100000000 /** - * An implicit conversion to avoid doing annoying transformation. - * This class must be in o.a.spark.sql._ because - * a Column class is private. + * Check whether the given schema contains a column of the required data type. + * @param colName column name + * @param dataType required column data type */ - @inline implicit def toBooleanLiteral(i: Boolean): Column = Column(Literal.create(i, BooleanType)) - @inline implicit def toIntLiteral(i: Int): Column = Column(Literal.create(i, IntegerType)) - @inline implicit def toFloatLiteral(i: Float): Column = Column(Literal.create(i, FloatType)) - @inline implicit def toDoubleLiteral(i: Double): Column = Column(Literal.create(i, DoubleType)) - @inline implicit def toStringLiteral(i: String): Column = Column(Literal.create(i, StringType)) - @inline implicit def toIntArrayLiteral(i: Seq[Int]): Column = - Column(Literal.create(i, ArrayType(IntegerType))) - @inline implicit def toStringArrayLiteral(i: Seq[String]): Column = - Column(Literal.create(i, ArrayType(StringType))) + private[this] def checkColumnType(schema: StructType, colName: String, dataType: DataType) + : Unit = { + val actualDataType = schema(colName).dataType + require(actualDataType.equals(dataType), + s"Column $colName must be of type $dataType but was actually $actualDataType.") + } + + def to_vector_func(dense: Boolean, dims: Int): Seq[String] => Vector = { + if (dense) { + // Dense features + i: Seq[String] => { + val features = new Array[Double](dims) + i.map { ft => + val s = ft.split(":").ensuring(_.size == 2) + features(s(0).toInt) = s(1).toDouble + } + Vectors.dense(features) + } + } else { + // Sparse features + i: Seq[String] => { + val features = i.map { ft => + // val s = ft.split(":").ensuring(_.size == 2) + val s = ft.split(":") + (s(0).toInt, s(1).toDouble) + } + Vectors.sparse(dims, features) + } + } + } def to_hivemall_features_func(): Vector => Array[String] = { case dv: DenseVector => @@ -83,20 +103,28 @@ object HivemallUtils { } /** - * Transforms `org.apache.spark.ml.linalg.Vector` into Hivemall features. + * Transforms Hivemall features into a [[Vector]]. + */ + def to_vector(dense: Boolean = false, dims: Int = maxDims): UserDefinedFunction = { + udf(to_vector_func(dense, dims)) + } + + /** + * Transforms a [[Vector]] into Hivemall features. */ def to_hivemall_features: UserDefinedFunction = udf(to_hivemall_features_func) /** - * Returns a new vector with `1.0` (bias) appended to the input vector. + * Returns a new [[Vector]] with `1.0` (bias) appended to the input [[Vector]]. * @group ftvec */ def append_bias: UserDefinedFunction = udf(append_bias_func) /** - * Make up a function object from a Hivemall model. + * Builds a [[Vector]]-based model from a table of Hivemall models */ - def funcModel(df: DataFrame, dense: Boolean = false, dims: Int = maxDims): UserDefinedFunction = { + def vectorized_model(df: DataFrame, dense: Boolean = false, dims: Int = maxDims) + : UserDefinedFunction = { checkColumnType(df.schema, "feature", StringType) checkColumnType(df.schema, "weight", DoubleType) @@ -106,7 +134,7 @@ object HivemallUtils { .select($"weight") .map { case Row(weight: Double) => weight} .reduce(_ + _) - val weights = funcVectorizerImpl(dense, dims)( + val weights = to_vector_func(dense, dims)( df.select($"feature", $"weight") .where($"feature" !== "0") .map { case Row(label: String, feature: Double) => s"${label}:$feature"} @@ -114,47 +142,4 @@ object HivemallUtils { udf((input: Vector) => BLAS.dot(input, weights) + intercept) } - - /** - * Make up a function object to transform Hivemall features into Vector. - */ - def funcVectorizer(dense: Boolean = false, dims: Int = maxDims): UserDefinedFunction = { - udf(funcVectorizerImpl(dense, dims)) - } - - private[this] def funcVectorizerImpl(dense: Boolean, dims: Int): Seq[String] => Vector = { - if (dense) { - // Dense features - i: Seq[String] => { - val features = new Array[Double](dims) - i.map { ft => - val s = ft.split(":").ensuring(_.size == 2) - features(s(0).toInt) = s(1).toDouble - } - Vectors.dense(features) - } - } else { - // Sparse features - i: Seq[String] => { - val features = i.map { ft => - // val s = ft.split(":").ensuring(_.size == 2) - val s = ft.split(":") - (s(0).toInt, s(1).toDouble) - } - Vectors.sparse(dims, features) - } - } - } - - /** - * Check whether the given schema contains a column of the required data type. - * @param colName column name - * @param dataType required column data type - */ - private[this] def checkColumnType(schema: StructType, colName: String, dataType: DataType) - : Unit = { - val actualDataType = schema(colName).dataType - require(actualDataType.equals(dataType), - s"Column $colName must be of type $dataType but was actually $actualDataType.") - } } http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/79f92f4f/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HiveUdfSuite.scala ---------------------------------------------------------------------- diff --git a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HiveUdfSuite.scala b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HiveUdfSuite.scala index d53ef73..74a138e 100644 --- a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HiveUdfSuite.scala +++ b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HiveUdfSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.hive import org.apache.spark.sql.Row import org.apache.spark.sql.hive.HivemallUtils._ -import org.apache.spark.test.HivemallFeatureQueryTest +import org.apache.spark.sql.hive.test.HivemallFeatureQueryTest import org.apache.spark.test.VectorQueryTest final class HiveUdfWithFeatureSuite extends HivemallFeatureQueryTest { http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/79f92f4f/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala ---------------------------------------------------------------------- diff --git a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala index 6f2f016..61af8d1 100644 --- a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala +++ b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala @@ -18,127 +18,112 @@ */ package org.apache.spark.sql.hive -import org.apache.spark.sql.{AnalysisException, Column, Row} -import org.apache.spark.sql.functions +import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.functions._ import org.apache.spark.sql.hive.HivemallGroupedDataset._ import org.apache.spark.sql.hive.HivemallOps._ import org.apache.spark.sql.hive.HivemallUtils._ +import org.apache.spark.sql.hive.test.HivemallFeatureQueryTest import org.apache.spark.sql.types._ -import org.apache.spark.test.{HivemallFeatureQueryTest, TestUtils, VectorQueryTest} -import org.apache.spark.test.TestDoubleWrapper._ +import org.apache.spark.test.{TestUtils, VectorQueryTest} +import org.apache.spark.test.TestFPWrapper._ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { test("knn.similarity") { - val row1 = DummyInputData.select(cosine_sim(Seq(1, 2, 3, 4), Seq(3, 4, 5, 6))).collect - assert(row1(0).getFloat(0) ~== 0.500f) + val df1 = DummyInputData.select(cosine_sim(lit2(Seq(1, 2, 3, 4)), lit2(Seq(3, 4, 5, 6)))) + assert(df1.collect.apply(0).getFloat(0) ~== 0.500f) - val row2 = DummyInputData.select(jaccard(5, 6)).collect - assert(row2(0).getFloat(0) ~== 0.96875f) + val df2 = DummyInputData.select(jaccard(lit(5), lit(6))) + assert(df2.collect.apply(0).getFloat(0) ~== 0.96875f) - val row3 = DummyInputData.select(angular_similarity(Seq(1, 2, 3), Seq(4, 5, 6))).collect - assert(row3(0).getFloat(0) ~== 0.500f) + val df3 = DummyInputData.select(angular_similarity(lit2(Seq(1, 2, 3)), lit2(Seq(4, 5, 6)))) + assert(df3.collect.apply(0).getFloat(0) ~== 0.500f) - val row4 = DummyInputData.select(euclid_similarity(Seq(5, 3, 1), Seq(2, 8, 3))).collect - assert(row4(0).getFloat(0) ~== 0.33333334f) + val df4 = DummyInputData.select(euclid_similarity(lit2(Seq(5, 3, 1)), lit2(Seq(2, 8, 3)))) + assert(df4.collect.apply(0).getFloat(0) ~== 0.33333334f) - // TODO: $"c0" throws AnalysisException, why? - // val row5 = DummyInputData.select(distance2similarity(DummyInputData("c0"))).collect - // assert(row5(0).getFloat(0) ~== 0.1f) + val df5 = DummyInputData.select(distance2similarity(lit(1.0))) + assert(df5.collect.apply(0).getFloat(0) ~== 0.5f) } test("knn.distance") { - val row1 = DummyInputData.select(hamming_distance(1, 3)).collect - assert(row1(0).getInt(0) == 1) + val df1 = DummyInputData.select(hamming_distance(lit(1), lit(3))) + checkAnswer(df1, Row(1) :: Nil) - val row2 = DummyInputData.select(popcnt(1)).collect - assert(row2(0).getInt(0) == 1) + val df2 = DummyInputData.select(popcnt(lit(1))) + checkAnswer(df2, Row(1) :: Nil) - val row3 = DummyInputData.select(kld(0.1, 0.5, 0.2, 0.5)).collect - assert(row3(0).getDouble(0) ~== 0.01) + val df3 = DummyInputData.select(kld(lit(0.1), lit(0.5), lit(0.2), lit(0.5))) + assert(df3.collect.apply(0).getDouble(0) ~== 0.01) - val row4 = DummyInputData.select( - euclid_distance(Seq("0.1", "0.5"), Seq("0.2", "0.5"))).collect - assert(row4(0).getFloat(0) ~== 1.4142135f) + val df4 = DummyInputData.select( + euclid_distance(lit2(Seq("0.1", "0.5")), lit2(Seq("0.2", "0.5")))) + assert(df4.collect.apply(0).getFloat(0) ~== 1.4142135f) - val row5 = DummyInputData.select( - cosine_distance(Seq("0.8", "0.3"), Seq("0.4", "0.6"))).collect - assert(row5(0).getFloat(0) ~== 1.0f) + val df5 = DummyInputData.select( + cosine_distance(lit2(Seq("0.8", "0.3")), lit2(Seq("0.4", "0.6")))) + assert(df5.collect.apply(0).getFloat(0) ~== 1.0f) - val row6 = DummyInputData.select( - angular_distance(Seq("0.1", "0.1"), Seq("0.3", "0.8"))).collect - assert(row6(0).getFloat(0) ~== 0.50f) + val df6 = DummyInputData.select( + angular_distance(lit2(Seq("0.1", "0.1")), lit2(Seq("0.3", "0.8")))) + assert(df6.collect.apply(0).getFloat(0) ~== 0.50f) - val row7 = DummyInputData.select( - manhattan_distance(Seq("0.7", "0.8"), Seq("0.5", "0.6"))).collect - assert(row7(0).getFloat(0) ~== 4.0f) + val df7 = DummyInputData.select( + manhattan_distance(lit2(Seq("0.7", "0.8")), lit2(Seq("0.5", "0.6")))) + assert(df7.collect.apply(0).getFloat(0) ~== 4.0f) - val row8 = DummyInputData.select( - minkowski_distance(Seq("0.1", "0.2"), Seq("0.2", "0.2"), 1.0)).collect - assert(row8(0).getFloat(0) ~== 2.0f) + val df8 = DummyInputData.select( + minkowski_distance(lit2(Seq("0.1", "0.2")), lit2(Seq("0.2", "0.2")), lit2(1.0))) + assert(df8.collect.apply(0).getFloat(0) ~== 2.0f) } test("knn.lsh") { import hiveContext.implicits._ - assert(IntList2Data.minhash(1, $"target").count() > 0) + assert(IntList2Data.minhash(lit(1), $"target").count() > 0) - assert(DummyInputData.select(bbit_minhash(Seq("1:0.1", "2:0.5"), false)).count + assert(DummyInputData.select(bbit_minhash(lit2(Seq("1:0.1", "2:0.5")), lit(false))).count == DummyInputData.count) - assert(DummyInputData.select(minhashes(Seq("1:0.1", "2:0.5"), false)).count + assert(DummyInputData.select(minhashes(lit2(Seq("1:0.1", "2:0.5")), lit(false))).count == DummyInputData.count) } test("ftvec - add_bias") { - // TODO: This import does not work and why? - // import hiveContext.implicits._ - assert(TinyTrainData.select(add_bias(TinyTrainData.col("features"))).collect.toSet - === Set( - Row(Seq("1:0.8", "2:0.2", "0:1.0")), - Row(Seq("2:0.7", "0:1.0")), - Row(Seq("1:0.9", "0:1.0")))) + import hiveContext.implicits._ + checkAnswer(TinyTrainData.select(add_bias($"features")), + Row(Seq("1:0.8", "2:0.2", "0:1.0")) :: + Row(Seq("2:0.7", "0:1.0")) :: + Row(Seq("1:0.9", "0:1.0")) :: + Nil + ) } test("ftvec - extract_feature") { - val row = DummyInputData.select(extract_feature("1:0.8")).collect - assert(row(0).getString(0) == "1") + val df = DummyInputData.select(extract_feature(lit("1:0.8"))) + checkAnswer(df, Row("1") :: Nil) } test("ftvec - extract_weight") { - val row = DummyInputData.select(extract_weight("3:0.1")).collect - assert(row(0).getDouble(0) ~== 0.1) + val df = DummyInputData.select(extract_weight(lit("3:0.1"))) + assert(df.collect.apply(0).getDouble(0) ~== 0.1) } - // test("ftvec - explode_array") { - // import hiveContext.implicits._ - // assert(TinyTrainData.explode_array("features") - // .select($"feature").collect.toSet - // === Set(Row("1:0.8"), Row("2:0.2"), Row("2:0.7"), Row("1:0.9"))) - // } + test("ftvec - explode_array") { + import hiveContext.implicits._ + val df = TinyTrainData.explode_array($"features").select($"feature") + checkAnswer(df, Row("1:0.8") :: Row("2:0.2") :: Row("2:0.7") :: Row("1:0.9") :: Nil) + } test("ftvec - add_feature_index") { - // import hiveContext.implicits._ - val doubleListData = { - // TODO: Use `toDF` - val rowRdd = hiveContext.sparkContext.parallelize( - Row(0.8 :: 0.5 :: Nil) :: - Row(0.3 :: 0.1 :: Nil) :: - Row(0.2 :: Nil) :: - Nil - ) - hiveContext.createDataFrame( - rowRdd, - StructType( - StructField("data", ArrayType(DoubleType), true) :: - Nil) - ) - } - - assert(doubleListData.select( - add_feature_index(doubleListData.col("data"))).collect.toSet - === Set( - Row(Seq("1:0.8", "2:0.5")), - Row(Seq("1:0.3", "2:0.1")), - Row(Seq("1:0.2")))) + import hiveContext.implicits._ + val doubleListData = Seq(Array(0.8, 0.5), Array(0.3, 0.1), Array(0.2)).toDF("data") + checkAnswer( + doubleListData.select(add_feature_index($"data")), + Row(Seq("1:0.8", "2:0.5")) :: + Row(Seq("1:0.3", "2:0.1")) :: + Row(Seq("1:0.2")) :: + Nil + ) } test("ftvec - sort_by_feature") { @@ -158,40 +143,46 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { Nil) ) } - val sortedKeys = intFloatMapData.select(sort_by_feature(intFloatMapData.col("data"))) .collect.map { case Row(m: Map[Int, Float]) => m.keysIterator.toSeq } - assert(sortedKeys.toSet === Set(Seq(1, 2, 3), Seq(1, 2), Seq(1, 2, 3, 4))) } test("ftvec.hash") { - assert(DummyInputData.select(mhash("test")).count == DummyInputData.count) - assert(DummyInputData.select(sha1("test")).count == DummyInputData.count) - // assert(DummyInputData.select(array_hash_values(Seq("aaa", "bbb"))).count + assert(DummyInputData.select(mhash(lit("test"))).count == DummyInputData.count) + assert(DummyInputData.select(org.apache.spark.sql.hive.HivemallOps.sha1(lit("test"))).count == + DummyInputData.count) + // TODO: The tests below failed because: + // org.apache.spark.sql.AnalysisException: List type in java is unsupported because JVM type + // erasure makes spark fail to catch a component type in List<>; + // + // assert(DummyInputData.select(array_hash_values(lit2(Seq("aaa", "bbb")))).count // == DummyInputData.count) - // assert(DummyInputData.select(prefixed_hash_values(Seq("ccc", "ddd"), "prefix")).count + // assert(DummyInputData.select( + // prefixed_hash_values(lit2(Seq("ccc", "ddd")), lit("prefix"))).count // == DummyInputData.count) } test("ftvec.scaling") { - assert(TinyTrainData.select(rescale(2.0f, 1.0, 5.0)).collect.toSet - === Set(Row(0.25f))) - assert(TinyTrainData.select(zscore(1.0f, 0.5, 0.5)).collect.toSet - === Set(Row(1.0f))) - assert(TinyTrainData.select(normalize(TinyTrainData.col("features"))).collect.toSet - === Set( - Row(Seq("1:0.9701425", "2:0.24253562")), - Row(Seq("2:1.0")), - Row(Seq("1:1.0")))) + val df1 = TinyTrainData.select(rescale(lit(2.0f), lit(1.0), lit(5.0))) + assert(df1.collect.apply(0).getFloat(0) === 0.25f) + val df2 = TinyTrainData.select(zscore(lit(1.0f), lit(0.5), lit(0.5))) + assert(df2.collect.apply(0).getFloat(0) === 1.0f) + val df3 = TinyTrainData.select(normalize(TinyTrainData.col("features"))) + checkAnswer( + df3, + Row(Seq("1:0.9701425", "2:0.24253562")) :: + Row(Seq("2:1.0")) :: + Row(Seq("1:1.0")) :: + Nil) } test("ftvec.selection - chi2") { import hiveContext.implicits._ - // see also hivemall.ftvec.selection.ChiSquareUDFTest + // See also hivemall.ftvec.selection.ChiSquareUDFTest val df = Seq( Seq( Seq(250.29999999999998, 170.90000000000003, 73.2, 12.199999999999996), @@ -220,88 +211,85 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { test("ftvec.conv - quantify") { import hiveContext.implicits._ val testDf = Seq((1, "aaa", true), (2, "bbb", false), (3, "aaa", false)).toDF - // This test is done in a single parition because `HivemallOps#quantify` assigns indentifiers + // This test is done in a single partition because `HivemallOps#quantify` assigns identifiers // for non-numerical values in each partition. - assert(testDf.coalesce(1).quantify(Seq[Column](true) ++ testDf.cols: _*).collect.toSet - === Set(Row(1, 0, 0), Row(2, 1, 1), Row(3, 0, 1))) + checkAnswer( + testDf.coalesce(1).quantify(lit(true) +: testDf.cols: _*), + Row(1, 0, 0) :: Row(2, 1, 1) :: Row(3, 0, 1) :: Nil) } test("ftvec.amplify") { import hiveContext.implicits._ - assert(TinyTrainData.amplify(3, $"label", $"features").count() == 9) - // assert(TinyTrainData.rand_amplify(3, "-buf 128", $"label", $"features").count() == 9) - assert(TinyTrainData.part_amplify(3).count() == 9) + assert(TinyTrainData.amplify(lit(3), $"label", $"features").count() == 9) + assert(TinyTrainData.part_amplify(lit(3)).count() == 9) + // TODO: The test below failed because: + // java.lang.RuntimeException: Unsupported literal type class scala.Tuple3 + // (-buf 128,label,features) + // + // assert(TinyTrainData.rand_amplify(lit(3), lit("-buf 8", $"label", $"features")).count() == 9) } ignore("ftvec.conv") { import hiveContext.implicits._ val df1 = Seq((0.0, "1:0.1" :: "3:0.3" :: Nil), (1, 0, "2:0.2" :: Nil)).toDF("a", "b") - assert(df1.select(to_dense_features(df1("b"), 3)).collect.toSet - === Set(Row(Array(0.1f, 0.0f, 0.3f)), Array(0.0f, 0.2f, 0.0f))) - + checkAnswer( + df1.select(to_dense_features(df1("b"), lit(3))), + Row(Array(0.1f, 0.0f, 0.3f)) :: Row(Array(0.0f, 0.2f, 0.0f)) :: Nil + ) val df2 = Seq((0.1, 0.2, 0.3), (0.2, 0.5, 0.4)).toDF("a", "b", "c") - assert(df2.select(to_sparse_features(df2("a"), df2("b"), df2("c"))).collect.toSet - === Set(Row(Seq("1:0.1", "2:0.2", "3:0.3")), Row(Seq("1:0.2", "2:0.5", "3:0.4")))) + checkAnswer( + df2.select(to_sparse_features(df2("a"), df2("b"), df2("c"))), + Row(Seq("1:0.1", "2:0.2", "3:0.3")) :: Row(Seq("1:0.2", "2:0.5", "3:0.4")) :: Nil + ) } test("ftvec.trans") { import hiveContext.implicits._ val df1 = Seq((1, -3, 1), (2, -2, 1)).toDF("a", "b", "c") - assert(df1.binarize_label($"a", $"b", $"c").collect.toSet === Set(Row(1, 1))) + checkAnswer( + df1.binarize_label($"a", $"b", $"c"), + Row(1, 1) :: Row(1, 1) :: Row(1, 1) :: Nil + ) val df2 = Seq((0.1f, 0.2f), (0.5f, 0.3f)).toDF("a", "b") - assert(df2.select(vectorize_features(Seq("a", "b"), df2("a"), df2("b"))).collect.toSet - === Set(Row(Seq("a:0.1", "b:0.2")), Row(Seq("a:0.5", "b:0.3")))) + checkAnswer( + df2.select(vectorize_features(lit2(Seq("a", "b")), df2("a"), df2("b"))), + Row(Seq("a:0.1", "b:0.2")) :: Row(Seq("a:0.5", "b:0.3")) :: Nil + ) val df3 = Seq(("c11", "c12"), ("c21", "c22")).toDF("a", "b") - assert(df3.select(categorical_features(Seq("a", "b"), df3("a"), df3("b"))).collect.toSet - === Set(Row(Seq("a#c11", "b#c12")), Row(Seq("a#c21", "b#c22")))) + checkAnswer( + df3.select(categorical_features(lit2(Seq("a", "b")), df3("a"), df3("b"))), + Row(Seq("a#c11", "b#c12")) :: Row(Seq("a#c21", "b#c22")) :: Nil + ) val df4 = Seq((0.1, 0.2, 0.3), (0.2, 0.5, 0.4)).toDF("a", "b", "c") - assert(df4.select(indexed_features(df4("a"), df4("b"), df4("c"))).collect.toSet - === Set(Row(Seq("1:0.1", "2:0.2", "3:0.3")), Row(Seq("1:0.2", "2:0.5", "3:0.4")))) + checkAnswer( + df4.select(indexed_features(df4("a"), df4("b"), df4("c"))), + Row(Seq("1:0.1", "2:0.2", "3:0.3")) :: Row(Seq("1:0.2", "2:0.5", "3:0.4")) :: Nil + ) - val df5 = Seq(("xxx", "yyy", 0), ("zzz", "yyy", 1)).toDF("a", "b", "c") - assert(df5.coalesce(1).quantified_features(true, df5("a"), df5("b"), df5("c")).collect.toSet - === Set(Row(Seq(0.0, 0.0, 0.0)), Row(Seq(1.0, 0.0, 1.0)))) + val df5 = Seq(("xxx", "yyy", 0), ("zzz", "yyy", 1)).toDF("a", "b", "c").coalesce(1) + checkAnswer( + df5.quantified_features(lit(true), df5("a"), df5("b"), df5("c")), + Row(Seq(0.0, 0.0, 0.0)) :: Row(Seq(1.0, 0.0, 1.0)) :: Nil + ) val df6 = Seq((0.1, 0.2), (0.5, 0.3)).toDF("a", "b") - assert(df6.select(quantitative_features(Seq("a", "b"), df6("a"), df6("b"))).collect.toSet - === Set(Row(Seq("a:0.1", "b:0.2")), Row(Seq("a:0.5", "b:0.3")))) + checkAnswer( + df6.select(quantitative_features(lit2(Seq("a", "b")), df6("a"), df6("b"))), + Row(Seq("a:0.1", "b:0.2")) :: Row(Seq("a:0.5", "b:0.3")) :: Nil + ) } test("misc - hivemall_version") { - assert(DummyInputData.select(hivemall_version()).collect.toSet === Set(Row("0.4.2-rc.2"))) - /** - * TODO: Why a test below does fail? - * - * checkAnswer( - * DummyInputData.select(hivemall_version()).distinct, - * Row("0.3.1") - * ) - * - * The test throw an exception below: - * - * - hivemall_version *** FAILED *** - * org.apache.spark.sql.AnalysisException: - * Cannot resolve column name "HiveSimpleUDF#hivemall.HivemallVersionUDF()" among - * (HiveSimpleUDF#hivemall.Hivemall VersionUDF()); - * at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:159) - * at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:159) - * at scala.Option.getOrElse(Option.scala:120) - * at org.apache.spark.sql.DataFrame.resolve(DataFrame.scala:158) - * at org.apache.spark.sql.DataFrame$$anonfun$30.apply(DataFrame.scala:1227) - * at org.apache.spark.sql.DataFrame$$anonfun$30.apply(DataFrame.scala:1227) - * at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) - * ... - */ + checkAnswer(DummyInputData.select(hivemall_version()), Row("0.4.2-rc.2")) } test("misc - rowid") { - val df = DummyInputData.select(rowid()) - assert(df.distinct.count == df.count) + assert(DummyInputData.select(rowid()).distinct.count == DummyInputData.count) } test("misc - each_top_k") { @@ -324,7 +312,7 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { Nil ) checkAnswer( - testDf.each_top_k(1, $"key", $"score", $"key", $"value"), + testDf.each_top_k(lit(1), $"key", $"score", $"key", $"value"), Row(1, "a", "3") :: Row(1, "b", "4") :: Row(1, "c", "6") :: @@ -340,7 +328,7 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { Nil ) checkAnswer( - testDf.each_top_k(-1, $"key", $"score", $"key", $"value"), + testDf.each_top_k(lit(-1), $"key", $"score", $"key", $"value"), Row(1, "a", "1") :: Row(1, "b", "5") :: Row(1, "c", "6") :: @@ -348,7 +336,7 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { ) // Check if some exceptions thrown in case of some conditions - assert(intercept[AnalysisException] { testDf.each_top_k(0.1, $"key", $"score") } + assert(intercept[AnalysisException] { testDf.each_top_k(lit(0.1), $"key", $"score") } .getMessage contains "`k` must be integer, however") assert(intercept[AnalysisException] { testDf.each_top_k(1, "key", "data") } .getMessage contains "must have a comparable type") @@ -369,7 +357,7 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { val model = Seq((0.0, 0.1 :: 0.1 :: Nil), (1.0, 0.2 :: 0.3 :: 0.2 :: Nil)) .toDF("label", "features") - .train_randomforest_regr($"features", $"label", "-trees 2") + .train_randomforest_regr($"features", $"label") val testData = Seq((0.0, 0.1 :: 0.0 :: Nil), (1.0, 0.3 :: 0.5 :: 0.4 :: Nil)) .toDF("label", "features") @@ -379,9 +367,8 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { .join(testData).coalesce(1) .select( $"rowid", - tree_predict( - model("model_id"), model("model_type"), model("pred_model"), testData("features"), true) - .as("predicted") + tree_predict(model("model_id"), model("model_type"), model("pred_model"), + testData("features"), lit(true)).as("predicted") ) .groupBy($"rowid") .rf_ensemble("predicted").as("rowid", "predicted") @@ -392,38 +379,24 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { test("tools.array - select_k_best") { import hiveContext.implicits._ - import org.apache.spark.sql.functions._ val data = Seq(Seq(0, 1, 3), Seq(2, 4, 1), Seq(5, 4, 9)) val df = data.map(d => (d, Seq(3, 1, 2))).toDF("features", "importance_list") val k = 2 - checkAnswer(df.select(select_k_best(df("features"), df("importance_list"), lit(k))), - data.map(s => Row(Seq(s(0).toDouble, s(2).toDouble)))) + checkAnswer( + df.select(select_k_best(df("features"), df("importance_list"), lit(k))), + Row(Seq(0.0, 3.0)) :: Row(Seq(2.0, 1.0)) :: Row(Seq(5.0, 9.0)) :: Nil + ) } test("misc - sigmoid") { import hiveContext.implicits._ - /** - * TODO: SigmodUDF only accepts floating-point types in spark-v1.5.0? - * This test throws an exception below: - * - * org.apache.spark.sql.catalyst.analysis.UnresolvedException: - * Invalid call to dataType on unresolved object, tree: 'data - * at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType - * (unresolved.scala:59) - * at org.apache.spark.sql.hive.HiveSimpleUDF$$anonfun$method$1.apply(hiveUDFs.scala:119) - * ... - */ - val rows = DummyInputData.select(sigmoid($"c0")).collect - assert(rows(0).getDouble(0) ~== 0.500) - assert(rows(1).getDouble(0) ~== 0.731) - assert(rows(2).getDouble(0) ~== 0.880) - assert(rows(3).getDouble(0) ~== 0.952) + assert(DummyInputData.select(sigmoid($"c0")).collect.apply(0).getDouble(0) ~== 0.500) } test("misc - lr_datagen") { - assert(TinyTrainData.lr_datagen("-n_examples 100 -n_features 10 -seed 100").count >= 100) + assert(TinyTrainData.lr_datagen(lit("-n_examples 100 -n_features 10 -seed 100")).count >= 100) } test("invoke regression functions") { @@ -577,7 +550,7 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { * WARN Column: Constructing trivially true equals predicate, 'rowid#1323 = rowid#1323'. * Perhaps you need to use aliases. */ - .select($"rowid", functions.when(sigmoid($"sum(value)") > 0.50, 1.0).otherwise(0.0)) + .select($"rowid", when(sigmoid($"sum(value)") > 0.50, 1.0).otherwise(0.0)) .as("rowid", "predicted") // Evaluation http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/79f92f4f/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/ModelMixingSuite.scala ---------------------------------------------------------------------- diff --git a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/ModelMixingSuite.scala b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/ModelMixingSuite.scala index 5a58c2d..3b87a96 100644 --- a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/ModelMixingSuite.scala +++ b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/ModelMixingSuite.scala @@ -33,10 +33,9 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.SparkFunSuite import org.apache.spark.ml.feature.HivemallLabeledPoint import org.apache.spark.sql.{Column, DataFrame, Row} -import org.apache.spark.sql.functions.when +import org.apache.spark.sql.functions._ import org.apache.spark.sql.hive.HivemallGroupedDataset._ import org.apache.spark.sql.hive.HivemallOps._ -import org.apache.spark.sql.hive.HivemallUtils._ import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive.implicits._ import org.apache.spark.test.TestUtils @@ -156,9 +155,16 @@ final class ModelMixingSuite extends SparkFunSuite with BeforeAndAfter { // Build a model val model = { val groupId = s"${TestHive.sparkContext.applicationId}-${UUID.randomUUID}" - val res = TestUtils.invokeFunc(new HivemallOps(trainA9aData.part_amplify(1)), func, - Seq[Column](add_bias($"features"), $"label", - s"-mix localhost:${assignedPort} -mix_session ${groupId} -mix_threshold 2 -mix_cancel")) + val res = TestUtils.invokeFunc( + new HivemallOps(trainA9aData.part_amplify(lit(1))), + func, + Seq[Column]( + add_bias($"features"), + $"label", + lit(s"-mix localhost:${assignedPort} -mix_session ${groupId} -mix_threshold 2 " + + "-mix_cancel") + ) + ) if (!res.columns.contains("conv")) { res.groupBy("feature").agg("weight" -> "avg") } else { @@ -217,9 +223,16 @@ final class ModelMixingSuite extends SparkFunSuite with BeforeAndAfter { // Build a model val model = { val groupId = s"${TestHive.sparkContext.applicationId}-${UUID.randomUUID}" - val res = TestUtils.invokeFunc(new HivemallOps(trainKdd2010aData.part_amplify(1)), func, - Seq[Column](add_bias($"features"), $"label", - s"-mix localhost:${assignedPort} -mix_session ${groupId} -mix_threshold 2 -mix_cancel")) + val res = TestUtils.invokeFunc( + new HivemallOps(trainKdd2010aData.part_amplify(lit(1))), + func, + Seq[Column]( + add_bias($"features"), + $"label", + lit(s"-mix localhost:${assignedPort} -mix_session ${groupId} -mix_threshold 2 " + + "-mix_cancel") + ) + ) if (!res.columns.contains("conv")) { res.groupBy("feature").agg("weight" -> "avg") } else { http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/79f92f4f/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/XGBoostSuite.scala ---------------------------------------------------------------------- diff --git a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/XGBoostSuite.scala b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/XGBoostSuite.scala index 136d6bc..7c78678 100644 --- a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/XGBoostSuite.scala +++ b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/XGBoostSuite.scala @@ -26,7 +26,6 @@ import org.apache.spark.sql.Row import org.apache.spark.sql.functions._ import org.apache.spark.sql.hive.HivemallGroupedDataset._ import org.apache.spark.sql.hive.HivemallOps._ -import org.apache.spark.sql.hive.HivemallUtils._ import org.apache.spark.sql.types._ import org.apache.spark.test.VectorQueryTest @@ -56,7 +55,7 @@ final class XGBoostSuite extends VectorQueryTest { // Save built models in persistent storage mllibTrainDf.repartition(numModles) - .train_xgboost_regr($"features", $"label", s"${defaultOptions}") + .train_xgboost_regr($"features", $"label", lit(s"${defaultOptions}")) .write.format(xgboost).save(tempDir) // Check #models generated by XGBoost @@ -82,7 +81,7 @@ final class XGBoostSuite extends VectorQueryTest { withTempModelDir { tempDir => mllibTrainDf.repartition(numModles) - .train_xgboost_regr($"features", $"label", s"${defaultOptions}") + .train_xgboost_regr($"features", $"label", lit(s"${defaultOptions}")) .write.format(xgboost).save(tempDir) // Check #models generated by XGBoost @@ -110,7 +109,7 @@ final class XGBoostSuite extends VectorQueryTest { mllibTrainDf.repartition(numModles) .train_xgboost_multiclass_classifier( - $"features", $"label", s"${defaultOptions.set("num_class", "2")}") + $"features", $"label", lit(s"${defaultOptions.set("num_class", "2")}")) .write.format(xgboost).save(tempDir) // Check #models generated by XGBoost http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/79f92f4f/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/benchmark/MiscBenchmark.scala ---------------------------------------------------------------------- diff --git a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/benchmark/MiscBenchmark.scala b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/benchmark/MiscBenchmark.scala index a16ebb8..87782b9 100644 --- a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/benchmark/MiscBenchmark.scala +++ b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/benchmark/MiscBenchmark.scala @@ -25,9 +25,8 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions.{EachTopK, Expression, Literal} import org.apache.spark.sql.catalyst.plans.logical.{Generate, LogicalPlan, Project} import org.apache.spark.sql.expressions.Window -import org.apache.spark.sql.functions +import org.apache.spark.sql.functions._ import org.apache.spark.sql.hive.{HiveGenericUDF, HiveGenericUDTF} -import org.apache.spark.sql.hive.HivemallUtils._ import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper import org.apache.spark.sql.types._ import org.apache.spark.test.TestUtils @@ -49,7 +48,7 @@ class TestFuncWrapper(df: DataFrame) { def each_top_k_improved(k: Int, group: String, score: String, args: String*) : DataFrame = withTypedPlan { - val clusterDf = df.repartition(group).sortWithinPartitions(group) + val clusterDf = df.repartition(df(group)).sortWithinPartitions(group) val childrenAttributes = clusterDf.logicalPlan.output val generator = Generate( EachTopK( @@ -139,7 +138,7 @@ class MiscBenchmark extends SparkFunSuite { def sigmoidExprs(expr: Column): Column = { val one: () => Literal = () => Literal.create(1.0, DoubleType) - Column(one()) / (Column(one()) + functions.exp(-expr)) + Column(one()) / (Column(one()) + exp(-expr)) } addBenchmarkCase( "exprs", @@ -153,7 +152,7 @@ class MiscBenchmark extends SparkFunSuite { }(RowEncoder(schema)) ) - val sigmoidUdf = functions.udf((d: Double) => 1.0 / (1.0 + Math.exp(-d))) + val sigmoidUdf = udf { (d: Double) => 1.0 / (1.0 + Math.exp(-d)) } addBenchmarkCase( "spark-udf", testDf.select(sigmoidUdf($"value")) @@ -201,7 +200,7 @@ class MiscBenchmark extends SparkFunSuite { addBenchmarkCase( "rank", testDf.withColumn( - "rank", functions.rank().over(Window.partitionBy($"key").orderBy($"score".desc)) + "rank", rank().over(Window.partitionBy($"key").orderBy($"score".desc)) ).where($"rank" <= topK) ) @@ -212,7 +211,7 @@ class MiscBenchmark extends SparkFunSuite { // org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to name // on unresolved object, tree: unresolvedalias('value, None) // at org.apache.spark.sql.catalyst.analysis.UnresolvedAlias.name(unresolved.scala:339) - testDf.each_top_k(topK, $"key", $"score", testDf("value")) + testDf.each_top_k(lit(topK), $"key", $"score", testDf("value")) ) addBenchmarkCase( http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/79f92f4f/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/test/HivemallFeatureQueryTest.scala ---------------------------------------------------------------------- diff --git a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/test/HivemallFeatureQueryTest.scala b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/test/HivemallFeatureQueryTest.scala new file mode 100644 index 0000000..a4733f5 --- /dev/null +++ b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/test/HivemallFeatureQueryTest.scala @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.spark.sql.hive.test + +import scala.collection.mutable.Seq +import scala.reflect.runtime.universe.TypeTag + +import hivemall.tools.RegressionDatagen + +import org.apache.spark.sql.Column +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection} +import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.QueryTest + +/** + * Base class for tests with Hivemall features. + */ +abstract class HivemallFeatureQueryTest extends QueryTest with TestHiveSingleton { + + import hiveContext.implicits._ + + /** + * TODO: spark-2.0 does not support literals for some types (e.g., Seq[_] and Array[_]). + * So, it provides that functionality here. + * This helper function will be removed in future releases. + */ + protected def lit2[T : TypeTag](v: T): Column = { + val ScalaReflection.Schema(dataType, _) = ScalaReflection.schemaFor[T] + val convert = CatalystTypeConverters.createToCatalystConverter(dataType) + Column(Literal(convert(v), dataType)) + } + + protected val DummyInputData = Seq((0, 0)).toDF("c0", "c1") + + protected val IntList2Data = + Seq( + (8 :: 5 :: Nil, 6 :: 4 :: Nil), + (3 :: 1 :: Nil, 3 :: 2 :: Nil), + (2 :: Nil, 3 :: Nil) + ).toDF("target", "predict") + + protected val Float2Data = + Seq( + (0.8f, 0.3f), (0.3f, 0.9f), (0.2f, 0.4f) + ).toDF("target", "predict") + + protected val TinyTrainData = + Seq( + (0.0, "1:0.8" :: "2:0.2" :: Nil), + (1.0, "2:0.7" :: Nil), + (0.0, "1:0.9" :: Nil) + ).toDF("label", "features") + + protected val TinyTestData = + Seq( + (0.0, "1:0.6" :: "2:0.1" :: Nil), + (1.0, "2:0.9" :: Nil), + (0.0, "1:0.2" :: Nil), + (0.0, "2:0.1" :: Nil), + (0.0, "0:0.6" :: "2:0.4" :: Nil) + ).toDF("label", "features") + + protected val LargeRegrTrainData = RegressionDatagen.exec( + hiveContext, + n_partitions = 2, + min_examples = 100000, + seed = 3, + prob_one = 0.8f + ).cache + + protected val LargeRegrTestData = RegressionDatagen.exec( + hiveContext, + n_partitions = 2, + min_examples = 100, + seed = 3, + prob_one = 0.5f + ).cache + + protected val LargeClassifierTrainData = RegressionDatagen.exec( + hiveContext, + n_partitions = 2, + min_examples = 100000, + seed = 5, + prob_one = 0.8f, + cl = true + ).cache + + protected val LargeClassifierTestData = RegressionDatagen.exec( + hiveContext, + n_partitions = 2, + min_examples = 100, + seed = 5, + prob_one = 0.5f, + cl = true + ).cache +} http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/79f92f4f/spark/spark-2.0/src/test/scala/org/apache/spark/streaming/HivemallFeatureOpsSuite.scala ---------------------------------------------------------------------- diff --git a/spark/spark-2.0/src/test/scala/org/apache/spark/streaming/HivemallFeatureOpsSuite.scala b/spark/spark-2.0/src/test/scala/org/apache/spark/streaming/HivemallFeatureOpsSuite.scala index 0285c4e..6d46292 100644 --- a/spark/spark-2.0/src/test/scala/org/apache/spark/streaming/HivemallFeatureOpsSuite.scala +++ b/spark/spark-2.0/src/test/scala/org/apache/spark/streaming/HivemallFeatureOpsSuite.scala @@ -18,15 +18,15 @@ */ package org.apache.spark.streaming -import reflect.ClassTag +import scala.reflect.ClassTag import org.apache.spark.ml.feature.HivemallLabeledPoint import org.apache.spark.rdd.RDD import org.apache.spark.sql.hive.HivemallOps._ +import org.apache.spark.sql.hive.test.HivemallFeatureQueryTest import org.apache.spark.streaming.HivemallStreamingOps._ import org.apache.spark.streaming.dstream.InputDStream import org.apache.spark.streaming.scheduler.StreamInputInfo -import org.apache.spark.test.HivemallFeatureQueryTest /** * This is an input stream just for tests. http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/79f92f4f/spark/spark-2.0/src/test/scala/org/apache/spark/test/HivemallFeatureQueryTest.scala ---------------------------------------------------------------------- diff --git a/spark/spark-2.0/src/test/scala/org/apache/spark/test/HivemallFeatureQueryTest.scala b/spark/spark-2.0/src/test/scala/org/apache/spark/test/HivemallFeatureQueryTest.scala deleted file mode 100644 index 417d080..0000000 --- a/spark/spark-2.0/src/test/scala/org/apache/spark/test/HivemallFeatureQueryTest.scala +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.spark.test - -import scala.collection.mutable.Seq - -import hivemall.tools.RegressionDatagen - -import org.apache.spark.sql.hive.test.TestHiveSingleton -import org.apache.spark.sql.QueryTest - -/** - * Base class for tests with Hivemall features. - */ -abstract class HivemallFeatureQueryTest extends QueryTest with TestHiveSingleton { - - import hiveContext.implicits._ - - protected val DummyInputData = - Seq( - (0, 0), (1, 1), (2, 2), (3, 3) - ).toDF("c0", "c1") - - protected val IntList2Data = - Seq( - (8 :: 5 :: Nil, 6 :: 4 :: Nil), - (3 :: 1 :: Nil, 3 :: 2 :: Nil), - (2 :: Nil, 3 :: Nil) - ).toDF("target", "predict") - - protected val Float2Data = - Seq( - (0.8f, 0.3f), (0.3f, 0.9f), (0.2f, 0.4f) - ).toDF("target", "predict") - - protected val TinyTrainData = - Seq( - (0.0, "1:0.8" :: "2:0.2" :: Nil), - (1.0, "2:0.7" :: Nil), - (0.0, "1:0.9" :: Nil) - ).toDF("label", "features") - - protected val TinyTestData = - Seq( - (0.0, "1:0.6" :: "2:0.1" :: Nil), - (1.0, "2:0.9" :: Nil), - (0.0, "1:0.2" :: Nil), - (0.0, "2:0.1" :: Nil), - (0.0, "0:0.6" :: "2:0.4" :: Nil) - ).toDF("label", "features") - - protected val LargeRegrTrainData = RegressionDatagen.exec( - hiveContext, - n_partitions = 2, - min_examples = 100000, - seed = 3, - prob_one = 0.8f - ).cache - - protected val LargeRegrTestData = RegressionDatagen.exec( - hiveContext, - n_partitions = 2, - min_examples = 100, - seed = 3, - prob_one = 0.5f - ).cache - - protected val LargeClassifierTrainData = RegressionDatagen.exec( - hiveContext, - n_partitions = 2, - min_examples = 100000, - seed = 5, - prob_one = 0.8f, - cl = true - ).cache - - protected val LargeClassifierTestData = RegressionDatagen.exec( - hiveContext, - n_partitions = 2, - min_examples = 100, - seed = 5, - prob_one = 0.5f, - cl = true - ).cache -} http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/79f92f4f/spark/spark-2.0/src/test/scala/org/apache/spark/test/TestUtils.scala ---------------------------------------------------------------------- diff --git a/spark/spark-2.0/src/test/scala/org/apache/spark/test/TestUtils.scala b/spark/spark-2.0/src/test/scala/org/apache/spark/test/TestUtils.scala index 6650a93..8a2a385 100644 --- a/spark/spark-2.0/src/test/scala/org/apache/spark/test/TestUtils.scala +++ b/spark/spark-2.0/src/test/scala/org/apache/spark/test/TestUtils.scala @@ -50,13 +50,15 @@ object TestUtils extends Logging { } // TODO: Any same function in o.a.spark.*? -class TestDoubleWrapper(d: Double) { - // Check an equality between Double values +class TestFPWrapper(d: Double) { + + // Check an equality between Double/Float values def ~==(d: Double): Boolean = Math.abs(this.d - d) < 0.001 } -object TestDoubleWrapper { - @inline implicit def toTestDoubleWrapper(d: Double): TestDoubleWrapper = { - new TestDoubleWrapper(d) +object TestFPWrapper { + + @inline implicit def toTestFPWrapper(d: Double): TestFPWrapper = { + new TestFPWrapper(d) } }
