Repository: incubator-hivemall
Updated Branches:
  refs/heads/master 629222d16 -> cd24be895


http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/cd24be89/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
----------------------------------------------------------------------
diff --git a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
deleted file mode 100644
index 399a557..0000000
--- a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala
+++ /dev/null
@@ -1,811 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.{AnalysisException, Row}
-import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.HivemallGroupedDataset._
-import org.apache.spark.sql.hive.HivemallOps._
-import org.apache.spark.sql.hive.HivemallUtils._
-import org.apache.spark.sql.hive.test.HivemallFeatureQueryTest
-import org.apache.spark.sql.types._
-import org.apache.spark.test.{TestUtils, VectorQueryTest}
-import org.apache.spark.test.TestFPWrapper._
-
-final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest {
-
-  test("anomaly") {
-    import hiveContext.implicits._
-    val df = spark.range(1000).selectExpr("id AS time", "rand() AS x")
-    // TODO: Test results more strictly
-    assert(df.sort($"time".asc).select(changefinder($"x")).count === 1000)
-    assert(df.sort($"time".asc).select(sst($"x", lit("-th 0.005"))).count === 1000)
-  }
-
-  test("knn.similarity") {
-    val df1 = DummyInputData.select(cosine_sim(lit2(Seq(1, 2, 3, 4)), lit2(Seq(3, 4, 5, 6))))
-    assert(df1.collect.apply(0).getFloat(0) ~== 0.500f)
-
-    val df2 = DummyInputData.select(jaccard(lit(5), lit(6)))
-    assert(df2.collect.apply(0).getFloat(0) ~== 0.96875f)
-
-    val df3 = DummyInputData.select(angular_similarity(lit2(Seq(1, 2, 3)), lit2(Seq(4, 5, 6))))
-    assert(df3.collect.apply(0).getFloat(0) ~== 0.500f)
-
-    val df4 = DummyInputData.select(euclid_similarity(lit2(Seq(5, 3, 1)), lit2(Seq(2, 8, 3))))
-    assert(df4.collect.apply(0).getFloat(0) ~== 0.33333334f)
-
-    val df5 = DummyInputData.select(distance2similarity(lit(1.0)))
-    assert(df5.collect.apply(0).getFloat(0) ~== 0.5f)
-  }
-
-  test("knn.distance") {
-    val df1 = DummyInputData.select(hamming_distance(lit(1), lit(3)))
-    checkAnswer(df1, Row(1) :: Nil)
-
-    val df2 = DummyInputData.select(popcnt(lit(1)))
-    checkAnswer(df2, Row(1) :: Nil)
-
-    val df3 = DummyInputData.select(kld(lit(0.1), lit(0.5), lit(0.2), lit(0.5)))
-    assert(df3.collect.apply(0).getDouble(0) ~== 0.01)
-
-    val df4 = DummyInputData.select(
-      euclid_distance(lit2(Seq("0.1", "0.5")), lit2(Seq("0.2", "0.5"))))
-    assert(df4.collect.apply(0).getFloat(0) ~== 1.4142135f)
-
-    val df5 = DummyInputData.select(
-      cosine_distance(lit2(Seq("0.8", "0.3")), lit2(Seq("0.4", "0.6"))))
-    assert(df5.collect.apply(0).getFloat(0) ~== 1.0f)
-
-    val df6 = DummyInputData.select(
-      angular_distance(lit2(Seq("0.1", "0.1")), lit2(Seq("0.3", "0.8"))))
-    assert(df6.collect.apply(0).getFloat(0) ~== 0.50f)
-
-    val df7 = DummyInputData.select(
-      manhattan_distance(lit2(Seq("0.7", "0.8")), lit2(Seq("0.5", "0.6"))))
-    assert(df7.collect.apply(0).getFloat(0) ~== 4.0f)
-
-    val df8 = DummyInputData.select(
-      minkowski_distance(lit2(Seq("0.1", "0.2")), lit2(Seq("0.2", "0.2")), lit2(1.0)))
-    assert(df8.collect.apply(0).getFloat(0) ~== 2.0f)
-  }
-
-  test("knn.lsh") {
-    import hiveContext.implicits._
-    assert(IntList2Data.minhash(lit(1), $"target").count() > 0)
-
-    assert(DummyInputData.select(bbit_minhash(lit2(Seq("1:0.1", "2:0.5")), lit(false))).count
-      == DummyInputData.count)
-    assert(DummyInputData.select(minhashes(lit2(Seq("1:0.1", "2:0.5")), lit(false))).count
-      == DummyInputData.count)
-  }
-
-  test("ftvec - add_bias") {
-    import hiveContext.implicits._
-    checkAnswer(TinyTrainData.select(add_bias($"features")),
-        Row(Seq("1:0.8", "2:0.2", "0:1.0")) ::
-        Row(Seq("2:0.7", "0:1.0")) ::
-        Row(Seq("1:0.9", "0:1.0")) ::
-        Nil
-      )
-  }
-
-  test("ftvec - extract_feature") {
-    val df = DummyInputData.select(extract_feature(lit("1:0.8")))
-    checkAnswer(df, Row("1") :: Nil)
-  }
-
-  test("ftvec - extract_weight") {
-    val df = DummyInputData.select(extract_weight(lit("3:0.1")))
-    assert(df.collect.apply(0).getDouble(0) ~== 0.1)
-  }
-
-  test("ftvec - explode_array") {
-    import hiveContext.implicits._
-    val df = TinyTrainData.explode_array($"features").select($"feature")
-    checkAnswer(df, Row("1:0.8") :: Row("2:0.2") :: Row("2:0.7") :: 
Row("1:0.9") :: Nil)
-  }
-
-  test("ftvec - add_feature_index") {
-    import hiveContext.implicits._
-    val doubleListData = Seq(Array(0.8, 0.5), Array(0.3, 0.1), Array(0.2)).toDF("data")
-    checkAnswer(
-        doubleListData.select(add_feature_index($"data")),
-        Row(Seq("1:0.8", "2:0.5")) ::
-        Row(Seq("1:0.3", "2:0.1")) ::
-        Row(Seq("1:0.2")) ::
-        Nil
-      )
-  }
-
-  test("ftvec - sort_by_feature") {
-    // import hiveContext.implicits._
-    val intFloatMapData = {
-      // TODO: Use `toDF`
-      val rowRdd = hiveContext.sparkContext.parallelize(
-          Row(Map(1 -> 0.3f, 2 -> 0.1f, 3 -> 0.5f)) ::
-          Row(Map(2 -> 0.4f, 1 -> 0.2f)) ::
-          Row(Map(2 -> 0.4f, 3 -> 0.2f, 1 -> 0.1f, 4 -> 0.6f)) ::
-          Nil
-        )
-      hiveContext.createDataFrame(
-        rowRdd,
-        StructType(
-          StructField("data", MapType(IntegerType, FloatType), true) ::
-          Nil)
-        )
-    }
-    val sortedKeys = intFloatMapData.select(sort_by_feature(intFloatMapData.col("data")))
-      .collect.map {
-        case Row(m: Map[Int, Float]) => m.keysIterator.toSeq
-    }
-    assert(sortedKeys.toSet === Set(Seq(1, 2, 3), Seq(1, 2), Seq(1, 2, 3, 4)))
-  }
-
-  test("ftvec.hash") {
-    assert(DummyInputData.select(mhash(lit("test"))).count == DummyInputData.count)
-    assert(DummyInputData.select(org.apache.spark.sql.hive.HivemallOps.sha1(lit("test"))).count ==
-      DummyInputData.count)
-    // TODO: The tests below failed because:
-    //   org.apache.spark.sql.AnalysisException: List type in java is unsupported because JVM type
-    //     erasure makes spark fail to catch a component type in List<>;
-    //
-    // assert(DummyInputData.select(array_hash_values(lit2(Seq("aaa", "bbb")))).count
-    //   == DummyInputData.count)
-    // assert(DummyInputData.select(
-    //     prefixed_hash_values(lit2(Seq("ccc", "ddd")), lit("prefix"))).count
-    //   == DummyInputData.count)
-  }
-
-  test("ftvec.scaling") {
-    val df1 = TinyTrainData.select(rescale(lit(2.0f), lit(1.0), lit(5.0)))
-    assert(df1.collect.apply(0).getFloat(0) === 0.25f)
-    val df2 = TinyTrainData.select(zscore(lit(1.0f), lit(0.5), lit(0.5)))
-    assert(df2.collect.apply(0).getFloat(0) === 1.0f)
-    val df3 = TinyTrainData.select(normalize(TinyTrainData.col("features")))
-    checkAnswer(
-      df3,
-      Row(Seq("1:0.9701425", "2:0.24253562")) ::
-      Row(Seq("2:1.0")) ::
-      Row(Seq("1:1.0")) ::
-      Nil)
-  }
-
-  test("ftvec.selection - chi2") {
-    import hiveContext.implicits._
-
-    // See also hivemall.ftvec.selection.ChiSquareUDFTest
-    val df = Seq(
-      Seq(
-        Seq(250.29999999999998, 170.90000000000003, 73.2, 12.199999999999996),
-        Seq(296.8, 138.50000000000003, 212.99999999999997, 66.3),
-        Seq(329.3999999999999, 148.7, 277.59999999999997, 101.29999999999998)
-      ) -> Seq(
-        Seq(292.1666753739119, 152.70000455081467, 187.93333893418327, 59.93333511948589),
-        Seq(292.1666753739119, 152.70000455081467, 187.93333893418327, 59.93333511948589),
-        Seq(292.1666753739119, 152.70000455081467, 187.93333893418327, 59.93333511948589)))
-      .toDF("arg0", "arg1")
-
-    val result = df.select(chi2(df("arg0"), df("arg1"))).collect
-    assert(result.length == 1)
-    val chi2Val = result.head.getAs[Row](0).getAs[Seq[Double]](0)
-    val pVal = result.head.getAs[Row](0).getAs[Seq[Double]](1)
-
-    (chi2Val, Seq(10.81782088, 3.59449902, 116.16984746, 67.24482759))
-      .zipped
-      .foreach((actual, expected) => assert(actual ~== expected))
-
-    (pVal, Seq(4.47651499e-03, 1.65754167e-01, 5.94344354e-26, 2.50017968e-15))
-      .zipped
-      .foreach((actual, expected) => assert(actual ~== expected))
-  }
-
-  test("ftvec.conv - quantify") {
-    import hiveContext.implicits._
-    val testDf = Seq((1, "aaa", true), (2, "bbb", false), (3, "aaa", 
false)).toDF
-    // This test is done in a single partition because `HivemallOps#quantify` assigns identifiers
-    // for non-numerical values in each partition.
-    checkAnswer(
-      testDf.coalesce(1).quantify(lit(true) +: testDf.cols: _*),
-      Row(1, 0, 0) :: Row(2, 1, 1) :: Row(3, 0, 1) :: Nil)
-  }
-
-  test("ftvec.amplify") {
-    import hiveContext.implicits._
-    assert(TinyTrainData.amplify(lit(3), $"label", $"features").count() == 9)
-    assert(TinyTrainData.part_amplify(lit(3)).count() == 9)
-    // TODO: The test below failed because:
-    //   java.lang.RuntimeException: Unsupported literal type class scala.Tuple3
-    //     (-buf 128,label,features)
-    //
-    // assert(TinyTrainData.rand_amplify(lit(3), lit("-buf 8", $"label", $"features")).count() == 9)
-  }
-
-  ignore("ftvec.conv") {
-    import hiveContext.implicits._
-
-    val df1 = Seq((0.0, "1:0.1" :: "3:0.3" :: Nil), (1.0, "2:0.2" :: Nil)).toDF("a", "b")
-    checkAnswer(
-      df1.select(to_dense_features(df1("b"), lit(3))),
-      Row(Array(0.1f, 0.0f, 0.3f)) :: Row(Array(0.0f, 0.2f, 0.0f)) :: Nil
-    )
-    val df2 = Seq((0.1, 0.2, 0.3), (0.2, 0.5, 0.4)).toDF("a", "b", "c")
-    checkAnswer(
-      df2.select(to_sparse_features(df2("a"), df2("b"), df2("c"))),
-      Row(Seq("1:0.1", "2:0.2", "3:0.3")) :: Row(Seq("1:0.2", "2:0.5", 
"3:0.4")) :: Nil
-    )
-  }
-
-  test("ftvec.trans") {
-    import hiveContext.implicits._
-
-    val df1 = Seq((1, -3, 1), (2, -2, 1)).toDF("a", "b", "c")
-    checkAnswer(
-      df1.binarize_label($"a", $"b", $"c"),
-      Row(1, 1) :: Row(1, 1) :: Row(1, 1) :: Nil
-    )
-
-    val df2 = Seq((0.1f, 0.2f), (0.5f, 0.3f)).toDF("a", "b")
-    checkAnswer(
-      df2.select(vectorize_features(lit2(Seq("a", "b")), df2("a"), df2("b"))),
-      Row(Seq("a:0.1", "b:0.2")) :: Row(Seq("a:0.5", "b:0.3")) :: Nil
-    )
-
-    val df3 = Seq(("c11", "c12"), ("c21", "c22")).toDF("a", "b")
-    checkAnswer(
-      df3.select(categorical_features(lit2(Seq("a", "b")), df3("a"), df3("b"))),
-      Row(Seq("a#c11", "b#c12")) :: Row(Seq("a#c21", "b#c22")) :: Nil
-    )
-
-    val df4 = Seq((0.1, 0.2, 0.3), (0.2, 0.5, 0.4)).toDF("a", "b", "c")
-    checkAnswer(
-      df4.select(indexed_features(df4("a"), df4("b"), df4("c"))),
-      Row(Seq("1:0.1", "2:0.2", "3:0.3")) :: Row(Seq("1:0.2", "2:0.5", 
"3:0.4")) :: Nil
-    )
-
-    val df5 = Seq(("xxx", "yyy", 0), ("zzz", "yyy", 1)).toDF("a", "b", 
"c").coalesce(1)
-    checkAnswer(
-      df5.quantified_features(lit(true), df5("a"), df5("b"), df5("c")),
-      Row(Seq(0.0, 0.0, 0.0)) :: Row(Seq(1.0, 0.0, 1.0)) :: Nil
-    )
-
-    val df6 = Seq((0.1, 0.2), (0.5, 0.3)).toDF("a", "b")
-    checkAnswer(
-      df6.select(quantitative_features(lit2(Seq("a", "b")), df6("a"), df6("b"))),
-      Row(Seq("a:0.1", "b:0.2")) :: Row(Seq("a:0.5", "b:0.3")) :: Nil
-    )
-  }
-
-  test("misc - hivemall_version") {
-    checkAnswer(DummyInputData.select(hivemall_version()), Row("0.5.1-incubating-SNAPSHOT"))
-  }
-
-  test("misc - rowid") {
-    assert(DummyInputData.select(rowid()).distinct.count == DummyInputData.count)
-  }
-
-  test("misc - each_top_k") {
-    import hiveContext.implicits._
-    val testDf = Seq(
-      ("a", "1", 0.5, Array(0, 1, 2)),
-      ("b", "5", 0.1, Array(3)),
-      ("a", "3", 0.8, Array(2, 5)),
-      ("c", "6", 0.3, Array(1, 3)),
-      ("b", "4", 0.3, Array(2)),
-      ("a", "2", 0.6, Array(1))
-    ).toDF("key", "value", "score", "data")
-
-    // Compute top-1 rows for each group
-    checkAnswer(
-      testDf.each_top_k(lit(1), $"key", $"score"),
-      Row(1, "a", "3", 0.8, Array(2, 5)) ::
-      Row(1, "b", "4", 0.3, Array(2)) ::
-      Row(1, "c", "6", 0.3, Array(1, 3)) ::
-      Nil
-    )
-
-    // Compute reverse top-1 rows for each group
-    checkAnswer(
-      testDf.each_top_k(lit(-1), $"key", $"score"),
-      Row(1, "a", "1", 0.5, Array(0, 1, 2)) ::
-      Row(1, "b", "5", 0.1, Array(3)) ::
-      Row(1, "c", "6", 0.3, Array(1, 3)) ::
-      Nil
-    )
-
-    // Check that exceptions are thrown under certain conditions
-    assert(intercept[AnalysisException] { testDf.each_top_k(lit(0.1), $"key", $"score") }
-      .getMessage contains "`k` must be integer, however")
-    assert(intercept[AnalysisException] { testDf.each_top_k(lit(1), $"key", $"data") }
-      .getMessage contains "must have a comparable type")
-  }
-
-  test("HIVEMALL-76 top-K funcs must assign the same rank with the rows having 
the same scores") {
-    import hiveContext.implicits._
-    val testDf = Seq(
-      ("a", "1", 0.1),
-      ("b", "5", 0.1),
-      ("a", "3", 0.1),
-      ("b", "4", 0.1),
-      ("a", "2", 0.0)
-    ).toDF("key", "value", "score")
-
-    // Compute top-1 rows for each group
-    checkAnswer(
-      testDf.each_top_k(lit(2), $"key", $"score"),
-      Row(1, "a", "3", 0.1) ::
-      Row(1, "a", "1", 0.1) ::
-      Row(1, "b", "4", 0.1) ::
-      Row(1, "b", "5", 0.1) ::
-      Nil
-    )
-  }
-
-  /**
-   * This test fails because:
-   *
-   * Cause: java.lang.OutOfMemoryError: Java heap space
-   *  at hivemall.smile.tools.RandomForestEnsembleUDAF$Result.<init>
-   *    (RandomForestEnsembleUDAF.java:128)
-   *  at hivemall.smile.tools.RandomForestEnsembleUDAF$RandomForestPredictUDAFEvaluator
-   *    .terminate(RandomForestEnsembleUDAF.java:91)
-   *  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
-   */
-  ignore("misc - tree_predict") {
-    import hiveContext.implicits._
-
-    val model = Seq((0.0, 0.1 :: 0.1 :: Nil), (1.0, 0.2 :: 0.3 :: 0.2 :: Nil))
-      .toDF("label", "features")
-      .train_randomforest_regr($"features", $"label")
-
-    val testData = Seq((0.0, 0.1 :: 0.0 :: Nil), (1.0, 0.3 :: 0.5 :: 0.4 :: Nil))
-      .toDF("label", "features")
-      .select(rowid(), $"label", $"features")
-
-    val predicted = model
-      .join(testData).coalesce(1)
-      .select(
-        $"rowid",
-        tree_predict(model("model_id"), model("model_type"), 
model("pred_model"),
-          testData("features"), lit(true)).as("predicted")
-      )
-      .groupBy($"rowid")
-      .rf_ensemble("predicted").toDF("rowid", "predicted")
-      .select($"predicted.label")
-
-    checkAnswer(predicted, Seq(Row(0), Row(1)))
-  }
-
-  test("tools.array - select_k_best") {
-    import hiveContext.implicits._
-
-    val data = Seq(Seq(0, 1, 3), Seq(2, 4, 1), Seq(5, 4, 9))
-    val df = data.map(d => (d, Seq(3, 1, 2))).toDF("features", "importance_list")
-    val k = 2
-
-    checkAnswer(
-      df.select(select_k_best(df("features"), df("importance_list"), lit(k))),
-      Row(Seq(0.0, 3.0)) :: Row(Seq(2.0, 1.0)) :: Row(Seq(5.0, 9.0)) :: Nil
-    )
-  }
-
-  test("misc - sigmoid") {
-    import hiveContext.implicits._
-    assert(DummyInputData.select(sigmoid($"c0")).collect.apply(0).getDouble(0) ~== 0.500)
-  }
-
-  test("misc - lr_datagen") {
-    assert(TinyTrainData.lr_datagen(lit("-n_examples 100 -n_features 10 -seed 100")).count >= 100)
-  }
-
-  test("invoke regression functions") {
-    import hiveContext.implicits._
-    Seq(
-      "train_adadelta",
-      "train_adagrad",
-      "train_arow_regr",
-      "train_arowe_regr",
-      "train_arowe2_regr",
-      "train_logregr",
-      "train_pa1_regr",
-      "train_pa1a_regr",
-      "train_pa2_regr",
-      "train_pa2a_regr"
-    ).map { func =>
-      TestUtils.invokeFunc(new HivemallOps(TinyTrainData), func, Seq($"features", $"label"))
-        .foreach(_ => {}) // Just call it
-    }
-  }
-
-  test("invoke classifier functions") {
-    import hiveContext.implicits._
-    Seq(
-      "train_perceptron",
-      "train_pa",
-      "train_pa1",
-      "train_pa2",
-      "train_cw",
-      "train_arow",
-      "train_arowh",
-      "train_scw",
-      "train_scw2",
-      "train_adagrad_rda"
-    ).map { func =>
-      TestUtils.invokeFunc(new HivemallOps(TinyTrainData), func, Seq($"features", $"label"))
-        .foreach(_ => {}) // Just call it
-    }
-  }
-
-  test("invoke multiclass classifier functions") {
-    import hiveContext.implicits._
-    Seq(
-      "train_multiclass_perceptron",
-      "train_multiclass_pa",
-      "train_multiclass_pa1",
-      "train_multiclass_pa2",
-      "train_multiclass_cw",
-      "train_multiclass_arow",
-      "train_multiclass_scw",
-      "train_multiclass_scw2"
-    ).map { func =>
-      // TODO: Why is a label type [Int|Text] only in multiclass classifiers?
-      TestUtils.invokeFunc(
-          new HivemallOps(TinyTrainData), func, Seq($"features", $"label".cast(IntegerType)))
-        .foreach(_ => {}) // Just call it
-    }
-  }
-
-  test("invoke random forest functions") {
-    import hiveContext.implicits._
-    val testDf = Seq(
-      (Array(0.3, 0.1, 0.2), 1),
-      (Array(0.3, 0.1, 0.2), 0),
-      (Array(0.3, 0.1, 0.2), 0)).toDF("features", "label")
-    Seq(
-      "train_randomforest_regr",
-      "train_randomforest_classifier"
-    ).map { func =>
-      TestUtils.invokeFunc(new HivemallOps(testDf.coalesce(1)), func, Seq($"features", $"label"))
-        .foreach(_ => {}) // Just call it
-    }
-  }
-
-  protected def checkRegrPrecision(func: String): Unit = {
-    import hiveContext.implicits._
-
-    // Build a model
-    val model = {
-      val res = TestUtils.invokeFunc(new HivemallOps(LargeRegrTrainData),
-        func, Seq(add_bias($"features"), $"label"))
-      if (!res.columns.contains("conv")) {
-        res.groupBy("feature").agg("weight" -> "avg")
-      } else {
-        res.groupBy("feature").argmin_kld("weight", "conv")
-      }
-    }.toDF("feature", "weight")
-
-    // Data preparation
-    val testDf = LargeRegrTrainData
-      .select(rowid(), $"label".as("target"), $"features")
-      .cache
-
-    val testDf_exploded = testDf
-      .explode_array($"features")
-      .select($"rowid", extract_feature($"feature"), 
extract_weight($"feature"))
-
-    // Do prediction
-    val predict = testDf_exploded
-      .join(model, testDf_exploded("feature") === model("feature"), 
"LEFT_OUTER")
-      .select($"rowid", ($"weight" * $"value").as("value"))
-      .groupBy("rowid").sum("value")
-      .toDF("rowid", "predicted")
-
-    // Evaluation
-    val eval = predict
-      .join(testDf, predict("rowid") === testDf("rowid"))
-      .groupBy()
-      .agg(Map("target" -> "avg", "predicted" -> "avg"))
-      .toDF("target", "predicted")
-
-    val diff = eval.map {
-      case Row(target: Double, predicted: Double) =>
-        Math.abs(target - predicted)
-    }.first
-
-    TestUtils.expectResult(diff > 0.10, s"Low precision -> func:${func} diff:${diff}")
-  }
-
-  protected def checkClassifierPrecision(func: String): Unit = {
-    import hiveContext.implicits._
-
-    // Build a model
-    val model = {
-      val res = TestUtils.invokeFunc(new HivemallOps(LargeClassifierTrainData),
-        func, Seq(add_bias($"features"), $"label"))
-      if (!res.columns.contains("conv")) {
-        res.groupBy("feature").agg("weight" -> "avg")
-      } else {
-        res.groupBy("feature").argmin_kld("weight", "conv")
-      }
-    }.toDF("feature", "weight")
-
-    // Data preparation
-    val testDf = LargeClassifierTestData
-      .select(rowid(), $"label".as("target"), $"features")
-      .cache
-
-    val testDf_exploded = testDf
-      .explode_array($"features")
-      .select($"rowid", extract_feature($"feature"), 
extract_weight($"feature"))
-
-    // Do prediction
-    val predict = testDf_exploded
-      .join(model, testDf_exploded("feature") === model("feature"), 
"LEFT_OUTER")
-      .select($"rowid", ($"weight" * $"value").as("value"))
-      .groupBy("rowid").sum("value")
-      /**
-       * TODO: This sentence throws an exception below:
-       *
-       * WARN Column: Constructing trivially true equals predicate, 'rowid#1323 = rowid#1323'.
-       * Perhaps you need to use aliases.
-       */
-      .select($"rowid", when(sigmoid($"sum(value)") > 0.50, 
1.0).otherwise(0.0))
-      .toDF("rowid", "predicted")
-
-    // Evaluation
-    val eval = predict
-      .join(testDf, predict("rowid") === testDf("rowid"))
-      .where($"target" === $"predicted")
-
-    val precision = (eval.count + 0.0) / predict.count
-
-    TestUtils.expectResult(precision < 0.70, s"Low precision -> func:${func} value:${precision}")
-  }
-
-  ignore("check regression precision") {
-    Seq(
-      "train_adadelta",
-      "train_adagrad",
-      "train_arow_regr",
-      "train_arowe_regr",
-      "train_arowe2_regr",
-      "train_logregr",
-      "train_pa1_regr",
-      "train_pa1a_regr",
-      "train_pa2_regr",
-      "train_pa2a_regr"
-    ).map { func =>
-      checkRegrPrecision(func)
-    }
-  }
-
-  ignore("check classifier precision") {
-    Seq(
-      "train_perceptron",
-      "train_pa",
-      "train_pa1",
-      "train_pa2",
-      "train_cw",
-      "train_arow",
-      "train_arowh",
-      "train_scw",
-      "train_scw2",
-      "train_adagrad_rda"
-    ).map { func =>
-      checkClassifierPrecision(func)
-    }
-  }
-
-  test("user-defined aggregators for ensembles") {
-    import hiveContext.implicits._
-
-    val df1 = Seq((1, 0.1f), (1, 0.2f), (2, 0.1f)).toDF("c0", "c1")
-    val row1 = df1.groupBy($"c0").voted_avg("c1").collect
-    assert(row1(0).getDouble(1) ~== 0.15)
-    assert(row1(1).getDouble(1) ~== 0.10)
-
-    val df3 = Seq((1, 0.2f), (1, 0.8f), (2, 0.3f)).toDF("c0", "c1")
-    val row3 = df3.groupBy($"c0").weight_voted_avg("c1").collect
-    assert(row3(0).getDouble(1) ~== 0.50)
-    assert(row3(1).getDouble(1) ~== 0.30)
-
-    val df5 = Seq((1, 0.2f, 0.1f), (1, 0.4f, 0.2f), (2, 0.8f, 0.9f)).toDF("c0", "c1", "c2")
-    val row5 = df5.groupBy($"c0").argmin_kld("c1", "c2").collect
-    assert(row5(0).getFloat(1) ~== 0.266666666)
-    assert(row5(1).getFloat(1) ~== 0.80)
-
-    val df6 = Seq((1, "id-0", 0.2f), (1, "id-1", 0.4f), (1, "id-2", 
0.1f)).toDF("c0", "c1", "c2")
-    val row6 = df6.groupBy($"c0").max_label("c2", "c1").collect
-    assert(row6(0).getString(1) == "id-1")
-
-    val df7 = Seq((1, "id-0", 0.5f), (1, "id-1", 0.1f), (1, "id-2", 
0.2f)).toDF("c0", "c1", "c2")
-    val row7 = df7.groupBy($"c0").maxrow("c2", "c1").toDF("c0", 
"c1").select($"c1.col1").collect
-    assert(row7(0).getString(0) == "id-0")
-
-    // val df8 = Seq((1, 1), (1, 2), (2, 1), (1, 5)).toDF("c0", "c1")
-    // val row8 = df8.groupBy($"c0").rf_ensemble("c1").toDF("c0", "c1")
-    //   .select("c1.probability").collect
-    // assert(row8(0).getDouble(0) ~== 0.3333333333)
-    // assert(row8(1).getDouble(0) ~== 1.0)
-  }
-
-  test("user-defined aggregators for evaluation") {
-    import hiveContext.implicits._
-
-    val df1 = Seq((1, 1.0f, 0.5f), (1, 0.3f, 0.5f), (1, 0.1f, 0.2f)).toDF("c0", "c1", "c2")
-    val row1 = df1.groupBy($"c0").mae("c1", "c2").collect
-    assert(row1(0).getDouble(1) ~== 0.26666666)
-
-    val df2 = Seq((1, 0.3f, 0.8f), (1, 1.2f, 2.0f), (1, 0.2f, 0.3f)).toDF("c0", "c1", "c2")
-    val row2 = df2.groupBy($"c0").mse("c1", "c2").collect
-    assert(row2(0).getDouble(1) ~== 0.29999999)
-
-    val df3 = Seq((1, 0.3f, 0.8f), (1, 1.2f, 2.0f), (1, 0.2f, 0.3f)).toDF("c0", "c1", "c2")
-    val row3 = df3.groupBy($"c0").rmse("c1", "c2").collect
-    assert(row3(0).getDouble(1) ~== 0.54772253)
-
-    val df4 = Seq((1, Array(1, 2), Array(2, 3)), (1, Array(3, 8), Array(5, 4))).toDF
-      .toDF("c0", "c1", "c2")
-    val row4 = df4.groupBy($"c0").f1score("c1", "c2").collect
-    assert(row4(0).getDouble(1) ~== 0.25)
-  }
-
-  test("user-defined aggregators for ftvec.trans") {
-    import hiveContext.implicits._
-
-    val df0 = Seq((1, "cat", "mammal", 9), (1, "dog", "mammal", 10), (1, 
"human", "mammal", 10),
-      (1, "seahawk", "bird", 101), (1, "wasp", "insect", 3), (1, "wasp", 
"insect", 9),
-      (1, "cat", "mammal", 101), (1, "dog", "mammal", 1), (1, "human", 
"mammal", 9))
-      .toDF("col0", "cat1", "cat2", "cat3")
-    val row00 = df0.groupBy($"col0").onehot_encoding("cat1")
-    val row01 = df0.groupBy($"col0").onehot_encoding("cat1", "cat2", "cat3")
-
-    val result000 = row00.collect()(0).getAs[Row](1).getAs[Map[String, Int]](0)
-    val result01 = row01.collect()(0).getAs[Row](1)
-    val result010 = result01.getAs[Map[String, Int]](0)
-    val result011 = result01.getAs[Map[String, Int]](1)
-    val result012 = result01.getAs[Map[String, Int]](2)
-
-    assert(result000.keySet === Set("seahawk", "cat", "human", "wasp", "dog"))
-    assert(result000.values.toSet === Set(1, 2, 3, 4, 5))
-    assert(result010.keySet === Set("seahawk", "cat", "human", "wasp", "dog"))
-    assert(result010.values.toSet === Set(1, 2, 3, 4, 5))
-    assert(result011.keySet === Set("bird", "insect", "mammal"))
-    assert(result011.values.toSet === Set(6, 7, 8))
-    assert(result012.keySet === Set(1, 3, 9, 10, 101))
-    assert(result012.values.toSet === Set(9, 10, 11, 12, 13))
-  }
-
-  test("user-defined aggregators for ftvec.selection") {
-    import hiveContext.implicits._
-
-    // see also hivemall.ftvec.selection.SignalNoiseRatioUDAFTest
-    // binary class
-    // +-----------------+-------+
-    // |     features    | class |
-    // +-----------------+-------+
-    // | 5.1,3.5,1.4,0.2 |     0 |
-    // | 4.9,3.0,1.4,0.2 |     0 |
-    // | 4.7,3.2,1.3,0.2 |     0 |
-    // | 7.0,3.2,4.7,1.4 |     1 |
-    // | 6.4,3.2,4.5,1.5 |     1 |
-    // | 6.9,3.1,4.9,1.5 |     1 |
-    // +-----------------+-------+
-    val df0 = Seq(
-      (1, Seq(5.1, 3.5, 1.4, 0.2), Seq(1, 0)), (1, Seq(4.9, 3.0, 1.4, 0.2), Seq(1, 0)),
-      (1, Seq(4.7, 3.2, 1.3, 0.2), Seq(1, 0)), (1, Seq(7.0, 3.2, 4.7, 1.4), Seq(0, 1)),
-      (1, Seq(6.4, 3.2, 4.5, 1.5), Seq(0, 1)), (1, Seq(6.9, 3.1, 4.9, 1.5), Seq(0, 1)))
-      .toDF("c0", "arg0", "arg1")
-    val row0 = df0.groupBy($"c0").snr("arg0", "arg1").collect
-    (row0(0).getAs[Seq[Double]](1), Seq(4.38425236, 0.26390002, 15.83984511, 26.87005769))
-      .zipped
-      .foreach((actual, expected) => assert(actual ~== expected))
-
-    // multiple class
-    // +-----------------+-------+
-    // |     features    | class |
-    // +-----------------+-------+
-    // | 5.1,3.5,1.4,0.2 |     0 |
-    // | 4.9,3.0,1.4,0.2 |     0 |
-    // | 7.0,3.2,4.7,1.4 |     1 |
-    // | 6.4,3.2,4.5,1.5 |     1 |
-    // | 6.3,3.3,6.0,2.5 |     2 |
-    // | 5.8,2.7,5.1,1.9 |     2 |
-    // +-----------------+-------+
-    val df1 = Seq(
-      (1, Seq(5.1, 3.5, 1.4, 0.2), Seq(1, 0, 0)), (1, Seq(4.9, 3.0, 1.4, 0.2), Seq(1, 0, 0)),
-      (1, Seq(7.0, 3.2, 4.7, 1.4), Seq(0, 1, 0)), (1, Seq(6.4, 3.2, 4.5, 1.5), Seq(0, 1, 0)),
-      (1, Seq(6.3, 3.3, 6.0, 2.5), Seq(0, 0, 1)), (1, Seq(5.8, 2.7, 5.1, 1.9), Seq(0, 0, 1)))
-      .toDF("c0", "arg0", "arg1")
-    val row1 = df1.groupBy($"c0").snr("arg0", "arg1").collect
-    (row1(0).getAs[Seq[Double]](1), Seq(8.43181818, 1.32121212, 42.94949495, 33.80952381))
-      .zipped
-      .foreach((actual, expected) => assert(actual ~== expected))
-  }
-
-  test("user-defined aggregators for tools.matrix") {
-    import hiveContext.implicits._
-
-    // | 1  2  3 |T    | 5  6  7 |
-    // | 3  4  5 |  *  | 7  8  9 |
-    val df0 = Seq((1, Seq(1, 2, 3), Seq(5, 6, 7)), (1, Seq(3, 4, 5), Seq(7, 8, 9)))
-      .toDF("c0", "arg0", "arg1")
-
-    checkAnswer(df0.groupBy($"c0").transpose_and_dot("arg0", "arg1"),
-      Seq(Row(1, Seq(Seq(26.0, 30.0, 34.0), Seq(38.0, 44.0, 50.0), Seq(50.0, 58.0, 66.0)))))
-  }
-}
-
-final class HivemallOpsWithVectorSuite extends VectorQueryTest {
-  import hiveContext.implicits._
-
-  test("to_hivemall_features") {
-    checkAnswer(
-      mllibTrainDf.select(to_hivemall_features($"features")),
-      Seq(
-        Row(Seq("0:1.0", "2:2.0", "4:3.0")),
-        Row(Seq("0:1.0", "3:1.5", "4:2.1", "6:1.2")),
-        Row(Seq("0:1.1", "3:1.0", "4:2.3", "6:1.0")),
-        Row(Seq("1:4.0", "3:5.0", "5:6.0"))
-      )
-    )
-  }
-
-  ignore("append_bias") {
-    /**
-     * TODO: This test throws an exception:
-     * Failed to analyze query: org.apache.spark.sql.AnalysisException: cannot resolve
-     *   'UDF(UDF(features))' due to data type mismatch: argument 1 requires vector type,
-     *    however, 'UDF(features)' is of vector type.; line 2 pos 8
-     */
-    checkAnswer(
-      mllibTrainDf.select(to_hivemall_features(append_bias($"features"))),
-      Seq(
-        Row(Seq("0:1.0", "0:1.0", "2:2.0", "4:3.0")),
-        Row(Seq("0:1.0", "0:1.0", "3:1.5", "4:2.1", "6:1.2")),
-        Row(Seq("0:1.0", "0:1.1", "3:1.0", "4:2.3", "6:1.0")),
-        Row(Seq("0:1.0", "1:4.0", "3:5.0", "5:6.0"))
-      )
-    )
-  }
-
-  test("explode_vector") {
-    checkAnswer(
-      mllibTrainDf.explode_vector($"features").select($"feature", $"weight"),
-      Seq(
-        Row("0", 1.0), Row("0", 1.0), Row("0", 1.1),
-        Row("1", 4.0),
-        Row("2", 2.0),
-        Row("3", 1.0), Row("3", 1.5), Row("3", 5.0),
-        Row("4", 2.1), Row("4", 2.3), Row("4", 3.0),
-        Row("5", 6.0),
-        Row("6", 1.0), Row("6", 1.2)
-      )
-    )
-  }
-
-  test("train_logregr") {
-    checkAnswer(
-      mllibTrainDf.train_logregr($"features", $"label")
-        .groupBy("feature").agg("weight" -> "avg")
-        .select($"feature"),
-      Seq(0, 1, 2, 3, 4, 5, 6).map(v => Row(s"$v"))
-    )
-  }
-}
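The assertions in the deleted suite above compare floating-point results with `~==` rather than `===`, an approximate-equality operator imported via `org.apache.spark.test.TestFPWrapper._`. A minimal sketch of such a wrapper, assuming implicit value classes and a 1e-3 tolerance (names and tolerance are assumptions, not the project's actual implementation):

    object TestFPWrapperSketch {
      // Adds an approximate-equality operator for floating-point test assertions
      implicit class ApproxDouble(private val self: Double) extends AnyVal {
        def ~==(that: Double): Boolean = math.abs(self - that) < 1e-3 // assumed tolerance
      }
      implicit class ApproxFloat(private val self: Float) extends AnyVal {
        def ~==(that: Float): Boolean = math.abs(self - that) < 1e-3f // assumed tolerance
      }
    }
    // import TestFPWrapperSketch._
    // assert(0.50001 ~== 0.5) // passes: the difference is within the tolerance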

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/cd24be89/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/ModelMixingSuite.scala
----------------------------------------------------------------------
diff --git a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/ModelMixingSuite.scala b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/ModelMixingSuite.scala
deleted file mode 100644
index 06a4dc0..0000000
--- a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/ModelMixingSuite.scala
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.sql.hive
-
-import java.io.{BufferedInputStream, BufferedReader, InputStream, InputStreamReader}
-import java.net.URL
-import java.util.UUID
-import java.util.concurrent.{Executors, ExecutorService}
-
-import hivemall.mix.server.MixServer
-import hivemall.utils.lang.CommandLineUtils
-import hivemall.utils.net.NetUtils
-import org.apache.commons.cli.Options
-import org.apache.commons.compress.compressors.CompressorStreamFactory
-import org.scalatest.BeforeAndAfter
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.ml.feature.HivemallLabeledPoint
-import org.apache.spark.sql.{Column, DataFrame, Row}
-import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.HivemallGroupedDataset._
-import org.apache.spark.sql.hive.HivemallOps._
-import org.apache.spark.sql.hive.test.TestHive
-import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.test.TestUtils
-
-final class ModelMixingSuite extends SparkFunSuite with BeforeAndAfter {
-
-  // Load A9a training and test data
-  val a9aLineParser = (line: String) => {
-    val elements = line.split(" ")
-    val (label, features) = (elements.head, elements.tail)
-    HivemallLabeledPoint(if (label == "+1") 1.0f else 0.0f, features)
-  }
-
-  lazy val trainA9aData: DataFrame =
-    getDataFromURI(
-      new URL("http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/a9a").openStream,
-      a9aLineParser)
-
-  lazy val testA9aData: DataFrame =
-    getDataFromURI(
-      new URL("http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/a9a.t").openStream,
-      a9aLineParser)
-
-  // Load KDD2010a training and test data
-  val kdd2010aLineParser = (line: String) => {
-    val elements = line.split(" ")
-    val (label, features) = (elements.head, elements.tail)
-    HivemallLabeledPoint(if (label == "1") 1.0f else 0.0f, features)
-  }
-
-  lazy val trainKdd2010aData: DataFrame =
-    getDataFromURI(
-      new CompressorStreamFactory().createCompressorInputStream(
-        new BufferedInputStream(
-          new URL("http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/kdda.bz2")
-            .openStream
-        )
-      ),
-      kdd2010aLineParser,
-      8)
-
-  lazy val testKdd2010aData: DataFrame =
-    getDataFromURI(
-      new CompressorStreamFactory().createCompressorInputStream(
-        new BufferedInputStream(
-          new URL("http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/kdda.t.bz2")
-            .openStream
-        )
-      ),
-      kdd2010aLineParser,
-      8)
-
-  // Placeholder for a mix server
-  var mixServExec: ExecutorService = _
-  var assignedPort: Int = _
-
-  private def getDataFromURI(
-      in: InputStream, lineParseFunc: String => HivemallLabeledPoint, numPart: Int = 2)
-    : DataFrame = {
-    val reader = new BufferedReader(new InputStreamReader(in))
-    try {
-      // Cache all data because the stream is closed soon
-      val lines = FileIterator(reader.readLine()).toSeq
-      val rdd = TestHive.sparkContext.parallelize(lines, numPart).map(lineParseFunc)
-      val df = rdd.toDF.cache
-      df.foreach(_ => {})
-      df
-    } finally {
-      reader.close()
-    }
-  }
-
-  before {
-    assert(mixServExec == null)
-
-    // Launch a MIX server as thread
-    assignedPort = NetUtils.getAvailablePort
-    val method = classOf[MixServer].getDeclaredMethod("getOptions")
-    method.setAccessible(true)
-    val options = method.invoke(null).asInstanceOf[Options]
-    val cl = CommandLineUtils.parseOptions(
-      Array(
-        "-port", Integer.toString(assignedPort),
-        "-sync_threshold", "1"
-      ),
-      options
-    )
-    val server = new MixServer(cl)
-    mixServExec = Executors.newSingleThreadExecutor()
-    mixServExec.submit(server)
-    var retry = 0
-    while (server.getState() != MixServer.ServerState.RUNNING && retry < 32) {
-      Thread.sleep(100L)
-      retry += 1
-    }
-    assert(MixServer.ServerState.RUNNING == server.getState)
-  }
-
-  after {
-    mixServExec.shutdownNow()
-    mixServExec = null
-  }
-
-  TestUtils.benchmark("model mixing test w/ regression") {
-    Seq(
-      "train_adadelta",
-      "train_adagrad",
-      "train_arow_regr",
-      "train_arowe_regr",
-      "train_arowe2_regr",
-      "train_logregr",
-      "train_pa1_regr",
-      "train_pa1a_regr",
-      "train_pa2_regr",
-      "train_pa2a_regr"
-    ).map { func =>
-      // Build a model
-      val model = {
-        val groupId = s"${TestHive.sparkContext.applicationId}-${UUID.randomUUID}"
-        val res = TestUtils.invokeFunc(
-          new HivemallOps(trainA9aData.part_amplify(lit(1))),
-          func,
-          Seq[Column](
-            add_bias($"features"),
-            $"label",
-            lit(s"-mix localhost:${assignedPort} -mix_session ${groupId} 
-mix_threshold 2 " +
-              "-mix_cancel")
-          )
-        )
-        if (!res.columns.contains("conv")) {
-          res.groupBy("feature").agg("weight" -> "avg")
-        } else {
-          res.groupBy("feature").argmin_kld("weight", "conv")
-        }
-      }.toDF("feature", "weight")
-
-      // Data preparation
-      val testDf = testA9aData
-        .select(rowid(), $"label".as("target"), $"features")
-        .cache
-
-      val testDf_exploded = testDf
-        .explode_array($"features")
-        .select($"rowid", extract_feature($"feature"), 
extract_weight($"feature"))
-
-      // Do prediction
-      val predict = testDf_exploded
-        .join(model, testDf_exploded("feature") === model("feature"), 
"LEFT_OUTER")
-        .select($"rowid", ($"weight" * $"value").as("value"))
-        .groupBy("rowid").sum("value")
-        .toDF("rowid", "predicted")
-
-      // Evaluation
-      val eval = predict
-        .join(testDf, predict("rowid") === testDf("rowid"))
-        .groupBy()
-        .agg(Map("target" -> "avg", "predicted" -> "avg"))
-        .toDF("target", "predicted")
-
-      val (target, predicted) = eval.map {
-        case Row(target: Double, predicted: Double) => (target, predicted)
-      }.first
-
-      // scalastyle:off println
-      println(s"func:${func} target:${target} predicted:${predicted} "
-        + s"diff:${Math.abs(target - predicted)}")
-
-      testDf.unpersist()
-    }
-  }
-
-  TestUtils.benchmark("model mixing test w/ binary classification") {
-    Seq(
-      "train_perceptron",
-      "train_pa",
-      "train_pa1",
-      "train_pa2",
-      "train_cw",
-      "train_arow",
-      "train_arowh",
-      "train_scw",
-      "train_scw2",
-      "train_adagrad_rda"
-    ).map { func =>
-      // Build a model
-      val model = {
-        val groupId = s"${TestHive.sparkContext.applicationId}-${UUID.randomUUID}"
-        val res = TestUtils.invokeFunc(
-          new HivemallOps(trainKdd2010aData.part_amplify(lit(1))),
-          func,
-          Seq[Column](
-            add_bias($"features"),
-            $"label",
-            lit(s"-mix localhost:${assignedPort} -mix_session ${groupId} 
-mix_threshold 2 " +
-              "-mix_cancel")
-          )
-        )
-        if (!res.columns.contains("conv")) {
-          res.groupBy("feature").agg("weight" -> "avg")
-        } else {
-          res.groupBy("feature").argmin_kld("weight", "conv")
-        }
-      }.toDF("feature", "weight")
-
-      // Data preparation
-      val testDf = testKdd2010aData
-        .select(rowid(), $"label".as("target"), $"features")
-        .cache
-
-      val testDf_exploded = testDf
-        .explode_array($"features")
-        .select($"rowid", extract_feature($"feature"), 
extract_weight($"feature"))
-
-      // Do prediction
-      val predict = testDf_exploded
-        .join(model, testDf_exploded("feature") === model("feature"), 
"LEFT_OUTER")
-        .select($"rowid", ($"weight" * $"value").as("value"))
-        .groupBy("rowid").sum("value")
-        .select($"rowid", when(sigmoid($"sum(value)") > 0.50, 
1.0).otherwise(0.0))
-        .toDF("rowid", "predicted")
-
-      // Evaluation
-      val eval = predict
-        .join(testDf, predict("rowid") === testDf("rowid"))
-        .where($"target" === $"predicted")
-
-      // scalastyle:off println
-      println(s"func:${func} precision:${(eval.count + 0.0) / predict.count}")
-
-      testDf.unpersist()
-      predict.unpersist()
-    }
-  }
-}
-
-object FileIterator {
-
-  def apply[A](f: => A): Iterator[A] = new Iterator[A] {
-    var opt = Option(f)
-    def hasNext = opt.nonEmpty
-    def next() = {
-      val r = opt.get
-      opt = Option(f)
-      r
-    }
-  }
-}
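The `FileIterator` helper above adapts a by-name expression into an `Iterator` that pre-fetches one element and stops at the first `null`, which is how `getDataFromURI` drains its `BufferedReader` line by line. A usage sketch with a hypothetical local file:

    import java.io.{BufferedReader, FileReader}

    val reader = new BufferedReader(new FileReader("/tmp/lines.txt")) // hypothetical path
    try {
      // readLine() is re-evaluated for each element; iteration ends once it returns null
      val lines: Seq[String] = FileIterator(reader.readLine()).toSeq
      println(s"read ${lines.size} lines")
    } finally {
      reader.close()
    }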

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/cd24be89/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/XGBoostSuite.scala
----------------------------------------------------------------------
diff --git a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/XGBoostSuite.scala b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/XGBoostSuite.scala
deleted file mode 100644
index 37e0989..0000000
--- a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/XGBoostSuite.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.sql.hive
-
-import java.io.File
-
-import hivemall.xgboost._
-
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.execution.datasources.DataSource
-import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.HivemallGroupedDataset._
-import org.apache.spark.sql.hive.HivemallOps._
-import org.apache.spark.sql.types._
-import org.apache.spark.test.VectorQueryTest
-
-final class XGBoostSuite extends VectorQueryTest {
-  import hiveContext.implicits._
-
-  private val defaultOptions = XGBoostOptions()
-    .set("num_round", "10")
-    .set("max_depth", "4")
-
-  private val numModels = 3
-
-  private def countModels(dirPath: String): Int = {
-    new File(dirPath).listFiles().toSeq.count(_.getName.startsWith("xgbmodel-"))
-  }
-
-  test("resolve libxgboost") {
-    def getProvidingClass(name: String): Class[_] =
-      DataSource(sparkSession = null, className = name).providingClass
-    assert(getProvidingClass("libxgboost") ===
-      classOf[org.apache.spark.sql.hive.source.XGBoostFileFormat])
-  }
-
-  test("check XGBoost options") {
-    assert(s"$defaultOptions" == "-max_depth 4 -num_round 10")
-    val errMsg = intercept[IllegalArgumentException] {
-      defaultOptions.set("unknown", "3")
-    }
-    assert(errMsg.getMessage == "requirement failed: " +
-      "non-existing key detected in XGBoost options: unknown")
-  }
-
-  test("train_xgboost_regr") {
-    withTempModelDir { tempDir =>
-
-      // Save built models in persistent storage
-      mllibTrainDf.repartition(numModels)
-        .train_xgboost_regr($"features", $"label", lit(s"${defaultOptions}"))
-        .write.format("libxgboost").save(tempDir)
-
-      // Check #models generated by XGBoost
-      assert(countModels(tempDir) == numModels)
-
-      // Load the saved models
-      val model = hiveContext.sparkSession.read.format("libxgboost").load(tempDir)
-      val predict = model.join(mllibTestDf)
-        .xgboost_predict($"rowid", $"features", $"model_id", $"pred_model")
-        .groupBy("rowid").avg()
-        .toDF("rowid", "predicted")
-
-      val result = predict.join(mllibTestDf, predict("rowid") === mllibTestDf("rowid"), "INNER")
-        .select(predict("rowid"), $"predicted", $"label")
-
-      result.select(avg(abs($"predicted" - $"label"))).collect.map {
-        case Row(diff: Double) => assert(diff > 0.0)
-      }
-    }
-  }
-
-  test("train_xgboost_classifier") {
-    withTempModelDir { tempDir =>
-
-      mllibTrainDf.repartition(numModels)
-        .train_xgboost_regr($"features", $"label", lit(s"${defaultOptions}"))
-        .write.format("libxgboost").save(tempDir)
-
-      // Check #models generated by XGBoost
-      assert(countModels(tempDir) == numModels)
-
-      val model = hiveContext.sparkSession.read.format("libxgboost").load(tempDir)
-      val predict = model.join(mllibTestDf)
-        .xgboost_predict($"rowid", $"features", $"model_id", $"pred_model")
-        .groupBy("rowid").avg()
-        .toDF("rowid", "predicted")
-
-      val result = predict.join(mllibTestDf, predict("rowid") === mllibTestDf("rowid"), "INNER")
-        .select(
-          when($"predicted" >= 0.50, 1).otherwise(0),
-          $"label".cast(IntegerType)
-        )
-        .toDF("predicted", "label")
-
-      assert((result.where($"label" === $"predicted").count + 0.0) / 
result.count > 0.0)
-    }
-  }
-
-  test("train_xgboost_multiclass_classifier") {
-    withTempModelDir { tempDir =>
-
-      mllibTrainDf.repartition(numModels)
-        .train_xgboost_multiclass_classifier(
-          $"features", $"label", lit(s"${defaultOptions.set("num_class", 
"2")}"))
-        .write.format("libxgboost").save(tempDir)
-
-      // Check #models generated by XGBoost
-      assert(countModels(tempDir) == numModels)
-
-      val model = hiveContext.sparkSession.read.format("libxgboost").load(tempDir)
-      val predict = model.join(mllibTestDf)
-        .xgboost_multiclass_predict($"rowid", $"features", $"model_id", 
$"pred_model")
-        .groupBy("rowid").max_label("probability", "label")
-        .toDF("rowid", "predicted")
-
-      val result = predict.join(mllibTestDf, predict("rowid") === mllibTestDf("rowid"), "INNER")
-        .select(
-          predict("rowid"),
-          $"predicted",
-          $"label".cast(IntegerType)
-        )
-
-      assert((result.where($"label" === $"predicted").count + 0.0) / 
result.count > 0.0)
-    }
-  }
-}
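The "check XGBoost options" test above pins down two behaviors of `XGBoostOptions`: options render as alphabetically sorted `-key value` flags, and unknown keys fail a `require` check. A sketch of that contract, with an assumed key whitelist (this is not the actual `XGBoostOptions` implementation):

    case class XGBoostOptionsSketch(params: Map[String, String] = Map.empty) {
      // Assumed whitelist; the real class validates against XGBoost's parameter set
      private val knownKeys = Set("num_round", "max_depth", "num_class")
      def set(key: String, value: String): XGBoostOptionsSketch = {
        require(knownKeys.contains(key),
          s"non-existing key detected in XGBoost options: $key")
        copy(params + (key -> value))
      }
      // Renders flags sorted by key, e.g. "-max_depth 4 -num_round 10"
      override def toString: String =
        params.toSeq.sortBy(_._1).map { case (k, v) => s"-$k $v" }.mkString(" ")
    }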

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/cd24be89/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/benchmark/MiscBenchmark.scala
----------------------------------------------------------------------
diff --git a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/benchmark/MiscBenchmark.scala b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/benchmark/MiscBenchmark.scala
deleted file mode 100644
index 87782b9..0000000
--- a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/benchmark/MiscBenchmark.scala
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.sql.hive.benchmark
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.catalyst.expressions.{EachTopK, Expression, Literal}
-import org.apache.spark.sql.catalyst.plans.logical.{Generate, LogicalPlan, Project}
-import org.apache.spark.sql.expressions.Window
-import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.{HiveGenericUDF, HiveGenericUDTF}
-import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
-import org.apache.spark.sql.types._
-import org.apache.spark.test.TestUtils
-import org.apache.spark.util.Benchmark
-
-class TestFuncWrapper(df: DataFrame) {
-
-  def each_top_k(k: Column, group: Column, value: Column, args: Column*)
-    : DataFrame = withTypedPlan {
-    val clusterDf = df.repartition(group).sortWithinPartitions(group)
-    Generate(HiveGenericUDTF(
-        "each_top_k",
-        new HiveFunctionWrapper("hivemall.tools.EachTopKUDTF"),
-        (Seq(k, group, value) ++ args).map(_.expr)),
-      join = false, outer = false, None,
-      (Seq("rank", "key") ++ 
args.map(_.named.name)).map(UnresolvedAttribute(_)),
-      clusterDf.logicalPlan)
-  }
-
-  def each_top_k_improved(k: Int, group: String, score: String, args: String*)
-    : DataFrame = withTypedPlan {
-    val clusterDf = df.repartition(df(group)).sortWithinPartitions(group)
-    val childrenAttributes = clusterDf.logicalPlan.output
-    val generator = Generate(
-      EachTopK(
-        k,
-        clusterDf.resolve(group),
-        clusterDf.resolve(score),
-        childrenAttributes
-      ),
-      join = false, outer = false, None,
-      (Seq("rank") ++ 
childrenAttributes.map(_.name)).map(UnresolvedAttribute(_)),
-      clusterDf.logicalPlan)
-    val attributes = generator.generatedSet
-    val projectList = (Seq("rank") ++ args).map(s => attributes.find(_.name == 
s).get)
-    Project(projectList, generator)
-  }
-
-  /**
-   * A convenient function to wrap a logical plan and produce a DataFrame.
-   */
-  @inline private[this] def withTypedPlan(logicalPlan: => LogicalPlan): DataFrame = {
-    val queryExecution = df.sparkSession.sessionState.executePlan(logicalPlan)
-    val outputSchema = queryExecution.sparkPlan.schema
-    new Dataset[Row](df.sparkSession, queryExecution, RowEncoder(outputSchema))
-  }
-}
-
-object TestFuncWrapper {
-
-  /**
-   * Implicitly inject the [[TestFuncWrapper]] into [[DataFrame]].
-   */
-  implicit def dataFrameToTestFuncWrapper(df: DataFrame): TestFuncWrapper =
-    new TestFuncWrapper(df)
-
-  def sigmoid(exprs: Column*): Column = withExpr {
-    HiveGenericUDF("sigmoid",
-      new HiveFunctionWrapper("hivemall.tools.math.SigmoidGenericUDF"),
-      exprs.map(_.expr))
-  }
-
-  /**
-   * A convenient function to wrap an expression and produce a Column.
-   */
-  @inline private def withExpr(expr: Expression): Column = Column(expr)
-}
-
-class MiscBenchmark extends SparkFunSuite {
-
-  lazy val sparkSession = SparkSession.builder
-    .master("local[1]")
-    .appName("microbenchmark")
-    .config("spark.sql.shuffle.partitions", 1)
-    .config("spark.sql.codegen.wholeStage", true)
-    .getOrCreate()
-
-  val numIters = 3
-
-  private def addBenchmarkCase(name: String, df: DataFrame)(implicit benchmark: Benchmark): Unit = {
-    benchmark.addCase(name, numIters) { _ =>
-      df.queryExecution.executedPlan(0).execute().foreach(_ => ())
-    }
-  }
-
-  TestUtils.benchmark("closure/exprs/spark-udf/hive-udf") {
-    /**
-     * Java HotSpot(TM) 64-Bit Server VM 1.8.0_31-b13 on Mac OS X 10.10.2
-     * Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
-     *
-     * sigmoid functions:       Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-     * --------------------------------------------------------------------------------
-     * exprs                         7708 / 8173          3.4         294.0       1.0X
-     * closure                       7722 / 8342          3.4         294.6       1.0X
-     * spark-udf                     7963 / 8350          3.3         303.8       1.0X
-     * hive-udf                    13977 / 14050          1.9         533.2       0.6X
-     */
-    import sparkSession.sqlContext.implicits._
-    val N = 100L << 18
-    implicit val benchmark = new Benchmark("sigmoid", N)
-    val schema = StructType(
-      StructField("value", DoubleType) :: Nil
-    )
-    val testDf = sparkSession.createDataFrame(
-      sparkSession.range(N).map(_.toDouble).map(Row(_))(RowEncoder(schema)).rdd,
-      schema
-    )
-    testDf.cache.count // Cached
-
-    def sigmoidExprs(expr: Column): Column = {
-      val one: () => Literal = () => Literal.create(1.0, DoubleType)
-      Column(one()) / (Column(one()) + exp(-expr))
-    }
-    addBenchmarkCase(
-      "exprs",
-      testDf.select(sigmoidExprs($"value"))
-    )
-
-    addBenchmarkCase(
-      "closure",
-      testDf.map { d =>
-        Row(1.0 / (1.0 + Math.exp(-d.getDouble(0))))
-      }(RowEncoder(schema))
-    )
-
-    val sigmoidUdf = udf { (d: Double) => 1.0 / (1.0 + Math.exp(-d)) }
-    addBenchmarkCase(
-      "spark-udf",
-      testDf.select(sigmoidUdf($"value"))
-    )
-    addBenchmarkCase(
-      "hive-udf",
-      testDf.select(TestFuncWrapper.sigmoid($"value"))
-    )
-
-    benchmark.run()
-  }
-
-  TestUtils.benchmark("top-k query") {
-    /**
-     * top-k (k=100):          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-     * -------------------------------------------------------------------------------
-     * rank                       62748 / 62862          0.4        2393.6       1.0X
-     * each_top_k (hive-udf)      41421 / 41736          0.6        1580.1       1.5X
-     * each_top_k (exprs)         15793 / 16394          1.7         602.5       4.0X
-     */
-    import sparkSession.sqlContext.implicits._
-    import TestFuncWrapper._
-    val N = 100L << 18
-    val topK = 100
-    val numGroup = 3
-    implicit val benchmark = new Benchmark(s"top-k (k=$topK)", N)
-    val schema = StructType(
-      StructField("key", IntegerType) ::
-      StructField("score", DoubleType) ::
-      StructField("value", StringType) ::
-      Nil
-    )
-    val testDf = {
-      val df = sparkSession.createDataFrame(
-        sparkSession.sparkContext.range(0, N).map(_.toInt).map { d =>
-          Row(d % numGroup, scala.util.Random.nextDouble(), s"group-${d % numGroup}")
-        },
-        schema
-      )
-      // Test data are clustered by group keys
-      df.repartition($"key").sortWithinPartitions($"key")
-    }
-    testDf.cache.count // Cached
-
-    addBenchmarkCase(
-      "rank",
-      testDf.withColumn(
-        "rank", rank().over(Window.partitionBy($"key").orderBy($"score".desc))
-      ).where($"rank" <= topK)
-    )
-
-    addBenchmarkCase(
-      "each_top_k (hive-udf)",
-      // TODO: If $"value" given, it throws `AnalysisException`. Why?
-      // testDf.each_top_k(10, $"key", $"score", $"value")
-      // org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid 
call to name
-      //   on unresolved object, tree: unresolvedalias('value, None)
-      // at 
org.apache.spark.sql.catalyst.analysis.UnresolvedAlias.name(unresolved.scala:339)
-      testDf.each_top_k(lit(topK), $"key", $"score", testDf("value"))
-    )
-
-    addBenchmarkCase(
-      "each_top_k (exprs)",
-      testDf.each_top_k_improved(topK, "key", "score", "value")
-    )
-
-    benchmark.run()
-  }
-}
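The "rank" case above is the plain Spark baseline that `each_top_k` is benchmarked against: a per-group window rank followed by a filter. A standalone sketch of that baseline (the helper name is illustrative):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._

    // Keep the top `k` rows per `key`, ranked by descending `score`
    def topKByRank(df: DataFrame, k: Int): DataFrame =
      df.withColumn("rank",
          rank().over(Window.partitionBy(col("key")).orderBy(col("score").desc)))
        .where(col("rank") <= k)

The benchmark numbers suggest why the expression-based variant exists: it can emit at most k rows per group while scanning, whereas the window baseline ranks every row before filtering.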

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/cd24be89/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/test/HivemallFeatureQueryTest.scala
----------------------------------------------------------------------
diff --git a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/test/HivemallFeatureQueryTest.scala b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/test/HivemallFeatureQueryTest.scala
deleted file mode 100644
index a4733f5..0000000
--- a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/test/HivemallFeatureQueryTest.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.sql.hive.test
-
-import scala.collection.mutable.Seq
-import scala.reflect.runtime.universe.TypeTag
-
-import hivemall.tools.RegressionDatagen
-
-import org.apache.spark.sql.Column
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection}
-import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.QueryTest
-
-/**
- * Base class for tests with Hivemall features.
- */
-abstract class HivemallFeatureQueryTest extends QueryTest with TestHiveSingleton {
-
-  import hiveContext.implicits._
-
-  /**
-   * TODO: spark-2.0 does not support literals for some types (e.g., Seq[_] and Array[_]),
-   * so we provide that functionality here.
-   * This helper function will be removed in future releases.
-   */
-  protected def lit2[T : TypeTag](v: T): Column = {
-    val ScalaReflection.Schema(dataType, _) = ScalaReflection.schemaFor[T]
-    val convert = CatalystTypeConverters.createToCatalystConverter(dataType)
-    Column(Literal(convert(v), dataType))
-  }
-
-  protected val DummyInputData = Seq((0, 0)).toDF("c0", "c1")
-
-  protected val IntList2Data =
-    Seq(
-      (8 :: 5 :: Nil, 6 :: 4 :: Nil),
-      (3 :: 1 :: Nil, 3 :: 2 :: Nil),
-      (2 :: Nil, 3 :: Nil)
-    ).toDF("target", "predict")
-
-  protected val Float2Data =
-    Seq(
-      (0.8f, 0.3f), (0.3f, 0.9f), (0.2f, 0.4f)
-    ).toDF("target", "predict")
-
-  protected val TinyTrainData =
-    Seq(
-      (0.0, "1:0.8" :: "2:0.2" :: Nil),
-      (1.0, "2:0.7" :: Nil),
-      (0.0, "1:0.9" :: Nil)
-    ).toDF("label", "features")
-
-  protected val TinyTestData =
-    Seq(
-      (0.0, "1:0.6" :: "2:0.1" :: Nil),
-      (1.0, "2:0.9" :: Nil),
-      (0.0, "1:0.2" :: Nil),
-      (0.0, "2:0.1" :: Nil),
-      (0.0, "0:0.6" :: "2:0.4" :: Nil)
-    ).toDF("label", "features")
-
-  protected val LargeRegrTrainData = RegressionDatagen.exec(
-      hiveContext,
-      n_partitions = 2,
-      min_examples = 100000,
-      seed = 3,
-      prob_one = 0.8f
-    ).cache
-
-  protected val LargeRegrTestData = RegressionDatagen.exec(
-      hiveContext,
-      n_partitions = 2,
-      min_examples = 100,
-      seed = 3,
-      prob_one = 0.5f
-    ).cache
-
-  protected val LargeClassifierTrainData = RegressionDatagen.exec(
-      hiveContext,
-      n_partitions = 2,
-      min_examples = 100000,
-      seed = 5,
-      prob_one = 0.8f,
-      cl = true
-    ).cache
-
-  protected val LargeClassifierTestData = RegressionDatagen.exec(
-      hiveContext,
-      n_partitions = 2,
-      min_examples = 100,
-      seed = 5,
-      prob_one = 0.5f,
-      cl = true
-    ).cache
-}
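
The lit2 helper above exists because Spark 2.0's lit() cannot build literal columns for types such as Seq[_] and Array[_]; it reflects the Scala type into a Catalyst schema and runs the value through a Catalyst converter. A hypothetical usage sketch (the column alias is illustrative only):

    // Build an array-literal column from a Scala Seq; lit(Seq(1.0, 2.0, 3.0))
    // would fail on Spark 2.0, while lit2 produces a usable array column.
    val arrayCol = lit2(Seq(1.0, 2.0, 3.0)).as("weights")
    val df = DummyInputData.select(arrayCol)
    assert(df.first.getSeq[Double](0) === Seq(1.0, 2.0, 3.0))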

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/cd24be89/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/test/TestHiveSingleton.scala
----------------------------------------------------------------------
diff --git a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/test/TestHiveSingleton.scala b/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/test/TestHiveSingleton.scala
deleted file mode 100644
index eeb32dc..0000000
--- a/spark/spark-2.0/src/test/scala/org/apache/spark/sql/hive/test/TestHiveSingleton.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.sql.hive.test
-
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.SparkFunSuite
-
-trait TestHiveSingleton extends SparkFunSuite with BeforeAndAfterAll {
-  protected val spark: SparkSession = TestHive.sparkSession
-  protected val hiveContext: TestHiveContext = TestHive
-
-  protected override def afterAll(): Unit = {
-    try {
-      hiveContext.reset()
-    } finally {
-      super.afterAll()
-    }
-  }
-}
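
A suite mixes in the trait above to share the single TestHive-backed session across tests; reset() in afterAll clears the Hive test catalog so state does not leak between suites. A hypothetical suite using it (names are illustrative):

    class MyFeatureSuite extends SparkFunSuite with TestHiveSingleton {
      test("shares the TestHive session") {
        // `spark` and `hiveContext` come from the trait; no per-suite setup needed
        assert(spark.range(10).count() === 10)
      }
    }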

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/cd24be89/spark/spark-2.0/src/test/scala/org/apache/spark/streaming/HivemallOpsWithFeatureSuite.scala
----------------------------------------------------------------------
diff --git a/spark/spark-2.0/src/test/scala/org/apache/spark/streaming/HivemallOpsWithFeatureSuite.scala b/spark/spark-2.0/src/test/scala/org/apache/spark/streaming/HivemallOpsWithFeatureSuite.scala
deleted file mode 100644
index b15c77c..0000000
--- a/spark/spark-2.0/src/test/scala/org/apache/spark/streaming/HivemallOpsWithFeatureSuite.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.streaming
-
-import scala.reflect.ClassTag
-
-import org.apache.spark.ml.feature.HivemallLabeledPoint
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.hive.HivemallOps._
-import org.apache.spark.sql.hive.test.HivemallFeatureQueryTest
-import org.apache.spark.streaming.HivemallStreamingOps._
-import org.apache.spark.streaming.dstream.InputDStream
-import org.apache.spark.streaming.scheduler.StreamInputInfo
-
-/**
- * This is an input stream just for tests.
- */
-private[this] class TestInputStream[T: ClassTag](
-    ssc: StreamingContext,
-    input: Seq[Seq[T]],
-    numPartitions: Int) extends InputDStream[T](ssc) {
-
-  override def start() {}
-
-  override def stop() {}
-
-  override def compute(validTime: Time): Option[RDD[T]] = {
-    logInfo("Computing RDD for time " + validTime)
-    val index = ((validTime - zeroTime) / slideDuration - 1).toInt
-    val selectedInput = if (index < input.size) input(index) else Seq[T]()
-
-    // lets us test cases where RDDs are not created
-    if (selectedInput == null) {
-      return None
-    }
-
-    // Report the input data's information to InputInfoTracker for testing
-    val inputInfo = StreamInputInfo(id, selectedInput.length.toLong)
-    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
-
-    val rdd = ssc.sc.makeRDD(selectedInput, numPartitions)
-    logInfo("Created RDD " + rdd.id + " with " + selectedInput)
-    Some(rdd)
-  }
-}
-
-final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest {
-
-  // This implicit value is used in `HivemallStreamingOps`
-  implicit val sqlCtx = hiveContext
-
-  /**
-   * Run a block of code with the given StreamingContext.
-   * This method does not stop the given SparkContext because other tests share the context.
-   */
-  private def withStreamingContext[R](ssc: StreamingContext)(block: StreamingContext => R): Unit = {
-    try {
-      block(ssc)
-      ssc.start()
-      ssc.awaitTerminationOrTimeout(10 * 1000) // 10s wait
-    } finally {
-      try {
-        ssc.stop(stopSparkContext = false)
-      } catch {
-        case e: Exception => logError("Error stopping StreamingContext", e)
-      }
-    }
-  }
-
-  // scalastyle:off line.size.limit
-
-  /**
-   * The test below sometimes fails (it is too flaky), so we temporarily ignore it.
-   * The stacktrace of this failure is:
-   *
-   * HivemallOpsWithFeatureSuite:
-   *  Exception in thread "broadcast-exchange-60" java.lang.OutOfMemoryError: Java heap space
-   *   at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
-   *   at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
-   *   at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$4.apply(TorrentBroadcast.scala:231)
-   *   at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$4.apply(TorrentBroadcast.scala:231)
-   *   at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:78)
-   *   at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:65)
-   *   at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:205)
-   *   at net.jpountz.lz4.LZ4BlockOutputStream.finish(LZ4BlockOutputStream.java:235)
-   *   at net.jpountz.lz4.LZ4BlockOutputStream.close(LZ4BlockOutputStream.java:175)
-   *   at java.io.ObjectOutputStream$BlockDataOutputStream.close(ObjectOutputStream.java:1827)
-   *   at java.io.ObjectOutputStream.close(ObjectOutputStream.java:741)
-   *   at org.apache.spark.serializer.JavaSerializationStream.close(JavaSerializer.scala:57)
-   *   at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$1.apply$mcV$sp(TorrentBroadcast.scala:238)
-   *   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1296)
-   *   at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:237)
-   *   at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:107)
-   *   at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:86)
-   *   at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
-   *   ...
-   */
-
-  // scalastyle:on line.size.limit
-
-  ignore("streaming") {
-    import sqlCtx.implicits._
-
-    // We assume a model has been built in advance
-    val testModel = Seq(
-      ("0", 0.3f), ("1", 0.1f), ("2", 0.6f), ("3", 0.2f)
-    ).toDF("feature", "weight")
-
-    withStreamingContext(new StreamingContext(sqlCtx.sparkContext, Milliseconds(100))) { ssc =>
-      val inputData = Seq(
-        Seq(HivemallLabeledPoint(features = "1:0.6" :: "2:0.1" :: Nil)),
-        Seq(HivemallLabeledPoint(features = "2:0.9" :: Nil)),
-        Seq(HivemallLabeledPoint(features = "1:0.2" :: Nil)),
-        Seq(HivemallLabeledPoint(features = "2:0.1" :: Nil)),
-        Seq(HivemallLabeledPoint(features = "0:0.6" :: "2:0.4" :: Nil))
-      )
-
-      val inputStream = new TestInputStream[HivemallLabeledPoint](ssc, inputData, 1)
-
-      // Apply predictions to the input stream
-      val prediction = inputStream.predict { streamDf =>
-          val df = streamDf.select(rowid(), $"features").explode_array($"features")
-          val testDf = df.select(
-            // TODO: `$"feature"` throws AnalysisException, why?
-            $"rowid", extract_feature(df("feature")), extract_weight(df("feature"))
-          )
-          testDf.join(testModel, testDf("feature") === testModel("feature"), "LEFT_OUTER")
-            .select($"rowid", ($"weight" * $"value").as("value"))
-            .groupBy("rowid").sum("value")
-            .toDF("rowid", "value")
-            .select($"rowid", sigmoid($"value"))
-        }
-
-      // Dummy output stream
-      prediction.foreachRDD(_ => {})
-    }
-  }
-}
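
Per micro-batch, the ignored test above scores each row by exploding its features into (feature, value) pairs, joining them against the weight table, summing weight * value per rowid, and squashing the margin through a sigmoid. The same computation over plain Scala collections, as a hypothetical reference helper (not part of the suite):

    // Dot product of feature values with model weights, then the logistic
    // sigmoid. Missing weights count as 0.0, matching the LEFT_OUTER join
    // plus sum semantics of the DataFrame pipeline above.
    def score(features: Map[String, Double], model: Map[String, Double]): Double = {
      val margin = features.iterator.map { case (f, v) => model.getOrElse(f, 0.0) * v }.sum
      1.0 / (1.0 + math.exp(-margin))
    }

    // e.g., with the test model above: score(Map("1" -> 0.6, "2" -> 0.1),
    //   Map("1" -> 0.1, "2" -> 0.6)) == sigmoid(0.6 * 0.1 + 0.1 * 0.6) == sigmoid(0.12)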

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/cd24be89/spark/spark-2.0/src/test/scala/org/apache/spark/test/TestUtils.scala
----------------------------------------------------------------------
diff --git a/spark/spark-2.0/src/test/scala/org/apache/spark/test/TestUtils.scala b/spark/spark-2.0/src/test/scala/org/apache/spark/test/TestUtils.scala
deleted file mode 100644
index 8a2a385..0000000
--- a/spark/spark-2.0/src/test/scala/org/apache/spark/test/TestUtils.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.test
-
-import scala.reflect.runtime.{universe => ru}
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.DataFrame
-
-object TestUtils extends Logging {
-
-  // Run the benchmark only if DEBUG-level logging is enabled
-  def benchmark(benchName: String)(testFunc: => Unit): Unit = {
-    if (log.isDebugEnabled) {
-      testFunc
-    }
-  }
-
-  def expectResult(res: Boolean, errMsg: String): Unit = if (res) {
-    logWarning(errMsg)
-  }
-
-  def invokeFunc(cls: Any, func: String, args: Any*): DataFrame = try {
-    // Invoke a function with the given name via reflection
-    val im = scala.reflect.runtime.currentMirror.reflect(cls)
-    val mSym = im.symbol.typeSignature.member(ru.newTermName(func)).asMethod
-    im.reflectMethod(mSym).apply(args: _*)
-      .asInstanceOf[DataFrame]
-  } catch {
-    case e: Exception =>
-      assert(false, s"Invoking ${func} failed because: ${e.getMessage}")
-      null // Not executed
-  }
-}
-
-// TODO: Is there an equivalent function in o.a.spark.*?
-class TestFPWrapper(d: Double) {
-
-  // Check approximate equality (within 0.001) between Double/Float values
-  def ~==(d: Double): Boolean = Math.abs(this.d - d) < 0.001
-}
-
-object TestFPWrapper {
-
-  @inline implicit def toTestFPWrapper(d: Double): TestFPWrapper = {
-    new TestFPWrapper(d)
-  }
-}
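
TestUtils.benchmark gates expensive benchmark bodies behind the logger level, and TestFPWrapper adds an approximate-equality operator to Double for floating-point assertions. A short usage sketch of the wrapper:

    import org.apache.spark.test.TestFPWrapper._

    // The absolute tolerance is fixed at 0.001 in TestFPWrapper
    assert(0.3333 ~== 1.0 / 3)    // holds: the difference is about 3.3e-5
    assert(!(0.35 ~== 1.0 / 3))   // the difference (~0.0167) exceeds the tolerance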

http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/cd24be89/spark/spark-2.0/src/test/scala/org/apache/spark/test/VectorQueryTest.scala
----------------------------------------------------------------------
diff --git a/spark/spark-2.0/src/test/scala/org/apache/spark/test/VectorQueryTest.scala b/spark/spark-2.0/src/test/scala/org/apache/spark/test/VectorQueryTest.scala
deleted file mode 100644
index e1384d1..0000000
--- a/spark/spark-2.0/src/test/scala/org/apache/spark/test/VectorQueryTest.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.test
-
-import java.io.File
-import java.nio.charset.StandardCharsets
-
-import com.google.common.io.Files
-
-import org.apache.spark.sql.{DataFrame, QueryTest}
-import org.apache.spark.sql.hive.HivemallOps._
-import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.util.Utils
-
-/**
- * Base class for tests with SparkSQL VectorUDT data.
- */
-abstract class VectorQueryTest extends QueryTest with TestHiveSingleton {
-  import hiveContext.implicits._
-
-  private var trainDir: File = _
-  private var testDir: File = _
-
-  // The `libsvm` source schema is (label: Double, features: ml.linalg.Vector)
-  protected var mllibTrainDf: DataFrame = _
-  protected var mllibTestDf: DataFrame = _
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    val trainLines =
-      """
-        |1 1:1.0 3:2.0 5:3.0
-        |0 2:4.0 4:5.0 6:6.0
-        |1 1:1.1 4:1.0 5:2.3 7:1.0
-        |1 1:1.0 4:1.5 5:2.1 7:1.2
-      """.stripMargin
-    trainDir = Utils.createTempDir()
-    Files.write(trainLines, new File(trainDir, "train-00000"), StandardCharsets.UTF_8)
-    val testLines =
-      """
-        |1 1:1.3 3:2.1 5:2.8
-        |0 2:3.9 4:5.3 6:8.0
-      """.stripMargin
-    testDir = Utils.createTempDir()
-    Files.write(testLines, new File(testDir, "test-00000"), StandardCharsets.UTF_8)
-
-    mllibTrainDf = spark.read.format("libsvm").load(trainDir.getAbsolutePath)
-    // Must be cached because rowid() is non-deterministic
-    mllibTestDf = spark.read.format("libsvm").load(testDir.getAbsolutePath)
-      .withColumn("rowid", rowid()).cache
-  }
-
-  override def afterAll(): Unit = {
-    try {
-      Utils.deleteRecursively(trainDir)
-      Utils.deleteRecursively(testDir)
-    } finally {
-      super.afterAll()
-    }
-  }
-
-  protected def withTempModelDir(f: String => Unit): Unit = {
-    var tempDir: File = null
-    try {
-      tempDir = Utils.createTempDir()
-      f(tempDir.getAbsolutePath + "/xgboost_models")
-    } catch {
-      case e: Throwable => fail(s"Unexpected exception detected: ${e}")
-    } finally {
-      Utils.deleteRecursively(tempDir)
-    }
-  }
-}
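
VectorQueryTest materializes small libsvm files (one "<label> <index>:<value> ..." record per line) so that suites can exercise VectorUDT columns, and it caches the test DataFrame because rowid() is non-deterministic and would otherwise change across actions. A minimal sketch of loading the same format (the path is illustrative):

    // Loads each line into (label: Double, features: ml.linalg.Vector)
    val train = spark.read.format("libsvm").load("/tmp/train-00000")
    train.printSchema()
    // root
    //  |-- label: double (nullable = true)
    //  |-- features: vector (nullable = true)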
