Repository: incubator-hivemall
Updated Branches:
  refs/heads/master b90999664 -> 79f92f4f2


Close #26: [HIVEMALL-35] Remove unnecessary implicit conversions in 
HivemallUtils


Project: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/commit/79f92f4f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/tree/79f92f4f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/diff/79f92f4f

Branch: refs/heads/master
Commit: 79f92f4f2c6458ccf4133744a1654a46cbbb56cc
Parents: b909996
Author: Takeshi YAMAMURO <[email protected]>
Authored: Thu Jan 26 17:58:27 2017 +0900
Committer: Takeshi YAMAMURO <[email protected]>
Committed: Thu Jan 26 17:58:27 2017 +0900

----------------------------------------------------------------------
 .../knn/similarity/Distance2SimilarityUDF.java  |   2 +-
 .../hivemall/tools/RegressionDatagen.scala      |  12 +-
 .../org/apache/spark/sql/hive/HivemallOps.scala |  16 +-
 .../apache/spark/sql/hive/HivemallUtils.scala   | 109 +++----
 .../apache/spark/sql/hive/HiveUdfSuite.scala    |   2 +-
 .../spark/sql/hive/HivemallOpsSuite.scala       | 317 +++++++++----------
 .../spark/sql/hive/ModelMixingSuite.scala       |  29 +-
 .../apache/spark/sql/hive/XGBoostSuite.scala    |   7 +-
 .../sql/hive/benchmark/MiscBenchmark.scala      |  13 +-
 .../hive/test/HivemallFeatureQueryTest.scala    | 112 +++++++
 .../streaming/HivemallFeatureOpsSuite.scala     |   4 +-
 .../spark/test/HivemallFeatureQueryTest.scala   | 101 ------
 .../scala/org/apache/spark/test/TestUtils.scala |  12 +-
 13 files changed, 359 insertions(+), 377 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/79f92f4f/core/src/main/java/hivemall/knn/similarity/Distance2SimilarityUDF.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/hivemall/knn/similarity/Distance2SimilarityUDF.java 
