Repository: incubator-hivemall Updated Branches: refs/heads/master 4909deda5 -> 433001522
Close #36: [Spark] Update gitbook for top_k_join Project: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/commit/43300152 Tree: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/tree/43300152 Diff: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/diff/43300152 Branch: refs/heads/master Commit: 43300152262a76982f8ad95195a11f0dc485bce6 Parents: 4909ded Author: Takeshi Yamamuro <yamam...@apache.org> Authored: Thu Feb 2 15:10:19 2017 +0900 Committer: Takeshi Yamamuro <yamam...@apache.org> Committed: Thu Feb 2 15:10:19 2017 +0900 ---------------------------------------------------------------------- docs/gitbook/spark/misc/topk_join.md | 179 +++++++++++++++++++++++++----- 1 file changed, 150 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/43300152/docs/gitbook/spark/misc/topk_join.md ---------------------------------------------------------------------- diff --git a/docs/gitbook/spark/misc/topk_join.md b/docs/gitbook/spark/misc/topk_join.md index af3351d..eb10ab2 100644 --- a/docs/gitbook/spark/misc/topk_join.md +++ b/docs/gitbook/spark/misc/topk_join.md @@ -32,37 +32,71 @@ For example, we have two tables below; - An input table (`leftDf`) -| userId | group | x | y | -|:------:|:-----:|:---:|:---:| -| 1 | b | 0.3 | 0.3 | -| 2 | a | 0.5 | 0.4 | -| 3 | a | 0.1 | 0.8 | -| 4 | c | 0.2 | 0.2 | -| 5 | a | 0.1 | 0.4 | -| 6 | b | 0.8 | 0.3 | +```scala +scala> :paste +val leftDf = Seq( + (1, "b", 0.3, 0.3), + (2, "a", 0.5, 0.4), + (3, "a", 0.1, 0.8), + (4, "c", 0.2, 0.2), + (5, "a", 0.1, 0.4), + (6, "b", 0.8, 0.8) +).toDF("userId", "group", "x", "y") + +scala> leftDf.show ++------+-----+---+---+ +|userId|group| x| y| ++------+-----+---+---+ +| 1| b|0.3|0.3| +| 2| a|0.5|0.4| +| 3| a|0.1|0.8| +| 4| c|0.2|0.2| +| 5| a|0.1|0.4| +| 6| b|0.8|0.8| ++------+-----+---+---+ 
+``` - A reference table (`rightDf`) -| group | position | x | y | -|:-----:|:--------:|:---:|:---:| -| a | pos-1 | 0.0 | 0.1 | -| a | pos-2 | 0.9 | 0.3 | -| a | pos-3 | 0.3 | 0.2 | -| b | pos-4 | 0.5 | 0.7 | -| b | pos-5 | 0.4 | 0.2 | -| c | pos-6 | 0.8 | 0.7 | -| c | pos-7 | 0.3 | 0.3 | -| c | pos-8 | 0.4 | 0.2 | -| c | pos-9 | 0.3 | 0.8 | +```scala +scala> :paste +val rightDf = Seq( + ("a", "pos1", 0.0, 0.1), + ("a", "pos2", 0.9, 0.3), + ("a", "pos3", 0.3, 0.2), + ("b", "pos4", 0.5, 0.7), + ("b", "pos5", 0.4, 0.2), + ("c", "pos6", 0.8, 0.7), + ("c", "pos7", 0.3, 0.3), + ("c", "pos8", 0.4, 0.2), + ("c", "pos9", 0.3, 0.8) +).toDF("group", "position", "x", "y") + +scala> rightDf.show ++-----+--------+---+---+ +|group|position| x| y| ++-----+--------+---+---+ +| a| pos1|0.0|0.1| +| a| pos2|0.9|0.3| +| a| pos3|0.3|0.2| +| b| pos4|0.5|0.7| +| b| pos5|0.4|0.2| +| c| pos6|0.8|0.7| +| c| pos7|0.3|0.3| +| c| pos8|0.4|0.2| +| c| pos9|0.3|0.8| ++-----+--------+---+---+ +``` In the two tables, the example computes the nearest `position` for `userId` in each `group`. 
The standard way using DataFrame window functions would be as follows: ```scala +scala> :paste val computeDistanceFunc = sqrt(pow(leftDf("x") - rightDf("x"), lit(2.0)) + pow(leftDf("y") - rightDf("y"), lit(2.0))) -leftDf.join( +val resultDf = leftDf.join( right = rightDf, joinExpr = leftDf("group") === rightDf("group") ) @@ -74,7 +108,10 @@ leftDf.join( You can use `top_k_join` as follows: ```scala -leftDf.top_k_join( +scala> :paste +import org.apache.spark.sql.hive.HivemallOps._ + +val resultDf = leftDf.top_k_join( k = lit(-1), right = rightDf, joinExpr = leftDf("group") === rightDf("group"), @@ -84,12 +121,96 @@ The result is as follows: -| rank | score | userId | group | x | y | group | position | x | y | -|:----:|:-----:|:------:|:-----:|:---:|:---:|:-----:|:--------:|:---:|:---:| -| 1 | 0.100 | 4 | c | 0.2 | 0.2 | c | pos9 | 0.3 | 0.8 | -| 1 | 0.100 | 1 | b | 0.3 | 0.3 | b | pos5 | 0.4 | 0.2 | -| 1 | 0.300 | 6 | b | 0.8 | 0.8 | b | pos4 | 0.5 | 0.7 | -| 1 | 0.200 | 2 | a | 0.5 | 0.4 | a | pos3 | 0.3 | 0.2 | -| 1 | 0.100 | 3 | a | 0.1 | 0.8 | a | pos1 | 0.0 | 0.1 | -| 1 | 0.100 | 5 | a | 0.1 | 0.4 | a | pos1 | 0.0 | 0.1 | +```scala +scala> resultDf.show ++----+-------------------+------+-----+---+---+-----+--------+---+---+ +|rank| score|userId|group| x| y|group|position| x| y| ++----+-------------------+------+-----+---+---+-----+--------+---+---+ +| 1|0.09999999999999998| 4| c|0.2|0.2| c| pos9|0.3|0.8| +| 1|0.10000000000000003| 1| b|0.3|0.3| b| pos5|0.4|0.2| +| 1|0.30000000000000004| 6| b|0.8|0.8| b| pos4|0.5|0.7| +| 1| 0.2| 2| a|0.5|0.4| a| pos3|0.3|0.2| +| 1| 0.1| 3| a|0.1|0.8| a| pos1|0.0|0.1| +| 1| 0.1| 5| a|0.1|0.4| a| pos1|0.0|0.1| ++----+-------------------+------+-----+---+---+-----+--------+---+---+ +``` + +`top_k_join` is also useful for Spark Vector users. 
+If you'd like to filter the records having the smallest squared distances between vectors, you can use `top_k_join` as follows: + +```scala +scala> import org.apache.spark.ml.linalg._ +scala> import org.apache.spark.sql.hive.HivemallOps._ +scala> :paste +val leftDf = Seq( + (1, "a", Vectors.dense(Array(1.0, 0.5, 0.6, 0.2))), + (2, "b", Vectors.dense(Array(0.2, 0.3, 0.4, 0.1))), + (3, "a", Vectors.dense(Array(0.8, 0.4, 0.2, 0.6))), + (4, "a", Vectors.dense(Array(0.2, 0.7, 0.4, 0.8))), + (5, "c", Vectors.dense(Array(0.4, 0.5, 0.6, 0.2))), + (6, "c", Vectors.dense(Array(0.3, 0.9, 1.0, 0.1))) +).toDF("userId", "group", "vector") + +scala> leftDf.show ++------+-----+-----------------+ +|userId|group| vector| ++------+-----+-----------------+ +| 1| a|[1.0,0.5,0.6,0.2]| +| 2| b|[0.2,0.3,0.4,0.1]| +| 3| a|[0.8,0.4,0.2,0.6]| +| 4| a|[0.2,0.7,0.4,0.8]| +| 5| c|[0.4,0.5,0.6,0.2]| +| 6| c|[0.3,0.9,1.0,0.1]| ++------+-----+-----------------+ + +scala> :paste +val rightDf = Seq( + ("a", "pos-1", Vectors.dense(Array(0.3, 0.4, 0.3, 0.5))), + ("a", "pos-2", Vectors.dense(Array(0.9, 0.2, 0.8, 0.3))), + ("a", "pos-3", Vectors.dense(Array(1.0, 0.0, 0.3, 0.1))), + ("a", "pos-4", Vectors.dense(Array(0.1, 0.8, 0.5, 0.7))), + ("b", "pos-5", Vectors.dense(Array(0.3, 0.3, 0.3, 0.8))), + ("b", "pos-6", Vectors.dense(Array(0.0, 0.7, 0.5, 0.6))), + ("b", "pos-7", Vectors.dense(Array(0.1, 0.8, 0.4, 0.5))), + ("c", "pos-8", Vectors.dense(Array(0.8, 0.3, 0.2, 0.1))), + ("c", "pos-9", Vectors.dense(Array(0.7, 0.5, 0.8, 0.3))) + ).toDF("group", "position", "vector") + +scala> rightDf.show ++-----+--------+-----------------+ +|group|position| vector| ++-----+--------+-----------------+ +| a| pos-1|[0.3,0.4,0.3,0.5]| +| a| pos-2|[0.9,0.2,0.8,0.3]| +| a| pos-3|[1.0,0.0,0.3,0.1]| +| a| pos-4|[0.1,0.8,0.5,0.7]| +| b| pos-5|[0.3,0.3,0.3,0.8]| +| b| pos-6|[0.0,0.7,0.5,0.6]| +| b| pos-7|[0.1,0.8,0.4,0.5]| +| c| pos-8|[0.8,0.3,0.2,0.1]| +| c| pos-9|[0.7,0.5,0.8,0.3]| ++-----+--------+-----------------+ + 
+scala> :paste +val sqDistFunc = udf { (v1: Vector, v2: Vector) => Vectors.sqdist(v1, v2) } + +val resultDf = leftDf.top_k_join( + k = lit(-1), + right = rightDf, + joinExpr = leftDf("group") === rightDf("group"), + score = sqDistFunc(leftDf("vector"), rightDf("vector")).as("score") +) + +scala> resultDf.show ++----+-------------------+------+-----+-----------------+-----+--------+-----------------+ +|rank| score|userId|group| vector|group|position| vector| ++----+-------------------+------+-----+-----------------+-----+--------+-----------------+ +| 1|0.13999999999999996| 5| c|[0.4,0.5,0.6,0.2]| c| pos-9|[0.7,0.5,0.8,0.3]| +| 1|0.39999999999999997| 6| c|[0.3,0.9,1.0,0.1]| c| pos-9|[0.7,0.5,0.8,0.3]| +| 1|0.42000000000000004| 2| b|[0.2,0.3,0.4,0.1]| b| pos-7|[0.1,0.8,0.4,0.5]| +| 1|0.15000000000000002| 1| a|[1.0,0.5,0.6,0.2]| a| pos-2|[0.9,0.2,0.8,0.3]| +| 1| 0.27| 3| a|[0.8,0.4,0.2,0.6]| a| pos-1|[0.3,0.4,0.3,0.5]| +| 1|0.04000000000000003| 4| a|[0.2,0.7,0.4,0.8]| a| pos-4|[0.1,0.8,0.5,0.7]| ++----+-------------------+------+-----+-----------------+-----+--------+-----------------+ +```