Repository: incubator-hivemall Updated Branches: refs/heads/master fdf702143 -> e8abae257
http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1680c42c/spark/spark-2.2/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala ---------------------------------------------------------------------- diff --git a/spark/spark-2.2/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala b/spark/spark-2.2/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala index 6b5d4cd..de2481c 100644 --- a/spark/spark-2.2/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala +++ b/spark/spark-2.2/src/test/scala/org/apache/spark/sql/hive/HivemallOpsSuite.scala @@ -31,7 +31,8 @@ import org.apache.spark.sql.types._ import org.apache.spark.test.TestFPWrapper._ import org.apache.spark.test.TestUtils -final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { + +class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { test("anomaly") { import hiveContext.implicits._ @@ -42,61 +43,113 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { } test("knn.similarity") { - val df1 = DummyInputData.select(cosine_sim(lit2(Seq(1, 2, 3, 4)), lit2(Seq(3, 4, 5, 6)))) - assert(df1.collect.apply(0).getFloat(0) ~== 0.500f) + import hiveContext.implicits._ + + val df1 = DummyInputData.select( + cosine_similarity(typedLit(Seq(1, 2, 3, 4)), typedLit(Seq(3, 4, 5, 6)))) + val rows1 = df1.collect + assert(rows1.length == 1) + assert(rows1(0).getFloat(0) ~== 0.500f) - val df2 = DummyInputData.select(jaccard(lit(5), lit(6))) - assert(df2.collect.apply(0).getFloat(0) ~== 0.96875f) + val df2 = DummyInputData.select(jaccard_similarity(lit(5), lit(6))) + val rows2 = df2.collect + assert(rows2.length == 1) + assert(rows2(0).getFloat(0) ~== 0.96875f) - val df3 = DummyInputData.select(angular_similarity(lit2(Seq(1, 2, 3)), lit2(Seq(4, 5, 6)))) - assert(df3.collect.apply(0).getFloat(0) ~== 0.500f) + val df3 = DummyInputData.select( + angular_similarity(typedLit(Seq(1, 2, 3)), typedLit(Seq(4, 5, 6)))) + val rows3 = df3.collect + assert(rows3.length == 1) + assert(rows3(0).getFloat(0) ~== 0.500f) - val df4 = DummyInputData.select(euclid_similarity(lit2(Seq(5, 3, 1)), lit2(Seq(2, 8, 3)))) - assert(df4.collect.apply(0).getFloat(0) ~== 0.33333334f) + val df4 = DummyInputData.select( + euclid_similarity(typedLit(Seq(5, 3, 1)), typedLit(Seq(2, 8, 3)))) + val rows4 = df4.collect + assert(rows4.length == 1) + assert(rows4(0).getFloat(0) ~== 0.33333334f) val df5 = DummyInputData.select(distance2similarity(lit(1.0))) - assert(df5.collect.apply(0).getFloat(0) ~== 0.5f) + val rows5 = df5.collect + assert(rows5.length == 1) + assert(rows5(0).getFloat(0) ~== 0.5f) + + val df6 = Seq((Seq("1:0.3", "4:0.1"), Map(0 -> 0.5))).toDF("a", "b") + // TODO: Currently, just check if no exception thrown + assert(df6.dimsum_mapper(df6("a"), df6("b")).collect.isEmpty) } test("knn.distance") { val df1 = DummyInputData.select(hamming_distance(lit(1), lit(3))) - checkAnswer(df1, Row(1) :: Nil) + checkAnswer(df1, Row(1)) val df2 = DummyInputData.select(popcnt(lit(1))) - checkAnswer(df2, Row(1) :: Nil) - - val df3 = DummyInputData.select(kld(lit(0.1), lit(0.5), lit(0.2), lit(0.5))) - assert(df3.collect.apply(0).getDouble(0) ~== 0.01) - - val df4 = DummyInputData.select( - euclid_distance(lit2(Seq("0.1", "0.5")), lit2(Seq("0.2", "0.5")))) - assert(df4.collect.apply(0).getFloat(0) ~== 1.4142135f) - - val df5 = DummyInputData.select( - cosine_distance(lit2(Seq("0.8", "0.3")), lit2(Seq("0.4", "0.6")))) - assert(df5.collect.apply(0).getFloat(0) ~== 1.0f) - - val df6 = DummyInputData.select( - angular_distance(lit2(Seq("0.1", "0.1")), lit2(Seq("0.3", "0.8")))) - assert(df6.collect.apply(0).getFloat(0) ~== 0.50f) - - val df7 = DummyInputData.select( - manhattan_distance(lit2(Seq("0.7", "0.8")), lit2(Seq("0.5", "0.6")))) - assert(df7.collect.apply(0).getFloat(0) ~== 4.0f) - - val df8 = DummyInputData.select( - minkowski_distance(lit2(Seq("0.1", "0.2")), lit2(Seq("0.2", "0.2")), lit2(1.0))) - assert(df8.collect.apply(0).getFloat(0) ~== 2.0f) + checkAnswer(df2, Row(1)) + + val rows3 = DummyInputData.select(kld(lit(0.1), lit(0.5), lit(0.2), lit(0.5))).collect + assert(rows3.length === 1) + assert(rows3(0).getDouble(0) ~== 0.01) + + val rows4 = DummyInputData.select( + euclid_distance(typedLit(Seq("0.1", "0.5")), typedLit(Seq("0.2", "0.5")))).collect + assert(rows4.length === 1) + assert(rows4(0).getFloat(0) ~== 1.4142135f) + + val rows5 = DummyInputData.select( + cosine_distance(typedLit(Seq("0.8", "0.3")), typedLit(Seq("0.4", "0.6")))).collect + assert(rows5.length === 1) + assert(rows5(0).getFloat(0) ~== 1.0f) + + val rows6 = DummyInputData.select( + angular_distance(typedLit(Seq("0.1", "0.1")), typedLit(Seq("0.3", "0.8")))).collect + assert(rows6.length === 1) + assert(rows6(0).getFloat(0) ~== 0.50f) + + val rows7 = DummyInputData.select( + manhattan_distance(typedLit(Seq("0.7", "0.8")), typedLit(Seq("0.5", "0.6")))).collect + assert(rows7.length === 1) + assert(rows7(0).getFloat(0) ~== 4.0f) + + val rows8 = DummyInputData.select( + minkowski_distance(typedLit(Seq("0.1", "0.2")), typedLit(Seq("0.2", "0.2")), typedLit(1.0)) + ).collect + assert(rows8.length === 1) + assert(rows8(0).getFloat(0) ~== 2.0f) + + val rows9 = DummyInputData.select( + jaccard_distance(typedLit(Seq("0.3", "0.8")), typedLit(Seq("0.1", "0.2")))).collect + assert(rows9.length === 1) + assert(rows9(0).getFloat(0) ~== 1.0f) } test("knn.lsh") { import hiveContext.implicits._ - assert(IntList2Data.minhash(lit(1), $"target").count() > 0) - - assert(DummyInputData.select(bbit_minhash(lit2(Seq("1:0.1", "2:0.5")), lit(false))).count - == DummyInputData.count) - assert(DummyInputData.select(minhashes(lit2(Seq("1:0.1", "2:0.5")), lit(false))).count - == DummyInputData.count) + checkAnswer( + IntList2Data.minhash(lit(1), $"target"), + Row(1016022700, 1) :: + Row(1264890450, 1) :: + Row(1304330069, 1) :: + Row(1321870696, 1) :: + Row(1492709716, 1) :: + Row(1511363108, 1) :: + Row(1601347428, 1) :: + Row(1974434012, 1) :: + Row(2022223284, 1) :: + Row(326269457, 1) :: + Row(50559334, 1) :: + Row(716040854, 1) :: + Row(759249519, 1) :: + Row(809187771, 1) :: + Row(900899651, 1) :: + Nil + ) + checkAnswer( + DummyInputData.select(bbit_minhash(typedLit(Seq("1:0.1", "2:0.5")), lit(false))), + Row("31175986876675838064867796245644543067") + ) + checkAnswer( + DummyInputData.select(minhashes(typedLit(Seq("1:0.1", "2:0.5")), lit(false))), + Row(Seq(1571683640, 987207869, 370931990, 988455638, 846963275)) + ) } test("ftvec - add_bias") { @@ -111,12 +164,13 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { test("ftvec - extract_feature") { val df = DummyInputData.select(extract_feature(lit("1:0.8"))) - checkAnswer(df, Row("1") :: Nil) + checkAnswer(df, Row("1")) } test("ftvec - extract_weight") { - val df = DummyInputData.select(extract_weight(lit("3:0.1"))) - assert(df.collect.apply(0).getDouble(0) ~== 0.1) + val rows = DummyInputData.select(extract_weight(lit("3:0.1"))).collect + assert(rows.length === 1) + assert(rows(0).getDouble(0) ~== 0.1) } test("ftvec - explode_array") { @@ -162,26 +216,36 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { } test("ftvec.hash") { - assert(DummyInputData.select(mhash(lit("test"))).count == DummyInputData.count) - assert(DummyInputData.select(org.apache.spark.sql.hive.HivemallOps.sha1(lit("test"))).count == - DummyInputData.count) - // TODO: The tests below failed because: - // org.apache.spark.sql.AnalysisException: List type in java is unsupported because JVM type - // erasure makes spark fail to catch a component type in List<>; - // - // assert(DummyInputData.select(array_hash_values(lit2(Seq("aaa", "bbb")))).count - // == DummyInputData.count) - // assert(DummyInputData.select( - // prefixed_hash_values(lit2(Seq("ccc", "ddd")), lit("prefix"))).count - // == DummyInputData.count) + checkAnswer(DummyInputData.select(mhash(lit("test"))), Row(4948445)) + checkAnswer(DummyInputData.select(HivemallOps.sha1(lit("test"))), Row(12184508)) + checkAnswer(DummyInputData.select(feature_hashing(typedLit(Seq("1:0.1", "3:0.5")))), + Row(Seq("11293631:0.1", "4331412:0.5"))) + checkAnswer(DummyInputData.select(array_hash_values(typedLit(Seq("aaa", "bbb")))), + Row(Seq(4063537, 8459207))) + checkAnswer(DummyInputData.select( + prefixed_hash_values(typedLit(Seq("ccc", "ddd")), lit("prefix"))), + Row(Seq("prefix7873825", "prefix8965544"))) + } + + test("ftvec.parting") { + checkAnswer(DummyInputData.select(polynomial_features(typedLit(Seq("2:0.4", "6:0.1")), lit(2))), + Row(Seq("2:0.4", "2^2:0.16000001", "2^6:0.040000003", "6:0.1", "6^6:0.010000001"))) + checkAnswer(DummyInputData.select(powered_features(typedLit(Seq("4:0.8", "5:0.2")), lit(2))), + Row(Seq("4:0.8", "4^2:0.64000005", "5:0.2", "5^2:0.040000003"))) } test("ftvec.scaling") { - val df1 = TinyTrainData.select(rescale(lit(2.0f), lit(1.0), lit(5.0))) - assert(df1.collect.apply(0).getFloat(0) === 0.25f) - val df2 = TinyTrainData.select(zscore(lit(1.0f), lit(0.5), lit(0.5))) - assert(df2.collect.apply(0).getFloat(0) === 1.0f) - val df3 = TinyTrainData.select(normalize(TinyTrainData.col("features"))) + val rows1 = TinyTrainData.select(rescale(lit(2.0f), lit(1.0), lit(5.0))).collect + assert(rows1.length === 3) + assert(rows1(0).getFloat(0) ~== 0.25f) + assert(rows1(1).getFloat(0) ~== 0.25f) + assert(rows1(2).getFloat(0) ~== 0.25f) + val rows2 = TinyTrainData.select(zscore(lit(1.0f), lit(0.5), lit(0.5))).collect + assert(rows2.length === 3) + assert(rows2(0).getFloat(0) ~== 1.0f) + assert(rows2(1).getFloat(0) ~== 1.0f) + assert(rows2(2).getFloat(0) ~== 1.0f) + val df3 = TinyTrainData.select(l2_normalize(TinyTrainData.col("features"))) checkAnswer( df3, Row(Seq("1:0.9701425", "2:0.24253562")) :: @@ -205,10 +269,10 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { Seq(292.1666753739119, 152.70000455081467, 187.93333893418327, 59.93333511948589))) .toDF("arg0", "arg1") - val result = df.select(chi2(df("arg0"), df("arg1"))).collect - assert(result.length == 1) - val chi2Val = result.head.getAs[Row](0).getAs[Seq[Double]](0) - val pVal = result.head.getAs[Row](0).getAs[Seq[Double]](1) + val rows = df.select(chi2(df("arg0"), df("arg1"))).collect + assert(rows.length == 1) + val chi2Val = rows.head.getAs[Row](0).getAs[Seq[Double]](0) + val pVal = rows.head.getAs[Row](0).getAs[Seq[Double]](1) (chi2Val, Seq(10.81782088, 3.59449902, 116.16984746, 67.24482759)) .zipped @@ -240,58 +304,260 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { // assert(TinyTrainData.rand_amplify(lit(3), lit("-buf 8", $"label", $"features")).count() == 9) } - ignore("ftvec.conv") { + test("ftvec.conv") { import hiveContext.implicits._ - val df1 = Seq((0.0, "1:0.1" :: "3:0.3" :: Nil), (1, 0, "2:0.2" :: Nil)).toDF("a", "b") checkAnswer( - df1.select(to_dense_features(df1("b"), lit(3))), - Row(Array(0.1f, 0.0f, 0.3f)) :: Row(Array(0.0f, 0.2f, 0.0f)) :: Nil + DummyInputData.select(to_dense_features(typedLit(Seq("0:0.1", "1:0.3")), lit(1))), + Row(Array(0.1f, 0.3f)) + ) + checkAnswer( + DummyInputData.select(to_sparse_features(typedLit(Seq(0.1f, 0.2f, 0.3f)))), + Row(Seq("0:0.1", "1:0.2", "2:0.3")) ) - val df2 = Seq((0.1, 0.2, 0.3), (0.2, 0.5, 0.4)).toDF("a", "b", "c") checkAnswer( - df2.select(to_sparse_features(df2("a"), df2("b"), df2("c"))), - Row(Seq("1:0.1", "2:0.2", "3:0.3")) :: Row(Seq("1:0.2", "2:0.5", "3:0.4")) :: Nil + DummyInputData.select(feature_binning(typedLit(Seq("1")), typedLit(Map("1" -> Seq(0, 3))))), + Row(Seq("1")) ) } test("ftvec.trans") { import hiveContext.implicits._ + checkAnswer( + DummyInputData.select(vectorize_features(typedLit(Seq("a", "b")), lit(0.1f), lit(0.2f))), + Row(Seq("a:0.1", "b:0.2")) + ) + checkAnswer( + DummyInputData.select(categorical_features(typedLit(Seq("a", "b")), lit("c11"), lit("c12"))), + Row(Seq("a#c11", "b#c12")) + ) + checkAnswer( + DummyInputData.select(indexed_features(lit(0.1), lit(0.2), lit(0.3))), + Row(Seq("1:0.1", "2:0.2", "3:0.3")) + ) + checkAnswer( + DummyInputData.select(quantitative_features(typedLit(Seq("a", "b")), lit(0.1), lit(0.2))), + Row(Seq("a:0.1", "b:0.2")) + ) + checkAnswer( + DummyInputData.select(ffm_features(typedLit(Seq("1", "2")), lit(0.5), lit(0.2))), + Row(Seq("190:140405:1", "111:1058718:1")) + ) + checkAnswer( + DummyInputData.select(add_field_indicies(typedLit(Seq("0.5", "0.1")))), + Row(Seq("1:0.5", "2:0.1")) + ) + val df1 = Seq((1, -3, 1), (2, -2, 1)).toDF("a", "b", "c") checkAnswer( df1.binarize_label($"a", $"b", $"c"), Row(1, 1) :: Row(1, 1) :: Row(1, 1) :: Nil ) + val df2 = Seq(("xxx", "yyy", 0), ("zzz", "yyy", 1)).toDF("a", "b", "c").coalesce(1) + checkAnswer( + df2.quantified_features(lit(true), df2("a"), df2("b"), df2("c")), + Row(Seq(0.0, 0.0, 0.0)) :: Row(Seq(1.0, 0.0, 1.0)) :: Nil + ) + } + + test("ftvec.ranking") { + import hiveContext.implicits._ + + val df1 = Seq((1, 0 :: 3 :: 4 :: Nil), (2, 8 :: 9 :: Nil)).toDF("a", "b").coalesce(1) + checkAnswer( + df1.bpr_sampling($"a", $"b"), + Row(1, 0, 7) :: + Row(1, 3, 6) :: + Row(2, 8, 0) :: + Row(2, 8, 4) :: + Row(2, 9, 7) :: + Nil + ) + val df2 = Seq(1 :: 8 :: 9 :: Nil, 0 :: 3 :: Nil).toDF("a").coalesce(1) + checkAnswer( + df2.item_pairs_sampling($"a", lit(3)), + Row(0, 1) :: + Row(1, 0) :: + Row(3, 2) :: + Nil + ) + val df3 = Seq(3 :: 5 :: Nil, 0 :: Nil).toDF("a").coalesce(1) + checkAnswer( + df3.populate_not_in($"a", lit(1)), + Row(0) :: + Row(1) :: + Row(1) :: + Nil + ) + } - val df2 = Seq((0.1f, 0.2f), (0.5f, 0.3f)).toDF("a", "b") + test("tools") { + // checkAnswer( + // DummyInputData.select(convert_label(lit(5))), + // Nil + // ) checkAnswer( - df2.select(vectorize_features(lit2(Seq("a", "b")), df2("a"), df2("b"))), - Row(Seq("a:0.1", "b:0.2")) :: Row(Seq("a:0.5", "b:0.3")) :: Nil + DummyInputData.select(x_rank(lit("abc"))), + Row(1) ) + } - val df3 = Seq(("c11", "c12"), ("c21", "c22")).toDF("a", "b") + test("tools.array") { + checkAnswer( + DummyInputData.select(float_array(lit(3))), + Row(Seq()) + ) + checkAnswer( + DummyInputData.select(array_remove(typedLit(Seq(1, 2, 3)), lit(2))), + Row(Seq(1, 3)) + ) + checkAnswer( + DummyInputData.select(sort_and_uniq_array(typedLit(Seq(2, 1, 3, 1)))), + Row(Seq(1, 2, 3)) + ) + checkAnswer( + DummyInputData.select(subarray_endwith(typedLit(Seq(1, 2, 3, 4, 5)), lit(4))), + Row(Seq(1, 2, 3, 4)) + ) + checkAnswer( + DummyInputData.select( + array_concat(typedLit(Seq(1, 2)), typedLit(Seq(3)), typedLit(Seq(4, 5)))), + Row(Seq(1, 2, 3, 4, 5)) + ) checkAnswer( - df3.select(categorical_features(lit2(Seq("a", "b")), df3("a"), df3("b"))), - Row(Seq("a#c11", "b#c12")) :: Row(Seq("a#c21", "b#c22")) :: Nil + DummyInputData.select(subarray(typedLit(Seq(1, 2, 3, 4, 5)), lit(2), lit(4))), + Row(Seq(3, 4)) ) + checkAnswer( + DummyInputData.select(to_string_array(typedLit(Seq(1, 2, 3, 4, 5)))), + Row(Seq("1", "2", "3", "4", "5")) + ) + checkAnswer( + DummyInputData.select(array_intersect(typedLit(Seq(1, 2, 3)), typedLit(Seq(2, 3, 4)))), + Row(Seq(2, 3)) + ) + } + + test("tools.array - select_k_best") { + import hiveContext.implicits._ + + val data = Seq(Seq(0, 1, 3), Seq(2, 4, 1), Seq(5, 4, 9)) + val df = data.map(d => (d, Seq(3, 1, 2))).toDF("features", "importance_list") + val k = 2 - val df4 = Seq((0.1, 0.2, 0.3), (0.2, 0.5, 0.4)).toDF("a", "b", "c") checkAnswer( - df4.select(indexed_features(df4("a"), df4("b"), df4("c"))), - Row(Seq("1:0.1", "2:0.2", "3:0.3")) :: Row(Seq("1:0.2", "2:0.5", "3:0.4")) :: Nil + df.select(select_k_best(df("features"), df("importance_list"), lit(k))), + Row(Seq(0.0, 3.0)) :: Row(Seq(2.0, 1.0)) :: Row(Seq(5.0, 9.0)) :: Nil ) + } - val df5 = Seq(("xxx", "yyy", 0), ("zzz", "yyy", 1)).toDF("a", "b", "c").coalesce(1) + test("tools.bits") { checkAnswer( - df5.quantified_features(lit(true), df5("a"), df5("b"), df5("c")), - Row(Seq(0.0, 0.0, 0.0)) :: Row(Seq(1.0, 0.0, 1.0)) :: Nil + DummyInputData.select(to_bits(typedLit(Seq(1, 3, 9)))), + Row(Seq(522L)) ) + checkAnswer( + DummyInputData.select(unbits(typedLit(Seq(1L, 3L)))), + Row(Seq(0L, 64L, 65L)) + ) + checkAnswer( + DummyInputData.select(bits_or(typedLit(Seq(1L, 3L)), typedLit(Seq(8L, 23L)))), + Row(Seq(9L, 23L)) + ) + } - val df6 = Seq((0.1, 0.2), (0.5, 0.3)).toDF("a", "b") + test("tools.compress") { checkAnswer( - df6.select(quantitative_features(lit2(Seq("a", "b")), df6("a"), df6("b"))), - Row(Seq("a:0.1", "b:0.2")) :: Row(Seq("a:0.5", "b:0.3")) :: Nil + DummyInputData.select(inflate(deflate(lit("input text")))), + Row("input text") + ) + } + + test("tools.map") { + val rows = DummyInputData.select( + map_get_sum(typedLit(Map(1 -> 0.2f, 2 -> 0.5f, 4 -> 0.8f)), typedLit(Seq(1, 4))) + ).collect + assert(rows.length === 1) + assert(rows(0).getDouble(0) ~== 1.0f) + + checkAnswer( + DummyInputData.select(map_tail_n(typedLit(Map(1 -> 2, 2 -> 5)), lit(1))), + Row(Map(2 -> 5)) + ) + } + + test("tools.text") { + checkAnswer( + DummyInputData.select(tokenize(lit("This is a pen"))), + Row("This" :: "is" :: "a" :: "pen" :: Nil) + ) + checkAnswer( + DummyInputData.select(is_stopword(lit("because"))), + Row(true) + ) + checkAnswer( + DummyInputData.select(singularize(lit("between"))), + Row("between") + ) + checkAnswer( + DummyInputData.select(split_words(lit("Hello, world"))), + Row("Hello," :: "world" :: Nil) + ) + checkAnswer( + DummyInputData.select(normalize_unicode(lit("abcdefg"))), + Row("abcdefg") + ) + checkAnswer( + DummyInputData.select(base91(typedLit("input text".getBytes))), + Row("xojg[@TX;R..B") + ) + checkAnswer( + DummyInputData.select(unbase91(lit("XXXX"))), + Row(68 :: -120 :: 8 :: Nil) + ) + checkAnswer( + DummyInputData.select(word_ngrams(typedLit("abcd" :: "efg" :: "hij" :: Nil), lit(2), lit(2))), + Row("abcd efg" :: "efg hij" :: Nil) + ) + } + + test("tools - generated_series") { + checkAnswer( + DummyInputData.generate_series(lit(0), lit(3)), + Row(0) :: Row(1) :: Row(2) :: Row(3) :: Nil + ) + } + + test("geospatial") { + val rows1 = DummyInputData.select(tilex2lon(lit(1), lit(6))).collect + assert(rows1.length === 1) + assert(rows1(0).getDouble(0) ~== -174.375) + + val rows2 = DummyInputData.select(tiley2lat(lit(1), lit(3))).collect + assert(rows2.length === 1) + assert(rows2(0).getDouble(0) ~== 79.17133464081945) + + val rows3 = DummyInputData.select( + haversine_distance(lit(0.3), lit(0.1), lit(0.4), lit(0.1))).collect + assert(rows3.length === 1) + assert(rows3(0).getDouble(0) ~== 11.119492664455878) + + checkAnswer( + DummyInputData.select(tile(lit(0.1), lit(0.8), lit(3))), + Row(28) + ) + checkAnswer( + DummyInputData.select(map_url(lit(0.1), lit(0.8), lit(3))), + Row("http://tile.openstreetmap.org/3/4/3.png") + ) + checkAnswer( + DummyInputData.select(lat2tiley(lit(0.3), lit(3))), + Row(3) + ) + checkAnswer( + DummyInputData.select(lon2tilex(lit(0.4), lit(2))), + Row(2) ) } @@ -494,7 +760,7 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { val schema = new StructType().add("a", IntegerType).add("b", StringType) checkAnswer( df.select(from_csv($"value", schema)), - Row(Row(1, "abc")) :: Nil) + Row(Row(1, "abc"))) } test("misc - to_csv") { @@ -522,7 +788,7 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { val model = Seq((0.0, 0.1 :: 0.1 :: Nil), (1.0, 0.2 :: 0.3 :: 0.2 :: Nil)) .toDF("label", "features") - .train_randomforest_regr($"features", $"label") + .train_randomforest_regressor($"features", $"label") val testData = Seq((0.0, 0.1 :: 0.0 :: Nil), (1.0, 0.3 :: 0.5 :: 0.4 :: Nil)) .toDF("label", "features") @@ -542,22 +808,11 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { checkAnswer(predicted, Seq(Row(0), Row(1))) } - test("tools.array - select_k_best") { - import hiveContext.implicits._ - - val data = Seq(Seq(0, 1, 3), Seq(2, 4, 1), Seq(5, 4, 9)) - val df = data.map(d => (d, Seq(3, 1, 2))).toDF("features", "importance_list") - val k = 2 - - checkAnswer( - df.select(select_k_best(df("features"), df("importance_list"), lit(k))), - Row(Seq(0.0, 3.0)) :: Row(Seq(2.0, 1.0)) :: Row(Seq(5.0, 9.0)) :: Nil - ) - } - test("misc - sigmoid") { import hiveContext.implicits._ - assert(DummyInputData.select(sigmoid($"c0")).collect.apply(0).getDouble(0) ~== 0.500) + val rows = DummyInputData.select(sigmoid($"c0")).collect + assert(rows.length === 1) + assert(rows(0).getDouble(0) ~== 0.500) } test("misc - lr_datagen") { @@ -567,16 +822,18 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { test("invoke regression functions") { import hiveContext.implicits._ Seq( - "train_adadelta", - "train_adagrad", + "train_regressor", + "train_adadelta_regr", + "train_adagrad_regr", "train_arow_regr", "train_arowe_regr", "train_arowe2_regr", - "train_logregr", + "train_logistic_regr", "train_pa1_regr", "train_pa1a_regr", "train_pa2_regr", "train_pa2a_regr" + // "train_randomforest_regressor" ).map { func => TestUtils.invokeFunc(new HivemallOps(TinyTrainData), func, Seq($"features", $"label")) .foreach(_ => {}) // Just call it @@ -586,6 +843,7 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { test("invoke classifier functions") { import hiveContext.implicits._ Seq( + "train_classifier", "train_perceptron", "train_pa", "train_pa1", @@ -596,6 +854,7 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { "train_scw", "train_scw2", "train_adagrad_rda" + // "train_randomforest_classifier" ).map { func => TestUtils.invokeFunc(new HivemallOps(TinyTrainData), func, Seq($"features", $"label")) .foreach(_ => {}) // Just call it @@ -611,6 +870,7 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { "train_multiclass_pa2", "train_multiclass_cw", "train_multiclass_arow", + "train_multiclass_arowh", "train_multiclass_scw", "train_multiclass_scw2" ).map { func => @@ -628,7 +888,7 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { (Array(0.3, 0.1, 0.2), 0), (Array(0.3, 0.1, 0.2), 0)).toDF("features", "label") Seq( - "train_randomforest_regr", + "train_randomforest_regressor", "train_randomforest_classifier" ).map { func => TestUtils.invokeFunc(new HivemallOps(testDf.coalesce(1)), func, Seq($"features", $"label")) @@ -636,6 +896,27 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { } } + test("invoke recommend functions") { + import hiveContext.implicits._ + val df = Seq((1, Map(1 -> 0.3), Map(2 -> Map(4 -> 0.1)), 0, Map(3 -> 0.5))) + .toDF("i", "r_i", "topKRatesOfI", "j", "r_j") + // Just call it + df.train_slim($"i", $"r_i", $"topKRatesOfI", $"j", $"r_j").collect + + } + + ignore("invoke topicmodel functions") { + import hiveContext.implicits._ + val testDf = Seq(Seq("abcd", "'efghij", "klmn")).toDF("words") + Seq( + "train_lda", + "train_plsa" + ).map { func => + TestUtils.invokeFunc(new HivemallOps(testDf.coalesce(1)), func, Seq($"words")) + .foreach(_ => {}) // Just call it + } + } + protected def checkRegrPrecision(func: String): Unit = { import hiveContext.implicits._ @@ -730,12 +1011,12 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { ignore("check regression precision") { Seq( - "train_adadelta", - "train_adagrad", + "train_adadelta_regr", + "train_adagrad_regr", "train_arow_regr", "train_arowe_regr", "train_arowe2_regr", - "train_logregr", + "train_logistic_regr", "train_pa1_regr", "train_pa1a_regr", "train_pa2_regr", @@ -762,61 +1043,212 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { } } - test("user-defined aggregators for ensembles") { + test("aggregations for classifiers") { + import hiveContext.implicits._ + val df1 = Seq((1, 0.1, 0.1, 0.2f, 0.2f, 0.2f, 0.2f)) + .toDF("key", "xh", "xk", "w0", "w1", "w2", "w3") + val row1 = df1.groupBy($"key").kpa_predict("xh", "xk", "w0", "w1", "w2", "w3").collect + assert(row1.length === 1) + assert(row1(0).getDouble(1) ~== 0.002000000029802) + } + + test("aggregations for ensembles") { import hiveContext.implicits._ - val df1 = Seq((1, 0.1f), (1, 0.2f), (2, 0.1f)).toDF("c0", "c1") - val row1 = df1.groupBy($"c0").voted_avg("c1").collect - assert(row1(0).getDouble(1) ~== 0.15) - assert(row1(1).getDouble(1) ~== 0.10) + val df1 = Seq((1, 0.1), (1, 0.2), (2, 0.1)).toDF("c0", "c1") + val rows1 = df1.groupBy($"c0").voted_avg("c1").collect + assert(rows1.length === 2) + assert(rows1(0).getDouble(1) ~== 0.15) + assert(rows1(1).getDouble(1) ~== 0.10) - val df3 = Seq((1, 0.2f), (1, 0.8f), (2, 0.3f)).toDF("c0", "c1") - val row3 = df3.groupBy($"c0").weight_voted_avg("c1").collect - assert(row3(0).getDouble(1) ~== 0.50) - assert(row3(1).getDouble(1) ~== 0.30) + val df3 = Seq((1, 0.2), (1, 0.8), (2, 0.3)).toDF("c0", "c1") + val rows3 = df3.groupBy($"c0").weight_voted_avg("c1").collect + assert(rows3.length === 2) + assert(rows3(0).getDouble(1) ~== 0.50) + assert(rows3(1).getDouble(1) ~== 0.30) val df5 = Seq((1, 0.2f, 0.1f), (1, 0.4f, 0.2f), (2, 0.8f, 0.9f)).toDF("c0", "c1", "c2") - val row5 = df5.groupBy($"c0").argmin_kld("c1", "c2").collect - assert(row5(0).getFloat(1) ~== 0.266666666) - assert(row5(1).getFloat(1) ~== 0.80) + val rows5 = df5.groupBy($"c0").argmin_kld("c1", "c2").collect + assert(rows5.length === 2) + assert(rows5(0).getFloat(1) ~== 0.266666666) + assert(rows5(1).getFloat(1) ~== 0.80) + + val df6 = Seq((1, "id-0", 0.2), (1, "id-1", 0.4), (1, "id-2", 0.1)).toDF("c0", "c1", "c2") + val rows6 = df6.groupBy($"c0").max_label("c2", "c1").collect + assert(rows6.length === 1) + assert(rows6(0).getString(1) == "id-1") + + val df7 = Seq((1, "id-0", 0.5), (1, "id-1", 0.1), (1, "id-2", 0.2)).toDF("c0", "c1", "c2") + val rows7 = df7.groupBy($"c0").maxrow("c2", "c1").toDF("c0", "c1").select($"c1.col1").collect + assert(rows7.length === 1) + assert(rows7(0).getString(0) == "id-0") + + val df8 = Seq((1, 1), (1, 2), (2, 1), (1, 5)).toDF("c0", "c1") + val rows8 = df8.groupBy($"c0").rf_ensemble("c1").toDF("c0", "c1") + .select("c1.probability").collect + assert(rows8.length === 2) + assert(rows8(0).getDouble(0) ~== 0.3333333333) + assert(rows8(1).getDouble(0) ~== 1.0) + } + + test("aggregations for evaluation") { + import hiveContext.implicits._ - val df6 = Seq((1, "id-0", 0.2f), (1, "id-1", 0.4f), (1, "id-2", 0.1f)).toDF("c0", "c1", "c2") - val row6 = df6.groupBy($"c0").max_label("c2", "c1").collect - assert(row6(0).getString(1) == "id-1") + val testDf1 = Seq((1, 1.0, 0.5), (1, 0.3, 0.5), (1, 0.1, 0.2)).toDF("c0", "c1", "c2") + val rows1 = testDf1.groupBy($"c0").mae("c1", "c2").collect + assert(rows1.length === 1) + assert(rows1(0).getDouble(1) ~== 0.26666666) + val rows2 = testDf1.groupBy($"c0").mse("c1", "c2").collect + assert(rows2.length === 1) + assert(rows2(0).getDouble(1) ~== 0.1) + val rows3 = testDf1.groupBy($"c0").rmse("c1", "c2").collect + assert(rows3.length === 1) + assert(rows3(0).getDouble(1) ~== 0.31622776601683794) + val rows4 = testDf1.groupBy($"c0").r2("c1", "c2").collect + assert(rows4.length === 1) + assert(rows4(0).getDouble(1) ~== -4.0) + val rows5 = testDf1.groupBy($"c0").logloss("c1", "c2").collect + assert(rows5.length === 1) + assert(rows5(0).getDouble(1) ~== 6.198305767142615) + + val testDf2 = Seq((1, Array(1, 2), Array(2, 3)), (1, Array(3, 8), Array(5, 4))) + .toDF("c0", "c1", "c2") + val rows6 = testDf2.groupBy($"c0").ndcg("c1", "c2").collect + assert(rows6.length === 1) + assert(rows6(0).getDouble(1) ~== 0.19342640361727081) + val rows7 = testDf2.groupBy($"c0").precision_at("c1", "c2").collect + assert(rows7.length === 1) + assert(rows7(0).getDouble(1) ~== 0.25) + val rows8 = testDf2.groupBy($"c0").recall_at("c1", "c2").collect + assert(rows8.length === 1) + assert(rows8(0).getDouble(1) ~== 0.25) + val rows9 = testDf2.groupBy($"c0").hitrate("c1", "c2").collect + assert(rows9.length === 1) + assert(rows9(0).getDouble(1) ~== 0.50) + val rows10 = testDf2.groupBy($"c0").mrr("c1", "c2").collect + assert(rows10.length === 1) + assert(rows10(0).getDouble(1) ~== 0.25) + val rows11 = testDf2.groupBy($"c0").average_precision("c1", "c2").collect + assert(rows11.length === 1) + assert(rows11(0).getDouble(1) ~== 0.25) + val rows12 = testDf2.groupBy($"c0").auc("c1", "c2").collect + assert(rows12.length === 1) + assert(rows12(0).getDouble(1) ~== 0.25) + } - val df7 = Seq((1, "id-0", 0.5f), (1, "id-1", 0.1f), (1, "id-2", 0.2f)).toDF("c0", "c1", "c2") - val row7 = df7.groupBy($"c0").maxrow("c2", "c1").toDF("c0", "c1").select($"c1.col1").collect - assert(row7(0).getString(0) == "id-0") + test("aggregations for topicmodel") { + import hiveContext.implicits._ - // val df8 = Seq((1, 1), (1, 2), (2, 1), (1, 5)).toDF("c0", "c1") - // val row8 = df8.groupBy($"c0").rf_ensemble("c1").toDF("c0", "c1") - // .select("c1.probability").collect - // assert(row8(0).getDouble(0) ~== 0.3333333333) - // assert(row8(1).getDouble(0) ~== 1.0) + val testDf = Seq((1, "abcd", 0.1, 0, 0.1), (1, "efgh", 0.2, 0, 0.1)) + .toDF("key", "word", "value", "label", "lambda") + val rows1 = testDf.groupBy($"key").lda_predict("word", "value", "label", "lambda").collect + assert(rows1.length === 1) + val result1 = rows1(0).getSeq[Row](1).map { case Row(label: Int, prob: Float) => label -> prob } + .toMap[Int, Float] + assert(result1.size === 10) + assert(result1(0) ~== 0.07692449) + assert(result1(1) ~== 0.07701121) + assert(result1(2) ~== 0.07701129) + assert(result1(3) ~== 0.07705542) + assert(result1(4) ~== 0.07701511) + assert(result1(5) ~== 0.07701234) + assert(result1(6) ~== 0.07701384) + assert(result1(7) ~== 0.30693996) + assert(result1(8) ~== 0.07700701) + assert(result1(9) ~== 0.07700934) + + val rows2 = testDf.groupBy($"key").plsa_predict("word", "value", "label", "lambda").collect + assert(rows2.length === 1) + val result2 = rows2(0).getSeq[Row](1).map { case Row(label: Int, prob: Float) => label -> prob } + .toMap[Int, Float] + assert(result2.size === 10) + assert(result2(0) ~== 0.062156882) + assert(result2(1) ~== 0.05088547) + assert(result2(2) ~== 0.12434204) + assert(result2(3) ~== 0.31869823) + assert(result2(4) ~== 0.01584355) + assert(result2(5) ~== 0.0057667173) + assert(result2(6) ~== 0.10864779) + assert(result2(7) ~== 0.09346106) + assert(result2(8) ~== 0.13905199) + assert(result2(9) ~== 0.081146255) } - test("user-defined aggregators for evaluation") { + test("aggregations for ftvec.text") { import hiveContext.implicits._ + val testDf = Seq((1, "abc def hi jk l"), (1, "def jk")).toDF("key", "text") + val rows = testDf.groupBy($"key").tf("text").collect + assert(rows.length === 1) + val result = rows(0).getAs[Map[String, Float]](1) + assert(result.size === 2) + assert(result("def jk") ~== 0.5f) + assert(result("abc def hi jk l") ~== 0.5f) + } - val df1 = Seq((1, 1.0f, 0.5f), (1, 0.3f, 0.5f), (1, 0.1f, 0.2f)).toDF("c0", "c1", "c2") - val row1 = df1.groupBy($"c0").mae("c1", "c2").collect - assert(row1(0).getDouble(1) ~== 0.26666666) + test("aggregations for tools.array") { + import hiveContext.implicits._ - val df2 = Seq((1, 0.3f, 0.8f), (1, 1.2f, 2.0f), (1, 0.2f, 0.3f)).toDF("c0", "c1", "c2") - val row2 = df2.groupBy($"c0").mse("c1", "c2").collect - assert(row2(0).getDouble(1) ~== 0.29999999) + val testDf = Seq((1, 1 :: 3 :: Nil), (1, 3 :: 5 :: Nil)).toDF("key", "ar") + val rows1 = testDf.groupBy($"key").array_avg("ar").collect + assert(rows1.length === 1) + val result1 = rows1(0).getSeq[Float](1) + assert(result1.length === 2) + assert(result1(0) ~== 2.0f) + assert(result1(1) ~== 4.0f) + + val rows2 = testDf.groupBy($"key").array_sum("ar").collect + assert(rows2.length === 1) + val result2 = rows2(0).getSeq[Double](1) + assert(result2.length === 2) + assert(result2(0) ~== 4.0) + assert(result2(1) ~== 8.0) + } - val df3 = Seq((1, 0.3f, 0.8f), (1, 1.2f, 2.0f), (1, 0.2f, 0.3f)).toDF("c0", "c1", "c2") - val row3 = df3.groupBy($"c0").rmse("c1", "c2").collect - assert(row3(0).getDouble(1) ~== 0.54772253) + test("aggregations for tools.bits") { + import hiveContext.implicits._ + val testDf = Seq((1, 1), (1, 7)).toDF("key", "x") + val rows = testDf.groupBy($"key").bits_collect("x").collect + assert(rows.length === 1) + val result = rows(0).getSeq[Int](1) + assert(result === Seq(130)) + } - val df4 = Seq((1, Array(1, 2), Array(2, 3)), (1, Array(3, 8), Array(5, 4))).toDF - .toDF("c0", "c1", "c2") - val row4 = df4.groupBy($"c0").f1score("c1", "c2").collect - assert(row4(0).getDouble(1) ~== 0.25) + test("aggregations for tools.list") { + import hiveContext.implicits._ + val testDf = Seq((1, 3), (1, 1), (1, 2)).toDF("key", "x") + val rows = testDf.groupBy($"key").to_ordered_list("x").collect + assert(rows.length === 1) + val result = rows(0).getSeq[Int](1) + assert(result === Seq(1, 2, 3)) + } + + test("aggregations for tools.map") { + import hiveContext.implicits._ + val testDf = Seq((1, 1, "a"), (1, 2, "b"), (1, 3, "c")).toDF("key", "k", "v") + val rows = testDf.groupBy($"key").to_map("k", "v").collect + assert(rows.length === 1) + val result = rows(0).getMap[Int, String](1) + assert(result === Map(1 -> "a", 2 -> "b", 3 -> "c")) + } + + test("aggregations for tools.math") { + import hiveContext.implicits._ + val testDf = Seq( + (1, Seq(1, 2, 3, 4), Seq(5, 6, 7, 8)), + (1, Seq(9, 10, 11, 12), Seq(13, 14, 15, 16)) + ).toDF("key", "mtx1", "mtx2") + val rows = testDf.groupBy($"key").transpose_and_dot("mtx1", "mtx2").collect + assert(rows.length === 1) + val result = rows(0).getSeq[Int](1) + assert(result === Seq( + Seq(122.0, 132.0, 142.0, 152.0), + Seq(140.0, 152.0, 164.0, 176.0), + Seq(158.0, 172.0, 186.0, 200.0), + Seq(176.0, 192.0, 208.0, 224.0)) + ) } - test("user-defined aggregators for ftvec.trans") { + test("aggregations for ftvec.trans") { import hiveContext.implicits._ val df0 = Seq((1, "cat", "mammal", 9), (1, "dog", "mammal", 10), (1, "human", "mammal", 10), @@ -842,7 +1274,7 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { assert(result012.values.toSet === Set(9, 10, 11, 12, 13)) } - test("user-defined aggregators for ftvec.selection") { + test("aggregations for ftvec.selection") { import hiveContext.implicits._ // see also hivemall.ftvec.selection.SignalNoiseRatioUDAFTest @@ -889,7 +1321,7 @@ final class HivemallOpsWithFeatureSuite extends HivemallFeatureQueryTest { .foreach((actual, expected) => assert(actual ~== expected)) } - test("user-defined aggregators for tools.matrix") { + test("aggregations for tools.matrix") { import hiveContext.implicits._ // | 1 2 3 |T | 5 6 7 | @@ -950,9 +1382,9 @@ final class HivemallOpsWithVectorSuite extends VectorQueryTest { ) } - test("train_logregr") { + test("train_logistic_regr") { checkAnswer( - mllibTrainDf.train_logregr($"features", $"label") + mllibTrainDf.train_logistic_regr($"features", $"label") .groupBy("feature").agg("weight" -> "avg") .select($"feature"), Seq(0, 1, 2, 3, 4, 5, 6).map(v => Row(s"$v")) http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1680c42c/spark/spark-2.2/src/test/scala/org/apache/spark/sql/hive/test/HivemallFeatureQueryTest.scala ---------------------------------------------------------------------- diff --git a/spark/spark-2.2/src/test/scala/org/apache/spark/sql/hive/test/HivemallFeatureQueryTest.scala b/spark/spark-2.2/src/test/scala/org/apache/spark/sql/hive/test/HivemallFeatureQueryTest.scala index 3ca9bbf..bc656d1 100644 --- a/spark/spark-2.2/src/test/scala/org/apache/spark/sql/hive/test/HivemallFeatureQueryTest.scala +++ b/spark/spark-2.2/src/test/scala/org/apache/spark/sql/hive/test/HivemallFeatureQueryTest.scala @@ -36,17 +36,6 @@ abstract class HivemallFeatureQueryTest extends QueryTest with SQLTestUtils with import hiveContext.implicits._ - /** - * TODO: spark-2.0 does not support literals for some types (e.g., Seq[_] and Array[_]). - * So, it provides that functionality here. - * This helper function will be removed in future releases. - */ - protected def lit2[T : TypeTag](v: T): Column = { - val ScalaReflection.Schema(dataType, _) = ScalaReflection.schemaFor[T] - val convert = CatalystTypeConverters.createToCatalystConverter(dataType) - Column(Literal(convert(v), dataType)) - } - protected val DummyInputData = Seq((0, 0)).toDF("c0", "c1") protected val IntList2Data =