b/core/src/main/java/hivemall/knn/similarity/Distance2SimilarityUDF.java
index 43c42a7..9186b80 100644
--- a/core/src/main/java/hivemall/knn/similarity/Distance2SimilarityUDF.java
+++ b/core/src/main/java/hivemall/knn/similarity/Distance2SimilarityUDF.java
@@ -50,7 +50,7 @@ public final class Distance2SimilarityUDF extends GenericUDF {
 
     @Override
     public FloatWritable evaluate(DeferredObject[] arguments) throws 
HiveException {
-        float d = PrimitiveObjectInspectorUtils.getFloat(arguments[0], 
distanceOI);
+        float d = PrimitiveObjectInspectorUtils.getFloat(arguments[0].get(), 
distanceOI);
         float sim = 1.f / (1.f + d);
         return new FloatWritable(sim);
     }

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/79f92f4f/spark/spark-2.0/src/main/scala/hivemall/tools/RegressionDatagen.scala
----------------------------------------------------------------------
diff --git 
a/spark/spark-2.0/src/main/scala/hivemall/tools/RegressionDatagen.scala 
b/spark/spark-2.0/src/main/scala/hivemall/tools/RegressionDatagen.scala
index 01664f4..72a5c83 100644
--- a/spark/spark-2.0/src/main/scala/hivemall/tools/RegressionDatagen.scala
+++ b/spark/spark-2.0/src/main/scala/hivemall/tools/RegressionDatagen.scala
@@ -19,8 +19,8 @@
 package hivemall.tools
 
 import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.hive.HivemallOps._
-import org.apache.spark.sql.hive.HivemallUtils._
 import org.apache.spark.sql.types._
 
 object RegressionDatagen {
@@ -57,10 +57,10 @@ object RegressionDatagen {
       )
     import sc.implicits._
     df.lr_datagen(
-      s"-n_examples $n_examples -n_features $n_features -n_dims $n_dims 
-prob_one $prob_one"
-        + (if (dense) " -dense" else "")
-        + (if (sort) " -sort" else "")
-        + (if (cl) " -cl" else ""))
-      .select($"label".cast(DoubleType).as("label"), $"features")
+        lit(s"-n_examples $n_examples -n_features $n_features -n_dims $n_dims 
-prob_one $prob_one"
+          + (if (dense) " -dense" else "")
+          + (if (sort) " -sort" else "")
+          + (if (cl) " -cl" else ""))
+      ).select($"label".cast(DoubleType).as("label"), $"features")
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/79f92f4f/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
----------------------------------------------------------------------
diff --git 
a/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala 
b/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
index f233a2a..8fa4831 100644
--- a/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
+++ b/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala
@@ -20,8 +20,6 @@ package org.apache.spark.sql.hive
 
 import java.util.UUID
 
-import scala.collection.JavaConverters._
-
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.internal.Logging
 import org.apache.spark.ml.feature.HivemallFeature
@@ -57,8 +55,6 @@ import org.apache.spark.unsafe.types.UTF8String
  * @groupname misc
  */
 final class HivemallOps(df: DataFrame) extends Logging {
-  import HivemallOps._
-  import HivemallUtils._
 
   /**
    * @see hivemall.regression.AdaDeltaUDTF
@@ -687,10 +683,14 @@ final class HivemallOps(df: DataFrame) extends Logging {
    * Amplifies and shuffle data inside partitions.
    * @group ftvec.amplify
    */
-  def part_amplify(xtimes: Int): DataFrame = {
+  def part_amplify(xtimes: Column): DataFrame = {
+    val xtimesInt = xtimes.expr match {
+      case Literal(v: Any, IntegerType) => v.asInstanceOf[Int]
+      case e => throw new AnalysisException("`xtimes` must be integer, however 
" + e)
+    }
     val rdd = df.rdd.mapPartitions({ iter =>
       val elems = iter.flatMap{ row =>
-        Seq.fill[Row](xtimes)(row)
+        Seq.fill[Row](xtimesInt)(row)
       }
       // Need to check how this shuffling affects results
       scala.util.Random.shuffle(elems)
@@ -792,7 +792,7 @@ final class HivemallOps(df: DataFrame) extends Logging {
    */
   def each_top_k(k: Int, group: String, score: String, args: String*)
     : DataFrame = withTypedPlan {
-    val clusterDf = df.repartition(group).sortWithinPartitions(group)
+    val clusterDf = df.repartition(df(group)).sortWithinPartitions(group)
     val childrenAttributes = clusterDf.logicalPlan.output
     val generator = Generate(
       EachTopK(
@@ -881,7 +881,7 @@ final class HivemallOps(df: DataFrame) extends Logging {
 
   @inline private[this] def toHivemallFeatureDf(exprs: Column*): Seq[Column] = 
{
     df.select(exprs: _*).queryExecution.analyzed.schema.zip(exprs).map {
-      case (StructField(_, _: VectorUDT, _, _), c) => to_hivemall_features(c)
+      case (StructField(_, _: VectorUDT, _, _), c) => 
HivemallUtils.to_hivemall_features(c)
       case (_, c) => c
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/79f92f4f/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallUtils.scala
----------------------------------------------------------------------
diff --git 
a/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallUtils.scala 
b/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallUtils.scala
index b7b7071..056d6d6 100644
--- 
a/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallUtils.scala
+++ 
b/spark/spark-2.0/src/main/scala/org/apache/spark/sql/hive/HivemallUtils.scala
@@ -19,8 +19,7 @@
 package org.apache.spark.sql.hive
 
 import org.apache.spark.ml.linalg.{BLAS, DenseVector, SparseVector, Vector, 
Vectors}
-import org.apache.spark.sql.{Column, DataFrame, Row}
-import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.expressions.UserDefinedFunction
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
@@ -31,19 +30,40 @@ object HivemallUtils {
   private[this] val maxDims = 100000000
 
   /**
-   * An implicit conversion to avoid doing annoying transformation.
-   * This class must be in o.a.spark.sql._ because
-   * a Column class is private.
+   * Check whether the given schema contains a column of the required data 
type.
+   * @param colName  column name
+   * @param dataType  required column data type
    */
-  @inline implicit def toBooleanLiteral(i: Boolean): Column = 
Column(Literal.create(i, BooleanType))
-  @inline implicit def toIntLiteral(i: Int): Column = Column(Literal.create(i, 
IntegerType))
-  @inline implicit def toFloatLiteral(i: Float): Column = 
Column(Literal.create(i, FloatType))
-  @inline implicit def toDoubleLiteral(i: Double): Column = 
Column(Literal.create(i, DoubleType))
-  @inline implicit def toStringLiteral(i: String): Column = 
Column(Literal.create(i, StringType))
-  @inline implicit def toIntArrayLiteral(i: Seq[Int]): Column =
-    Column(Literal.create(i, ArrayType(IntegerType)))
-  @inline implicit def toStringArrayLiteral(i: Seq[String]): Column =
-    Column(Literal.create(i, ArrayType(StringType)))
+  private[this] def checkColumnType(schema: StructType, colName: String, 
dataType: DataType)
+    : Unit = {
+    val actualDataType = schema(colName).dataType
+    require(actualDataType.equals(dataType),
+      s"Column $colName must be of type $dataType but was actually 
$actualDataType.")
+  }
+
+  def to_vector_func(dense: Boolean, dims: Int): Seq[String] => Vector = {
+    if (dense) {
+      // Dense features
+      i: Seq[String] => {
+        val features = new Array[Double](dims)
+        i.map { ft =>
+          val s = ft.split(":").ensuring(_.size == 2)
+          features(s(0).toInt) = s(1).toDouble
+        }
+        Vectors.dense(features)
+      }
+    } else {
+      // Sparse features
+      i: Seq[String] => {
+        val features = i.map { ft =>
+          // val s = ft.split(":").ensuring(_.size == 2)
+          val s = ft.split(":")
+          (s(0).toInt, s(1).toDouble)
+        }
+        Vectors.sparse(dims, features)
+      }
+    }
+  }
 
   def to_hivemall_features_func(): Vector => Array[String] = {
     case dv: DenseVector =>
@@ -83,20 +103,28 @@ object HivemallUtils {
   }
 
   /**
-   * Transforms `org.apache.spark.ml.linalg.Vector` into Hivemall features.
+   * Transforms Hivemall features into a [[Vector]].
+   */
+  def to_vector(dense: Boolean = false, dims: Int = maxDims): 
UserDefinedFunction = {
+    udf(to_vector_func(dense, dims))
+  }
+
+  /**
+   * Transforms a [[Vector]] into Hivemall features.
    */
   def to_hivemall_features: UserDefinedFunction = 
udf(to_hivemall_features_func)
 
   /**
-   * Returns a new vector with `1.0` (bias) appended to the input vector.
+   * Returns a new [[Vector]] with `1.0` (bias) appended to the input 
[[Vector]].
    * @group ftvec
    */
   def append_bias: UserDefinedFunction = udf(append_bias_func)
 
   /**
-   * Make up a function object from a Hivemall model.
+   * Builds a [[Vector]]-based model from a table of Hivemall models
    */
-  def funcModel(df: DataFrame, dense: Boolean = false, dims: Int = maxDims): 
UserDefinedFunction = {
+  def vectorized_model(df: DataFrame, dense: Boolean = false, dims: Int = 
maxDims)
+    : UserDefinedFunction = {
     checkColumnType(df.schema, "feature", StringType)
     checkColumnType(df.schema, "weight", DoubleType)
 
@@ -106,7 +134,7 @@ object HivemallUtils {
       .select($"weight")
       .map { case Row(weight: Double) => weight}
       .reduce(_ + _)
-    val weights = funcVectorizerImpl(dense, dims)(
+    val weights = to_vector_func(dense, dims)(
       df.select($"feature", $"weight")
         .where($"feature" !== "0")
         .map { case Row(label: String, feature: Double) => 
s"${label}:$feature"}
@@ -114,47 +142,4 @@ object HivemallUtils {
 
     udf((input: Vector) => BLAS.dot(input, weights) + intercept)
   }
-
-  /**
-   * Make up a function object to transform Hivemall features into Vector.
-   */
-  def funcVectorizer(dense: Boolean = false, dims: Int = maxDims): 
UserDefinedFunction = {
-    udf(funcVectorizerImpl(dense, dims))
-  }
-
-  private[this] def funcVectorizerImpl(dense: Boolean, dims: Int): Seq[String] 
=> Vector = {
-    if (dense) {
-      // Dense features
-      i: Seq[String] => {
-        val features = new Array[Double](dims)
-        i.map { ft =>
-          val s = ft.split(":").ensuring(_.size == 2)
-          features(s(0).toInt) = s(1).toDouble
-        }
-        Vectors.dense(features)
-      }
-    } else {
-      // Sparse features
-      i: Seq[String] => {
-        val features = i.map { ft =>
-          // val s = ft.split(":").ensuring(_.size == 2)
-          val s = ft.split(":")
-          (s(0).toInt, s(1).toDouble)
-        }
-        Vectors.sparse(dims, features)
-      }
-    }
-  }
-
-  /**
-   * Check whether the given schema contains a column of the required data 
type.
-   * @param colName  column name
-   * @param dataType  required column data type
-   */
-  private[this] def checkColumnType(schema: StructType, colName: String, 
dataType: DataType)
-    : Unit = {
-    val actualDataType = schema(colName).dataType
-    require(actualDataType.equals(dataType),
-      s"Column $colName must be of type $dataType but was actually 
$actualDataType.")
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/79f92f4f/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HiveUdfSuite.scala
----------------------------------------------------------------------
diff --git 
a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HiveUdfSuite.scala 
b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HiveUdfSuite.scala
index d53ef73..74a138e 100644
--- 
a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HiveUdfSuite.scala
+++ 
b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HiveUdfSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.hive.HivemallUtils._
-import org.apache.spark.test.HivemallFeatureQueryTest
+import org.apache.spark.sql.hive.test.HivemallFeatureQueryTest
 import org.apache.spark.test.VectorQueryTest
 
 final class HiveUdfWithFeatureSuite extends HivemallFeatureQueryTest {

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/79f92f4f/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
----------------------------------------------------------------------
diff --git 
a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
 
b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
index 6f2f016..61af8d1 100644
--- 
a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
+++ 
b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
@@ -18,127 +18,112 @@
  */
 package org.apache.spark.sql.hive
 
-import org.apache.spark.sql.{AnalysisException, Column, Row}
-import org.apache.spark.sql.functions
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.hive.HivemallGroupedDataset._
 import org.apache.spark.sql.hive.HivemallOps._
 import org.apache.spark.sql.hive.HivemallUtils._
+import org.apache.spark.sql.hive.test.HivemallFeatureQueryTest
 import org.apache.spark.sql.types._
-import org.apache.spark.test.{HivemallFeatureQueryTest, TestUtils, 
VectorQueryTest}
-import org.apache.spark.test.TestDoubleWrapper._
+import org.apache.spark.test.{TestUtils, VectorQueryTest}
+import org.apache.spark.test.TestFPWrapper._
 
 final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest {
 
   test("knn.similarity") {
-    val row1 = DummyInputData.select(cosine_sim(Seq(1, 2, 3, 4), Seq(3, 4, 5, 
6))).collect
-    assert(row1(0).getFloat(0) ~== 0.500f)
+    val df1 = DummyInputData.select(cosine_sim(lit2(Seq(1, 2, 3, 4)), 
lit2(Seq(3, 4, 5, 6))))
+    assert(df1.collect.apply(0).getFloat(0) ~== 0.500f)
 
-    val row2 = DummyInputData.select(jaccard(5, 6)).collect
-    assert(row2(0).getFloat(0) ~== 0.96875f)
+    val df2 = DummyInputData.select(jaccard(lit(5), lit(6)))
+    assert(df2.collect.apply(0).getFloat(0) ~== 0.96875f)
 
-    val row3 = DummyInputData.select(angular_similarity(Seq(1, 2, 3), Seq(4, 
5, 6))).collect
-    assert(row3(0).getFloat(0) ~== 0.500f)
+    val df3 = DummyInputData.select(angular_similarity(lit2(Seq(1, 2, 3)), 
lit2(Seq(4, 5, 6))))
+    assert(df3.collect.apply(0).getFloat(0) ~== 0.500f)
 
-    val row4 = DummyInputData.select(euclid_similarity(Seq(5, 3, 1), Seq(2, 8, 
3))).collect
-    assert(row4(0).getFloat(0) ~== 0.33333334f)
+    val df4 = DummyInputData.select(euclid_similarity(lit2(Seq(5, 3, 1)), 
lit2(Seq(2, 8, 3))))
+    assert(df4.collect.apply(0).getFloat(0) ~== 0.33333334f)
 
-    // TODO: $"c0" throws AnalysisException, why?
-    // val row5 = 
DummyInputData.select(distance2similarity(DummyInputData("c0"))).collect
-    // assert(row5(0).getFloat(0) ~== 0.1f)
+    val df5 = DummyInputData.select(distance2similarity(lit(1.0)))
+    assert(df5.collect.apply(0).getFloat(0) ~== 0.5f)
   }
 
   test("knn.distance") {
-    val row1 = DummyInputData.select(hamming_distance(1, 3)).collect
-    assert(row1(0).getInt(0) == 1)
+    val df1 = DummyInputData.select(hamming_distance(lit(1), lit(3)))
+    checkAnswer(df1, Row(1) :: Nil)
 
-    val row2 = DummyInputData.select(popcnt(1)).collect
-    assert(row2(0).getInt(0) == 1)
+    val df2 = DummyInputData.select(popcnt(lit(1)))
+    checkAnswer(df2, Row(1) :: Nil)
 
-    val row3 = DummyInputData.select(kld(0.1, 0.5, 0.2, 0.5)).collect
-    assert(row3(0).getDouble(0) ~== 0.01)
+    val df3 = DummyInputData.select(kld(lit(0.1), lit(0.5), lit(0.2), 
lit(0.5)))
+    assert(df3.collect.apply(0).getDouble(0) ~== 0.01)
 
-    val row4 = DummyInputData.select(
-      euclid_distance(Seq("0.1", "0.5"), Seq("0.2", "0.5"))).collect
-    assert(row4(0).getFloat(0) ~== 1.4142135f)
+    val df4 = DummyInputData.select(
+      euclid_distance(lit2(Seq("0.1", "0.5")), lit2(Seq("0.2", "0.5"))))
+    assert(df4.collect.apply(0).getFloat(0) ~== 1.4142135f)
 
-    val row5 = DummyInputData.select(
-      cosine_distance(Seq("0.8", "0.3"), Seq("0.4", "0.6"))).collect
-    assert(row5(0).getFloat(0) ~== 1.0f)
+    val df5 = DummyInputData.select(
+      cosine_distance(lit2(Seq("0.8", "0.3")), lit2(Seq("0.4", "0.6"))))
+    assert(df5.collect.apply(0).getFloat(0) ~== 1.0f)
 
-    val row6 = DummyInputData.select(
-      angular_distance(Seq("0.1", "0.1"), Seq("0.3", "0.8"))).collect
-    assert(row6(0).getFloat(0) ~== 0.50f)
+    val df6 = DummyInputData.select(
+      angular_distance(lit2(Seq("0.1", "0.1")), lit2(Seq("0.3", "0.8"))))
+    assert(df6.collect.apply(0).getFloat(0) ~== 0.50f)
 
-    val row7 = DummyInputData.select(
-      manhattan_distance(Seq("0.7", "0.8"), Seq("0.5", "0.6"))).collect
-    assert(row7(0).getFloat(0) ~== 4.0f)
+    val df7 = DummyInputData.select(
+      manhattan_distance(lit2(Seq("0.7", "0.8")), lit2(Seq("0.5", "0.6"))))
+    assert(df7.collect.apply(0).getFloat(0) ~== 4.0f)
 
-    val row8 = DummyInputData.select(
-      minkowski_distance(Seq("0.1", "0.2"), Seq("0.2", "0.2"), 1.0)).collect
-    assert(row8(0).getFloat(0) ~== 2.0f)
+    val df8 = DummyInputData.select(
+      minkowski_distance(lit2(Seq("0.1", "0.2")), lit2(Seq("0.2", "0.2")), 
lit2(1.0)))
+    assert(df8.collect.apply(0).getFloat(0) ~== 2.0f)
   }
 
   test("knn.lsh") {
     import hiveContext.implicits._
-    assert(IntList2Data.minhash(1, $"target").count() > 0)
+    assert(IntList2Data.minhash(lit(1), $"target").count() > 0)
 
-    assert(DummyInputData.select(bbit_minhash(Seq("1:0.1", "2:0.5"), 
false)).count
+    assert(DummyInputData.select(bbit_minhash(lit2(Seq("1:0.1", "2:0.5")), 
lit(false))).count
       == DummyInputData.count)
-    assert(DummyInputData.select(minhashes(Seq("1:0.1", "2:0.5"), false)).count
+    assert(DummyInputData.select(minhashes(lit2(Seq("1:0.1", "2:0.5")), 
lit(false))).count
       == DummyInputData.count)
   }
 
   test("ftvec - add_bias") {
-    // TODO: This import does not work and why?
-    // import hiveContext.implicits._
-    
assert(TinyTrainData.select(add_bias(TinyTrainData.col("features"))).collect.toSet
-      === Set(
-        Row(Seq("1:0.8", "2:0.2", "0:1.0")),
-        Row(Seq("2:0.7", "0:1.0")),
-        Row(Seq("1:0.9", "0:1.0"))))
+    import hiveContext.implicits._
+    checkAnswer(TinyTrainData.select(add_bias($"features")),
+        Row(Seq("1:0.8", "2:0.2", "0:1.0")) ::
+        Row(Seq("2:0.7", "0:1.0")) ::
+        Row(Seq("1:0.9", "0:1.0")) ::
+        Nil
+      )
   }
 
   test("ftvec - extract_feature") {
-    val row = DummyInputData.select(extract_feature("1:0.8")).collect
-    assert(row(0).getString(0) == "1")
+    val df = DummyInputData.select(extract_feature(lit("1:0.8")))
+    checkAnswer(df, Row("1") :: Nil)
   }
 
   test("ftvec - extract_weight") {
-    val row = DummyInputData.select(extract_weight("3:0.1")).collect
-    assert(row(0).getDouble(0) ~== 0.1)
+    val df = DummyInputData.select(extract_weight(lit("3:0.1")))
+    assert(df.collect.apply(0).getDouble(0) ~== 0.1)
   }
 
-  // test("ftvec - explode_array") {
-  //   import hiveContext.implicits._
-  //   assert(TinyTrainData.explode_array("features")
-  //       .select($"feature").collect.toSet
-  //     === Set(Row("1:0.8"), Row("2:0.2"), Row("2:0.7"), Row("1:0.9")))
-  // }
+  test("ftvec - explode_array") {
+    import hiveContext.implicits._
+    val df = TinyTrainData.explode_array($"features").select($"feature")
+    checkAnswer(df, Row("1:0.8") :: Row("2:0.2") :: Row("2:0.7") :: 
Row("1:0.9") :: Nil)
+  }
 
   test("ftvec - add_feature_index") {
-    // import hiveContext.implicits._
-    val doubleListData = {
-      // TODO: Use `toDF`
-      val rowRdd = hiveContext.sparkContext.parallelize(
-          Row(0.8 :: 0.5 :: Nil) ::
-          Row(0.3 :: 0.1 :: Nil) ::
-          Row(0.2 :: Nil) ::
-          Nil
-        )
-      hiveContext.createDataFrame(
-        rowRdd,
-        StructType(
-          StructField("data", ArrayType(DoubleType), true) ::
-          Nil)
-        )
-    }
-
-    assert(doubleListData.select(
-        add_feature_index(doubleListData.col("data"))).collect.toSet
-      === Set(
-        Row(Seq("1:0.8", "2:0.5")),
-        Row(Seq("1:0.3", "2:0.1")),
-        Row(Seq("1:0.2"))))
+    import hiveContext.implicits._
+    val doubleListData = Seq(Array(0.8, 0.5), Array(0.3, 0.1), 
Array(0.2)).toDF("data")
+    checkAnswer(
+        doubleListData.select(add_feature_index($"data")),
+        Row(Seq("1:0.8", "2:0.5")) ::
+        Row(Seq("1:0.3", "2:0.1")) ::
+        Row(Seq("1:0.2")) ::
+        Nil
+      )
   }
 
   test("ftvec - sort_by_feature") {
@@ -158,40 +143,46 @@ final class HivemallOpsWithFeatureSuite extends 
HivemallFeatureQueryTest {
           Nil)
         )
     }
-
     val sortedKeys = 
intFloatMapData.select(sort_by_feature(intFloatMapData.col("data")))
       .collect.map {
         case Row(m: Map[Int, Float]) => m.keysIterator.toSeq
     }
-
     assert(sortedKeys.toSet === Set(Seq(1, 2, 3), Seq(1, 2), Seq(1, 2, 3, 4)))
   }
 
   test("ftvec.hash") {
-    assert(DummyInputData.select(mhash("test")).count == DummyInputData.count)
-    assert(DummyInputData.select(sha1("test")).count == DummyInputData.count)
-    // assert(DummyInputData.select(array_hash_values(Seq("aaa", "bbb"))).count
+    assert(DummyInputData.select(mhash(lit("test"))).count == 
DummyInputData.count)
+    
assert(DummyInputData.select(org.apache.spark.sql.hive.HivemallOps.sha1(lit("test"))).count
 ==
+      DummyInputData.count)
+    // TODO: The tests below failed because:
+    //   org.apache.spark.sql.AnalysisException: List type in java is 
unsupported because JVM type
+    //     erasure makes spark fail to catch a component type in List<>;
+    //
+    // assert(DummyInputData.select(array_hash_values(lit2(Seq("aaa", 
"bbb")))).count
     //   == DummyInputData.count)
-    // assert(DummyInputData.select(prefixed_hash_values(Seq("ccc", "ddd"), 
"prefix")).count
+    // assert(DummyInputData.select(
+    //     prefixed_hash_values(lit2(Seq("ccc", "ddd")), lit("prefix"))).count
     //   == DummyInputData.count)
   }
 
   test("ftvec.scaling") {
-    assert(TinyTrainData.select(rescale(2.0f, 1.0, 5.0)).collect.toSet
-      === Set(Row(0.25f)))
-    assert(TinyTrainData.select(zscore(1.0f, 0.5, 0.5)).collect.toSet
-      === Set(Row(1.0f)))
-    
assert(TinyTrainData.select(normalize(TinyTrainData.col("features"))).collect.toSet
-      === Set(
-        Row(Seq("1:0.9701425", "2:0.24253562")),
-        Row(Seq("2:1.0")),
-        Row(Seq("1:1.0"))))
+    val df1 = TinyTrainData.select(rescale(lit(2.0f), lit(1.0), lit(5.0)))
+    assert(df1.collect.apply(0).getFloat(0) === 0.25f)
+    val df2 = TinyTrainData.select(zscore(lit(1.0f), lit(0.5), lit(0.5)))
+    assert(df2.collect.apply(0).getFloat(0) === 1.0f)
+    val df3 = TinyTrainData.select(normalize(TinyTrainData.col("features")))
+    checkAnswer(
+      df3,
+      Row(Seq("1:0.9701425", "2:0.24253562")) ::
+      Row(Seq("2:1.0")) ::
+      Row(Seq("1:1.0")) ::
+      Nil)
   }
 
   test("ftvec.selection - chi2") {
     import hiveContext.implicits._
 
-    // see also hivemall.ftvec.selection.ChiSquareUDFTest
+    // See also hivemall.ftvec.selection.ChiSquareUDFTest
     val df = Seq(
       Seq(
         Seq(250.29999999999998, 170.90000000000003, 73.2, 12.199999999999996),
@@ -220,88 +211,85 @@ final class HivemallOpsWithFeatureSuite extends 
HivemallFeatureQueryTest {
   test("ftvec.conv - quantify") {
     import hiveContext.implicits._
     val testDf = Seq((1, "aaa", true), (2, "bbb", false), (3, "aaa", 
false)).toDF
-    // This test is done in a single parition because `HivemallOps#quantify` 
assigns indentifiers
+    // This test is done in a single partition because `HivemallOps#quantify` 
assigns identifiers
     // for non-numerical values in each partition.
-    assert(testDf.coalesce(1).quantify(Seq[Column](true) ++ testDf.cols: 
_*).collect.toSet
-      === Set(Row(1, 0, 0), Row(2, 1, 1), Row(3, 0, 1)))
+    checkAnswer(
+      testDf.coalesce(1).quantify(lit(true) +: testDf.cols: _*),
+      Row(1, 0, 0) :: Row(2, 1, 1) :: Row(3, 0, 1) :: Nil)
   }
 
   test("ftvec.amplify") {
     import hiveContext.implicits._
-    assert(TinyTrainData.amplify(3, $"label", $"features").count() == 9)
-    // assert(TinyTrainData.rand_amplify(3, "-buf 128", $"label", 
$"features").count() == 9)
-    assert(TinyTrainData.part_amplify(3).count() == 9)
+    assert(TinyTrainData.amplify(lit(3), $"label", $"features").count() == 9)
+    assert(TinyTrainData.part_amplify(lit(3)).count() == 9)
+    // TODO: The test below failed because:
+    //   java.lang.RuntimeException: Unsupported literal type class 
scala.Tuple3
+    //     (-buf 128,label,features)
+    //
+    // assert(TinyTrainData.rand_amplify(lit(3), lit("-buf 8", $"label", 
$"features")).count() == 9)
   }
 
   ignore("ftvec.conv") {
     import hiveContext.implicits._
 
     val df1 = Seq((0.0, "1:0.1" :: "3:0.3" :: Nil), (1, 0, "2:0.2" :: 
Nil)).toDF("a", "b")
-    assert(df1.select(to_dense_features(df1("b"), 3)).collect.toSet
-      === Set(Row(Array(0.1f, 0.0f, 0.3f)), Array(0.0f, 0.2f, 0.0f)))
-
+    checkAnswer(
+      df1.select(to_dense_features(df1("b"), lit(3))),
+      Row(Array(0.1f, 0.0f, 0.3f)) :: Row(Array(0.0f, 0.2f, 0.0f)) :: Nil
+    )
     val df2 = Seq((0.1, 0.2, 0.3), (0.2, 0.5, 0.4)).toDF("a", "b", "c")
-    assert(df2.select(to_sparse_features(df2("a"), df2("b"), 
df2("c"))).collect.toSet
-      === Set(Row(Seq("1:0.1", "2:0.2", "3:0.3")), Row(Seq("1:0.2", "2:0.5", 
"3:0.4"))))
+    checkAnswer(
+      df2.select(to_sparse_features(df2("a"), df2("b"), df2("c"))),
+      Row(Seq("1:0.1", "2:0.2", "3:0.3")) :: Row(Seq("1:0.2", "2:0.5", 
"3:0.4")) :: Nil
+    )
   }
 
   test("ftvec.trans") {
     import hiveContext.implicits._
 
     val df1 = Seq((1, -3, 1), (2, -2, 1)).toDF("a", "b", "c")
-    assert(df1.binarize_label($"a", $"b", $"c").collect.toSet === Set(Row(1, 
1)))
+    checkAnswer(
+      df1.binarize_label($"a", $"b", $"c"),
+      Row(1, 1) :: Row(1, 1) :: Row(1, 1) :: Nil
+    )
 
     val df2 = Seq((0.1f, 0.2f), (0.5f, 0.3f)).toDF("a", "b")
-    assert(df2.select(vectorize_features(Seq("a", "b"), df2("a"), 
df2("b"))).collect.toSet
-      === Set(Row(Seq("a:0.1", "b:0.2")), Row(Seq("a:0.5", "b:0.3"))))
+    checkAnswer(
+      df2.select(vectorize_features(lit2(Seq("a", "b")), df2("a"), df2("b"))),
+      Row(Seq("a:0.1", "b:0.2")) :: Row(Seq("a:0.5", "b:0.3")) :: Nil
+    )
 
     val df3 = Seq(("c11", "c12"), ("c21", "c22")).toDF("a", "b")
-    assert(df3.select(categorical_features(Seq("a", "b"), df3("a"), 
df3("b"))).collect.toSet
-      === Set(Row(Seq("a#c11", "b#c12")), Row(Seq("a#c21", "b#c22"))))
+    checkAnswer(
+      df3.select(categorical_features(lit2(Seq("a", "b")), df3("a"), 
df3("b"))),
+      Row(Seq("a#c11", "b#c12")) :: Row(Seq("a#c21", "b#c22")) :: Nil
+    )
 
     val df4 = Seq((0.1, 0.2, 0.3), (0.2, 0.5, 0.4)).toDF("a", "b", "c")
-    assert(df4.select(indexed_features(df4("a"), df4("b"), 
df4("c"))).collect.toSet
-      === Set(Row(Seq("1:0.1", "2:0.2", "3:0.3")), Row(Seq("1:0.2", "2:0.5", 
"3:0.4"))))
+    checkAnswer(
+      df4.select(indexed_features(df4("a"), df4("b"), df4("c"))),
+      Row(Seq("1:0.1", "2:0.2", "3:0.3")) :: Row(Seq("1:0.2", "2:0.5", 
"3:0.4")) :: Nil
+    )
 
-    val df5 = Seq(("xxx", "yyy", 0), ("zzz", "yyy", 1)).toDF("a", "b", "c")
-    assert(df5.coalesce(1).quantified_features(true, df5("a"), df5("b"), 
df5("c")).collect.toSet
-      === Set(Row(Seq(0.0, 0.0, 0.0)), Row(Seq(1.0, 0.0, 1.0))))
+    val df5 = Seq(("xxx", "yyy", 0), ("zzz", "yyy", 1)).toDF("a", "b", 
"c").coalesce(1)
+    checkAnswer(
+      df5.quantified_features(lit(true), df5("a"), df5("b"), df5("c")),
+      Row(Seq(0.0, 0.0, 0.0)) :: Row(Seq(1.0, 0.0, 1.0)) :: Nil
+    )
 
     val df6 = Seq((0.1, 0.2), (0.5, 0.3)).toDF("a", "b")
-    assert(df6.select(quantitative_features(Seq("a", "b"), df6("a"), 
df6("b"))).collect.toSet
-      === Set(Row(Seq("a:0.1", "b:0.2")), Row(Seq("a:0.5", "b:0.3"))))
+    checkAnswer(
+      df6.select(quantitative_features(lit2(Seq("a", "b")), df6("a"), 
df6("b"))),
+      Row(Seq("a:0.1", "b:0.2")) :: Row(Seq("a:0.5", "b:0.3")) :: Nil
+    )
   }
 
   test("misc - hivemall_version") {
-    assert(DummyInputData.select(hivemall_version()).collect.toSet === 
Set(Row("0.4.2-rc.2")))
-    /**
-     * TODO: Why a test below does fail?
-     *
-     * checkAnswer(
-     *   DummyInputData.select(hivemall_version()).distinct,
-     *   Row("0.3.1")
-     * )
-     *
-     * The test throw an exception below:
-     *
-     * - hivemall_version *** FAILED ***
-     *  org.apache.spark.sql.AnalysisException:
-     *    Cannot resolve column name 
"HiveSimpleUDF#hivemall.HivemallVersionUDF()" among
-     *    (HiveSimpleUDF#hivemall.Hivemall VersionUDF());
-     *   at 
org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:159)
-     *   at 
org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:159)
-     *   at scala.Option.getOrElse(Option.scala:120)
-     *   at org.apache.spark.sql.DataFrame.resolve(DataFrame.scala:158)
-     *   at 
org.apache.spark.sql.DataFrame$$anonfun$30.apply(DataFrame.scala:1227)
-     *   at 
org.apache.spark.sql.DataFrame$$anonfun$30.apply(DataFrame.scala:1227)
-     *   at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
-     *   ...
-     */
+    checkAnswer(DummyInputData.select(hivemall_version()), Row("0.4.2-rc.2"))
   }
 
   test("misc - rowid") {
-    val df = DummyInputData.select(rowid())
-    assert(df.distinct.count == df.count)
+    assert(DummyInputData.select(rowid()).distinct.count == 
DummyInputData.count)
   }
 
   test("misc - each_top_k") {
@@ -324,7 +312,7 @@ final class HivemallOpsWithFeatureSuite extends 
HivemallFeatureQueryTest {
       Nil
     )
     checkAnswer(
-      testDf.each_top_k(1, $"key", $"score", $"key", $"value"),
+      testDf.each_top_k(lit(1), $"key", $"score", $"key", $"value"),
       Row(1, "a", "3") ::
       Row(1, "b", "4") ::
       Row(1, "c", "6") ::
@@ -340,7 +328,7 @@ final class HivemallOpsWithFeatureSuite extends 
HivemallFeatureQueryTest {
       Nil
     )
     checkAnswer(
-      testDf.each_top_k(-1, $"key", $"score", $"key", $"value"),
+      testDf.each_top_k(lit(-1), $"key", $"score", $"key", $"value"),
       Row(1, "a", "1") ::
       Row(1, "b", "5") ::
       Row(1, "c", "6") ::
@@ -348,7 +336,7 @@ final class HivemallOpsWithFeatureSuite extends 
HivemallFeatureQueryTest {
     )
 
     // Check if some exceptions thrown in case of some conditions
-    assert(intercept[AnalysisException] { testDf.each_top_k(0.1, $"key", 
$"score") }
+    assert(intercept[AnalysisException] { testDf.each_top_k(lit(0.1), $"key", 
$"score") }
       .getMessage contains "`k` must be integer, however")
     assert(intercept[AnalysisException] { testDf.each_top_k(1, "key", "data") }
       .getMessage contains "must have a comparable type")
@@ -369,7 +357,7 @@ final class HivemallOpsWithFeatureSuite extends 
HivemallFeatureQueryTest {
 
     val model = Seq((0.0, 0.1 :: 0.1 :: Nil), (1.0, 0.2 :: 0.3 :: 0.2 :: Nil))
       .toDF("label", "features")
-      .train_randomforest_regr($"features", $"label", "-trees 2")
+      .train_randomforest_regr($"features", $"label")
 
     val testData = Seq((0.0, 0.1 :: 0.0 :: Nil), (1.0, 0.3 :: 0.5 :: 0.4 :: 
Nil))
       .toDF("label", "features")
@@ -379,9 +367,8 @@ final class HivemallOpsWithFeatureSuite extends 
HivemallFeatureQueryTest {
       .join(testData).coalesce(1)
       .select(
         $"rowid",
-        tree_predict(
-          model("model_id"), model("model_type"), model("pred_model"), 
testData("features"), true)
-            .as("predicted")
+        tree_predict(model("model_id"), model("model_type"), 
model("pred_model"),
+          testData("features"), lit(true)).as("predicted")
       )
       .groupBy($"rowid")
       .rf_ensemble("predicted").as("rowid", "predicted")
@@ -392,38 +379,24 @@ final class HivemallOpsWithFeatureSuite extends 
HivemallFeatureQueryTest {
 
   test("tools.array - select_k_best") {
     import hiveContext.implicits._
-    import org.apache.spark.sql.functions._
 
     val data = Seq(Seq(0, 1, 3), Seq(2, 4, 1), Seq(5, 4, 9))
     val df = data.map(d => (d, Seq(3, 1, 2))).toDF("features", 
"importance_list")
     val k = 2
 
-    checkAnswer(df.select(select_k_best(df("features"), df("importance_list"), 
lit(k))),
-      data.map(s => Row(Seq(s(0).toDouble, s(2).toDouble))))
+    checkAnswer(
+      df.select(select_k_best(df("features"), df("importance_list"), lit(k))),
+      Row(Seq(0.0, 3.0)) :: Row(Seq(2.0, 1.0)) :: Row(Seq(5.0, 9.0)) :: Nil
+    )
   }
 
   test("misc - sigmoid") {
     import hiveContext.implicits._
-    /**
-     * TODO: SigmodUDF only accepts floating-point types in spark-v1.5.0?
-     * This test throws an exception below:
-     *
-     * org.apache.spark.sql.catalyst.analysis.UnresolvedException:
-     *    Invalid call to dataType on unresolved object, tree: 'data
-     *  at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType
-     *    (unresolved.scala:59)
-     *  at 
org.apache.spark.sql.hive.HiveSimpleUDF$$anonfun$method$1.apply(hiveUDFs.scala:119)
-     * ...
-     */
-    val rows = DummyInputData.select(sigmoid($"c0")).collect
-    assert(rows(0).getDouble(0) ~== 0.500)
-    assert(rows(1).getDouble(0) ~== 0.731)
-    assert(rows(2).getDouble(0) ~== 0.880)
-    assert(rows(3).getDouble(0) ~== 0.952)
+    assert(DummyInputData.select(sigmoid($"c0")).collect.apply(0).getDouble(0) 
~== 0.500)
   }
 
   test("misc - lr_datagen") {
-    assert(TinyTrainData.lr_datagen("-n_examples 100 -n_features 10 -seed 
100").count >= 100)
+    assert(TinyTrainData.lr_datagen(lit("-n_examples 100 -n_features 10 -seed 
100")).count >= 100)
   }
 
   test("invoke regression functions") {
@@ -577,7 +550,7 @@ final class HivemallOpsWithFeatureSuite extends 
HivemallFeatureQueryTest {
        * WARN Column: Constructing trivially true equals predicate, 
'rowid#1323 = rowid#1323'.
        * Perhaps you need to use aliases.
        */
-      .select($"rowid", functions.when(sigmoid($"sum(value)") > 0.50, 
1.0).otherwise(0.0))
+      .select($"rowid", when(sigmoid($"sum(value)") > 0.50, 
1.0).otherwise(0.0))
       .as("rowid", "predicted")
 
     // Evaluation

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/79f92f4f/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/ModelMixingSuite.scala
----------------------------------------------------------------------
diff --git 
a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/ModelMixingSuite.scala
 
b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/ModelMixingSuite.scala
index 5a58c2d..3b87a96 100644
--- 
a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/ModelMixingSuite.scala
+++ 
b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/ModelMixingSuite.scala
@@ -33,10 +33,9 @@ import org.scalatest.BeforeAndAfter
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml.feature.HivemallLabeledPoint
 import org.apache.spark.sql.{Column, DataFrame, Row}
-import org.apache.spark.sql.functions.when
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.hive.HivemallGroupedDataset._
 import org.apache.spark.sql.hive.HivemallOps._
-import org.apache.spark.sql.hive.HivemallUtils._
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive.implicits._
 import org.apache.spark.test.TestUtils
@@ -156,9 +155,16 @@ final class ModelMixingSuite extends SparkFunSuite with 
BeforeAndAfter {
       // Build a model
       val model = {
         val groupId = 
s"${TestHive.sparkContext.applicationId}-${UUID.randomUUID}"
-        val res = TestUtils.invokeFunc(new 
HivemallOps(trainA9aData.part_amplify(1)), func,
-          Seq[Column](add_bias($"features"), $"label",
-            s"-mix localhost:${assignedPort} -mix_session ${groupId} 
-mix_threshold 2 -mix_cancel"))
+        val res = TestUtils.invokeFunc(
+          new HivemallOps(trainA9aData.part_amplify(lit(1))),
+          func,
+          Seq[Column](
+            add_bias($"features"),
+            $"label",
+            lit(s"-mix localhost:${assignedPort} -mix_session ${groupId} 
-mix_threshold 2 " +
+              "-mix_cancel")
+          )
+        )
         if (!res.columns.contains("conv")) {
           res.groupBy("feature").agg("weight" -> "avg")
         } else {
@@ -217,9 +223,16 @@ final class ModelMixingSuite extends SparkFunSuite with 
BeforeAndAfter {
       // Build a model
       val model = {
         val groupId = 
s"${TestHive.sparkContext.applicationId}-${UUID.randomUUID}"
-        val res = TestUtils.invokeFunc(new 
HivemallOps(trainKdd2010aData.part_amplify(1)), func,
-          Seq[Column](add_bias($"features"), $"label",
-            s"-mix localhost:${assignedPort} -mix_session ${groupId} 
-mix_threshold 2 -mix_cancel"))
+        val res = TestUtils.invokeFunc(
+          new HivemallOps(trainKdd2010aData.part_amplify(lit(1))),
+          func,
+          Seq[Column](
+            add_bias($"features"),
+            $"label",
+            lit(s"-mix localhost:${assignedPort} -mix_session ${groupId} 
-mix_threshold 2 " +
+              "-mix_cancel")
+          )
+        )
         if (!res.columns.contains("conv")) {
           res.groupBy("feature").agg("weight" -> "avg")
         } else {

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/79f92f4f/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/XGBoostSuite.scala
----------------------------------------------------------------------
diff --git 
a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/XGBoostSuite.scala 
b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/XGBoostSuite.scala
index 136d6bc..7c78678 100644
--- 
a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/XGBoostSuite.scala
+++ 
b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/XGBoostSuite.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.hive.HivemallGroupedDataset._
 import org.apache.spark.sql.hive.HivemallOps._
-import org.apache.spark.sql.hive.HivemallUtils._
 import org.apache.spark.sql.types._
 import org.apache.spark.test.VectorQueryTest
 
@@ -56,7 +55,7 @@ final class XGBoostSuite extends VectorQueryTest {
 
       // Save built models in persistent storage
       mllibTrainDf.repartition(numModles)
-        .train_xgboost_regr($"features", $"label", s"${defaultOptions}")
+        .train_xgboost_regr($"features", $"label", lit(s"${defaultOptions}"))
         .write.format(xgboost).save(tempDir)
 
       // Check #models generated by XGBoost
@@ -82,7 +81,7 @@ final class XGBoostSuite extends VectorQueryTest {
     withTempModelDir { tempDir =>
 
       mllibTrainDf.repartition(numModles)
-        .train_xgboost_regr($"features", $"label", s"${defaultOptions}")
+        .train_xgboost_regr($"features", $"label", lit(s"${defaultOptions}"))
         .write.format(xgboost).save(tempDir)
 
       // Check #models generated by XGBoost
@@ -110,7 +109,7 @@ final class XGBoostSuite extends VectorQueryTest {
 
       mllibTrainDf.repartition(numModles)
         .train_xgboost_multiclass_classifier(
-          $"features", $"label", s"${defaultOptions.set("num_class", "2")}")
+          $"features", $"label", lit(s"${defaultOptions.set("num_class", 
"2")}"))
         .write.format(xgboost).save(tempDir)
 
       // Check #models generated by XGBoost

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/79f92f4f/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/benchmark/MiscBenchmark.scala
----------------------------------------------------------------------
diff --git 
a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/benchmark/MiscBenchmark.scala
 
b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/benchmark/MiscBenchmark.scala
index a16ebb8..87782b9 100644
--- 
a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/benchmark/MiscBenchmark.scala
+++ 
b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/benchmark/MiscBenchmark.scala
@@ -25,9 +25,8 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.{EachTopK, Expression, 
Literal}
 import org.apache.spark.sql.catalyst.plans.logical.{Generate, LogicalPlan, 
Project}
 import org.apache.spark.sql.expressions.Window
-import org.apache.spark.sql.functions
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.hive.{HiveGenericUDF, HiveGenericUDTF}
-import org.apache.spark.sql.hive.HivemallUtils._
 import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
 import org.apache.spark.sql.types._
 import org.apache.spark.test.TestUtils
@@ -49,7 +48,7 @@ class TestFuncWrapper(df: DataFrame) {
 
   def each_top_k_improved(k: Int, group: String, score: String, args: String*)
     : DataFrame = withTypedPlan {
-    val clusterDf = df.repartition(group).sortWithinPartitions(group)
+    val clusterDf = df.repartition(df(group)).sortWithinPartitions(group)
     val childrenAttributes = clusterDf.logicalPlan.output
     val generator = Generate(
       EachTopK(
@@ -139,7 +138,7 @@ class MiscBenchmark extends SparkFunSuite {
 
     def sigmoidExprs(expr: Column): Column = {
       val one: () => Literal = () => Literal.create(1.0, DoubleType)
-      Column(one()) / (Column(one()) + functions.exp(-expr))
+      Column(one()) / (Column(one()) + exp(-expr))
     }
     addBenchmarkCase(
       "exprs",
@@ -153,7 +152,7 @@ class MiscBenchmark extends SparkFunSuite {
       }(RowEncoder(schema))
     )
 
-    val sigmoidUdf = functions.udf((d: Double) => 1.0 / (1.0 + Math.exp(-d)))
+    val sigmoidUdf = udf { (d: Double) => 1.0 / (1.0 + Math.exp(-d)) }
     addBenchmarkCase(
       "spark-udf",
       testDf.select(sigmoidUdf($"value"))
@@ -201,7 +200,7 @@ class MiscBenchmark extends SparkFunSuite {
     addBenchmarkCase(
       "rank",
       testDf.withColumn(
-        "rank", 
functions.rank().over(Window.partitionBy($"key").orderBy($"score".desc))
+        "rank", rank().over(Window.partitionBy($"key").orderBy($"score".desc))
       ).where($"rank" <= topK)
     )
 
@@ -212,7 +211,7 @@ class MiscBenchmark extends SparkFunSuite {
       // org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid 
call to name
       //   on unresolved object, tree: unresolvedalias('value, None)
       // at 
org.apache.spark.sql.catalyst.analysis.UnresolvedAlias.name(unresolved.scala:339)
-      testDf.each_top_k(topK, $"key", $"score", testDf("value"))
+      testDf.each_top_k(lit(topK), $"key", $"score", testDf("value"))
     )
 
     addBenchmarkCase(

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/79f92f4f/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/test/HivemallFeatureQueryTest.scala
----------------------------------------------------------------------
diff --git 
a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/test/HivemallFeatureQueryTest.scala
 
b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/test/HivemallFeatureQueryTest.scala
new file mode 100644
index 0000000..a4733f5
--- /dev/null
+++ 
b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/test/HivemallFeatureQueryTest.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.hive.test
+
+import scala.collection.mutable.Seq
+import scala.reflect.runtime.universe.TypeTag
+
+import hivemall.tools.RegressionDatagen
+
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection}
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.QueryTest
+
+/**
+ * Base class for tests with Hivemall features.
+ */
+abstract class HivemallFeatureQueryTest extends QueryTest with 
TestHiveSingleton {
+
+  import hiveContext.implicits._
+
+  /**
+   * TODO: spark-2.0 does not support literals for some types (e.g., Seq[_] 
and Array[_]).
+   * So, it provides that functionality here.
+   * This helper function will be removed in future releases.
+   */
+  protected def lit2[T : TypeTag](v: T): Column = {
+    val ScalaReflection.Schema(dataType, _) = ScalaReflection.schemaFor[T]
+    val convert = CatalystTypeConverters.createToCatalystConverter(dataType)
+    Column(Literal(convert(v), dataType))
+  }
+
+  protected val DummyInputData = Seq((0, 0)).toDF("c0", "c1")
+
+  protected val IntList2Data =
+    Seq(
+      (8 :: 5 :: Nil, 6 :: 4 :: Nil),
+      (3 :: 1 :: Nil, 3 :: 2 :: Nil),
+      (2 :: Nil, 3 :: Nil)
+    ).toDF("target", "predict")
+
+  protected val Float2Data =
+    Seq(
+      (0.8f, 0.3f), (0.3f, 0.9f), (0.2f, 0.4f)
+    ).toDF("target", "predict")
+
+  protected val TinyTrainData =
+    Seq(
+      (0.0, "1:0.8" :: "2:0.2" :: Nil),
+      (1.0, "2:0.7" :: Nil),
+      (0.0, "1:0.9" :: Nil)
+    ).toDF("label", "features")
+
+  protected val TinyTestData =
+    Seq(
+      (0.0, "1:0.6" :: "2:0.1" :: Nil),
+      (1.0, "2:0.9" :: Nil),
+      (0.0, "1:0.2" :: Nil),
+      (0.0, "2:0.1" :: Nil),
+      (0.0, "0:0.6" :: "2:0.4" :: Nil)
+    ).toDF("label", "features")
+
+  protected val LargeRegrTrainData = RegressionDatagen.exec(
+      hiveContext,
+      n_partitions = 2,
+      min_examples = 100000,
+      seed = 3,
+      prob_one = 0.8f
+    ).cache
+
+  protected val LargeRegrTestData = RegressionDatagen.exec(
+      hiveContext,
+      n_partitions = 2,
+      min_examples = 100,
+      seed = 3,
+      prob_one = 0.5f
+    ).cache
+
+  protected val LargeClassifierTrainData = RegressionDatagen.exec(
+      hiveContext,
+      n_partitions = 2,
+      min_examples = 100000,
+      seed = 5,
+      prob_one = 0.8f,
+      cl = true
+    ).cache
+
+  protected val LargeClassifierTestData = RegressionDatagen.exec(
+      hiveContext,
+      n_partitions = 2,
+      min_examples = 100,
+      seed = 5,
+      prob_one = 0.5f,
+      cl = true
+    ).cache
+}

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/79f92f4f/spark/spark-2.0/src/test/scala/org/apache/spark/streaming/HivemallFeatureOpsSuite.scala
----------------------------------------------------------------------
diff --git 
a/spark/spark-2.0/src/test/scala/org/apache/spark/streaming/HivemallFeatureOpsSuite.scala
 
b/spark/spark-2.0/src/test/scala/org/apache/spark/streaming/HivemallFeatureOpsSuite.scala
index 0285c4e..6d46292 100644
--- 
a/spark/spark-2.0/src/test/scala/org/apache/spark/streaming/HivemallFeatureOpsSuite.scala
+++ 
b/spark/spark-2.0/src/test/scala/org/apache/spark/streaming/HivemallFeatureOpsSuite.scala
@@ -18,15 +18,15 @@
  */
 package org.apache.spark.streaming
 
-import reflect.ClassTag
+import scala.reflect.ClassTag
 
 import org.apache.spark.ml.feature.HivemallLabeledPoint
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.hive.HivemallOps._
+import org.apache.spark.sql.hive.test.HivemallFeatureQueryTest
 import org.apache.spark.streaming.HivemallStreamingOps._
 import org.apache.spark.streaming.dstream.InputDStream
 import org.apache.spark.streaming.scheduler.StreamInputInfo
-import org.apache.spark.test.HivemallFeatureQueryTest
 
 /**
  * This is an input stream just for tests.

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/79f92f4f/spark/spark-2.0/src/test/scala/org/apache/spark/test/HivemallFeatureQueryTest.scala
----------------------------------------------------------------------
diff --git 
a/spark/spark-2.0/src/test/scala/org/apache/spark/test/HivemallFeatureQueryTest.scala
 
b/spark/spark-2.0/src/test/scala/org/apache/spark/test/HivemallFeatureQueryTest.scala
deleted file mode 100644
index 417d080..0000000
--- 
a/spark/spark-2.0/src/test/scala/org/apache/spark/test/HivemallFeatureQueryTest.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.test
-
-import scala.collection.mutable.Seq
-
-import hivemall.tools.RegressionDatagen
-
-import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.sql.QueryTest
-
-/**
- * Base class for tests with Hivemall features.
- */
-abstract class HivemallFeatureQueryTest extends QueryTest with 
TestHiveSingleton {
-
-  import hiveContext.implicits._
-
-  protected val DummyInputData =
-    Seq(
-      (0, 0), (1, 1), (2, 2), (3, 3)
-    ).toDF("c0", "c1")
-
-  protected val IntList2Data =
-    Seq(
-      (8 :: 5 :: Nil, 6 :: 4 :: Nil),
-      (3 :: 1 :: Nil, 3 :: 2 :: Nil),
-      (2 :: Nil, 3 :: Nil)
-    ).toDF("target", "predict")
-
-  protected val Float2Data =
-    Seq(
-      (0.8f, 0.3f), (0.3f, 0.9f), (0.2f, 0.4f)
-    ).toDF("target", "predict")
-
-  protected val TinyTrainData =
-    Seq(
-      (0.0, "1:0.8" :: "2:0.2" :: Nil),
-      (1.0, "2:0.7" :: Nil),
-      (0.0, "1:0.9" :: Nil)
-    ).toDF("label", "features")
-
-  protected val TinyTestData =
-    Seq(
-      (0.0, "1:0.6" :: "2:0.1" :: Nil),
-      (1.0, "2:0.9" :: Nil),
-      (0.0, "1:0.2" :: Nil),
-      (0.0, "2:0.1" :: Nil),
-      (0.0, "0:0.6" :: "2:0.4" :: Nil)
-    ).toDF("label", "features")
-
-  protected val LargeRegrTrainData = RegressionDatagen.exec(
-      hiveContext,
-      n_partitions = 2,
-      min_examples = 100000,
-      seed = 3,
-      prob_one = 0.8f
-    ).cache
-
-  protected val LargeRegrTestData = RegressionDatagen.exec(
-      hiveContext,
-      n_partitions = 2,
-      min_examples = 100,
-      seed = 3,
-      prob_one = 0.5f
-    ).cache
-
-  protected val LargeClassifierTrainData = RegressionDatagen.exec(
-      hiveContext,
-      n_partitions = 2,
-      min_examples = 100000,
-      seed = 5,
-      prob_one = 0.8f,
-      cl = true
-    ).cache
-
-  protected val LargeClassifierTestData = RegressionDatagen.exec(
-      hiveContext,
-      n_partitions = 2,
-      min_examples = 100,
-      seed = 5,
-      prob_one = 0.5f,
-      cl = true
-    ).cache
-}

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/79f92f4f/spark/spark-2.0/src/test/scala/org/apache/spark/test/TestUtils.scala
----------------------------------------------------------------------
diff --git 
a/spark/spark-2.0/src/test/scala/org/apache/spark/test/TestUtils.scala 
b/spark/spark-2.0/src/test/scala/org/apache/spark/test/TestUtils.scala
index 6650a93..8a2a385 100644
--- a/spark/spark-2.0/src/test/scala/org/apache/spark/test/TestUtils.scala
+++ b/spark/spark-2.0/src/test/scala/org/apache/spark/test/TestUtils.scala
@@ -50,13 +50,15 @@ object TestUtils extends Logging {
 }
 
 // TODO: Any same function in o.a.spark.*?
-class TestDoubleWrapper(d: Double) {
-  // Check an equality between Double values
+class TestFPWrapper(d: Double) {
+
+  // Check an equality between Double/Float values
   def ~==(d: Double): Boolean = Math.abs(this.d - d) < 0.001
 }
 
-object TestDoubleWrapper {
-  @inline implicit def toTestDoubleWrapper(d: Double): TestDoubleWrapper = {
-    new TestDoubleWrapper(d)
+object TestFPWrapper {
+
+  @inline implicit def toTestFPWrapper(d: Double): TestFPWrapper = {
+    new TestFPWrapper(d)
   }
 }

Reply via email to