Close #122: [HIVEMALL-147][Spark] Support all Hivemall functions of v0.5-rc.1 in Spark DataFrame
Project: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/commit/1680c42c Tree: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/tree/1680c42c Diff: http://git-wip-us.apache.org/repos/asf/incubator-hivemall/diff/1680c42c Branch: refs/heads/master Commit: 1680c42cf762a52183f76613fb02411f8f3a671a Parents: fdf7021 Author: Takeshi Yamamuro <[email protected]> Authored: Mon Oct 16 20:52:56 2017 +0900 Committer: Makoto Yui <[email protected]> Committed: Mon Oct 16 21:04:41 2017 +0900 ---------------------------------------------------------------------- .../main/java/hivemall/evaluation/AUCUDAF.java | 4 +- .../java/hivemall/evaluation/HitRateUDAF.java | 5 +- .../main/java/hivemall/evaluation/MAPUDAF.java | 5 +- .../main/java/hivemall/evaluation/MRRUDAF.java | 5 +- .../main/java/hivemall/evaluation/NDCGUDAF.java | 5 +- .../java/hivemall/evaluation/PrecisionUDAF.java | 5 +- .../java/hivemall/evaluation/RecallUDAF.java | 5 +- .../tools/array/ArrayAvgGenericUDAF.java | 2 - .../hivemall/topicmodel/LDAPredictUDAF.java | 2 +- .../hivemall/topicmodel/PLSAPredictUDAF.java | 2 +- .../spark/sql/hive/HivemallGroupedDataset.scala | 474 +++++++++-- .../org/apache/spark/sql/hive/HivemallOps.scala | 824 +++++++++++++++++-- .../spark/sql/hive/HivemallOpsSuite.scala | 736 +++++++++++++---- .../hive/test/HivemallFeatureQueryTest.scala | 11 - 14 files changed, 1764 insertions(+), 321 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1680c42c/core/src/main/java/hivemall/evaluation/AUCUDAF.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/hivemall/evaluation/AUCUDAF.java b/core/src/main/java/hivemall/evaluation/AUCUDAF.java index 508e36a..6dba174 100644 --- a/core/src/main/java/hivemall/evaluation/AUCUDAF.java +++ 
b/core/src/main/java/hivemall/evaluation/AUCUDAF.java @@ -110,7 +110,7 @@ public final class AUCUDAF extends AbstractGenericUDAFResolver { @Override public ObjectInspector init(Mode mode, ObjectInspector[] parameters) throws HiveException { - assert (parameters.length == 2 || parameters.length == 3) : parameters.length; + assert (0 < parameters.length && parameters.length <= 3) : parameters.length; super.init(mode, parameters); // initialize input @@ -439,7 +439,7 @@ public final class AUCUDAF extends AbstractGenericUDAFResolver { @Override public ObjectInspector init(Mode mode, ObjectInspector[] parameters) throws HiveException { - assert (parameters.length == 2 || parameters.length == 3) : parameters.length; + assert (0 < parameters.length && parameters.length <= 3) : parameters.length; super.init(mode, parameters); // initialize input http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1680c42c/core/src/main/java/hivemall/evaluation/HitRateUDAF.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/hivemall/evaluation/HitRateUDAF.java b/core/src/main/java/hivemall/evaluation/HitRateUDAF.java index 6df6087..6a2d0da 100644 --- a/core/src/main/java/hivemall/evaluation/HitRateUDAF.java +++ b/core/src/main/java/hivemall/evaluation/HitRateUDAF.java @@ -71,9 +71,6 @@ import org.apache.hadoop.io.LongWritable; + " - Returns HitRate") public final class HitRateUDAF extends AbstractGenericUDAFResolver { - // prevent instantiation - private HitRateUDAF() {} - @Override public GenericUDAFEvaluator getEvaluator(@Nonnull TypeInfo[] typeInfo) throws SemanticException { if (typeInfo.length != 2 && typeInfo.length != 3) { @@ -109,7 +106,7 @@ public final class HitRateUDAF extends AbstractGenericUDAFResolver { @Override public ObjectInspector init(Mode mode, ObjectInspector[] parameters) throws HiveException { - assert (parameters.length == 2 || parameters.length == 3) : parameters.length; + assert (0 < 
parameters.length && parameters.length <= 3) : parameters.length; super.init(mode, parameters); // initialize input http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1680c42c/core/src/main/java/hivemall/evaluation/MAPUDAF.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/hivemall/evaluation/MAPUDAF.java b/core/src/main/java/hivemall/evaluation/MAPUDAF.java index 45e64cb..fef1f43 100644 --- a/core/src/main/java/hivemall/evaluation/MAPUDAF.java +++ b/core/src/main/java/hivemall/evaluation/MAPUDAF.java @@ -53,9 +53,6 @@ import org.apache.hadoop.io.LongWritable; + " - Returns MAP") public final class MAPUDAF extends AbstractGenericUDAFResolver { - // prevent instantiation - private MAPUDAF() {} - @Override public GenericUDAFEvaluator getEvaluator(@Nonnull TypeInfo[] typeInfo) throws SemanticException { if (typeInfo.length != 2 && typeInfo.length != 3) { @@ -91,7 +88,7 @@ public final class MAPUDAF extends AbstractGenericUDAFResolver { @Override public ObjectInspector init(Mode mode, ObjectInspector[] parameters) throws HiveException { - assert (parameters.length == 2 || parameters.length == 3) : parameters.length; + assert (0 < parameters.length && parameters.length <= 3) : parameters.length; super.init(mode, parameters); // initialize input http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1680c42c/core/src/main/java/hivemall/evaluation/MRRUDAF.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/hivemall/evaluation/MRRUDAF.java b/core/src/main/java/hivemall/evaluation/MRRUDAF.java index 98b8c3d..fcd9d51 100644 --- a/core/src/main/java/hivemall/evaluation/MRRUDAF.java +++ b/core/src/main/java/hivemall/evaluation/MRRUDAF.java @@ -53,9 +53,6 @@ import org.apache.hadoop.io.LongWritable; + " - Returns MRR") public final class MRRUDAF extends AbstractGenericUDAFResolver { - // prevent instantiation - private MRRUDAF() {} - @Override 
public GenericUDAFEvaluator getEvaluator(@Nonnull TypeInfo[] typeInfo) throws SemanticException { if (typeInfo.length != 2 && typeInfo.length != 3) { @@ -91,7 +88,7 @@ public final class MRRUDAF extends AbstractGenericUDAFResolver { @Override public ObjectInspector init(Mode mode, ObjectInspector[] parameters) throws HiveException { - assert (parameters.length == 2 || parameters.length == 3) : parameters.length; + assert (0 < parameters.length && parameters.length <= 3) : parameters.length; super.init(mode, parameters); // initialize input http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1680c42c/core/src/main/java/hivemall/evaluation/NDCGUDAF.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/hivemall/evaluation/NDCGUDAF.java b/core/src/main/java/hivemall/evaluation/NDCGUDAF.java index 4e4fde6..00aa16a 100644 --- a/core/src/main/java/hivemall/evaluation/NDCGUDAF.java +++ b/core/src/main/java/hivemall/evaluation/NDCGUDAF.java @@ -55,9 +55,6 @@ import org.apache.hadoop.io.LongWritable; + " - Returns nDCG") public final class NDCGUDAF extends AbstractGenericUDAFResolver { - // prevent instantiation - private NDCGUDAF() {} - @Override public GenericUDAFEvaluator getEvaluator(@Nonnull TypeInfo[] typeInfo) throws SemanticException { if (typeInfo.length != 2 && typeInfo.length != 3) { @@ -94,7 +91,7 @@ public final class NDCGUDAF extends AbstractGenericUDAFResolver { @Override public ObjectInspector init(Mode mode, ObjectInspector[] parameters) throws HiveException { - assert (parameters.length == 2 || parameters.length == 3) : parameters.length; + assert (0 < parameters.length && parameters.length <= 3) : parameters.length; super.init(mode, parameters); // initialize input http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1680c42c/core/src/main/java/hivemall/evaluation/PrecisionUDAF.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/hivemall/evaluation/PrecisionUDAF.java b/core/src/main/java/hivemall/evaluation/PrecisionUDAF.java index de8a876..2e09a71 100644 --- a/core/src/main/java/hivemall/evaluation/PrecisionUDAF.java +++ b/core/src/main/java/hivemall/evaluation/PrecisionUDAF.java @@ -53,9 +53,6 @@ import org.apache.hadoop.io.LongWritable; + " - Returns Precision") public final class PrecisionUDAF extends AbstractGenericUDAFResolver { - // prevent instantiation - private PrecisionUDAF() {} - @Override public GenericUDAFEvaluator getEvaluator(@Nonnull TypeInfo[] typeInfo) throws SemanticException { if (typeInfo.length != 2 && typeInfo.length != 3) { @@ -91,7 +88,7 @@ public final class PrecisionUDAF extends AbstractGenericUDAFResolver { @Override public ObjectInspector init(Mode mode, ObjectInspector[] parameters) throws HiveException { - assert (parameters.length == 2 || parameters.length == 3) : parameters.length; + assert (0 < parameters.length && parameters.length <= 3) : parameters.length; super.init(mode, parameters); // initialize input http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1680c42c/core/src/main/java/hivemall/evaluation/RecallUDAF.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/hivemall/evaluation/RecallUDAF.java b/core/src/main/java/hivemall/evaluation/RecallUDAF.java index 30b1712..d084c94 100644 --- a/core/src/main/java/hivemall/evaluation/RecallUDAF.java +++ b/core/src/main/java/hivemall/evaluation/RecallUDAF.java @@ -53,9 +53,6 @@ import org.apache.hadoop.io.LongWritable; + " - Returns Recall") public final class RecallUDAF extends AbstractGenericUDAFResolver { - // prevent instantiation - private RecallUDAF() {} - @Override public GenericUDAFEvaluator getEvaluator(@Nonnull TypeInfo[] typeInfo) throws SemanticException { if (typeInfo.length != 2 && typeInfo.length != 3) { @@ -91,7 +88,7 @@ public final class RecallUDAF extends AbstractGenericUDAFResolver { @Override public 
ObjectInspector init(Mode mode, ObjectInspector[] parameters) throws HiveException { - assert (parameters.length == 2 || parameters.length == 3) : parameters.length; + assert (0 < parameters.length && parameters.length <= 3) : parameters.length; super.init(mode, parameters); // initialize input http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1680c42c/core/src/main/java/hivemall/tools/array/ArrayAvgGenericUDAF.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/hivemall/tools/array/ArrayAvgGenericUDAF.java b/core/src/main/java/hivemall/tools/array/ArrayAvgGenericUDAF.java index 6dbb7d5..090a50c 100644 --- a/core/src/main/java/hivemall/tools/array/ArrayAvgGenericUDAF.java +++ b/core/src/main/java/hivemall/tools/array/ArrayAvgGenericUDAF.java @@ -61,8 +61,6 @@ import org.apache.hadoop.io.IntWritable; + " in which each element is the mean of a set of numbers") public final class ArrayAvgGenericUDAF extends AbstractGenericUDAFResolver { - private ArrayAvgGenericUDAF() {}// prevent instantiation - @Override public GenericUDAFEvaluator getEvaluator(TypeInfo[] typeInfo) throws SemanticException { if (typeInfo.length != 1) { http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1680c42c/core/src/main/java/hivemall/topicmodel/LDAPredictUDAF.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/hivemall/topicmodel/LDAPredictUDAF.java b/core/src/main/java/hivemall/topicmodel/LDAPredictUDAF.java index 94d510a..68c802f 100644 --- a/core/src/main/java/hivemall/topicmodel/LDAPredictUDAF.java +++ b/core/src/main/java/hivemall/topicmodel/LDAPredictUDAF.java @@ -200,7 +200,7 @@ public final class LDAPredictUDAF extends AbstractGenericUDAFResolver { @Override public ObjectInspector init(Mode mode, ObjectInspector[] parameters) throws HiveException { - assert (parameters.length == 4 || parameters.length == 5); + assert (parameters.length == 1 || 
parameters.length == 4 || parameters.length == 5); super.init(mode, parameters); // initialize input http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1680c42c/core/src/main/java/hivemall/topicmodel/PLSAPredictUDAF.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/hivemall/topicmodel/PLSAPredictUDAF.java b/core/src/main/java/hivemall/topicmodel/PLSAPredictUDAF.java index 7702945..6210359 100644 --- a/core/src/main/java/hivemall/topicmodel/PLSAPredictUDAF.java +++ b/core/src/main/java/hivemall/topicmodel/PLSAPredictUDAF.java @@ -202,7 +202,7 @@ public final class PLSAPredictUDAF extends AbstractGenericUDAFResolver { @Override public ObjectInspector init(Mode mode, ObjectInspector[] parameters) throws HiveException { - assert (parameters.length == 4 || parameters.length == 5); + assert (parameters.length == 1 || parameters.length == 4 || parameters.length == 5); super.init(mode, parameters); // initialize input http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1680c42c/spark/spark-2.2/src/main/scala/org/apache/spark/sql/hive/HivemallGroupedDataset.scala ---------------------------------------------------------------------- diff --git a/spark/spark-2.2/src/main/scala/org/apache/spark/sql/hive/HivemallGroupedDataset.scala b/spark/spark-2.2/src/main/scala/org/apache/spark/sql/hive/HivemallGroupedDataset.scala index a012efd..00617b7 100644 --- a/spark/spark-2.2/src/main/scala/org/apache/spark/sql/hive/HivemallGroupedDataset.scala +++ b/spark/spark-2.2/src/main/scala/org/apache/spark/sql/hive/HivemallGroupedDataset.scala @@ -34,25 +34,61 @@ import org.apache.spark.sql.types._ /** * Groups the [[DataFrame]] using the specified columns, so we can run aggregation on them. 
* + * @groupname classifier * @groupname ensemble - * @groupname ftvec.trans * @groupname evaluation + * @groupname topicmodel + * @groupname ftvec.selection + * @groupname ftvec.text + * @groupname ftvec.trans + * @groupname tools.array + * @groupname tools.bits + * @groupname tools.list + * @groupname tools.map + * @groupname tools.matrix + * @groupname tools.math + * + * A list of unsupported functions is as follows: + * * ftvec.conv + * - conv2dense + * - build_bins */ final class HivemallGroupedDataset(groupBy: RelationalGroupedDataset) { /** + * @see hivemall.classifier.KPAPredictUDAF + * @group classifier + */ + def kpa_predict(xh: String, xk: String, w0: String, w1: String, w2: String, w3: String) + : DataFrame = { + checkType(xh, DoubleType) + checkType(xk, DoubleType) + checkType(w0, FloatType) + checkType(w1, FloatType) + checkType(w2, FloatType) + checkType(w3, FloatType) + val udaf = HiveUDAFFunction( + "kpa_predict", + new HiveFunctionWrapper("hivemall.classifier.KPAPredictUDAF"), + Seq(xh, xk, w0, w1, w2, w3).map(df(_).expr), + isUDAFBridgeRequired = false) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** * @see hivemall.ensemble.bagging.VotedAvgUDAF * @group ensemble */ def voted_avg(weight: String): DataFrame = { - // checkType(weight, NumericType) + checkType(weight, DoubleType) val udaf = HiveUDAFFunction( "voted_avg", new HiveFunctionWrapper("hivemall.ensemble.bagging.WeightVotedAvgUDAF"), - Seq(weight).map(df.col(_).expr), + Seq(weight).map(df(_).expr), isUDAFBridgeRequired = true) .toAggregateExpression() - toDF((Alias(udaf, udaf.prettyName)() :: Nil).toSeq) + toDF(Alias(udaf, udaf.prettyName)() :: Nil) } /** @@ -60,14 +96,14 @@ final class HivemallGroupedDataset(groupBy: RelationalGroupedDataset) { * @group ensemble */ def weight_voted_avg(weight: String): DataFrame = { - // checkType(weight, NumericType) + checkType(weight, DoubleType) val udaf = HiveUDAFFunction( "weight_voted_avg", new 
HiveFunctionWrapper("hivemall.ensemble.bagging.WeightVotedAvgUDAF"), - Seq(weight).map(df.col(_).expr), + Seq(weight).map(df(_).expr), isUDAFBridgeRequired = true) .toAggregateExpression() - toDF((Alias(udaf, udaf.prettyName)() :: Nil).toSeq) + toDF(Alias(udaf, udaf.prettyName)() :: Nil) } /** @@ -75,15 +111,15 @@ final class HivemallGroupedDataset(groupBy: RelationalGroupedDataset) { * @group ensemble */ def argmin_kld(weight: String, conv: String): DataFrame = { - // checkType(weight, NumericType) - // checkType(conv, NumericType) + checkType(weight, FloatType) + checkType(conv, FloatType) val udaf = HiveUDAFFunction( "argmin_kld", new HiveFunctionWrapper("hivemall.ensemble.ArgminKLDistanceUDAF"), - Seq(weight, conv).map(df.col(_).expr), + Seq(weight, conv).map(df(_).expr), isUDAFBridgeRequired = true) .toAggregateExpression() - toDF((Alias(udaf, udaf.prettyName)() :: Nil).toSeq) + toDF(Alias(udaf, udaf.prettyName)() :: Nil) } /** @@ -91,15 +127,15 @@ final class HivemallGroupedDataset(groupBy: RelationalGroupedDataset) { * @group ensemble */ def max_label(score: String, label: String): DataFrame = { - // checkType(score, NumericType) + checkType(score, DoubleType) checkType(label, StringType) val udaf = HiveUDAFFunction( "max_label", new HiveFunctionWrapper("hivemall.ensemble.MaxValueLabelUDAF"), - Seq(score, label).map(df.col(_).expr), + Seq(score, label).map(df(_).expr), isUDAFBridgeRequired = true) .toAggregateExpression() - toDF((Alias(udaf, udaf.prettyName)() :: Nil).toSeq) + toDF(Alias(udaf, udaf.prettyName)() :: Nil) } /** @@ -107,134 +143,430 @@ final class HivemallGroupedDataset(groupBy: RelationalGroupedDataset) { * @group ensemble */ def maxrow(score: String, label: String): DataFrame = { - // checkType(score, NumericType) + checkType(score, DoubleType) checkType(label, StringType) val udaf = HiveUDAFFunction( "maxrow", new HiveFunctionWrapper("hivemall.ensemble.MaxRowUDAF"), - Seq(score, label).map(df.col(_).expr), + Seq(score, 
label).map(df(_).expr), isUDAFBridgeRequired = false) .toAggregateExpression() - toDF((Alias(udaf, udaf.prettyName)() :: Nil).toSeq) + toDF(Alias(udaf, udaf.prettyName)() :: Nil) } /** * @see hivemall.smile.tools.RandomForestEnsembleUDAF * @group ensemble */ - def rf_ensemble(predict: String): DataFrame = { - // checkType(predict, NumericType) + @scala.annotation.varargs + def rf_ensemble(yhat: String, others: String*): DataFrame = { + checkType(yhat, IntegerType) val udaf = HiveUDAFFunction( "rf_ensemble", new HiveFunctionWrapper("hivemall.smile.tools.RandomForestEnsembleUDAF"), - Seq(predict).map(df.col(_).expr), + (yhat +: others).map(df(_).expr), isUDAFBridgeRequired = false) .toAggregateExpression() - toDF((Alias(udaf, udaf.prettyName)() :: Nil).toSeq) + toDF(Alias(udaf, udaf.prettyName)() :: Nil) } /** - * @see hivemall.tools.matrix.TransposeAndDotUDAF + * @see hivemall.evaluation.MeanAbsoluteErrorUDAF + * @group evaluation */ - def transpose_and_dot(X: String, Y: String): DataFrame = { + def mae(predict: String, target: String): DataFrame = { + checkType(predict, DoubleType) + checkType(target, DoubleType) val udaf = HiveUDAFFunction( - "transpose_and_dot", - new HiveFunctionWrapper("hivemall.tools.matrix.TransposeAndDotUDAF"), - Seq(X, Y).map(df.col(_).expr), + "mae", + new HiveFunctionWrapper("hivemall.evaluation.MeanAbsoluteErrorUDAF"), + Seq(predict, target).map(df(_).expr), + isUDAFBridgeRequired = true) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.evaluation.MeanSquareErrorUDAF + * @group evaluation + */ + def mse(predict: String, target: String): DataFrame = { + checkType(predict, DoubleType) + checkType(target, DoubleType) + val udaf = HiveUDAFFunction( + "mse", + new HiveFunctionWrapper("hivemall.evaluation.MeanSquaredErrorUDAF"), + Seq(predict, target).map(df(_).expr), + isUDAFBridgeRequired = true) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see 
hivemall.evaluation.RootMeanSquareErrorUDAF + * @group evaluation + */ + def rmse(predict: String, target: String): DataFrame = { + checkType(predict, DoubleType) + checkType(target, DoubleType) + val udaf = HiveUDAFFunction( + "rmse", + new HiveFunctionWrapper("hivemall.evaluation.RootMeanSquaredErrorUDAF"), + Seq(predict, target).map(df(_).expr), + isUDAFBridgeRequired = true) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.evaluation.R2UDAF + * @group evaluation + */ + def r2(predict: String, target: String): DataFrame = { + checkType(predict, DoubleType) + checkType(target, DoubleType) + val udaf = HiveUDAFFunction( + "r2", + new HiveFunctionWrapper("hivemall.evaluation.R2UDAF"), + Seq(predict, target).map(df(_).expr), + isUDAFBridgeRequired = true) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.evaluation.LogarithmicLossUDAF + * @group evaluation + */ + def logloss(predict: String, target: String): DataFrame = { + checkType(predict, DoubleType) + checkType(target, DoubleType) + val udaf = HiveUDAFFunction( + "logloss", + new HiveFunctionWrapper("hivemall.evaluation.LogarithmicLossUDAF"), + Seq(predict, target).map(df(_).expr), + isUDAFBridgeRequired = true) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.evaluation.F1ScoreUDAF + * @group evaluation + */ + def f1score(predict: String, target: String): DataFrame = { + // checkType(target, ArrayType(IntegerType, false)) + // checkType(predict, ArrayType(IntegerType, false)) + val udaf = HiveUDAFFunction( + "f1score", + new HiveFunctionWrapper("hivemall.evaluation.F1ScoreUDAF"), + Seq(predict, target).map(df(_).expr), + isUDAFBridgeRequired = true) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.evaluation.NDCGUDAF + * @group evaluation + */ + @scala.annotation.varargs + def ndcg(rankItems: 
String, correctItems: String, others: String*): DataFrame = { + val udaf = HiveUDAFFunction( + "ndcg", + new HiveFunctionWrapper("hivemall.evaluation.NDCGUDAF"), + (rankItems +: correctItems +: others).map(df(_).expr), + isUDAFBridgeRequired = false) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.evaluation.PrecisionUDAF + * @group evaluation + */ + @scala.annotation.varargs + def precision_at(rankItems: String, correctItems: String, others: String*): DataFrame = { + val udaf = HiveUDAFFunction( + "precision_at", + new HiveFunctionWrapper("hivemall.evaluation.PrecisionUDAF"), + (rankItems +: correctItems +: others).map(df(_).expr), + isUDAFBridgeRequired = false) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.evaluation.RecallUDAF + * @group evaluation + */ + @scala.annotation.varargs + def recall_at(rankItems: String, correctItems: String, others: String*): DataFrame = { + val udaf = HiveUDAFFunction( + "recall_at", + new HiveFunctionWrapper("hivemall.evaluation.RecallUDAF"), + (rankItems +: correctItems +: others).map(df(_).expr), + isUDAFBridgeRequired = false) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.evaluation.HitRateUDAF + * @group evaluation + */ + @scala.annotation.varargs + def hitrate(rankItems: String, correctItems: String, others: String*): DataFrame = { + val udaf = HiveUDAFFunction( + "hitrate", + new HiveFunctionWrapper("hivemall.evaluation.HitRateUDAF"), + (rankItems +: correctItems +: others).map(df(_).expr), + isUDAFBridgeRequired = false) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.evaluation.MRRUDAF + * @group evaluation + */ + @scala.annotation.varargs + def mrr(rankItems: String, correctItems: String, others: String*): DataFrame = { + val udaf = HiveUDAFFunction( + "mrr", + new 
HiveFunctionWrapper("hivemall.evaluation.MRRUDAF"), + (rankItems +: correctItems +: others).map(df(_).expr), + isUDAFBridgeRequired = false) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.evaluation.MAPUDAF + * @group evaluation + */ + @scala.annotation.varargs + def average_precision(rankItems: String, correctItems: String, others: String*): DataFrame = { + val udaf = HiveUDAFFunction( + "average_precision", + new HiveFunctionWrapper("hivemall.evaluation.MAPUDAF"), + (rankItems +: correctItems +: others).map(df(_).expr), + isUDAFBridgeRequired = false) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.evaluation.AUCUDAF + * @group evaluation + */ + @scala.annotation.varargs + def auc(args: String*): DataFrame = { + val udaf = HiveUDAFFunction( + "auc", + new HiveFunctionWrapper("hivemall.evaluation.AUCUDAF"), + args.map(df(_).expr), isUDAFBridgeRequired = false) .toAggregateExpression() - toDF(Seq(Alias(udaf, udaf.prettyName)())) + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.topicmodel.LDAPredictUDAF + * @group topicmodel + */ + @scala.annotation.varargs + def lda_predict(word: String, value: String, label: String, lambda: String, others: String*) + : DataFrame = { + checkType(word, StringType) + checkType(value, DoubleType) + checkType(label, IntegerType) + checkType(lambda, DoubleType) + val udaf = HiveUDAFFunction( + "lda_predict", + new HiveFunctionWrapper("hivemall.topicmodel.LDAPredictUDAF"), + (word +: value +: label +: lambda +: others).map(df(_).expr), + isUDAFBridgeRequired = false) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.topicmodel.PLSAPredictUDAF + * @group topicmodel + */ + @scala.annotation.varargs + def plsa_predict(word: String, value: String, label: String, prob: String, others: String*) + : DataFrame = { + checkType(word, StringType) + 
checkType(value, DoubleType) + checkType(label, IntegerType) + checkType(prob, DoubleType) + val udaf = HiveUDAFFunction( + "plsa_predict", + new HiveFunctionWrapper("hivemall.topicmodel.PLSAPredictUDAF"), + (word +: value +: label +: prob +: others).map(df(_).expr), + isUDAFBridgeRequired = false) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.ftvec.text.TermFrequencyUDAF + * @group ftvec.text + */ + def tf(text: String): DataFrame = { + checkType(text, StringType) + val udaf = HiveUDAFFunction( + "tf", + new HiveFunctionWrapper("hivemall.ftvec.text.TermFrequencyUDAF"), + Seq(text).map(df(_).expr), + isUDAFBridgeRequired = true) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) } /** * @see hivemall.ftvec.trans.OnehotEncodingUDAF * @group ftvec.trans */ - def onehot_encoding(cols: String*): DataFrame = { + @scala.annotation.varargs + def onehot_encoding(feature: String, others: String*): DataFrame = { val udaf = HiveUDAFFunction( "onehot_encoding", new HiveFunctionWrapper("hivemall.ftvec.trans.OnehotEncodingUDAF"), - cols.map(df.col(_).expr), + (feature +: others).map(df(_).expr), isUDAFBridgeRequired = false) .toAggregateExpression() - toDF(Seq(Alias(udaf, udaf.prettyName)())) + toDF(Alias(udaf, udaf.prettyName)() :: Nil) } /** * @see hivemall.ftvec.selection.SignalNoiseRatioUDAF + * @group ftvec.selection */ - def snr(X: String, Y: String): DataFrame = { + def snr(feature: String, label: String): DataFrame = { val udaf = HiveUDAFFunction( "snr", new HiveFunctionWrapper("hivemall.ftvec.selection.SignalNoiseRatioUDAF"), - Seq(X, Y).map(df.col(_).expr), + Seq(feature, label).map(df(_).expr), isUDAFBridgeRequired = false) .toAggregateExpression() - toDF(Seq(Alias(udaf, udaf.prettyName)())) + toDF(Alias(udaf, udaf.prettyName)() :: Nil) } /** - * @see hivemall.evaluation.MeanAbsoluteErrorUDAF - * @group evaluation + * @see hivemall.tools.array.ArrayAvgGenericUDAF + * @group tools.array */ - 
def mae(predict: String, target: String): DataFrame = { - checkType(predict, FloatType) - checkType(target, FloatType) + def array_avg(ar: String): DataFrame = { val udaf = HiveUDAFFunction( - "mae", - new HiveFunctionWrapper("hivemall.evaluation.MeanAbsoluteErrorUDAF"), - Seq(predict, target).map(df.col(_).expr), - isUDAFBridgeRequired = true) + "array_avg", + new HiveFunctionWrapper("hivemall.tools.array.ArrayAvgGenericUDAF"), + Seq(ar).map(df(_).expr), + isUDAFBridgeRequired = false) .toAggregateExpression() - toDF((Alias(udaf, udaf.prettyName)() :: Nil).toSeq) + toDF(Alias(udaf, udaf.prettyName)() :: Nil) } /** - * @see hivemall.evaluation.MeanSquareErrorUDAF - * @group evaluation + * @see hivemall.tools.array.ArraySumUDAF + * @group tools.array */ - def mse(predict: String, target: String): DataFrame = { - checkType(predict, FloatType) - checkType(target, FloatType) + def array_sum(ar: String): DataFrame = { val udaf = HiveUDAFFunction( - "mse", - new HiveFunctionWrapper("hivemall.evaluation.MeanSquaredErrorUDAF"), - Seq(predict, target).map(df.col(_).expr), + "array_sum", + new HiveFunctionWrapper("hivemall.tools.array.ArraySumUDAF"), + Seq(ar).map(df(_).expr), isUDAFBridgeRequired = true) .toAggregateExpression() - toDF((Alias(udaf, udaf.prettyName)() :: Nil).toSeq) + toDF(Alias(udaf, udaf.prettyName)() :: Nil) } /** - * @see hivemall.evaluation.RootMeanSquareErrorUDAF - * @group evaluation + * @see hivemall.tools.bits.BitsCollectUDAF + * @group tools.bits */ - def rmse(predict: String, target: String): DataFrame = { - checkType(predict, FloatType) - checkType(target, FloatType) + def bits_collect(x: String): DataFrame = { val udaf = HiveUDAFFunction( - "rmse", - new HiveFunctionWrapper("hivemall.evaluation.RootMeanSquaredErrorUDAF"), - Seq(predict, target).map(df.col(_).expr), - isUDAFBridgeRequired = true) + "bits_collect", + new HiveFunctionWrapper("hivemall.tools.bits.BitsCollectUDAF"), + Seq(x).map(df(_).expr), + isUDAFBridgeRequired = false) 
.toAggregateExpression() - toDF((Alias(udaf, udaf.prettyName)() :: Nil).toSeq) + toDF(Alias(udaf, udaf.prettyName)() :: Nil) } /** - * @see hivemall.evaluation.F1ScoreUDAF - * @group evaluation + * @see hivemall.tools.list.UDAFToOrderedList + * @group tools.list */ - def f1score(predict: String, target: String): DataFrame = { - // checkType(target, ArrayType(IntegerType)) - // checkType(predict, ArrayType(IntegerType)) + @scala.annotation.varargs + def to_ordered_list(value: String, others: String*): DataFrame = { val udaf = HiveUDAFFunction( - "f1score", - new HiveFunctionWrapper("hivemall.evaluation.F1ScoreUDAF"), - Seq(predict, target).map(df.col(_).expr), + "to_ordered_list", + new HiveFunctionWrapper("hivemall.tools.list.UDAFToOrderedList"), + (value +: others).map(df(_).expr), + isUDAFBridgeRequired = false) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.tools.map.UDAFToMap + * @group tools.map + */ + def to_map(key: String, value: String): DataFrame = { + val udaf = HiveUDAFFunction( + "to_map", + new HiveFunctionWrapper("hivemall.tools.map.UDAFToMap"), + Seq(key, value).map(df(_).expr), + isUDAFBridgeRequired = false) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.tools.map.UDAFToOrderedMap + * @group tools.map + */ + @scala.annotation.varargs + def to_ordered_map(key: String, value: String, others: String*): DataFrame = { + val udaf = HiveUDAFFunction( + "to_ordered_map", + new HiveFunctionWrapper("hivemall.tools.map.UDAFToOrderedMap"), + (key +: value +: others).map(df(_).expr), + isUDAFBridgeRequired = false) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.tools.matrix.TransposeAndDotUDAF + * @group tools.matrix + */ + def transpose_and_dot(matrix0_row: String, matrix1_row: String): DataFrame = { + val udaf = HiveUDAFFunction( + "transpose_and_dot", + new 
HiveFunctionWrapper("hivemall.tools.matrix.TransposeAndDotUDAF"), + Seq(matrix0_row, matrix1_row).map(df(_).expr), + isUDAFBridgeRequired = false) + .toAggregateExpression() + toDF(Alias(udaf, udaf.prettyName)() :: Nil) + } + + /** + * @see hivemall.tools.math.L2NormUDAF + * @group tools.math + */ + def l2_norm(xi: String): DataFrame = { + val udaf = HiveUDAFFunction( + "l2_norm", + new HiveFunctionWrapper("hivemall.tools.math.L2NormUDAF"), + Seq(xi).map(df(_).expr), isUDAFBridgeRequired = true) .toAggregateExpression() - toDF((Alias(udaf, udaf.prettyName)() :: Nil).toSeq) + toDF(Alias(udaf, udaf.prettyName)() :: Nil) } /** http://git-wip-us.apache.org/repos/asf/incubator-hivemall/blob/1680c42c/spark/spark-2.2/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala ---------------------------------------------------------------------- diff --git a/spark/spark-2.2/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala b/spark/spark-2.2/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala index 22d3153..45f7b4d 100644 --- a/spark/spark-2.2/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala +++ b/spark/spark-2.2/src/main/scala/org/apache/spark/sql/hive/HivemallOps.scala @@ -38,12 +38,18 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String + /** - * Hivemall wrapper and some utility functions for DataFrame. + * Hivemall wrapper and some utility functions for DataFrame. These functions below derives + * from `resources/ddl/define-all-as-permanent.hive`. 
* * @groupname regression * @groupname classifier * @groupname classifier.multiclass + * @groupname recommend + * @groupname topicmodel + * @groupname geospatial + * @groupname smile * @groupname xgboost * @groupname anomaly * @groupname knn.similarity @@ -52,28 +58,72 @@ import org.apache.spark.unsafe.types.UTF8String * @groupname ftvec * @groupname ftvec.amplify * @groupname ftvec.hashing + * @groupname ftvec.paring * @groupname ftvec.scaling + * @groupname ftvec.selection * @groupname ftvec.conv * @groupname ftvec.trans + * @groupname ftvec.ranking + * @groupname tools + * @groupname tools.array + * @groupname tools.bits + * @groupname tools.compress + * @groupname tools.map + * @groupname tools.text * @groupname misc + * + * A list of unsupported functions is as follows: + * * smile + * - guess_attribute_types + * * mapred functions + * - taskid + * - jobid + * - rownum + * - distcache_gets + * - jobconf_gets + * * matrix factorization + * - mf_predict + * - train_mf_sgd + * - train_mf_adagrad + * - train_bprmf + * - bprmf_predict + * * Factorization Machine + * - fm_predict + * - train_fm + * - train_ffm + * - ffm_predict */ final class HivemallOps(df: DataFrame) extends Logging { import internal.HivemallOpsImpl._ - private[this] lazy val _sparkSession = df.sparkSession - private[this] lazy val _analyzer = _sparkSession.sessionState.analyzer - private[this] lazy val _strategy = new UserProvidedPlanner(_sparkSession.sqlContext.conf) + private lazy val _sparkSession = df.sparkSession + private lazy val _strategy = new UserProvidedPlanner(_sparkSession.sqlContext.conf) + + /** + * @see [[hivemall.regression.GeneralRegressorUDTF]] + * @group regression + */ + @scala.annotation.varargs + def train_regressor(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.regression.GeneralRegressorUDTF", + "train_regressor", + setMixServs(toHivemallFeatures(exprs)), + Seq("feature", "weight") + ) + } /** * @see 
[[hivemall.regression.AdaDeltaUDTF]] * @group regression */ @scala.annotation.varargs - def train_adadelta(exprs: Column*): DataFrame = withTypedPlan { + def train_adadelta_regr(exprs: Column*): DataFrame = withTypedPlan { planHiveGenericUDTF( df, "hivemall.regression.AdaDeltaUDTF", - "train_adadelta", + "train_adadelta_regr", setMixServs(toHivemallFeatures(exprs)), Seq("feature", "weight") ) @@ -84,11 +134,11 @@ final class HivemallOps(df: DataFrame) extends Logging { * @group regression */ @scala.annotation.varargs - def train_adagrad(exprs: Column*): DataFrame = withTypedPlan { + def train_adagrad_regr(exprs: Column*): DataFrame = withTypedPlan { planHiveGenericUDTF( df, "hivemall.regression.AdaGradUDTF", - "train_adagrad", + "train_adagrad_regr", setMixServs(toHivemallFeatures(exprs)), Seq("feature", "weight") ) @@ -144,11 +194,11 @@ final class HivemallOps(df: DataFrame) extends Logging { * @group regression */ @scala.annotation.varargs - def train_logregr(exprs: Column*): DataFrame = withTypedPlan { + def train_logistic_regr(exprs: Column*): DataFrame = withTypedPlan { planHiveGenericUDTF( df, "hivemall.regression.LogressUDTF", - "train_logregr", + "train_logistic_regr", setMixServs(toHivemallFeatures(exprs)), Seq("feature", "weight") ) @@ -215,17 +265,17 @@ final class HivemallOps(df: DataFrame) extends Logging { } /** - * @see [[hivemall.smile.regression.RandomForestRegressionUDTF]] - * @group regression + * @see [[hivemall.classifier.GeneralClassifierUDTF]] + * @group classifier */ @scala.annotation.varargs - def train_randomforest_regr(exprs: Column*): DataFrame = withTypedPlan { + def train_classifier(exprs: Column*): DataFrame = withTypedPlan { planHiveGenericUDTF( df, - "hivemall.smile.regression.RandomForestRegressionUDTF", - "train_randomforest_regr", + "hivemall.classifier.GeneralClassifierUDTF", + "train_classifier", setMixServs(toHivemallFeatures(exprs)), - Seq("model_id", "model_type", "pred_model", "var_importance", "oob_errors", "oob_tests") + 
Seq("feature", "weight") ) } @@ -380,17 +430,17 @@ final class HivemallOps(df: DataFrame) extends Logging { } /** - * @see [[hivemall.smile.classification.RandomForestClassifierUDTF]] + * @see [[hivemall.classifier.KernelExpansionPassiveAggressiveUDTF]] * @group classifier */ @scala.annotation.varargs - def train_randomforest_classifier(exprs: Column*): DataFrame = withTypedPlan { + def train_kpa(exprs: Column*): DataFrame = withTypedPlan { planHiveGenericUDTF( df, - "hivemall.smile.classification.RandomForestClassifierUDTF", - "train_randomforest_classifier", + "hivemall.classifier.KernelExpansionPassiveAggressiveUDTF", + "train_kpa", setMixServs(toHivemallFeatures(exprs)), - Seq("model_id", "model_type", "pred_model", "var_importance", "oob_errors", "oob_tests") + Seq("h", "hk", "w0", "w1", "w2", "w3") ) } @@ -485,6 +535,21 @@ final class HivemallOps(df: DataFrame) extends Logging { } /** + * @see [[hivemall.classifier.multiclass.MulticlassAROWClassifierUDTF.AROWh]] + * @group classifier.multiclass + */ + @scala.annotation.varargs + def train_multiclass_arowh(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.classifier.multiclass.MulticlassAROWClassifierUDTF$AROWh", + "train_multiclass_arowh", + setMixServs(toHivemallFeatures(exprs)), + Seq("label", "feature", "weight", "conv") + ) + } + + /** * @see [[hivemall.classifier.multiclass.MulticlassSoftConfidenceWeightedUDTF.SCW1]] * @group classifier.multiclass */ @@ -515,6 +580,81 @@ final class HivemallOps(df: DataFrame) extends Logging { } /** + * @see [[hivemall.recommend.SlimUDTF]] + * @group recommend + */ + @scala.annotation.varargs + def train_slim(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.recommend.SlimUDTF", + "train_slim", + setMixServs(toHivemallFeatures(exprs)), + Seq("j", "nn", "w") + ) + } + + /** + * @see [[hivemall.topicmodel.LDAUDTF]] + * @group topicmodel + */ + @scala.annotation.varargs + def train_lda(exprs: Column*): 
DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.topicmodel.LDAUDTF", + "train_lda", + setMixServs(toHivemallFeatures(exprs)), + Seq("topic", "word", "score") + ) + } + + /** + * @see [[hivemall.topicmodel.PLSAUDTF]] + * @group topicmodel + */ + @scala.annotation.varargs + def train_plsa(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.topicmodel.PLSAUDTF", + "train_plsa", + setMixServs(toHivemallFeatures(exprs)), + Seq("topic", "word", "score") + ) + } + + /** + * @see [[hivemall.smile.regression.RandomForestRegressionUDTF]] + * @group smile + */ + @scala.annotation.varargs + def train_randomforest_regressor(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.smile.regression.RandomForestRegressionUDTF", + "train_randomforest_regressor", + setMixServs(toHivemallFeatures(exprs)), + Seq("model_id", "model_type", "pred_model", "var_importance", "oob_errors", "oob_tests") + ) + } + + /** + * @see [[hivemall.smile.classification.RandomForestClassifierUDTF]] + * @group smile + */ + @scala.annotation.varargs + def train_randomforest_classifier(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.smile.classification.RandomForestClassifierUDTF", + "train_randomforest_classifier", + setMixServs(toHivemallFeatures(exprs)), + Seq("model_id", "model_type", "pred_model", "var_importance", "oob_errors", "oob_tests") + ) + } + + /** * :: Experimental :: * @see [[hivemall.xgboost.regression.XGBoostRegressionUDTF]] * @group xgboost @@ -600,6 +740,21 @@ final class HivemallOps(df: DataFrame) extends Logging { } /** + * @see [[hivemall.knn.similarity.DIMSUMMapperUDTF]] + * @group knn.similarity + */ + @scala.annotation.varargs + def dimsum_mapper(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.knn.similarity.DIMSUMMapperUDTF", + "dimsum_mapper", + exprs, + Seq("j", "k", "b_jk") + ) + } + + /** * @see 
[[hivemall.knn.lsh.MinHashUDTF]] * @group knn.lsh */ @@ -609,7 +764,7 @@ final class HivemallOps(df: DataFrame) extends Logging { df, "hivemall.knn.lsh.MinHashUDTF", "minhash", - setMixServs(toHivemallFeatures(exprs)), + exprs, Seq("clusterid", "item") ) } @@ -624,7 +779,7 @@ final class HivemallOps(df: DataFrame) extends Logging { df, "hivemall.ftvec.amplify.AmplifierUDTF", "amplify", - setMixServs(toHivemallFeatures(exprs)), + exprs, Seq("clusterid", "item") ) } @@ -668,7 +823,7 @@ final class HivemallOps(df: DataFrame) extends Logging { df, "hivemall.ftvec.conv.QuantifyColumnsUDTF", "quantify", - setMixServs(toHivemallFeatures(exprs)), + exprs, (0 until exprs.size - 1).map(i => s"c$i") ) } @@ -683,7 +838,7 @@ final class HivemallOps(df: DataFrame) extends Logging { df, "hivemall.ftvec.trans.BinarizeLabelUDTF", "binarize_label", - setMixServs(toHivemallFeatures(exprs)), + exprs, (0 until exprs.size - 1).map(i => s"c$i") ) } @@ -698,17 +853,62 @@ final class HivemallOps(df: DataFrame) extends Logging { df, "hivemall.ftvec.trans.QuantifiedFeaturesUDTF", "quantified_features", - setMixServs(toHivemallFeatures(exprs)), + exprs, Seq("features") ) } /** + * @see [[hivemall.ftvec.ranking.BprSamplingUDTF]] + * @group ftvec.ranking + */ + @scala.annotation.varargs + def bpr_sampling(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.ftvec.ranking.BprSamplingUDTF", + "bpr_sampling", + exprs, + Seq("user", "pos_item", "neg_item") + ) + } + + /** + * @see [[hivemall.ftvec.ranking.ItemPairsSamplingUDTF]] + * @group ftvec.ranking + */ + @scala.annotation.varargs + def item_pairs_sampling(exprs: Column*): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.ftvec.ranking.ItemPairsSamplingUDTF", + "item_pairs_sampling", + exprs, + Seq("pos_item_id", "neg_item_id") + ) + } + + /** + * @see [[hivemall.ftvec.ranking.PopulateNotInUDTF]] + * @group ftvec.ranking + */ + @scala.annotation.varargs + def populate_not_in(exprs: Column*): 
DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.ftvec.ranking.PopulateNotInUDTF", + "populate_not_in", + exprs, + Seq("item") + ) + } + + /** * Splits Seq[String] into pieces. * @group ftvec */ - def explode_array(expr: Column): DataFrame = { - df.explode(expr) { case Row(v: Seq[_]) => + def explode_array(features: Column): DataFrame = { + df.explode(features) { case Row(v: Seq[_]) => // Type erasure removes the component type in Seq v.map(s => HivemallFeature(s.asInstanceOf[String])) } @@ -718,7 +918,7 @@ final class HivemallOps(df: DataFrame) extends Logging { * Splits [[Vector]] into pieces. * @group ftvec */ - def explode_vector(expr: Column): DataFrame = { + def explode_vector(features: Column): DataFrame = { val elementSchema = StructType( StructField("feature", StringType) :: StructField("weight", DoubleType) :: Nil) val explodeFunc: Row => TraversableOnce[InternalRow] = (row: Row) => { @@ -737,7 +937,7 @@ final class HivemallOps(df: DataFrame) extends Logging { } withTypedPlan { Generate( - UserDefinedGenerator(elementSchema, explodeFunc, expr.expr :: Nil), + UserDefinedGenerator(elementSchema, explodeFunc, features.expr :: Nil), join = true, outer = false, None, generatorOutput = Nil, df.logicalPlan) @@ -745,6 +945,20 @@ final class HivemallOps(df: DataFrame) extends Logging { } /** + * @see [[hivemall.tools.GenerateSeriesUDTF]] + * @group tools + */ + def generate_series(start: Column, end: Column): DataFrame = withTypedPlan { + planHiveGenericUDTF( + df, + "hivemall.tools.GenerateSeriesUDTF", + "generate_series", + start :: end :: Nil, + Seq("generate_series") + ) + } + + /** * Returns `top-k` records for each `group`. 
* @group misc */ @@ -877,7 +1091,7 @@ final class HivemallOps(df: DataFrame) extends Logging { df, "hivemall.dataset.LogisticRegressionDataGeneratorUDTFWrapper", "lr_datagen", - setMixServs(toHivemallFeatures(exprs)), + exprs, Seq("label", "features") ) } @@ -900,7 +1114,7 @@ final class HivemallOps(df: DataFrame) extends Logging { * in all possible spark workers. */ @Experimental - private[this] def setMixServs(exprs: Seq[Column]): Seq[Column] = { + private def setMixServs(exprs: Seq[Column]): Seq[Column] = { val mixes = System.getenv("HIVEMALL_MIX_SERVERS") if (mixes != null && !mixes.isEmpty()) { val groupId = df.sqlContext.sparkContext.applicationId + "-" + UUID.randomUUID @@ -919,7 +1133,7 @@ final class HivemallOps(df: DataFrame) extends Logging { /** * If the input is a [[Vector]], transform it into Hivemall features. */ - @inline private[this] def toHivemallFeatures(exprs: Seq[Column]): Seq[Column] = { + @inline private def toHivemallFeatures(exprs: Seq[Column]): Seq[Column] = { df.select(exprs: _*).queryExecution.analyzed.schema.zip(exprs).map { case (StructField(_, _: VectorUDT, _, _), c) => HivemallUtils.to_hivemall_features(c) case (_, c) => c @@ -929,13 +1143,13 @@ final class HivemallOps(df: DataFrame) extends Logging { /** * A convenient function to wrap a logical plan and produce a DataFrame. 
*/ - @inline private[this] def withTypedPlan(logicalPlan: => LogicalPlan): DataFrame = { + @inline private def withTypedPlan(logicalPlan: => LogicalPlan): DataFrame = { val queryExecution = _sparkSession.sessionState.executePlan(logicalPlan) val outputSchema = queryExecution.sparkPlan.schema new Dataset[Row](df.sparkSession, queryExecution, RowEncoder(outputSchema)) } - @inline private[this] def withTypedPlanInCustomStrategy(logicalPlan: => LogicalPlan) + @inline private def withTypedPlanInCustomStrategy(logicalPlan: => LogicalPlan) : DataFrame = { // Inject custom strategies if (!_sparkSession.experimental.extraStrategies.contains(_strategy)) { @@ -967,6 +1181,118 @@ object HivemallOps { } /** + * @see [[hivemall.geospatial.TileUDF]] + * @group geospatial + */ + def tile(lat: Column, lon: Column, zoom: Column): Column = withExpr { + planHiveGenericUDF( + "hivemall.geospatial.TileUDF", + "tile", + lat :: lon :: zoom :: Nil + ) + } + + /** + * @see [[hivemall.geospatial.MapURLUDF]] + * @group geospatial + */ + @scala.annotation.varargs + def map_url(exprs: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.geospatial.MapURLUDF", + "map_url", + exprs + ) + } + + /** + * @see [[hivemall.geospatial.Lat2TileYUDF]] + * @group geospatial + */ + def lat2tiley(lat: Column, zoom: Column): Column = withExpr { + planHiveGenericUDF( + "hivemall.geospatial.Lat2TileYUDF", + "lat2tiley", + lat :: zoom :: Nil + ) + } + + /** + * @see [[hivemall.geospatial.Lon2TileXUDF]] + * @group geospatial + */ + def lon2tilex(lon: Column, zoom: Column): Column = withExpr { + planHiveGenericUDF( + "hivemall.geospatial.Lon2TileXUDF", + "lon2tilex", + lon :: zoom :: Nil + ) + } + + /** + * @see [[hivemall.geospatial.TileX2LonUDF]] + * @group geospatial + */ + def tilex2lon(x: Column, zoom: Column): Column = withExpr { + planHiveGenericUDF( + "hivemall.geospatial.TileX2LonUDF", + "tilex2lon", + x :: zoom :: Nil + ) + } + + /** + * @see [[hivemall.geospatial.TileY2LatUDF]] + * @group 
geospatial + */ + def tiley2lat(y: Column, zoom: Column): Column = withExpr { + planHiveGenericUDF( + "hivemall.geospatial.TileY2LatUDF", + "tiley2lat", + y :: zoom :: Nil + ) + } + + /** + * @see [[hivemall.geospatial.HaversineDistanceUDF]] + * @group geospatial + */ + @scala.annotation.varargs + def haversine_distance(exprs: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.geospatial.HaversineDistanceUDF", + "haversine_distance", + exprs + ) + } + + /** + * @see [[hivemall.smile.tools.TreePredictUDF]] + * @group smile + */ + @scala.annotation.varargs + def tree_predict(exprs: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.smile.tools.TreePredictUDF", + "tree_predict", + exprs + ) + } + + /** + * @see [[hivemall.smile.tools.TreeExportUDF]] + * @group smile + */ + @scala.annotation.varargs + def tree_export(exprs: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.smile.tools.TreeExportUDF", + "tree_export", + exprs + ) + } + + /** * @see [[hivemall.anomaly.ChangeFinderUDF]] * @group anomaly */ @@ -997,10 +1323,10 @@ object HivemallOps { * @group knn.similarity */ @scala.annotation.varargs - def cosine_sim(exprs: Column*): Column = withExpr { + def cosine_similarity(exprs: Column*): Column = withExpr { planHiveGenericUDF( "hivemall.knn.similarity.CosineSimilarityUDF", - "cosine_sim", + "cosine_similarity", exprs ) } @@ -1010,10 +1336,10 @@ object HivemallOps { * @group knn.similarity */ @scala.annotation.varargs - def jaccard(exprs: Column*): Column = withExpr { + def jaccard_similarity(exprs: Column*): Column = withExpr { planHiveUDF( "hivemall.knn.similarity.JaccardIndexUDF", - "jaccard", + "jaccard_similarity", exprs ) } @@ -1137,6 +1463,19 @@ object HivemallOps { } /** + * @see [[hivemall.knn.distance.JaccardDistanceUDF]] + * @group knn.distance + */ + @scala.annotation.varargs + def jaccard_distance(exprs: Column*): Column = withExpr { + planHiveUDF( + "hivemall.knn.distance.JaccardDistanceUDF", + 
"jaccard_distance", + exprs + ) + } + + /** * @see [[hivemall.knn.distance.ManhattanDistanceUDF]] * @group knn.distance */ @@ -1154,7 +1493,7 @@ object HivemallOps { * @group knn.distance */ @scala.annotation.varargs - def minkowski_distance (exprs: Column*): Column = withExpr { + def minkowski_distance(exprs: Column*): Column = withExpr { planHiveGenericUDF( "hivemall.knn.distance.MinkowskiDistanceUDF", "minkowski_distance", @@ -1237,11 +1576,11 @@ object HivemallOps { * @see [[hivemall.ftvec.AddFeatureIndexUDFWrapper]] * @group ftvec */ - def add_feature_index(expr: Column): Column = withExpr { + def add_feature_index(features: Column): Column = withExpr { planHiveGenericUDF( "hivemall.ftvec.AddFeatureIndexUDFWrapper", "add_feature_index", - expr :: Nil + features :: Nil ) } @@ -1273,11 +1612,12 @@ object HivemallOps { * @see [[hivemall.ftvec.hashing.Sha1UDF]] * @group ftvec.hashing */ - def sha1(expr: Column): Column = withExpr { + @scala.annotation.varargs + def sha1(exprs: Column*): Column = withExpr { planHiveUDF( "hivemall.ftvec.hashing.Sha1UDF", "sha1", - expr :: Nil + exprs ) } @@ -1287,7 +1627,6 @@ object HivemallOps { */ @scala.annotation.varargs def array_hash_values(exprs: Column*): Column = withExpr { - // TODO: Need a wrapper class because of using unsupported types planHiveUDF( "hivemall.ftvec.hashing.ArrayHashValuesUDF", "array_hash_values", @@ -1301,7 +1640,6 @@ object HivemallOps { */ @scala.annotation.varargs def prefixed_hash_values(exprs: Column*): Column = withExpr { - // TODO: Need a wrapper class because of using unsupported types planHiveUDF( "hivemall.ftvec.hashing.ArrayPrefixedHashValuesUDF", "prefixed_hash_values", @@ -1310,6 +1648,45 @@ object HivemallOps { } /** + * @see [[hivemall.ftvec.hashing.FeatureHashingUDF]] + * @group ftvec.hashing + */ + @scala.annotation.varargs + def feature_hashing(exprs: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.ftvec.hashing.FeatureHashingUDF", + "feature_hashing", + exprs + ) + } + 
+ /** + * @see [[hivemall.ftvec.pairing.PolynomialFeaturesUDF]] + * @group ftvec.paring + */ + @scala.annotation.varargs + def polynomial_features(exprs: Column*): Column = withExpr { + planHiveUDF( + "hivemall.ftvec.pairing.PolynomialFeaturesUDF", + "polynomial_features", + exprs + ) + } + + /** + * @see [[hivemall.ftvec.pairing.PoweredFeaturesUDF]] + * @group ftvec.paring + */ + @scala.annotation.varargs + def powered_features(exprs: Column*): Column = withExpr { + planHiveUDF( + "hivemall.ftvec.pairing.PoweredFeaturesUDF", + "powered_features", + exprs + ) + } + + /** * @see [[hivemall.ftvec.scaling.RescaleUDF]] * @group ftvec.scaling */ @@ -1338,7 +1715,7 @@ object HivemallOps { * @see [[hivemall.ftvec.scaling.L2NormalizationUDFWrapper]] * @group ftvec.scaling */ - def normalize(expr: Column): Column = withExpr { + def l2_normalize(expr: Column): Column = withExpr { planHiveGenericUDF( "hivemall.ftvec.scaling.L2NormalizationUDFWrapper", "normalize", @@ -1364,8 +1741,7 @@ object HivemallOps { */ @scala.annotation.varargs def to_dense_features(exprs: Column*): Column = withExpr { - // TODO: Need a wrapper class because of using unsupported types - planHiveGenericUDF( + planHiveUDF( "hivemall.ftvec.conv.ToDenseFeaturesUDF", "to_dense_features", exprs @@ -1378,8 +1754,7 @@ object HivemallOps { */ @scala.annotation.varargs def to_sparse_features(exprs: Column*): Column = withExpr { - // TODO: Need a wrapper class because of using unsupported types - planHiveGenericUDF( + planHiveUDF( "hivemall.ftvec.conv.ToSparseFeaturesUDF", "to_sparse_features", exprs @@ -1387,6 +1762,19 @@ object HivemallOps { } /** + * @see [[hivemall.ftvec.binning.FeatureBinningUDF]] + * @group ftvec.conv + */ + @scala.annotation.varargs + def feature_binning(exprs: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.ftvec.binning.FeatureBinningUDF", + "feature_binning", + exprs + ) + } + + /** * @see [[hivemall.ftvec.trans.VectorizeFeaturesUDF]] * @group ftvec.trans */ @@ -1413,6 
+1801,19 @@ object HivemallOps { } /** + * @see [[hivemall.ftvec.trans.FFMFeaturesUDF]] + * @group ftvec.trans + */ + @scala.annotation.varargs + def ffm_features(exprs: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.ftvec.trans.FFMFeaturesUDF", + "ffm_features", + exprs + ) + } + + /** * @see [[hivemall.ftvec.trans.IndexedFeatures]] * @group ftvec.trans */ @@ -1439,15 +1840,136 @@ object HivemallOps { } /** - * @see [[hivemall.smile.tools.TreePredictUDF]] - * @group misc + * @see [[hivemall.ftvec.trans.AddFieldIndicesUDF]] + * @group ftvec.trans + */ + def add_field_indicies(features: Column): Column = withExpr { + planHiveGenericUDF( + "hivemall.ftvec.trans.AddFieldIndicesUDF", + "add_field_indicies", + features :: Nil + ) + } + + /** + * @see [[hivemall.tools.ConvertLabelUDF]] + * @group tools + */ + def convert_label(label: Column): Column = withExpr { + planHiveUDF( + "hivemall.tools.ConvertLabelUDF", + "convert_label", + label :: Nil + ) + } + + /** + * @see [[hivemall.tools.RankSequenceUDF]] + * @group tools + */ + def x_rank(key: Column): Column = withExpr { + planHiveUDF( + "hivemall.tools.RankSequenceUDF", + "x_rank", + key :: Nil + ) + } + + /** + * @see [[hivemall.tools.array.AllocFloatArrayUDF]] + * @group tools.array + */ + def float_array(nDims: Column): Column = withExpr { + planHiveUDF( + "hivemall.tools.array.AllocFloatArrayUDF", + "float_array", + nDims :: Nil + ) + } + + /** + * @see [[hivemall.tools.array.ArrayRemoveUDF]] + * @group tools.array + */ + def array_remove(original: Column, target: Column): Column = withExpr { + planHiveUDF( + "hivemall.tools.array.ArrayRemoveUDF", + "array_remove", + original :: target :: Nil + ) + } + + /** + * @see [[hivemall.tools.array.SortAndUniqArrayUDF]] + * @group tools.array + */ + def sort_and_uniq_array(ar: Column): Column = withExpr { + planHiveUDF( + "hivemall.tools.array.SortAndUniqArrayUDF", + "sort_and_uniq_array", + ar :: Nil + ) + } + + /** + * @see 
[[hivemall.tools.array.SubarrayEndWithUDF]] + * @group tools.array + */ + def subarray_endwith(original: Column, key: Column): Column = withExpr { + planHiveUDF( + "hivemall.tools.array.SubarrayEndWithUDF", + "subarray_endwith", + original :: key :: Nil + ) + } + + /** + * @see [[hivemall.tools.array.ArrayConcatUDF]] + * @group tools.array */ @scala.annotation.varargs - def tree_predict(exprs: Column*): Column = withExpr { + def array_concat(arrays: Column*): Column = withExpr { planHiveGenericUDF( - "hivemall.smile.tools.TreePredictUDF", - "tree_predict", - exprs + "hivemall.tools.array.ArrayConcatUDF", + "array_concat", + arrays + ) + } + + /** + * @see [[hivemall.tools.array.SubarrayUDF]] + * @group tools.array + */ + def subarray(original: Column, fromIndex: Column, toIndex: Column): Column = withExpr { + planHiveUDF( + "hivemall.tools.array.SubarrayUDF", + "subarray", + original :: fromIndex :: toIndex :: Nil + ) + } + + /** + * @see [[hivemall.tools.array.ToStringArrayUDF]] + * @group tools.array + */ + def to_string_array(ar: Column): Column = withExpr { + planHiveUDF( + "hivemall.tools.array.ToStringArrayUDF", + "to_string_array", + ar :: Nil + ) + } + + /** + * @see [[hivemall.tools.array.ArrayIntersectUDF]] + * @group tools.array + */ + @scala.annotation.varargs + def array_intersect(arrays: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.tools.array.ArrayIntersectUDF", + "array_intersect", + arrays ) } @@ -1464,6 +1986,194 @@ object HivemallOps { } /** + * @see [[hivemall.tools.bits.ToBitsUDF]] + * @group tools.bits + */ + def to_bits(indexes: Column): Column = withExpr { + planHiveGenericUDF( + "hivemall.tools.bits.ToBitsUDF", + "to_bits", + indexes :: Nil + ) + } + + /** + * @see [[hivemall.tools.bits.UnBitsUDF]] + * @group tools.bits + */ + def unbits(bitset: Column): Column = withExpr { + planHiveGenericUDF( + "hivemall.tools.bits.UnBitsUDF", + "unbits", + bitset :: Nil + ) + } + + /** + * @see [[hivemall.tools.bits.BitsORUDF]] + * 
@group tools.bits + */ + @scala.annotation.varargs + def bits_or(bits: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.tools.bits.BitsORUDF", + "bits_or", + bits + ) + } + + /** + * @see [[hivemall.tools.compress.InflateUDF]] + * @group tools.compress + */ + @scala.annotation.varargs + def inflate(exprs: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.tools.compress.InflateUDF", + "inflate", + exprs + ) + } + + /** + * @see [[hivemall.tools.compress.DeflateUDF]] + * @group tools.compress + */ + @scala.annotation.varargs + def deflate(exprs: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.tools.compress.DeflateUDF", + "deflate", + exprs + ) + } + + /** + * @see [[hivemall.tools.map.MapGetSumUDF]] + * @group tools.map + */ + @scala.annotation.varargs + def map_get_sum(exprs: Column*): Column = withExpr { + planHiveUDF( + "hivemall.tools.map.MapGetSumUDF", + "map_get_sum", + exprs + ) + } + + /** + * @see [[hivemall.tools.map.MapTailNUDF]] + * @group tools.map + */ + @scala.annotation.varargs + def map_tail_n(exprs: Column*): Column = withExpr { + planHiveGenericUDF( + "hivemall.tools.map.MapTailNUDF", + "map_tail_n", + exprs + ) + } + + /** + * @see [[hivemall.tools.text.TokenizeUDF]] + * @group tools.text + */ + @scala.annotation.varargs + def tokenize(exprs: Column*): Column = withExpr { + planHiveUDF( + "hivemall.tools.text.TokenizeUDF", + "tokenize", + exprs + ) + } + + /** + * @see [[hivemall.tools.text.StopwordUDF]] + * @group tools.text + */ + def is_stopword(word: Column): Column = withExpr { + planHiveUDF( + "hivemall.tools.text.StopwordUDF", + "is_stopword", + word :: Nil + ) + } + + /** + * @see [[hivemall.tools.text.SingularizeUDF]] + * @group tools.text + */ + def singularize(word: Column): Column = withExpr { + planHiveUDF( + "hivemall.tools.text.SingularizeUDF", + "singularize", + word :: Nil + ) + } + + /** + * @see [[hivemall.tools.text.SplitWordsUDF]] + * @group tools.text + */ + 
@scala.annotation.varargs + def split_words(exprs: Column*): Column = withExpr { + planHiveUDF( + "hivemall.tools.text.SplitWordsUDF", + "split_words", + exprs + ) + } + + /** + * @see [[hivemall.tools.text.NormalizeUnicodeUDF]] + * @group tools.text + */ + @scala.annotation.varargs + def normalize_unicode(exprs: Column*): Column = withExpr { + planHiveUDF( + "hivemall.tools.text.NormalizeUnicodeUDF", + "normalize_unicode", + exprs + ) + } + + /** + * @see [[hivemall.tools.text.Base91UDF]] + * @group tools.text + */ + def base91(bin: Column): Column = withExpr { + planHiveGenericUDF( + "hivemall.tools.text.Base91UDF", + "base91", + bin :: Nil + ) + } + + /** + * @see [[hivemall.tools.text.Unbase91UDF]] + * @group tools.text + */ + def unbase91(base91String: Column): Column = withExpr { + planHiveGenericUDF( + "hivemall.tools.text.Unbase91UDF", + "unbase91", + base91String :: Nil + ) + } + + /** + * @see [[hivemall.tools.text.WordNgramsUDF]] + * @group tools.text + */ + def word_ngrams(words: Column, minSize: Column, maxSize: Column): Column = withExpr { + planHiveUDF( + "hivemall.tools.text.WordNgramsUDF", + "word_ngrams", + words :: minSize :: maxSize :: Nil + ) + } + + /** * @see [[hivemall.tools.math.SigmoidGenericUDF]] * @group misc */
